yumoqing 2448ad45f7 refactor: 改造为通用产线执行引擎模块
- 去掉独立 aiohttp 服务器,改为标准模块(load_pipeline_service)
- 存储从文件系统改 MySQL(sqlor)
- 新增 3 张数据表:pipeline_tasks/task_steps/artifacts
- 多租户隔离(tenant_id)
- 通用 DAG 调度引擎(读 pipeline_steps 表,不硬编码业务)
- 可插拔步骤处理器(register_handler by step_type)
- artifact 版本管理 + 级联重跑
- init/data.json 标准 appcodes 格式
- 完整 README 文档
2026-06-11 17:30:06 +08:00

181 lines
6.0 KiB
Python

"""Pipeline execution engine - schedules and runs steps."""
import asyncio
import logging
import traceback
from typing import Dict
from .state import (
STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED,
TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED,
build_step_graph, find_next_step,
check_all_completed, check_any_failed,
)
from .storage import (
get_pipeline_steps, get_step_states,
update_task_state, update_step_state,
save_artifact, get_artifact,
)
from .handler import get_handler
# Logger shared by every function in this module.
logger = logging.getLogger("pipeline.executor")
# Registry of in-flight executions: task_id -> the asyncio.Task wrapping
# _run_task for that id. start_task/resume_task add entries; stop_task and
# _run_task (on exit) remove them.
_active_tasks: Dict[str, asyncio.Task] = {}
async def start_task(task_id: str):
    """Start executing a pipeline task in the background.

    Schedules ``_run_task(task_id)`` on the running event loop and records
    the resulting ``asyncio.Task`` in ``_active_tasks``.

    If an execution for ``task_id`` is already registered and has not
    finished, that task is returned instead of launching a second one.
    A second executor would run the same steps concurrently and would
    overwrite the registry entry, leaving the first run impossible to
    cancel through ``stop_task``.

    Args:
        task_id: Identifier of the pipeline task to run.

    Returns:
        The ``asyncio.Task`` driving the execution.
    """
    existing = _active_tasks.get(task_id)
    if existing is not None and not existing.done():
        logger.warning(f"Task {task_id} is already running; not starting it again")
        return existing

    task = asyncio.create_task(_run_task(task_id))
    _active_tasks[task_id] = task
    return task
async def resume_task(task_id: str):
    """Resume a paused task.

    Schedules a fresh ``_run_task`` coroutine for ``task_id``, stores the
    resulting ``asyncio.Task`` in ``_active_tasks`` (replacing any entry
    already there) and returns it.
    """
    _active_tasks[task_id] = runner = asyncio.create_task(_run_task(task_id))
    return runner
async def stop_task(task_id: str):
    """Cancel a running task.

    Returns:
        True when a live execution was found, cancelled and removed from
        ``_active_tasks``; False when nothing is registered for ``task_id``
        or the registered execution has already finished.
    """
    runner = _active_tasks.get(task_id)
    if not runner or runner.done():
        # Nothing live to cancel; a finished entry is left where it is.
        return False

    runner.cancel()
    _active_tasks.pop(task_id, None)
    return True
def is_running(task_id: str) -> bool:
    """Report whether an unfinished execution is registered for ``task_id``."""
    runner = _active_tasks.get(task_id)
    if runner is None:
        return False
    return not runner.done()
async def _run_task(task_id: str):
    """Main execution loop for a pipeline task.

    Loads the task record and its pipeline's step definitions, marks the
    task as running, then repeatedly picks the next executable step and
    runs it until one of the following happens:

    * every step is completed -> task state becomes ``TASK_COMPLETED``;
    * any step has failed -> task state becomes ``TASK_FAILED``;
    * no step is executable although not all are completed -> the task is
      treated as stuck and marked ``TASK_FAILED``;
    * the stored task state is paused, failed or ``'cancelled'`` -> the
      loop stops without touching the state;
    * the task record disappears -> the loop stops.

    An unexpected exception is logged and the task is marked
    ``TASK_FAILED`` (best effort) so it is not left recorded as running.
    Cancellation is logged and absorbed, so the wrapping ``asyncio.Task``
    finishes normally rather than in the cancelled state.

    Args:
        task_id: Identifier of the pipeline task to execute.
    """
    try:
        # Get task info (no tenant_id needed here - internal execution)
        db_info = await _get_task_raw(task_id)
        if not db_info:
            logger.error(f"Task {task_id} not found")
            return

        pipeline_id = db_info.get('pipeline_id', '')

        # Load step definitions and build graph
        step_records = await get_pipeline_steps(pipeline_id)
        if not step_records:
            logger.error(f"No steps defined for pipeline {pipeline_id}")
            await update_task_state(task_id, TASK_FAILED)
            return

        step_graph = build_step_graph(step_records)
        await update_task_state(task_id, TASK_RUNNING)

        while True:
            # Re-read the record every iteration so an externally written
            # pause/cancel/failure is noticed between steps.
            current = await _get_task_raw(task_id)
            if not current:
                break

            state = current.get('state', current.get('State', ''))
            if state in (TASK_PAUSED, TASK_FAILED, 'cancelled'):
                logger.info(f"Task {task_id} stopped: {state}")
                break

            # Get current step states
            step_states = await get_step_states(task_id)

            # Check completion
            if check_all_completed(step_states):
                await update_task_state(task_id, TASK_COMPLETED)
                logger.info(f"Task {task_id} completed")
                break

            if check_any_failed(step_states):
                await update_task_state(task_id, TASK_FAILED)
                logger.warning(f"Task {task_id} failed (step failure)")
                break

            # Find next executable step
            next_step = find_next_step(step_graph, step_states)
            if not next_step:
                # No executable step but not all completed - deadlock or waiting
                logger.warning(f"Task {task_id}: no executable step, states={step_states}")
                await update_task_state(task_id, TASK_FAILED)
                break

            # Execute the step
            await _execute_step(task_id, next_step, step_graph, current)
    except asyncio.CancelledError:
        logger.info(f"Task {task_id} cancelled")
    except Exception as e:
        logger.error(f"Task {task_id} error: {e}\n{traceback.format_exc()}")
        # Do not leave the task recorded as running after a crash. The
        # update is best effort: the storage layer may be what failed.
        try:
            await update_task_state(task_id, TASK_FAILED)
        except Exception:
            logger.exception(f"Task {task_id}: could not record failed state")
    finally:
        # Remove only our own registry entry. After stop_task + start_task
        # the slot may already belong to a newer execution, which must stay
        # visible to is_running/stop_task.
        if _active_tasks.get(task_id) is asyncio.current_task():
            _active_tasks.pop(task_id, None)
