From 34ac11fa71330191d5b0c9776a6faac9ed03ce68 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Thu, 12 Mar 2026 18:05:08 +0800 Subject: [PATCH] bugfix --- README.md | 5 +- dagflow/dagflow.py | 279 +++++++++++++++++++++-------------- dagflow/init.py | 23 ++- models/flow_instance.json | 3 +- models/node_execution.json | 2 +- models/subflow_instance.json | 20 --- 6 files changed, 190 insertions(+), 142 deletions(-) delete mode 100644 models/subflow_instance.json diff --git a/README.md b/README.md index f689645..ea9a93d 100644 --- a/README.md +++ b/README.md @@ -127,12 +127,13 @@ output_ctx使用节点执行结果的字典数据作为数据来渲染,结果u * done 执行成功状态 * failed 执行失败状态 * cancelled 取消状态 +* completed 处理完成 状态变化: -* pending 转running 如果节点没有join属性,则在下个循环直接转running, 并调用节点执行方法,记录开始执行时间 +* pending 转running 如果节点有join属性,检查join条件,满足条件就转执行,否则等条件满足, 没有join属性则直接转running, 并调用节点执行方法,记录开始执行时间, 手工节点则由人工点击工作开始转换 * running zhuan done 节点执行成功,并记录完成时间 * running 转 failed 节点执行失败, 并记录完成时间 * pending 转 cancel 取消执行(手工或流程实例被取消) - +* done, cancelled, failed 转 completed 检查从节点出发的边,做节点转移 --- ### 3.4 subflow_instance diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index c646d32..27349b2 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -12,6 +12,7 @@ Features: - Query APIs for UI """ import asyncio +from random import randint from appPublic.log import debug from appPublic.timeUtils import timestampstr from sqlor.dbpools import get_sor_context @@ -134,25 +135,28 @@ class FlowEngine: # ------------------------------ # Definition / Instance # ------------------------------ - def __init__(self): + def __init__(self, backid): self.aio = [] + self.backid = backid async def bgtask(self): env = ServerEnv() self.aio = [] cnt = 0 while True: + aio = [] async with get_sor_context(env, 'dagflow') as sor: - self.aio = await sor.R('flow_instance', {'status': 'running'}) - if cnt >= 60: - debug(f'{self.aio=}') - cnt = 0 - for r in self.aio: - self.step(r.org_id, r.id) - await asyncio.sleep(2) - cnt += 1 + sql = """select * from flow_instance +where backid=${backid}$ + and status = 'running'""" + aio = await sor.sqlExe(sql,{ + 'backid': self.backid + }) + for r in aio: + self.step(r) + await asyncio.sleep(1) - async def create_definition(self, org_id, name, version, dsl_text): + async def create_definition(self, org_id, name, description, version, dsl_text, ctxfields): env = ServerEnv() async with get_sor_context(env, 'dagflow') as sor: fid = env.uuid() @@ -160,7 +164,10 @@ class FlowEngine: 'id': fid, 'org_id': org_id, 'name': name, + 'description': description, 'version': version, + 'ctxfields': json.dumps(ctxfields), + 'created_at': timestampstr(), 'dsl': dsl_text }) return fid @@ -169,21 +176,51 @@ class FlowEngine: env = ServerEnv() async with get_sor_context(env, 'dagflow') as sor: iid = env.uuid() - await sor.C('flow_instance', { + ns = { 'id': iid, + 'backid': self.backid, + 'round': 1, 'org_id': org_id, 'flow_def_id': flow_def_id, 'status': 'running', 'ctx': json.dumps(ctx or {}), - 'active_nodes': json.dumps([]) + 'created_at': timestampstr() }) + await sor.C('flow_instance', ns.copy()) + await self.create_start_node_execution(sor, ns) return iid + async def create_start_node_execution(self, sor, inst): + env = ServerEnv() + id = env.uuid() + await sor.C('node_execution', { + 'id': id, + 'type': 'start', + 'instance_id': inst['id'], + 'org_id': inst['org_id'], + 'inst_round': inst['round'], + 'status': 'done', + 'node_id': 'start', + 'created_at': timestampstr() + }) + async def create_end_node_execution(self, sor, inst): + env = ServerEnv() + id = env.uuid() + await sor.C('node_execution', { + 'id': id, + 'type': 'end', + 'instance_id': inst['id'], + 'org_id': inst['org_id'], + 'inst_round': inst['round'], + 'status': 'pending', + 'node_id': 'end', + 'created_at': timestampstr() + }) # ------------------------------ # Execution # ------------------------------ - async def step(self, org_id, instance_id): + async def step(self, inst): """ 从节点执行表中取出所有状态不是结束的执行节点,检查是否可以推进一步 结束状态为: @@ -193,17 +230,6 @@ class FlowEngine: 推进:如果 env = ServerEnv() async with get_sor_context(env, 'dagflow') as sor: - insts = await sor.R('flow_instance', { - 'id': instance_id, - 'org_id': org_id - }) - if not insts: - return - - inst = insts[0] - if inst['status'] != 'running': - return - flow_def = (await sor.R( 'flow_definition', {'id': inst['flow_def_id']} @@ -213,85 +239,78 @@ class FlowEngine: ctx = json.loads(inst['ctx']) active = set(json.loads(inst['active_nodes']) or [dsl['start']]) next_nodes = set() - - for node_id in active: + sql = "select * from node_execution where instance_id=${instance_id}$ and status != 'completed'" + active_nes = await sor.sqlExe(sql, { + 'instance_id': inst.id + }) + for ne in active_nes: + node_id = ne.node_id node = dsl['nodes'][node_id] + node['id'] = node_id ntype = node['type'] - - # -------- Human node -------- - if ntype == 'human': - rows = await sor.R('human_task', { - 'instance_id': instance_id, - 'node_id': node_id, - 'status': 'pending' - }) - if not rows: - await sor.C('human_task', { - 'id': env.uuid(), - 'org_id': org_id, - 'instance_id': instance_id, - 'node_id': node_id, - 'role': node.get('role'), - 'status': 'pending', - 'input': json.dumps(ctx), - 'timeout_at': env.after(node.get('timeout', 0)) + status = ne.status + if status == 'pending': + if ntype == 'human': + if ne.assignee is None: + await self.auto_assign(sor, inst, ne, node) + elif ntype == 'task': + await self.try_start_running(sor, inst, ne, node) + elif ntype == 'subflow': + await self.try_start_subflow(sor, inst, ne, node) + elif ntype == 'start': + await self.node_transfer(sor, inst, ne, node, flow_def) + await sor.U('node_execution', { + 'id': ne.id, + 'status': 'completed' }) - next_nodes.add(node_id) - continue - if ntype == 'task': - if node.get('join'): - await sor.C('node_execution', { - 'id': env.uuid(), - 'org_id': org_id, - 'instance_id': instance_id, - 'node_id': node_id, - 'input_ctx': json.dumps(ctx), - 'status': 'pendding' + elif ntype == 'end': + flg = await self.is_ok_to_step_next(sor, node, edges, inst) + if flg: + await sor.U('node_execution', { + 'id': ne.id, + 'status': 'completed' + }) + await sor.U('flow_instance', { + 'id': inst.id, + 'status': 'completed' + }) + break; + elif status in ['done', 'cancelled', 'failed']: + await self.node_transfer(sor, inst, ne, node, flow_def) + await sor.U('node_execution', { + 'id': ne.id, + 'status': 'completed' }) - - # ---- 如果节点状态在pending状态 --- - if node['status'] == 'pending' and ntype != 'human': - is_ok = await self.is_ok_to_start(sor, node, inst) - if is_ok: - await self.start_running(sor, node) - continue - # -------- Normal node -------- - """ - await sor.C('node_execution', { - 'id': env.uuid(), - 'org_id': org_id, - 'instance_id': instance_id, - 'node_id': node_id, - 'input_ctx': json.dumps(ctx), - 'status': 'pendding' - }) - """ - if nstatus == 'done' or nstatus == 'failed': - n_nodes = await self.node_transfer(node, dsl) - next_nodes += n_nodes - - if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes): - await sor.U('flow_instance', { - 'id': instance_id, - 'status': 'finished', - 'active_nodes': json.dumps(list(next_nodes)) - }) - else: - await sor.U('flow_instance', { - 'id': instance_id, - 'active_nodes': json.dumps(list(next_nodes)) - }) + self.check_instance_completed(sor, inst) + # ------------------------------ # Human task APIs # ------------------------------ - async def node_transfer(self, node): - if node['status'] not in ['done', 'failed', 'cancel']: - return + async def auto_assign(self, sor, inst, ne, node): + recs = await env.get_org_users(sor, inst.org_id, ne.role) + if recs: + i = randint(len(recs) - 1) + await sor.U('node_execution', { + 'id':ne.id, + 'assignee': recs[i].id + }) + + async def check_instance_completed(self, sor, inst): + sql = "select * from node_execution where instance_id=${instance_id} and type='end'" + recs = await sor.sqlExe(sql, {'instance_id', inst.id}) + if recs: + if recs[0].status == 'completed': + await sor.U('flow_instance', { + 'id': inst.id, + 'status': 'completed' + }) + + async def node_transfer(self, sor, dsl, inst, ne, node, flow_def): nnode = [] for edge in dsl.get('edges', []): - if edge['from'] != node_id: + if edge['from'] != ne.node_id: continue cond = edge.get('when') if cond and not eval(cond, {}, {'ctx': ctx}): @@ -305,7 +324,47 @@ class FlowEngine: on_array = await self.get_multiple_array(m_on): for e in on_array: nnodes.add((edge['to'], str(e))) - return nnodes + for node, ctx_ext in nnodes: + x = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst) + if x: + env = ServerEnv() + id = env.uuid() + ns = { + 'id': id, + 'type': node['type'], + 'inst_round': inst.round, + 'org_id': inst.org_id, + 'node_id': node['id'], + 'input_ctx': node.get('input_ctx'), + 'output_ctx': node.get('output_ctx'), + 'status': 'pending', + 'ctx': json.dumps(inst.ctx.copy(), ensure_ascii=False), + 'ctx_ext': json.dumps(ctx_ext.copy(), ensure_ascii=False), + 'created_at': timestampstr() + } + if node['type'] == 'human': + ns.update({ + 'role': node.get('role') + }) + elif node['type'] == 'task': + pass + elif node['type'] == 'subflow': + eng = env.template_engine + ctx = inst.ctx + if node.get('input_ctx'): + ctx = eng.renders(node['input_ctx'], ctx) + + subinst_id = await self.create_instance(org_id, node.flow_def_id, ctx=ctx) + ns.update({ + 'subinst_id': subinst_id + }) + elif node['type'] == 'end': + ns.update(' + await self.create_end_node_execution(sor, inst) + + await sor.C('node_execution', ns) + + if node.join async def is_ok_to_create_new_node_exe(self, sor, node, edges, inst): join = node.get('join) @@ -331,7 +390,7 @@ class FlowEngine: }) return id - async def is_ok_to_start(self, sor, node, edges, inst): + async def is_ok_to_step_next(self, sor, node, edges, inst): join = node.get('join') if not join: return True @@ -439,28 +498,24 @@ where instance_id=${instance_id}$ 'ctx': json.dumps(ctx) }) +def get_engine(): + env = ServerEnv() + cnt = len(env.flow_engines) + id = randint(cnt-1) + return env.flow_engines[id] + async def add_new_workflow(request, params_kw={}): name = params_kw.name dsl = params_kw.dsl env = request._run_ns orgid = await env.get_userorgid() - async with get_sor_context(env, 'dagflow') as sor: - ns = { - 'id': env.uuid(), - 'name': name, - 'dsl': dsl, - 'version': '1', - 'created_at': timestampstr(), - 'org_id': orgid - } - await sor.C('dagflow_definition', ns) - return { - 'status': 'SUCCEEDED', - 'data': { - 'dagflow_id': ns['id'] - } - } + ctxfileds = params_kw.ctxfields or [] + description = params_kw.description + engine = get_engine() + id = await engine.create_definition(org_id, name, description, '1', dsl, ctxfields) return { - 'status': 'FAILED', - 'error': 'add workflow error' + 'status': 'SUCCEEDED', + 'data': { + 'dagflow_id': id + } } diff --git a/dagflow/init.py b/dagflow/init.py index a45ba45..ed1b2c8 100644 --- a/dagflow/init.py +++ b/dagflow/init.py @@ -1,18 +1,29 @@ import asyncio from functools import partial +from appPublic.jsonConfig import getConfig from ahserver.serverenv import ServerEnv from ahserver.configuredServer import add_cleanupctx from .dagflow import FlowEngine -async def dagbacktask(engine, app): - task = asyncio.create_task(engine.bgtask()) +async def dagbacktask(engines, app): + tasks = [] + for e in engines: + task = asyncio.create_task(e.bgtask()) + tasks.append(task) yield - task.cancel() + for task in tasks: + task.cancel() def load_dagflow(): - engine = FlowEngine() - f = partial(dagbacktask, engine) + config = getConfig() + flow_engines = [] + cnt = config.dagflow_job_cnt or 1 + for i in range(cnt): + e = FlowEngine(i) + flow_engines.append(e) + + f = partial(dagbacktask, flow_engines) add_cleanupctx(f) env = ServerEnv() - env.workflow_engine = engine + env.flow_engines = flow_engines diff --git a/models/flow_instance.json b/models/flow_instance.json index 511f733..f59c504 100644 --- a/models/flow_instance.json +++ b/models/flow_instance.json @@ -7,11 +7,12 @@ }], "fields": [ {"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"}, +{"name": "backid", "title": "后台id", "type": "short", "nullable": "no"}, +{"name": "round", "title": "当前论次", "type": "short"}, {"name": "org_id", "title": "机构id", "type": "str", "length": 32, "nullable": "no"}, {"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32}, {"name": "status", "title": "状态", "type": "str", "length": 16}, {"name": "ctx", "title": "上下文(JSON)", "type": "text"}, -{"name": "active_nodes", "title": "当前节点(JSON)", "type": "text"}, {"name": "created_at", "title": "创建时间", "type": "timestamp"} ], "indexes": [ diff --git a/models/node_execution.json b/models/node_execution.json index b8e79e5..3519aba 100644 --- a/models/node_execution.json +++ b/models/node_execution.json @@ -8,6 +8,7 @@ "fields": [ {"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"}, {"name": "type", "title": "节点类型", "type": "str", "length": 32}, + {"name": "inst_round", "title": "执行论次", "type": "short"}, {"name": "org_id", "title": "机构id", "type": "str", "length": 32}, {"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32}, {"name": "node_id", "title": "节点ID", "type": "str", "length": 64}, @@ -25,7 +26,6 @@ {"name": "running_at", "title": "执行时点", "type": "timestamp"}, {"name": "stopping_at", "title": "结束时点", "type": "timestamp"}, {"name": "created_at", "title": "执行时间", "type": "timestamp"} - {"name": "next_nodeid", "title": "下一节点id", "type", "str", "length": 32} ], "indexes": [ {"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]} diff --git a/models/subflow_instance.json b/models/subflow_instance.json deleted file mode 100644 index 5f8db98..0000000 --- a/models/subflow_instance.json +++ /dev/null @@ -1,20 +0,0 @@ -{ -"summary": [{ -"name": "subflow_instance", -"title": "子流程实例", -"primary": "id", -"catelog": "relation" -}], -"fields": [ -{"name": "id", "title": "子流程ID", "type": "str", "length": 32, "nullable": "no"}, -{"name": "parent_instance_id", "title": "父流程实例ID", "type": "str", "length": 32}, -{"name": "parent_node_id", "title": "父节点ID", "type": "str", "length": 64}, -{"name": "child_instance_id", "title": "子流程实例ID", "type": "str", "length": 32}, -{"name": "status", "title": "状态", "type": "str", "length": 16}, -{"name": "created_at", "title": "创建时间", "type": "timestamp"} -], -"indexes": [ -{"name": "idx_subflow_parent", "idxtype": "index", "idxfields": ["parent_instance_id"]}, -{"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]} -] -}