This commit is contained in:
yumoqing 2026-02-11 18:13:35 +08:00
parent 8a9d682c13
commit 975f14ed30

View File

@ -17,74 +17,74 @@ Features:
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
FLOW_DEFINITION_TABLE = { FLOW_DEFINITION_TABLE = {
"summary": [{"name": "flow_definition", "primary": "id"}], "summary": [{"name": "flow_definition", "primary": "id"}],
"fields": [ "fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"}, {"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32}, {"name": "org_id", "type": "str", "length": 32},
{"name": "name", "type": "str", "length": 128}, {"name": "name", "type": "str", "length": 128},
{"name": "version", "type": "str", "length": 32}, {"name": "version", "type": "str", "length": 32},
{"name": "dsl", "type": "text"}, {"name": "dsl", "type": "text"},
{"name": "created_at", "type": "timestamp"} {"name": "created_at", "type": "timestamp"}
] ]
} }
FLOW_INSTANCE_TABLE = { FLOW_INSTANCE_TABLE = {
"summary": [{"name": "flow_instance", "primary": "id"}], "summary": [{"name": "flow_instance", "primary": "id"}],
"fields": [ "fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"}, {"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32, "nullable": "no"}, {"name": "org_id", "type": "str", "length": 32, "nullable": "no"},
{"name": "flow_def_id", "type": "str", "length": 32}, {"name": "flow_def_id", "type": "str", "length": 32},
{"name": "status", "type": "str", "length": 16}, {"name": "status", "type": "str", "length": 16},
{"name": "ctx", "type": "text"}, {"name": "ctx", "type": "text"},
{"name": "active_nodes", "type": "text"}, {"name": "active_nodes", "type": "text"},
{"name": "created_at", "type": "timestamp"} {"name": "created_at", "type": "timestamp"}
] ]
} }
NODE_EXECUTION_TABLE = { NODE_EXECUTION_TABLE = {
"summary": [{"name": "node_execution", "primary": "id"}], "summary": [{"name": "node_execution", "primary": "id"}],
"fields": [ "fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"}, {"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32}, {"name": "org_id", "type": "str", "length": 32},
{"name": "instance_id", "type": "str", "length": 32}, {"name": "instance_id", "type": "str", "length": 32},
{"name": "node_id", "type": "str", "length": 64}, {"name": "node_id", "type": "str", "length": 64},
{"name": "input_ctx", "type": "text"}, {"name": "input_ctx", "type": "text"},
{"name": "output_ctx", "type": "text"}, {"name": "output_ctx", "type": "text"},
{"name": "status", "type": "str", "length": 16}, {"name": "status", "type": "str", "length": 16},
{"name": "error", "type": "text"}, {"name": "error", "type": "text"},
{"name": "created_at", "type": "timestamp"} {"name": "created_at", "type": "timestamp"}
] ]
} }
SUBFLOW_INSTANCE_TABLE = { SUBFLOW_INSTANCE_TABLE = {
"summary": [{"name": "subflow_instance", "primary": "id"}], "summary": [{"name": "subflow_instance", "primary": "id"}],
"fields": [ "fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"}, {"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32}, {"name": "org_id", "type": "str", "length": 32},
{"name": "parent_instance_id", "type": "str", "length": 32}, {"name": "parent_instance_id", "type": "str", "length": 32},
{"name": "parent_node_id", "type": "str", "length": 64}, {"name": "parent_node_id", "type": "str", "length": 64},
{"name": "child_instance_id", "type": "str", "length": 32}, {"name": "child_instance_id", "type": "str", "length": 32},
{"name": "status", "type": "str", "length": 16}, {"name": "status", "type": "str", "length": 16},
{"name": "created_at", "type": "timestamp"} {"name": "created_at", "type": "timestamp"}
] ]
} }
HUMAN_TASK_TABLE = { HUMAN_TASK_TABLE = {
"summary": [{"name": "human_task", "primary": "id"}], "summary": [{"name": "human_task", "primary": "id"}],
"fields": [ "fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"}, {"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32}, {"name": "org_id", "type": "str", "length": 32},
{"name": "instance_id", "type": "str", "length": 32}, {"name": "instance_id", "type": "str", "length": 32},
{"name": "node_id", "type": "str", "length": 64}, {"name": "node_id", "type": "str", "length": 64},
{"name": "role", "type": "str", "length": 64}, {"name": "role", "type": "str", "length": 64},
{"name": "assignee", "type": "str", "length": 64}, {"name": "assignee", "type": "str", "length": 64},
{"name": "status", "type": "str", "length": 16}, {"name": "status", "type": "str", "length": 16},
{"name": "input", "type": "text"}, {"name": "input", "type": "text"},
{"name": "output", "type": "text"}, {"name": "output", "type": "text"},
{"name": "timeout_at", "type": "timestamp"}, {"name": "timeout_at", "type": "timestamp"},
{"name": "created_at", "type": "timestamp"}, {"name": "created_at", "type": "timestamp"},
{"name": "completed_at", "type": "timestamp"} {"name": "completed_at", "type": "timestamp"}
] ]
} }
# --------------------------------------------------------------------- # ---------------------------------------------------------------------
@ -98,176 +98,176 @@ from ahserver.serverenv import ServerEnv
class FlowEngine: class FlowEngine:
# ------------------------------ # ------------------------------
# Definition / Instance # Definition / Instance
# ------------------------------ # ------------------------------
def __init__(self): def __init__(self):
self.aio = [] self.aio = []
async def bgtask(self, app): async def bgtask(self, app):
env = ServerEnv() env = ServerEnv()
self.aio = [] self.aio = []
async with (env, 'workflow') as sor: async with (env, 'workflow') as sor:
fid = env.uuid() fid = env.uuid()
self.aio = await sor.R('flow_instance', {'status': 'running'}) self.aio = await sor.R('flow_instance', {'status': 'running'})
while True: while True:
for r in self.aio: for r in self.aio:
self.step(r.org_id, r.id) self.step(r.org_id, r.id)
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
async def create_definition(self, org_id, name, version, dsl_text): async def create_definition(self, org_id, name, version, dsl_text):
env = ServerEnv() env = ServerEnv()
async with (env, 'workflow') as sor: async with (env, 'workflow') as sor:
fid = env.uuid() fid = env.uuid()
await sor.C('flow_definition', { await sor.C('flow_definition', {
'id': fid, 'id': fid,
'org_id': org_id, 'org_id': org_id,
'name': name, 'name': name,
'version': version, 'version': version,
'dsl': dsl_text 'dsl': dsl_text
}) })
return fid return fid
async def create_instance(self, org_id, flow_def_id, ctx=None): async def create_instance(self, org_id, flow_def_id, ctx=None):
env = ServerEnv() env = ServerEnv()
async with (env, 'workflow') as sor: async with (env, 'workflow') as sor:
iid = env.uuid() iid = env.uuid()
await sor.C('flow_instance', { await sor.C('flow_instance', {
'id': iid, 'id': iid,
'org_id': org_id, 'org_id': org_id,
'flow_def_id': flow_def_id, 'flow_def_id': flow_def_id,
'status': 'running', 'status': 'running',
'ctx': json.dumps(ctx or {}), 'ctx': json.dumps(ctx or {}),
'active_nodes': json.dumps([]) 'active_nodes': json.dumps([])
}) })
return iid return iid
# ------------------------------ # ------------------------------
# Execution # Execution
# ------------------------------ # ------------------------------
async def step(self, org_id, instance_id): async def step(self, org_id, instance_id):
env = ServerEnv() env = ServerEnv()
async with get_sor_context(env, 'workflow') as sor: async with get_sor_context(env, 'workflow') as sor:
insts = await sor.R('flow_instance', { insts = await sor.R('flow_instance', {
'id': instance_id, 'id': instance_id,
'org_id': org_id 'org_id': org_id
}) })
if not insts: if not insts:
return return
inst = insts[0] inst = insts[0]
if inst['status'] != 'running': if inst['status'] != 'running':
return return
flow_def = (await sor.R( flow_def = (await sor.R(
'flow_definition', 'flow_definition',
{'id': inst['flow_def_id']} {'id': inst['flow_def_id']}
))[0] ))[0]
dsl = yaml.safe_load(flow_def['dsl']) dsl = yaml.safe_load(flow_def['dsl'])
ctx = json.loads(inst['ctx']) ctx = json.loads(inst['ctx'])
active = set(json.loads(inst['active_nodes']) or [dsl['start']]) active = set(json.loads(inst['active_nodes']) or [dsl['start']])
next_nodes = set() next_nodes = set()
for node_id in active: for node_id in active:
node = dsl['nodes'][node_id] node = dsl['nodes'][node_id]
ntype = node['type'] ntype = node['type']
# -------- Human node -------- # -------- Human node --------
if ntype == 'human': if ntype == 'human':
rows = await sor.R('human_task', { rows = await sor.R('human_task', {
'instance_id': instance_id, 'instance_id': instance_id,
'node_id': node_id, 'node_id': node_id,
'status': 'pending' 'status': 'pending'
}) })
if not rows: if not rows:
await sor.C('human_task', { await sor.C('human_task', {
'id': env.uuid(), 'id': env.uuid(),
'org_id': org_id, 'org_id': org_id,
'instance_id': instance_id, 'instance_id': instance_id,
'node_id': node_id, 'node_id': node_id,
'role': node.get('role'), 'role': node.get('role'),
'status': 'pending', 'status': 'pending',
'input': json.dumps(ctx), 'input': json.dumps(ctx),
'timeout_at': env.after(node.get('timeout', 0)) 'timeout_at': env.after(node.get('timeout', 0))
}) })
next_nodes.add(node_id) next_nodes.add(node_id)
continue continue
# -------- Normal node -------- # -------- Normal node --------
await sor.C('node_execution', { await sor.C('node_execution', {
'id': env.uuid(), 'id': env.uuid(),
'org_id': org_id, 'org_id': org_id,
'instance_id': instance_id, 'instance_id': instance_id,
'node_id': node_id, 'node_id': node_id,
'input_ctx': json.dumps(ctx), 'input_ctx': json.dumps(ctx),
'status': 'success' 'status': 'success'
}) })
for edge in dsl.get('edges', []): for edge in dsl.get('edges', []):
if edge['from'] != node_id: if edge['from'] != node_id:
continue continue
cond = edge.get('when') cond = edge.get('when')
if cond and not eval(cond, {}, {'ctx': ctx}): if cond and not eval(cond, {}, {'ctx': ctx}):
continue continue
next_nodes.add(edge['to']) next_nodes.add(edge['to'])
if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes): if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes):
await sor.U('flow_instance', { await sor.U('flow_instance', {
'id': instance_id, 'id': instance_id,
'status': 'finished', 'status': 'finished',
'active_nodes': json.dumps(list(next_nodes)) 'active_nodes': json.dumps(list(next_nodes))
}) })
else: else:
await sor.U('flow_instance', { await sor.U('flow_instance', {
'id': instance_id, 'id': instance_id,
'active_nodes': json.dumps(list(next_nodes)) 'active_nodes': json.dumps(list(next_nodes))
}) })
# ------------------------------ # ------------------------------
# Human task APIs # Human task APIs
# ------------------------------ # ------------------------------
async def list_human_tasks(self, org_id, user_roles): async def list_human_tasks(self, org_id, user_roles):
env = ServerEnv() env = ServerEnv()
async with (env, 'workflow') as sor: async with (env, 'workflow') as sor:
return await sor.R('human_task', { return await sor.R('human_task', {
'org_id': org_id, 'org_id': org_id,
'status': 'pending', 'status': 'pending',
'role': ('in', user_roles) 'role': ('in', user_roles)
}) })
async def complete_human_task(self, org_id, task_id, user_id, output): async def complete_human_task(self, org_id, task_id, user_id, output):
env = ServerEnv() env = ServerEnv()
async with (env, 'workflow') as sor: async with (env, 'workflow') as sor:
rows = await sor.R('human_task', { rows = await sor.R('human_task', {
'id': task_id, 'id': task_id,
'org_id': org_id, 'org_id': org_id,
'status': 'pending' 'status': 'pending'
}) })
if not rows: if not rows:
return return
task = rows[0] task = rows[0]
await sor.U('human_task', { await sor.U('human_task', {
'id': task_id, 'id': task_id,
'assignee': user_id, 'assignee': user_id,
'status': 'done', 'status': 'done',
'output': json.dumps(output), 'output': json.dumps(output),
'completed_at': env.now() 'completed_at': env.now()
}) })
inst = (await sor.R( inst = (await sor.R(
'flow_instance', 'flow_instance',
{'id': task['instance_id']} {'id': task['instance_id']}
))[0] ))[0]
ctx = json.loads(inst['ctx']) ctx = json.loads(inst['ctx'])
ctx.update(output) ctx.update(output)
await sor.U('flow_instance', { await sor.U('flow_instance', {
'id': inst['id'], 'id': inst['id'],
'ctx': json.dumps(ctx) 'ctx': json.dumps(ctx)
}) })