dagflow/dagflow/dagflow.py
2026-03-17 16:55:15 +08:00

716 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Workflow Engine v1.3 (Enterprise Edition)
========================================
Features:
- org_id multi-tenant isolation
- DAG workflow with persistence (sqlor)
- Subflow support
- Human task node
- RBAC-based task assignment
- Query APIs for UI
"""
import asyncio
import yaml
from random import randint
from appPublic.log import debug, exception, error, info
from appPublic.timeUtils import timestampstr
from sqlor.dbpools import get_sor_context
from asteval import Interpreter
# Shared Interpreter instance, reused across evaluations for performance.
# NOTE(review): a module-level interpreter is not thread-safe; fine on one
# asyncio loop, but confirm if engines ever run in worker threads.
aeval = Interpreter()

def safe_eval_condition(cond_str, ctx_dict):
    """Safely evaluate an expression string with asteval.

    Args:
        cond_str: expression such as "ctx['age'] > 18 and ctx['score'] >= 60"
        ctx_dict: context data, exposed to the expression as ``ctx``
    Returns:
        The expression's value, or None when evaluation fails.

    BUGFIXES: the original returned bool(result)/False, which contradicted
    its own docstring and broke `foreach` edges in node_transfer() that need
    the raw list back; it also passed mode='eval', which is not an asteval
    call parameter, and never cleared aeval.error between calls (asteval
    appends to it).
    """
    try:
        aeval.symtable['ctx'] = ctx_dict
        # Reset errors accumulated by previous evaluations.
        aeval.error = []
        result = aeval(cond_str)
        if aeval.error:
            error(f"asteval Error: {aeval.error}")
            return None
        return result
    except Exception as e:
        # asteval is normally safe; belt-and-braces.
        error(f"Eval Exception: {e}")
        return None
# ---------------------------------------------------------------------
# Table definitions (sqlor-style DDL descriptions)
# ---------------------------------------------------------------------
FLOW_DEFINITION_TABLE = {
    "summary": [{"name": "flow_definition", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "name", "type": "str", "length": 128},
        # BUGFIX: create_definition() inserts these two columns but the
        # table definition was missing them.
        {"name": "description", "type": "text"},
        {"name": "ctxfields", "type": "text"},
        {"name": "version", "type": "str", "length": 32},
        {"name": "dsl", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}
FLOW_INSTANCE_TABLE = {
    "summary": [{"name": "flow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32, "nullable": "no"},
        # BUGFIX: create_instance() writes 'backid' (worker shard id) and
        # 'round'; bgtask() filters on backid.  Both were missing here.
        # NOTE(review): confirm 'int' is the sqlor type name for integers.
        {"name": "backid", "type": "str", "length": 32},
        {"name": "round", "type": "int"},
        {"name": "flow_def_id", "type": "str", "length": 32},
        {"name": "status", "type": "str", "length": 16},
        {"name": "ctx", "type": "text"},
        {"name": "active_nodes", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}
NODE_EXECUTION_TABLE = {
    "summary": [{"name": "node_execution", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "type", "type": "str", "length": 32},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        # BUGFIX: node executions are created with 'inst_round'; the column
        # was missing from the definition.
        {"name": "inst_round", "type": "int"},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "input_ctx", "type": "text"},
        {"name": "output_ctx", "type": "text"},
        {"name": "ctx", "type": "text"},
        {"name": "ctx_ext", "type": "text"},
        {"name": "status", "type": "str", "length": 16},
        {"name": "error", "type": "text"},
        {"name": "input", "type": "text"},
        {"name": "output", "type": "text"},
        {"name": "subinst_id", "type": "str", "length": 32},
        {"name": "role", "type": "str", "length": 64},
        {"name": "assignee", "type": "str", "length": 64},
        {"name": "running_at", "type": "timestamp"},
        {"name": "stopping_at", "type": "timestamp"},
        {"name": "created_at", "type": "timestamp"}
    ]
}
# Link table between a parent instance's subflow node and its child instance.
SUBFLOW_INSTANCE_TABLE = {
    "summary": [{"name": "subflow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "child_instance_id", "type": "str", "length": 32},
        {"name": "status", "type": "str", "length": 16},
        {"name": "created_at", "type": "timestamp"}
    ]
}
# Human work items, claimable by users holding the matching role.
HUMAN_TASK_TABLE = {
    "summary": [{"name": "human_task", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "role", "type": "str", "length": 64},
        {"name": "assignee", "type": "str", "length": 64},
        {"name": "status", "type": "str", "length": 16},
        {"name": "input", "type": "text"},
        {"name": "output", "type": "text"},
        {"name": "timeout_at", "type": "timestamp"},
        {"name": "created_at", "type": "timestamp"},
        {"name": "completed_at", "type": "timestamp"}
    ]
}
# ---------------------------------------------------------------------
# Engine
# ---------------------------------------------------------------------
import yaml
import json
from ahserver.serverenv import ServerEnv
import re
def extract_some_value(s):
    """Parse a quorum join spec like ``some(100)`` or ``some(50%)``.

    Args:
        s: the join specification string.
    Returns:
        float: the absolute count for ``some(N)``, or the fraction N/100
        for ``some(N%)``; None when the string does not match.
    """
    m = re.search(r'some\((\d+)(%)?\)', s)
    if m is None:
        return None
    number = float(m.group(1))
    # A trailing '%' turns the number into a fraction of the predecessors.
    return number / 100.0 if m.group(2) == '%' else number
class FlowEngine:
    """DAG workflow engine: definitions, instances and node scheduling."""
    # ------------------------------
    # Definition / Instance
    # ------------------------------
    def __init__(self, backid):
        # backid identifies this worker; bgtask() only claims flow_instance
        # rows whose backid matches, allowing engines to be sharded.
        self.aio = []
        self.backid = backid
async def bgtask(self):
    """Background polling loop for this worker.

    Every second, claim the running flow_instance rows whose backid matches
    this engine and advance each by one step().  Runs forever.

    BUGFIX: removed the unused local ``cnt``.
    """
    env = ServerEnv()
    self.aio = []
    while True:
        aio = []
        async with get_sor_context(env, 'dagflow') as sor:
            sql = """select * from flow_instance
                where backid=${backid}$
                and status = 'running'"""
            aio = await sor.sqlExe(sql, {
                'backid': self.backid
            })
        debug(f'aio={aio}')
        # NOTE(review): stepping outside the query's sor context; step()
        # opens its own context — confirm the original nesting intent.
        for r in aio:
            await self.step(r)
        await asyncio.sleep(1)
async def create_definition(self, org_id, name, description, version, dsl_text, ctxfields):
    """Persist a flow definition; the DSL's own 'id' key is the primary key.

    Returns the definition id.
    """
    env = ServerEnv()
    parsed = yaml.safe_load(dsl_text)
    definition_id = parsed['id']
    record = {
        'id': definition_id,
        'org_id': org_id,
        'name': name,
        'description': description,
        'version': version,
        'ctxfields': json.dumps(ctxfields),
        'created_at': timestampstr(),
        'dsl': dsl_text
    }
    async with get_sor_context(env, 'dagflow') as sor:
        await sor.C('flow_definition', record)
    return definition_id
async def create_instance(self, org_id, flow_def_id, ctx=None):
    """Create a new running flow instance and seed it with a start node.

    Returns the new instance id.
    """
    env = ServerEnv()
    async with get_sor_context(env, 'dagflow') as sor:
        instance_id = env.uuid()
        record = {
            'id': instance_id,
            'backid': self.backid,
            'round': 1,
            'org_id': org_id,
            'flow_def_id': flow_def_id,
            'status': 'running',
            'ctx': json.dumps(ctx or {}),
            'created_at': timestampstr()
        }
        await sor.C('flow_instance', record.copy())
        # Seed the DAG with an already-finished 'start' node so step() can
        # immediately fan out to its successors.
        await self.create_start_node_execution(sor, record)
        return instance_id
async def create_start_node_execution(self, sor, inst):
    """Insert the synthetic 'start' node execution for a fresh instance.

    The row is created with status 'done' so the scheduler transfers
    control to the start node's successors on the first step().
    """
    env = ServerEnv()
    record = {
        'id': env.uuid(),
        'type': 'start',
        'instance_id': inst['id'],
        'org_id': inst['org_id'],
        'inst_round': inst['round'],
        'status': 'done',
        'node_id': 'start',
        'created_at': timestampstr()
    }
    await sor.C('node_execution', record)
async def create_end_node_execution(self, sor, inst):
    """Insert the synthetic 'end' node execution for an instance.

    Created as 'pending'; step() completes it (and the instance) once the
    end node's join condition is satisfied.
    """
    env = ServerEnv()
    record = {
        'id': env.uuid(),
        'type': 'end',
        'instance_id': inst['id'],
        'org_id': inst['org_id'],
        'inst_round': inst['round'],
        'status': 'pending',
        'node_id': 'end',
        'created_at': timestampstr()
    }
    await sor.C('node_execution', record)
# ------------------------------
# Execution
# ------------------------------
async def step(self, inst):
    """Advance a running instance by one scheduling pass.

    Scans every node_execution row of the instance that is not yet
    'completed' and pushes it forward by type/status.  Terminal node
    statuses are 'done', 'failed' and 'cancelled'; 'completed' marks rows
    whose successors have already been created.

    BUGFIXES: is_ok_to_step_next() is a coroutine and was called without
    ``await`` in the 'task' and 'subflow' branches, so the truthy coroutine
    object made those branches fire unconditionally; also removed a stray
    ``;`` and an unused ``ctx`` local.
    """
    env = ServerEnv()
    async with get_sor_context(env, 'dagflow') as sor:
        flow_def = (await sor.R(
            'flow_definition',
            {'id': inst['flow_def_id']}
        ))[0]
        dsl = yaml.safe_load(flow_def['dsl'])
        edges = dsl.get('edges', [])
        sql = "select * from node_execution where instance_id=${instance_id}$ and status != 'completed'"
        active_nes = await sor.sqlExe(sql, {
            'instance_id': inst.id
        })
        for ne in active_nes:
            node_id = ne.node_id
            node = dsl['nodes'][node_id]
            node['id'] = node_id
            ntype = node['type']
            status = ne.status
            if ntype == 'start':
                # Start nodes are pre-finished: complete and fan out.
                await sor.U('node_execution', {
                    'id': ne.id,
                    'status': 'completed'
                })
                await self.node_transfer(sor, dsl, inst, ne, node, flow_def)
            elif ntype == 'end':
                if await self.is_ok_to_step_next(sor, node, edges, inst):
                    await sor.U('node_execution', {
                        'id': ne.id,
                        'status': 'completed'
                    })
                    await sor.U('flow_instance', {
                        'id': inst.id,
                        'status': 'completed'
                    })
                    break
            else:
                if status == 'pending':
                    if ntype == 'human':
                        if ne.assignee is None:
                            await self.auto_assign(sor, inst, ne, node)
                    elif ntype == 'task':
                        if await self.is_ok_to_step_next(sor, node, edges, inst):
                            await self.start_running(sor, inst, ne, node)
                    elif ntype == 'subflow':
                        if await self.is_ok_to_step_next(sor, node, edges, inst):
                            await self.start_subflow(sor, inst, ne, node)
                elif status in ('done', 'cancelled'):
                    debug(f'节点完成,检查后续节点')
                    await self.node_transfer(sor, dsl, inst, ne, node, flow_def)
                    await sor.U('node_execution', {
                        'id': ne.id,
                        'status': 'completed'
                    })
                elif status == 'failed':
                    # Left in place for manual intervention by operators.
                    debug(f'节点执行失败,需要运维人员参与')
def build_node_input_ctx(self, inst, ne):
    """Build the input context for a node execution.

    Returns the parsed ctx dict, or — when the execution carries ctx_ext
    (fan-out extra data from a ``foreach`` edge) — a prompt-style string
    that appends the extra content.  ``inst`` is currently unused.

    BUGFIX: the original read ``json.loads(ns.ctx_ext)`` where ``ns`` is an
    undefined name; it must be ``ne``.
    """
    ctx = json.loads(ne.ctx)
    if ne.ctx_ext:
        ctx_ext = json.loads(ne.ctx_ext)
        return f'{ctx}\n附加内容:\n{ctx_ext}'
    return ctx
async def start_subflow(self, sor, inst, ne, node):
    """Launch the child flow for a 'subflow' node and mark the node running.

    The child's instance id is stored on the node execution (subinst_id).
    """
    child_def_id = node.get('subflow_id')
    if child_def_id is None:
        debug(f'{node=} 没有subflow_id属性')
        return
    child_ctx = self.build_node_input_ctx(inst, ne)
    ne.subinst_id = await self.create_instance(
        ne.org_id, child_def_id, ctx=child_ctx)
    ne.status = 'running'
    ne.running_at = timestampstr()
    await sor.U('node_execution', ne.copy())
async def run_auto_task(self, ne):
    """Kick off the automatic (skill-agent) task for a node execution.

    The agent runs in a background asyncio task; task_success_callback /
    task_error_callback update the node_execution row on completion.

    NOTE(review): the original body was a SyntaxError (``try`` without
    ``except``) and referenced several undefined names (get_org_users,
    inst, orgid, partial, and bare identifiers used as dict keys).  This is
    a best-effort repair — confirm env.run_skillagent's exact signature.
    """
    from functools import partial
    env = ServerEnv()
    # build_node_input_ctx ignores its inst argument.
    prompt = self.build_node_input_ctx(None, ne)
    try:
        on_ok = partial(self.task_success_callback, ne.id)
        on_err = partial(self.task_error_callback, ne.id)
        params = {
            'prompt': prompt,
            'callback': on_ok,
            'errback': on_err
        }
        # BUGFIX: asyncio.create_task() takes a coroutine object, not a
        # (callable, args) pair.
        asyncio.create_task(env.run_skillagent(**params))
    except Exception as e:
        exception(f'run_auto_task failed for {ne.id}: {e}')
        raise
# ------------------------------
# Human task APIs
# ------------------------------
async def auto_assign(self, sor, inst, ne, node):
    """Randomly assign a pending human node to a user holding its role.

    Leaves the node unassigned when the org has no users with the role.

    BUGFIX: ``env`` was used without ever being created (NameError).
    """
    env = ServerEnv()
    recs = await env.get_org_users(sor, inst.org_id, ne.role)
    if recs:
        chosen = recs[randint(0, len(recs) - 1)]
        await sor.U('node_execution', {
            'id': ne.id,
            'assignee': chosen.id
        })
async def instance_completed(self, sor, inst):
    """Mark a flow instance as completed."""
    update = {'id': inst.id, 'status': 'completed'}
    await sor.U('flow_instance', update)
async def node_transfer(self, sor, dsl, inst, ne, node, flow_def):
    """Fan out from a finished node: evaluate outgoing edges and create
    pending node_execution rows for each matched successor.

    Edges may carry a 'when' condition (branch filter) and a 'foreach'
    expression (one successor execution per element, with ctx_ext).

    BUGFIXES: the 'when' condition was evaluated with ``{'ctx': ctx}``
    although safe_eval_condition already exposes its argument as ``ctx``
    (double wrapping broke expressions like ctx['x']); the subflow branch
    used the undefined name ``org_id`` and attribute access on a plain
    dict; a debug call had the ``f`` prefix inside the quotes; foreach
    evaluation failures now skip instead of crashing.
    """
    nnodes = []
    ctx = json.loads(inst.ctx)
    debug(f'{dsl=}, {inst=}, {ne=}, {node=}')
    for edge in dsl.get('edges', []):
        if edge['from'] != ne.node_id:
            continue
        cond = edge.get('when')
        if cond and not safe_eval_condition(cond, ctx):
            debug(f'{cond=}, {ctx=},不匹配分支,{edge=}')
            continue
        m_on = edge.get('foreach')
        if m_on:
            on_array = safe_eval_condition(m_on, ctx)
            if not on_array:
                # Evaluation error (None) or empty collection: no fan-out.
                debug(f'{m_on=} evaluated to {on_array!r}; no fan-out')
                continue
            for e in on_array:
                nnodes.append((edge['to'], str(e), edge.copy()))
        else:
            nnodes.append((edge['to'], None, edge.copy()))
    if len(nnodes) == 0:
        debug(f'{ne=}, 没有找到下一个节点')
        return
    debug(f'{nnodes=}, 找到的下节点。。。。。')
    env = ServerEnv()
    for node_id, ctx_ext, edge in nnodes:
        node = dsl['nodes'][node_id]
        if isinstance(ctx_ext, (dict, list)):
            ctx_ext = json.dumps(ctx_ext, ensure_ascii=False)
        x = await self.is_ok_to_create_new_node_exe(sor, node, edge, inst)
        if not x:
            debug(f'{node=}, {edge=}, {inst=}is_ok_to_create_new_node_exe() 返回了{x}')
            continue
        ns = {
            'id': env.uuid(),
            'type': node['type'],
            'inst_round': inst.round,
            'instance_id': inst.id,
            'org_id': inst.org_id,
            'node_id': node_id,
            'input_ctx': node.get('input_ctx'),
            'output_ctx': node.get('output_ctx'),
            'status': 'pending',
            'ctx': inst.ctx,
            'ctx_ext': ctx_ext,
            'created_at': timestampstr()
        }
        if node['type'] == 'human':
            ns['role'] = node.get('role')
        elif node['type'] == 'task':
            # Nothing extra: task nodes are started later by step().
            pass
        elif node['type'] == 'subflow':
            eng = env.template_engine
            sub_ctx = inst.ctx
            if node.get('input_ctx'):
                sub_ctx = eng.renders(node['input_ctx'], sub_ctx)
            # NOTE(review): original read node.flow_def_id (attr access on a
            # dict); start_subflow() uses 'subflow_id' — confirm which DSL
            # key names the child definition.
            subinst_id = await self.create_instance(
                inst.org_id, node.get('flow_def_id'), ctx=sub_ctx)
            ns['subinst_id'] = subinst_id
        elif node['type'] == 'end':
            await self.create_end_node_execution(sor, inst)
            debug(f'{node=}是终点节点,直接创建结束节点')
            continue
        await sor.C('node_execution', ns)
        debug(f'{ns=} 节点创建完成')
async def is_ok_to_create_new_node_exe(self, sor, node, edges, inst):
    """Return True when a new execution row may be created for ``node``.

    Nodes carrying a 'join' attribute are deferred — their join condition
    is checked elsewhere (is_ok_to_step_next) before execution proceeds.
    """
    return node.get('join') is None
async def create_new_node_exe(self, sor, node, edges, inst, ext_data=None):
    """Create a pending node_execution row for ``node`` if join rules allow.

    Args:
        ext_data: optional extra context stored as ctx_ext (new parameter
            with a default; the original referenced it while undefined).
    Returns:
        The new row id, or None when creation is not allowed.

    BUGFIXES: ``ctx = inst['ctx'].copy,`` built a one-element tuple of a
    (non-existent) str method instead of parsing the JSON ctx; ``env`` was
    undefined; status was misspelled 'pendding'; the reject path fell
    through without returning.
    """
    flg = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst)
    if not flg:
        return None
    env = ServerEnv()
    ctx = json.loads(inst['ctx'])
    ctx_ext = ext_data or {}
    new_id = env.uuid()
    await sor.C('node_execution', {
        'id': new_id,
        'org_id': inst['org_id'],
        'instance_id': inst['id'],
        'node_id': node['id'],
        'ctx': json.dumps(ctx, ensure_ascii=False),
        'ctx_ext': json.dumps(ctx_ext, ensure_ascii=False),
        'input_ctx': node['input_ctx'],
        'status': 'pending'
    })
    return new_id
async def is_ok_to_step_next(self, sor, node, edges, inst):
    """Decide whether a join node may proceed, per its 'join' attribute.

    join semantics:
      * missing / 'any'      -> proceed immediately
      * 'all'                -> all predecessor executions completed
      * 'xor'                -> cancel the other pending/running
                                predecessors, then proceed
      * 'some(N)'/'some(P%)' -> at least N (or P percent of) predecessors
                                have transferred in

    BUGFIXES: the xor branch used invalid SQL ('update from', Python list
    literal as an IN clause); extract_some_value() may return None which
    crashed the comparisons; division by zero when no predecessor rows.
    """
    join = node.get('join')
    if not join or join == 'any':
        return True
    backnodes = [e['from'] for e in edges if e['to'] == node['id']]
    sql = """select * from node_execution
        where node_id in ${backnodes}$
        and instance_id = ${instance_id}$"""
    recs = await sor.sqlExe(sql, {
        'backnodes': backnodes,
        'instance_id': inst['id']
    })
    if join == 'all':
        return all(r.status == 'completed' for r in recs)
    if join == 'xor':
        others = [nid for nid in backnodes if nid != node['id']]
        sql = """update node_execution
            set status = 'cancelled'
            where status in ('pending', 'running')
            and node_id in ${backnodes}$
            and instance_id = ${instance_id}$"""
        await sor.sqlExe(sql, {
            'backnodes': others,
            'instance_id': inst['id']
        })
        return True
    some = extract_some_value(join)
    if some is None:
        # Unrecognized join spec: refuse to proceed rather than crash.
        debug(f'unrecognized join spec: {join!r}')
        return False
    # NOTE(review): the node_transfer table queried here is not defined in
    # this module — confirm it exists in the schema.
    sql = """select * from node_transfer
        where instance_id=${instance_id}$
        and to_node_id = ${node_id}$"""
    trecs = await sor.sqlExe(sql, {
        'node_id': node['id'],
        'instance_id': inst['id']
    })
    # Absolute form: some(N) with N >= 1 means "at least N transfers".
    if some >= 1 and some <= len(trecs):
        return True
    # Fractional form: some(P%) compares transferred / predecessors.
    if recs and some <= float(len(trecs)) / float(len(recs)):
        return True
    return False
async def task_error_callback(self, ne_id, errmsg):
    """Mark a node execution as failed, recording the agent's error message.

    BUGFIXES: ``Server()`` was an undefined name (should be ServerEnv());
    the trailing comma made ne.status the tuple ('failed',).
    """
    env = ServerEnv()
    async with get_sor_context(env, 'dagflow') as sor:
        recs = await sor.R('node_execution', {'id': ne_id})
        if not recs:
            debug(f'{ne_id=} 没找到node_execution记录')
            return
        ne = recs[0]
        ne.status = 'failed'
        ne.output = errmsg
        await sor.U('node_execution', ne)
async def task_success_callback(self, ne_id, result):
    """Record a successful auto-task result and merge it into the flow ctx.

    The node's output_ctx template is rendered with {'output': result};
    the rendered JSON is stored as the node output and merged into the
    instance ctx.

    BUGFIXES: ``if insts`` was a SyntaxError (missing colon) AND the guard
    was inverted — it must bail out when NO instance is found; the final
    dump used the undefined name ``ctx`` instead of ``jctx``; removed the
    dead pre-computation of ne.output that the if-branch overwrote.
    """
    env = ServerEnv()
    async with get_sor_context(env, 'dagflow') as sor:
        recs = await sor.R('node_execution', {'id': ne_id})
        if not recs:
            debug(f'{ne_id=} 没找到node_execution记录')
            return
        ne = recs[0]
        inst_id = ne.instance_id
        insts = await sor.R('flow_instance', {'id': inst_id})
        if not insts:
            debug(f'{inst_id=} 没找到flow_instance记录')
            return
        inst = insts[0]
        ne.status = 'done'
        if result:
            tmpl = env.template_engine.renders(ne.output_ctx, {
                'output': result
            })
            ne.output = tmpl
            jctx = json.loads(inst.ctx)
            jctx.update(json.loads(tmpl))
            inst.ctx = json.dumps(jctx, ensure_ascii=False)
            await sor.U('flow_instance', inst)
        else:
            ne.output = None
        await sor.U('node_execution', ne)
async def start_running(self, sor, inst, ne, node):
    """Mark a task node as running and launch its automatic task.

    On failure the execution row is marked 'failed' with a stop timestamp.

    BUGFIXES: parameter order now matches the only caller, step(), which
    passes (sor, inst, ne, node) — the original signature
    (sor, ne, edges, inst) silently swapped the arguments; sor.R (a read)
    is replaced by sor.U (the intended update); the timestamp column is
    'running_at' per NODE_EXECUTION_TABLE, not 'running_time'; the
    swallowed exception is now logged.
    """
    await sor.U('node_execution', {
        'id': ne['id'],
        'status': 'running',
        'running_at': timestampstr()
    })
    try:
        await self.run_auto_task(ne)
    except Exception as e:
        exception(f'run_auto_task error for {ne["id"]}: {e}')
        await sor.U('node_execution', {
            'id': ne['id'],
            'status': 'failed',
            'stopping_at': timestampstr()
        })
async def list_human_tasks(self, org_id, user_roles):
    """Pending human tasks in an org claimable by any of ``user_roles``."""
    env = ServerEnv()
    filters = {
        'org_id': org_id,
        'status': 'pending',
        'role': ('in', user_roles)
    }
    async with get_sor_context(env, 'dagflow') as sor:
        rows = await sor.R('human_task', filters)
        return rows
async def complete_human_task(self, org_id, task_id, user_id, output):
    """Finish a pending human task and merge its output into the flow ctx.

    Silently returns when the task is not found in 'pending' state for
    this org (already completed, cancelled, or belongs elsewhere).
    """
    env = ServerEnv()
    async with get_sor_context(env, 'dagflow') as sor:
        matches = await sor.R('human_task', {
            'id': task_id,
            'org_id': org_id,
            'status': 'pending'
        })
        if not matches:
            return
        task = matches[0]
        await sor.U('human_task', {
            'id': task_id,
            'assignee': user_id,
            'status': 'done',
            'output': json.dumps(output),
            'completed_at': env.now()
        })
        inst_rows = await sor.R('flow_instance', {'id': task['instance_id']})
        inst = inst_rows[0]
        merged = json.loads(inst['ctx'])
        merged.update(output)
        await sor.U('flow_instance', {
            'id': inst['id'],
            'ctx': json.dumps(merged)
        })
def get_engine():
    """Pick one of the registered flow engines at random (naive balancing)."""
    env = ServerEnv()
    engines = env.flow_engines
    return engines[randint(0, len(engines) - 1)]
async def list_org_instances(request, flow_def_id=None):
    """List running flow instances for the caller's org.

    Args:
        flow_def_id: optional filter on a specific flow definition.
    Returns:
        Rows joining flow_instance with its definition's name.

    BUGFIX: removed the unreachable ``return []`` after the in-context
    return.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    async with get_sor_context(env, 'dagflow') as sor:
        sql = """select b.name, a.*
            from flow_instance a, flow_definition b
            where a.flow_def_id = b.id
            and a.status='running'
            and a.org_id=${orgid}$"""
        if flow_def_id:
            sql += ' and a.flow_def_id = ${flow_def_id}$'
        return await sor.sqlExe(sql, {
            'orgid': orgid,
            'flow_def_id': flow_def_id
        })
