first commit

This commit is contained in:
yumoqing 2026-01-26 21:17:15 +08:00
commit 846f2d68fd
7 changed files with 567 additions and 0 deletions

0
README.md Normal file
View File

218
dagflow.py Normal file
View File

@ -0,0 +1,218 @@
# -*- coding: utf-8 -*-
"""
Workflow Engine v1.0
===================
Features:
- FlowDefinition / FlowInstance separation
- YAML DSL for flow definition
- Conditional transitions
- Concurrent nodes (fork / join)
- Subflow support
- End node semantics
This is a **complete, minimal, coherent reference implementation**.
No framework binding (aiohttp/sqlor) yet -- engine core only.
"""
import yaml
import uuid
from typing import Dict, List, Set, Optional
# -----------------------------
# Definition layer
# -----------------------------
class NodeDefinition:
def __init__(self, id: str, type: str, config: dict | None = None):
self.id = id
self.type = type # task / decision / join / subflow / end
self.config = config or {}
class EdgeDefinition:
def __init__(self, source: str, target: str, condition: str | None = None):
self.source = source
self.target = target
self.condition = condition
class FlowDefinition:
    """Immutable graph: nodes, edges and the id of the start node."""

    def __init__(self, id: str, start: str,
                 nodes: Dict[str, NodeDefinition],
                 edges: List[EdgeDefinition]):
        self.id = id
        self.start = start
        self.nodes = nodes
        self.edges = edges

    def outgoing(self, node_id: str) -> List[EdgeDefinition]:
        """Return every edge whose source is ``node_id``."""
        result = []
        for edge in self.edges:
            if edge.source == node_id:
                result.append(edge)
        return result
class FlowDefinitionLoader:
    """Builds a FlowDefinition from the YAML DSL."""

    @staticmethod
    def from_yaml(text: str) -> FlowDefinition:
        """Parse ``text`` (YAML) into a FlowDefinition.

        Each node's config is the whole node mapping (including 'type');
        edge conditions come from the optional 'when' key.
        """
        data = yaml.safe_load(text)
        nodes = {}
        for node_id, spec in data['nodes'].items():
            nodes[node_id] = NodeDefinition(node_id, spec['type'], spec)
        edges = []
        for spec in data.get('edges', []):
            edges.append(EdgeDefinition(spec['from'], spec['to'], spec.get('when')))
        return FlowDefinition(data['id'], data['start'], nodes, edges)
# -----------------------------
# Instance layer
# -----------------------------
class NodeExecution:
    """Audit record for one run of one node inside an instance."""

    def __init__(self, instance_id: str, node_id: str, input_ctx: dict):
        self.instance_id = instance_id
        self.node_id = node_id
        self.input_ctx = input_ctx
        # Filled in by the engine once the node has actually run.
        self.output_ctx: Optional[dict] = None
        self.status = 'running'  # running / success / failed
        self.error: Optional[str] = None
class FlowInstance:
    """Runtime state of one execution of a FlowDefinition."""

    def __init__(self, flow_def: FlowDefinition, ctx: dict | None = None):
        # 32-char hex id, unique per execution.
        self.id = uuid.uuid4().hex
        self.flow_def = flow_def
        # Keep the caller's dict (aliased) so external observers see updates.
        self.ctx = ctx if ctx else {}
        self.active_nodes: Set[str] = {flow_def.start}
        self.status = 'running'
        self.executions: List[NodeExecution] = []
        self.completed_nodes: Set[str] = set()
# -----------------------------
# Node executors
# -----------------------------
class BaseNodeExecutor:
    """Strategy interface: one executor per node type.

    ``run`` returns the node's output context dict, or None to signal
    that the node is blocked and must remain active.
    """

    def run(self, instance: FlowInstance, node_def: NodeDefinition) -> Optional[dict]:
        raise NotImplementedError
class TaskNodeExecutor(BaseNodeExecutor):
    """Task node placeholder.

    A real implementation would invoke the configured skill/function
    and return its output context.
    """

    def run(self, instance, node_def):
        return {}
class DecisionNodeExecutor(BaseNodeExecutor):
    """Decision node: produces no output; routing is done by the
    'when' conditions on its outgoing edges."""

    def run(self, instance, node_def):
        return {}
