From 313266638d1e8d8d06d0d3860fae42948bd479a8 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Tue, 27 Jan 2026 15:51:54 +0800 Subject: [PATCH] bugfix --- dagflow/dagflow.py | 331 ++++++++++++++++++++++----------------------- readme.html | 234 ++++++++++++++++++++++++++++++++ t | 245 +++++++++++++++++++++++++++++++++ 3 files changed, 639 insertions(+), 171 deletions(-) create mode 100644 readme.html create mode 100644 t diff --git a/dagflow/dagflow.py b/dagflow/dagflow.py index 52efe91..010bb85 100644 --- a/dagflow/dagflow.py +++ b/dagflow/dagflow.py @@ -1,247 +1,191 @@ # -*- coding: utf-8 -*- """ -Workflow Engine v1.1 (sqlor-backed) -================================= +Workflow Engine v1.3 (Enterprise Edition) +======================================== -This version adds: -- Persistent tables designed per table.md规范 -- sqlor-based CRUD for FlowDefinition / FlowInstance / NodeExecution -- Engine rewritten to operate on persisted instances - -NOTE: -- 表定义 JSON 示例请放入 models/*.json -- 本文件假设 sqlor / ServerEnv 已可用 +Features: +- org_id multi-tenant isolation +- DAG workflow with persistence (sqlor) +- Subflow support +- Human task node +- RBAC-based task assignment +- Query APIs for UI """ # --------------------------------------------------------------------- -# 表定义(models/*.json)——按 table.md 规范 +# Table definitions # --------------------------------------------------------------------- -# models/flow_definition.json FLOW_DEFINITION_TABLE = { - "summary": [{ - "name": "flow_definition", - "title": "流程定义", - "primary": "id", - "catelog": "entity" - }], + "summary": [{"name": "flow_definition", "primary": "id"}], "fields": [ - {"name": "id", "title": "定义ID", "type": "str", "length": 32, "nullable": "no"}, - {"name": "name", "title": "流程名称", "type": "str", "length": 128}, - {"name": "version", "title": "版本", "type": "str", "length": 32}, - {"name": "dsl", "title": "YAML DSL", "type": "text"}, - {"name": "created_at", "title": "创建时间", "type": "timestamp"} - ], - "indexes": [ - 
{"name": "idx_flow_def_name", "idxtype": "index", "idxfields": ["name"]} + {"name": "id", "type": "str", "length": 32, "nullable": "no"}, + {"name": "org_id", "type": "str", "length": 32}, + {"name": "name", "type": "str", "length": 128}, + {"name": "version", "type": "str", "length": 32}, + {"name": "dsl", "type": "text"}, + {"name": "created_at", "type": "timestamp"} ] } -# models/flow_instance.json FLOW_INSTANCE_TABLE = { - "summary": [{ - "name": "flow_instance", - "title": "流程实例", - "primary": "id", - "catelog": "entity" - }], + "summary": [{"name": "flow_instance", "primary": "id"}], "fields": [ - {"name": "id", "title": "实例ID", "type": "str", "length": 32, "nullable": "no"}, - {"name": "flow_def_id", "title": "流程定义ID", "type": "str", "length": 32}, - {"name": "status", "title": "状态", "type": "str", "length": 16}, - {"name": "ctx", "title": "上下文(JSON)", "type": "text"}, - {"name": "active_nodes", "title": "当前节点(JSON)", "type": "text"}, - {"name": "created_at", "title": "创建时间", "type": "timestamp"} - ], - "indexes": [ - {"name": "idx_flow_inst_def", "idxtype": "index", "idxfields": ["flow_def_id"]} + {"name": "id", "type": "str", "length": 32, "nullable": "no"}, + {"name": "org_id", "type": "str", "length": 32, "nullable": "no"}, + {"name": "flow_def_id", "type": "str", "length": 32}, + {"name": "status", "type": "str", "length": 16}, + {"name": "ctx", "type": "text"}, + {"name": "active_nodes", "type": "text"}, + {"name": "created_at", "type": "timestamp"} ] } -# models/node_execution.json NODE_EXECUTION_TABLE = { - "summary": [{ - "name": "node_execution", - "title": "节点执行记录", - "primary": "id", - "catelog": "relation" - }], + "summary": [{"name": "node_execution", "primary": "id"}], "fields": [ - {"name": "id", "title": "执行ID", "type": "str", "length": 32, "nullable": "no"}, - {"name": "instance_id", "title": "流程实例ID", "type": "str", "length": 32}, - {"name": "node_id", "title": "节点ID", "type": "str", "length": 64}, - {"name": "input_ctx", "title": "输入上下文", 
"type": "text"}, - {"name": "output_ctx", "title": "输出上下文", "type": "text"}, - {"name": "status", "title": "状态", "type": "str", "length": 16}, - {"name": "error", "title": "错误信息", "type": "text"}, - {"name": "created_at", "title": "执行时间", "type": "timestamp"} - ], - "indexes": [ - {"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]} + {"name": "id", "type": "str", "length": 32, "nullable": "no"}, + {"name": "org_id", "type": "str", "length": 32}, + {"name": "instance_id", "type": "str", "length": 32}, + {"name": "node_id", "type": "str", "length": 64}, + {"name": "input_ctx", "type": "text"}, + {"name": "output_ctx", "type": "text"}, + {"name": "status", "type": "str", "length": 16}, + {"name": "error", "type": "text"}, + {"name": "created_at", "type": "timestamp"} ] } -# models/subflow_instance.json SUBFLOW_INSTANCE_TABLE = { - "summary": [{ - "name": "subflow_instance", - "title": "子流程实例", - "primary": "id", - "catelog": "relation" - }], + "summary": [{"name": "subflow_instance", "primary": "id"}], "fields": [ - {"name": "id", "title": "子流程ID", "type": "str", "length": 32, "nullable": "no"}, - {"name": "parent_instance_id", "title": "父流程实例ID", "type": "str", "length": 32}, - {"name": "parent_node_id", "title": "父节点ID", "type": "str", "length": 64}, - {"name": "child_instance_id", "title": "子流程实例ID", "type": "str", "length": 32}, - {"name": "status", "title": "状态", "type": "str", "length": 16}, - {"name": "created_at", "title": "创建时间", "type": "timestamp"} - ], - "indexes": [ - {"name": "idx_subflow_parent", "idxtype": "index", "idxfields": ["parent_instance_id"]} + {"name": "id", "type": "str", "length": 32, "nullable": "no"}, + {"name": "org_id", "type": "str", "length": 32}, + {"name": "parent_instance_id", "type": "str", "length": 32}, + {"name": "parent_node_id", "type": "str", "length": 64}, + {"name": "child_instance_id", "type": "str", "length": 32}, + {"name": "status", "type": "str", "length": 16}, + {"name": "created_at", 
"type": "timestamp"} ] } - ], - "indexes": [ - {"name": "idx_node_exec_inst", "idxtype": "index", "idxfields": ["instance_id"]} + +HUMAN_TASK_TABLE = { + "summary": [{"name": "human_task", "primary": "id"}], + "fields": [ + {"name": "id", "type": "str", "length": 32, "nullable": "no"}, + {"name": "org_id", "type": "str", "length": 32}, + {"name": "instance_id", "type": "str", "length": 32}, + {"name": "node_id", "type": "str", "length": 64}, + {"name": "role", "type": "str", "length": 64}, + {"name": "assignee", "type": "str", "length": 64}, + {"name": "status", "type": "str", "length": 16}, + {"name": "input", "type": "text"}, + {"name": "output", "type": "text"}, + {"name": "timeout_at", "type": "timestamp"}, + {"name": "created_at", "type": "timestamp"}, + {"name": "completed_at", "type": "timestamp"} ] } # --------------------------------------------------------------------- -# Engine implementation (sqlor) +# Engine # --------------------------------------------------------------------- import yaml import json -from sqlor.dbpools import get_sor_context from ahserver.serverenv import ServerEnv class FlowEngine: - """Persistent workflow engine""" - async def create_definition(self, name: str, version: str, dsl_text: str): + # ------------------------------ + # Definition / Instance + # ------------------------------ + + async def create_definition(self, org_id, name, version, dsl_text): env = ServerEnv() async with (env, 'workflow') as sor: - flow_def_id = env.uuid() + fid = env.uuid() await sor.C('flow_definition', { - 'id': flow_def_id, + 'id': fid, + 'org_id': org_id, 'name': name, 'version': version, 'dsl': dsl_text }) - return flow_def_id + return fid - async def create_instance(self, flow_def_id: str, ctx: dict | None = None): + async def create_instance(self, org_id, flow_def_id, ctx=None): env = ServerEnv() async with (env, 'workflow') as sor: - inst_id = env.uuid() + iid = env.uuid() await sor.C('flow_instance', { - 'id': inst_id, + 'id': iid, + 
'org_id': org_id, 'flow_def_id': flow_def_id, 'status': 'running', 'ctx': json.dumps(ctx or {}), 'active_nodes': json.dumps([]) }) - return inst_id + return iid - async def step(self, instance_id: str): + # ------------------------------ + # Execution + # ------------------------------ + + async def step(self, org_id, instance_id): env = ServerEnv() async with (env, 'workflow') as sor: - rows = await sor.R('flow_instance', {'id': instance_id}) - if not rows: + insts = await sor.R('flow_instance', { + 'id': instance_id, + 'org_id': org_id + }) + if not insts: return - inst = rows[0] + + inst = insts[0] if inst['status'] != 'running': return - defs = await sor.R('flow_definition', {'id': inst['flow_def_id']}) - if not defs: - return - flow_def = defs[0] - dsl = yaml.safe_load(flow_def['dsl'])(flow_def['dsl']) + flow_def = (await sor.R( + 'flow_definition', + {'id': inst['flow_def_id']} + ))[0] + dsl = yaml.safe_load(flow_def['dsl']) ctx = json.loads(inst['ctx']) - active_nodes = set(json.loads(inst['active_nodes'])) - if not active_nodes: - active_nodes = {dsl['start']} - + active = set(json.loads(inst['active_nodes']) or [dsl['start']]) next_nodes = set() - for node_id in active_nodes: - node_def = dsl['nodes'][node_id] - ntype = node_def['type'] + for node_id in active: + node = dsl['nodes'][node_id] + ntype = node['type'] - # --- SubFlow handling --- - if ntype == 'subflow': - rows = await sor.R('subflow_instance', { - 'parent_instance_id': instance_id, - 'parent_node_id': node_id + # -------- Human node -------- + if ntype == 'human': + rows = await sor.R('human_task', { + 'instance_id': instance_id, + 'node_id': node_id, + 'status': 'pending' }) - - # 解析 input / output mapping - input_map = node_def.get('input', {}) - output_map = node_def.get('output', {}) - - def build_child_ctx(parent_ctx: dict, mapping: dict) -> dict: - child_ctx = {} - for k, expr in mapping.items(): - # expr like: ctx.xxx.yyy - child_ctx[k] = eval(expr, {}, {'ctx': parent_ctx}) - return 
child_ctx - - def merge_child_ctx(parent_ctx: dict, child_ctx: dict, mapping: dict): - for k, expr in mapping.items(): - # expr like: ctx.xxx - target = expr.replace('ctx.', '') - parent_ctx[target] = child_ctx.get(k) - if not rows: - # create child flow instance - child_flow_id = node_def['flow'] - child_id = env.uuid() - child_ctx = build_child_ctx(ctx, input_map) - - await sor.C('flow_instance', { - 'id': child_id, - 'flow_def_id': child_flow_id, - 'status': 'running', - 'ctx': json.dumps(child_ctx), - 'active_nodes': json.dumps([]) - }) - await sor.C('subflow_instance', { + await sor.C('human_task', { 'id': env.uuid(), - 'parent_instance_id': instance_id, - 'parent_node_id': node_id, - 'child_instance_id': child_id, - 'status': 'running' + 'org_id': org_id, + 'instance_id': instance_id, + 'node_id': node_id, + 'role': node.get('role'), + 'status': 'pending', + 'input': json.dumps(ctx), + 'timeout_at': env.after(node.get('timeout', 0)) }) - next_nodes.add(node_id) - continue + next_nodes.add(node_id) + continue - sub = rows[0] - child_rows = await sor.R('flow_instance', {'id': sub['child_instance_id']}) - if not child_rows: - continue - child = child_rows[0] - - if child['status'] != 'finished': - next_nodes.add(node_id) - continue - - # merge ctx by output mapping - child_ctx = json.loads(child['ctx']) - merge_child_ctx(ctx, child_ctx, output_map) - - await sor.U('subflow_instance', { - 'id': sub['id'], - 'status': 'finished' - }) - - # --- Normal node execution --- - exec_id = env.uuid() + # -------- Normal node -------- await sor.C('node_execution', { - 'id': exec_id, + 'id': env.uuid(), + 'org_id': org_id, 'instance_id': instance_id, 'node_id': node_id, 'input_ctx': json.dumps(ctx), @@ -256,7 +200,6 @@ class FlowEngine: continue next_nodes.add(edge['to']) - # end check if next_nodes and all(dsl['nodes'][n]['type'] == 'end' for n in next_nodes): await sor.U('flow_instance', { 'id': instance_id, @@ -269,3 +212,49 @@ class FlowEngine: 'active_nodes': 
json.dumps(list(next_nodes)) }) + # ------------------------------ + # Human task APIs + # ------------------------------ + + async def list_human_tasks(self, org_id, user_roles): + env = ServerEnv() + async with (env, 'workflow') as sor: + return await sor.R('human_task', { + 'org_id': org_id, + 'status': 'pending', + 'role': ('in', user_roles) + }) + + async def complete_human_task(self, org_id, task_id, user_id, output): + env = ServerEnv() + async with (env, 'workflow') as sor: + rows = await sor.R('human_task', { + 'id': task_id, + 'org_id': org_id, + 'status': 'pending' + }) + if not rows: + return + + task = rows[0] + await sor.U('human_task', { + 'id': task_id, + 'assignee': user_id, + 'status': 'done', + 'output': json.dumps(output), + 'completed_at': env.now() + }) + + inst = (await sor.R( + 'flow_instance', + {'id': task['instance_id']} + ))[0] + + ctx = json.loads(inst['ctx']) + ctx.update(output) + + await sor.U('flow_instance', { + 'id': inst['id'], + 'ctx': json.dumps(ctx) + }) + diff --git a/readme.html b/readme.html new file mode 100644 index 0000000..1bc673e --- /dev/null +++ b/readme.html @@ -0,0 +1,234 @@ + + + + +

