# DagFlow – Enterprise Workflow Engine DagFlow 是一个 **企业级 DAG 工作流引擎**,基于 `sqlor + ahserver` 构建,支持多组织(org\_id)隔离、子流程、人工节点(审批任务)、RBAC 角色分配,并提供清晰的流程实例与任务查询接口,适用于: * 企业流程编排 * 审批流 / 工单流 * 自动化运维 / AI Agent 编排 * 多租户 SaaS 工作流系统 - - - ## 核心特性 ### ✅ 多租户隔离(org\_id) * 所有核心数据表均包含 `org_id` * 流程定义、流程实例、任务实例完全隔离 * 天然支持同一流程在不同组织复用 ### ✅ DAG 工作流引擎 * 基于 YAML DSL 描述流程 * 支持条件边(`when`) * 支持并行节点 * 自动推进 / 收敛至 end 节点 ### ✅ 持久化执行(sqlor) * 流程定义、实例、节点执行、人工任务全部持久化 * 引擎无状态,可水平扩展 * 支持调度器 / worker 分离部署 ### ✅ 人工节点(Human Task) * human 节点会生成待办任务 * 流程在人工节点阻塞 * 支持: * 角色(role)分配 * 用户领取 / 完成 * 输出回写流程上下文 ### ✅ 子流程(SubFlow) * 支持流程嵌套 * 父子流程上下文映射 * 子流程完成后自动回写父流程 - - - ## 表结构说明 ### flow\_definition(流程定义) | 字段 | 说明 | | --- | --- | | id | 流程定义 ID | | org\_id | 组织 ID | | name | 流程名称 | | version | 版本 | | dsl | YAML DSL | | created\_at | 创建时间 | - - - ### flow\_instance(流程实例) | 字段 | 说明 | | --- | --- | | id | 实例 ID | | org\_id | 组织 ID | | flow\_def\_id | 流程定义 ID | | status | running / finished | | ctx | 流程上下文(JSON) | | active\_nodes | 当前活跃节点 | | created\_at | 创建时间 | - - - ### node\_execution(节点执行记录) | 字段 | 说明 | | --- | --- | | id | 执行记录 ID | | org\_id | 组织 ID | | instance\_id | 流程实例 ID | | node\_id | 节点 ID | | input\_ctx | 输入上下文 | | output\_ctx | 输出上下文 | | status | success / failed | | error | 错误信息 | | created\_at | 执行时间 | - - - ### human\_task(人工任务) | 字段 | 说明 | | --- | --- | | id | 任务 ID | | org\_id | 组织 ID | | instance\_id | 流程实例 ID | | node\_id | 节点 ID | | role | 角色 | | assignee | 实际处理人 | | status | pending / done | | input | 输入上下文 | | output | 输出结果 | | timeout\_at | 超时时间 | | created\_at | 创建时间 | | completed\_at | 完成时间 | - - - ## DSL 示例 yaml 复制代码 `start: submit nodes: submit: type: normal approve: type: human role: manager timeout: 86400 end: type: end edges: - from: submit to: approve - from: approve to: end` - - - ## 核心 API(Engine) ### 创建流程定义 python 复制代码 `await engine.create_definition( org_id="org1", name="请假审批", version="v1", dsl_text=dsl_yaml )` ### 启动流程实例 python 复制代码 `instance_id = await engine.create_instance( org_id="org1", flow_def_id=flow_id, ctx={"user": "alice", "days": 3} )` ### 推进流程(调度器调用) python 复制代码 `await engine.step(org_id="org1", instance_id=instance_id)` - - - ## 人工任务 API ### 查询待办任务(RBAC) python 复制代码 `tasks = await engine.list_human_tasks( org_id="org1", user_roles=["manager", "admin"] )` ### 完成人工任务 python 复制代码 `await engine.complete_human_task( org_id="org1", task_id=task_id, user_id="bob", output={"approved": True} )` 完成后: * 任务状态 → done * 输出自动合并到流程 ctx * 流程可继续推进 - - - ## 架构设计 less 复制代码 `[ HTTP API ] | [ FlowEngine ] | [ sqlor ] | [ MySQL / PostgreSQL / SQLite ]` * Engine 无状态 * Scheduler 可多实例 * 支持分布式部署 - - - ## 适用场景 * 企业审批流(请假 / 报销 / 发布) * 自动化运维流程 * AI Agent 调度与编排 * SaaS 多租户流程平台 - - - ## 后续可扩展方向 * ✔ 乐观锁 / version 防并发 * ✔ 并行网关 / 排他网关 * ✔ 任务转派 / 会签 * ✔ SLA / 超时补偿 * ✔ 流程可视化建模器