7.3 KiB
7.3 KiB
Workflow Engine 使用说明书
本文档用于说明当前 Workflow Engine 的设计目标、核心概念、数据模型、DSL 用法、执行语义以及使用示例。不涉及未来规划能力。
1. 引擎定位与设计原则
1.1 引擎是什么
这是一个:
- DAG(有向无环图)流程引擎内核
- 支持 条件转移 / 并发 / 子流程
- 支持 Definition / Instance 分离
- 支持 数据库持久化与恢复
它不是:
- 简单任务编排脚本
- BPMN UI 工具
- 强绑定某种业务或 skill 的框架
1.2 核心设计原则
- 流程定义不可变,实例可变
- 所有运行状态必须可持久化
- 流程控制与业务执行解耦
- 子流程是边界,而不是函数调用
- 数据流必须显式声明(input / output mapping)
2. 核心概念说明
2.1 FlowDefinition(流程定义)
- 描述“流程长什么样”
- 由 YAML DSL 表达
- 可被多个流程实例复用
包含内容:
- 流程id(id)
- 节点集合(nodes)
- 边集合(edges)
- 流程启动上下文规格说明(ctx_spec)
2.2 FlowInstance(流程实例)
-
流程的一次真实运行
-
持有自己的:
- ctx(上下文)
- status(running / finished)
所有流转都发生在实例中,而不是定义中。
2.3 Node 类型
| 类型 | 说明 |
|---|---|
| start | 开始节点 |
| task | 普通执行节点(业务 / skill) |
| haman | 手工节点 |
| subflow | 子流程节点 |
| end | 结束节点 |
3. 数据模型(数据库表)
3.1 flow_definition
用于存储流程定义(DSL)。
关键字段:
- id:定义 ID
- name / version:业务标识
- dsl:YAML 内容
3.2 flow_instance
用于存储流程运行状态。
关键字段:
- id:实例 ID
- flow_def_id:关联定义
- ctx:JSON 上下文
- status:running / finished
3.3 node_execution
用于记录节点执行历史(审计 / 回放)。
关键字段:
- instance_id
- node_id
- input_ctx / output_ctx
- status
input_ctx和output_ctx是jinja2模版的字符串,
input_ctx使用流程实例上下文(ctx)作为数据来渲染,结果作为节点执行的输入字典
output_ctx使用节点执行结果的字典数据作为数据来渲染,结果update到流程的ctx中
实例节点有以下节点状态:
- pending 创建后的初始状态
- running 执行状态
- done 执行成功状态
- failed 执行失败状态
- cancelled 取消状态
- completed 处理完成 状态变化:
- pending 转running 如果节点有join属性,检查join条件,满足条件就转执行,否则等条件满足, 没有join属性则直接转running, 并调用节点执行方法,记录开始执行时间, 手工节点则由人工点击工作开始转换
- running zhuan done 节点执行成功,并记录完成时间
- running 转 failed 节点执行失败, 并记录完成时间
- pending 转 cancel 取消执行(手工或流程实例被取消)
- done, cancelled, failed 转 completed 检查从节点出发的边,做节点转移
3.4 subflow_instance
用于描述父子流程关系。
关键字段:
- instance_id
- node_id
- child_instance_id
- status
这是“子流程存在性”的核心表。
4. YAML DSL 说明
4.1 基本结构
id: order_flow_v1
start: start
nodes:
start: # 每个流程必须要有一个start
type: start # 类型必须是start
end: # 每个流程必须要有一个end
type: end # 类型必须上end
task1:
title: task1 title
type: task
description: desc
input_ctx: jinja2的模版字符串 # 可选, 缺省使用流程实例的全部ctx
output_ctx: jinja2的模版字符串 可选, 缺失全部输出更新到流程实例的ctx
subflow1:
title: subf
type: subflow
subflow_id: payment_flow_v2
input_ctx: jinja2的模版字符串 # 可选, 缺省使用流程实例的全部ctx
output_ctx: jinja2的模版字符串 # 可选, 子流程实例的ctx到本流程实例的缺失全部输出更新到流程实例的ctx
humantask1:
title: conform
type: human
description: desc
input_ctx: jinja2的模版字符串 # 可选, 缺省使用流程实例的全部ctx
output_ctx: jinja2的模版字符串 # 可选, 缺失全部输出更新到流程实例的ctx
edges:
- from: start
to: end
multile_on: ctx.charpter #
when: ctx.x == 'xxx' # 可选,转移条件
流程定义节点数据要求
除开始节点和结束节点外,所有节点都需要有
nodes:
node1:
type: # task, human, subflow之一
title: # 标题
description: # 节点描述
input_ctx: # jinja2模版字典数据字符串,用ctx渲染,用于构造给节点任务的输入字典,可选, 缺省使用流程实例的ctx
output_ctx: # jinja2模版字典数据字符串,用节点执行结果字典渲染,结果结果更新ctx, 可选, 缺失全部输出更新到流程实例的ctx
join节点
当一个节点中有join属性是为join节点,join 节点只有前置节点满足join条件是才能执行
nodes:
end:
type: end
join:
- all
- some(5 or 80%)
- xor
- any
4.2 条件转移
edges:
- from: decision
to: approved
when: ctx.ok == true
- from: decision
to: rejected
when: ctx.ok == false
条件表达式在运行时基于 ctx 求值。
4.3 并发(Fork / Join)
多实例节点
根据上下文的数据如果是数组为每个数据建立一个节点实例
edges:
- from: gen_docs
to: evalution_chapter
foreach: ctx.docs.chapters
Fork(隐式)
edges:
- from: fetch
to: validate
- from: fetch
to: enrich
5. 子流程(SubFlow)说明
5.1 子流程的定义方式
nodes:
payment:
type: subflow
flow: payment_flow_v1
input:
order_id: ctx.order.id
amount: ctx.order.total
output:
pay_status: ctx.payment.status
pay_id: ctx.payment.id
5.2 子流程的执行语义
-
父流程第一次到达 subflow 节点:
- 创建子流程实例
- 根据 input_ctx 构建子 ctx
- 父流程阻塞在该节点
-
子流程运行中:
- 父流程不推进
-
子流程完成:
- 根据 output_ctx 合并 ctx
- 父流程继续流转
6. 执行模型
6.1 推进方式
引擎通过反复调用:
engine.step(instance_id)
来推进流程。
- 每次 step 只推进“可推进的节点”
- 阻塞节点(join / subflow)会被保留在 active_nodes 中
6.2 结束判定
当:
- active_nodes 中全部为 end 节点
则流程实例状态被置为:
status = finished
7. 当前能力边界说明
已支持
- DAG 流程
- 条件转移
- 并发 / join
- 子流程(含 input/output mapping)
- 持久化 / 可恢复
明确未支持(但结构允许)
- 失败补偿
- 超时 / 重试
- 表达式安全沙箱
8. 总结
当前这套 Workflow Engine 是一个:
结构清晰、语义明确、可扩展的流程引擎内核
它适合作为:
- skills / agents 编排底座
- 业务流程控制层
- 长任务调度引擎
在不引入新能力的前提下,本说明书描述的即为当前版本的完整行为定义。