dagflow/dagflow/dagflow.py
2026-02-11 17:39:22 +08:00

274 lines
9.2 KiB
Python

# -*- coding: utf-8 -*-
"""
Workflow Engine v1.3 (Enterprise Edition)
========================================
Features:
- org_id multi-tenant isolation
- DAG workflow with persistence (sqlor)
- Subflow support
- Human task node
- RBAC-based task assignment
- Query APIs for UI
"""
# ---------------------------------------------------------------------
# Table definitions
# ---------------------------------------------------------------------
FLOW_DEFINITION_TABLE = {
"summary": [{"name": "flow_definition", "primary": "id"}],
"fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32},
{"name": "name", "type": "str", "length": 128},
{"name": "version", "type": "str", "length": 32},
{"name": "dsl", "type": "text"},
{"name": "created_at", "type": "timestamp"}
]
}
FLOW_INSTANCE_TABLE = {
"summary": [{"name": "flow_instance", "primary": "id"}],
"fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32, "nullable": "no"},
{"name": "flow_def_id", "type": "str", "length": 32},
{"name": "status", "type": "str", "length": 16},
{"name": "ctx", "type": "text"},
{"name": "active_nodes", "type": "text"},
{"name": "created_at", "type": "timestamp"}
]
}
NODE_EXECUTION_TABLE = {
"summary": [{"name": "node_execution", "primary": "id"}],
"fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32},
{"name": "instance_id", "type": "str", "length": 32},
{"name": "node_id", "type": "str", "length": 64},
{"name": "input_ctx", "type": "text"},
{"name": "output_ctx", "type": "text"},
{"name": "status", "type": "str", "length": 16},
{"name": "error", "type": "text"},
{"name": "created_at", "type": "timestamp"}
]
}
SUBFLOW_INSTANCE_TABLE = {
"summary": [{"name": "subflow_instance", "primary": "id"}],
"fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32},
{"name": "parent_instance_id", "type": "str", "length": 32},
{"name": "parent_node_id", "type": "str", "length": 64},
{"name": "child_instance_id", "type": "str", "length": 32},
{"name": "status", "type": "str", "length": 16},
{"name": "created_at", "type": "timestamp"}
]
}
HUMAN_TASK_TABLE = {
"summary": [{"name": "human_task", "primary": "id"}],
"fields": [
{"name": "id", "type": "str", "length": 32, "nullable": "no"},
{"name": "org_id", "type": "str", "length": 32},
{"name": "instance_id", "type": "str", "length": 32},
{"name": "node_id", "type": "str", "length": 64},
{"name": "role", "type": "str", "length": 64},
{"name": "assignee", "type": "str", "length": 64},
{"name": "status", "type": "str", "length": 16},
{"name": "input", "type": "text"},
{"name": "output", "type": "text"},
{"name": "timeout_at", "type": "timestamp"},
{"name": "created_at", "type": "timestamp"},
{"name": "completed_at", "type": "timestamp"}
]
}
# ---------------------------------------------------------------------
# Engine
# ---------------------------------------------------------------------
import yaml
import json
from ahserver.serverenv import ServerEnv
class FlowEngine:
# ------------------------------
# Definition / Instance
# ------------------------------
def __init__(self):
self.aio = []
async def bgtask(self, app):
env = ServerEnv()
self.aio = []
async with (env, 'workflow') as sor:
fid = env.uuid()
self.aio = await sor.R('flow_instance', {'status': 'running'})
while True:
for r in self.aio:
self.step(r.org_id, r.id)
await asyncio.sleep(0.5)
async def create_definition(self, org_id, name, version, dsl_text):
env = ServerEnv()
async with (env, 'workflow') as sor:
fid = env.uuid()
await sor.C('flow_definition', {
'id': fid,
'org_id': org_id,
'name': name,
'version': version,
'dsl': dsl_text
})
return fid
async def create_instance(self, org_id, flow_def_id, ctx=None):
env = ServerEnv()
async with (env, 'workflow') as sor:
iid = env.uuid()
await sor.C('flow_instance', {
'id': iid,
'org_id': org_id,
'flow_def_id': flow_def_id,
'status': 'running',
'ctx': json.dumps(ctx or {}),
'active_nodes': json.dumps([])
})
return iid
# ------------------------------
# Execution
# ------------------------------
async def step(self, org_id, instance_id):
env = ServerEnv()
async with get_sor_context(env, 'workflow') as sor:
insts = await sor.R('flow_instance', {
'id': instance_id,
'org_id': org_id
})
if not insts:
return
inst = insts[0]
if inst['status'] != 'running':
return
flow_def = (await sor.R(
'flow_definition',
{'id': inst['flow_def_id']}
))[0]
dsl = yaml.safe_load(flow_def['dsl'])
ctx = json.loads(inst['ctx'])
active = set(json.loads(inst['active_nodes']) or [dsl['start']])
next_nodes = set()
for node_id in active:
node = dsl['nodes'][node_id]
ntype = node['type']
# -------- Human node --------
if ntype == 'human':
rows = await sor.R('human_task', {
'instance_id': instance_id,
'node_id': node_id,
'status': 'pending'
})
if not rows:
await sor.C('human_task', {
'id': env.uuid(),
'org_id': org_id,
'instance_id': instance_id,
'node_id': node_id,
'role': node.get('role'),
'status': 'pending',
'input': json.dumps(ctx),
'timeout_at': env.after(node.get('timeout', 0))
})
next_nodes.add(node_id)
continue
# -------- Normal node --------
await sor.C('node_execution', {
'id': env.uuid(),
'org_id': org_id,
'instance_id': instance_id,
'node_id': node_id,
'input_ctx': json.dumps(ctx),
'status': 'success'
})
for edge in dsl.get('edges', []):
if edge['from'] != node_id:
continue
cond = edge.get('when')
if cond and not eval(cond, {}, {'ctx': ctx}):
continue
next_nodes.add(edge['to'])
if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes):
await sor.U('flow_instance', {
'id': instance_id,
'status': 'finished',
'active_nodes': json.dumps(list(next_nodes))
})
else:
await sor.U('flow_instance', {
'id': instance_id,
'active_nodes': json.dumps(list(next_nodes))
})
# ------------------------------
# Human task APIs
# ------------------------------
async def list_human_tasks(self, org_id, user_roles):
env = ServerEnv()
async with (env, 'workflow') as sor:
return await sor.R('human_task', {
'org_id': org_id,
'status': 'pending',
'role': ('in', user_roles)
})
async def complete_human_task(self, org_id, task_id, user_id, output):
env = ServerEnv()
async with (env, 'workflow') as sor:
rows = await sor.R('human_task', {
'id': task_id,
'org_id': org_id,
'status': 'pending'
})
if not rows:
return
task = rows[0]
await sor.U('human_task', {
'id': task_id,
'assignee': user_id,
'status': 'done',
'output': json.dumps(output),
'completed_at': env.now()
})
inst = (await sor.R(
'flow_instance',
{'id': task['instance_id']}
))[0]
ctx = json.loads(inst['ctx'])
ctx.update(output)
await sor.U('flow_instance', {
'id': inst['id'],
'ctx': json.dumps(ctx)
})