dagflow/dagflow/dagflow.py
2026-03-16 17:08:39 +08:00

631 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Workflow Engine v1.3 (Enterprise Edition)
========================================
Features:
- org_id multi-tenant isolation
- DAG workflow with persistence (sqlor)
- Subflow support
- Human task node
- RBAC-based task assignment
- Query APIs for UI
"""
import asyncio
import yaml
from random import randint
from appPublic.log import debug, exception, error, info
from appPublic.timeUtils import timestampstr
from sqlor.dbpools import get_sor_context
from asteval import Interpreter
# 1. 创建一个全局的 Interpreter 实例(复用以提高性能)
aeval = Interpreter()
# 2. 在你的 node_transfer 或条件判断逻辑中
def safe_eval_condition(cond_str, ctx_dict):
    """Safely evaluate a condition expression with asteval.

    Args:
        cond_str: expression string, e.g. "ctx['age'] > 18 and ctx['score'] >= 60"
        ctx_dict: context data dict, exposed to the expression as ``ctx``

    Returns:
        bool: the truthiness of the expression result, or False when the
        expression raises, fails to parse, or reports an asteval error.
        (The original docstring claimed None on error; the code has always
        returned False — the docstring now matches the behavior.)
    """
    try:
        # Inject the context into asteval's symbol table under the name 'ctx'.
        aeval.symtable['ctx'] = ctx_dict
        # mode='eval' only permits a single expression — no statements
        # such as assignments or imports.
        result = aeval(cond_str, mode='eval')
        # asteval collects syntax/runtime errors on .error instead of raising.
        if aeval.error:
            # use the file's logger instead of print so failures are captured
            error(f"asteval Error: {aeval.error}")
            return False
        return bool(result)
    except Exception as e:
        # asteval is normally safe, but guard against the unexpected.
        exception(f"Eval Exception: {e}")
        return False
# ---------------------------------------------------------------------
# Table definitions
# ---------------------------------------------------------------------
# Flow definition: one row per registered workflow DSL (YAML) document.
# NOTE(review): create_definition() also writes 'description' and
# 'ctxfields' columns that are not declared here — confirm the real
# schema includes them.
FLOW_DEFINITION_TABLE = {
    "summary": [{"name": "flow_definition", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "name", "type": "str", "length": 128},
        {"name": "version", "type": "str", "length": 32},
        {"name": "dsl", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}
# Flow instance: one row per running/finished execution of a definition.
# NOTE(review): create_instance() also writes 'backid' and 'round'
# columns not declared here — confirm against the real schema.
FLOW_INSTANCE_TABLE = {
    "summary": [{"name": "flow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "flow_def_id", "type": "str", "length": 32},
        {"name": "status", "type": "str", "length": 16},   # e.g. 'running' / 'completed'
        {"name": "ctx", "type": "text"},                   # JSON-serialized context dict
        {"name": "active_nodes", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}
# Node execution: one row per activation of a DSL node within an instance.
# NOTE(review): the engine also writes an 'inst_round' column not
# declared here — confirm against the real schema.
NODE_EXECUTION_TABLE = {
    "summary": [{"name": "node_execution", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "type", "type": "str", "length": 32},     # start/end/task/human/subflow
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "input_ctx", "type": "text"},
        {"name": "output_ctx", "type": "text"},
        {"name": "ctx", "type": "text"},                   # snapshot of instance ctx (JSON)
        {"name": "ctx_ext", "type": "text"},               # per-branch extra ctx (foreach)
        {"name": "status", "type": "str", "length": 16},   # pending/running/done/failed/cancelled/completed
        {"name": "error", "type": "text"},
        {"name": "input", "type": "text"},
        {"name": "output", "type": "text"},
        {"name": "subinst_id", "type": "str", "length": 32},  # child instance for subflow nodes
        {"name": "role", "type": "str", "length": 64},     # RBAC role for human nodes
        {"name": "assignee", "type": "str", "length": 64},
        {"name": "running_at", "type": "timestamp"},
        {"name": "stopping_at", "type": "timestamp"},
        {"name": "created_at", "type": "timestamp"}
    ]
}
# Subflow instance: links a parent node execution to its child instance.
SUBFLOW_INSTANCE_TABLE = {
    "summary": [{"name": "subflow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "child_instance_id", "type": "str", "length": 32},
        {"name": "status", "type": "str", "length": 16},
        {"name": "created_at", "type": "timestamp"}
    ]
}
# Human task: work items assignable to users by role (see the human-task APIs).
HUMAN_TASK_TABLE = {
    "summary": [{"name": "human_task", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "role", "type": "str", "length": 64},
        {"name": "assignee", "type": "str", "length": 64},
        {"name": "status", "type": "str", "length": 16},
        {"name": "input", "type": "text"},
        {"name": "output", "type": "text"},
        {"name": "timeout_at", "type": "timestamp"},
        {"name": "created_at", "type": "timestamp"},
        {"name": "completed_at", "type": "timestamp"}
    ]
}
# ---------------------------------------------------------------------
# Engine
# ---------------------------------------------------------------------
import yaml
import json
from ahserver.serverenv import ServerEnv
import re
def extract_some_value(s):
    """Extract the numeric value from a ``some(...)`` join specification.

    Examples: ``some(100)`` -> 100.0, ``some(50%)`` -> 0.5.

    Args:
        s (str): input string such as 'some(100)' or 'some(50%)'.

    Returns:
        float | None: the number (as a float); percentages are converted
        to a fraction in [0, 1]. None when the string does not match the
        ``some(N)`` / ``some(N%)`` pattern.
    """
    # Pattern: 'some(' followed by digits, an optional '%', then ')'.
    match = re.search(r'some\((\d+)(%)?\)', s)
    if not match:
        return None
    value = float(match.group(1))   # numeric part as a float
    if match.group(2) == '%':       # percentage -> fraction
        return value / 100.0
    return value
class FlowEngine:
    """DAG workflow engine.

    Persists flow definitions / instances / node executions through sqlor,
    polls running instances from a background task, and advances each of
    them one scheduling step at a time.  Instances are sharded across
    engine processes via ``backid``.
    """

    # ------------------------------
    # Definition / Instance
    # ------------------------------
    def __init__(self, backid):
        # scratch list used by the background loop
        self.aio = []
        # shard id: only instances created with this backid are polled here
        self.backid = backid

    async def bgtask(self):
        """Background polling loop.

        Every second, fetch all 'running' flow instances owned by this
        engine (matched on backid) and try to advance each one a step.
        Never returns.
        """
        env = ServerEnv()
        self.aio = []
        while True:
            async with get_sor_context(env, 'dagflow') as sor:
                sql = """select * from flow_instance
                        where backid=${backid}$
                        and status = 'running'"""
                insts = await sor.sqlExe(sql, {'backid': self.backid})
            debug(f'aio={insts}')
            # step() opens its own DB context, so iterate outside the one above
            for inst in insts:
                await self.step(inst)
            await asyncio.sleep(1)

    async def create_definition(self, org_id, name, description, version, dsl_text, ctxfields):
        """Parse the YAML DSL and persist a new flow definition.

        The definition id is taken from the DSL document itself
        (``data['id']``).  Returns that id.
        """
        env = ServerEnv()
        data = yaml.safe_load(dsl_text)
        async with get_sor_context(env, 'dagflow') as sor:
            fid = data['id']
            await sor.C('flow_definition', {
                'id': fid,
                'org_id': org_id,
                'name': name,
                'description': description,
                'version': version,
                'ctxfields': json.dumps(ctxfields),
                'created_at': timestampstr(),
                'dsl': dsl_text
            })
            return fid

    async def create_instance(self, org_id, flow_def_id, ctx=None):
        """Create a 'running' instance of a definition, plus its initial
        (already 'done') start node execution.  Returns the instance id."""
        env = ServerEnv()
        async with get_sor_context(env, 'dagflow') as sor:
            iid = env.uuid()
            ns = {
                'id': iid,
                'backid': self.backid,
                'round': 1,
                'org_id': org_id,
                'flow_def_id': flow_def_id,
                'status': 'running',
                'ctx': json.dumps(ctx or {}),
                'created_at': timestampstr()
            }
            await sor.C('flow_instance', ns.copy())
            await self.create_start_node_execution(sor, ns)
            return iid

    async def create_start_node_execution(self, sor, inst):
        """Insert the synthetic 'start' node execution, already 'done' so
        that the first step() immediately transfers past it."""
        env = ServerEnv()
        id = env.uuid()
        await sor.C('node_execution', {
            'id': id,
            'type': 'start',
            'instance_id': inst['id'],
            'org_id': inst['org_id'],
            'inst_round': inst['round'],
            'status': 'done',
            'node_id': 'start',
            'created_at': timestampstr()
        })

    async def create_end_node_execution(self, sor, inst):
        """Insert the synthetic 'end' node execution in 'pending' state;
        step() completes it (and the instance) once its join condition holds."""
        env = ServerEnv()
        id = env.uuid()
        await sor.C('node_execution', {
            'id': id,
            'type': 'end',
            'instance_id': inst['id'],
            'org_id': inst['org_id'],
            'inst_round': inst['round'],
            'status': 'pending',
            'node_id': 'end',
            'created_at': timestampstr()
        })

    # ------------------------------
    # Execution
    # ------------------------------
    async def step(self, inst):
        """Advance one instance a single scheduling step.

        Loads every node_execution whose status is not yet 'completed'
        (terminal statuses on the way there: done / failed / cancelled)
        and, depending on its state:

        * pending human   -> auto-assign an assignee if none yet
        * pending task    -> try to start the automatic task
        * pending subflow -> try to start the child instance
        * pending start   -> transfer to successors, mark 'completed'
        * pending end     -> if its join condition holds, complete the
                             node and the whole instance
        * done/cancelled/failed -> transfer to successors, then mark
                             the execution 'completed'
        """
        env = ServerEnv()
        async with get_sor_context(env, 'dagflow') as sor:
            flow_def = (await sor.R(
                'flow_definition',
                {'id': inst['flow_def_id']}
            ))[0]
            dsl = yaml.safe_load(flow_def['dsl'])
            # fixed: 'edges' was referenced below but never defined
            edges = dsl.get('edges', [])
            sql = "select * from node_execution where instance_id=${instance_id}$ and status != 'completed'"
            active_nes = await sor.sqlExe(sql, {
                'instance_id': inst.id
            })
            for ne in active_nes:
                node_id = ne.node_id
                node = dsl['nodes'][node_id]
                node['id'] = node_id
                ntype = node['type']
                status = ne.status
                if status == 'pending':
                    if ntype == 'human':
                        if ne.assignee is None:
                            await self.auto_assign(sor, inst, ne, node)
                    elif ntype == 'task':
                        # NOTE(review): try_start_running / try_start_subflow
                        # are not defined in this file -- confirm they exist
                        # elsewhere in the project.
                        await self.try_start_running(sor, inst, ne, node)
                    elif ntype == 'subflow':
                        await self.try_start_subflow(sor, inst, ne, node)
                    elif ntype == 'start':
                        await self.node_transfer(sor, dsl, inst, ne, node, flow_def)
                        await sor.U('node_execution', {
                            'id': ne.id,
                            'status': 'completed'
                        })
                    elif ntype == 'end':
                        flg = await self.is_ok_to_step_next(sor, node, edges, inst)
                        if flg:
                            await sor.U('node_execution', {
                                'id': ne.id,
                                'status': 'completed'
                            })
                            await sor.U('flow_instance', {
                                'id': inst.id,
                                'status': 'completed'
                            })
                            break
                elif status in ('done', 'cancelled', 'failed'):
                    await self.node_transfer(sor, dsl, inst, ne, node, flow_def)
                    await sor.U('node_execution', {
                        'id': ne.id,
                        'status': 'completed'
                    })
            # fixed: the coroutine was created but never awaited
            await self.check_instance_completed(sor, inst)

    # ------------------------------
    # Human task APIs
    # ------------------------------
    async def auto_assign(self, sor, inst, ne, node):
        """Randomly assign a pending human node execution to one of the
        org's users holding the required role (no-op if no user matches)."""
        env = ServerEnv()   # fixed: env was referenced without being defined
        recs = await env.get_org_users(sor, inst.org_id, ne.role)
        if recs:
            i = randint(0, len(recs) - 1)
            await sor.U('node_execution', {
                'id': ne.id,
                'assignee': recs[i].id
            })

    async def check_instance_completed(self, sor, inst):
        """Mark the instance 'completed' once its 'end' node execution is."""
        # fixed: missing closing '$' on the param marker, and a set
        # literal where a dict of SQL params was required
        sql = "select * from node_execution where instance_id=${instance_id}$ and type='end'"
        recs = await sor.sqlExe(sql, {'instance_id': inst.id})
        if recs and recs[0].status == 'completed':
            await sor.U('flow_instance', {
                'id': inst.id,
                'status': 'completed'
            })

    async def node_transfer(self, sor, dsl, inst, ne, node, flow_def):
        """Follow the outgoing edges of a finished node execution and
        create the successor node_execution rows.

        Edge attributes honored:
        * when:    condition expression; the edge is skipped if it is falsy
        * foreach: expression yielding an iterable; one successor execution
                   is created per element, with the element recorded in
                   ctx_ext
        """
        env = ServerEnv()
        edges = dsl.get('edges', [])
        # fixed: was 'nnode = []' but '.add' was called on 'nnodes'
        nnodes = set()
        ctx = inst.ctx.copy()
        for edge in edges:
            if edge['from'] != ne.node_id:
                continue
            cond = edge.get('when')
            if cond and not safe_eval_condition(cond, {'ctx': ctx}):
                continue
            m_on = edge.get('foreach')
            if m_on:
                on_array = safe_eval_condition(m_on, ctx) or []
                for e in on_array:
                    nnodes.add((edge['to'], str(e)))
            else:
                # fixed: set.add takes one argument -- add a tuple
                nnodes.add((edge['to'], None))
        for next_id, ctx_ext in nnodes:
            # fixed: the tuple element is a node id string; the node
            # definition must be looked up in the DSL
            nxt = dsl['nodes'][next_id]
            nxt['id'] = next_id
            ok = await self.is_ok_to_create_new_node_exe(sor, nxt, edges, inst)
            if not ok:
                continue
            exe_id = env.uuid()
            ns = {
                'id': exe_id,
                'type': nxt['type'],
                # fixed: instance_id was missing from the inserted row
                'instance_id': inst.id,
                'inst_round': inst.round,
                'org_id': inst.org_id,
                'node_id': next_id,
                'input_ctx': nxt.get('input_ctx'),
                'output_ctx': nxt.get('output_ctx'),
                'status': 'pending',
                'ctx': json.dumps(inst.ctx.copy(), ensure_ascii=False),
                # fixed: ctx_ext is a str or None, not a dict -- no .copy()
                'ctx_ext': json.dumps(ctx_ext, ensure_ascii=False),
                'created_at': timestampstr()
            }
            if nxt['type'] == 'human':
                ns['role'] = nxt.get('role')
            elif nxt['type'] == 'task':
                pass
            elif nxt['type'] == 'subflow':
                eng = env.template_engine
                sctx = inst.ctx
                if nxt.get('input_ctx'):
                    sctx = eng.renders(nxt['input_ctx'], sctx)
                # fixed: org_id was undefined; attribute access on a plain
                # dict node definition replaced with .get()
                subinst_id = await self.create_instance(
                    inst.org_id, nxt.get('flow_def_id'), ctx=sctx)
                ns['subinst_id'] = subinst_id
            elif nxt['type'] == 'end':
                await self.create_end_node_execution(sor, inst)
                continue
            await sor.C('node_execution', ns)

    async def is_ok_to_create_new_node_exe(self, sor, node, edges, inst):
        """A successor execution may be created eagerly only for nodes
        without a 'join' spec; joined nodes are materialized through the
        end-node / join machinery instead."""
        return node.get('join') is None

    async def create_new_node_exe(self, sor, node, edges, inst, ext_data=None):
        """Create a pending node_execution for *node* if its creation
        condition holds.  Returns the new execution id, or None.

        ``ext_data`` (new optional parameter, default None) supplies the
        per-branch ctx_ext payload; it was referenced but undefined before.
        """
        flg = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst)
        if flg:
            env = ServerEnv()   # fixed: env was referenced without being defined
            # fixed: 'inst["ctx"].copy,' built a 1-tuple of the bound method
            ctx = inst['ctx'].copy()
            ctx_ext = ext_data or {}
            id = env.uuid()
            await sor.C('node_execution', {
                'id': id,
                'org_id': inst['org_id'],
                'instance_id': inst['id'],
                'node_id': node['id'],
                'ctx': json.dumps(ctx, ensure_ascii=False),
                'ctx_ext': json.dumps(ctx_ext, ensure_ascii=False),
                'input_ctx': node.get('input_ctx'),
                # fixed: 'pendding' typo would never match the scheduler
                'status': 'pending'
            })
            return id

    async def is_ok_to_step_next(self, sor, node, edges, inst):
        """Evaluate the node's 'join' condition against its predecessors.

        join values:
        * None / 'any'        -> always ready
        * 'all'               -> every predecessor execution finished
        * 'xor'               -> cancel still-active predecessors, ready
        * 'some(N)'/'some(P%)'-> at least N (or P percent of) predecessor
                                 transfers have arrived
        """
        join = node.get('join')
        if not join or join == 'any':
            return True
        backnodes = [e['from'] for e in edges if e['to'] == node['id']]
        sql = """select * from node_execution
                where node_id in ${backnodes}$
                and instance_id = ${instance_id}$"""
        recs = await sor.sqlExe(sql, {
            'backnodes': backnodes,
            'instance_id': inst['id']
        })
        if join == 'all':
            return all(r.status not in ('pending', 'running') for r in recs)
        if join == 'xor':
            backnodes = [id for id in backnodes if id != node['id']]
            # fixed: 'update from' is not SQL, and a Python list literal
            # is not a valid SQL 'in' clause
            sql = """update node_execution
                    set status = 'cancelled'
                    where status in ('pending', 'running')
                    and node_id in ${backnodes}$
                    and instance_id = ${instance_id}$"""
            await sor.sqlExe(sql, {
                'backnodes': backnodes,
                'instance_id': inst['id']
            })
            return True
        some = extract_some_value(join)
        if some is None:
            # unrecognized join spec: the original crashed comparing None;
            # report it and treat the node as not ready
            error(f'unknown join condition: {join}')
            return False
        sql = """select * from node_transfer
                where instance_id=${instance_id}$
                and to_node_id = ${node_id}$"""
        trecs = await sor.sqlExe(sql, {
            'node_id': node['id'],
            'instance_id': inst['id']
        })
        if 1 <= some <= len(trecs):
            return True
        # percentage mode; guard the division when no predecessor rows exist
        if recs and some <= float(len(trecs)) / float(len(recs)):
            return True
        return False

    async def start_running(self, sor, node, edges, inst):
        """Run an automatic task node: mark it running, execute it, and
        record 'done' with its output or 'failed' on exception."""
        # fixed: the coroutine result was truth-tested without await
        flg = await self.is_ok_to_step_next(sor, node, edges, inst)
        if not flg:
            return
        # fixed: sor.R (read) was used where an update was intended, and
        # 'running_time' is not a column -- the table defines 'running_at'
        await sor.U('node_execution', {
            'id': node['id'],
            'status': 'running',
            'running_at': timestampstr()
        })
        try:
            out = await self.run_auto_task(node)
            await sor.U('node_execution', {
                'id': node['id'],
                'status': 'done',
                'stopping_at': timestampstr(),
                'output_ctx': json.dumps(out, ensure_ascii=False)
            })
        except Exception as e:
            # record the failure instead of swallowing it silently
            exception(f'run_auto_task failed: {e}')
            await sor.U('node_execution', {
                'id': node['id'],
                'status': 'failed',
                'stopping_at': timestampstr()
            })

    async def list_human_tasks(self, org_id, user_roles):
        """List pending human tasks in *org_id* matching any of *user_roles*."""
        env = ServerEnv()
        async with get_sor_context(env, 'dagflow') as sor:
            return await sor.R('human_task', {
                'org_id': org_id,
                'status': 'pending',
                'role': ('in', user_roles)
            })

    async def complete_human_task(self, org_id, task_id, user_id, output):
        """Complete a pending human task and merge its output into the
        owning instance's context.  Silently returns if the task is not
        found or is no longer pending."""
        env = ServerEnv()
        async with get_sor_context(env, 'dagflow') as sor:
            rows = await sor.R('human_task', {
                'id': task_id,
                'org_id': org_id,
                'status': 'pending'
            })
            if not rows:
                return
            task = rows[0]
            await sor.U('human_task', {
                'id': task_id,
                'assignee': user_id,
                'status': 'done',
                'output': json.dumps(output),
                'completed_at': env.now()
            })
            inst = (await sor.R(
                'flow_instance',
                {'id': task['instance_id']}
            ))[0]
            ctx = json.loads(inst['ctx'])
            ctx.update(output)
            await sor.U('flow_instance', {
                'id': inst['id'],
                'ctx': json.dumps(ctx)
            })
