This commit is contained in:
yumoqing 2026-02-11 17:53:29 +08:00
parent cd9a231b6c
commit 8a9d682c13
3 changed files with 0 additions and 463 deletions

View File

@ -1,218 +0,0 @@
# -*- coding: utf-8 -*-
"""
Workflow Engine v1.0
===================
Features:
- FlowDefinition / FlowInstance separation
- YAML DSL for flow definition
- Conditional transitions
- Concurrent nodes (fork / join)
- Subflow support
- End node semantics
This is a **complete, minimal, coherent reference implementation**.
No framework binding (aiohttp/sqlor) yet — engine core only.
"""
import yaml
import uuid
from typing import Dict, List, Set, Optional
# -----------------------------
# Definition layer
# -----------------------------
class NodeDefinition:
def __init__(self, id: str, type: str, config: dict | None = None):
self.id = id
self.type = type # task / decision / join / subflow / end
self.config = config or {}
class EdgeDefinition:
def __init__(self, source: str, target: str, condition: str | None = None):
self.source = source
self.target = target
self.condition = condition
class FlowDefinition:
    """Static description of a flow: start node, node map, and edge list."""

    def __init__(self, id: str, start: str,
                 nodes: Dict[str, NodeDefinition],
                 edges: List[EdgeDefinition]):
        self.id = id
        self.start = start      # id of the node execution begins at
        self.nodes = nodes      # node id -> NodeDefinition
        self.edges = edges      # all edges, in definition order

    def outgoing(self, node_id: str) -> List[EdgeDefinition]:
        """Return every edge leaving *node_id*, preserving definition order."""
        found = []
        for edge in self.edges:
            if edge.source == node_id:
                found.append(edge)
        return found
class FlowDefinitionLoader:
    """Builds FlowDefinition objects from the YAML DSL."""

    @staticmethod
    def from_yaml(text: str) -> FlowDefinition:
        """Parse *text* (YAML) into a FlowDefinition.

        Each node's entire YAML mapping is kept as its config, so keys
        besides 'type' remain available to executors.  Edges are read
        from the optional 'edges' list; 'when' is the edge condition.
        """
        data = yaml.safe_load(text)
        node_map = {}
        for node_id, spec in data['nodes'].items():
            node_map[node_id] = NodeDefinition(node_id, spec['type'], spec)
        edge_list = []
        for spec in data.get('edges', []):
            edge_list.append(
                EdgeDefinition(spec['from'], spec['to'], spec.get('when')))
        return FlowDefinition(data['id'], data['start'], node_map, edge_list)
# -----------------------------
# Instance layer
# -----------------------------
class NodeExecution:
    """Record of a single node run within a flow instance."""

    def __init__(self, instance_id: str, node_id: str, input_ctx: dict):
        self.instance_id = instance_id
        self.node_id = node_id
        # Snapshot of the flow context at the time the node started.
        self.input_ctx = input_ctx
        # Set by the engine once the node completes.
        self.output_ctx: Optional[dict] = None
        # Lifecycle: running -> success | failed.
        self.status = 'running'
        self.error: Optional[str] = None
class FlowInstance:
    """Mutable runtime state of one execution of a FlowDefinition."""

    def __init__(self, flow_def: FlowDefinition, ctx: dict | None = None):
        self.id = uuid.uuid4().hex          # unique instance id
        self.flow_def = flow_def
        self.ctx = ctx or {}                # shared context, merged as nodes finish
        # Execution begins with only the flow's start node active.
        self.active_nodes: Set[str] = {flow_def.start}
        self.status = 'running'             # running -> finished
        self.executions: List[NodeExecution] = []
        self.completed_nodes: Set[str] = set()
# -----------------------------
# Node executors
# -----------------------------
class BaseNodeExecutor:
    """Abstract executor for one node type.

    run() returns an output-context dict when the node completes, or
    None when the node must stay blocked (e.g. an unsatisfied join).
    """

    def run(self, instance: FlowInstance, node_def: NodeDefinition) -> Optional[dict]:
        raise NotImplementedError
class TaskNodeExecutor(BaseNodeExecutor):
    """Executor for 'task' nodes.

    Placeholder: a real implementation would invoke a skill/function
    and return its output context.
    """

    def run(self, instance, node_def):
        return dict()
class DecisionNodeExecutor(BaseNodeExecutor):
    """Executor for 'decision' nodes.

    Produces no output itself; branching is decided by the engine via
    edge conditions.
    """

    def run(self, instance, node_def):
        return dict()
