"""Pipeline state machine - dynamic DAG resolution from pipeline_steps table.""" import json from typing import Dict, List, Optional # Step states STATE_PENDING = "pending" STATE_RUNNING = "running" STATE_COMPLETED = "completed" STATE_FAILED = "failed" STATE_SKIPPED = "skipped" STATE_WAITING = "waiting" # 等待人工输入 STATE_REJECTED = "rejected" # 审批驳回 # Pipeline task states TASK_SUBMITTED = "submitted" TASK_RUNNING = "running" TASK_COMPLETED = "completed" TASK_FAILED = "failed" TASK_PAUSED = "paused" TASK_CANCELLED = "cancelled" TASK_WAITING = "waiting" # 等待人工步骤完成 # Human task statuses HUMAN_PENDING = "pending" HUMAN_SUBMITTED = "submitted" HUMAN_APPROVED = "approved" HUMAN_REJECTED = "rejected" HUMAN_EXPIRED = "expired" def build_step_graph(step_records: list) -> Dict[str, dict]: """Build dependency map from pipeline_steps table records. Args: step_records: list of dicts from pipeline_steps table, each with: - step_name, step_type, display_name, step_order, deps (JSON string) Returns: {step_name: {"deps": [...], "dependents": [...], "step_type": "...", "display_name": "...", "order": int}} """ dep_map = {} for rec in step_records: name = rec['step_name'] deps_raw = rec.get('deps', '[]') if isinstance(deps_raw, str): deps = json.loads(deps_raw) if deps_raw else [] else: deps = deps_raw if deps_raw else [] dep_map[name] = { "deps": deps, "dependents": [], "step_type": rec.get('step_type', name), "display_name": rec.get('display_name', name), "order": rec.get('step_order', 0), } # Build reverse mapping (dependents) for name, info in dep_map.items(): for dep in info["deps"]: if dep in dep_map: dep_map[dep]["dependents"].append(name) return dep_map def find_next_step(step_graph: Dict[str, dict], step_states: Dict[str, str]) -> Optional[str]: """Find the next step to execute: pending + all deps completed/skipped. Args: step_graph: from build_step_graph() step_states: {step_name: current_state} Returns: step_name or None if all done / blocked / waiting """ # Terminal states that count as "done" for dependency resolution done_states = {STATE_COMPLETED, STATE_SKIPPED} candidates = [] for name, info in step_graph.items(): if step_states.get(name) != STATE_PENDING: continue deps_ok = all( step_states.get(dep) in done_states for dep in info["deps"] ) if deps_ok: candidates.append(name) if not candidates: return None # Sort by order, return first candidates.sort(key=lambda s: step_graph[s]["order"]) return candidates[0] def has_waiting_steps(step_states: Dict[str, str]) -> bool: """Check if any step is in waiting state.""" return any(s == STATE_WAITING for s in step_states.values()) def has_rejected_steps(step_states: Dict[str, str]) -> bool: """Check if any step has been rejected.""" return any(s == STATE_REJECTED for s in step_states.values()) def get_cascade_rerun_steps(step_graph: Dict[str, dict], from_step: str) -> List[str]: """BFS from modified step through dependents. Returns ordered list.""" if from_step not in step_graph: return [] visited = set() queue = [from_step] result = [] while queue: current = queue.pop(0) if current in visited: continue visited.add(current) result.append(current) for dep in step_graph.get(current, {}).get("dependents", []): if dep not in visited: queue.append(dep) result.sort(key=lambda s: step_graph.get(s, {}).get("order", 999)) return result def get_rerun_from_next(step_graph: Dict[str, dict], from_step: str) -> List[str]: """When output is modified, rerun from downstream dependents only.""" if from_step not in step_graph: return [] direct_dependents = step_graph[from_step]["dependents"] all_steps = set() for d in direct_dependents: all_steps.update(get_cascade_rerun_steps(step_graph, d)) result = list(all_steps) result.sort(key=lambda s: step_graph.get(s, {}).get("order", 999)) return result def check_all_completed(step_states: Dict[str, str]) -> bool: """Check if all steps are completed or skipped.""" for state in step_states.values(): if state not in (STATE_COMPLETED, STATE_SKIPPED): return False return True def check_any_failed(step_states: Dict[str, str]) -> bool: """Check if any step has failed or been rejected.""" return any(s in (STATE_FAILED, STATE_REJECTED) for s in step_states.values())