From 172b7115cb1cab464e7b2742b399865e84513161 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Thu, 12 Mar 2026 13:33:24 +0800 Subject: [PATCH] bugfix --- dagflow/dagflow.py | 67 +++++++++++++++++++++++++++++++++++-- models/flow_definition.json | 2 ++ models/flow_instance.json | 1 + models/nod_transfer.json | 17 ++++++++++ models/node_execution.json | 28 +++++++++++----- 5 files changed, 104 insertions(+), 11 deletions(-) create mode 100644 models/nod_transfer.json diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index cde1006..c646d32 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -49,13 +49,21 @@ NODE_EXECUTION_TABLE = { "summary": [{"name": "node_execution", "primary": "id"}], "fields": [ {"name": "id", "type": "str", "length": 32, "nullable": "no"}, + {"name": "type", "type": "str", "length": 32}, {"name": "org_id", "type": "str", "length": 32}, {"name": "instance_id", "type": "str", "length": 32}, {"name": "node_id", "type": "str", "length": 64}, {"name": "input_ctx", "type": "text"}, {"name": "output_ctx", "type": "text"}, + {"name": "ctx", "type": "text"}, + {"name": "ctx_ext", "type": "text"}, {"name": "status", "type": "str", "length": 16}, {"name": "error", "type": "text"}, + {"name": "input", "type": "text"}, + {"name": "output", "type": "text"}, + {"name": "subinst_id", "type": "str", "length": 32}, + {"name": "role", "type": "str", "length": 64}, + {"name": "assignee", "type": "str", "length": 64}, {"name": "running_at", "type": "timestamp"}, {"name": "stopping_at", "type": "timestamp"}, {"name": "created_at", "type": "timestamp"} @@ -176,6 +184,13 @@ class FlowEngine: # ------------------------------ async def step(self, org_id, instance_id): + """ + 从节点执行表中取出所有状态不是结束的执行节点,检查是否可以推进一步 + 结束状态为: + * succeeded + * failed + * cancelled + 推进:如果 env = ServerEnv() async with get_sor_context(env, 'dagflow') as sor: insts = await sor.R('flow_instance', { @@ -224,7 +239,7 @@ class FlowEngine: next_nodes.add(node_id) continue if ntype == 'task': - if node.get('join') + if node.get('join'): await sor.C('node_execution', { 'id': env.uuid(), 'org_id': org_id, @@ -292,6 +307,30 @@ class FlowEngine: nnodes.add((edge['to'], str(e))) return nnodes + async def is_ok_to_create_new_node_exe(self, sor, node, edges, inst): + join = node.get('join) + if join is None: + return True + return False + + async def create_new_node_exe(self, sor, node, edges, inst): + flg = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst) + if flg: + ctx = inst['ctx'].copy, + ctx_ext = ext_data or {} + id = env.uuid() + await sor.C('node_execution', { + 'id': id, + 'org_id': inst['org_id'], + 'instance_id': inst['id], + 'node_id': node['id'], + 'ctx': json.dumps(ctx, ensure_ascii=False), + 'ctx_ext': json.dumps(ctx_ext, ensure_ascii=False), + 'input_ctx': node['input_ctx'], + 'status': 'pendding' + }) + return id + async def is_ok_to_start(self, sor, node, edges, inst): join = node.get('join') if not join: @@ -401,5 +440,27 @@ where instance_id=${instance_id}$ }) async def add_new_workflow(request, params_kw={}): - arams_kw.name - params_kw.dsl + name = params_kw.name + dsl = params_kw.dsl + env = request._run_ns + orgid = await env.get_userorgid() + async with get_sor_context(env, 'dagflow') as sor: + ns = { + 'id': env.uuid(), + 'name': name, + 'dsl': dsl, + 'version': '1', + 'created_at': timestampstr(), + 'org_id': orgid + } + await sor.C('dagflow_definition', ns) + return { + 'status': 'SUCCEEDED', + 'data': { + 'dagflow_id': ns['id'] + } + } + return { + 'status': 'FAILED', + 'error': 'add workflow error' + } diff --git a/models/flow_definition.json b/models/flow_definition.json index faf91b9..34011b6 100644 --- a/models/flow_definition.json +++ b/models/flow_definition.json @@ -8,8 +8,10 @@ "fields": [ {"name": "id", "title": "定义ID", "type": "str", "length": 32, "nullable": "no"}, {"name": "name", "title": "流程名称", "type": "str", "length": 128}, +{"name": "description", "title": "流程描述", "type": "text"}, {"name": "version", "title": "版本", "type": "str", "length": 32}, {"name": "dsl", "title": "YAML DSL", "type": "text"}, +{"name": "ctxfields", "title": "上下文属性集", "type": "text"}, {"name": "created_at", "title": "创建时间", "type": "timestamp"} ], "indexes": [ diff --git a/models/flow_instance.json b/models/flow_instance.json index ecf175c..511f733 100644 --- a/models/flow_instance.json +++ b/models/flow_instance.json @@ -7,6 +7,7 @@ }], "fields": [ {"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"}, +{"name": "org_id", "title": "机构id", "type": "str", "length": 32, "nullable": "no"}, {"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32}, {"name": "status", "title": "状态", "type": "str", "length": 16}, {"name": "ctx", "title": "上下文(JSON)", "type": "text"}, diff --git a/models/nod_transfer.json b/models/nod_transfer.json new file mode 100644 index 0000000..19c294e --- /dev/null +++ b/models/nod_transfer.json @@ -0,0 +1,17 @@ +{ +"summary": [{ +"name": "node_transfer", +"title": "节点转移表", +"primary": "id", +"catelog": "relation" +}], +"fields": [ + {"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32}, + {"name": "from_neid", "title": "从节点id", "type": "str", "length": 32}, + {"name": "to_neid", "title": "到节点id", "type": "str", "length": 32}, +], +"indexes": [ +{"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]} +] +} diff --git a/models/node_execution.json b/models/node_execution.json index 9fff91a..b8e79e5 100644 --- a/models/node_execution.json +++ b/models/node_execution.json @@ -6,14 +6,26 @@ "catelog": "relation" }], "fields": [ -{"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"}, -{"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32}, -{"name": "node_id", "title": "节点ID", "type": "str", "length": 64}, -{"name": "input_ctx", "title": "输入上下文", "type": "text"}, -{"name": "output_ctx", "title": "输出上下文", "type": "text"}, -{"name": "status", "title": "状态", "type": "str", "length": 16}, -{"name": "error", "title": "错误信息", "type": "text"}, -{"name": "created_at", "title": "执行时间", "type": "timestamp"} + {"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "type", "title": "节点类型", "type": "str", "length": 32}, + {"name": "org_id", "title": "机构id", "type": "str", "length": 32}, + {"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32}, + {"name": "node_id", "title": "节点ID", "type": "str", "length": 64}, + {"name": "input_ctx", "title": "输入上下文", "type": "text"}, + {"name": "output_ctx", "title": "输出上下文", "type": "text"}, + {"name": "ctx", "title": "执行时上下文", "type": "text"}, + {"name": "ctx_ext", "title": "执行时附加上下文", "type": "text"}, + {"name": "status", "title": "状态", "type": "str", "length": 16}, + {"name": "error", "title": "错误信息", "type": "text"}, + {"name": "input", "title": "执行实际输入", "type": "text"}, + {"name": "output", "title": "执行实际输出", "type": "text"}, + {"name": "subinst_id", "title": "子流程实例id", "type": "str", "length": 32}, + {"name": "role", "title": "执行人角色", "type": "str", "length": 64}, + {"name": "assignee", "title": "执行人id", "type": "str", "length": 64}, + {"name": "running_at", "title": "执行时点", "type": "timestamp"}, + {"name": "stopping_at", "title": "结束时点", "type": "timestamp"}, + {"name": "created_at", "title": "执行时间", "type": "timestamp"} + {"name": "next_nodeid", "title": "下一节点id", "type", "str", "length": 32} ], "indexes": [ {"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]}