diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index c482348..d586dd3 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -358,7 +358,18 @@ where backid=${backid}$ ne.running_at = timestampstr() await sor.U('node_execution', ne.copy()) + async def autorun_task(self, ne_id, params): + env = ServerEnv() + try: + data = await env.run_skillagent(params) + debug(f'任务完成,返回数据={data}') + self.self.task_success_callback(ne_id, data) + except Exception as e: + message = f'任务失败错误信息{e}' + self.self.task_error_callback(ne_id, message) + async def run_auto_task(self, ne): + async def run_task( env = ServerEnv() users = await env.get_org_users(ne.org_id) if not users: @@ -366,16 +377,12 @@ where backid=${backid}$ return sub_ctx = self.build_node_input_ctx(ne) try: - f = partial(self.task_success_callback, ne.id) - ef = partial(self.task_error_callback, ne.id) params = DictObject(**{ "prompt": sub_ctx, "callerid": users[0].id, - "callerorgid": users[0].orgid, - "callback":f, - "errback": ef + "callerorgid": users[0].orgid }) - asyncio.create_task(env.run_skillagent(params)) + asyncio.create_task(self.autorun_task(ne_id, params)) except Exception as e: await self.task_error_callback(ne.id, str(e)) # ------------------------------ diff --git a/dagflow/init.py b/dagflow/init.py index 14dabbf..4e87d1d 100644 --- a/dagflow/init.py +++ b/dagflow/init.py @@ -22,11 +22,10 @@ async def run_skillagent(params): f = params['callback'] data = {'data1': 'test'} debug(f'任务模拟完成,返回数据={data}') - f(data) + return data else: f = params['errback'] - message = '任务模拟失败错误信息' - f(message) + raise Exception('伪造的错误信息') async def dagbacktask(engines, app): tasks = []