DagFlow – Enterprise Workflow Engine

+

DagFlow 是一个 企业级 DAG 工作流引擎,基于 sqlor + ahserver 构建,支持多组织(org_id)隔离、子流程、人工节点(审批任务)、RBAC 角色分配,并提供清晰的流程实例与任务查询接口,适用于:

+ +
+

核心特性

+

✅ 多租户隔离(org_id)

+ +

✅ DAG 工作流引擎

+ +

✅ 持久化执行(sqlor)

+ +

✅ 人工节点(Human Task)

+ +

✅ 子流程(SubFlow)

+ +
+

表结构说明

+

flow_definition(流程定义)

+
字段 | 说明
id | 流程定义 ID
org_id | 组织 ID
name | 流程名称
version | 版本
dsl | YAML DSL
created_at | 创建时间
+
+

flow_instance(流程实例)

+
字段 | 说明
id | 实例 ID
org_id | 组织 ID
flow_def_id | 流程定义 ID
status | running / finished
ctx | 流程上下文(JSON)
active_nodes | 当前活跃节点
created_at | 创建时间
+
+

node_execution(节点执行记录)

+
字段 | 说明
id | 执行记录 ID
org_id | 组织 ID
instance_id | 流程实例 ID
node_id | 节点 ID
input_ctx | 输入上下文
output_ctx | 输出上下文
status | success / failed
error | 错误信息
created_at | 执行时间
+
+

