# -*- coding: utf-8 -*-
"""
Workflow Engine v1.3 (Enterprise Edition)
=========================================

Features:
- org_id multi-tenant isolation
- DAG workflow with persistence (sqlor)
- Subflow support
- Human task node
- RBAC-based task assignment
- Query APIs for UI
"""

import json

import yaml

from ahserver.serverenv import ServerEnv

# ---------------------------------------------------------------------
# Table definitions (sqlor model dicts)
# ---------------------------------------------------------------------

# Versioned flow definitions: the YAML DSL text is stored per (name, version).
FLOW_DEFINITION_TABLE = {
    "summary": [{"name": "flow_definition", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "name", "type": "str", "length": 128},
        {"name": "version", "type": "str", "length": 32},
        {"name": "dsl", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}

# One row per running/finished flow; ctx and active_nodes are JSON text.
FLOW_INSTANCE_TABLE = {
    "summary": [{"name": "flow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "flow_def_id", "type": "str", "length": 32},
        {"name": "status", "type": "str", "length": 16},
        {"name": "ctx", "type": "text"},
        {"name": "active_nodes", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}

# Audit trail: one row per node execution attempt within an instance.
NODE_EXECUTION_TABLE = {
    "summary": [{"name": "node_execution", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "input_ctx", "type": "text"},
        {"name": "output_ctx", "type": "text"},
        {"name": "status", "type": "str", "length": 16},
        {"name": "error", "type": "text"},
        {"name": "created_at", "type": "timestamp"}
    ]
}

# Parent/child linkage for subflow nodes.
SUBFLOW_INSTANCE_TABLE = {
    "summary": [{"name": "subflow_instance", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "parent_instance_id", "type": "str", "length": 32},
        {"name": "parent_node_id", "type": "str", "length": 64},
        {"name": "child_instance_id", "type": "str", "length": 32},
        {"name": "status", "type": "str", "length": 16},
        {"name": "created_at", "type": "timestamp"}
    ]
}

# Pending/done approval tasks generated by `human` nodes.
HUMAN_TASK_TABLE = {
    "summary": [{"name": "human_task", "primary": "id"}],
    "fields": [
        {"name": "id", "type": "str", "length": 32, "nullable": "no"},
        {"name": "org_id", "type": "str", "length": 32},
        {"name": "instance_id", "type": "str", "length": 32},
        {"name": "node_id", "type": "str", "length": 64},
        {"name": "role", "type": "str", "length": 64},
        {"name": "assignee", "type": "str", "length": 64},
        {"name": "status", "type": "str", "length": 16},
        {"name": "input", "type": "text"},
        {"name": "output", "type": "text"},
        {"name": "timeout_at", "type": "timestamp"},
        {"name": "created_at", "type": "timestamp"},
        {"name": "completed_at", "type": "timestamp"}
    ]
}

# ---------------------------------------------------------------------
# Engine
# ---------------------------------------------------------------------

class FlowEngine:
    """Persistent, multi-tenant DAG workflow engine backed by sqlor tables."""

    # ------------------------------
    # Definition / Instance
    # ------------------------------

    async def create_definition(self, org_id, name, version, dsl_text):
        """Persist a flow definition for *org_id* and return its new id.

        dsl_text is stored verbatim (expected to be YAML; not validated here).
        """
        env = ServerEnv()
        # NOTE(review): a plain tuple is not an async context manager; this
        # presumably relies on a project-level protocol (previous version used
        # sqlor's get_sor_context) — confirm against ServerEnv/sqlor usage.
        async with (env, 'workflow') as sor:
            def_id = env.uuid()
            await sor.C('flow_definition', {
                'id': def_id,
                'org_id': org_id,
                'name': name,
                'version': version,
                'dsl': dsl_text
            })
        return def_id

    async def create_instance(self, org_id, flow_def_id, ctx=None):
        """Create a 'running' instance of *flow_def_id* and return its id.

        ctx: optional initial context dict; stored as JSON ({} when None).
        """
        env = ServerEnv()
        async with (env, 'workflow') as sor:
            inst_id = env.uuid()
            await sor.C('flow_instance', {
                'id': inst_id,
                'org_id': org_id,
                'flow_def_id': flow_def_id,
                'status': 'running',
                'ctx': json.dumps(ctx or {}),
                'active_nodes': json.dumps([])  # empty => start node on first step
            })
        return inst_id
sor.R('flow_instance', {'id': instance_id}) - if not rows: + insts = await sor.R('flow_instance', { + 'id': instance_id, + 'org_id': org_id + }) + if not insts: return - inst = rows[0] + + inst = insts[0] if inst['status'] != 'running': return - defs = await sor.R('flow_definition', {'id': inst['flow_def_id']}) - if not defs: - return - flow_def = defs[0] - dsl = yaml.safe_load(flow_def['dsl'])(flow_def['dsl']) + flow_def = (await sor.R( + 'flow_definition', + {'id': inst['flow_def_id']} + ))[0] + dsl = yaml.safe_load(flow_def['dsl']) ctx = json.loads(inst['ctx']) - active_nodes = set(json.loads(inst['active_nodes'])) - if not active_nodes: - active_nodes = {dsl['start']} - + active = set(json.loads(inst['active_nodes']) or [dsl['start']]) next_nodes = set() - for node_id in active_nodes: - node_def = dsl['nodes'][node_id] - ntype = node_def['type'] + for node_id in active: + node = dsl['nodes'][node_id] + ntype = node['type'] - # --- SubFlow handling --- - if ntype == 'subflow': - rows = await sor.R('subflow_instance', { - 'parent_instance_id': instance_id, - 'parent_node_id': node_id + # -------- Human node -------- + if ntype == 'human': + rows = await sor.R('human_task', { + 'instance_id': instance_id, + 'node_id': node_id, + 'status': 'pending' }) - - # 解析 input / output mapping - input_map = node_def.get('input', {}) - output_map = node_def.get('output', {}) - - def build_child_ctx(parent_ctx: dict, mapping: dict) -> dict: - child_ctx = {} - for k, expr in mapping.items(): - # expr like: ctx.xxx.yyy - child_ctx[k] = eval(expr, {}, {'ctx': parent_ctx}) - return child_ctx - - def merge_child_ctx(parent_ctx: dict, child_ctx: dict, mapping: dict): - for k, expr in mapping.items(): - # expr like: ctx.xxx - target = expr.replace('ctx.', '') - parent_ctx[target] = child_ctx.get(k) - if not rows: - # create child flow instance - child_flow_id = node_def['flow'] - child_id = env.uuid() - child_ctx = build_child_ctx(ctx, input_map) - - await sor.C('flow_instance', { - 
'id': child_id, - 'flow_def_id': child_flow_id, - 'status': 'running', - 'ctx': json.dumps(child_ctx), - 'active_nodes': json.dumps([]) - }) - await sor.C('subflow_instance', { + await sor.C('human_task', { 'id': env.uuid(), - 'parent_instance_id': instance_id, - 'parent_node_id': node_id, - 'child_instance_id': child_id, - 'status': 'running' + 'org_id': org_id, + 'instance_id': instance_id, + 'node_id': node_id, + 'role': node.get('role'), + 'status': 'pending', + 'input': json.dumps(ctx), + 'timeout_at': env.after(node.get('timeout', 0)) }) - next_nodes.add(node_id) - continue + next_nodes.add(node_id) + continue - sub = rows[0] - child_rows = await sor.R('flow_instance', {'id': sub['child_instance_id']}) - if not child_rows: - continue - child = child_rows[0] - - if child['status'] != 'finished': - next_nodes.add(node_id) - continue - - # merge ctx by output mapping - child_ctx = json.loads(child['ctx']) - merge_child_ctx(ctx, child_ctx, output_map) - - await sor.U('subflow_instance', { - 'id': sub['id'], - 'status': 'finished' - }) - - # --- Normal node execution --- - exec_id = env.uuid() + # -------- Normal node -------- await sor.C('node_execution', { - 'id': exec_id, + 'id': env.uuid(), + 'org_id': org_id, 'instance_id': instance_id, 'node_id': node_id, 'input_ctx': json.dumps(ctx), @@ -256,7 +200,6 @@ class FlowEngine: continue next_nodes.add(edge['to']) - # end check if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes): await sor.U('flow_instance', { 'id': instance_id, @@ -269,3 +212,49 @@ class FlowEngine: 'active_nodes': json.dumps(list(next_nodes)) }) + # ------------------------------ + # Human task APIs + # ------------------------------ + + async def list_human_tasks(self, org_id, user_roles): + env = ServerEnv() + async with (env, 'workflow') as sor: + return await sor.R('human_task', { + 'org_id': org_id, + 'status': 'pending', + 'role': ('in', user_roles) + }) + + async def complete_human_task(self, org_id, task_id, 
user_id, output): + env = ServerEnv() + async with (env, 'workflow') as sor: + rows = await sor.R('human_task', { + 'id': task_id, + 'org_id': org_id, + 'status': 'pending' + }) + if not rows: + return + + task = rows[0] + await sor.U('human_task', { + 'id': task_id, + 'assignee': user_id, + 'status': 'done', + 'output': json.dumps(output), + 'completed_at': env.now() + }) + + inst = (await sor.R( + 'flow_instance', + {'id': task['instance_id']} + ))[0] + + ctx = json.loads(inst['ctx']) + ctx.update(output) + + await sor.U('flow_instance', { + 'id': inst['id'], + 'ctx': json.dumps(ctx) + }) + diff --git a/readme.html b/readme.html new file mode 100644 index 0000000..1bc673e --- /dev/null +++ b/readme.html @@ -0,0 +1,234 @@ + +
+ + +DagFlow 是一个企业级 DAG 工作流引擎,基于 sqlor + ahserver 构建,支持多组织(org_id)隔离、子流程、人工节点(审批任务)、RBAC 角色分配,并提供清晰的流程实例与任务查询接口,适用于:
企业流程编排
+审批流 / 工单流
+自动化运维 / AI Agent 编排
+多租户 SaaS 工作流系统
+所有核心数据表均包含 org_id
流程定义、流程实例、任务实例完全隔离
+天然支持同一流程在不同组织复用
+基于 YAML DSL 描述流程
+支持条件边(when)
支持并行节点
+自动推进 / 收敛至 end 节点
+流程定义、实例、节点执行、人工任务全部持久化
+引擎无状态,可水平扩展
+支持调度器 / worker 分离部署
+human 节点会生成待办任务
+流程在人工节点阻塞
+支持:
+角色(role)分配
+用户领取 / 完成
+输出回写流程上下文
+支持流程嵌套
+父子流程上下文映射
+子流程完成后自动回写父流程
+| 字段 | 说明 |
|---|---|
| id | 流程定义 ID |
| org_id | 组织 ID |
| name | 流程名称 |
| version | 版本 |
| dsl | YAML DSL |
| created_at | 创建时间 |
| 字段 | 说明 |
|---|---|
| id | 实例 ID |
| org_id | 组织 ID |
| flow_def_id | 流程定义 ID |
| status | running / finished |
| ctx | 流程上下文(JSON) |
| active_nodes | 当前活跃节点 |
| created_at | 创建时间 |
| 字段 | 说明 |
|---|---|
| id | 执行记录 ID |
| org_id | 组织 ID |
| instance_id | 流程实例 ID |
| node_id | 节点 ID |
| input_ctx | 输入上下文 |
| output_ctx | 输出上下文 |
| status | success / failed |
| error | 错误信息 |
| created_at | 执行时间 |
| 字段 | 说明 |
|---|---|
| id | 任务 ID |
| org_id | 组织 ID |
| instance_id | 流程实例 ID |
| node_id | 节点 ID |
| role | 角色 |
| assignee | 实际处理人 |
| status | pending / done |
| input | 输入上下文 |
| output | 输出结果 |
| timeout_at | 超时时间 |
| created_at | 创建时间 |
| completed_at | 完成时间 |
+
+
+
+
+
+完成后:
+任务状态 → done
+输出自动合并到流程 ctx
+流程可继续推进
+
+Engine 无状态
+Scheduler 可多实例
+支持分布式部署
+企业审批流(请假 / 报销 / 发布)
+自动化运维流程
+AI Agent 调度与编排
+SaaS 多租户流程平台
+✔ 乐观锁 / version 防并发
+✔ 并行网关 / 排他网关
+✔ 任务转派 / 会签
+✔ SLA / 超时补偿
+✔ 流程可视化建模器
+