async def get_my_flow_works(request, inst_id=None):
    """List human work items for the current user.

    Matches node executions assigned to the user, or unassigned ones whose
    role is among the user's roles; optionally limited to one instance.

    BUGFIXES: the table is node_execution, not node_execute; SQL IN lists
    use parentheses, not a Python list literal; removed the unreachable
    trailing ``return []``.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    userid = await env.get_user()
    myroles = await env.get_user_roles(userid)
    async with get_sor_context(env, 'dagflow') as sor:
        sql = """select *
            from node_execution
            where type = 'human'
            and status in ('pending', 'running')
            and (assignee = ${userid}$ or (assignee is NULL and role in ${myroles}$))"""
        if inst_id:
            sql += ' and instance_id = ${inst_id}$'
        return await sor.sqlExe(sql, {
            'userid': userid,
            'myroles': myroles,
            'inst_id': inst_id
        })
async def get_exists_workflows(request):
    """Flow definitions visible to the caller: global ('0') plus own org.

    BUGFIX: removed the unreachable ``return []`` after the in-context
    return.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    orgids = ['0', orgid]
    sql = "select * from flow_definition where org_id in ${orgids}$"
    async with get_sor_context(env, 'dagflow') as sor:
        return await sor.sqlExe(sql, {'orgids': orgids})
