- 去掉独立 aiohttp 服务器,改为标准模块(load_pipeline_service) - 存储从文件系统改 MySQL(sqlor) - 新增 3 张数据表:pipeline_tasks/task_steps/artifacts - 多租户隔离(tenant_id) - 通用 DAG 调度引擎(读 pipeline_steps 表,不硬编码业务) - 可插拔步骤处理器(register_handler by step_type) - artifact 版本管理 + 级联重跑 - init/data.json 标准 appcodes 格式 - 完整 README 文档
53 lines
1.7 KiB
Python
53 lines
1.7 KiB
Python
"""Pipeline step handler registry.

Handlers are async functions with the signature:
    async def handler(tenant_id, task_id, step_name, input_data, config) -> output_data

Register a handler via register_handler(step_type, fn).
Look a handler up via get_handler(step_type).
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Callable, Dict, Optional
|
|
|
|
# Module-level logger used by the registry functions in this file.
logger = logging.getLogger("pipeline.handlers")

# Registry: maps a step_type string to the handler callable registered for it.
_HANDLERS: Dict[str, Callable] = dict()
|
|
|
|
|
|
def register_handler(step_type: str, handler: Callable):
    """Store ``handler`` in the registry under ``step_type`` and log it.

    If ``step_type`` already has a handler, it is replaced.

    Args:
        step_type: Registry key; intended to match ``pipeline_steps.step_type``.
        handler: Expected to be an async function of the form
            ``async def handler(tenant_id, task_id, step_name, input_data, config) -> dict``.
            The value is stored as given; no check is performed here.
    """
    _HANDLERS.update({step_type: handler})
    message = f"Registered handler: {step_type}"
    logger.info(message)
|
|
|
|
|
|
def get_handler(step_type: str) -> Optional[Callable]:
    """Look up the handler registered for ``step_type``.

    Returns:
        The registered callable, or ``None`` when nothing is registered
        under that key.
    """
    try:
        return _HANDLERS[step_type]
    except KeyError:
        # Unknown step types are reported as None rather than as an error.
        return None
|
|
|
|
|
|
def list_handlers() -> Dict[str, str]:
    """Describe every registered handler.

    Returns:
        A new dict mapping each registered step_type to the handler's
        ``__name__``. Callables that have no ``__name__`` (for example
        ``functools.partial`` objects or instances of classes defining
        ``__call__``) are described by their ``repr()`` instead, so one
        such handler cannot make the whole listing fail.
    """
    return {
        step_type: getattr(handler, "__name__", None) or repr(handler)
        for step_type, handler in _HANDLERS.items()
    }
|
|
|
|
|
|
async def default_handler(tenant_id: str, task_id: str, step_name: str, input_data: dict, config: dict) -> dict:
    """Pass-through handler intended for testing.

    Logs the call, awaits a 0.1 second sleep, then reports the step as
    completed. ``config`` is accepted but not used.

    Args:
        tenant_id: Tenant identifier; only written to the log.
        task_id: Task identifier; only written to the log.
        step_name: Step name; logged and echoed back in the result.
        input_data: Step input; only its string form is used.
        config: Unused.

    Returns:
        A dict with keys ``"step"`` (the step name), ``"status"``
        (always ``"completed"``) and ``"input_summary"`` (the first 500
        characters of ``str(input_data)``).
    """
    logger.info(f"Default handler: step={step_name}, task={task_id}, tenant={tenant_id}")
    await asyncio.sleep(0.1)

    result = {"step": step_name, "status": "completed"}
    # Keep only a bounded prefix of the input's string form.
    result["input_summary"] = str(input_data)[:500]
    return result
|
|
|
|
|
|
def register_default_handler():
    """Install ``default_handler`` in the registry under the key ``'__default__'``.

    The entry is written directly to the registry, so no registration
    message is logged.
    NOTE(review): presumably the caller looks up ``'__default__'`` as a
    fallback when a step_type has no handler of its own; confirm against
    the caller.
    """
    _HANDLERS.update({"__default__": default_handler})
|