"""Pipeline execution engine - schedules and runs steps. Supports both automatic steps (machine handlers) and interactive steps (human_task, approval_gate) that pause execution waiting for human input. """ import asyncio import json import logging import traceback from typing import Dict from .state import ( STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, STATE_WAITING, TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_WAITING, build_step_graph, find_next_step, has_waiting_steps, check_all_completed, check_any_failed, ) from .storage import ( get_pipeline_steps, get_step_states, update_task_state, update_step_state, save_artifact, get_artifact, create_human_task, ) from .handler import get_handler from .step_registry import is_interactive, get_step_type logger = logging.getLogger("pipeline.executor") # Active tasks: task_id -> asyncio.Task _active_tasks: Dict[str, asyncio.Task] = {} async def start_task(task_id: str): """Start executing a pipeline task.""" task = asyncio.create_task(_run_task(task_id)) _active_tasks[task_id] = task return task async def resume_task(task_id: str): """Resume a paused or waiting task.""" task = asyncio.create_task(_run_task(task_id)) _active_tasks[task_id] = task return task async def stop_task(task_id: str): """Cancel a running task.""" task = _active_tasks.get(task_id) if task and not task.done(): task.cancel() _active_tasks.pop(task_id, None) return True return False def is_running(task_id: str) -> bool: """Check if a task is currently executing.""" task = _active_tasks.get(task_id) return task is not None and not task.done() async def _run_task(task_id: str): """Main execution loop for a pipeline task.""" try: # Get task info (no tenant_id needed here - internal execution) db_info = await _get_task_raw(task_id) if not db_info: logger.error(f"Task {task_id} not found") return pipeline_id = db_info.get('pipeline_id', '') # Load step definitions and build graph step_records = await get_pipeline_steps(pipeline_id) if not step_records: logger.error(f"No steps defined for pipeline {pipeline_id}") await update_task_state(task_id, TASK_FAILED) return step_graph = build_step_graph(step_records) await update_task_state(task_id, TASK_RUNNING) while True: # Check if paused/cancelled current = await _get_task_raw(task_id) if not current: break state = current.get('state', current.get('State', '')) if state in (TASK_PAUSED, TASK_FAILED, 'cancelled'): logger.info(f"Task {task_id} stopped: {state}") break # Get current step states step_states = await get_step_states(task_id) # Check completion if check_all_completed(step_states): await update_task_state(task_id, TASK_COMPLETED) logger.info(f"Task {task_id} completed") break if check_any_failed(step_states): await update_task_state(task_id, TASK_FAILED) logger.warning(f"Task {task_id} failed (step failure)") break # Check if any step is waiting for human input if has_waiting_steps(step_states): await update_task_state(task_id, TASK_WAITING) logger.info(f"Task {task_id} waiting for human input") break # Find next executable step next_step = find_next_step(step_graph, step_states) if not next_step: # No executable step and no waiting steps - deadlock logger.warning(f"Task {task_id}: no executable step, states={step_states}") await update_task_state(task_id, TASK_FAILED) break # Execute the step await _execute_step(task_id, next_step, step_graph, current) except asyncio.CancelledError: logger.info(f"Task {task_id} cancelled") except Exception as e: logger.error(f"Task {task_id} error: {e}\n{traceback.format_exc()}") finally: _active_tasks.pop(task_id, None) async def _get_task_raw(task_id: str) -> dict: """Get task record without tenant filtering (internal use only).""" from sqlor.dbpools import DBPools db, dbname = DBPools(), 'pipeline' async with db.sqlorContext(dbname) as sor: recs = await sor.R('pipeline_tasks', {'id': task_id}) if not recs: return None rec = recs[0] if hasattr(rec, '__dict__'): return {k: getattr(rec, k) for k in dir(rec) if not k.startswith('_')} return dict(rec) async def _execute_step(task_id: str, step_name: str, step_graph: dict, task_info: dict): """Execute a single step. For interactive step_types (human_task, approval_gate): - Creates a human_tasks record - Sets step to WAITING state - Execution loop will detect waiting and pause the task For automatic step_types: - Calls handler, saves artifact, marks completed """ step_info = step_graph[step_name] step_type = step_info["step_type"] version = task_info.get('current_version', task_info.get('current_Version', 1)) tenant_id = task_info.get('tenant_id', task_info.get('tenant_Id', '')) await update_step_state(task_id, step_name, STATE_RUNNING) try: # Gather inputs from dependency outputs input_data = await _gather_inputs(task_id, version, step_info["deps"]) # Save input artifact await save_artifact(task_id, version, step_name, "input", input_data) # Check if this is an interactive step type if is_interactive(step_type): await _handle_interactive_step( task_id, step_name, step_type, version, input_data, step_info ) return # Look up handler by step_type handler = get_handler(step_type) if not handler: handler = get_handler("__default__") if not handler: raise ValueError(f"No handler for step_type '{step_type}' and no default handler") # Load step config for handler step_config = step_info.get("step_config", {}) if isinstance(step_config, str): try: step_config = json.loads(step_config) except (json.JSONDecodeError, TypeError): step_config = {} # Execute handler output_data = await handler(tenant_id, task_id, step_name, input_data, step_config) # Save output artifact await save_artifact(task_id, version, step_name, "output", output_data) await update_step_state(task_id, step_name, STATE_COMPLETED) logger.info(f"Step {step_name} completed for task {task_id}") except Exception as e: error_msg = str(e) logger.error(f"Step {step_name} failed for task {task_id}: {error_msg}") await update_step_state(task_id, step_name, STATE_FAILED, error_msg) async def _handle_interactive_step(task_id, step_name, step_type, version, input_data, step_info): """Handle an interactive step — create human task record and enter WAITING.""" # Get step type metadata meta = get_step_type(step_type) or {} # Extract assignment info from step_config step_config = step_info.get("step_config", {}) if isinstance(step_config, str): try: step_config = json.loads(step_config) except (json.JSONDecodeError, TypeError): step_config = {} assignee_role = step_config.get("assignee_role", "") assignee_id = step_config.get("assignee_id", "") form_schema = step_config.get("form_schema") or meta.get("form_schema") timeout_hours = step_config.get("timeout_hours") or meta.get("timeout_hours") # Create human task record await create_human_task( task_id=task_id, step_name=step_name, version=version, task_type=step_type, assignee_role=assignee_role, assignee_id=assignee_id, form_schema=form_schema, timeout_hours=timeout_hours, ) # Set step to waiting await update_step_state(task_id, step_name, STATE_WAITING) logger.info(f"Step {step_name} waiting for human input (type={step_type}, role={assignee_role})") async def _gather_inputs(task_id: str, version: int, deps: list) -> dict: """Gather input data from dependency step outputs.""" inputs = {} for dep in deps: art = await get_artifact(task_id, version, dep, "output") if art: inputs[dep] = art return inputs