diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index 862c9fd..119ed6e 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -312,11 +312,11 @@ where backid=${backid}$ elif ntype == 'task': flg= self.is_ok_to_step_next(sor, node, edges, inst) if flg: - await self.try_start_running(sor, inst, ne, node) + await self.start_running(sor, inst, ne, node) elif ntype == 'subflow': flg= self.is_ok_to_step_next(sor, node, edges, inst) if flg: - await self.try_start_subflow(sor, inst, ne, node) + await self.start_subflow(sor, inst, ne, node) elif status in ['done', 'cancelled']: debug(f'节点完成,检查后续节点') await self.node_transfer(sor, dsl, inst, ne, node, flow_def) @@ -328,13 +328,42 @@ where backid=${backid}$ debug(f'节点执行失败,需要运维人员参与') + def build_node_input_ctx(self, inst, ne): + ctx = json.loads(ne.ctx) + if ne.ctx_ext: + ctx_ext = json.loads(ns.ctx_ext) + return f'{ctx}\n附加内容:\n{ctx_ext}' + return ctx + + async def start_subflow(self, sor, inst, ne, node): + subflow_id = node.get('subflow_id') + if subflow_id is None: + debug(f'{node=} 没有subflow_id属性') + return + sub_ctx = self.build_node_input_ctx(inst, ne) + ne.subinst_id = await self.create_instance( + ne.org_id, + subflow_id, ctx=sub_ctx) + ne.status = 'running' + ne.running_at = timestampstr() + await sor.U('node_execution', ne.copy()) + async def run_auto_task(self, ne): env = ServerEnv() users = get_org_users(ne.orgid) if not users: debug(f'{orgid=} 没有用户') return - + sub_ctx = self.build_node_input_ctx(inst, ne) + try: + f = partial(self.task_success_callback, ne.id) + ef = partial(self.task_error_callback, ne.id) + params = { + prompt: sub_ctx, + callback:f, + errback: ef + } + asyncio.create_task(env.run_skillagent, params) # ------------------------------ # Human task APIs # ------------------------------ @@ -493,6 +522,52 @@ where instance_id=${instance_id}$ return True return False + async def task_error_callback(self, ne_id, errmsg): + env = Server() + async with get_sor_context(env, 'dagflow') as sor: + recs = await sor.R('node_execution', {'id': ne_id}) + if not recs: + debug(f'{ne_id=} 没找到node_execution记录') + return + ne = recs[0] + ne.status = 'failed', + ne.output = errmsg + await sor.U('node_execution', ne) + + async def task_success_callback(self, ne_id, result): + env = ServerEnv() + async with get_sor_context(env, 'dagflow') as sor: + recs = await sor.R('node_execution', {'id': ne_id}) + if not recs: + debug(f'{ne_id=} 没找到node_execution记录') + return + ne = recs[0] + inst_id = ne.instance_id + insts = await sor.R('flow_instance', {'id': inst_id}) + if insts + debug(f'{inst_id=} 没找到flow_instance记录') + return + inst = insts[0] + ne.output = result if result is None else \ + result if isinstance(result, str) else \ + json.dumps(result, ensure_ascii=False) + + ne.status = 'done' + if result: + data = { + 'output': result + } + tmpl = env.template_engine.renders(ne.output_ctx, data) + ne.output = tmpl + d = json.loads(tmpl) + jctx = json.loads(inst.ctx) + jctx.update(d) + inst.ctx = json.dumps(ctx, ensure_ascii=False) + await sor.U('flow_instance', inst) + else: + ne.output = None + await sor.U('node_execution', ne) + async def start_running(self, sor, ne, edges, inst): await sor.R('node_execution', { 'id': ne['id'], diff --git a/dagflow/init.py b/dagflow/init.py index 9795923..a05179f 100644 --- a/dagflow/init.py +++ b/dagflow/init.py @@ -1,8 +1,10 @@ import asyncio from functools import partial from appPublic.jsonConfig import getConfig +from appPublic.log import debug, exception, error from ahserver.serverenv import ServerEnv from ahserver.configuredServer import add_cleanupctx +from random import randint from .dagflow import ( FlowEngine, get_org_flow_definition, @@ -13,6 +15,19 @@ from .dagflow import ( add_new_workflow ) +async def run_skillagent(params): + i = randint(0,10) + await asyncio.sleep(i) + if i<= 7: + f = params['callback'] + data = {'data1': 'test'} + debug(f'任务模拟完成,返回数据={data}') + f(data) + else: + f = params['errback'] + message = '任务模拟失败错误信息' + f(message) + async def dagbacktask(engines, app): tasks = [] for e in engines: @@ -40,3 +55,4 @@ def load_dagflow(): env.get_org_flow_definition = get_org_flow_definition env.get_my_flow_works = get_my_flow_works env.list_org_instances = list_org_instances + env.run_skillagent = run_skillagent