This commit is contained in:
yumoqing 2026-03-16 17:06:14 +08:00
parent 1c11a995d7
commit 61f7e03b89

View File

@ -17,6 +17,40 @@ from random import randint
from appPublic.log import debug, exception, error, info
from appPublic.timeUtils import timestampstr
from sqlor.dbpools import get_sor_context
from asteval import Interpreter, NameNotFound
# 1. 创建一个全局的 Interpreter 实例(复用以提高性能)
aeval = Interpreter()
# 2. 在你的 node_transfer 或条件判断逻辑中
def safe_eval_condition(cond_str, ctx_dict):
"""
安全地评估条件表达式
Args:
cond_str: 表达式字符串例如 "ctx['age'] > 18 and ctx['score'] >= 60"
ctx_dict: 上下文数据字典
Returns: None 错误或的确是None
其他: 表达式结果
"""
try:
# 将 ctx 注入到 asteval 的符号表中
aeval.symtable['ctx'] = ctx_dict
# 执行表达式
# mode='eval' 表示只允许单个表达式不允许语句如赋值、import
result = aeval(cond_str, mode='eval')
# 检查是否有语法错误或运行时错误
if aeval.error:
print(f"asteval Error: {aeval.error}")
return False
return bool(result)
except Exception as e:
# asteval 通常很安全,但以防万一
print(f"Eval Exception: {e}")
return False
# ---------------------------------------------------------------------
# Table definitions
@ -311,21 +345,27 @@ where backid=${backid}$
async def node_transfer(self, sor, dsl, inst, ne, node, flow_def):
nnode = []
ctx = inst.ctx.copy()
for edge in dsl.get('edges', []):
if edge['from'] != ne.node_id:
continue
cond = edge.get('when')
if cond and not eval(cond, {}, {'ctx': ctx}):
if cond and not safe_eval_condition(cond, {'ctx': ctx}):
continue
m_on = edge.get('foreach')
ns = {
if m_on:
ns = DictObject(**{
'sor': sor,
'm_on': m_on,
'ctx': ctx,
'node': node
}
on_array = await self.get_multiple_array(m_on)
})
on_array = safe_eval_condition(m_on, ctx)
for e in on_array:
nnodes.add((edge['to'], str(e)))
else:
nnodes.add(edge['to'], None)
for node, ctx_ext in nnodes:
x = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst)
if x:
@ -400,7 +440,7 @@ where backid=${backid}$
sql = """select * from node_execution
where node_id in ${backnodes}$
and instance_id = ${instance_id}$"""
recs = sor.sqlExe(sql, {
recs = await sor.sqlExe(sql, {
'backnodes': backnodes,
'instance_id': inst['id']
})
@ -425,7 +465,7 @@ where status in ['pending', 'running']
sql = """select * from node_transfer
where instance_id=${instance_id}$
and to_node_id = ${node_id}$"""
trecs = sor.sqlExe(sql, {
trecs = await sor.sqlExe(sql, {
'node_id': node['id'],
'instance_id': inst['id']
})
@ -436,7 +476,8 @@ where instance_id=${instance_id}$
return False
async def start_running(self, sor, node, edges, inst):
if self.s_ok_to_step_next(sor, node, edges, inst):
flg= self.is_ok_to_step_next(sor, node, edges, inst)
if flg:
await sor.R('node_execution', {
'id': node['id'],
'status': 'running',
@ -444,14 +485,14 @@ where instance_id=${instance_id}$
})
try:
out = await self.run_auto_task(node)
sor.U('node_execution', {
await sor.U('node_execution', {
'id': node['id'],
'status': 'done',
'stopping_at': timestampstr(),
'output_ctx': json.dumps(out, ensure_ascii=False)
})
except Exception as e:
sor.U('node_execution', {
await sor.U('node_execution', {
'id': node['id'],
'status': 'failed',
'stopping_at': timestampstr()
@ -531,7 +572,7 @@ async def get_my_flow_works(request, inst_id=None):
async with get_sor_context(env, 'dagflow') as sor:
sql = """select *
from node_execute
where a.type = 'human'
where type = 'human'
and status in ['pending', 'running']
and (assignee = ${userid}$ or (assignee is NULL and role in ${myroles}$))"""
if inst_id: