commit 846f2d68fd9a5ccb82a1edeeee1cb112c19437b3 Author: yumoqing Date: Mon Jan 26 21:17:15 2026 +0800 first commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/dagflow.py b/dagflow.py new file mode 100644 index 0000000..cbdd283 --- /dev/null +++ b/dagflow.py @@ -0,0 +1,218 @@ +# -*- coding: utf-8 -*- +""" +Workflow Engine v1.0 +=================== + +Features: +- FlowDefinition / FlowInstance separation +- YAML DSL for flow definition +- Conditional transitions +- Concurrent nodes (fork / join) +- Subflow support +- End node semantics + +This is a **complete, minimal, coherent reference implementation**. +No framework binding (aiohttp/sqlor) yet – engine core only. +""" + +import yaml +import uuid +from typing import Dict, List, Set, Optional + +# ----------------------------- +# Definition layer +# ----------------------------- + +class NodeDefinition: + def __init__(self, id: str, type: str, config: dict | None = None): + self.id = id + self.type = type # task / decision / join / subflow / end + self.config = config or {} + + +class EdgeDefinition: + def __init__(self, source: str, target: str, condition: str | None = None): + self.source = source + self.target = target + self.condition = condition + + +class FlowDefinition: + def __init__(self, id: str, start: str, + nodes: Dict[str, NodeDefinition], + edges: List[EdgeDefinition]): + self.id = id + self.start = start + self.nodes = nodes + self.edges = edges + + def outgoing(self, node_id: str) -> List[EdgeDefinition]: + return [e for e in self.edges if e.source == node_id] + + +class FlowDefinitionLoader: + @staticmethod + def from_yaml(text: str) -> FlowDefinition: + data = yaml.safe_load(text) + nodes = { + nid: NodeDefinition(nid, v['type'], v) + for nid, v in data['nodes'].items() + } + edges = [EdgeDefinition(e['from'], e['to'], e.get('when')) + for e in data.get('edges', [])] + return FlowDefinition( + id=data['id'], + start=data['start'], + nodes=nodes, + 
edges=edges + ) + + +# ----------------------------- +# Instance layer +# ----------------------------- + +class NodeExecution: + def __init__(self, instance_id: str, node_id: str, input_ctx: dict): + self.instance_id = instance_id + self.node_id = node_id + self.input_ctx = input_ctx + self.output_ctx: Optional[dict] = None + self.status = 'running' # running / success / failed + self.error: Optional[str] = None + + +class FlowInstance: + def __init__(self, flow_def: FlowDefinition, ctx: dict | None = None): + self.id = uuid.uuid4().hex + self.flow_def = flow_def + self.ctx = ctx or {} + self.active_nodes: Set[str] = {flow_def.start} + self.status = 'running' + self.executions: List[NodeExecution] = [] + self.completed_nodes: Set[str] = set() + + +# ----------------------------- +# Node executors +# ----------------------------- + +class BaseNodeExecutor: + def run(self, instance: FlowInstance, node_def: NodeDefinition) -> Optional[dict]: + raise NotImplementedError + + +class TaskNodeExecutor(BaseNodeExecutor): + def run(self, instance, node_def): + # Placeholder: real impl should call skill / function + return {} + + +class DecisionNodeExecutor(BaseNodeExecutor): + def run(self, instance, node_def): + return {} + + +class JoinNodeExecutor(BaseNodeExecutor): + def run(self, instance, node_def): + wait_for = set(node_def.config.get('wait_for', [])) + if not wait_for.issubset(instance.completed_nodes): + return None # block + return {} + + +class SubFlowNodeExecutor(BaseNodeExecutor): + def run(self, instance, node_def): + # Simplified: real impl would create & step child FlowInstance + return {} + + +class EndNodeExecutor(BaseNodeExecutor): + def run(self, instance, node_def): + return {} + + +EXECUTORS = { + 'task': TaskNodeExecutor(), + 'decision': DecisionNodeExecutor(), + 'join': JoinNodeExecutor(), + 'subflow': SubFlowNodeExecutor(), + 'end': EndNodeExecutor(), +} + + +# ----------------------------- +# Engine +# ----------------------------- + +class 
FlowEngine: + def step(self, instance: FlowInstance): + if instance.status != 'running': + return + + next_active: Set[str] = set() + + for node_id in list(instance.active_nodes): + node_def = instance.flow_def.nodes[node_id] + executor = EXECUTORS[node_def.type] + + execution = NodeExecution(instance.id, node_id, instance.ctx.copy()) + result = executor.run(instance, node_def) + + if result is None: + # blocked (join / subflow) + next_active.add(node_id) + continue + + execution.output_ctx = result + execution.status = 'success' + instance.executions.append(execution) + instance.completed_nodes.add(node_id) + + # merge ctx + instance.ctx.update(result) + + # compute transitions + for edge in instance.flow_def.outgoing(node_id): + if edge.condition: + if not eval(edge.condition, {}, {'ctx': instance.ctx}): + continue + next_active.add(edge.target) + + instance.active_nodes = next_active + + # end check + if instance.active_nodes and all( + instance.flow_def.nodes[n].type == 'end' + for n in instance.active_nodes + ): + instance.status = 'finished' + + +# ----------------------------- +# Example +# ----------------------------- + +if __name__ == '__main__': + yaml_text = """ + id: demo_flow + start: start + nodes: + start: + type: task + end: + type: end + edges: + - from: start + to: end + """ + + flow_def = FlowDefinitionLoader.from_yaml(yaml_text) + instance = FlowInstance(flow_def, ctx={'hello': 'world'}) + + engine = FlowEngine() + while instance.status == 'running': + engine.step(instance) + + print('finished:', instance.ctx) + diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py new file mode 100644 index 0000000..52efe91 --- /dev/null +++ b/dagflow/dagflow.py @@ -0,0 +1,271 @@ +# -*- coding: utf-8 -*- +""" +Workflow Engine v1.1 (sqlor-backed) +================================= + +This version adds: +- Persistent tables designed per table.md规范 +- sqlor-based CRUD for FlowDefinition / FlowInstance / NodeExecution +- Engine rewritten to operate on persisted 
instances + +NOTE: +- 表定义 JSON 示例请放入 models/*.json +- 本文件假设 sqlor / ServerEnv 已可用 +""" + +# --------------------------------------------------------------------- +# 表定义(models/*.json)——按 table.md 规范 +# --------------------------------------------------------------------- + +# models/flow_definition.json +FLOW_DEFINITION_TABLE = { + "summary": [{ + "name": "flow_definition", + "title": "流程定义", + "primary": "id", + "catelog": "entity" + }], + "fields": [ + {"name": "id", "title": "定义ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "name", "title": "流程名称", "type": "str", "length": 128}, + {"name": "version", "title": "版本", "type": "str", "length": 32}, + {"name": "dsl", "title": "YAML DSL", "type": "text"}, + {"name": "created_at", "title": "创建时间", "type": "timestamp"} + ], + "indexes": [ + {"name": "idx_flow_def_name", "idxtype": "index", "idxfields": ["name"]} + ] +} + +# models/flow_instance.json +FLOW_INSTANCE_TABLE = { + "summary": [{ + "name": "flow_instance", + "title": "流程实例", + "primary": "id", + "catelog": "entity" + }], + "fields": [ + {"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32}, + {"name": "status", "title": "状态", "type": "str", "length": 16}, + {"name": "ctx", "title": "上下文(JSON)", "type": "text"}, + {"name": "active_nodes", "title": "当前节点(JSON)", "type": "text"}, + {"name": "created_at", "title": "创建时间", "type": "timestamp"} + ], + "indexes": [ + {"name": "idx_flow_inst_def", "idxtype": "index", "idxfields": ["flow_def_id"]} + ] +} + +# models/node_execution.json +NODE_EXECUTION_TABLE = { + "summary": [{ + "name": "node_execution", + "title": "节点执行记录", + "primary": "id", + "catelog": "relation" + }], + "fields": [ + {"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32}, + {"name": "node_id", "title": "节点ID", "type": "str", "length": 
64}, + {"name": "input_ctx", "title": "输入上下文", "type": "text"}, + {"name": "output_ctx", "title": "输出上下文", "type": "text"}, + {"name": "status", "title": "状态", "type": "str", "length": 16}, + {"name": "error", "title": "错误信息", "type": "text"}, + {"name": "created_at", "title": "执行时间", "type": "timestamp"} + ], + "indexes": [ + {"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]} + ] +} + +# models/subflow_instance.json +SUBFLOW_INSTANCE_TABLE = { + "summary": [{ + "name": "subflow_instance", + "title": "子流程实例", + "primary": "id", + "catelog": "relation" + }], + "fields": [ + {"name": "id", "title": "子流程ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "parent_instance_id", "title": "父流程实例ID", "type": "str", "length": 32}, + {"name": "parent_node_id", "title": "父节点ID", "type": "str", "length": 64}, + {"name": "child_instance_id", "title": "子流程实例ID", "type": "str", "length": 32}, + {"name": "status", "title": "状态", "type": "str", "length": 16}, + {"name": "created_at", "title": "创建时间", "type": "timestamp"} + ], + "indexes": [ + {"name": "idx_subflow_parent", "idxtype": "index", "idxfields": ["parent_instance_id"]} + ] +} + +# --------------------------------------------------------------------- +# Engine implementation (sqlor) +# --------------------------------------------------------------------- + +import yaml +import json +from sqlor.dbpools import get_sor_context +from ahserver.serverenv import ServerEnv + + +class FlowEngine: + """Persistent workflow engine""" + + async def create_definition(self, name: str, version: str, dsl_text: str): + env = ServerEnv() + async with (env, 'workflow') as sor: + flow_def_id = env.uuid() + await sor.C('flow_definition', { + 'id': flow_def_id, + 'name': name, + 'version': version, + 'dsl': dsl_text + }) + return flow_def_id + + async def create_instance(self, flow_def_id: str, 
ctx: dict | None = None): + env = ServerEnv() + async with (env, 'workflow') as sor: + inst_id = env.uuid() + await sor.C('flow_instance', { + 'id': inst_id, + 'flow_def_id': flow_def_id, + 'status': 'running', + 'ctx': json.dumps(ctx or {}), + 'active_nodes': json.dumps([]) + }) + return inst_id + + async def step(self, instance_id: str): + env = ServerEnv() + async with (env, 'workflow') as sor: + rows = await sor.R('flow_instance', {'id': instance_id}) + if not rows: + return + inst = rows[0] + if inst['status'] != 'running': + return + + defs = await sor.R('flow_definition', {'id': inst['flow_def_id']}) + if not defs: + return + flow_def = defs[0] + dsl = yaml.safe_load(flow_def['dsl']) + + ctx = json.loads(inst['ctx']) + active_nodes = set(json.loads(inst['active_nodes'])) + if not active_nodes: + active_nodes = {dsl['start']} + + next_nodes = set() + + for node_id in active_nodes: + node_def = dsl['nodes'][node_id] + ntype = node_def['type'] + + # --- SubFlow handling --- + if ntype == 'subflow': + rows = await sor.R('subflow_instance', { + 'parent_instance_id': instance_id, + 'parent_node_id': node_id + }) + + # 解析 input / output mapping + input_map = node_def.get('input', {}) + output_map = node_def.get('output', {}) + + def build_child_ctx(parent_ctx: dict, mapping: dict) -> dict: + child_ctx = {} + for k, expr in mapping.items(): + # expr like: ctx.xxx.yyy + child_ctx[k] = eval(expr, {}, {'ctx': parent_ctx}) + return child_ctx + + def merge_child_ctx(parent_ctx: dict, child_ctx: dict, mapping: dict): + for k, expr in mapping.items(): + # expr like: ctx.xxx + target = expr.replace('ctx.', '') + parent_ctx[target] = child_ctx.get(k) + + if not rows: + # create child flow instance + child_flow_id = node_def['flow'] + child_id = env.uuid() + child_ctx = build_child_ctx(ctx, input_map) + + await sor.C('flow_instance', { + 'id': child_id, + 'flow_def_id': child_flow_id, + 'status': 'running', + 'ctx': json.dumps(child_ctx), + 'active_nodes': 
json.dumps([]) + }) + await sor.C('subflow_instance', { + 'id': env.uuid(), + 'parent_instance_id': instance_id, + 'parent_node_id': node_id, + 'child_instance_id': child_id, + 'status': 'running' + }) + next_nodes.add(node_id) + continue + + sub = rows[0] + child_rows = await sor.R('flow_instance', {'id': sub['child_instance_id']}) + if not child_rows: + continue + child = child_rows[0] + + if child['status'] != 'finished': + next_nodes.add(node_id) + continue + + # merge ctx by output mapping + child_ctx = json.loads(child['ctx']) + merge_child_ctx(ctx, child_ctx, output_map) + + await sor.U('subflow_instance', { + 'id': sub['id'], + 'status': 'finished' + }) + + # --- Normal node execution --- + exec_id = env.uuid() + await sor.C('node_execution', { + 'id': exec_id, + 'instance_id': instance_id, + 'node_id': node_id, + 'input_ctx': json.dumps(ctx), + 'status': 'success' + }) + + for edge in dsl.get('edges', []): + if edge['from'] != node_id: + continue + cond = edge.get('when') + if cond and not eval(cond, {}, {'ctx': ctx}): + continue + next_nodes.add(edge['to']) + + # end check + if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes): + await sor.U('flow_instance', { + 'id': instance_id, + 'status': 'finished', + 'active_nodes': json.dumps(list(next_nodes)) + }) + else: + await sor.U('flow_instance', { + 'id': instance_id, + 'active_nodes': json.dumps(list(next_nodes)) + }) + diff --git a/models/flow_definition.json b/models/flow_definition.json new file mode 100644 index 0000000..faf91b9 --- /dev/null +++ b/models/flow_definition.json @@ -0,0 +1,18 @@ +{ +"summary": [{ +"name": "flow_definition", +"title": "流程定义", +"primary": "id", +"catelog": "entity" +}], +"fields": [ +{"name": "id", "title": "定义ID", "type": "str", "length": 32, "nullable": "no"}, +{"name": "name", "title": "流程名称", "type": "str", "length": 128}, +{"name": "version", "title": "版本", "type": "str", "length": 32}, +{"name": "dsl", "title": "YAML DSL", "type": "text"}, 
+{"name": "created_at", "title": "创建时间", "type": "timestamp"} +], +"indexes": [ +{"name": "idx_flow_def_name", "idxtype": "index", "idxfields": ["name"]} +] +} diff --git a/models/flow_instance.json b/models/flow_instance.json new file mode 100644 index 0000000..ecf175c --- /dev/null +++ b/models/flow_instance.json @@ -0,0 +1,19 @@ +{ +"summary": [{ +"name": "flow_instance", +"title": "流程实例", +"primary": "id", +"catelog": "entity" +}], +"fields": [ +{"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"}, +{"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32}, +{"name": "status", "title": "状态", "type": "str", "length": 16}, +{"name": "ctx", "title": "上下文(JSON)", "type": "text"}, +{"name": "active_nodes", "title": "当前节点(JSON)", "type": "text"}, +{"name": "created_at", "title": "创建时间", "type": "timestamp"} +], +"indexes": [ +{"name": "idx_flow_inst_def", "idxtype": "index", "idxfields": ["flow_def_id"]} +] +} diff --git a/models/node_execution.json b/models/node_execution.json new file mode 100644 index 0000000..9fff91a --- /dev/null +++ b/models/node_execution.json @@ -0,0 +1,21 @@ +{ +"summary": [{ +"name": "node_execution", +"title": "节点执行记录", +"primary": "id", +"catelog": "relation" +}], +"fields": [ +{"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"}, +{"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32}, +{"name": "node_id", "title": "节点ID", "type": "str", "length": 64}, +{"name": "input_ctx", "title": "输入上下文", "type": "text"}, +{"name": "output_ctx", "title": "输出上下文", "type": "text"}, +{"name": "status", "title": "状态", "type": "str", "length": 16}, +{"name": "error", "title": "错误信息", "type": "text"}, +{"name": "created_at", "title": "执行时间", "type": "timestamp"} +], +"indexes": [ +{"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]} +] +} diff --git a/models/subflow_instance.json b/models/subflow_instance.json new file mode 100644 index 
0000000..5f8db98 --- /dev/null +++ b/models/subflow_instance.json @@ -0,0 +1,19 @@ +{ +"summary": [{ +"name": "subflow_instance", +"title": "子流程实例", +"primary": "id", +"catelog": "relation" +}], +"fields": [ +{"name": "id", "title": "子流程ID", "type": "str", "length": 32, "nullable": "no"}, +{"name": "parent_instance_id", "title": "父流程实例ID", "type": "str", "length": 32}, +{"name": "parent_node_id", "title": "父节点ID", "type": "str", "length": 64}, +{"name": "child_instance_id", "title": "子流程实例ID", "type": "str", "length": 32}, +{"name": "status", "title": "状态", "type": "str", "length": 16}, +{"name": "created_at", "title": "创建时间", "type": "timestamp"} +], +"indexes": [ +{"name": "idx_subflow_parent", "idxtype": "index", "idxfields": ["parent_instance_id"]} +] +}