class JoinNodeExecutor(BaseNodeExecutor):
    """Synchronization barrier: completes only after every awaited node has."""

    def run(self, instance, node_def):
        # Nodes listed in config['wait_for'] that have not completed yet.
        pending = [n for n in node_def.config.get('wait_for', [])
                   if n not in instance.completed_nodes]
        if pending:
            return None  # still blocked; engine keeps this node active
        return dict()
class SubFlowNodeExecutor(BaseNodeExecutor):
    """Executor for 'subflow' nodes.

    Simplified: a real implementation would create a child FlowInstance,
    step it, and block (return None) until the child finishes.
    """

    def run(self, instance, node_def):
        return dict()
class EndNodeExecutor(BaseNodeExecutor):
    """Executor for terminal 'end' nodes; has no work of its own."""

    def run(self, instance, node_def):
        return dict()
# Registry: node type name -> shared executor singleton.
EXECUTORS = dict(
    task=TaskNodeExecutor(),
    decision=DecisionNodeExecutor(),
    join=JoinNodeExecutor(),
    subflow=SubFlowNodeExecutor(),
    end=EndNodeExecutor(),
)
# -----------------------------
# Engine
# -----------------------------
class FlowEngine:
    """Stateless engine that advances a FlowInstance one round at a time.

    Each step() runs every currently-active node once, merges node
    outputs into the instance context, and derives the next active set
    from the definition's edges.
    """

    def step(self, instance: FlowInstance):
        """Advance *instance* by one scheduling round.

        No-op unless the instance is running.  Nodes whose executor
        returns None (blocked joins / subflows) remain active and are
        retried on the next step.  The instance finishes when only end
        nodes remain active — or when nothing is active at all, which
        otherwise left the instance 'running' forever and made any
        step-until-finished driver loop spin indefinitely.
        """
        if instance.status != 'running':
            return
        next_active: Set[str] = set()
        for node_id in list(instance.active_nodes):
            node_def = instance.flow_def.nodes[node_id]
            executor = EXECUTORS[node_def.type]
            # Snapshot the context before the node runs, so the
            # execution record reflects its true input.
            input_snapshot = instance.ctx.copy()
            result = executor.run(instance, node_def)
            if result is None:
                # Blocked (join waiting / subflow running): stay active.
                # Note: no NodeExecution is recorded for a blocked node
                # (the original built one and threw it away).
                next_active.add(node_id)
                continue
            execution = NodeExecution(instance.id, node_id, input_snapshot)
            execution.output_ctx = result
            execution.status = 'success'
            instance.executions.append(execution)
            instance.completed_nodes.add(node_id)
            # Merge the node's output into the shared flow context.
            instance.ctx.update(result)
            # Follow each outgoing edge whose condition (if any) holds.
            for edge in instance.flow_def.outgoing(node_id):
                if edge.condition:
                    # SECURITY: edge conditions are eval'd as arbitrary
                    # Python.  Flow definitions must come from trusted
                    # authors only; use a safe expression evaluator
                    # (e.g. restricted ast parsing) for untrusted DSL.
                    if not eval(edge.condition, {}, {'ctx': instance.ctx}):
                        continue
                next_active.add(edge.target)
        instance.active_nodes = next_active
        # Finished when nothing remains active, or only end nodes do.
        if not instance.active_nodes or all(
            instance.flow_def.nodes[n].type == 'end'
            for n in instance.active_nodes
        ):
            instance.status = 'finished'
# -----------------------------
# Example
# -----------------------------
if __name__ == '__main__':
    # Smoke test: a two-node flow, start (task) -> end.
    demo_dsl = """
id: demo_flow
start: start
nodes:
  start:
    type: task
  end:
    type: end
edges:
  - from: start
    to: end
"""
    definition = FlowDefinitionLoader.from_yaml(demo_dsl)
    inst = FlowInstance(definition, ctx={'hello': 'world'})
    engine = FlowEngine()
    # Drive the instance to completion one round at a time.
    while inst.status == 'running':
        engine.step(inst)
    print('finished:', inst.ctx)

0
dagflow/__init__.py Normal file
View File

245
t
View File