def get_engine():
    """Pick one of the registered flow engines at random (crude load balancing)."""
    engines = ServerEnv().flow_engines
    idx = randint(0, len(engines) - 1)
    return engines[idx]
async def list_org_instances(request, flow_def_id=None):
    """List the current user's org's running flow instances, joined with
    their definition name, optionally filtered by definition id.

    Args:
        request: the web request; request._run_ns provides the runtime env.
        flow_def_id: optional definition id to filter on.

    Returns:
        list of joined flow_instance/flow_definition records.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    async with get_sor_context(env, 'dagflow') as sor:
        sql = """select b.name, a.*
                from flow_instance a, flow_definition b
                where a.flow_def_id = b.id
                and a.status='running'
                and a.org_id=${orgid}$"""
        if flow_def_id:
            sql += ' and a.flow_def_id = ${flow_def_id}$'
        recs = await sor.sqlExe(sql, {
            'orgid': orgid,
            'flow_def_id': flow_def_id
        })
        # fixed: a trailing 'return []' was unreachable dead code
        return recs
async def get_my_flow_works(request, inst_id=None):
    """List the pending/running human node executions assigned to the
    current user, or unassigned but matching one of the user's roles.

    Args:
        request: the web request; request._run_ns provides the runtime env.
        inst_id: optional instance id to filter on.

    Returns:
        list of node_execution records.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    userid = await env.get_user()
    myroles = await env.get_user_roles(userid)
    async with get_sor_context(env, 'dagflow') as sor:
        # fixed: the table is 'node_execution' (was 'node_execute'), and
        # a Python list literal is not a valid SQL 'in' clause
        sql = """select *
                from node_execution
                where type = 'human'
                and status in ('pending', 'running')
                and (assignee = ${userid}$ or (assignee is NULL and role in ${myroles}$))"""
        if inst_id:
            sql += ' and instance_id = ${inst_id}$'
        recs = await sor.sqlExe(sql, {
            'userid': userid,
            'myroles': myroles,
            'inst_id': inst_id
        })
        # fixed: a trailing 'return []' was unreachable dead code
        return recs
