dagflow/dagflow.py
2026-01-26 21:17:15 +08:00

219 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Workflow Engine v1.0
===================
Features:
- FlowDefinition / FlowInstance separation
- YAML DSL for flow definition
- Conditional transitions
- Concurrent nodes (fork / join)
- Subflow support
- End node semantics
This is a **complete, minimal, coherent reference implementation**.
No framework binding (aiohttp/sqlor) yet; this is the engine core only.
"""
import yaml
import uuid
from typing import Dict, List, Set, Optional
# -----------------------------
# Definition layer
# -----------------------------
class NodeDefinition:
def __init__(self, id: str, type: str, config: dict | None = None):
self.id = id
self.type = type # task / decision / join / subflow / end
self.config = config or {}
class EdgeDefinition:
def __init__(self, source: str, target: str, condition: str | None = None):
self.source = source
self.target = target
self.condition = condition
class FlowDefinition:
    """Static graph of a flow: its id, start node id, nodes and edges."""

    def __init__(self, id: str, start: str,
                 nodes: Dict[str, NodeDefinition],
                 edges: List[EdgeDefinition]):
        self.id, self.start = id, start
        self.nodes, self.edges = nodes, edges

    def outgoing(self, node_id: str) -> List[EdgeDefinition]:
        """Return the edges whose source is ``node_id``, in definition order."""
        matches: List[EdgeDefinition] = []
        for edge in self.edges:
            if edge.source == node_id:
                matches.append(edge)
        return matches
class FlowDefinitionLoader:
    """Builds FlowDefinition objects from the YAML DSL."""

    @staticmethod
    def from_yaml(text: str) -> FlowDefinition:
        """Parse a YAML flow document into a FlowDefinition.

        The document is read with ``yaml.safe_load`` and is expected to hold:

        - ``id``: flow identifier
        - ``start``: id of the start node
        - ``nodes``: mapping of node id -> node mapping with at least ``type``
          (the whole node mapping is kept as the node's config)
        - ``edges`` (optional): list of mappings with ``from``, ``to`` and an
          optional ``when`` condition

        Args:
            text: YAML source of the flow definition.

        Returns:
            The FlowDefinition described by ``text``.

        Raises:
            KeyError: if ``id``, ``start``, ``nodes``, a node's ``type`` or an
                edge's ``from`` / ``to`` is missing.
        """
        data = yaml.safe_load(text)
        nodes = {
            node_id: NodeDefinition(node_id, spec['type'], spec)
            for node_id, spec in data['nodes'].items()
        }
        # A bare ``edges:`` key loads as None, and dict.get()'s default only
        # applies when the key is absent, so normalise None to an empty list
        # instead of failing with "NoneType is not iterable".
        edges = [
            EdgeDefinition(item['from'], item['to'], item.get('when'))
            for item in (data.get('edges') or [])
        ]
        return FlowDefinition(
            id=data['id'],
            start=data['start'],
            nodes=nodes,
            edges=edges,
        )
# -----------------------------
# Instance layer
# -----------------------------
class NodeExecution:
    """Record of one run of a node within a flow instance."""

    def __init__(self, instance_id: str, node_id: str, input_ctx: dict):
        self.instance_id, self.node_id = instance_id, node_id
        self.input_ctx = input_ctx
        # Lifecycle marker: running / success / failed
        self.status = 'running'
        # Neither an output nor an error is known at construction time.
        self.output_ctx: Optional[dict] = None
        self.error: Optional[str] = None
class FlowInstance:
    """Mutable runtime state of one run of a FlowDefinition."""

    def __init__(self, flow_def: FlowDefinition, ctx: dict | None = None):
        self.flow_def = flow_def
        # Random 32-character hex identifier for this run.
        self.id = uuid.uuid4().hex
        self.status = 'running'
        # A falsy ctx (None or an empty dict) is swapped for a fresh dict.
        self.ctx = ctx if ctx else {}
        # Execution begins with only the definition's start node active.
        self.active_nodes: Set[str] = set((flow_def.start,))
        self.completed_nodes: Set[str] = set()
        self.executions: List[NodeExecution] = []
# -----------------------------
# Node executors
# -----------------------------
class BaseNodeExecutor:
    """Common interface for the per-node-type executors."""

    def run(self, instance: FlowInstance, node_def: NodeDefinition) -> Optional[dict]:
        """Execute ``node_def`` for ``instance``; subclasses must override.

        Returns:
            A dict or ``None``, per the annotation. This base implementation
            never returns.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
class TaskNodeExecutor(BaseNodeExecutor):
    """Executor for ``task`` nodes; currently a stub that yields no output."""

    def run(self, instance, node_def):
        """Return an empty result dict without inspecting the arguments."""
        # Placeholder: real impl should call skill / function
        output: dict = {}
        return output
class DecisionNodeExecutor(BaseNodeExecutor):
    """Executor for ``decision`` nodes; contributes nothing to the context."""

    def run(self, instance, node_def):
        """Return an empty result dict without inspecting the arguments."""
        output: dict = {}
        return output
class JoinNodeExecutor(BaseNodeExecutor):
    """Executor for ``join`` nodes: waits for a configured set of nodes."""

    def run(self, instance, node_def):
        """Return ``{}`` once every ``wait_for`` node is completed, else ``None``.

        The ids to wait for are read from ``node_def.config['wait_for']``
        (treated as empty when the key is absent) and compared against
        ``instance.completed_nodes``.
        """
        required = set(node_def.config.get('wait_for', []))
        all_done = required.issubset(instance.completed_nodes)
        # None signals "blocked"; an empty dict signals "done, no output".
        return {} if all_done else None
class SubFlowNodeExecutor(BaseNodeExecutor):
    """Executor for ``subflow`` nodes; currently a stub that yields no output."""

    def run(self, instance, node_def):
        """Return an empty result dict without inspecting the arguments."""
        # Simplified: real impl would create & step child FlowInstance
        output: dict = {}
        return output
class EndNodeExecutor(BaseNodeExecutor):
    """Executor for ``end`` nodes; contributes nothing to the context."""

    def run(self, instance, node_def):
        """Return an empty result dict without inspecting the arguments."""
        output: dict = {}
        return output
# Registry mapping a node's ``type`` string to the single shared executor
# instance that handles nodes of that type.
EXECUTORS = dict(
    task=TaskNodeExecutor(),
    decision=DecisionNodeExecutor(),
    join=JoinNodeExecutor(),
    subflow=SubFlowNodeExecutor(),
    end=EndNodeExecutor(),
)
# -----------------------------
# Engine
# -----------------------------
class FlowEngine:
    """Drives a FlowInstance forward one scheduling round at a time."""

    def step(self, instance: FlowInstance):
        """Run every currently active node once and compute the next active set.

        For each active node, the executor registered in ``EXECUTORS`` for the
        node's type is invoked:

        - a ``None`` result means the node is blocked; it stays active for
          the next round and nothing is recorded;
        - a dict result marks the node completed, records a successful
          NodeExecution, is merged into ``instance.ctx`` and activates the
          targets of the node's enabled outgoing edges.

        After the round the instance is marked ``'finished'`` when every
        remaining active node is an ``end`` node, or when no node is active
        at all. Calling ``step`` on an instance whose status is not
        ``'running'`` is a no-op.

        Args:
            instance: the flow instance to advance; mutated in place.

        Raises:
            KeyError: if an active node id is not in the flow definition or
                its type has no registered executor.
        """
        if instance.status != 'running':
            return

        flow_def = instance.flow_def
        next_active: Set[str] = set()
        # Iterate over a copy so the loop is unaffected if the active set is
        # touched while executors run.
        for node_id in list(instance.active_nodes):
            node_def = flow_def.nodes[node_id]
            executor = EXECUTORS[node_def.type]
            # Snapshot the context before the executor runs.
            execution = NodeExecution(instance.id, node_id, instance.ctx.copy())
            result = executor.run(instance, node_def)
            if result is None:
                # blocked (join / subflow): retry on the next round
                next_active.add(node_id)
                continue

            execution.output_ctx = result
            execution.status = 'success'
            instance.executions.append(execution)
            instance.completed_nodes.add(node_id)
            # Merge before evaluating conditions so they see this node's output.
            instance.ctx.update(result)

            for edge in flow_def.outgoing(node_id):
                if self._edge_enabled(edge, instance.ctx):
                    next_active.add(edge.target)

        instance.active_nodes = next_active

        # An empty active set means no node can ever run again (for example an
        # end node was executed alongside other nodes, or no edge condition
        # matched). Previously the status stayed 'running' forever, so a
        # driver looping on ``status == 'running'`` never terminated.
        if not next_active or all(
            flow_def.nodes[n].type == 'end' for n in next_active
        ):
            instance.status = 'finished'

    @staticmethod
    def _edge_enabled(edge, ctx) -> bool:
        """Return True when ``edge`` may be followed given context ``ctx``.

        Edges with no (or an empty) condition are always enabled; otherwise
        the condition expression is evaluated with ``ctx`` as its only local.
        """
        if not edge.condition:
            return True
        # SECURITY: eval() executes arbitrary Python taken from the flow
        # definition. Empty globals do NOT sandbox it (builtins are still
        # reachable). Only load flow definitions from trusted sources, or
        # replace this with a restricted expression evaluator.
        return bool(eval(edge.condition, {}, {'ctx': ctx}))
# -----------------------------
# Example
# -----------------------------
if __name__ == '__main__':
    # Minimal demo: a task node that transitions unconditionally to an end
    # node. The YAML body is kept at column 0 so that the nesting of
    # nodes/edges is exactly what the loader parses.
    yaml_text = """
id: demo_flow
start: start
nodes:
  start:
    type: task
  end:
    type: end
edges:
  - from: start
    to: end
"""
    flow_def = FlowDefinitionLoader.from_yaml(yaml_text)
    instance = FlowInstance(flow_def, ctx={'hello': 'world'})
    engine = FlowEngine()
    # Keep stepping until the engine marks the instance as no longer running.
    while instance.status == 'running':
        engine.step(instance)
    print('finished:', instance.ctx)