diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index 8ccd18c..fe6ab90 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -285,46 +285,58 @@ where backid=${backid}$ node['id'] = node_id ntype = node['type'] status = ne.status - if status == 'pending': - if ntype == 'human': - if ne.assignee is None: - await self.auto_assign(sor, inst, ne, node) - elif ntype == 'task': - await self.try_start_running(sor, inst, ne, node) - elif ntype == 'subflow': - await self.try_start_subflow(sor, inst, ne, node) - elif ntype == 'start': + if ntype == 'start': + await sor.U('node_execution', { + 'id': ne.id, + 'status': 'completed' + }) + await self.node_transfer(sor, dsl, inst, ne, node, flow_def) + elif ntype == 'end': + flg = await self.is_ok_to_step_next(sor, node, edges, inst) + if flg: + await sor.U('node_execution', { + 'id': ne.id, + 'status': 'completed' + }) + await sor.U('flow_instance', { + 'id': inst.id, + 'status': 'completed' + }) + break; + else: + if status == 'pending': + if ntype == 'human': + if ne.assignee is None: + await self.auto_assign(sor, inst, ne, node) + elif ntype == 'task': + flg= self.is_ok_to_step_next(sor, node, edges, inst) + if flg: + await self.try_start_running(sor, inst, ne, node) + elif ntype == 'subflow': + flg= self.is_ok_to_step_next(sor, node, edges, inst) + if flg: + await self.try_start_subflow(sor, inst, ne, node) + elif status in ['done', 'cancelled']: + debug(f'节点完成,检查后续节点') await self.node_transfer(sor, dsl, inst, ne, node, flow_def) await sor.U('node_execution', { 'id': ne.id, 'status': 'completed' }) - elif ntype == 'end': - flg = await self.is_ok_to_step_next(sor, node, edges, inst) - if flg: - await sor.U('node_execution', { - 'id': ne.id, - 'status': 'completed' - }) - await sor.U('flow_instance', { - 'id': inst.id, - 'status': 'completed' - }) - break; - elif status in ['done', 'cancelled', 'failed']: - debug(f'节点完成,检查后续节点') - await self.node_transfer(sor, dsl, inst, ne, node, flow_def) - await sor.U('node_execution', { - 'id': ne.id, - 'status': 'completed' - }) - await self.check_instance_completed(sor, inst) + elif status == 'failed': + debug(f'节点执行失败,需要运维人员参与') + async def run_auto_task(self, ne): + env = ServerEnv() + users = get_org_users(ne.orgid) + if not users: + debug(f'{orgid=} 没有用户') + return + # ------------------------------ # Human task APIs # ------------------------------ - async def auto_assign(self, sor, inst, ne, node): recs = await env.get_org_users(sor, inst.org_id, ne.role) if recs: @@ -334,15 +346,11 @@ where backid=${backid}$ 'assignee': recs[i].id }) - async def check_instance_completed(self, sor, inst): - sql = "select * from node_execution where instance_id=${instance_id} and type='end'" - recs = await sor.sqlExe(sql, {'instance_id': inst.id}) - if recs: - if recs[0].status == 'completed': - await sor.U('flow_instance', { - 'id': inst.id, - 'status': 'completed' - }) + async def instance_completed(self, sor, inst): + await sor.U('flow_instance', { + 'id': inst.id, + 'status': 'completed' + }) async def node_transfer(self, sor, dsl, inst, ne, node, flow_def): nnodes = [] @@ -455,7 +463,7 @@ where node_id in ${backnodes}$ }) if join == 'all': for r in recs: - if r.status in ['pending', 'running']: + if r.status != 'completed': return False return True if join == 'xor': @@ -484,28 +492,20 @@ where instance_id=${instance_id}$ return True return False - async def start_running(self, sor, node, edges, inst): - flg= self.is_ok_to_step_next(sor, node, edges, inst) - if flg: - await sor.R('node_execution', { - 'id': node['id'], - 'status': 'running', - 'running_time': timestampstr() + async def start_running(self, sor, ne, edges, inst): + await sor.R('node_execution', { + 'id': ne['id'], + 'status': 'running', + 'running_time': timestampstr() + }) + try: + out = await self.run_auto_task(ne) + except Exception as e: + await sor.U('node_execution', { + 'id': ne['id'], + 'status': 'failed', + 'stopping_at': timestampstr() }) - try: - out = await self.run_auto_task(node) - await sor.U('node_execution', { - 'id': node['id'], - 'status': 'done', - 'stopping_at': timestampstr(), - 'output_ctx': json.dumps(out, ensure_ascii=False) - }) - except Exception as e: - await sor.U('node_execution', { - 'id': node['id'], - 'status': 'failed', - 'stopping_at': timestampstr() - }) async def list_human_tasks(self, org_id, user_roles): env = ServerEnv()