async def _get_task_raw(task_id: str) -> dict:
"""Get task record without tenant filtering (internal use only)."""
from sqlor.dbpools import DBPools
db, dbname = DBPools(), 'pipeline'
async with db.sqlorContext(dbname) as sor:
recs = await sor.R('pipeline_tasks', {'id': task_id})
if not recs:
return None
rec = recs[0]
if hasattr(rec, '__dict__'):
return {k: getattr(rec, k) for k in dir(rec) if not k.startswith('_')}
return dict(rec)
async def _execute_step(task_id: str, step_name: str, step_graph: dict, task_info: dict):
    """Execute a single step.

    Marks the step as running, collects the outputs of its dependencies,
    stores them as the step's ``"input"`` artifact, calls the handler
    registered for the step's ``step_type`` (falling back to the
    ``"__default__"`` handler), stores the result as the ``"output"``
    artifact and marks the step completed.

    Any ``Exception`` raised along the way is logged with its traceback and
    recorded on the step as ``STATE_FAILED``; it is not re-raised.

    Args:
        task_id: Identifier of the owning pipeline task.
        step_name: Key of the step in ``step_graph``.
        step_graph: Mapping of step name to step info; the entry must
            provide ``"step_type"`` and ``"deps"``.
        task_info: Task record; ``current_version`` (default 1) and
            ``tenant_id`` (default ``''``) are read from it.
    """
    step_info = step_graph[step_name]
    step_type = step_info["step_type"]
    version = task_info.get('current_version', task_info.get('current_Version', 1))
    tenant_id = task_info.get('tenant_id', task_info.get('tenant_Id', ''))

    await update_step_state(task_id, step_name, STATE_RUNNING)
    # NOTE(review): asyncio.CancelledError is not an Exception subclass on
    # Python 3.8+, so a step cancelled mid-flight keeps STATE_RUNNING -
    # confirm that resuming a task copes with that.
    try:
        # Gather inputs from dependency outputs
        input_data = await _gather_inputs(task_id, version, step_info["deps"])

        # Save input artifact
        await save_artifact(task_id, version, step_name, "input", input_data)

        # Look up handler by step_type
        handler = get_handler(step_type)
        if not handler:
            handler = get_handler("__default__")
        if not handler:
            raise ValueError(f"No handler for step_type '{step_type}' and no default handler")

        # Execute handler; the final argument is always an empty dict here
        # (presumably per-step options - TODO confirm against handlers).
        output_data = await handler(tenant_id, task_id, step_name, input_data, {})

        # Save output artifact
        await save_artifact(task_id, version, step_name, "output", output_data)
        await update_step_state(task_id, step_name, STATE_COMPLETED)
        logger.info(f"Step {step_name} completed for task {task_id}")
    except Exception as e:
        # Some exceptions stringify to "" (e.g. a bare TimeoutError()); fall
        # back to repr so the stored failure reason is never empty.
        error_msg = str(e) or repr(e)
        logger.exception(f"Step {step_name} failed for task {task_id}: {error_msg}")
        await update_step_state(task_id, step_name, STATE_FAILED, error_msg)
async def _gather_inputs(task_id: str, version: int, deps: list) -> dict:
    """Collect the ``"output"`` artifact of each dependency step.

    Dependencies are fetched one at a time, in the order given. A
    dependency whose artifact is missing or falsy is left out of the result.

    Returns:
        Mapping of dependency step name to its output artifact.
    """
    collected = {}
    for dep_name in deps:
        payload = await get_artifact(task_id, version, dep_name, "output")
        if not payload:
            continue
        collected[dep_name] = payload
    return collected