DagFlow – Enterprise Workflow Engine

DagFlow 是一个 企业级 DAG 工作流引擎,基于 sqlor + ahserver 构建,支持多组织(org_id)隔离、子流程、人工节点(审批任务)、RBAC 角色分配,并提供清晰的流程实例与任务查询接口,适用于:


核心特性

✅ 多租户隔离(org_id)

✅ DAG 工作流引擎

✅ 持久化执行(sqlor)

✅ 人工节点(Human Task)

✅ 子流程(SubFlow)


表结构说明

flow_definition(流程定义)

字段说明
id流程定义 ID
org_id组织 ID
name流程名称
version版本
dslYAML DSL
created_at创建时间

flow_instance(流程实例)

字段说明
id实例 ID
org_id组织 ID
flow_def_id流程定义 ID
statusrunning / finished
ctx流程上下文(JSON)
active_nodes当前活跃节点
created_at创建时间

node_execution(节点执行记录)

字段说明
id执行记录 ID
org_id组织 ID
instance_id流程实例 ID
node_id节点 ID
input_ctx输入上下文
output_ctx输出上下文
statussuccess / failed
error错误信息
created_at执行时间

human_task(人工任务)

字段说明
id任务 ID
org_id组织 ID
instance_id流程实例 ID
node_id节点 ID
role角色
assignee实际处理人
statuspending / done
input输入上下文
output输出结果
timeout_at超时时间
created_at创建时间
completed_at完成时间

DSL 示例

yaml
start: submit nodes: submit: type: normal approve: type: human role: manager timeout: 86400 end: type: end edges: - from: submit to: approve - from: approve to: end

核心 API(Engine)

创建流程定义

python
await engine.create_definition( org_id="org1", name="请假审批", version="v1", dsl_text=dsl_yaml )

启动流程实例

python
instance_id = await engine.create_instance( org_id="org1", flow_def_id=flow_id, ctx={"user": "alice", "days": 3} )

推进流程(调度器调用)

python
await engine.step(org_id="org1", instance_id=instance_id)

人工任务 API

查询待办任务(RBAC)

python
tasks = await engine.list_human_tasks( org_id="org1", user_roles=["manager", "admin"] )

完成人工任务

python
await engine.complete_human_task( org_id="org1", task_id=task_id, user_id="bob", output={"approved": True} )

完成后:


架构设计

less
[ HTTP API ] | [ FlowEngine ] | [ sqlor ] | [ MySQL / PostgreSQL / SQLite ]

适用场景


后续可扩展方向