dagflow/README.md
2026-03-13 17:14:16 +08:00

7.3 KiB
Raw Permalink Blame History

Workflow Engine 使用说明书

本文档用于说明当前 Workflow Engine 的设计目标、核心概念、数据模型、DSL 用法、执行语义以及使用示例。不涉及未来规划能力。


1. 引擎定位与设计原则

1.1 引擎是什么

这是一个:

  • DAG有向无环图流程引擎内核
  • 支持 条件转移 / 并发 / 子流程
  • 支持 Definition / Instance 分离
  • 支持 数据库持久化与恢复

它不是:

  • 简单任务编排脚本
  • BPMN UI 工具
  • 强绑定某种业务或 skill 的框架

1.2 核心设计原则

  1. 流程定义不可变,实例可变
  2. 所有运行状态必须可持久化
  3. 流程控制与业务执行解耦
  4. 子流程是边界,而不是函数调用
  5. 数据流必须显式声明input / output mapping

2. 核心概念说明

2.1 FlowDefinition流程定义

  • 描述“流程长什么样”
  • 由 YAML DSL 表达
  • 可被多个流程实例复用

包含内容:

  • 流程id(id)
  • 节点集合nodes
  • 边集合edges
  • 流程启动上下文规格说明ctx_spec)

2.2 FlowInstance流程实例

  • 流程的一次真实运行

  • 持有自己的:

    • ctx上下文
    • statusrunning / finished

所有流转都发生在实例中,而不是定义中。


2.3 Node 类型

类型 说明
start 开始节点
task 普通执行节点(业务 / skill
haman 手工节点
subflow 子流程节点
end 结束节点

3. 数据模型(数据库表)

3.1 flow_definition

用于存储流程定义DSL

关键字段:

  • id定义 ID
  • name / version业务标识
  • dslYAML 内容

3.2 flow_instance

用于存储流程运行状态。

关键字段:

  • id实例 ID
  • flow_def_id关联定义
  • ctxJSON 上下文
  • statusrunning / finished

3.3 node_execution

用于记录节点执行历史(审计 / 回放)。

关键字段:

  • instance_id
  • node_id
  • input_ctx / output_ctx
  • status

input_ctx和output_ctx是jinja2模版的字符串

input_ctx使用流程实例上下文ctx作为数据来渲染结果作为节点执行的输入字典

output_ctx使用节点执行结果的字典数据作为数据来渲染结果update到流程的ctx中

实例节点有以下节点状态:

  • pending 创建后的初始状态
  • running 执行状态
  • done 执行成功状态
  • failed 执行失败状态
  • cancelled 取消状态
  • completed 处理完成 状态变化:
  • pending 转running 如果节点有join属性检查join条件满足条件就转执行否则等条件满足 没有join属性则直接转running 并调用节点执行方法,记录开始执行时间, 手工节点则由人工点击工作开始转换
  • running zhuan done 节点执行成功,并记录完成时间
  • running 转 failed 节点执行失败, 并记录完成时间
  • pending 转 cancel 取消执行(手工或流程实例被取消)
  • done cancelled, failed 转 completed 检查从节点出发的边,做节点转移

3.4 subflow_instance

用于描述父子流程关系。

关键字段:

  • instance_id
  • node_id
  • child_instance_id
  • status

这是“子流程存在性”的核心表。


4. YAML DSL 说明

4.1 基本结构

id: order_flow_v1
start: start

nodes:
  start:			# 每个流程必须要有一个start
    type: start		# 类型必须是start

  end:				# 每个流程必须要有一个end
    type: end		# 类型必须上end
  
  task1:
    title: task1 title
	type: task
	description: desc
	input_ctx: jinja2的模版字符串	# 可选, 缺省使用流程实例的全部ctx
	output_ctx:	jinja2的模版字符串 可选, 缺失全部输出更新到流程实例的ctx
  
  subflow1:
	title: subf
	type: subflow
	subflow_id: payment_flow_v2
	input_ctx: jinja2的模版字符串 	# 可选, 缺省使用流程实例的全部ctx
	output_ctx: jinja2的模版字符串 	# 可选, 子流程实例的ctx到本流程实例的缺失全部输出更新到流程实例的ctx

  humantask1:
  	title: conform
	type: human
	description: desc
	input_ctx: jinja2的模版字符串 	# 可选, 缺省使用流程实例的全部ctx
	output_ctx: jinja2的模版字符串 	# 可选, 缺失全部输出更新到流程实例的ctx

edges:
  - from: start
    to: end
	multile_on: ctx.charpter			# 
	when: ctx.x == 'xxx' # 可选,转移条件

流程定义节点数据要求

除开始节点和结束节点外,所有节点都需要有

nodes:
  node1:
    type: 			# task, human, subflow之一
	title			# 标题
	description:	# 节点描述
	input_ctx:		# jinja2模版字典数据字符串用ctx渲染用于构造给节点任务的输入字典可选 缺省使用流程实例的ctx
	output_ctx:		# jinja2模版字典数据字符串用节点执行结果字典渲染结果结果更新ctx 可选, 缺失全部输出更新到流程实例的ctx

join节点

当一个节点中有join属性是为join节点join 节点只有前置节点满足join条件是才能执行

nodes:
  end:
    type: end
	join:
	  - all
	  - some(5 or 80%)
	  - xor
	  - any

4.2 条件转移

edges:
  - from: decision
    to: approved
    when: ctx.ok == true

  - from: decision
    to: rejected
    when: ctx.ok == false

条件表达式在运行时基于 ctx 求值。


4.3 并发Fork / Join

多实例节点

根据上下文的数据如果是数组为每个数据建立一个节点实例

edges:
  - from: gen_docs
    to: evalution_chapter
	foreach: ctx.docs.chapters

Fork隐式

edges:
  - from: fetch
    to: validate
  - from: fetch
    to: enrich

5. 子流程SubFlow说明

5.1 子流程的定义方式

nodes:
  payment:
    type: subflow
    flow: payment_flow_v1

    input:
      order_id: ctx.order.id
      amount: ctx.order.total

    output:
      pay_status: ctx.payment.status
      pay_id: ctx.payment.id

5.2 子流程的执行语义

  1. 父流程第一次到达 subflow 节点:

    • 创建子流程实例
    • 根据 input_ctx 构建子 ctx
    • 父流程阻塞在该节点
  2. 子流程运行中:

    • 父流程不推进
  3. 子流程完成:

    • 根据 output_ctx 合并 ctx
    • 父流程继续流转

6. 执行模型

6.1 推进方式

引擎通过反复调用:

engine.step(instance_id)

来推进流程。

  • 每次 step 只推进“可推进的节点”
  • 阻塞节点join / subflow会被保留在 active_nodes 中

6.2 结束判定

当:

  • active_nodes 中全部为 end 节点

则流程实例状态被置为:

status = finished

7. 当前能力边界说明

已支持

  • DAG 流程
  • 条件转移
  • 并发 / join
  • 子流程(含 input/output mapping
  • 持久化 / 可恢复

明确未支持(但结构允许)

  • 失败补偿
  • 超时 / 重试
  • 表达式安全沙箱

8. 总结

当前这套 Workflow Engine 是一个:

结构清晰、语义明确、可扩展的流程引擎内核

它适合作为:

  • skills / agents 编排底座
  • 业务流程控制层
  • 长任务调度引擎

在不引入新能力的前提下,本说明书描述的即为当前版本的完整行为定义