class JoinNodeExecutor(BaseNodeExecutor):
    """Synchronization barrier: blocks until every node listed in the
    ``wait_for`` config entry has completed."""

    def run(self, instance, node_def):
        pending = [n for n in node_def.config.get('wait_for', [])
                   if n not in instance.completed_nodes]
        if pending:
            # Still waiting: the engine keeps this node active.
            return None
        return {}
class SubFlowNodeExecutor(BaseNodeExecutor):
    """Subflow placeholder.

    A real implementation would create a child FlowInstance, step it,
    and block (return None) until it finishes.
    """

    def run(self, instance, node_def):
        return {}
class EndNodeExecutor(BaseNodeExecutor):
    """End node: no work; the engine detects completion by node type."""

    def run(self, instance, node_def):
        return {}
# Registry mapping node type name -> executor singleton.
EXECUTORS = dict(
    task=TaskNodeExecutor(),
    decision=DecisionNodeExecutor(),
    join=JoinNodeExecutor(),
    subflow=SubFlowNodeExecutor(),
    end=EndNodeExecutor(),
)
# -----------------------------
# Engine
# -----------------------------
class FlowEngine:
    """Drives FlowInstances one synchronous step at a time.

    Each ``step`` runs every currently-active node once, merges
    successful outputs into the instance context, and computes the
    next active set from the outgoing edges.
    """

    def step(self, instance: FlowInstance):
        """Advance ``instance`` by one tick; no-op unless it is running."""
        if instance.status != 'running':
            return
        next_active: Set[str] = set()
        for node_id in list(instance.active_nodes):
            node_def = instance.flow_def.nodes[node_id]
            executor = EXECUTORS[node_def.type]
            result = executor.run(instance, node_def)
            if result is None:
                # Blocked (e.g. join waiting on siblings): keep it active.
                # BUGFIX: do not create a NodeExecution record here -- the
                # original built one per step and discarded it.
                next_active.add(node_id)
                continue
            execution = NodeExecution(instance.id, node_id, instance.ctx.copy())
            execution.output_ctx = result
            execution.status = 'success'
            instance.executions.append(execution)
            instance.completed_nodes.add(node_id)
            # Merge the node's output into the shared context.
            instance.ctx.update(result)
            # Follow outgoing edges whose condition (if any) holds.
            for edge in instance.flow_def.outgoing(node_id):
                if edge.condition:
                    # SECURITY: eval() runs DSL text -- only load trusted
                    # flow definitions, or swap in a safe evaluator.
                    if not eval(edge.condition, {}, {'ctx': instance.ctx}):
                        continue
                next_active.add(edge.target)
        instance.active_nodes = next_active
        # BUGFIX: also finish on an empty active set (dead-end flow);
        # the original left status 'running' forever, so the example
        # driver loop would never terminate.
        if not instance.active_nodes or all(
            instance.flow_def.nodes[n].type == 'end'
            for n in instance.active_nodes
        ):
            instance.status = 'finished'
# -----------------------------
# Example
# -----------------------------
if __name__ == '__main__':
    # Smoke test: a minimal two-node flow (task -> end).
    demo_dsl = """
id: demo_flow
start: start
nodes:
  start:
    type: task
  end:
    type: end
edges:
  - from: start
    to: end
"""
    definition = FlowDefinitionLoader.from_yaml(demo_dsl)
    run = FlowInstance(definition, ctx={'hello': 'world'})
    engine = FlowEngine()
    while run.status == 'running':
        engine.step(run)
    print('finished:', run.ctx)

271
dagflow/dagflow.py Normal file
View File

