- New states: waiting (step/task), rejected (step)
- New tables: pipeline_human_tasks, pipeline_step_types
- New module: step_registry.py — pluggable step_type metadata
- New module: human.py — human_complete, approval_approve, approval_reject
- Executor: detects interactive step_types, creates human_tasks, enters waiting
- Reject with rollback: approval_reject(rollback_to=step) resets steps and re-runs
- API: human_task_complete, approval_approve, approval_reject, human_task_list
- API: pipeline_step_types, pipeline_register_step_type, pipeline_unregister_step_type
- Built-in interactive types: human_task, approval_gate
- Updated DDL and appcodes
252 lines
8.7 KiB
Python
252 lines
8.7 KiB
Python
"""Pipeline execution engine - schedules and runs steps.
|
|
|
|
Supports both automatic steps (machine handlers) and interactive steps
|
|
(human_task, approval_gate) that pause execution waiting for human input.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import traceback
|
|
from typing import Dict
|
|
|
|
from .state import (
|
|
STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, STATE_WAITING,
|
|
TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_WAITING,
|
|
build_step_graph, find_next_step, has_waiting_steps,
|
|
check_all_completed, check_any_failed,
|
|
)
|
|
from .storage import (
|
|
get_pipeline_steps, get_step_states,
|
|
update_task_state, update_step_state,
|
|
save_artifact, get_artifact, create_human_task,
|
|
)
|
|
from .handler import get_handler
|
|
from .step_registry import is_interactive, get_step_type
|
|
|
|
# Module-level logger shared by every function in the executor.
logger = logging.getLogger("pipeline.executor")

# Registry of in-flight runners: task_id -> the asyncio.Task executing it.
_active_tasks: Dict[str, asyncio.Task] = {}
|
|
|
|
|
|
async def start_task(task_id: str):
    """Schedule the execution loop for a pipeline task.

    Wraps ``_run_task(task_id)`` in an ``asyncio.Task``, records it in
    ``_active_tasks`` under ``task_id`` and hands the task object back.

    Args:
        task_id: Identifier of the pipeline task to execute.

    Returns:
        The ``asyncio.Task`` that runs the execution loop.
    """
    # Chained assignment: the runner is created once and bound to both targets.
    runner = _active_tasks[task_id] = asyncio.create_task(_run_task(task_id))
    return runner
|
|
|
|
|
|
async def resume_task(task_id: str):
    """Re-enter the execution loop for a paused or waiting task.

    A fresh ``asyncio.Task`` running ``_run_task(task_id)`` is created and
    stored in ``_active_tasks``, replacing any entry already present for
    ``task_id``.

    Args:
        task_id: Identifier of the pipeline task to resume.

    Returns:
        The newly created ``asyncio.Task``.
    """
    resumed = asyncio.create_task(_run_task(task_id))
    _active_tasks.update({task_id: resumed})
    return resumed
|
|
|
|
|
|
async def stop_task(task_id: str):
    """Request cancellation of a task's runner.

    Args:
        task_id: Identifier of the pipeline task to stop.

    Returns:
        ``True`` when a registered, unfinished runner was found, cancelled
        and removed from ``_active_tasks``; ``False`` otherwise.
    """
    runner = _active_tasks.get(task_id)

    # Nothing registered, or the runner already finished: nothing to cancel.
    if runner is None or runner.done():
        return False

    runner.cancel()
    _active_tasks.pop(task_id, None)
    return True
|
|
|
|
|
|
def is_running(task_id: str) -> bool:
    """Tell whether ``task_id`` has a registered runner that has not finished.

    Args:
        task_id: Identifier of the pipeline task to look up.

    Returns:
        ``True`` if ``_active_tasks`` holds an ``asyncio.Task`` for
        ``task_id`` whose ``done()`` is false, else ``False``.
    """
    runner = _active_tasks.get(task_id)
    if runner is None:
        return False
    return not runner.done()
|
|
|
|
|
|
async def _run_task(task_id: str):
    """Main execution loop for a pipeline task.

    Loads the task record and the step definitions of its pipeline, marks
    the task as running, then loops: re-read the task and step states and
    either finish, fail, pause for human input, or execute the next
    runnable step.

    Args:
        task_id: Identifier of the pipeline task to run.

    Returns:
        None. Outcomes are recorded through ``update_task_state``;
        cancellation and unexpected errors are logged rather than
        propagated to the awaiting caller.
    """
    try:
        # Get task info (no tenant_id needed here - internal execution)
        db_info = await _get_task_raw(task_id)
        if not db_info:
            logger.error(f"Task {task_id} not found")
            return

        pipeline_id = db_info.get('pipeline_id', '')

        # Load step definitions and build graph
        step_records = await get_pipeline_steps(pipeline_id)
        if not step_records:
            logger.error(f"No steps defined for pipeline {pipeline_id}")
            await update_task_state(task_id, TASK_FAILED)
            return

        step_graph = build_step_graph(step_records)

        await update_task_state(task_id, TASK_RUNNING)

        while True:
            # Re-read the task on every iteration so an externally applied
            # pause/fail/cancel state stops the loop.
            current = await _get_task_raw(task_id)
            if not current:
                break
            state = current.get('state', current.get('State', ''))
            if state in (TASK_PAUSED, TASK_FAILED, 'cancelled'):
                logger.info(f"Task {task_id} stopped: {state}")
                break

            # Get current step states
            step_states = await get_step_states(task_id)

            # Check completion
            if check_all_completed(step_states):
                await update_task_state(task_id, TASK_COMPLETED)
                logger.info(f"Task {task_id} completed")
                break

            if check_any_failed(step_states):
                await update_task_state(task_id, TASK_FAILED)
                logger.warning(f"Task {task_id} failed (step failure)")
                break

            # Check if any step is waiting for human input
            if has_waiting_steps(step_states):
                await update_task_state(task_id, TASK_WAITING)
                logger.info(f"Task {task_id} waiting for human input")
                break

            # Find next executable step
            next_step = find_next_step(step_graph, step_states)
            if not next_step:
                # No executable step and no waiting steps - deadlock
                logger.warning(f"Task {task_id}: no executable step, states={step_states}")
                await update_task_state(task_id, TASK_FAILED)
                break

            # Execute the step
            await _execute_step(task_id, next_step, step_graph, current)

    except asyncio.CancelledError:
        logger.info(f"Task {task_id} cancelled")
    except Exception as e:
        logger.error(f"Task {task_id} error: {e}\n{traceback.format_exc()}")
        # Without this the task would stay recorded as running although no
        # runner is left to advance it. Best effort: the storage layer may
        # be the very thing that failed.
        try:
            await update_task_state(task_id, TASK_FAILED)
        except Exception as state_err:
            logger.error(f"Task {task_id}: could not record failed state: {state_err}")
    finally:
        # Only drop the registry entry if it still refers to this runner;
        # a newer runner may already have been registered under the same
        # task_id and must stay visible to stop_task()/is_running().
        if _active_tasks.get(task_id) is asyncio.current_task():
            _active_tasks.pop(task_id, None)
|
|
|
|
|
|
async def _get_task_raw(task_id: str) -> "dict | None":
    """Get task record without tenant filtering (internal use only).

    Args:
        task_id: Value matched against the ``id`` column of the
            ``pipeline_tasks`` table.

    Returns:
        A plain ``dict`` with the record's fields, or ``None`` when no row
        matches ``task_id``.
    """
    from collections.abc import Mapping

    from sqlor.dbpools import DBPools

    db, dbname = DBPools(), 'pipeline'
    async with db.sqlorContext(dbname) as sor:
        recs = await sor.R('pipeline_tasks', {'id': task_id})
        if not recs:
            return None
        rec = recs[0]

        # Check for a mapping first: an instance of a dict subclass also has
        # a __dict__, and dir() on it lists dict methods, not the row data.
        if isinstance(rec, Mapping):
            return dict(rec)

        if hasattr(rec, '__dict__'):
            # Attribute-style record: copy public data attributes and skip
            # bound methods, which dir() reports alongside the fields.
            fields = {}
            for name in dir(rec):
                if name.startswith('_'):
                    continue
                value = getattr(rec, name)
                if not callable(value):
                    fields[name] = value
            return fields

        return dict(rec)
|
|
|
|
|
|
async def _execute_step(task_id: str, step_name: str, step_graph: dict, task_info: dict):
    """Execute a single step.

    For interactive step_types (human_task, approval_gate):
      - Creates a human_tasks record
      - Sets step to WAITING state
      - Execution loop will detect waiting and pause the task

    For automatic step_types:
      - Calls handler, saves artifact, marks completed

    Any exception raised while running the step is caught here: the step is
    marked FAILED with the error message and nothing is re-raised.

    Args:
        task_id: Identifier of the pipeline task the step belongs to.
        step_name: Key of the step inside ``step_graph``.
        step_graph: Mapping of step name to step info; this function reads
            the ``step_type``, ``deps`` and optional ``step_config`` entries.
        task_info: Task record; ``current_version`` and ``tenant_id`` are
            read from it (with defaults ``1`` and ``''``).
    """
    step_info = step_graph[step_name]
    step_type = step_info["step_type"]
    version = task_info.get('current_version', task_info.get('current_Version', 1))
    tenant_id = task_info.get('tenant_id', task_info.get('tenant_Id', ''))

    await update_step_state(task_id, step_name, STATE_RUNNING)

    try:
        # Gather inputs from dependency outputs
        input_data = await _gather_inputs(task_id, version, step_info["deps"])

        # Save input artifact
        await save_artifact(task_id, version, step_name, "input", input_data)

        # Interactive steps only create the human task and wait; no handler runs.
        if is_interactive(step_type):
            await _handle_interactive_step(
                task_id, step_name, step_type, version, input_data, step_info
            )
            return

        # Look up handler by step_type, falling back to the default handler
        handler = get_handler(step_type) or get_handler("__default__")
        if not handler:
            raise ValueError(f"No handler for step_type '{step_type}' and no default handler")

        # Load step config for handler
        step_config = step_info.get("step_config", {})
        if isinstance(step_config, str):
            try:
                step_config = json.loads(step_config)
            except (json.JSONDecodeError, TypeError):
                step_config = {}
        if step_config is None:
            # A stored None or a JSON "null" document would otherwise reach
            # the handler as None instead of an empty config.
            step_config = {}

        # Execute handler
        output_data = await handler(tenant_id, task_id, step_name, input_data, step_config)

        # Save output artifact
        await save_artifact(task_id, version, step_name, "output", output_data)
        await update_step_state(task_id, step_name, STATE_COMPLETED)
        logger.info(f"Step {step_name} completed for task {task_id}")

    except Exception as e:
        # Some exceptions stringify to '' (e.g. a bare TimeoutError()); fall
        # back to the class name so the recorded error is never blank.
        error_msg = str(e) or type(e).__name__
        logger.error(
            f"Step {step_name} failed for task {task_id}: {error_msg}",
            exc_info=True,
        )
        await update_step_state(task_id, step_name, STATE_FAILED, error_msg)
|
|
|
|
|
|
async def _handle_interactive_step(task_id, step_name, step_type, version, input_data, step_info):
    """Handle an interactive step — create human task record and enter WAITING.

    Args:
        task_id: Identifier of the pipeline task the step belongs to.
        step_name: Name of the step being put on hold.
        step_type: Interactive step type; stored as the human task's
            ``task_type`` and used to look up step type metadata.
        version: Task version passed through to the human task record.
        input_data: Gathered step inputs (not used by this function).
        step_info: Step definition; its optional ``step_config`` entry
            supplies ``assignee_role``, ``assignee_id``, ``form_schema``
            and ``timeout_hours``.
    """
    # Get step type metadata
    meta = get_step_type(step_type) or {}

    # Extract assignment info from step_config
    step_config = step_info.get("step_config", {})
    if isinstance(step_config, str):
        try:
            step_config = json.loads(step_config)
        except (json.JSONDecodeError, TypeError):
            step_config = {}
    if not hasattr(step_config, "get"):
        # None (e.g. a JSON "null" document) or a non-mapping value such as
        # a list has no .get(); treat it as "no configuration" instead of
        # failing the step with an AttributeError.
        step_config = {}

    assignee_role = step_config.get("assignee_role", "")
    assignee_id = step_config.get("assignee_id", "")
    # Step-level settings win; step type metadata supplies the fallback.
    form_schema = step_config.get("form_schema") or meta.get("form_schema")
    timeout_hours = step_config.get("timeout_hours") or meta.get("timeout_hours")

    # Create human task record
    await create_human_task(
        task_id=task_id,
        step_name=step_name,
        version=version,
        task_type=step_type,
        assignee_role=assignee_role,
        assignee_id=assignee_id,
        form_schema=form_schema,
        timeout_hours=timeout_hours,
    )

    # Set step to waiting
    await update_step_state(task_id, step_name, STATE_WAITING)
    logger.info(f"Step {step_name} waiting for human input (type={step_type}, role={assignee_role})")
|
|
|
|
|
|
async def _gather_inputs(task_id: str, version: int, deps: list) -> dict:
    """Collect the "output" artifact of each dependency step.

    Dependencies are fetched one after another in the order given.

    Args:
        task_id: Identifier of the pipeline task.
        version: Task version whose artifacts are read.
        deps: Names of the steps this step depends on.

    Returns:
        A dict mapping dependency name to its output artifact; dependencies
        whose artifact is missing or falsy are left out.
    """
    collected = {}
    for dep_name in deps:
        artifact = await get_artifact(task_id, version, dep_name, "output")
        if not artifact:
            # No usable output for this dependency: skip it.
            continue
        collected[dep_name] = artifact
    return collected
|