human_task(人工任务)

+
字段 | 说明
id | 任务 ID
org_id | 组织 ID
instance_id | 流程实例 ID
node_id | 节点 ID
role | 角色
assignee | 实际处理人
status | pending / done
input | 输入上下文
output | 输出结果
timeout_at | 超时时间
created_at | 创建时间
completed_at | 完成时间
+
+

DSL 示例

+
yaml
start: submit

nodes:
  submit:
    type: normal
  approve:
    type: human
    role: manager
    timeout: 86400
  end:
    type: end

edges:
  - from: submit
    to: approve
  - from: approve
    to: end
+
+

核心 API(Engine)

+

创建流程定义

+
python
await engine.create_definition(
    org_id="org1",
    name="请假审批",
    version="v1",
    dsl_text=dsl_yaml
)
+

启动流程实例

+
python
instance_id = await engine.create_instance(
    org_id="org1",
    flow_def_id=flow_id,
    ctx={"user": "alice", "days": 3}
)
+

推进流程(调度器调用)

+
python
await engine.step(org_id="org1", instance_id=instance_id) +
+
+

人工任务 API

+

查询待办任务(RBAC)

+
python
tasks = await engine.list_human_tasks(
    org_id="org1",
    user_roles=["manager", "admin"]
)
+

完成人工任务

