This commit is contained in:
yumoqing 2026-03-11 18:09:27 +08:00
parent e27f743cce
commit 54a1c7a82c
3 changed files with 229 additions and 42 deletions

README.md

@@ -115,6 +115,24 @@
* input_ctx / output_ctx
* status
input_ctx and output_ctx are Jinja2 template strings.
input_ctx is rendered with the flow instance context ctx as its data; the rendered result is used as the input dict for node execution.
output_ctx is rendered with the dict returned by node execution as its data; the rendered result is merged (update) into the flow's ctx.
An instance node has the following states:
* pending: initial state after creation
* running: executing
* done: executed successfully
* failed: execution failed
* cancelled: cancelled
State transitions:
* pending → running: if the node has no join attribute it moves straight to running on the next loop; the node's execute method is called and the start time is recorded
* running → done: the node executed successfully; the completion time is recorded
* running → failed: node execution failed; the completion time is recorded
* pending → cancelled: execution is cancelled (manually, or because the flow instance was cancelled)
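The transitions above can be sketched as a minimal state machine. This is illustrative only; `ALLOWED` and `transition` are hypothetical names, not part of the engine's API:

```python
# Legal transitions of a node execution, as described in the list above.
ALLOWED = {
    'pending':   {'running', 'cancelled'},
    'running':   {'done', 'failed'},
    'done':      set(),
    'failed':    set(),
    'cancelled': set(),
}

def transition(state, new_state):
    """Return new_state if the move is legal, otherwise raise ValueError."""
    if new_state not in ALLOWED.get(state, set()):
        raise ValueError(f'illegal transition {state} -> {new_state}')
    return new_state
```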
---
### 3.4 subflow_instance
@@ -123,8 +141,8 @@
Key fields:
* instance_id
* node_id
* child_instance_id
* status
@@ -147,9 +165,56 @@ nodes:
  end:
    type: end
  task1:
    title: task1 title
    type: task
    description: desc
    input_ctx: ["k1:g1", "k2.kk1:g2"]   # optional; defaults to the flow instance's full ctx
    output_ctx: ["k1:ck1", "k2:ck2.x"]  # optional; if omitted, the full output is merged into the flow instance's ctx
  subflow1:
    title: subf
    type: subflow
    subflow_id: payment_flow_v2
    input_ctx: ["k1:g1", "k2.kk1:g2"]   # optional; defaults to the flow instance's full ctx
    output_ctx: ["k1:ck1", "k2:ck2.x"]  # optional; if omitted, the subflow instance's full ctx is merged into this flow instance's ctx
  humantask1:
    title: confirm
    type: human
    description: desc
    input_ctx: ["k1:g1", "k2.kk1:g2"]   # optional; defaults to the flow instance's full ctx
    output_ctx: ["k1:ck1", "k2:ck2.x"]  # optional; if omitted, the full output is merged into the flow instance's ctx
edges:
  - from: start
    to: end
    multiple_on: ctx.chapter   # optional; create one node instance per element of this array
    when: ctx.x == 'xxx'       # optional; transition condition
```
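One plausible reading of the `"src:dst"` entries in the example above (an assumption on our part; the README also describes input_ctx/output_ctx as Jinja2 template strings) is that each entry reads a dotted source path and writes it under a target key. A minimal sketch, with the hypothetical helper `apply_mapping`:

```python
def apply_mapping(spec, data):
    """Apply 'src:dst' entries: read the dotted path src from data and
    store the value under key dst in the result dict."""
    out = {}
    for entry in spec:
        src, dst = entry.split(':', 1)
        value = data
        for part in src.split('.'):   # walk nested dicts along the dotted path
            value = value[part]
        out[dst] = value
    return out
```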
### Node data requirements in a flow definition
Apart from the start and end nodes, every node must define:
```
nodes:
  node1:
    type:         # one of task, human, subflow
    title:        # title
    description:  # node description
    input_ctx:    # Jinja2 template dict string rendered with ctx to build the node task's input dict; optional, defaults to the flow instance's ctx
    output_ctx:   # Jinja2 template dict string rendered with the node's result dict; the result updates ctx; optional, if omitted the full output is merged into the flow instance's ctx
```
### join nodes
A node with a join attribute is a join node. A join node can only execute once its predecessor nodes satisfy the join condition.
```
nodes:
  end:
    type: end
    join:        # one of the following values:
      - all
      - some(5 or 80%)
      - xor
      - any
```
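A hedged sketch of how these join values could be evaluated against the statuses of a node's predecessors. `join_satisfied` is a hypothetical pure function; the engine's own check works against the database:

```python
import re

def join_satisfied(join, statuses):
    """Decide readiness from the statuses of a join node's predecessors."""
    finished = [s for s in statuses if s in ('done', 'failed')]
    if join in (None, 'any'):
        return True                        # 'any': fire as soon as triggered
    if join == 'all':
        return len(finished) == len(statuses)
    if join == 'xor':
        return len(finished) >= 1          # first branch to finish wins
    m = re.search(r'some\((\d+)(%)?\)', join)
    if m:                                  # some(N) or some(P%)
        n = float(m.group(1))
        if m.group(2) == '%':
            return len(finished) / len(statuses) >= n / 100.0
        return len(finished) >= n
    return False
```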
---
@@ -173,6 +238,15 @@ edges:
### 4.3 Concurrency (Fork / Join)
#### Multi-instance nodes
If the referenced context value is an array, one node instance is created for each element:
```yaml
edges:
  - from: gen_docs
    to: evalution_chapter
    multiple_on: ctx.docs.chapters
```
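The expansion a multiple_on edge implies can be sketched as follows. `expand_edge` is an illustrative helper, not an engine function:

```python
def expand_edge(edge, ctx):
    """Resolve the dotted multiple_on path (e.g. 'ctx.docs.chapters') against
    ctx and yield one (target_node, element) pair per array element."""
    path = edge.get('multiple_on')
    if not path:
        return [(edge['to'], None)]   # ordinary edge: a single target instance
    value = {'ctx': ctx}
    for part in path.split('.'):      # walk the dotted path, starting at 'ctx'
        value = value[part]
    return [(edge['to'], item) for item in value]
```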
#### Fork (implicit)
```yaml
@@ -183,18 +257,6 @@ edges:
    to: enrich
```
#### Join
```yaml
nodes:
  join:
    type: join
    wait_for:
      - validate
      - enrich
```
A join node only continues after all of its dependency nodes have completed.
---
@@ -224,7 +286,7 @@ nodes:
1. When the parent flow first reaches a subflow node:
* Create the subflow instance
* Build the child ctx according to input_ctx
* The parent flow blocks at that node
2. While the subflow is running:
@@ -233,25 +295,11 @@ nodes:
3. When the subflow completes:
* Merge ctx according to output_ctx
* The parent flow resumes and continues
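The three steps above, sketched with hypothetical helpers and shown synchronously for brevity (`run_subflow_node` and the `inputs` key are assumptions standing in for the input_ctx/output_ctx machinery):

```python
def run_subflow_node(parent_ctx, node, run_subflow):
    """1) build the child ctx from the declared inputs, 2) block while the
    subflow runs, 3) merge the result back into the parent ctx."""
    child_ctx = {k: parent_ctx[k] for k in node.get('inputs', parent_ctx)}
    result = run_subflow(node['subflow_id'], child_ctx)  # parent blocks here
    parent_ctx.update(result)                            # merge declared outputs
    return parent_ctx
```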
---
### 5.3 input / output mapping principles
* The subflow sees only explicitly declared inputs
* The parent flow receives only explicitly declared outputs
* ctx is never implicitly shared wholesale
This guarantees:
* Clear data boundaries
* Reusable subflows
* Auditable behavior
---
## 6. Execution model
### 6.1 Advancement
@@ -298,7 +346,6 @@ status = finished
* Failure compensation
* Timeout / retry
* Expression security sandbox
* Human tasks
---

SKILL.md (new file)

@@ -0,0 +1,15 @@
---
name: workflow-builder
description: Create a workflow from user input
---
# workflow-builder
Create a workflow for the task the user describes: generate a workflow that dagflow can run, and insert it into the database.
## Flow engine
See [README.md](README.md) for the full description of the flow engine.
## Creating a new flow
Use the user's input text, with the vendor-selection content removed, as the text-to-video input text to generate the video. Usage:
Decompose the user input into the JSON data for adding a flow to the flow engine, then call the following function:
```
CALL{ "function": "add_new_workflow", "params": {"name": <flow name>, "dsl": <YAML flow definition conforming to the spec above>} }
```


@@ -13,6 +13,7 @@ Features:
"""
import asyncio
from appPublic.log import debug
from appPublic.timeUtils import timestampstr
from sqlor.dbpools import get_sor_context
# ---------------------------------------------------------------------
@@ -55,6 +56,8 @@ NODE_EXECUTION_TABLE = {
{"name": "output_ctx", "type": "text"},
{"name": "status", "type": "str", "length": 16},
{"name": "error", "type": "text"},
{"name": "running_at", "type": "timestamp"},
{"name": "stopping_at", "type": "timestamp"},
{"name": "created_at", "type": "timestamp"}
]
}
@@ -64,8 +67,8 @@ SUBFLOW_INSTANCE_TABLE = {
"fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32},
{"name": "instance_id", "type": "str", "length": 32},
{"name": "node_id", "type": "str", "length": 64},
{"name": "child_instance_id", "type": "str", "length": 32},
{"name": "status", "type": "str", "length": 16},
{"name": "created_at", "type": "timestamp"}
@@ -97,7 +100,26 @@ HUMAN_TASK_TABLE = {
import yaml
import json
from ahserver.serverenv import ServerEnv
import re

def extract_some_value(s):
    """
    Extract the numeric value from 'some(100)' or 'some(50%)'.

    Args:
        s (str): input string
    Returns:
        float: a fraction below 1 for the '%' form, the plain number otherwise,
        or None when the pattern does not match.
    """
    # Match 'some(' followed by digits, an optional '%', then ')'
    match = re.search(r'some\((\d+)(%)?\)', s)
    if match:
        value = float(match.group(1))  # numeric part, as a float
        unit = match.group(2)          # optional '%' unit
        if unit == '%':
            return value / 100.0
        return value
    return None
class FlowEngine:
@@ -201,24 +223,37 @@ class FlowEngine:
})
next_nodes.add(node_id)
continue
if ntype == 'task':
    if node.get('join'):
        await sor.C('node_execution', {
            'id': env.uuid(),
            'org_id': org_id,
            'instance_id': instance_id,
            'node_id': node_id,
            'input_ctx': json.dumps(ctx),
            'status': 'pending'
        })
# ---- the node is still in the pending state ----
if node['status'] == 'pending' and ntype != 'human':
    is_ok = await self.is_ok_to_start(sor, node, dsl.get('edges', []), inst)
    if is_ok:
        await self.start_running(sor, node, dsl.get('edges', []), inst)
    continue
# -------- Normal node --------
"""
await sor.C('node_execution', {
    'id': env.uuid(),
    'org_id': org_id,
    'instance_id': instance_id,
    'node_id': node_id,
    'input_ctx': json.dumps(ctx),
    'status': 'pending'
})
"""
if nstatus in ('done', 'failed'):
    n_nodes = await self.node_transfer(node, dsl)
    next_nodes |= n_nodes
if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes):
    await sor.U('flow_instance', {
@@ -236,6 +271,93 @@ class FlowEngine:
# Human task APIs
# ------------------------------
async def node_transfer(self, node, dsl):
    # only finished nodes can transfer control to their successors
    if node['status'] not in ['done', 'failed', 'cancelled']:
        return set()
    nnodes = set()
    ctx = node.get('ctx', {})  # assumed: the node's rendered context
    for edge in dsl.get('edges', []):
        if edge['from'] != node['id']:
            continue
        cond = edge.get('when')
        if cond and not eval(cond, {}, {'ctx': ctx}):
            continue
        m_on = edge.get('multiple_on')
        on_array = await self.get_multiple_array(m_on)
        for e in on_array:
            nnodes.add((edge['to'], str(e)))
    return nnodes
async def is_ok_to_start(self, sor, node, edges, inst):
    join = node.get('join')
    if not join:
        return True
    if join == 'any':
        return True
    backnodes = [e['from'] for e in edges if e['to'] == node['id']]
    sql = """select * from node_execution
        where node_id in ${backnodes}$
        and instance_id = ${instance_id}$"""
    recs = await sor.sqlExe(sql, {
        'backnodes': backnodes,
        'instance_id': inst['id']
    })
    if join == 'all':
        for r in recs:
            if r.status in ['pending', 'running']:
                return False
        return True
    if join == 'xor':
        # cancel the predecessor branches that have not finished yet
        sql = """update node_execution
            set status = 'cancelled'
            where status in ('pending', 'running')
            and node_id in ${backnodes}$
            and instance_id = ${instance_id}$"""
        await sor.sqlExe(sql, {
            'backnodes': backnodes,
            'instance_id': inst['id']
        })
        return True
    # some(N) / some(P%)
    some = extract_some_value(join)
    sql = """select * from node_transfer
        where instance_id = ${instance_id}$
        and to_node_id = ${node_id}$"""
    trecs = await sor.sqlExe(sql, {
        'node_id': node['id'],
        'instance_id': inst['id']
    })
    if some >= 1 and some <= len(trecs):
        return True
    if some < 1 and some <= float(len(trecs)) / float(len(recs)):
        return True
    return False
async def start_running(self, sor, node, edges, inst):
    if not await self.is_ok_to_start(sor, node, edges, inst):
        return
    await sor.U('node_execution', {
        'id': node['id'],
        'status': 'running',
        'running_at': timestampstr()
    })
    try:
        out = await self.run_auto_task(node)
        await sor.U('node_execution', {
            'id': node['id'],
            'status': 'done',
            'stopping_at': timestampstr(),
            'output_ctx': json.dumps(out, ensure_ascii=False)
        })
    except Exception as e:
        debug(f'node {node["id"]} failed: {e}')
        await sor.U('node_execution', {
            'id': node['id'],
            'status': 'failed',
            'stopping_at': timestampstr()
        })
async def list_human_tasks(self, org_id, user_roles):
    env = ServerEnv()
    async with get_sor_context(env, 'dagflow') as sor:
@@ -278,3 +400,6 @@ class FlowEngine:
'ctx': json.dumps(ctx)
})
async def add_new_workflow(request, params_kw={}):
    name = params_kw.name  # flow name
    dsl = params_kw.dsl    # YAML flow definition