"""Step type registry — pluggable step_type metadata. Each pipeline can define its own step_types. The registry tracks: - handler function (already in handler.py) - metadata: display_name, category, is_interactive, form_schema, on_timeout Built-in interactive types: human_task, approval_gate """ import logging from typing import Dict, Optional logger = logging.getLogger("pipeline.step_registry") # step_type -> metadata dict _REGISTRY: Dict[str, dict] = {} # Built-in interactive step types BUILTIN_INTERACTIVE = { "human_task": { "display_name": "人工任务", "category": "interactive", "is_interactive": True, "description": "需要人工填写表单或执行操作后继续", }, "approval_gate": { "display_name": "审批关卡", "category": "interactive", "is_interactive": True, "description": "需要审批人通过后继续,可驳回", }, } def register_step_type(step_type: str, metadata: dict): """Register a step type with metadata. Args: step_type: unique key matching pipeline_steps.step_type metadata: dict with keys: - display_name (str): 显示名称 - category (str): 分类 (media/llm/interactive/devops/...) - is_interactive (bool): 是否需要人工介入, default False - description (str): 描述 - form_schema (dict): 人工任务表单JSON Schema (可选) - on_timeout (str): 超时策略 skip/escalate/fail (可选) - timeout_hours (int): 超时小时数 (可选) """ existing = _REGISTRY.get(step_type, {}) existing.update(metadata) _REGISTRY[step_type] = existing logger.info(f"Registered step_type: {step_type} (interactive={metadata.get('is_interactive', False)})") def get_step_type(step_type: str) -> Optional[dict]: """Get step type metadata. Returns None if not registered.""" return _REGISTRY.get(step_type) def is_interactive(step_type: str) -> bool: """Check if a step type requires human interaction.""" meta = _REGISTRY.get(step_type, {}) return meta.get("is_interactive", False) def list_step_types() -> Dict[str, dict]: """List all registered step types with metadata.""" return dict(_REGISTRY) def unregister_step_type(step_type: str): """Remove a step type from registry.""" removed = _REGISTRY.pop(step_type, None) if removed: logger.info(f"Unregistered step_type: {step_type}") def load_builtin_types(): """Load built-in interactive step types.""" for st, meta in BUILTIN_INTERACTIVE.items(): register_step_type(st, meta)