From 61f7e03b89efcd0a284edc27f54ea9e85f1f210a Mon Sep 17 00:00:00 2001 From: yumoqing Date: Mon, 16 Mar 2026 17:06:14 +0800 Subject: [PATCH] bugfix --- dagflow/dagflow.py | 71 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 15 deletions(-) diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index 0832c35..ac6d4d0 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -17,6 +17,40 @@ from random import randint from appPublic.log import debug, exception, error, info from appPublic.timeUtils import timestampstr from sqlor.dbpools import get_sor_context +from asteval import Interpreter, NameNotFound + +# 1. 创建一个全局的 Interpreter 实例(复用以提高性能) +aeval = Interpreter() + +# 2. 在你的 node_transfer 或条件判断逻辑中 +def safe_eval_condition(cond_str, ctx_dict): + """ + 安全地评估条件表达式 + Args: + cond_str: 表达式字符串,例如 "ctx['age'] > 18 and ctx['score'] >= 60" + ctx_dict: 上下文数据字典 + Returns: None 错误或的确是None + 其他: 表达式结果 + """ + try: + # 将 ctx 注入到 asteval 的符号表中 + aeval.symtable['ctx'] = ctx_dict + + # 执行表达式 + # mode='eval' 表示只允许单个表达式,不允许语句(如赋值、import) + result = aeval(cond_str, mode='eval') + + # 检查是否有语法错误或运行时错误 + if aeval.error: + print(f"asteval Error: {aeval.error}") + return False + + return bool(result) + + except Exception as e: + # asteval 通常很安全,但以防万一 + print(f"Eval Exception: {e}") + return False # --------------------------------------------------------------------- # Table definitions @@ -311,21 +345,27 @@ where backid=${backid}$ async def node_transfer(self, sor, dsl, inst, ne, node, flow_def): nnode = [] + ctx = inst.ctx.copy() for edge in dsl.get('edges', []): if edge['from'] != ne.node_id: continue cond = edge.get('when') - if cond and not eval(cond, {}, {'ctx': ctx}): + if cond and not safe_eval_condition(cond, {'ctx': ctx}): continue m_on = edge.get('foreach') - ns = { - 'm_on': m_on, - 'ctx': ctx, - 'node': node - } - on_array = await self.get_multiple_array(m_on) - for e in on_array: - nnodes.add((edge['to'], str(e))) + if m_on: + ns = DictObject(**{ + 'sor': sor, + 'm_on': m_on, + 'ctx': ctx, + 'node': node + }) + on_array = safe_eval_condition(m_on, ctx) + for e in on_array: + nnodes.add((edge['to'], str(e))) + else: + nnodes.add(edge['to'], None) + for node, ctx_ext in nnodes: x = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst) if x: @@ -400,7 +440,7 @@ where backid=${backid}$ sql = """select * from node_execution where node_id in ${backnodes}$ and instance_id = ${instance_id}$""" - recs = sor.sqlExe(sql, { + recs = await sor.sqlExe(sql, { 'backnodes': backnodes, 'instance_id': inst['id'] }) @@ -425,7 +465,7 @@ where status in ['pending', 'running'] sql = """select * from node_transfer where instance_id=${instance_id}$ and to_node_id = ${node_id}$""" - trecs = sor.sqlExe(sql, { + trecs = await sor.sqlExe(sql, { 'node_id': node['id'], 'instance_id': inst['id'] }) @@ -436,7 +476,8 @@ where instance_id=${instance_id}$ return False async def start_running(self, sor, node, edges, inst): - if self.s_ok_to_step_next(sor, node, edges, inst): + flg= self.is_ok_to_step_next(sor, node, edges, inst) + if flg: await sor.R('node_execution', { 'id': node['id'], 'status': 'running', @@ -444,14 +485,14 @@ where instance_id=${instance_id}$ }) try: out = await self.run_auto_task(node) - sor.U('node_execution', { + await sor.U('node_execution', { 'id': node['id'], 'status': 'done', 'stopping_at': timestampstr(), 'output_ctx': json.dumps(out, ensure_ascii=False) }) except Exception as e: - sor.U('node_execution', { + await sor.U('node_execution', { 'id': node['id'], 'status': 'failed', 'stopping_at': timestampstr() @@ -531,7 +572,7 @@ async def get_my_flow_works(request, inst_id=None): async with get_sor_context(env, 'dagflow') as sor: sql = """select * from node_execute -where a.type = 'human' +where type = 'human' and status in ['pending', 'running'] and (assignee = ${userid}$ or (assignee is NULL and role in ${myroles}$))""" if inst_id: