From 54a1c7a82cf39a46b69dda480aed74d51585b764 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Wed, 11 Mar 2026 18:09:27 +0800 Subject: [PATCH] bugfix --- README.md | 109 +++++++++++++++++++++++---------- SKILL.md | 15 +++++ dagflow/dagflow.py | 147 +++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 229 insertions(+), 42 deletions(-) create mode 100644 SKILL.md diff --git a/README.md b/README.md index cb66dcd..f689645 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,24 @@ * input_ctx / output_ctx * status +input_ctx和output_ctx是jinja2模版的字符串, + +input_ctx使用流程实例上下文(ctx)作为数据来渲染,结果作为节点执行的输入字典 + +output_ctx使用节点执行结果的字典数据作为数据来渲染,结果update到流程的ctx中 + +实例节点有以下节点状态: +* pending 创建后的初始状态 +* running 执行状态 +* done 执行成功状态 +* failed 执行失败状态 +* cancelled 取消状态 +状态变化: +* pending 转running 如果节点没有join属性,则在下个循环直接转running, 并调用节点执行方法,记录开始执行时间 +* running zhuan done 节点执行成功,并记录完成时间 +* running 转 failed 节点执行失败, 并记录完成时间 +* pending 转 cancel 取消执行(手工或流程实例被取消) + --- ### 3.4 subflow_instance @@ -123,8 +141,8 @@ 关键字段: -* parent_instance_id -* parent_node_id +* instance_id +* node_id * child_instance_id * status @@ -146,10 +164,57 @@ nodes: end: type: end + + task1: + title: task1 title + type: task + description: desc + input_ctx:["k1:g1", "k2.kk1:g2"] # 可选, 缺省使用流程实例的全部ctx + output_ctx:["k1:ck1", "k2:ck2.x"] # 可选, 缺失全部输出更新到流程实例的ctx + + subflow1: + title: subf + type: subflow + subflow_id: payment_flow_v2 + input_ctx:["k1:g1", "k2.kk1:g2"] # 可选, 缺省使用流程实例的全部ctx + output_ctx:["k1:ck1", "k2:ck2.x"] # 可选, 子流程实例的ctx到本流程实例的缺失全部输出更新到流程实例的ctx + + humantask1: + title: conform + type: human + description: desc + input_ctx:["k1:g1", "k2.kk1:g2"] # 可选, 缺省使用流程实例的全部ctx + output_ctx:["k1:ck1", "k2:ck2.x"] # 可选, 缺失全部输出更新到流程实例的ctx edges: - from: start to: end + multile_on: ctx.charpter # + when: ctx.x == 'xxx' # 可选,转移条件 +``` +### 流程定义节点数据要求 +除开始节点和结束节点外,所有节点都需要有 +``` +nodes: + node1: + type: # task, human, subflow之一 + title: # 标题 + description: # 节点描述 + input_ctx: # jinja2模版字典数据字符串,用ctx渲染,用于构造给节点任务的输入字典,可选, 缺省使用流程实例的ctx + output_ctx: # jinja2模版字典数据字符串,用节点执行结果字典渲染,结果结果更新ctx, 可选, 缺失全部输出更新到流程实例的ctx +``` + +### join节点 +当一个节点中有join属性是为join节点,join 节点只有前置节点满足join条件是才能执行 +``` +nodes: + end: + type: end + join: + - all + - some(5 or 80%) + - xor + - any ``` --- @@ -173,6 +238,15 @@ edges: ### 4.3 并发(Fork / Join) +#### 多实例节点 +根据上下文的数据如果是数组为每个数据建立一个节点实例 +```yaml +edges: + - from: gen_docs + to: evalution_chapter + multiple_on: ctx.docs.chapters +``` + #### Fork(隐式) ```yaml @@ -183,18 +257,6 @@ edges: to: enrich ``` -#### Join - -```yaml -nodes: - join: - type: join - wait_for: - - validate - - enrich -``` - -join 节点只有在所有依赖节点完成后才会继续。 --- @@ -224,7 +286,7 @@ nodes: 1. 父流程第一次到达 subflow 节点: * 创建子流程实例 - * 根据 input mapping 构建子 ctx + * 根据 input_ctx 构建子 ctx * 父流程阻塞在该节点 2. 子流程运行中: @@ -233,25 +295,11 @@ nodes: 3. 子流程完成: - * 根据 output mapping 合并 ctx + * 根据 output_ctx 合并 ctx * 父流程继续流转 --- -### 5.3 input / output mapping 原则 - -* 子流程只能看到显式声明的输入 -* 父流程只接收显式声明的输出 -* ctx 不做隐式全量共享 - -这样可以保证: - -* 数据边界清晰 -* 子流程可复用 -* 行为可审计 - ---- - ## 6. 执行模型 ### 6.1 推进方式 @@ -298,7 +346,6 @@ status = finished * 失败补偿 * 超时 / 重试 * 表达式安全沙箱 -* 人工任务 --- diff --git a/SKILL.md b/SKILL.md new file mode 100644 index 0000000..04c2843 --- /dev/null +++ b/SKILL.md @@ -0,0 +1,15 @@ +--- +name: workflow-builder +description: 根据用户输入创建工作流 +--- +# workflow-builder +为用户输入的任务创建工作流, 生成dagflow能使用的工作流并将工作流插入到数据库中 + +## 流程引擎 +流程引擎具体说明请看[README.md](README.md) +## 新建流程 +用用户输入的剔除选择厂家的内容后的文本作为文生视频的输入文本来生成视频,使用方式: +将用户输入拆解为流程引擎添加流程的json数据,然后调用以下函数 +``` +CALL:{ "function": "add_new_workflow", "params":{"name":流程名字,"dsl":YAML格式的符合上述流程规范的流程定义} +``` diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index 103b8c1..cde1006 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -13,6 +13,7 @@ Features: """ import asyncio from appPublic.log import debug +from appPublic.timeUtils import timestampstr from sqlor.dbpools import get_sor_context # --------------------------------------------------------------------- @@ -55,6 +56,8 @@ NODE_EXECUTION_TABLE = { {"name": "output_ctx", "type": "text"}, {"name": "status", "type": "str", "length": 16}, {"name": "error", "type": "text"}, + {"name": "running_at", "type": "timestamp"}, + {"name": "stopping_at", "type": "timestamp"}, {"name": "created_at", "type": "timestamp"} ] } @@ -64,8 +67,8 @@ SUBFLOW_INSTANCE_TABLE = { "fields": [ {"name": "id", "type": "str", "length": 32, "nullable": "no"}, {"name": "org_id", "type": "str", "length": 32}, - {"name": "parent_instance_id", "type": "str", "length": 32}, - {"name": "parent_node_id", "type": "str", "length": 64}, + {"name": "instance_id", "type": "str", "length": 32}, + {"name": "node_id", "type": "str", "length": 64}, {"name": "child_instance_id", "type": "str", "length": 32}, {"name": "status", "type": "str", "length": 16}, {"name": "created_at", "type": "timestamp"} @@ -97,7 +100,26 @@ HUMAN_TASK_TABLE = { import yaml import json from ahserver.serverenv import ServerEnv +import re +def extract_some_value(s): + """ + 从 'some(100)' 或 'some(50%)' 中提取数值。 + Args: + s (str): 输入的字符串 + + Returns: float类型值, 小于1的小数点数字(%模式)或其他 + """ + # 正则解释: 匹配 some( 后跟数字,然后是可选的 % 符号,最后是 ) + match = re.search(r'some\((\d+)(%)?\)', s) + if match: + value = float(match.group(1)) # 提取数字部分并转为整数 + unit = match.group(2) # 提取单位部分 (%) + if unit == '%': + return value / 100.0 + return value + else: + return None class FlowEngine: @@ -201,24 +223,37 @@ class FlowEngine: }) next_nodes.add(node_id) continue + if ntype == 'task': + if node.get('join') + await sor.C('node_execution', { + 'id': env.uuid(), + 'org_id': org_id, + 'instance_id': instance_id, + 'node_id': node_id, + 'input_ctx': json.dumps(ctx), + 'status': 'pendding' + }) + # ---- 如果节点状态在pending状态 --- + if node['status'] == 'pending' and ntype != 'human': + is_ok = await self.is_ok_to_start(sor, node, inst) + if is_ok: + await self.start_running(sor, node) + continue # -------- Normal node -------- + """ await sor.C('node_execution', { 'id': env.uuid(), 'org_id': org_id, 'instance_id': instance_id, 'node_id': node_id, 'input_ctx': json.dumps(ctx), - 'status': 'success' + 'status': 'pendding' }) - - for edge in dsl.get('edges', []): - if edge['from'] != node_id: - continue - cond = edge.get('when') - if cond and not eval(cond, {}, {'ctx': ctx}): - continue - next_nodes.add(edge['to']) + """ + if nstatus == 'done' or nstatus == 'failed': + n_nodes = await self.node_transfer(node, dsl) + next_nodes += n_nodes if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes): await sor.U('flow_instance', { @@ -236,6 +271,93 @@ class FlowEngine: # Human task APIs # ------------------------------ + async def node_transfer(self, node): + if node['status'] not in ['done', 'failed', 'cancel']: + return + nnode = [] + for edge in dsl.get('edges', []): + if edge['from'] != node_id: + continue + cond = edge.get('when') + if cond and not eval(cond, {}, {'ctx': ctx}): + continue + m_on = edge.get('multiple_on') + ns = { + 'm_on': m_on, + 'ctx': ctx, + 'node': node + } + on_array = await self.get_multiple_array(m_on): + for e in on_array: + nnodes.add((edge['to'], str(e))) + return nnodes + + async def is_ok_to_start(self, sor, node, edges, inst): + join = node.get('join') + if not join: + return True + if join == 'any': + return True + backnodes = [e['from'] for e in edges if e['to'] == node['id']] + sql = """select * from node_execution +where node_id in ${backnodes}$ + and instance_id = ${instance_id}$""" + recs = sor.sqlExe(sql, { + 'backnodes': backnodes, + 'instance_id': inst['id'] + }) + if join == 'all': + for r in recs: + if r.status in ['pending', 'running']: + return False + return True + if join == 'xor': + backnodes = [id for id in backnodes if id != node['id']] + sql = """update from node_execution +set status = 'cancelled' +where status in ['pending', 'running'] + and node_id in ${backnodes}$ + and instance_id = ${instance_id}$""" + await sor.sqlExe(sql, { + 'backnodes': backnodes, + 'instance_id': inst['id'] + }) + return True + some = extract_some_value(join): + sql = """select * from node_transfer +where instance_id=${instance_id}$ + and to_node_id = ${node_id}$""" + trecs = sor.sqlExe(sql, { + 'node_id': node['id'], + 'instance_id': inst['id'] + }) + if some >= 1 and some <= len(trecs): + return True + if some <= float(len(trecs))/float(len(recs)): + return True + return False + + async def start_running(self, sor, node, edges, inst): + if self.is_ok_to_start(sor, node, edges, inst): + await sor.R('node_execution', { + 'id': node['id'], + 'status': 'running' + 'running_time': }) + try: + out = await self.run_auto_task(node) + sor.U('node_execution', { + 'id': node['id'], + 'status': 'done', + 'stopping_at': timestampstr(), + 'output_ctx': json.dumps(out, ensure_ascii=False) + }) + except Exception as e: + sor.U('node_execution', { + 'id': node['id'], + 'status': 'failed', + 'stopping_at': timestampstr() + } + async def list_human_tasks(self, org_id, user_roles): env = ServerEnv() async with get_sor_context(env, 'dagflow') as sor: @@ -278,3 +400,6 @@ class FlowEngine: 'ctx': json.dumps(ctx) }) +async def add_new_workflow(request, params_kw={}): + arams_kw.name + params_kw.dsl