This commit is contained in:
yumoqing 2026-03-17 15:43:35 +08:00
parent c59e2dbaae
commit c12553990b

View File

@ -285,20 +285,12 @@ where backid=${backid}$
node['id'] = node_id
ntype = node['type']
status = ne.status
if status == 'pending':
if ntype == 'human':
if ne.assignee is None:
await self.auto_assign(sor, inst, ne, node)
elif ntype == 'task':
await self.try_start_running(sor, inst, ne, node)
elif ntype == 'subflow':
await self.try_start_subflow(sor, inst, ne, node)
elif ntype == 'start':
await self.node_transfer(sor, dsl, inst, ne, node, flow_def)
if ntype == 'start':
await sor.U('node_execution', {
'id': ne.id,
'status': 'completed'
})
await self.node_transfer(sor, dsl, inst, ne, node, flow_def)
elif ntype == 'end':
flg = await self.is_ok_to_step_next(sor, node, edges, inst)
if flg:
@ -311,20 +303,40 @@ where backid=${backid}$
'status': 'completed'
})
break;
elif status in ['done', 'cancelled', 'failed']:
else:
if status == 'pending':
if ntype == 'human':
if ne.assignee is None:
await self.auto_assign(sor, inst, ne, node)
elif ntype == 'task':
flg= self.is_ok_to_step_next(sor, node, edges, inst)
if flg:
await self.try_start_running(sor, inst, ne, node)
elif ntype == 'subflow':
flg= self.is_ok_to_step_next(sor, node, edges, inst)
if flg:
await self.try_start_subflow(sor, inst, ne, node)
elif status in ['done', 'cancelled']:
debug(f'节点完成,检查后续节点')
await self.node_transfer(sor, dsl, inst, ne, node, flow_def)
await sor.U('node_execution', {
'id': ne.id,
'status': 'completed'
})
await self.check_instance_completed(sor, inst)
elif status == 'failed':
debug(f'节点执行失败,需要运维人员参与')
async def run_auto_task(self, ne):
env = ServerEnv()
users = get_org_users(ne.orgid)
if not users:
debug(f'{orgid=} 没有用户')
return
# ------------------------------
# Human task APIs
# ------------------------------
async def auto_assign(self, sor, inst, ne, node):
recs = await env.get_org_users(sor, inst.org_id, ne.role)
if recs:
@ -334,11 +346,7 @@ where backid=${backid}$
'assignee': recs[i].id
})
async def check_instance_completed(self, sor, inst):
sql = "select * from node_execution where instance_id=${instance_id} and type='end'"
recs = await sor.sqlExe(sql, {'instance_id': inst.id})
if recs:
if recs[0].status == 'completed':
async def instance_completed(self, sor, inst):
await sor.U('flow_instance', {
'id': inst.id,
'status': 'completed'
@ -455,7 +463,7 @@ where node_id in ${backnodes}$
})
if join == 'all':
for r in recs:
if r.status in ['pending', 'running']:
if r.status != 'completed':
return False
return True
if join == 'xor':
@ -484,25 +492,17 @@ where instance_id=${instance_id}$
return True
return False
async def start_running(self, sor, node, edges, inst):
flg= self.is_ok_to_step_next(sor, node, edges, inst)
if flg:
async def start_running(self, sor, ne, edges, inst):
await sor.R('node_execution', {
'id': node['id'],
'id': ne['id'],
'status': 'running',
'running_time': timestampstr()
})
try:
out = await self.run_auto_task(node)
await sor.U('node_execution', {
'id': node['id'],
'status': 'done',
'stopping_at': timestampstr(),
'output_ctx': json.dumps(out, ensure_ascii=False)
})
out = await self.run_auto_task(ne)
except Exception as e:
await sor.U('node_execution', {
'id': node['id'],
'id': ne['id'],
'status': 'failed',
'stopping_at': timestampstr()
})