@ -0,0 +1,271 @@
# -*- coding: utf-8 -*-
"""
Workflow Engine v1.1 (sqlor-backed)
=================================
This version adds:
- Persistent tables designed per the table.md specification
- sqlor-based CRUD for FlowDefinition / FlowInstance / NodeExecution
- Engine rewritten to operate on persisted instances
NOTE:
- Put the table-definition JSON examples into models/*.json
- This file assumes sqlor / ServerEnv are already available
"""
# ---------------------------------------------------------------------
# Table definitions (models/*.json) -- per the table.md specification
# ---------------------------------------------------------------------
# models/flow_definition.json
# Schema (table.md style) for flow_definition: one row per versioned
# flow definition; the raw YAML DSL is stored in ``dsl``.
FLOW_DEFINITION_TABLE = {
    "summary": [{
        "name": "flow_definition",
        "title": "流程定义",
        "primary": "id",
        "catelog": "entity"
    }],
    "fields": [
        {"name": "id", "title": "定义ID", "type": "str", "length": 32, "nullable": "no"},
        {"name": "name", "title": "流程名称", "type": "str", "length": 128},
        {"name": "version", "title": "版本", "type": "str", "length": 32},
        {"name": "dsl", "title": "YAML DSL", "type": "text"},
        {"name": "created_at", "title": "创建时间", "type": "timestamp"}
    ],
    "indexes": [
        {"name": "idx_flow_def_name", "idxtype": "index", "idxfields": ["name"]}
    ]
}
# models/flow_instance.json
# Runtime state of one flow execution; ``ctx`` and ``active_nodes``
# are JSON-serialized (see FlowEngine.step below).
FLOW_INSTANCE_TABLE = {
    "summary": [{
        "name": "flow_instance",
        "title": "流程实例",
        "primary": "id",
        "catelog": "entity"
    }],
    "fields": [
        {"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"},
        {"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32},
        {"name": "status", "title": "状态", "type": "str", "length": 16},
        {"name": "ctx", "title": "上下文(JSON)", "type": "text"},
        {"name": "active_nodes", "title": "当前节点(JSON)", "type": "text"},
        {"name": "created_at", "title": "创建时间", "type": "timestamp"}
    ],
    "indexes": [
        {"name": "idx_flow_inst_def", "idxtype": "index", "idxfields": ["flow_def_id"]}
    ]
}
# models/node_execution.json
# Append-only audit trail: one row per node run within an instance.
NODE_EXECUTION_TABLE = {
    "summary": [{
        "name": "node_execution",
        "title": "节点执行记录",
        "primary": "id",
        "catelog": "relation"
    }],
    "fields": [
        {"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"},
        {"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32},
        {"name": "node_id", "title": "节点ID", "type": "str", "length": 64},
        {"name": "input_ctx", "title": "输入上下文", "type": "text"},
        {"name": "output_ctx", "title": "输出上下文", "type": "text"},
        {"name": "status", "title": "状态", "type": "str", "length": 16},
        {"name": "error", "title": "错误信息", "type": "text"},
        {"name": "created_at", "title": "执行时间", "type": "timestamp"}
    ],
    "indexes": [
        {"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]}
    ]
}
# models/subflow_instance.json
# Link table: ties a parent instance's subflow node to the child
# flow_instance it spawned.
# BUGFIX: removed a duplicated trailing '], "indexes": [...]}' fragment
# that followed this literal and made the module a SyntaxError.
SUBFLOW_INSTANCE_TABLE = {
    "summary": [{
        "name": "subflow_instance",
        "title": "子流程实例",
        "primary": "id",
        "catelog": "relation"
    }],
    "fields": [
        {"name": "id", "title": "子流程ID", "type": "str", "length": 32, "nullable": "no"},
        {"name": "parent_instance_id", "title": "父流程实例ID", "type": "str", "length": 32},
        {"name": "parent_node_id", "title": "父节点ID", "type": "str", "length": 64},
        {"name": "child_instance_id", "title": "子流程实例ID", "type": "str", "length": 32},
        {"name": "status", "title": "状态", "type": "str", "length": 16},
        {"name": "created_at", "title": "创建时间", "type": "timestamp"}
    ],
    "indexes": [
        {"name": "idx_subflow_parent", "idxtype": "index", "idxfields": ["parent_instance_id"]}
    ]
}
# ---------------------------------------------------------------------
# Engine implementation (sqlor)
# ---------------------------------------------------------------------
import yaml
import json
from sqlor.dbpools import get_sor_context
from ahserver.serverenv import ServerEnv
class FlowEngine:
    """Persistent workflow engine backed by sqlor tables.

    All state lives in flow_definition / flow_instance / node_execution /
    subflow_instance; each public coroutine opens its own DB context.
    """

    @staticmethod
    def _build_child_ctx(parent_ctx: dict, mapping: dict) -> dict:
        """Build a child subflow context from the parent context.

        ``mapping`` values are expressions over ``ctx`` (e.g. "ctx.a.b").
        SECURITY: eval() runs DSL text -- only load trusted definitions.
        NOTE(review): dotted access on a plain dict (ctx.a) would raise
        AttributeError -- confirm the expected mapping expression format.
        """
        child_ctx = {}
        for key, expr in mapping.items():
            child_ctx[key] = eval(expr, {}, {'ctx': parent_ctx})
        return child_ctx

    @staticmethod
    def _merge_child_ctx(parent_ctx: dict, child_ctx: dict, mapping: dict):
        """Copy a finished child's outputs back into the parent context."""
        for key, expr in mapping.items():
            # expr like "ctx.xxx": strip the prefix to get the parent key.
            target = expr.replace('ctx.', '')
            parent_ctx[target] = child_ctx.get(key)

    async def create_definition(self, name: str, version: str, dsl_text: str):
        """Persist a new flow definition; returns its id."""
        env = ServerEnv()
        # BUGFIX: original used ``async with (env, 'workflow')`` -- a tuple
        # is not an async context manager. get_sor_context matches the
        # otherwise-unused import; TODO confirm against sqlor's API.
        async with get_sor_context('workflow') as sor:
            flow_def_id = env.uuid()
            await sor.C('flow_definition', {
                'id': flow_def_id,
                'name': name,
                'version': version,
                'dsl': dsl_text
            })
            return flow_def_id

    async def create_instance(self, flow_def_id: str, ctx: dict | None = None):
        """Create a runnable instance of a definition; returns its id."""
        env = ServerEnv()
        async with get_sor_context('workflow') as sor:
            inst_id = env.uuid()
            await sor.C('flow_instance', {
                'id': inst_id,
                'flow_def_id': flow_def_id,
                'status': 'running',
                # Empty active_nodes means "seed from dsl['start']" on the
                # first call to step().
                'ctx': json.dumps(ctx or {}),
                'active_nodes': json.dumps([])
            })
            return inst_id

    async def _step_subflow(self, sor, env, instance_id: str, node_id: str,
                            node_def: dict, ctx: dict) -> str:
        """Advance one subflow node; returns 'blocked', 'skip' or 'done'.

        'blocked': keep the node active (child created or still running).
        'skip':    dangling link (child row vanished); drop the node.
        'done':    child finished; its outputs are merged into ``ctx``.
        """
        sub_rows = await sor.R('subflow_instance', {
            'parent_instance_id': instance_id,
            'parent_node_id': node_id
        })
        input_map = node_def.get('input', {})
        output_map = node_def.get('output', {})
        if not sub_rows:
            # First visit: spawn the child instance and link it.
            child_id = env.uuid()
            await sor.C('flow_instance', {
                'id': child_id,
                'flow_def_id': node_def['flow'],
                'status': 'running',
                'ctx': json.dumps(self._build_child_ctx(ctx, input_map)),
                'active_nodes': json.dumps([])
            })
            await sor.C('subflow_instance', {
                'id': env.uuid(),
                'parent_instance_id': instance_id,
                'parent_node_id': node_id,
                'child_instance_id': child_id,
                'status': 'running'
            })
            return 'blocked'
        sub = sub_rows[0]
        child_rows = await sor.R('flow_instance', {'id': sub['child_instance_id']})
        if not child_rows:
            return 'skip'
        child = child_rows[0]
        if child['status'] != 'finished':
            return 'blocked'
        # Child done: pull its outputs into the parent context.
        self._merge_child_ctx(ctx, json.loads(child['ctx']), output_map)
        await sor.U('subflow_instance', {'id': sub['id'], 'status': 'finished'})
        return 'done'

    async def step(self, instance_id: str):
        """Advance one persisted instance by a single tick."""
        env = ServerEnv()
        async with get_sor_context('workflow') as sor:
            inst_rows = await sor.R('flow_instance', {'id': instance_id})
            if not inst_rows:
                return
            inst = inst_rows[0]
            if inst['status'] != 'running':
                return
            def_rows = await sor.R('flow_definition', {'id': inst['flow_def_id']})
            if not def_rows:
                return
            flow_def = def_rows[0]
            # BUGFIX: original was yaml.safe_load(dsl)(dsl) -- calling the
            # parsed dict raised TypeError on every step.
            dsl = yaml.safe_load(flow_def['dsl'])
            ctx = json.loads(inst['ctx'])
            active_nodes = set(json.loads(inst['active_nodes']))
            if not active_nodes:
                # Fresh instance: start from the definition's entry node.
                active_nodes = {dsl['start']}
            next_nodes = set()
            for node_id in active_nodes:
                node_def = dsl['nodes'][node_id]
                if node_def['type'] == 'subflow':
                    state = await self._step_subflow(
                        sor, env, instance_id, node_id, node_def, ctx)
                    if state == 'blocked':
                        next_nodes.add(node_id)
                        continue
                    if state == 'skip':
                        continue
                    # state == 'done': fall through to record + transitions.
                # --- record execution and follow outgoing edges ---
                await sor.C('node_execution', {
                    'id': env.uuid(),
                    'instance_id': instance_id,
                    'node_id': node_id,
                    'input_ctx': json.dumps(ctx),
                    'status': 'success'
                })
                for edge in dsl.get('edges', []):
                    if edge['from'] != node_id:
                        continue
                    cond = edge.get('when')
                    # SECURITY: eval() on DSL text; trusted definitions only.
                    if cond and not eval(cond, {}, {'ctx': ctx}):
                        continue
                    next_nodes.add(edge['to'])
            # BUGFIX: an empty next set previously persisted [] while staying
            # 'running', so the next step() re-seeded from dsl['start'] and the
            # flow restarted forever. Treat a dead end as finished too.
            finished = (not next_nodes) or all(
                dsl['nodes'][n]['type'] == 'end' for n in next_nodes)
            update = {
                'id': instance_id,
                # BUGFIX: persist the (possibly subflow-merged) context; the
                # original mutated ctx but never wrote it back.
                'ctx': json.dumps(ctx),
                'active_nodes': json.dumps(list(next_nodes))
            }
            if finished:
                update['status'] = 'finished'
            await sor.U('flow_instance', update)

View File

@ -0,0 +1,18 @@
{
"summary": [{
"name": "flow_definition",
"title": "流程定义",
"primary": "id",
"catelog": "entity"
}],
"fields": [
{"name": "id", "title": "定义ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "name", "title": "流程名称", "type": "str", "length": 128},
{"name": "version", "title": "版本", "type": "str", "length": 32},
{"name": "dsl", "title": "YAML DSL", "type": "text"},
{"name": "created_at", "title": "创建时间", "type": "timestamp"}
],
"indexes": [
{"name": "idx_flow_def_name", "idxtype": "index", "idxfields": ["name"]}
]
}

19
models/flow_instance.json Normal file
View File

@ -0,0 +1,19 @@
{
"summary": [{
"name": "flow_instance",
"title": "流程实例",
"primary": "id",
"catelog": "entity"
}],
"fields": [
{"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32},
{"name": "status", "title": "状态", "type": "str", "length": 16},
{"name": "ctx", "title": "上下文(JSON)", "type": "text"},
{"name": "active_nodes", "title": "当前节点(JSON)", "type": "text"},
{"name": "created_at", "title": "创建时间", "type": "timestamp"}
],
"indexes": [
{"name": "idx_flow_inst_def", "idxtype": "index", "idxfields": ["flow_def_id"]}
]
}

View File

@ -0,0 +1,21 @@
{
"summary": [{
"name": "node_execution",
"title": "节点执行记录",
"primary": "id",
"catelog": "relation"
}],
"fields": [
{"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32},
{"name": "node_id", "title": "节点ID", "type": "str", "length": 64},
{"name": "input_ctx", "title": "输入上下文", "type": "text"},
{"name": "output_ctx", "title": "输出上下文", "type": "text"},
{"name": "status", "title": "状态", "type": "str", "length": 16},
{"name": "error", "title": "错误信息", "type": "text"},
{"name": "created_at", "title": "执行时间", "type": "timestamp"}
],
"indexes": [
{"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]}
]
}

View File

@ -0,0 +1,20 @@
{
"summary": [{
"name": "subflow_instance",
"title": "子流程实例",
"primary": "id",
"catelog": "relation"
}],
"fields": [
{"name": "id", "title": "子流程ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "parent_instance_id", "title": "父流程实例ID", "type": "str", "length": 32},
{"name": "parent_node_id", "title": "父节点ID", "type": "str", "length": 64},
{"name": "child_instance_id", "title": "子流程实例ID", "type": "str", "length": 32},
{"name": "status", "title": "状态", "type": "str", "length": 16},
{"name": "created_at", "title": "创建时间", "type": "timestamp"}
],
"indexes": [
{"name": "idx_subflow_parent", "idxtype": "index", "idxfields": ["parent_instance_id"]}
]
}