yumoqing 2448ad45f7 refactor: 改造为通用产线执行引擎模块
- 去掉独立 aiohttp 服务器,改为标准模块(load_pipeline_service)
- 存储从文件系统改 MySQL(sqlor)
- 新增 3 张数据表:pipeline_tasks/task_steps/artifacts
- 多租户隔离(tenant_id)
- 通用 DAG 调度引擎(读 pipeline_steps 表,不硬编码业务)
- 可插拔步骤处理器(register_handler by step_type)
- artifact 版本管理 + 级联重跑
- init/data.json 标准 appcodes 格式
- 完整 README 文档
2026-06-11 17:30:06 +08:00

53 lines
1.7 KiB
Python

"""Pipeline step handler registry.
Handlers are async functions: async def handler(tenant_id, task_id, step_name, input_data, config) -> output_data
Register via register_handler(step_type, fn).
Lookup via get_handler(step_type).
"""
import asyncio
import logging
from typing import Callable, Dict, Optional
logger = logging.getLogger("pipeline.handlers")
# step_type -> handler function
_HANDLERS: Dict[str, Callable] = {}
def register_handler(step_type: str, handler: Callable):
"""Register a step handler function.
Args:
step_type: matches pipeline_steps.step_type
handler: async def handler(tenant_id, task_id, step_name, input_data, config) -> dict
"""
_HANDLERS[step_type] = handler
logger.info(f"Registered handler: {step_type}")
def get_handler(step_type: str) -> Optional[Callable]:
"""Get handler for a step type. Returns None if not registered."""
return _HANDLERS.get(step_type)
def list_handlers() -> Dict[str, str]:
"""List all registered handlers. Returns {step_type: function_name}."""
return {k: v.__name__ for k, v in _HANDLERS.items()}
async def default_handler(tenant_id: str, task_id: str, step_name: str, input_data: dict, config: dict) -> dict:
"""Default handler - passthrough for testing. Returns input as output."""
logger.info(f"Default handler: step={step_name}, task={task_id}, tenant={tenant_id}")
await asyncio.sleep(0.1)
return {
"step": step_name,
"status": "completed",
"input_summary": str(input_data)[:500],
}
def register_default_handler():
"""Register the default handler as fallback for '__default__'."""
_HANDLERS["__default__"] = default_handler