@ -1,245 +0,0 @@
# DagFlow Enterprise Workflow Engine
DagFlow 是一个 **企业级 DAG 工作流引擎**,基于 `sqlor + ahserver` 构建,支持多组织(org\_id)隔离、子流程、人工节点(审批任务)、RBAC 角色分配,并提供清晰的流程实例与任务查询接口,适用于:
* 企业流程编排
* 审批流 / 工单流
* 自动化运维 / AI Agent 编排
* 多租户 SaaS 工作流系统
- - -
## 核心特性
### ✅ 多租户隔离(org\_id)
* 所有核心数据表均包含 `org_id`
* 流程定义、流程实例、任务实例完全隔离
* 天然支持同一流程在不同组织复用
### ✅ DAG 工作流引擎
* 基于 YAML DSL 描述流程
* 支持条件边(`when`)
* 支持并行节点
* 自动推进 / 收敛至 end 节点
### ✅ 持久化执行(sqlor)
* 流程定义、实例、节点执行、人工任务全部持久化
* 引擎无状态,可水平扩展
* 支持调度器 / worker 分离部署
### ✅ 人工节点(Human Task)
* human 节点会生成待办任务
* 流程在人工节点阻塞
* 支持:
* 角色(role)分配
* 用户领取 / 完成
* 输出回写流程上下文
### ✅ 子流程(SubFlow)
* 支持流程嵌套
* 父子流程上下文映射
* 子流程完成后自动回写父流程
- - -
## 表结构说明
### flow\_definition(流程定义)
| 字段 | 说明 |
| --- | --- |
| id | 流程定义 ID |
| org\_id | 组织 ID |
| name | 流程名称 |
| version | 版本 |
| dsl | YAML DSL |
| created\_at | 创建时间 |
- - -
### flow\_instance(流程实例)
| 字段 | 说明 |
| --- | --- |
| id | 实例 ID |
| org\_id | 组织 ID |
| flow\_def\_id | 流程定义 ID |
| status | running / finished |
| ctx | 流程上下文(JSON) |
| active\_nodes | 当前活跃节点 |
| created\_at | 创建时间 |
- - -
### node\_execution(节点执行记录)
| 字段 | 说明 |
| --- | --- |
| id | 执行记录 ID |
| org\_id | 组织 ID |
| instance\_id | 流程实例 ID |
| node\_id | 节点 ID |
| input\_ctx | 输入上下文 |
| output\_ctx | 输出上下文 |
| status | success / failed |
| error | 错误信息 |
| created\_at | 执行时间 |
- - -
### human\_task(人工任务)
| 字段 | 说明 |
| --- | --- |
| id | 任务 ID |
| org\_id | 组织 ID |
| instance\_id | 流程实例 ID |
| node\_id | 节点 ID |
| role | 角色 |
| assignee | 实际处理人 |
| status | pending / done |
| input | 输入上下文 |
| output | 输出结果 |
| timeout\_at | 超时时间 |
| created\_at | 创建时间 |
| completed\_at | 完成时间 |
- - -
## DSL 示例
```yaml
start: submit
nodes:
  submit:
    type: normal
  approve:
    type: human
    role: manager
    timeout: 86400
  end:
    type: end
edges:
  - from: submit
    to: approve
  - from: approve
    to: end
```
- - -
## 核心 APIEngine
### 创建流程定义
```python
await engine.create_definition(
    org_id="org1",
    name="请假审批",
    version="v1",
    dsl_text=dsl_yaml
)
```
### 启动流程实例
```python
instance_id = await engine.create_instance(
    org_id="org1",
    flow_def_id=flow_id,
    ctx={"user": "alice", "days": 3}
)
```
### 推进流程(调度器调用)
```python
await engine.step(org_id="org1", instance_id=instance_id)
```
- - -
## 人工任务 API
### 查询待办任务(RBAC)
```python
tasks = await engine.list_human_tasks(
    org_id="org1",
    user_roles=["manager", "admin"]
)
```
### 完成人工任务
```python
await engine.complete_human_task(
    org_id="org1",
    task_id=task_id,
    user_id="bob",
    output={"approved": True}
)
```
完成后:
* 任务状态 → done
* 输出自动合并到流程 ctx
* 流程可继续推进
- - -
## 架构设计
```text
[ HTTP API ]
     |
[ FlowEngine ]
     |
[ sqlor ]
     |
[ MySQL / PostgreSQL / SQLite ]
```
* Engine 无状态
* Scheduler 可多实例
* 支持分布式部署
- - -
## 适用场景
* 企业审批流(请假 / 报销 / 发布)
* 自动化运维流程
* AI Agent 调度与编排
* SaaS 多租户流程平台
- - -
## 后续可扩展方向
* ✔ 乐观锁 / version 防并发
* ✔ 并行网关 / 排他网关
* ✔ 任务转派 / 会签
* ✔ SLA / 超时补偿
* ✔ 流程可视化建模器