This commit is contained in:
yumoqing 2026-03-12 18:05:08 +08:00
parent 172b7115cb
commit 34ac11fa71
6 changed files with 190 additions and 142 deletions

View File

@ -127,12 +127,13 @@ output_ctx使用节点执行结果的字典数据作为数据来渲染结果u
* done 执行成功状态
* failed 执行失败状态
* cancelled 取消状态
* completed 处理完成
状态变化:
* pending 转running 如果节点没有join属性在下个循环直接转running 并调用节点执行方法,记录开始执行时间
* pending 转running 如果节点有join属性检查join条件满足条件就转执行否则等条件满足 没有join属性则直接转running 并调用节点执行方法,记录开始执行时间, 手工节点则由人工点击工作开始转换
* running 转 done 节点执行成功,并记录完成时间
* running 转 failed 节点执行失败, 并记录完成时间
* pending 转 cancelled 取消执行(手工或流程实例被取消)
* done cancelled, failed 转 completed 检查从节点出发的边,做节点转移
---
### 3.4 subflow_instance

View File

@ -12,6 +12,7 @@ Features:
- Query APIs for UI
"""
import asyncio
from random import randint
from appPublic.log import debug
from appPublic.timeUtils import timestampstr
from sqlor.dbpools import get_sor_context
@ -134,25 +135,28 @@ class FlowEngine:
# ------------------------------
# Definition / Instance
# ------------------------------
def __init__(self):
def __init__(self, backid):
self.aio = []
self.backid = backid
async def bgtask(self):
env = ServerEnv()
self.aio = []
cnt = 0
while True:
aio = []
async with get_sor_context(env, 'dagflow') as sor:
self.aio = await sor.R('flow_instance', {'status': 'running'})
if cnt >= 60:
debug(f'{self.aio=}')
cnt = 0
for r in self.aio:
self.step(r.org_id, r.id)
await asyncio.sleep(2)
cnt += 1
sql = """select * from flow_instance
where backid=${backid}$
and status = 'running'"""
aio = await sor.sqlExe(sql,{
'backid': self.backid
})
for r in aio:
self.step(r)
await asyncio.sleep(1)
async def create_definition(self, org_id, name, version, dsl_text):
async def create_definition(self, org_id, name, description, version, dsl_text, ctxfields):
env = ServerEnv()
async with get_sor_context(env, 'dagflow') as sor:
fid = env.uuid()
@ -160,7 +164,10 @@ class FlowEngine:
'id': fid,
'org_id': org_id,
'name': name,
'description': description,
'version': version,
'ctxfields': json.dumps(ctxfields),
'created_at': timestampstr(),
'dsl': dsl_text
})
return fid
@ -169,21 +176,51 @@ class FlowEngine:
env = ServerEnv()
async with get_sor_context(env, 'dagflow') as sor:
iid = env.uuid()
await sor.C('flow_instance', {
ns = {
'id': iid,
'backid': self.backid,
'round': 1,
'org_id': org_id,
'flow_def_id': flow_def_id,
'status': 'running',
'ctx': json.dumps(ctx or {}),
'active_nodes': json.dumps([])
'created_at': timestampstr()
})
await sor.C('flow_instance', ns.copy())
await self.create_start_node_execution(sor, ns)
return iid
async def create_start_node_execution(self, sor, inst):
    """Insert the synthetic 'start' node execution for a new flow instance.

    The record is created directly in 'done' state so the background step
    loop will immediately transfer control along the start node's outgoing
    edges on its next pass.

    Args:
        sor: active database context (from get_sor_context).
        inst: dict of the freshly created flow_instance record; the
            'id', 'org_id' and 'round' keys are read here.
    """
    env = ServerEnv()
    # renamed from `id` to avoid shadowing the builtin
    exec_id = env.uuid()
    await sor.C('node_execution', {
        'id': exec_id,
        'type': 'start',
        'instance_id': inst['id'],
        'org_id': inst['org_id'],
        'inst_round': inst['round'],
        'status': 'done',
        'node_id': 'start',
        'created_at': timestampstr()
    })
async def create_end_node_execution(self, sor, inst):
    """Insert the synthetic 'end' node execution for a flow instance.

    Unlike the start node, the end node begins in 'pending' state; the step
    loop later decides (via the join condition) when it may complete, which
    in turn marks the whole instance completed.

    Args:
        sor: active database context (from get_sor_context).
        inst: dict-like flow_instance record; 'id', 'org_id' and 'round'
            are read here.
    """
    env = ServerEnv()
    # renamed from `id` to avoid shadowing the builtin
    exec_id = env.uuid()
    await sor.C('node_execution', {
        'id': exec_id,
        'type': 'end',
        'instance_id': inst['id'],
        'org_id': inst['org_id'],
        'inst_round': inst['round'],
        'status': 'pending',
        'node_id': 'end',
        'created_at': timestampstr()
    })
# ------------------------------
# Execution
# ------------------------------
async def step(self, org_id, instance_id):
async def step(self, inst):
"""
从节点执行表中取出所有状态不是结束的执行节点检查是否可以推进一步
结束状态为
@ -193,17 +230,6 @@ class FlowEngine:
推进如果
env = ServerEnv()
async with get_sor_context(env, 'dagflow') as sor:
insts = await sor.R('flow_instance', {
'id': instance_id,
'org_id': org_id
})
if not insts:
return
inst = insts[0]
if inst['status'] != 'running':
return
flow_def = (await sor.R(
'flow_definition',
{'id': inst['flow_def_id']}
@ -213,85 +239,78 @@ class FlowEngine:
ctx = json.loads(inst['ctx'])
active = set(json.loads(inst['active_nodes']) or [dsl['start']])
next_nodes = set()
for node_id in active:
sql = "select * from node_execution where instance_id=${instance_id}$ and status != 'completed'"
active_nes = await sor.sqlExe(sql, {
'instance_id': inst.id
})
for ne in active_nes:
node_id = ne.node_id
node = dsl['nodes'][node_id]
node['id'] = node_id
ntype = node['type']
# -------- Human node --------
status = ne.status
if status == 'pending':
if ntype == 'human':
rows = await sor.R('human_task', {
'instance_id': instance_id,
'node_id': node_id,
'status': 'pending'
if ne.assignee is None:
await self.auto_assign(sor, inst, ne, node)
elif ntype == 'task':
await self.try_start_running(sor, inst, ne, node)
elif ntype == 'subflow':
await self.try_start_subflow(sor, inst, ne, node)
elif ntype == 'start':
await self.node_transfer(sor, inst, ne, node, flow_def)
await sor.U('node_execution', {
'id': ne.id,
'status': 'completed'
})
if not rows:
await sor.C('human_task', {
'id': env.uuid(),
'org_id': org_id,
'instance_id': instance_id,
'node_id': node_id,
'role': node.get('role'),
'status': 'pending',
'input': json.dumps(ctx),
'timeout_at': env.after(node.get('timeout', 0))
elif ntype == 'end':
flg = await self.is_ok_to_step_next(sor, node, edges, inst)
if flg:
await sor.U('node_execution', {
'id': ne.id,
'status': 'completed'
})
next_nodes.add(node_id)
continue
if ntype == 'task':
if node.get('join'):
await sor.C('node_execution', {
'id': env.uuid(),
'org_id': org_id,
'instance_id': instance_id,
'node_id': node_id,
'input_ctx': json.dumps(ctx),
'status': 'pendding'
})
# ---- 如果节点状态在pending状态 ---
if node['status'] == 'pending' and ntype != 'human':
is_ok = await self.is_ok_to_start(sor, node, inst)
if is_ok:
await self.start_running(sor, node)
continue
# -------- Normal node --------
"""
await sor.C('node_execution', {
'id': env.uuid(),
'org_id': org_id,
'instance_id': instance_id,
'node_id': node_id,
'input_ctx': json.dumps(ctx),
'status': 'pendding'
})
"""
if nstatus == 'done' or nstatus == 'failed':
n_nodes = await self.node_transfer(node, dsl)
next_nodes += n_nodes
if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes):
await sor.U('flow_instance', {
'id': instance_id,
'status': 'finished',
'active_nodes': json.dumps(list(next_nodes))
'id': inst.id,
'status': 'completed'
})
else:
await sor.U('flow_instance', {
'id': instance_id,
'active_nodes': json.dumps(list(next_nodes))
break;
elif status in ['done', 'cancelled', 'failed']:
await self.node_transfer(sor, inst, ne, node, flow_def)
await sor.U('node_execution', {
'id': ne.id,
'status': 'completed'
})
self.check_instance_completed(sor, inst)
# ------------------------------
# Human task APIs
# ------------------------------
async def node_transfer(self, node):
if node['status'] not in ['done', 'failed', 'cancel']:
return
async def auto_assign(self, sor, inst, ne, node):
    """Assign a pending human-task execution to a random org user.

    Looks up the users of the instance's org that hold the execution's
    role and records one of them (chosen uniformly at random) as the
    assignee. No-op when no matching user exists.

    Bug fixes vs. original:
    - `env` was referenced without ever being defined (NameError at
      runtime); it is now created like the other methods in this class.
    - `randint` was called with a single argument; it requires both an
      inclusive lower and upper bound.
    """
    env = ServerEnv()
    recs = await env.get_org_users(sor, inst.org_id, ne.role)
    if recs:
        chosen = recs[randint(0, len(recs) - 1)]
        await sor.U('node_execution', {
            'id': ne.id,
            'assignee': chosen.id
        })
async def check_instance_completed(self, sor, inst):
    """Mark a flow instance 'completed' once its end node execution is.

    Bug fixes vs. original:
    - the SQL placeholder was written `${instance_id}` without the closing
      `$` required by the template syntax used elsewhere in this file
      (cf. the queries in bgtask/step);
    - sqlExe was passed a set literal `{'instance_id', inst.id}` instead
      of a dict, so the bind parameter never reached the query.
    """
    sql = "select * from node_execution where instance_id=${instance_id}$ and type='end'"
    recs = await sor.sqlExe(sql, {'instance_id': inst.id})
    if recs and recs[0].status == 'completed':
        await sor.U('flow_instance', {
            'id': inst.id,
            'status': 'completed'
        })
async def node_transfer(self, sor, dsl, inst, ne, node, flow_def):
nnode = []
for edge in dsl.get('edges', []):
if edge['from'] != node_id:
if edge['from'] != ne.node_id:
continue
cond = edge.get('when')
if cond and not eval(cond, {}, {'ctx': ctx}):
@ -305,7 +324,47 @@ class FlowEngine:
on_array = await self.get_multiple_array(m_on):
for e in on_array:
nnodes.add((edge['to'], str(e)))
return nnodes
for node, ctx_ext in nnodes:
x = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst)
if x:
env = ServerEnv()
id = env.uuid()
ns = {
'id': id,
'type': node['type'],
'inst_round': inst.round,
'org_id': inst.org_id,
'node_id': node['id'],
'input_ctx': node.get('input_ctx'),
'output_ctx': node.get('output_ctx'),
'status': 'pending',
'ctx': json.dumps(inst.ctx.copy(), ensure_ascii=False),
'ctx_ext': json.dumps(ctx_ext.copy(), ensure_ascii=False),
'created_at': timestampstr()
}
if node['type'] == 'human':
ns.update({
'role': node.get('role')
})
elif node['type'] == 'task':
pass
elif node['type'] == 'subflow':
eng = env.template_engine
ctx = inst.ctx
if node.get('input_ctx'):
ctx = eng.renders(node['input_ctx'], ctx)
subinst_id = await self.create_instance(org_id, node.flow_def_id, ctx=ctx)
ns.update({
'subinst_id': subinst_id
})
elif node['type'] == 'end':
ns.update('
await self.create_end_node_execution(sor, inst)
await sor.C('node_execution', ns)
if node.join
async def is_ok_to_create_new_node_exe(self, sor, node, edges, inst):
join = node.get('join)
@ -331,7 +390,7 @@ class FlowEngine:
})
return id
async def is_ok_to_start(self, sor, node, edges, inst):
async def is_ok_to_step_next(self, sor, node, edges, inst):
join = node.get('join')
if not join:
return True
@ -439,28 +498,24 @@ where instance_id=${instance_id}$
'ctx': json.dumps(ctx)
})
def get_engine():
    """Return a randomly chosen FlowEngine from the server-wide pool.

    Used to spread newly submitted workflow work across the engines that
    load_dagflow() registered on the server environment.

    Bug fix vs. original: `randint(cnt - 1)` raised TypeError because
    random.randint requires both an inclusive lower and upper bound.
    """
    env = ServerEnv()
    cnt = len(env.flow_engines)
    return env.flow_engines[randint(0, cnt - 1)]
async def add_new_workflow(request, params_kw={}):
name = params_kw.name
dsl = params_kw.dsl
env = request._run_ns
orgid = await env.get_userorgid()
async with get_sor_context(env, 'dagflow') as sor:
ns = {
'id': env.uuid(),
'name': name,
'dsl': dsl,
'version': '1',
'created_at': timestampstr(),
'org_id': orgid
}
await sor.C('dagflow_definition', ns)
ctxfileds = params_kw.ctxfields or []
description = params_kw.description
engine = get_engine()
id = await engine.create_definition(org_id, name, description, '1', dsl, ctxfields)
return {
'status': 'SUCCEEDED',
'data': {
'dagflow_id': ns['id']
'dagflow_id': id
}
}
return {
'status': 'FAILED',
'error': 'add workflow error'
}

View File

@ -1,18 +1,29 @@
import asyncio
from functools import partial
from appPublic.jsonConfig import getConfig
from ahserver.serverenv import ServerEnv
from ahserver.configuredServer import add_cleanupctx
from .dagflow import FlowEngine
async def dagbacktask(engine, app):
task = asyncio.create_task(engine.bgtask())
async def dagbacktask(engines, app):
    """App-lifecycle context for the flow engines' background loops.

    On startup (before the yield) one asyncio task is spawned per engine
    running its bgtask() loop; on shutdown (after the yield) every spawned
    task is cancelled.
    """
    running = [asyncio.create_task(eng.bgtask()) for eng in engines]
    yield
    for t in running:
        t.cancel()
def load_dagflow():
    """Build the configured pool of FlowEngines and wire their lifecycle.

    Reads `dagflow_job_cnt` from the config (default 1), creates one
    FlowEngine per slot (the slot index becomes the engine's backid, used
    to partition flow_instance rows between workers), registers a cleanup
    context that starts/cancels their background loops, and publishes the
    pool on the server environment as `flow_engines` (consumed by
    get_engine()).

    Fix vs. original: stale single-engine lines (`engine = FlowEngine()`,
    `f = partial(dagbacktask, engine)`, `env.workflow_engine = engine`)
    were left interleaved with the pool-based version; they are removed.
    """
    config = getConfig()
    flow_engines = []
    cnt = config.dagflow_job_cnt or 1
    for i in range(cnt):
        flow_engines.append(FlowEngine(i))
    f = partial(dagbacktask, flow_engines)
    add_cleanupctx(f)
    env = ServerEnv()
    env.flow_engines = flow_engines

View File

@ -7,11 +7,12 @@
}],
"fields": [
{"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "backid", "title": "后台id", "type": "short", "nullable": "no"},
{"name": "round", "title": "当前轮次", "type": "short"},
{"name": "org_id", "title": "机构id", "type": "str", "length": 32, "nullable": "no"},
{"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32},
{"name": "status", "title": "状态", "type": "str", "length": 16},
{"name": "ctx", "title": "上下文(JSON)", "type": "text"},
{"name": "active_nodes", "title": "当前节点(JSON)", "type": "text"},
{"name": "created_at", "title": "创建时间", "type": "timestamp"}
],
"indexes": [

View File

@ -8,6 +8,7 @@
"fields": [
{"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "type", "title": "节点类型", "type": "str", "length": 32},
{"name": "inst_round", "title": "执行轮次", "type": "short"},
{"name": "org_id", "title": "机构id", "type": "str", "length": 32},
{"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32},
{"name": "node_id", "title": "节点ID", "type": "str", "length": 64},
@ -25,7 +26,6 @@
{"name": "running_at", "title": "执行时点", "type": "timestamp"},
{"name": "stopping_at", "title": "结束时点", "type": "timestamp"},
{"name": "created_at", "title": "执行时间", "type": "timestamp"},
{"name": "next_nodeid", "title": "下一节点id", "type": "str", "length": 32}
],
"indexes": [
{"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]}

View File

@ -1,20 +0,0 @@
{
"summary": [{
"name": "subflow_instance",
"title": "子流程实例",
"primary": "id",
"catelog": "relation"
}],
"fields": [
{"name": "id", "title": "子流程ID", "type": "str", "length": 32, "nullable": "no"},
{"name": "parent_instance_id", "title": "父流程实例ID", "type": "str", "length": 32},
{"name": "parent_node_id", "title": "父节点ID", "type": "str", "length": 64},
{"name": "child_instance_id", "title": "子流程实例ID", "type": "str", "length": 32},
{"name": "status", "title": "状态", "type": "str", "length": 16},
{"name": "created_at", "title": "创建时间", "type": "timestamp"}
],
"indexes": [
{"name": "idx_subflow_parent", "idxtype": "index", "idxfields": ["parent_instance_id"]},
{"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]}
]
}