diff --git a/dagflow.py b/dagflow.py deleted file mode 100644 index cbdd283..0000000 --- a/dagflow.py +++ /dev/null @@ -1,218 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Workflow Engine v1.0 -=================== - -Features: -- FlowDefinition / FlowInstance separation -- YAML DSL for flow definition -- Conditional transitions -- Concurrent nodes (fork / join) -- Subflow support -- End node semantics - -This is a **complete, minimal, coherent reference implementation**. -No framework binding (aiohttp/sqlor) yet – engine core only. -""" - -import yaml -import uuid -from typing import Dict, List, Set, Optional - -# ----------------------------- -# Definition layer -# ----------------------------- - -class NodeDefinition: - def __init__(self, id: str, type: str, config: dict | None = None): - self.id = id - self.type = type # task / decision / join / subflow / end - self.config = config or {} - - -class EdgeDefinition: - def __init__(self, source: str, target: str, condition: str | None = None): - self.source = source - self.target = target - self.condition = condition - - -class FlowDefinition: - def __init__(self, id: str, start: str, - nodes: Dict[str, NodeDefinition], - edges: List[EdgeDefinition]): - self.id = id - self.start = start - self.nodes = nodes - self.edges = edges - - def outgoing(self, node_id: str) -> List[EdgeDefinition]: - return [e for e in self.edges if e.source == node_id] - - -class FlowDefinitionLoader: - @staticmethod - def from_yaml(text: str) -> FlowDefinition: - data = yaml.safe_load(text) - nodes = { - nid: NodeDefinition(nid, v['type'], v) - for nid, v in data['nodes'].items() - } - edges = [EdgeDefinition(e['from'], e['to'], e.get('when')) - for e in data.get('edges', [])] - return FlowDefinition( - id=data['id'], - start=data['start'], - nodes=nodes, - edges=edges - ) - - -# ----------------------------- -# Instance layer -# ----------------------------- - -class NodeExecution: - def __init__(self, instance_id: str, node_id: str, input_ctx: dict): - self.instance_id = instance_id - self.node_id = node_id - self.input_ctx = input_ctx - self.output_ctx: Optional[dict] = None - self.status = 'running' # running / success / failed - self.error: Optional[str] = None - - -class FlowInstance: - def __init__(self, flow_def: FlowDefinition, ctx: dict | None = None): - self.id = uuid.uuid4().hex - self.flow_def = flow_def - self.ctx = ctx or {} - self.active_nodes: Set[str] = {flow_def.start} - self.status = 'running' - self.executions: List[NodeExecution] = [] - self.completed_nodes: Set[str] = set() - - -# ----------------------------- -# Node executors -# ----------------------------- - -class BaseNodeExecutor: - def run(self, instance: FlowInstance, node_def: NodeDefinition) -> Optional[dict]: - raise NotImplementedError - - -class TaskNodeExecutor(BaseNodeExecutor): - def run(self, instance, node_def): - # Placeholder: real impl should call skill / function - return {} - - -class DecisionNodeExecutor(BaseNodeExecutor): - def run(self, instance, node_def): - return {} - - -class JoinNodeExecutor(BaseNodeExecutor): - def run(self, instance, node_def): - wait_for = set(node_def.config.get('wait_for', [])) - if not wait_for.issubset(instance.completed_nodes): - return None # block - return {} - - -class SubFlowNodeExecutor(BaseNodeExecutor): - def run(self, instance, node_def): - # Simplified: real impl would create & step child FlowInstance - return {} - - -class EndNodeExecutor(BaseNodeExecutor): - def run(self, instance, node_def): - return {} - - -EXECUTORS = { - 'task': TaskNodeExecutor(), - 'decision': DecisionNodeExecutor(), - 'join': JoinNodeExecutor(), - 'subflow': SubFlowNodeExecutor(), - 'end': EndNodeExecutor(), -} - - -# ----------------------------- -# Engine -# ----------------------------- - -class FlowEngine: - def step(self, instance: FlowInstance): - if instance.status != 'running': - return - - next_active: Set[str] = set() - - for node_id in list(instance.active_nodes): - node_def = instance.flow_def.nodes[node_id] - executor = EXECUTORS[node_def.type] - - execution = NodeExecution(instance.id, node_id, instance.ctx.copy()) - result = executor.run(instance, node_def) - - if result is None: - # blocked (join / subflow) - next_active.add(node_id) - continue - - execution.output_ctx = result - execution.status = 'success' - instance.executions.append(execution) - instance.completed_nodes.add(node_id) - - # merge ctx - instance.ctx.update(result) - - # compute transitions - for edge in instance.flow_def.outgoing(node_id): - if edge.condition: - if not eval(edge.condition, {}, {'ctx': instance.ctx}): - continue - next_active.add(edge.target) - - instance.active_nodes = next_active - - # end check - if instance.active_nodes and all( - instance.flow_def.nodes[n].type == 'end' - for n in instance.active_nodes - ): - instance.status = 'finished' - - -# ----------------------------- -# Example -# ----------------------------- - -if __name__ == '__main__': - yaml_text = """ - id: demo_flow - start: start - nodes: - start: - type: task - end: - type: end - edges: - - from: start - to: end - """ - - flow_def = FlowDefinitionLoader.from_yaml(yaml_text) - instance = FlowInstance(flow_def, ctx={'hello': 'world'}) - - engine = FlowEngine() - while instance.status == 'running': - engine.step(instance) - - print('finished:', instance.ctx) - diff --git a/dagflow/__init__.py b/dagflow/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/t b/t deleted file mode 100644 index f31e8a0..0000000 --- a/t +++ /dev/null @@ -1,245 +0,0 @@ -# DagFlow – Enterprise Workflow Engine - -DagFlow 是一个 **企业级 DAG 工作流引擎**,基于 `sqlor + ahserver` 构建,支持多组织(org\_id)隔离、子流程、人工节点(审批任务)、RBAC 角色分配,并提供清晰的流程实例与任务查询接口,适用于: - -* 企业流程编排 - -* 审批流 / 工单流 - -* 自动化运维 / AI Agent 编排 - -* 多租户 SaaS 工作流系统 - - -- - - - -## 核心特性 - -### ✅ 多租户隔离(org\_id) - -* 所有核心数据表均包含 `org_id` - -* 流程定义、流程实例、任务实例完全隔离 - -* 天然支持同一流程在不同组织复用 - - -### ✅ DAG 工作流引擎 - -* 基于 YAML DSL 描述流程 - -* 支持条件边(`when`) - -* 支持并行节点 - -* 自动推进 / 收敛至 end 节点 - - -### ✅ 持久化执行(sqlor) - -* 流程定义、实例、节点执行、人工任务全部持久化 - -* 引擎无状态,可水平扩展 - -* 支持调度器 / worker 分离部署 - - -### ✅ 人工节点(Human Task) - -* human 节点会生成待办任务 - -* 流程在人工节点阻塞 - -* 支持: - - * 角色(role)分配 - - * 用户领取 / 完成 - - * 输出回写流程上下文 - - -### ✅ 子流程(SubFlow) - -* 支持流程嵌套 - -* 父子流程上下文映射 - -* 子流程完成后自动回写父流程 - - -- - - - -## 表结构说明 - -### flow\_definition(流程定义) - -| 字段 | 说明 | -| --- | --- | -| id | 流程定义 ID | -| org\_id | 组织 ID | -| name | 流程名称 | -| version | 版本 | -| dsl | YAML DSL | -| created\_at | 创建时间 | - -- - - - -### flow\_instance(流程实例) - -| 字段 | 说明 | -| --- | --- | -| id | 实例 ID | -| org\_id | 组织 ID | -| flow\_def\_id | 流程定义 ID | -| status | running / finished | -| ctx | 流程上下文(JSON) | -| active\_nodes | 当前活跃节点 | -| created\_at | 创建时间 | - -- - - - -### node\_execution(节点执行记录) - -| 字段 | 说明 | -| --- | --- | -| id | 执行记录 ID | -| org\_id | 组织 ID | -| instance\_id | 流程实例 ID | -| node\_id | 节点 ID | -| input\_ctx | 输入上下文 | -| output\_ctx | 输出上下文 | -| status | success / failed | -| error | 错误信息 | -| created\_at | 执行时间 | - -- - - - -### human\_task(人工任务) - -| 字段 | 说明 | -| --- | --- | -| id | 任务 ID | -| org\_id | 组织 ID | -| instance\_id | 流程实例 ID | -| node\_id | 节点 ID | -| role | 角色 | -| assignee | 实际处理人 | -| status | pending / done | -| input | 输入上下文 | -| output | 输出结果 | -| timeout\_at | 超时时间 | -| created\_at | 创建时间 | -| completed\_at | 完成时间 | - -- - - - -## DSL 示例 - -yaml - -复制代码 - -`start: submit nodes: submit: type: normal approve: type: human role: manager timeout: 86400 end: type: end edges: - from: submit to: approve - from: approve to: end` - -- - - - -## 核心 API(Engine) - -### 创建流程定义 - -python - -复制代码 - -`await engine.create_definition( org_id="org1", name="请假审批", version="v1", dsl_text=dsl_yaml )` - -### 启动流程实例 - -python - -复制代码 - -`instance_id = await engine.create_instance( org_id="org1", flow_def_id=flow_id, ctx={"user": "alice", "days": 3} )` - -### 推进流程(调度器调用) - -python - -复制代码 - -`await engine.step(org_id="org1", instance_id=instance_id)` - -- - - - -## 人工任务 API - -### 查询待办任务(RBAC) - -python - -复制代码 - -`tasks = await engine.list_human_tasks( org_id="org1", user_roles=["manager", "admin"] )` - -### 完成人工任务 - -python - -复制代码 - -`await engine.complete_human_task( org_id="org1", task_id=task_id, user_id="bob", output={"approved": True} )` - -完成后: - -* 任务状态 → done - -* 输出自动合并到流程 ctx - -* 流程可继续推进 - - -- - - - -## 架构设计 - -less - -复制代码 - -`[ HTTP API ] | [ FlowEngine ] | [ sqlor ] | [ MySQL / PostgreSQL / SQLite ]` - -* Engine 无状态 - -* Scheduler 可多实例 - -* 支持分布式部署 - - -- - - - -## 适用场景 - -* 企业审批流(请假 / 报销 / 发布) - -* 自动化运维流程 - -* AI Agent 调度与编排 - -* SaaS 多租户流程平台 - - -- - - - -## 后续可扩展方向 - -* ✔ 乐观锁 / version 防并发 - -* ✔ 并行网关 / 排他网关 - -* ✔ 任务转派 / 会签 - -* ✔ SLA / 超时补偿 - -* ✔ 流程可视化建模器 -