"""Pipeline state machine - dynamic DAG resolution from pipeline_steps table."""

import json
from collections import deque
from typing import Dict, List, Optional

# Step states
STATE_PENDING = "pending"
STATE_RUNNING = "running"
STATE_COMPLETED = "completed"
STATE_FAILED = "failed"
STATE_SKIPPED = "skipped"

# Pipeline task states
TASK_SUBMITTED = "submitted"
TASK_RUNNING = "running"
TASK_COMPLETED = "completed"
TASK_FAILED = "failed"
TASK_PAUSED = "paused"
TASK_CANCELLED = "cancelled"

# Sort key for steps that are not present in the graph (sorts them last).
_UNKNOWN_ORDER = 999


def _parse_deps(deps_raw) -> List[str]:
    """Normalize a ``deps`` column value into a fresh list of step names.

    Accepts a JSON-encoded string, an already-decoded sequence, or a falsy
    value (``None``, ``""``, ``[]``). A JSON ``null`` yields ``[]`` rather
    than ``None`` so callers can always iterate the result.

    Raises:
        json.JSONDecodeError: if ``deps_raw`` is a non-empty, invalid JSON string.
    """
    if isinstance(deps_raw, str):
        deps_raw = json.loads(deps_raw) if deps_raw else None
    return list(deps_raw) if deps_raw else []


def build_step_graph(step_records: List[dict]) -> Dict[str, dict]:
    """Build dependency map from pipeline_steps table records.

    Args:
        step_records: list of dicts from pipeline_steps table, each with:
            - step_name (required)
            - step_type, display_name, step_order, deps (JSON string or list);
              missing or NULL values fall back to sensible defaults.

    Returns:
        {step_name: {"deps": [...], "dependents": [...], "step_type": "...",
                     "display_name": "...", "order": int}}

    Raises:
        KeyError: if a record has no ``step_name``.
        json.JSONDecodeError: if a ``deps`` string is not valid JSON.
    """
    dep_map: Dict[str, dict] = {}
    for rec in step_records:
        name = rec['step_name']
        step_type = rec.get('step_type')
        display_name = rec.get('display_name')
        order = rec.get('step_order')
        dep_map[name] = {
            "deps": _parse_deps(rec.get('deps')),
            "dependents": [],
            "step_type": name if step_type is None else step_type,
            "display_name": name if display_name is None else display_name,
            # A NULL step_order would make the order-based sorts below raise
            # TypeError (None vs int), so treat it like a missing column.
            "order": 0 if order is None else order,
        }

    # Build reverse mapping (dependents). Deps that name an unknown step get
    # no reverse edge; find_next_step still requires them to be completed.
    for name, info in dep_map.items():
        for dep in info["deps"]:
            if dep not in dep_map:
                continue
            dependents = dep_map[dep]["dependents"]
            if name not in dependents:  # tolerate duplicate entries in deps
                dependents.append(name)
    return dep_map


def find_next_step(step_graph: Dict[str, dict],
                   step_states: Dict[str, str]) -> Optional[str]:
    """Find the next step to execute: pending + all deps completed.

    Args:
        step_graph: from build_step_graph()
        step_states: {step_name: current_state}

    Returns:
        The ready step with the lowest ``order`` (ties resolved by graph
        insertion order), or None if all done / blocked.
    """
    # NOTE(review): a dependency in STATE_SKIPPED does not satisfy this check,
    # although check_all_completed treats skipped as done. Confirm the caller
    # propagates skips; otherwise dependents of a skipped step stay pending.
    candidates = [
        name
        for name, info in step_graph.items()
        if step_states.get(name) == STATE_PENDING
        and all(step_states.get(dep) == STATE_COMPLETED for dep in info["deps"])
    ]
    if not candidates:
        return None
    # min() returns the first of equal keys, matching a stable sort's head.
    return min(candidates, key=lambda s: step_graph[s]["order"])


def _collect_downstream(step_graph: Dict[str, dict], seeds) -> List[str]:
    """BFS from ``seeds`` through "dependents"; return all reached steps by order.

    The seeds themselves are included. Steps with equal ``order`` keep their
    BFS discovery order (``list.sort`` is stable), so the result is
    deterministic.
    """
    visited = set()
    found: List[str] = []
    queue = deque(seeds)
    while queue:
        current = queue.popleft()
        if current in visited:
            continue
        visited.add(current)
        found.append(current)
        for nxt in step_graph.get(current, {}).get("dependents", []):
            if nxt not in visited:
                queue.append(nxt)
    found.sort(key=lambda s: step_graph.get(s, {}).get("order", _UNKNOWN_ORDER))
    return found


def get_cascade_rerun_steps(step_graph: Dict[str, dict], from_step: str) -> List[str]:
    """BFS from modified step through dependents. Returns ordered list.

    The result includes ``from_step`` itself; an unknown step yields [].
    """
    if from_step not in step_graph:
        return []
    return _collect_downstream(step_graph, [from_step])


def get_rerun_from_next(step_graph: Dict[str, dict], from_step: str) -> List[str]:
    """When output is modified, rerun from downstream dependents only.

    Returns every step reachable from ``from_step``'s direct dependents,
    ordered by ``order``. ``from_step`` itself is not included unless a cycle
    leads back to it. An unknown step yields [].
    """
    if from_step not in step_graph:
        return []
    # One multi-source BFS reaches the same set as a separate cascade per
    # dependent, without re-walking shared descendants.
    return _collect_downstream(step_graph, step_graph[from_step].get("dependents", []))


def check_all_completed(step_states: Dict[str, str]) -> bool:
    """Check if all steps are completed or skipped (True for an empty mapping)."""
    return all(state in (STATE_COMPLETED, STATE_SKIPPED) for state in step_states.values())


def check_any_failed(step_states: Dict[str, str]) -> bool:
    """Check if any step has failed."""
    return STATE_FAILED in step_states.values()