743 lines
21 KiB
Python
743 lines
21 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
Workflow Engine v1.3 (Enterprise Edition)
|
||
========================================
|
||
|
||
Features:
|
||
- org_id multi-tenant isolation
|
||
- DAG workflow with persistence (sqlor)
|
||
- Subflow support
|
||
- Human task node
|
||
- RBAC-based task assignment
|
||
- Query APIs for UI
|
||
"""
|
||
import asyncio
|
||
from functools import partial
|
||
import yaml
|
||
from random import randint
|
||
from appPublic.log import debug, exception, error, info
|
||
from appPublic.timeUtils import timestampstr
|
||
from appPublic.dictObject import DictObject
|
||
from sqlor.dbpools import get_sor_context
|
||
from asteval import Interpreter
|
||
|
||
# 1. 创建一个全局的 Interpreter 实例(复用以提高性能)
|
||
aeval = Interpreter()
|
||
|
||
# 2. 在你的 node_transfer 或条件判断逻辑中
|
||
def safe_eval_condition(cond_str, ctx_dict):
|
||
"""
|
||
安全地评估条件表达式
|
||
Args:
|
||
cond_str: 表达式字符串,例如 "ctx['age'] > 18 and ctx['score'] >= 60"
|
||
ctx_dict: 上下文数据字典
|
||
Returns: None 错误或的确是None
|
||
其他: 表达式结果
|
||
"""
|
||
try:
|
||
# 将 ctx 注入到 asteval 的符号表中
|
||
aeval.symtable['ctx'] = ctx_dict
|
||
|
||
# 执行表达式
|
||
# mode='eval' 表示只允许单个表达式,不允许语句(如赋值、import)
|
||
result = aeval(cond_str, mode='eval')
|
||
|
||
# 检查是否有语法错误或运行时错误
|
||
if aeval.error:
|
||
print(f"asteval Error: {aeval.error}")
|
||
return False
|
||
|
||
return bool(result)
|
||
|
||
except Exception as e:
|
||
# asteval 通常很安全,但以防万一
|
||
print(f"Eval Exception: {e}")
|
||
return False
|
||
|
||
# ---------------------------------------------------------------------
|
||
# Table definitions
|
||
# ---------------------------------------------------------------------
|
||
|
||
FLOW_DEFINITION_TABLE = {
|
||
"summary": [{"name": "flow_definition", "primary": "id"}],
|
||
"fields": [
|
||
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
|
||
{"name": "org_id", "type": "str", "length": 32},
|
||
{"name": "name", "type": "str", "length": 128},
|
||
{"name": "version", "type": "str", "length": 32},
|
||
{"name": "dsl", "type": "text"},
|
||
{"name": "created_at", "type": "timestamp"}
|
||
]
|
||
}
|
||
|
||
FLOW_INSTANCE_TABLE = {
|
||
"summary": [{"name": "flow_instance", "primary": "id"}],
|
||
"fields": [
|
||
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
|
||
{"name": "org_id", "type": "str", "length": 32, "nullable": "no"},
|
||
{"name": "flow_def_id", "type": "str", "length": 32},
|
||
{"name": "status", "type": "str", "length": 16},
|
||
{"name": "ctx", "type": "text"},
|
||
{"name": "active_nodes", "type": "text"},
|
||
{"name": "created_at", "type": "timestamp"}
|
||
]
|
||
}
|
||
|
||
NODE_EXECUTION_TABLE = {
|
||
"summary": [{"name": "node_execution", "primary": "id"}],
|
||
"fields": [
|
||
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
|
||
{"name": "type", "type": "str", "length": 32},
|
||
{"name": "org_id", "type": "str", "length": 32},
|
||
{"name": "instance_id", "type": "str", "length": 32},
|
||
{"name": "node_id", "type": "str", "length": 64},
|
||
{"name": "input_ctx", "type": "text"},
|
||
{"name": "output_ctx", "type": "text"},
|
||
{"name": "ctx", "type": "text"},
|
||
{"name": "ctx_ext", "type": "text"},
|
||
{"name": "status", "type": "str", "length": 16},
|
||
{"name": "error", "type": "text"},
|
||
{"name": "input", "type": "text"},
|
||
{"name": "output", "type": "text"},
|
||
{"name": "subinst_id", "type": "str", "length": 32},
|
||
{"name": "role", "type": "str", "length": 64},
|
||
{"name": "assignee", "type": "str", "length": 64},
|
||
{"name": "running_at", "type": "timestamp"},
|
||
{"name": "stopping_at", "type": "timestamp"},
|
||
{"name": "created_at", "type": "timestamp"}
|
||
]
|
||
}
|
||
|
||
SUBFLOW_INSTANCE_TABLE = {
|
||
"summary": [{"name": "subflow_instance", "primary": "id"}],
|
||
"fields": [
|
||
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
|
||
{"name": "org_id", "type": "str", "length": 32},
|
||
{"name": "instance_id", "type": "str", "length": 32},
|
||
{"name": "node_id", "type": "str", "length": 64},
|
||
{"name": "child_instance_id", "type": "str", "length": 32},
|
||
{"name": "status", "type": "str", "length": 16},
|
||
{"name": "created_at", "type": "timestamp"}
|
||
]
|
||
}
|
||
|
||
HUMAN_TASK_TABLE = {
|
||
"summary": [{"name": "human_task", "primary": "id"}],
|
||
"fields": [
|
||
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
|
||
{"name": "org_id", "type": "str", "length": 32},
|
||
{"name": "instance_id", "type": "str", "length": 32},
|
||
{"name": "node_id", "type": "str", "length": 64},
|
||
{"name": "role", "type": "str", "length": 64},
|
||
{"name": "assignee", "type": "str", "length": 64},
|
||
{"name": "status", "type": "str", "length": 16},
|
||
{"name": "input", "type": "text"},
|
||
{"name": "output", "type": "text"},
|
||
{"name": "timeout_at", "type": "timestamp"},
|
||
{"name": "created_at", "type": "timestamp"},
|
||
{"name": "completed_at", "type": "timestamp"}
|
||
]
|
||
}
|
||
|
||
# ---------------------------------------------------------------------
|
||
# Engine
|
||
# ---------------------------------------------------------------------
|
||
|
||
import yaml
|
||
import json
|
||
from ahserver.serverenv import ServerEnv
|
||
import re
|
||
|
||
def extract_some_value(s):
|
||
"""
|
||
从 'some(100)' 或 'some(50%)' 中提取数值。
|
||
Args:
|
||
s (str): 输入的字符串
|
||
|
||
Returns: float类型值, 小于1的小数点数字(%模式)或其他
|
||
"""
|
||
# 正则解释: 匹配 some( 后跟数字,然后是可选的 % 符号,最后是 )
|
||
match = re.search(r'some\((\d+)(%)?\)', s)
|
||
if match:
|
||
value = float(match.group(1)) # 提取数字部分并转为整数
|
||
unit = match.group(2) # 提取单位部分 (%)
|
||
if unit == '%':
|
||
return value / 100.0
|
||
return value
|
||
else:
|
||
return None
|
||
|
||
class FlowEngine:
|
||
|
||
# ------------------------------
|
||
# Definition / Instance
|
||
# ------------------------------
|
||
def __init__(self, backid):
|
||
self.aio = []
|
||
self.backid = backid
|
||
|
||
async def bgtask(self):
|
||
env = ServerEnv()
|
||
self.aio = []
|
||
cnt = 0
|
||
while True:
|
||
aio = []
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
sql = """select * from flow_instance
|
||
where backid=${backid}$
|
||
and status = 'running'"""
|
||
aio = await sor.sqlExe(sql,{
|
||
'backid': self.backid
|
||
})
|
||
debug(f'aio={aio}')
|
||
for r in aio:
|
||
await self.step(r)
|
||
await asyncio.sleep(1)
|
||
|
||
async def create_definition(self, org_id, name, description, version, dsl_text, ctxfields):
|
||
env = ServerEnv()
|
||
data = yaml.safe_load(dsl_text)
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
fid = data['id']
|
||
await sor.C('flow_definition', {
|
||
'id': fid,
|
||
'org_id': org_id,
|
||
'name': name,
|
||
'description': description,
|
||
'version': version,
|
||
'ctxfields': json.dumps(ctxfields),
|
||
'created_at': timestampstr(),
|
||
'dsl': dsl_text
|
||
})
|
||
return fid
|
||
|
||
async def create_instance(self, org_id, flow_def_id, ctx=None):
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
iid = env.uuid()
|
||
ns = {
|
||
'id': iid,
|
||
'backid': self.backid,
|
||
'round': 1,
|
||
'org_id': org_id,
|
||
'flow_def_id': flow_def_id,
|
||
'status': 'running',
|
||
'ctx': json.dumps(ctx or {}),
|
||
'created_at': timestampstr()
|
||
}
|
||
await sor.C('flow_instance', ns.copy())
|
||
await self.create_start_node_execution(sor, ns)
|
||
return iid
|
||
|
||
async def create_start_node_execution(self, sor, inst):
|
||
env = ServerEnv()
|
||
id = env.uuid()
|
||
await sor.C('node_execution', {
|
||
'id': id,
|
||
'type': 'start',
|
||
'instance_id': inst['id'],
|
||
'org_id': inst['org_id'],
|
||
'inst_round': inst['round'],
|
||
'status': 'done',
|
||
'node_id': 'start',
|
||
'created_at': timestampstr()
|
||
})
|
||
async def create_end_node_execution(self, sor, inst):
|
||
env = ServerEnv()
|
||
id = env.uuid()
|
||
await sor.C('node_execution', {
|
||
'id': id,
|
||
'type': 'end',
|
||
'instance_id': inst['id'],
|
||
'org_id': inst['org_id'],
|
||
'inst_round': inst['round'],
|
||
'status': 'pending',
|
||
'node_id': 'end',
|
||
'created_at': timestampstr()
|
||
})
|
||
# ------------------------------
|
||
# Execution
|
||
# ------------------------------
|
||
|
||
async def step(self, inst):
|
||
"""
|
||
从节点执行表中取出所有状态不是结束的执行节点,检查是否可以推进一步
|
||
结束状态为:
|
||
* succeeded
|
||
* failed
|
||
* cancelled
|
||
推进:如果
|
||
"""
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
flow_def = (await sor.R(
|
||
'flow_definition',
|
||
{'id': inst['flow_def_id']}
|
||
))[0]
|
||
|
||
dsl = yaml.safe_load(flow_def['dsl'])
|
||
edges = dsl.get('edges',[])
|
||
ctx = json.loads(inst['ctx'])
|
||
sql = "select * from node_execution where instance_id=${instance_id}$ and status != 'completed'"
|
||
active_nes = await sor.sqlExe(sql, {
|
||
'instance_id': inst.id
|
||
})
|
||
debug(f'活动节点:{active_nes}')
|
||
for ne in active_nes:
|
||
node_id = ne.node_id
|
||
node = dsl['nodes'][node_id]
|
||
node['id'] = node_id
|
||
ntype = node['type']
|
||
status = ne.status
|
||
if ntype == 'start':
|
||
await sor.U('node_execution', {
|
||
'id': ne.id,
|
||
'status': 'completed'
|
||
})
|
||
await self.node_transfer(sor, dsl, inst, ne, node, flow_def)
|
||
elif ntype == 'end':
|
||
flg = await self.is_ok_to_step_next(sor, node, edges, inst)
|
||
if flg:
|
||
await sor.U('node_execution', {
|
||
'id': ne.id,
|
||
'status': 'completed'
|
||
})
|
||
await sor.U('flow_instance', {
|
||
'id': inst.id,
|
||
'status': 'completed'
|
||
})
|
||
break;
|
||
else:
|
||
if status == 'pending':
|
||
if ntype == 'human':
|
||
if ne.assignee is None:
|
||
await self.auto_assign(sor, inst, ne, node)
|
||
elif ntype == 'task':
|
||
debug(f'{ne.node_id}是pending状态的自动任务节点')
|
||
flg = await self.is_ok_to_step_next(sor, node, edges, inst)
|
||
if flg:
|
||
debug(f'{ne.node_id}可以进入下一步')
|
||
await self.start_running(sor, inst, ne)
|
||
else:
|
||
debug(f'{ne.id=}, {node=}, {edges=}不能走下一步')
|
||
elif ntype == 'subflow':
|
||
flg = await self.is_ok_to_step_next(sor, node, edges, inst)
|
||
if flg:
|
||
await self.start_subflow(sor, inst, ne, node)
|
||
else:
|
||
debug(f'{ne.id=}, {node=}, {edges=}不能走下一步')
|
||
elif status in ['done', 'cancelled']:
|
||
debug(f'节点完成,检查后续节点')
|
||
await self.node_transfer(sor, dsl, inst, ne, node, flow_def)
|
||
await sor.U('node_execution', {
|
||
'id': ne.id,
|
||
'status': 'completed'
|
||
})
|
||
elif status == 'failed':
|
||
debug(f'节点执行失败,需要运维人员参与')
|
||
debug(f'{ne.node_id}处理完成')
|
||
|
||
|
||
def build_node_input_ctx(self, ne):
|
||
ctx = json.loads(ne.ctx)
|
||
if ne.ctx_ext:
|
||
ctx_ext = json.loads(ns.ctx_ext)
|
||
return f'{ctx}\n附加内容:\n{ctx_ext}'
|
||
return ctx
|
||
|
||
async def start_subflow(self, sor, inst, ne, node):
|
||
subflow_id = node.get('subflow_id')
|
||
if subflow_id is None:
|
||
debug(f'{node=} 没有subflow_id属性')
|
||
return
|
||
sub_ctx = self.build_node_input_ctx(ne)
|
||
ne.subinst_id = await self.create_instance(
|
||
ne.org_id,
|
||
subflow_id, ctx=sub_ctx)
|
||
ne.status = 'running'
|
||
ne.running_at = timestampstr()
|
||
await sor.U('node_execution', ne.copy())
|
||
|
||
async def autorun_task(self, ne_id, params):
|
||
env = ServerEnv()
|
||
try:
|
||
data = await env.run_skillagent(params)
|
||
debug(f'任务完成,返回数据={data}')
|
||
self.self.task_success_callback(ne_id, data)
|
||
except Exception as e:
|
||
message = f'任务失败错误信息{e}'
|
||
self.self.task_error_callback(ne_id, message)
|
||
|
||
async def run_auto_task(self, ne):
|
||
env = ServerEnv()
|
||
users = await env.get_org_users(ne.org_id)
|
||
if not users:
|
||
debug(f'{ne.org_id=} 没有用户')
|
||
return
|
||
sub_ctx = self.build_node_input_ctx(ne)
|
||
try:
|
||
params = DictObject(**{
|
||
"prompt": sub_ctx,
|
||
"callerid": users[0].id,
|
||
"callerorgid": users[0].orgid
|
||
})
|
||
asyncio.create_task(self.autorun_task(ne.id, params))
|
||
except Exception as e:
|
||
await self.task_error_callback(ne.id, str(e))
|
||
# ------------------------------
|
||
# Human task APIs
|
||
# ------------------------------
|
||
async def auto_assign(self, sor, inst, ne, node):
|
||
recs = await env.get_org_users(sor, inst.org_id, ne.role)
|
||
if recs:
|
||
i = randint(0, len(recs) - 1)
|
||
await sor.U('node_execution', {
|
||
'id':ne.id,
|
||
'assignee': recs[i].id
|
||
})
|
||
|
||
async def instance_completed(self, sor, inst):
|
||
await sor.U('flow_instance', {
|
||
'id': inst.id,
|
||
'status': 'completed'
|
||
})
|
||
|
||
async def node_transfer(self, sor, dsl, inst, ne, node, flow_def):
|
||
nnodes = []
|
||
ctx = json.loads(inst.ctx)
|
||
debug(f'{dsl=}, {inst=}, {ne=}, {node=}')
|
||
for edge in dsl.get('edges', []):
|
||
if edge['from'] != ne.node_id:
|
||
continue
|
||
cond = edge.get('when')
|
||
if cond and not safe_eval_condition(cond, {'ctx': ctx}):
|
||
debug(f'{cond=}, {ctx=},不匹配分支,{edge=}')
|
||
continue
|
||
m_on = edge.get('foreach')
|
||
if m_on:
|
||
on_array = safe_eval_condition(m_on, ctx)
|
||
for e in on_array:
|
||
nnodes.append((edge['to'], str(e), edge.copy()))
|
||
else:
|
||
nnodes.append((edge['to'], None, edge.copy()))
|
||
|
||
if len(nnodes) == 0:
|
||
debug(f'{ne=}, {edge}, 没有找到下一个节点')
|
||
return
|
||
debug(f'{nnodes=}, 找到的下节点。。。。。')
|
||
for node_id, ctx_ext, edge in nnodes:
|
||
node = dsl['nodes'][node_id]
|
||
if isinstance(ctx_ext, dict) or isinstance(ctx_ext, list):
|
||
ctx_ext = json.dumps(ctx_ext, ensure_ascii=False)
|
||
x = await self.is_ok_to_create_new_node_exe(sor, node, edge, inst)
|
||
if x:
|
||
env = ServerEnv()
|
||
id = env.uuid()
|
||
ns = {
|
||
'id': id,
|
||
'type': node['type'],
|
||
'inst_round': inst.round,
|
||
'instance_id': inst.id,
|
||
'org_id': inst.org_id,
|
||
'node_id': node_id,
|
||
'input_ctx': node.get('input_ctx'),
|
||
'output_ctx': node.get('output_ctx'),
|
||
'status': 'pending',
|
||
'ctx': inst.ctx,
|
||
'ctx_ext': ctx_ext,
|
||
'created_at': timestampstr()
|
||
}
|
||
if node['type'] == 'human':
|
||
ns.update({
|
||
'role': node.get('role')
|
||
})
|
||
elif node['type'] == 'task':
|
||
pass
|
||
elif node['type'] == 'subflow':
|
||
eng = env.template_engine
|
||
ctx = inst.ctx
|
||
if node.get('input_ctx'):
|
||
ctx = eng.renders(node['input_ctx'], ctx)
|
||
|
||
subinst_id = await self.create_instance(org_id, node.flow_def_id, ctx=ctx)
|
||
ns.update({
|
||
'subinst_id': subinst_id
|
||
})
|
||
elif node['type'] == 'end':
|
||
await self.create_end_node_execution(sor, inst)
|
||
debug('f{node=}是终点节点,直接创建结束节点')
|
||
continue
|
||
|
||
await sor.C('node_execution', ns)
|
||
debug(f'{ns=} 节点创建完成')
|
||
else:
|
||
debug(f'{node=}, {edge=}, {inst=}is_ok_to_create_new_node_exe() 返回了{x}')
|
||
|
||
async def is_ok_to_create_new_node_exe(self, sor, node, edges, inst):
|
||
join = node.get('join')
|
||
if join is None:
|
||
return True
|
||
return False
|
||
|
||
async def create_new_node_exe(self, sor, node, edges, inst):
|
||
flg = await self.is_ok_to_create_new_node_exe(sor, node, edges, inst)
|
||
if flg:
|
||
ctx = inst['ctx'].copy,
|
||
ctx_ext = ext_data or {}
|
||
id = env.uuid()
|
||
await sor.C('node_execution', {
|
||
'id': id,
|
||
'org_id': inst['org_id'],
|
||
'instance_id': inst['id'],
|
||
'node_id': node['id'],
|
||
'ctx': json.dumps(ctx, ensure_ascii=False),
|
||
'ctx_ext': json.dumps(ctx_ext, ensure_ascii=False),
|
||
'input_ctx': node['input_ctx'],
|
||
'status': 'pendding'
|
||
})
|
||
return id
|
||
|
||
async def is_ok_to_step_next(self, sor, node, edges, inst):
|
||
join = node.get('join')
|
||
if not join:
|
||
return True
|
||
if join == 'any':
|
||
return True
|
||
backnodes = [e['from'] for e in edges if e['to'] == node['id']]
|
||
sql = """select * from node_execution
|
||
where node_id in ${backnodes}$
|
||
and instance_id = ${instance_id}$"""
|
||
recs = await sor.sqlExe(sql, {
|
||
'backnodes': backnodes,
|
||
'instance_id': inst['id']
|
||
})
|
||
if join == 'all':
|
||
for r in recs:
|
||
if r.status != 'completed':
|
||
debug(f'前向任务未结束{r}')
|
||
return False
|
||
return True
|
||
if join == 'xor':
|
||
backnodes = [id for id in backnodes if id != node['id']]
|
||
sql = """update from node_execution
|
||
set status = 'cancelled'
|
||
where status in ['pending', 'running']
|
||
and node_id in ${backnodes}$
|
||
and instance_id = ${instance_id}$"""
|
||
await sor.sqlExe(sql, {
|
||
'backnodes': backnodes,
|
||
'instance_id': inst['id']
|
||
})
|
||
return True
|
||
some = extract_some_value(join)
|
||
sql = """select * from node_transfer
|
||
where instance_id=${instance_id}$
|
||
and to_node_id = ${node_id}$"""
|
||
trecs = await sor.sqlExe(sql, {
|
||
'node_id': node['id'],
|
||
'instance_id': inst['id']
|
||
})
|
||
if some >= 1 and some <= len(trecs):
|
||
return True
|
||
if some <= float(len(trecs))/float(len(recs)):
|
||
return True
|
||
debug(f'部分成功模式:{some=},已完成:{len(trecs)=},总数:{len(recs)=}')
|
||
return False
|
||
|
||
async def task_error_callback(self, ne_id, errmsg):
|
||
debug(f'task_error_callback():{ne_id=}, {errmsg=}')
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
recs = await sor.R('node_execution', {'id': ne_id})
|
||
if not recs:
|
||
debug(f'{ne_id=} 没找到node_execution记录')
|
||
return
|
||
ne = recs[0]
|
||
ne.status = 'failed',
|
||
ne.output = errmsg
|
||
await sor.U('node_execution', ne)
|
||
|
||
async def task_success_callback(self, ne_id, result):
|
||
debug(f'task_error_callback():{ne_id=}, {result=}')
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
recs = await sor.R('node_execution', {'id': ne_id})
|
||
if not recs:
|
||
debug(f'{ne_id=} 没找到node_execution记录')
|
||
return
|
||
ne = recs[0]
|
||
inst_id = ne.instance_id
|
||
insts = await sor.R('flow_instance', {'id': inst_id})
|
||
if insts:
|
||
debug(f'{inst_id=} 没找到flow_instance记录')
|
||
return
|
||
inst = insts[0]
|
||
ne.output = result if result is None else \
|
||
result if isinstance(result, str) else \
|
||
json.dumps(result, ensure_ascii=False)
|
||
|
||
ne.status = 'done'
|
||
if result:
|
||
data = {
|
||
'output': result
|
||
}
|
||
tmpl = env.template_engine.renders(ne.output_ctx, data)
|
||
ne.output = tmpl
|
||
d = json.loads(tmpl)
|
||
jctx = json.loads(inst.ctx)
|
||
jctx.update(d)
|
||
inst.ctx = json.dumps(ctx, ensure_ascii=False)
|
||
await sor.U('flow_instance', inst)
|
||
else:
|
||
ne.output = None
|
||
await sor.U('node_execution', ne)
|
||
|
||
async def start_running(self, sor, inst, ne):
|
||
debug(f'{ne.node_id}节点运行')
|
||
await sor.R('node_execution', {
|
||
'id': ne['id'],
|
||
'status': 'running',
|
||
'running_time': timestampstr()
|
||
})
|
||
try:
|
||
out = await self.run_auto_task(ne)
|
||
except Exception as e:
|
||
await sor.U('node_execution', {
|
||
'id': ne['id'],
|
||
'status': 'failed',
|
||
'output': str(e),
|
||
'stopping_at': timestampstr()
|
||
})
|
||
exception(f'执行失败:{e}')
|
||
|
||
async def list_human_tasks(self, org_id, user_roles):
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
return await sor.R('human_task', {
|
||
'org_id': org_id,
|
||
'status': 'pending',
|
||
'role': ('in', user_roles)
|
||
})
|
||
|
||
async def complete_human_task(self, org_id, task_id, user_id, output):
|
||
env = ServerEnv()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
rows = await sor.R('human_task', {
|
||
'id': task_id,
|
||
'org_id': org_id,
|
||
'status': 'pending'
|
||
})
|
||
if not rows:
|
||
return
|
||
|
||
task = rows[0]
|
||
await sor.U('human_task', {
|
||
'id': task_id,
|
||
'assignee': user_id,
|
||
'status': 'done',
|
||
'output': json.dumps(output),
|
||
'completed_at': env.now()
|
||
})
|
||
|
||
inst = (await sor.R(
|
||
'flow_instance',
|
||
{'id': task['instance_id']}
|
||
))[0]
|
||
|
||
ctx = json.loads(inst['ctx'])
|
||
ctx.update(output)
|
||
|
||
await sor.U('flow_instance', {
|
||
'id': inst['id'],
|
||
'ctx': json.dumps(ctx)
|
||
})
|
||
|
||
def get_engine():
|
||
env = ServerEnv()
|
||
cnt = len(env.flow_engines)
|
||
id = randint(0, cnt-1)
|
||
return env.flow_engines[id]
|
||
|
||
async def list_org_instances(request, flow_def_id=None):
|
||
env = request._run_ns
|
||
orgid = await env.get_userorgid()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
sql = """select b.name, a.*
|
||
from flow_instance a, flow_definition b
|
||
where a.flow_def_id = b.id
|
||
and a.status='running'
|
||
and a.org_id=${orgid}$"""
|
||
if flow_def_id:
|
||
sql += ' and a.flow_def_id = ${flow_def_id}$'
|
||
recs = await sor.sqlExe(sql, {
|
||
'orgid': orgid,
|
||
'flow_def_id': flow_def_id
|
||
})
|
||
return recs
|
||
return []
|
||
|
||
async def get_my_flow_works(request, inst_id=None):
|
||
env = request._run_ns
|
||
orgid = await env.get_userorgid()
|
||
userid = await env.get_user()
|
||
myroles = await env.get_user_roles(userid)
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
sql = """select *
|
||
from node_execute
|
||
where type = 'human'
|
||
and status in ['pending', 'running']
|
||
and (assignee = ${userid}$ or (assignee is NULL and role in ${myroles}$))"""
|
||
if inst_id:
|
||
sql += ' and instance_id = ${inst_id}$'
|
||
recs = await sor.sqlExe(sql, {
|
||
'userid': userid,
|
||
'myroles': myroles,
|
||
'inst_id': inst_id
|
||
})
|
||
return recs
|
||
return []
|
||
|
||
async def get_exists_workflows(request):
|
||
env = request._run_ns
|
||
orgid = await env.get_userorgid()
|
||
orgids = ['0', orgid]
|
||
sql = "select * from flow_definition where org_id in ${orgids}$"
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
recs = await sor.sqlExe(sql, {'orgids': orgids})
|
||
return recs
|
||
return []
|
||
|
||
async def new_instance(request, flow_def_id, params_kw):
|
||
env = request._run_ns
|
||
orgid = await env.get_userorgid()
|
||
engine = get_engine()
|
||
fiid = await engine.create_instance(orgid,
|
||
flow_def_id, ctx=params_kw)
|
||
return fiid
|
||
|
||
async def add_new_workflow(request, params_kw={}):
|
||
name = params_kw.name
|
||
dsl = params_kw.dsl
|
||
env = request._run_ns
|
||
org_id = await env.get_userorgid()
|
||
ctxfields = params_kw.ctxfields or []
|
||
description = params_kw.description
|
||
engine = get_engine()
|
||
id = await engine.create_definition(org_id, name, description, '1', dsl, ctxfields)
|
||
return {
|
||
'status': 'SUCCEEDED',
|
||
'data': {
|
||
'dagflow_id': id
|
||
}
|
||
}
|
||
|
||
|
||
async def get_org_flow_definition(request):
|
||
env = request._run_ns
|
||
orgid = await env.get_userorgid()
|
||
async with get_sor_context(env, 'dagflow') as sor:
|
||
sql = """select * from flow_definition where org_id in ${orgids}$"""
|
||
recs = await sor.sqlExe(sql, {'orgids': ['0', orgid]})
|
||
return recs
|
||
return []
|