async def new_instance(request, flow_def_id, params_kw):
    """Start a new flow instance for the caller's org.

    ``params_kw`` becomes the instance's initial ctx.  Returns the id.
    """
    env = request._run_ns
    org = await env.get_userorgid()
    engine = get_engine()
    return await engine.create_instance(org, flow_def_id, ctx=params_kw)
async def add_new_workflow(request, params_kw=None):
    """Create a flow definition (version '1') from UI parameters.

    BUGFIX: replaced the mutable default argument ``params_kw={}``.
    NOTE(review): the attribute-style access below (params_kw.name) implies
    callers pass a DictObject-like mapping, not a plain dict — confirm.
    """
    if params_kw is None:
        params_kw = {}
    name = params_kw.name
    dsl = params_kw.dsl
    env = request._run_ns
    org_id = await env.get_userorgid()
    ctxfields = params_kw.ctxfields or []
    description = params_kw.description
    engine = get_engine()
    new_id = await engine.create_definition(org_id, name, description, '1', dsl, ctxfields)
    return {
        'status': 'SUCCEEDED',
        'data': {
            'dagflow_id': new_id
        }
    }
async def get_org_flow_definition(request):
    """Flow definitions for the caller's org, plus the global org '0'.

    BUGFIX: removed the unreachable ``return []`` after the in-context
    return.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    async with get_sor_context(env, 'dagflow') as sor:
        sql = """select * from flow_definition where org_id in ${orgids}$"""
        return await sor.sqlExe(sql, {'orgids': ['0', orgid]})