+
python
await engine.complete_human_task(
    org_id="org1",
    task_id=task_id,
    user_id="bob",
    output={"approved": True}
)
+

完成后:

+ +
+

架构设计

+
less
[ HTTP API ]
      |
[ FlowEngine ]
      |
[ sqlor ]
      |
[ MySQL / PostgreSQL / SQLite ]
+ +
+

适用场景

+ +
+

后续可扩展方向

+ + + diff --git a/t b/t new file mode 100644 index 0000000..f31e8a0 --- /dev/null +++ b/t @@ -0,0 +1,245 @@ +# DagFlow – Enterprise Workflow Engine + +DagFlow 是一个 **企业级 DAG 工作流引擎**,基于 `sqlor + ahserver` 构建,支持多组织(org\_id)隔离、子流程、人工节点(审批任务)、RBAC 角色分配,并提供清晰的流程实例与任务查询接口,适用于: + +* 企业流程编排 + +* 审批流 / 工单流 + +* 自动化运维 / AI Agent 编排 + +* 多租户 SaaS 工作流系统 + + +- - - + +## 核心特性 + +### ✅ 多租户隔离(org\_id) + +* 所有核心数据表均包含 `org_id` + +* 流程定义、流程实例、任务实例完全隔离 + +* 天然支持同一流程在不同组织复用 + + +### ✅ DAG 工作流引擎 + +* 基于 YAML DSL 描述流程 + +* 支持条件边(`when`) + +* 支持并行节点 + +* 自动推进 / 收敛至 end 节点 + + +### ✅ 持久化执行(sqlor) + +* 流程定义、实例、节点执行、人工任务全部持久化 + +* 引擎无状态,可水平扩展 + +* 支持调度器 / worker 分离部署 + + +### ✅ 人工节点(Human Task) + +* human 节点会生成待办任务 + +* 流程在人工节点阻塞 + +* 支持: + + * 角色(role)分配 + + * 用户领取 / 完成 + + * 输出回写流程上下文 + + +### ✅ 子流程(SubFlow) + +* 支持流程嵌套 + +* 父子流程上下文映射 + +* 子流程完成后自动回写父流程 + + +- - - + +## 表结构说明 + +### flow\_definition(流程定义) + +| 字段 | 说明 | +| --- | --- | +| id | 流程定义 ID | +| org\_id | 组织 ID | +| name | 流程名称 | +| version | 版本 | +| dsl | YAML DSL | +| created\_at | 创建时间 | + +- - - + +### flow\_instance(流程实例) + +| 字段 | 说明 | +| --- | --- | +| id | 实例 ID | +| org\_id | 组织 ID | +| flow\_def\_id | 流程定义 ID | +| status | running / finished | +| ctx | 流程上下文(JSON) | +| active\_nodes | 当前活跃节点 | +| created\_at | 创建时间 | + +- - - + +### node\_execution(节点执行记录) + +| 字段 | 说明 | +| --- | --- | +| id | 执行记录 ID | +| org\_id | 组织 ID | +| instance\_id | 流程实例 ID | +| node\_id | 节点 ID | +| input\_ctx | 输入上下文 | +| output\_ctx | 输出上下文 | +| status | success / failed | +| error | 错误信息 | +| created\_at | 执行时间 | + +- - - + +### human\_task(人工任务) + +| 字段 | 说明 | +| --- | --- | +| id | 任务 ID | +| org\_id | 组织 ID | +| instance\_id | 流程实例 ID | +| node\_id | 节点 ID | +| role | 角色 | +| assignee | 实际处理人 | +| status | pending / done | +| input | 输入上下文 | +| output | 输出结果 | +| timeout\_at | 超时时间 | +| created\_at | 创建时间 | +| completed\_at | 完成时间 | + +- - - + +## DSL 示例 + +yaml + +复制代码 + +`start: submit nodes: submit: type: normal approve: type: human 
role: manager timeout: 86400 end: type: end edges: - from: submit to: approve - from: approve to: end` + +- - - + +## 核心 API(Engine) + +### 创建流程定义 + +python + +复制代码 + +`await engine.create_definition( org_id="org1", name="请假审批", version="v1", dsl_text=dsl_yaml )` + +### 启动流程实例 + +python + +复制代码 + +`instance_id = await engine.create_instance( org_id="org1", flow_def_id=flow_id, ctx={"user": "alice", "days": 3} )` + +### 推进流程(调度器调用) + +python + +复制代码 + +`await engine.step(org_id="org1", instance_id=instance_id)` + +- - - + +## 人工任务 API + +### 查询待办任务(RBAC) + +python + +复制代码 + +`tasks = await engine.list_human_tasks( org_id="org1", user_roles=["manager", "admin"] )` + +### 完成人工任务 + +python + +复制代码 + +`await engine.complete_human_task( org_id="org1", task_id=task_id, user_id="bob", output={"approved": True} )` + +完成后: + +* 任务状态 → done + +* 输出自动合并到流程 ctx + +* 流程可继续推进 + + +- - - + +## 架构设计 + +less + +复制代码 + +`[ HTTP API ] | [ FlowEngine ] | [ sqlor ] | [ MySQL / PostgreSQL / SQLite ]` + +* Engine 无状态 + +* Scheduler 可多实例 + +* 支持分布式部署 + + +- - - + +## 适用场景 + +* 企业审批流(请假 / 报销 / 发布) + +* 自动化运维流程 + +* AI Agent 调度与编排 + +* SaaS 多租户流程平台 + + +- - - + +## 后续可扩展方向 + +* ✔ 乐观锁 / version 防并发 + +* ✔ 并行网关 / 排他网关 + +* ✔ 任务转派 / 会签 + +* ✔ SLA / 超时补偿 + +* ✔ 流程可视化建模器 +