async def get_exists_workflows(request):
    """List flow definitions visible to the current user's org: the org's
    own definitions plus the shared ones under org_id '0'.

    Returns:
        list of flow_definition records.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    orgids = ['0', orgid]
    sql = "select * from flow_definition where org_id in ${orgids}$"
    async with get_sor_context(env, 'dagflow') as sor:
        # fixed: a trailing 'return []' was unreachable dead code
        return await sor.sqlExe(sql, {'orgids': orgids})
async def new_instance(request, flow_def_id, params_kw):
    """Create and start a new instance of *flow_def_id* for the current
    user's org, seeding its context from *params_kw*.  Returns the id."""
    env = request._run_ns
    org = await env.get_userorgid()
    engine = get_engine()
    return await engine.create_instance(org, flow_def_id, ctx=params_kw)
async def add_new_workflow(request, params_kw=None):
    """Register a new workflow definition for the current user's org.

    Args:
        request: the web request; request._run_ns provides the runtime env.
        params_kw: request parameters object exposing .name, .dsl,
            .ctxfields and .description attributes.  The original used a
            mutable default argument ({}); a None sentinel is used instead.

    Returns:
        dict with 'status' and the new definition id under data.dagflow_id.
    """
    if params_kw is None:
        params_kw = {}
    name = params_kw.name
    dsl = params_kw.dsl
    env = request._run_ns
    org_id = await env.get_userorgid()
    ctxfields = params_kw.ctxfields or []
    description = params_kw.description
    engine = get_engine()
    id = await engine.create_definition(org_id, name, description, '1', dsl, ctxfields)
    return {
        'status': 'SUCCEEDED',
        'data': {
            'dagflow_id': id
        }
    }
async def get_org_flow_definition(request):
    """List flow definitions for the current user's org, including the
    shared definitions stored under org_id '0'.

    Returns:
        list of flow_definition records.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    async with get_sor_context(env, 'dagflow') as sor:
        sql = """select * from flow_definition where org_id in ${orgids}$"""
        # fixed: a trailing 'return []' was unreachable dead code
        return await sor.sqlExe(sql, {'orgids': ['0', orgid]})