# -*- coding: utf-8 -*-
"""
Workflow Engine v1.3 (Enterprise Edition)
========================================

Features:
- org_id multi-tenant isolation
- DAG workflow with persistence (sqlor)
- Subflow support
- Human task node
- RBAC-based task assignment
- Query APIs for UI
"""
import asyncio
|
||
import yaml
|
||
from random import randint
|
||
from appPublic.log import debug
|
||
from appPublic.timeUtils import timestampstr
|
||
from sqlor.dbpools import get_sor_context
|
||
|
||
# ---------------------------------------------------------------------
|
||
# Table definitions
|
||
# ---------------------------------------------------------------------
|
||
|
||
# Schema for stored workflow definitions.
# BUG FIX: FlowEngine.create_definition writes 'description' and
# 'ctxfields' columns that were missing from this table definition.
FLOW_DEFINITION_TABLE = {
    "summary": [{"name": "flow_definition", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "name", "type": "str", "length": 128},
        {"name": "description", "type": "text"},
        {"name": "version", "type": "str", "length": 32},
        # JSON-encoded list of context field descriptors
        {"name": "ctxfields", "type": "text"},
        {"name": "dsl", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}
|
||
|
||
# Schema for running/completed workflow instances.
# BUG FIX: FlowEngine.create_instance writes 'backid' and 'round'
# columns (and bgtask filters on backid) that were missing here.
FLOW_INSTANCE_TABLE = {
    "summary": [{"name": "flow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "flow_def_id", "type": "str", "length": 32},
        # id of the background engine that owns this instance
        {"name": "backid", "type": "str", "length": 32},
        # execution round counter, starts at 1
        {"name": "round", "type": "int"},
        {"name": "status", "type": "str", "length": 16},
        {"name": "ctx", "type": "text"},
        {"name": "active_nodes", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}
|
||
|
||
# Schema for per-node execution records of a flow instance.
# BUG FIX: create_start_node_execution / create_end_node_execution write
# an 'inst_round' column that was missing from this table definition.
NODE_EXECUTION_TABLE = {
    "summary": [{"name": "node_execution", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "type", "type": "str", "length": 32},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        # round of the owning instance at creation time
        {"name": "inst_round", "type": "int"},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "input_ctx", "type": "text"},
        {"name": "output_ctx", "type": "text"},
        {"name": "ctx", "type": "text"},
        {"name": "ctx_ext", "type": "text"},
        {"name": "status", "type": "str", "length": 16},
        {"name": "error", "type": "text"},
        {"name": "input", "type": "text"},
        {"name": "output", "type": "text"},
        # child instance id when type == 'subflow'
        {"name": "subinst_id", "type": "str", "length": 32},
        {"name": "role", "type": "str", "length": 64},
        {"name": "assignee", "type": "str", "length": 64},
        {"name": "running_at", "type": "timestamp"},
        {"name": "stopping_at", "type": "timestamp"},
        {"name": "created_at", "type": "timestamp"}
    ]
}
|
||
|
||
# Schema linking a parent instance's subflow node to its child instance.
SUBFLOW_INSTANCE_TABLE = {
    "summary": [{"name": "subflow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        # parent flow instance
        {"name": "instance_id", "type": "str", "length": 32},
        # subflow node inside the parent's DSL
        {"name": "node_id", "type": "str", "length": 64},
        # the spawned child flow instance
        {"name": "child_instance_id", "type": "str", "length": 32},
        {"name": "status", "type": "str", "length": 16},
        {"name": "created_at", "type": "timestamp"}
    ]
}
|
||
|
||
# Schema for human work items produced by 'human' nodes.
HUMAN_TASK_TABLE = {
    "summary": [{"name": "human_task", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        # RBAC role allowed to claim the task
        {"name": "role", "type": "str", "length": 64},
        # user id once the task is claimed/assigned
        {"name": "assignee", "type": "str", "length": 64},
        {"name": "status", "type": "str", "length": 16},
        # JSON-encoded task input / output payloads
        {"name": "input", "type": "text"},
        {"name": "output", "type": "text"},
        {"name": "timeout_at", "type": "timestamp"},
        {"name": "created_at", "type": "timestamp"},
        {"name": "completed_at", "type": "timestamp"}
    ]
}
|
||
|
||
# ---------------------------------------------------------------------
|
||
# Engine
|
||
# ---------------------------------------------------------------------
|
||
|
||
import yaml
|
||
import json
|
||
from ahserver.serverenv import ServerEnv
|
||
import re
|
||
|
||
def extract_some_value(s):
    """
    Extract the numeric value from a string like 'some(100)' or 'some(50%)'.

    Args:
        s (str): input string, e.g. 'some(100)' or 'some(50%)'.

    Returns:
        float: the number inside the parentheses; a percentage such as
        'some(50%)' is returned as a fraction (0.5).
        None: when the string does not match the 'some(...)' pattern.
    """
    # 'some(' followed by a number (generalized: decimals now accepted,
    # previously only integers matched), an optional '%', then ')'
    match = re.search(r'some\((\d+(?:\.\d+)?)(%)?\)', s)
    if match is None:
        return None
    value = float(match.group(1))
    if match.group(2) == '%':
        # percentage mode: normalize to a fraction in [0, 1]
        return value / 100.0
    return value
|
||
|
||
class FlowEngine:
|
||
|
||
# ------------------------------
|
||
# Definition / Instance
|
||
# ------------------------------
|
||
def __init__(self, backid):
|
||
self.aio = []
|
||
self.backid = backid
|
||
|
||
async def bgtask(self):
|
||
env = ServerEnv()
|
||
self.aio = []
|
||
cnt = 0
|
||
while True:
|
||
aio = []
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
sql = """select * from flow_instance
|
||
where backid=${backid}$
|
||
and status = 'running'"""
|
||
aio = await sor.sqlExe(sql,{
|
||
'backid': self.backid
|
||
})
|
||
for r in aio:
|
||
await elf.step(r)
|
||
await asyncio.sleep(1)
|
||
|
||
async def create_definition(self, org_id, name, description, version, dsl_text, ctxfields):
|
||
env = ServerEnv()
|
||
data = yaml.safe_load(dsl_text)
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
fid = data['id']
|
||
await sor.C('flow_definition', {
|
||
'id': fid,
|
||
'org_id': org_id,
|
||
'name': name,
|
||
'description': description,
|
||
'version': version,
|
||
'ctxfields': json.dumps(ctxfields),
|
||
'created_at': timestampstr(),
|
||
'dsl': dsl_text
|
||
})
|
||
return fid
|
||
|
||
async def create_instance(self, org_id, flow_def_id, ctx=None):
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
iid = env.uuid()
|
||
ns = {
|
||
'id': iid,
|
||
'backid': self.backid,
|
||
'round': 1,
|
||
'org_id': org_id,
|
||
'flow_def_id': flow_def_id,
|
||
'status': 'running',
|
||
'ctx': json.dumps(ctx or {}),
|
||
'created_at': timestampstr()
|
||
}
|
||
await sor.C('flow_instance', ns.copy())
|
||
await self.create_start_node_execution(sor, ns)
|
||
return iid
|
||
|
||
async def create_start_node_execution(self, sor, inst):
|
||
env = ServerEnv()
|
||
id = env.uuid()
|
||
await sor.C('node_execution', {
|
||
'id': id,
|
||
'type': 'start',
|
||
'instance_id': inst['id'],
|
||
'org_id': inst['org_id'],
|
||
'inst_round': inst['round'],
|
||
'status': 'done',
|
||
'node_id': 'start',
|
||
'created_at': timestampstr()
|
||
})
|
||
async def create_end_node_execution(self, sor, inst):
|
||
env = ServerEnv()
|
||
id = env.uuid()
|
||
await sor.C('node_execution', {
|
||
'id': id,
|
||
'type': 'end',
|
||
'instance_id': inst['id'],
|
||
'org_id': inst['org_id'],
|
||
'inst_round': inst['round'],
|
||
'status': 'pending',
|
||
'node_id': 'end',
|
||
'created_at': timestampstr()
|
||
})
|
||
# ------------------------------
|
||
# Execution
|
||
# ------------------------------
|
||
|
||
async def step(self, inst):
|
||
"""
|
||
从节点执行表中取出所有状态不是结束的执行节点,检查是否可以推进一步
|
||
结束状态为:
|
||
* succeeded
|
||
* failed
|
||
* cancelled
|
||
推进:如果
|
||
"""
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
flow_def = (await sor.R(
|
||
'flow_definition',
|
||
{'id': inst['flow_def_id']}
|
||
))[0]
|
||
|
||
dsl = yaml.safe_load(flow_def['dsl'])
|
||
ctx = json.loads(inst['ctx'])
|
||
active = set(json.loads(inst['active_nodes']) or [dsl['start']])
|
||
next_nodes = set()
|
||
sql = "select * from node_execution where instance_id=${instance_id}$ and status != 'completed'"
|
||
active_nes = await sor.sqlExe(sql, {
|
||
'instance_id': inst.id
|
||
})
|
||
for ne in active_nes:
|
||
node_id = ne.node_id
|
||
node = dsl['nodes'][node_id]
|
||
node['id'] = node_id
|
||
ntype = node['type']
|
||
status = ne.status
|
||
if status == 'pending':
|
||
if ntype == 'human':
|
||
if ne.assignee is None:
|
||
await self.auto_assign(sor, inst, ne, node)
|
||
elif ntype == 'task':
|
||
await self.try_start_running(sor, inst, ne, node)
|
||
elif ntype == 'subflow':
|
||
await self.try_start_subflow(sor, inst, ne, node)
|
||
elif ntype == 'start':
|
||
await self.node_transfer(sor, inst, ne, node, flow_def)
|
||
await sor.U('node_execution', {
|
||
'id': ne.id,
|
||
'status': 'completed'
|
||
})
|
||
elif ntype == 'end':
|
||
flg = await self.is_ok_to_step_next(sor, node, edges, inst)
|
||
if flg:
|
||
await sor.U('node_execution', {
|
||
'id': ne.id,
|
||
'status': 'completed'
|
||
})
|
||
await sor.U('flow_instance', {
|
||
'id': inst.id,
|
||
'status': 'completed'
|
||
})
|
||
break;
|
||
elif status in ['done', 'cancelled', 'failed']:
|
||
await self.node_transfer(sor, inst, ne, node, flow_def)
|
||
await sor.U('node_execution', {
|
||
'id': ne.id,
|
||
'status': 'completed'
|
||
})
|
||
self.check_instance_completed(sor, inst)
|
||
|
||
|
||
# ------------------------------
|
||
# Human task APIs
|
||
# ------------------------------
|
||
|
||
async def auto_assign(self, sor, inst, ne, node):
|
||
recs = await env.get_org_users(sor, inst.org_id, ne.role)
|
||
if recs:
|
||
i = randint(0, len(recs) - 1)
|
||
await sor.U('node_execution', {
|
||
'id':ne.id,
|
||
'assignee': recs[i].id
|
||
})
|
||
|
||
async def check_instance_completed(self, sor, inst):
|
||
sql = "select * from node_execution where instance_id=${instance_id} and type='end'"
|
||
recs = await sor.sqlExe(sql, {'instance_id', inst.id})
|
||
if recs:
|
||
if recs[0].status == 'completed':
|
||
await sor.U('flow_instance', {
|
||
'id': inst.id,
|
||
'status': 'completed'
|
||
})
|
||
|
||
async def node_transfer(self, sor, dsl, inst, ne, node, flow_def):
|
||
nnode = []
|
||
for edge in dsl.get('edges', []):
|
||
if edge['from'] != ne.node_id:
|
||
continue
|
||
cond = edge.get('when')
|
||
if cond and not eval(cond, {}, {'ctx': ctx}):
|
||
continue
|
||
m_on = edge.get('foreach')
|
||
ns = {
|
||
'm_on': m_on,
|
||
'ctx': ctx,
|
||
'node': node
|
||
}
|
||
on_array = await self.get_multiple_array(m_on)
|
||
for e in on_array:
|
||
nnodes.add((edge['to'], str(e)))
|
||
for node, ctx_ext in nnodes:
|
||
x = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst)
|
||
if x:
|
||
env = ServerEnv()
|
||
id = env.uuid()
|
||
ns = {
|
||
'id': id,
|
||
'type': node['type'],
|
||
'inst_round': inst.round,
|
||
'org_id': inst.org_id,
|
||
'node_id': node['id'],
|
||
'input_ctx': node.get('input_ctx'),
|
||
'output_ctx': node.get('output_ctx'),
|
||
'status': 'pending',
|
||
'ctx': json.dumps(inst.ctx.copy(), ensure_ascii=False),
|
||
'ctx_ext': json.dumps(ctx_ext.copy(), ensure_ascii=False),
|
||
'created_at': timestampstr()
|
||
}
|
||
if node['type'] == 'human':
|
||
ns.update({
|
||
'role': node.get('role')
|
||
})
|
||
elif node['type'] == 'task':
|
||
pass
|
||
elif node['type'] == 'subflow':
|
||
eng = env.template_engine
|
||
ctx = inst.ctx
|
||
if node.get('input_ctx'):
|
||
ctx = eng.renders(node['input_ctx'], ctx)
|
||
|
||
subinst_id = await self.create_instance(org_id, node.flow_def_id, ctx=ctx)
|
||
ns.update({
|
||
'subinst_id': subinst_id
|
||
})
|
||
elif node['type'] == 'end':
|
||
await self.create_end_node_execution(sor, inst)
|
||
continue
|
||
|
||
await sor.C('node_execution', ns)
|
||
|
||
async def is_ok_to_create_new_node_exe(self, sor, node, edges, inst):
|
||
join = node.get('join')
|
||
if join is None:
|
||
return True
|
||
return False
|
||
|
||
async def create_new_node_exe(self, sor, node, edges, inst):
|
||
flg = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst)
|
||
if flg:
|
||
ctx = inst['ctx'].copy,
|
||
ctx_ext = ext_data or {}
|
||
id = env.uuid()
|
||
await sor.C('node_execution', {
|
||
'id': id,
|
||
'org_id': inst['org_id'],
|
||
'instance_id': inst['id'],
|
||
'node_id': node['id'],
|
||
'ctx': json.dumps(ctx, ensure_ascii=False),
|
||
'ctx_ext': json.dumps(ctx_ext, ensure_ascii=False),
|
||
'input_ctx': node['input_ctx'],
|
||
'status': 'pendding'
|
||
})
|
||
return id
|
||
|
||
async def is_ok_to_step_next(self, sor, node, edges, inst):
|
||
join = node.get('join')
|
||
if not join:
|
||
return True
|
||
if join == 'any':
|
||
return True
|
||
backnodes = [e['from'] for e in edges if e['to'] == node['id']]
|
||
sql = """select * from node_execution
|
||
where node_id in ${backnodes}$
|
||
and instance_id = ${instance_id}$"""
|
||
recs = sor.sqlExe(sql, {
|
||
'backnodes': backnodes,
|
||
'instance_id': inst['id']
|
||
})
|
||
if join == 'all':
|
||
for r in recs:
|
||
if r.status in ['pending', 'running']:
|
||
return False
|
||
return True
|
||
if join == 'xor':
|
||
backnodes = [id for id in backnodes if id != node['id']]
|
||
sql = """update from node_execution
|
||
set status = 'cancelled'
|
||
where status in ['pending', 'running']
|
||
and node_id in ${backnodes}$
|
||
and instance_id = ${instance_id}$"""
|
||
await sor.sqlExe(sql, {
|
||
'backnodes': backnodes,
|
||
'instance_id': inst['id']
|
||
})
|
||
return True
|
||
some = extract_some_value(join)
|
||
sql = """select * from node_transfer
|
||
where instance_id=${instance_id}$
|
||
and to_node_id = ${node_id}$"""
|
||
trecs = sor.sqlExe(sql, {
|
||
'node_id': node['id'],
|
||
'instance_id': inst['id']
|
||
})
|
||
if some >= 1 and some <= len(trecs):
|
||
return True
|
||
if some <= float(len(trecs))/float(len(recs)):
|
||
return True
|
||
return False
|
||
|
||
async def start_running(self, sor, node, edges, inst):
|
||
if self.s_ok_to_step_next(sor, node, edges, inst):
|
||
await sor.R('node_execution', {
|
||
'id': node['id'],
|
||
'status': 'running',
|
||
'running_time': timestampstr()
|
||
})
|
||
try:
|
||
out = await self.run_auto_task(node)
|
||
sor.U('node_execution', {
|
||
'id': node['id'],
|
||
'status': 'done',
|
||
'stopping_at': timestampstr(),
|
||
'output_ctx': json.dumps(out, ensure_ascii=False)
|
||
})
|
||
except Exception as e:
|
||
sor.U('node_execution', {
|
||
'id': node['id'],
|
||
'status': 'failed',
|
||
'stopping_at': timestampstr()
|
||
})
|
||
|
||
    async def list_human_tasks(self, org_id, user_roles):
        """
        List pending human tasks of *org_id* that any of *user_roles*
        may claim.

        NOTE(review): the ('in', user_roles) value presumably tells the
        sqlor layer to build an IN clause -- confirm against sor.R docs.
        """
        env = ServerEnv()
        async with get_sor_context(env, 'dagflow') as sor:
            return await sor.R('human_task', {
                'org_id': org_id,
                'status': 'pending',
                'role': ('in', user_roles)
            })
|
||
|
||
async def complete_human_task(self, org_id, task_id, user_id, output):
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
rows = await sor.R('human_task', {
|
||
'id': task_id,
|
||
'org_id': org_id,
|
||
'status': 'pending'
|
||
})
|
||
if not rows:
|
||
return
|
||
|
||
task = rows[0]
|
||
await sor.U('human_task', {
|
||
'id': task_id,
|
||
'assignee': user_id,
|
||
'status': 'done',
|
||
'output': json.dumps(output),
|
||
'completed_at': env.now()
|
||
})
|
||
|
||
inst = (await sor.R(
|
||
'flow_instance',
|
||
{'id': task['instance_id']}
|
||
))[0]
|
||
|
||
ctx = json.loads(inst['ctx'])
|
||
ctx.update(output)
|
||
|
||
await sor.U('flow_instance', {
|
||
'id': inst['id'],
|
||
'ctx': json.dumps(ctx)
|
||
})
|
||
|
||
def get_engine():
    """Return a randomly chosen flow engine registered on the server env."""
    engines = ServerEnv().flow_engines
    return engines[randint(0, len(engines) - 1)]
|
||
|
||
async def list_org_instances(request, flow_def_id=None):
    """
    List the running flow instances of the caller's org, joined with
    the definition name; optionally filtered to one flow definition.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    async with get_sor_context(env, 'dagflow') as sor:
        sql = """select b.name, a.*
from flow_instance a, flow_definition b
where a.flow_def_id = b.id
and a.status='running'
and a.org_id=${orgid}$"""
        if flow_def_id:
            sql += ' and a.flow_def_id = ${flow_def_id}$'
        # DEAD CODE FIX: an unreachable 'return []' followed this block
        return await sor.sqlExe(sql, {
            'orgid': orgid,
            'flow_def_id': flow_def_id
        })
|
||
|
||
async def get_my_flow_works(request, inst_id=None):
    """
    List the human node executions the current user can work on:
    either already assigned to them, or unassigned but matching one of
    their roles.  Optionally restricted to one instance.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    userid = await env.get_user()
    myroles = await env.get_user_roles(userid)
    async with get_sor_context(env, 'dagflow') as sor:
        # BUG FIX: the table is 'node_execution' (not 'node_execute'),
        # the 'a.' alias was never declared, and the python list literal
        # in the IN clause was not valid SQL
        sql = """select *
from node_execution
where type = 'human'
and status in ('pending', 'running')
and (assignee = ${userid}$ or (assignee is NULL and role in ${myroles}$))"""
        if inst_id:
            sql += ' and instance_id = ${inst_id}$'
        # DEAD CODE FIX: an unreachable 'return []' followed this block
        return await sor.sqlExe(sql, {
            'userid': userid,
            'myroles': myroles,
            'inst_id': inst_id
        })
|
||
|
||
async def get_exists_workflows(request):
    """
    List the flow definitions visible to the caller's org: those of the
    shared org '0' plus their own.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    orgids = ['0', orgid]
    sql = "select * from flow_definition where org_id in ${orgids}$"
    async with get_sor_context(env, 'dagflow') as sor:
        # DEAD CODE FIX: an unreachable 'return []' followed this block
        return await sor.sqlExe(sql, {'orgids': orgids})
|
||
|
||
async def new_instance(request, flow_def_id, params_kw):
    """Create a new flow instance for the caller's org and return its id."""
    env = request._run_ns
    org_id = await env.get_userorgid()
    engine = get_engine()
    return await engine.create_instance(org_id, flow_def_id, ctx=params_kw)
|
||
|
||
async def add_new_workflow(request, params_kw=None):
    """
    Register a new workflow definition from request parameters and
    return a SUCCEEDED envelope carrying the new definition id.

    NOTE(review): params_kw is accessed by attribute, so it is presumably
    a DictObject-like structure from the framework -- confirm at callers.
    """
    # BUG FIX: the original used a mutable default ('params_kw={}'),
    # shared across calls
    if params_kw is None:
        params_kw = {}
    name = params_kw.name
    dsl = params_kw.dsl
    env = request._run_ns
    org_id = await env.get_userorgid()
    ctxfields = params_kw.ctxfields or []
    description = params_kw.description
    engine = get_engine()
    flow_id = await engine.create_definition(org_id, name, description, '1', dsl, ctxfields)
    return {
        'status': 'SUCCEEDED',
        'data': {
            'dagflow_id': flow_id
        }
    }
|
||
|
||
|
||
async def get_org_flow_definition(request):
    """
    List the flow definitions owned by the shared org '0' or by the
    caller's own org.
    """
    env = request._run_ns
    orgid = await env.get_userorgid()
    async with get_sor_context(env, 'dagflow') as sor:
        sql = """select * from flow_definition where org_id in ${orgids}$"""
        # DEAD CODE FIX: an unreachable 'return []' followed this block
        return await sor.sqlExe(sql, {'orgids': ['0', orgid]})