diff --git a/.gitignore b/.gitignore index 45eef94..4584e58 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,25 @@ +# Python __pycache__/ -*.pyc +*.py[cod] *.egg-info/ +dist/ build/ +*.egg + +# Virtual environment +py3/ +pkgs/ + +# Runtime +logs/ +*.pid + +# IDE +.vscode/ +.idea/ + +# OS +.DS_Store + +# DDL output +mysql.ddl.sql diff --git a/README.md b/README.md index 37f5b8b..55762a5 100644 --- a/README.md +++ b/README.md @@ -1,53 +1,117 @@ -# Pipeline-Service — 产线任务引擎(早期版本) +# pipeline-service 通用产线执行引擎 -KTV/MV 视频制作流水线的后端引擎,负责编排和执行多步骤产线任务。 +## 定位 -## 功能 +通用产线执行引擎模块。把 Hermes Agent 验证过的业务流程固化为可重复、可并发的产线业务环境。 -- **任务存储**:JSON 文件存储产线任务状态 -- **步骤编排**:支持串行/并行步骤执行 -- **状态管理**:pending → running → completed/failed -- **版本追踪**:每个步骤的输入/输出可追溯 +## 核心价值 + +- Hermes Agent 中用 cron/delegate/terminal 跑通的流程 → 固化为产线步骤定义 → pipeline-service 自动调度执行 +- 一次验证,无限次自动执行 +- 多租户并发:同一产线,不同租户同时使用,数据完全隔离 ## 架构 -```python -pipeline_service/ -├── engine.py # 产线执行引擎(步骤调度) -├── models.py # 数据模型(Task, Step, Node) -├── storage.py # JSON 文件存储层 -└── __init__.py +``` +宿主应用 (pipeline-app / sage / 其他) + │ + ├── load_pipeline_service() ← 注册函数到 ServerEnv + │ + ├── pipeline_submit(tenant_id, pipeline_id, owner_id, title, params) + ├── pipeline_list(tenant_id, pipeline_id?) + ├── pipeline_detail(tenant_id, task_id) + ├── pipeline_node(tenant_id, task_id, step_name, version?) + ├── pipeline_modify(tenant_id, task_id, updates, rerun_from) + ├── pipeline_pause(tenant_id, task_id) + ├── pipeline_resume(tenant_id, task_id) + ├── pipeline_cancel(tenant_id, task_id) + └── pipeline_register_handler(step_type, fn) ← 注册步骤处理器 ``` -## 使用方式 +## 引擎工作原理 -```python -from pipeline_service.engine import PipelineEngine +1. **提交任务** → 读取 pipeline_steps 表的步骤定义 → 创建 pipeline_task_steps 记录 → 启动执行 +2. **执行循环** → 解析 DAG 依赖图 → 找到可执行步骤(所有前置完成)→ 调用 handler → 存 artifact → 继续下一步 +3. **多租户** → 所有查询按 tenant_id 隔离 → 同一产线多租户并发不冲突 +4. **人工干预** → 修改节点 artifact → 创建新版本 → BFS 计算受影响步骤 → 级联重跑 -engine = PipelineEngine(storage_dir="./tasks") -task_id = engine.submit({ - "name": "KTV视频制作", - "steps": [ - {"type": "ai_video", "config": {...}}, - {"type": "demucs", "config": {...}}, - {"type": "asr", "config": {...}}, - {"type": "ffmpeg_compose", "config": {...}} - ] -}) -engine.run(task_id) -``` +## 数据表 -## 当前状态 - -本模块为早期原型,核心引擎已编写但: -- 未与 Sage 平台联调 -- 未部署到生产环境 -- 后续由 pipeline-app 替代为独立服务架构 - -## 相关仓库 - -| 仓库 | 说明 | +| 表名 | 用途 | |------|------| -| [pipeline-app](https://git.opencomputing.cn/yumoqing/pipeline-app) | 替代方案:独立 ahserver 后端服务 | -| [pipeline](https://git.opencomputing.cn/yumoqing/pipeline) | Sage 前端桥接模块 | -| [showcase](https://git.opencomputing.cn/yumoqing/showcase) | 展示平台(产线产出物展示) | +| pipeline_tasks | 任务主表(tenant_id 隔离) | +| pipeline_task_steps | 任务步骤执行记录 | +| pipeline_artifacts | 步骤产物(input/output,支持版本) | +| pipeline_steps | 产线步骤定义(由 pipeline_core 模块管理) | +| pipelines | 产线定义(由 pipeline_core 模块管理) | + +## 步骤处理器 + +可插拔注册,统一接口: + +```python +async def handler(tenant_id, task_id, step_name, input_data, config) -> dict: + # 处理逻辑 + return output_data + +# 注册 +from pipeline_service import register_handler +register_handler("llm_generate", handler) +``` + +处理器按 `step_type` 匹配。步骤定义中的 `step_type` 对应 handler 注册名。 + +## 宿主集成 + +任何应用只需一行代码即可使用: + +```python +from pipeline_service.init import load_pipeline_service +load_pipeline_service() +``` + +宿主负责: +- HTTP 路由(ahserver 管) +- 用户认证(RBAC 管) +- 前端交互(bricks 管) +- 产线定义和定价(pipeline_core/ops/dist 管) + +pipeline-service 只做:调度 + 执行 + 存储。 + +## 目录结构 + +``` +pipeline-service/ +├── pipeline_service/ +│ ├── __init__.py # 包导出 +│ ├── init.py # load_pipeline_service() + ServerEnv 注册 +│ ├── state.py # DAG 解析、步骤状态机 +│ ├── handler.py # 步骤处理器注册表 +│ ├── storage.py # MySQL 存储层(sqlor) +│ └── executor.py # 执行循环 +├── models/ +│ ├── pipeline_tasks.json +│ ├── pipeline_task_steps.json +│ └── pipeline_artifacts.json +├── init/ +│ └── data.json # appcodes 初始化数据 +├── scripts/ +│ └── load_path.py # RBAC 权限注册 +├── pyproject.toml +├── build.sh +└── README.md +``` + +## 构建与部署 + +```bash +cd ~/repos/pipeline-service +pip install . + +# 建表 +json2ddl mysql models/ > mysql.ddl.sql +mysql -u root -p pipeline < mysql.ddl.sql + +# 加载 appcodes +# (通过宿主应用的 build.sh 自动加载 init/data.json) +``` diff --git a/build.sh b/build.sh new file mode 100755 index 0000000..a65fdec --- /dev/null +++ b/build.sh @@ -0,0 +1,49 @@ +#!/usr/bin/env bash +set -e +cdir=$(cd "$(dirname "$0")" && pwd) +cd "$cdir" + +echo "=== pipeline-service Build ===" + +# 1. Create venv if not exists +if [ ! -d py3 ]; then + python3 -m venv py3 +fi +source py3/bin/activate + +# 2. Install foundation packages +mkdir -p pkgs +for m in apppublic sqlor ahserver xls2ddl; do + echo "install $m ..." + cd "$cdir/pkgs" + if [ ! -d "$m" ]; then + git clone https://git.opencomputing.cn/yumoqing/$m 2>/dev/null || echo "SKIP: $m" + fi + if [ -d "$m" ]; then + cd "$m" + "$cdir/py3/bin/pip" install . 2>&1 | tail -1 + fi + cd "$cdir" +done + +# 3. Install self +echo "install pipeline_service ..." +"$cdir/py3/bin/pip" install . 2>&1 | tail -1 + +# 4. Generate DDL from models +if [ -d models ] && ls models/*.json >/dev/null 2>&1; then + echo "generating DDL ..." + "$cdir/py3/bin/json2ddl" mysql models/ > "$cdir/mysql.ddl.sql" 2>/dev/null || echo " DDL generation skipped (json2ddl not available)" +fi + +# 5. Create runtime dirs +mkdir -p "$cdir/logs" + +chmod +x "$cdir/scripts/load_path.py" 2>/dev/null || true +echo "=== Build complete ===" +echo "" +echo "Next steps:" +echo " 1. CREATE DATABASE pipeline;" +echo " 2. mysql -u root -p pipeline < mysql.ddl.sql" +echo " 3. Load appcodes: init/data.json via host app build.sh" +echo " 4. Register step handlers in host app" diff --git a/init/data.json b/init/data.json new file mode 100644 index 0000000..0b4f724 --- /dev/null +++ b/init/data.json @@ -0,0 +1,49 @@ +{ + "appcodes": [ + { + "parentid": "task_state", + "parentname": "任务状态", + "items": [ + {"k": "submitted", "v": "已提交"}, + {"k": "running", "v": "运行中"}, + {"k": "completed", "v": "已完成"}, + {"k": "failed", "v": "失败"}, + {"k": "paused", "v": "已暂停"}, + {"k": "cancelled", "v": "已取消"} + ] + }, + { + "parentid": "step_state", + "parentname": "步骤状态", + "items": [ + {"k": "pending", "v": "等待中"}, + {"k": "running", "v": "执行中"}, + {"k": "completed", "v": "已完成"}, + {"k": "failed", "v": "失败"}, + {"k": "skipped", "v": "已跳过"} + ] + }, + { + "parentid": "io_type", + "parentname": "产物类型", + "items": [ + {"k": "input", "v": "输入"}, + {"k": "output", "v": "输出"} + ] + }, + { + "parentid": "step_type", + "parentname": "步骤类型", + "items": [ + {"k": "audio_process", "v": "音频处理"}, + {"k": "video_process", "v": "视频处理"}, + {"k": "llm_generate", "v": "LLM生成"}, + {"k": "llm_evaluate", "v": "LLM评估"}, + {"k": "api_call", "v": "外部API调用"}, + {"k": "file_process", "v": "文件处理"}, + {"k": "composite", "v": "合成"}, + {"k": "custom", "v": "自定义"} + ] + } + ] +} diff --git a/models/pipeline_artifacts.json b/models/pipeline_artifacts.json new file mode 100644 index 0000000..ac9bc2c --- /dev/null +++ b/models/pipeline_artifacts.json @@ -0,0 +1,27 @@ +{ + "summary": [ + { + "name": "pipeline_artifacts", + "title": "步骤产物表", + "primary": ["id"], + "catelog": "entity" + } + ], + "fields": [ + {"name": "id", "title": "主键ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "task_id", "title": "任务ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "version", "title": "版本号", "type": "int", "nullable": "no", "default": "1"}, + {"name": "step_name", "title": "步骤名称", "type": "str", "length": 64, "nullable": "no"}, + {"name": "io_type", "title": "产物类型(input/output)", "type": "str", "length": 16, "nullable": "no"}, + {"name": "data", "title": "产物数据(JSON)", "type": "longtext"}, + {"name": "saved_at", "title": "保存时间", "type": "timestamp", "nullable": "no"} + ], + "indexes": [ + {"name": "idx_pa_task_ver", "idxtype": "index", "idxfields": ["task_id", "version"]}, + {"name": "idx_pa_step", "idxtype": "index", "idxfields": ["step_name"]}, + {"name": "idx_pa_lookup", "idxtype": "unique", "idxfields": ["task_id", "version", "step_name", "io_type"]} + ], + "codes": [ + {"field": "io_type", "table": "appcodes_kv", "valuefield": "k", "textfield": "v", "cond": "parentid='io_type'"} + ] +} diff --git a/models/pipeline_task_steps.json b/models/pipeline_task_steps.json new file mode 100644 index 0000000..1c0197e --- /dev/null +++ b/models/pipeline_task_steps.json @@ -0,0 +1,31 @@ +{ + "summary": [ + { + "name": "pipeline_task_steps", + "title": "任务步骤执行表", + "primary": ["id"], + "catelog": "entity" + } + ], + "fields": [ + {"name": "id", "title": "主键ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "task_id", "title": "任务ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "step_name", "title": "步骤名称", "type": "str", "length": 64, "nullable": "no"}, + {"name": "step_type", "title": "步骤类型(用于handler匹配)", "type": "str", "length": 64, "nullable": "no"}, + {"name": "display_name", "title": "显示名称", "type": "str", "length": 128}, + {"name": "step_order", "title": "执行顺序", "type": "int", "nullable": "no", "default": "0"}, + {"name": "deps", "title": "依赖步骤(JSON数组)", "type": "text", "default": "'[]'"}, + {"name": "state", "title": "步骤状态", "type": "str", "length": 32, "nullable": "no", "default": "pending"}, + {"name": "error_msg", "title": "错误信息", "type": "text"}, + {"name": "started_at", "title": "开始执行时间", "type": "timestamp"}, + {"name": "completed_at", "title": "完成时间", "type": "timestamp"} + ], + "indexes": [ + {"name": "idx_pts_task", "idxtype": "index", "idxfields": ["task_id"]}, + {"name": "idx_pts_state", "idxtype": "index", "idxfields": ["state"]} + ], + "codes": [ + {"field": "state", "table": "appcodes_kv", "valuefield": "k", "textfield": "v", "cond": "parentid='step_state'"}, + {"field": "step_type", "table": "appcodes_kv", "valuefield": "k", "textfield": "v", "cond": "parentid='step_type'"} + ] +} diff --git a/models/pipeline_tasks.json b/models/pipeline_tasks.json new file mode 100644 index 0000000..f8bf983 --- /dev/null +++ b/models/pipeline_tasks.json @@ -0,0 +1,31 @@ +{ + "summary": [ + { + "name": "pipeline_tasks", + "title": "产线任务表", + "primary": ["id"], + "catelog": "entity" + } + ], + "fields": [ + {"name": "id", "title": "主键ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "tenant_id", "title": "租户ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "pipeline_id", "title": "产线定义ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "owner_id", "title": "提交人ID", "type": "str", "length": 32, "nullable": "no"}, + {"name": "title", "title": "任务标题", "type": "str", "length": 255, "nullable": "no"}, + {"name": "state", "title": "任务状态", "type": "str", "length": 32, "nullable": "no", "default": "submitted"}, + {"name": "current_version", "title": "当前版本号", "type": "int", "nullable": "no", "default": "1"}, + {"name": "params", "title": "提交参数", "type": "text"}, + {"name": "created_at", "title": "创建时间", "type": "timestamp", "nullable": "no"}, + {"name": "updated_at", "title": "更新时间", "type": "timestamp", "nullable": "no"} + ], + "indexes": [ + {"name": "idx_pt_tenant", "idxtype": "index", "idxfields": ["tenant_id"]}, + {"name": "idx_pt_pipeline", "idxtype": "index", "idxfields": ["pipeline_id"]}, + {"name": "idx_pt_owner", "idxtype": "index", "idxfields": ["owner_id"]}, + {"name": "idx_pt_state", "idxtype": "index", "idxfields": ["state"]} + ], + "codes": [ + {"field": "state", "table": "appcodes_kv", "valuefield": "k", "textfield": "v", "cond": "parentid='task_state'"} + ] +} diff --git a/pipeline_service/__init__.py b/pipeline_service/__init__.py index cdeb3ca..992cbd7 100644 --- a/pipeline_service/__init__.py +++ b/pipeline_service/__init__.py @@ -1,3 +1,21 @@ -"""Pipeline Service - Hermes backend for KTV/MV production pipelines""" +"""pipeline_service - 通用产线执行引擎模块""" -__version__ = "1.0.0" +from .init import ( + load_pipeline_service, + pipeline_submit, + pipeline_list, + pipeline_detail, + pipeline_node, + pipeline_modify, + pipeline_pause, + pipeline_resume, + pipeline_cancel, + pipeline_handlers, +) +from .handler import register_handler, list_handlers, get_handler +from .state import ( + STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, STATE_SKIPPED, + TASK_SUBMITTED, TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_CANCELLED, +) + +__version__ = "2.0.0" diff --git a/pipeline_service/executor.py b/pipeline_service/executor.py index e9b0c1d..9333f49 100644 --- a/pipeline_service/executor.py +++ b/pipeline_service/executor.py @@ -1,182 +1,180 @@ -"""Pipeline step executor - runs steps asynchronously. - -Each step function receives (pipeline_id, manifest, params) and returns output data. -The executor handles state transitions, artifact storage, and dependency resolution. -""" +"""Pipeline execution engine - schedules and runs steps.""" import asyncio import logging -from typing import Dict, Optional +import traceback +from typing import Dict from .state import ( STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, - PIPELINE_RUNNING, PIPELINE_COMPLETED, PIPELINE_FAILED, PIPELINE_PAUSED, - build_dependency_map, + TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, + build_step_graph, find_next_step, + check_all_completed, check_any_failed, ) from .storage import ( - get_manifest, save_manifest, save_artifact, - update_step_state, create_new_version, reset_steps, + get_pipeline_steps, get_step_states, + update_task_state, update_step_state, + save_artifact, get_artifact, ) +from .handler import get_handler logger = logging.getLogger("pipeline.executor") -# Active pipeline execution tasks (pipeline_id -> asyncio.Task) +# Active tasks: task_id -> asyncio.Task _active_tasks: Dict[str, asyncio.Task] = {} -async def start_pipeline(pipeline_id: str): - """Start executing a pipeline from the beginning.""" - task = asyncio.create_task(_run_pipeline(pipeline_id)) - _active_tasks[pipeline_id] = task +async def start_task(task_id: str): + """Start executing a pipeline task.""" + task = asyncio.create_task(_run_task(task_id)) + _active_tasks[task_id] = task return task -async def resume_pipeline(pipeline_id: str): - """Resume a paused pipeline.""" - task = asyncio.create_task(_run_pipeline(pipeline_id)) - _active_tasks[pipeline_id] = task +async def resume_task(task_id: str): + """Resume a paused task.""" + task = asyncio.create_task(_run_task(task_id)) + _active_tasks[task_id] = task return task -async def stop_pipeline(pipeline_id: str): - """Cancel a running pipeline.""" - task = _active_tasks.get(pipeline_id) +async def stop_task(task_id: str): + """Cancel a running task.""" + task = _active_tasks.get(task_id) if task and not task.done(): task.cancel() - del _active_tasks[pipeline_id] + _active_tasks.pop(task_id, None) return True return False -def is_running(pipeline_id: str) -> bool: - """Check if a pipeline is currently running.""" - task = _active_tasks.get(pipeline_id) +def is_running(task_id: str) -> bool: + """Check if a task is currently executing.""" + task = _active_tasks.get(task_id) return task is not None and not task.done() -async def _run_pipeline(pipeline_id: str): - """Main pipeline execution loop.""" +async def _run_task(task_id: str): + """Main execution loop for a pipeline task.""" try: + # Get task info (no tenant_id needed here - internal execution) + db_info = await _get_task_raw(task_id) + if not db_info: + logger.error(f"Task {task_id} not found") + return + + pipeline_id = db_info.get('pipeline_id', '') + + # Load step definitions and build graph + step_records = await get_pipeline_steps(pipeline_id) + if not step_records: + logger.error(f"No steps defined for pipeline {pipeline_id}") + await update_task_state(task_id, TASK_FAILED) + return + + step_graph = build_step_graph(step_records) + + await update_task_state(task_id, TASK_RUNNING) + while True: - manifest = get_manifest(pipeline_id) - if not manifest: - logger.error(f"Pipeline {pipeline_id} not found") + # Check if paused/cancelled + current = await _get_task_raw(task_id) + if not current: + break + state = current.get('state', current.get('State', '')) + if state in (TASK_PAUSED, TASK_FAILED, 'cancelled'): + logger.info(f"Task {task_id} stopped: {state}") break - if manifest["state"] == PIPELINE_PAUSED: - logger.info(f"Pipeline {pipeline_id} paused, waiting for user action") + # Get current step states + step_states = await get_step_states(task_id) + + # Check completion + if check_all_completed(step_states): + await update_task_state(task_id, TASK_COMPLETED) + logger.info(f"Task {task_id} completed") break - if manifest["state"] == PIPELINE_COMPLETED: - logger.info(f"Pipeline {pipeline_id} already completed") + if check_any_failed(step_states): + await update_task_state(task_id, TASK_FAILED) + logger.warning(f"Task {task_id} failed (step failure)") break - # Find next pending step with all deps completed - next_step = _find_next_step(manifest) + # Find next executable step + next_step = find_next_step(step_graph, step_states) if not next_step: - # All steps done - manifest["state"] = PIPELINE_COMPLETED - save_manifest(pipeline_id, manifest) - logger.info(f"Pipeline {pipeline_id} completed") + # No executable step but not all completed - deadlock or waiting + logger.warning(f"Task {task_id}: no executable step, states={step_states}") + await update_task_state(task_id, TASK_FAILED) break # Execute the step - await _execute_step(pipeline_id, next_step) + await _execute_step(task_id, next_step, step_graph, current) except asyncio.CancelledError: - logger.info(f"Pipeline {pipeline_id} cancelled") + logger.info(f"Task {task_id} cancelled") except Exception as e: - logger.error(f"Pipeline {pipeline_id} error: {e}") + logger.error(f"Task {task_id} error: {e}\n{traceback.format_exc()}") finally: - _active_tasks.pop(pipeline_id, None) + _active_tasks.pop(task_id, None) -def _find_next_step(manifest: dict) -> Optional[str]: - """Find the next step to execute (pending, all deps completed).""" - steps = manifest.get("steps", {}) - for name, info in sorted(steps.items(), key=lambda x: x[1].get("order", 0)): - if info["state"] != STATE_PENDING: - continue - # Check all dependencies are completed - deps_ok = all( - steps.get(dep, {}).get("state") == STATE_COMPLETED - for dep in info.get("deps", []) - ) - if deps_ok: - return name - return None +async def _get_task_raw(task_id: str) -> dict: + """Get task record without tenant filtering (internal use only).""" + from sqlor.dbpools import DBPools + db, dbname = DBPools(), 'pipeline' + async with db.sqlorContext(dbname) as sor: + recs = await sor.R('pipeline_tasks', {'id': task_id}) + if not recs: + return None + rec = recs[0] + if hasattr(rec, '__dict__'): + return {k: getattr(rec, k) for k in dir(rec) if not k.startswith('_')} + return dict(rec) -async def _execute_step(pipeline_id: str, step_name: str): - """Execute a single pipeline step.""" - update_step_state(pipeline_id, step_name, STATE_RUNNING) +async def _execute_step(task_id: str, step_name: str, step_graph: dict, task_info: dict): + """Execute a single step.""" + step_info = step_graph[step_name] + step_type = step_info["step_type"] + version = task_info.get('current_version', task_info.get('current_Version', 1)) + tenant_id = task_info.get('tenant_id', task_info.get('tenant_Id', '')) + + await update_step_state(task_id, step_name, STATE_RUNNING) try: - # Load input artifacts from dependencies - manifest = get_manifest(pipeline_id) - version = manifest["current_version"] - input_data = await _gather_step_inputs(pipeline_id, step_name, version, manifest) + # Gather inputs from dependency outputs + input_data = await _gather_inputs(task_id, version, step_info["deps"]) # Save input artifact - save_artifact(pipeline_id, version, step_name, "input", input_data) + await save_artifact(task_id, version, step_name, "input", input_data) - # Execute step handler - handler = STEP_HANDLERS.get(step_name, _default_handler) - output_data = await handler(pipeline_id, step_name, input_data, manifest) + # Look up handler by step_type + handler = get_handler(step_type) + if not handler: + handler = get_handler("__default__") + if not handler: + raise ValueError(f"No handler for step_type '{step_type}' and no default handler") + + # Execute handler + output_data = await handler(tenant_id, task_id, step_name, input_data, {}) # Save output artifact - save_artifact(pipeline_id, version, step_name, "output", output_data) - update_step_state(pipeline_id, step_name, STATE_COMPLETED) - - logger.info(f"Step {step_name} completed for {pipeline_id}") + await save_artifact(task_id, version, step_name, "output", output_data) + await update_step_state(task_id, step_name, STATE_COMPLETED) + logger.info(f"Step {step_name} completed for task {task_id}") except Exception as e: - logger.error(f"Step {step_name} failed for {pipeline_id}: {e}") - update_step_state(pipeline_id, step_name, STATE_FAILED, str(e)) + error_msg = str(e) + logger.error(f"Step {step_name} failed for task {task_id}: {error_msg}") + await update_step_state(task_id, step_name, STATE_FAILED, error_msg) -async def _gather_step_inputs(pipeline_id: str, step_name: str, version: int, manifest: dict) -> dict: - """Gather input data for a step from its dependencies' outputs.""" - from .storage import get_artifact - steps = manifest.get("steps", {}) - step_info = steps.get(step_name, {}) - deps = step_info.get("deps", []) - +async def _gather_inputs(task_id: str, version: int, deps: list) -> dict: + """Gather input data from dependency step outputs.""" inputs = {} for dep in deps: - dep_artifact = get_artifact(pipeline_id, version, dep, "output") - if dep_artifact: - inputs[dep] = dep_artifact.get("data", {}) - - # Also include initial params - if "_params" in inputs: - inputs["params"] = inputs.pop("_params") - - # Check if there's a user-modified input for this step - user_input = get_artifact(pipeline_id, version, step_name, "input") - if user_input and user_input.get("data"): - # User modified input exists, use it - return user_input["data"] - + art = await get_artifact(task_id, version, dep, "output") + if art: + inputs[dep] = art return inputs - - -# === Step Handlers === - -async def _default_handler(pipeline_id: str, step_name: str, input_data: dict, manifest: dict) -> dict: - """Default handler - stub that returns input as output (for testing).""" - logger.info(f"Default handler for {step_name}, pipeline {pipeline_id}") - # Simulate some processing time - await asyncio.sleep(0.5) - return {"step": step_name, "status": "completed", "input_summary": str(input_data)[:200]} - - -# Real handlers will be registered here -STEP_HANDLERS = { - # Will be populated by actual KTV pipeline step implementations -} - - -def register_handler(step_name: str, handler): - """Register a step handler function.""" - STEP_HANDLERS[step_name] = handler diff --git a/pipeline_service/handler.py b/pipeline_service/handler.py new file mode 100644 index 0000000..6251f9a --- /dev/null +++ b/pipeline_service/handler.py @@ -0,0 +1,52 @@ +"""Pipeline step handler registry. + +Handlers are async functions: async def handler(tenant_id, task_id, step_name, input_data, config) -> output_data +Register via register_handler(step_type, fn). +Lookup via get_handler(step_type). +""" + +import asyncio +import logging +from typing import Callable, Dict, Optional + +logger = logging.getLogger("pipeline.handlers") + +# step_type -> handler function +_HANDLERS: Dict[str, Callable] = {} + + +def register_handler(step_type: str, handler: Callable): + """Register a step handler function. + + Args: + step_type: matches pipeline_steps.step_type + handler: async def handler(tenant_id, task_id, step_name, input_data, config) -> dict + """ + _HANDLERS[step_type] = handler + logger.info(f"Registered handler: {step_type}") + + +def get_handler(step_type: str) -> Optional[Callable]: + """Get handler for a step type. Returns None if not registered.""" + return _HANDLERS.get(step_type) + + +def list_handlers() -> Dict[str, str]: + """List all registered handlers. Returns {step_type: function_name}.""" + return {k: v.__name__ for k, v in _HANDLERS.items()} + + +async def default_handler(tenant_id: str, task_id: str, step_name: str, input_data: dict, config: dict) -> dict: + """Default handler - passthrough for testing. Returns input as output.""" + logger.info(f"Default handler: step={step_name}, task={task_id}, tenant={tenant_id}") + await asyncio.sleep(0.1) + return { + "step": step_name, + "status": "completed", + "input_summary": str(input_data)[:500], + } + + +def register_default_handler(): + """Register the default handler as fallback for '__default__'.""" + _HANDLERS["__default__"] = default_handler diff --git a/pipeline_service/init.py b/pipeline_service/init.py new file mode 100644 index 0000000..4da9511 --- /dev/null +++ b/pipeline_service/init.py @@ -0,0 +1,282 @@ +"""pipeline_service - 通用产线执行引擎模块。 + +把 Hermes Agent 验证过的业务流程固化为可重复、可并发的产线业务环境。 +支持多租户隔离、DAG 步骤调度、可插拔步骤处理器、artifact 版本管理。 + +任何宿主应用都可以通过 load_pipeline_service() 加载本模块。 +""" + +import json +from ahserver.serverenv import ServerEnv +from appPublic.uniqueID import getID +from appPublic.log import debug + +from .state import ( + TASK_SUBMITTED, TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_CANCELLED, + build_step_graph, get_cascade_rerun_steps, get_rerun_from_next, +) +from .storage import ( + create_task, init_task_steps, get_task, get_task_steps, + get_artifact, get_all_artifacts, list_tasks, + update_task_state, update_task_version, + get_pipeline_steps, reset_steps, save_artifact, +) +from .executor import start_task, resume_task, stop_task, is_running +from .handler import register_handler, list_handlers, register_default_handler + +MODULE_NAME = "pipeline_service" +MODULE_VERSION = "2.0.0" + + +async def pipeline_submit(tenant_id, pipeline_id, owner_id, title, params=None): + """提交新产线任务。 + + Args: + tenant_id: 租户ID(由宿主应用提供,可以是 org_id、user_id 等) + pipeline_id: 产线定义ID(来自 pipelines 表) + owner_id: 提交人ID + title: 任务标题 + params: 提交参数(dict) + + Returns: + JSON string with status, task_id + """ + result = {"success": False} + try: + if not tenant_id or not pipeline_id: + result["message"] = "缺少 tenant_id 或 pipeline_id" + return json.dumps(result, ensure_ascii=False) + + params = params or {} + task_id = await create_task(tenant_id, pipeline_id, owner_id, title, params) + + # Read step definitions from pipeline_steps table + step_records = await get_pipeline_steps(pipeline_id) + if not step_records: + result["message"] = f"产线 {pipeline_id} 没有步骤定义" + return json.dumps(result, ensure_ascii=False) + + # Create step execution records + await init_task_steps(task_id, step_records) + + # Start execution + await start_task(task_id) + + result["success"] = True + result["task_id"] = task_id + result["message"] = "任务已提交并开始执行" + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False) + + +async def pipeline_list(tenant_id, pipeline_id=None, limit=100): + """查询租户的任务列表。""" + result = {"success": False} + try: + tasks = await list_tasks(tenant_id, pipeline_id, limit) + result["success"] = True + result["tasks"] = tasks + result["total"] = len(tasks) + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False, default=str) + + +async def pipeline_detail(tenant_id, task_id): + """获取任务详情 + 步骤状态树。""" + result = {"success": False} + try: + task = await get_task(tenant_id, task_id) + if not task: + result["message"] = "任务不存在" + return json.dumps(result, ensure_ascii=False) + + steps = await get_task_steps(task_id) + task["steps"] = steps + task["is_running"] = is_running(task_id) + + result["success"] = True + result["task"] = task + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False, default=str) + + +async def pipeline_node(tenant_id, task_id, step_name, version=None): + """获取某节点某版本的 input/output artifact。""" + result = {"success": False} + try: + task = await get_task(tenant_id, task_id) + if not task: + result["message"] = "任务不存在" + return json.dumps(result, ensure_ascii=False) + + v = version or task.get("current_version", 1) + if isinstance(v, str): + v = int(v) + + input_data = await get_artifact(task_id, v, step_name, "input") + output_data = await get_artifact(task_id, v, step_name, "output") + + result["success"] = True + result["step_name"] = step_name + result["version"] = v + result["input"] = input_data + result["output"] = output_data + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False, default=str) + + +async def pipeline_modify(tenant_id, task_id, updates, rerun_from="node"): + """修改节点 artifact 并触发级联重跑。 + + Args: + updates: {step_name: {content: {...}}, ...} + rerun_from: "node" = 从该节点重跑, "next" = 从下游节点重跑 + """ + result = {"success": False} + try: + task = await get_task(tenant_id, task_id) + if not task: + result["message"] = "任务不存在" + return json.dumps(result, ensure_ascii=False) + + if is_running(task_id): + result["message"] = "任务正在执行中,请先暂停" + return json.dumps(result, ensure_ascii=False) + + pipeline_id = task.get("pipeline_id", task.get("Pipeline_id", "")) + current_version = task.get("current_version", task.get("current_Version", 1)) + if isinstance(current_version, str): + current_version = int(current_version) + + # Load step graph + step_records = await get_pipeline_steps(pipeline_id) + step_graph = build_step_graph(step_records) + + # Calculate affected steps + all_rerun = set() + for step_name in updates: + if step_name not in step_graph: + result["message"] = f"未知步骤: {step_name}" + return json.dumps(result, ensure_ascii=False) + if rerun_from == "node": + affected = get_cascade_rerun_steps(step_graph, step_name) + else: + affected = get_rerun_from_next(step_graph, step_name) + all_rerun.update(affected) + + # Create new version + new_version = current_version + 1 + await update_task_version(task_id, new_version) + + # Save modified artifacts + for step_name, step_update in updates.items(): + content = step_update.get("content", step_update) + io_type = "input" if rerun_from == "node" else "output" + await save_artifact(task_id, new_version, step_name, io_type, content) + + # Reset affected steps + all_rerun_list = sorted(all_rerun, key=lambda s: step_graph.get(s, {}).get("order", 999)) + await reset_steps(task_id, all_rerun_list) + + # Update task state and resume + await update_task_state(task_id, TASK_RUNNING) + await resume_task(task_id) + + result["success"] = True + result["new_version"] = new_version + result["rerun_steps"] = all_rerun_list + result["message"] = f"创建 v{new_version},重跑 {len(all_rerun_list)} 个步骤" + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False, default=str) + + +async def pipeline_pause(tenant_id, task_id): + """暂停任务。""" + result = {"success": False} + try: + task = await get_task(tenant_id, task_id) + if not task: + result["message"] = "任务不存在" + return json.dumps(result, ensure_ascii=False) + + await stop_task(task_id) + await update_task_state(task_id, TASK_PAUSED) + + result["success"] = True + result["message"] = "任务已暂停" + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False) + + +async def pipeline_resume(tenant_id, task_id): + """恢复任务。""" + result = {"success": False} + try: + task = await get_task(tenant_id, task_id) + if not task: + result["message"] = "任务不存在" + return json.dumps(result, ensure_ascii=False) + + await update_task_state(task_id, TASK_RUNNING) + await resume_task(task_id) + + result["success"] = True + result["message"] = "任务已恢复" + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False) + + +async def pipeline_cancel(tenant_id, task_id): + """取消任务。""" + result = {"success": False} + try: + task = await get_task(tenant_id, task_id) + if not task: + result["message"] = "任务不存在" + return json.dumps(result, ensure_ascii=False) + + await stop_task(task_id) + await update_task_state(task_id, TASK_CANCELLED) + + result["success"] = True + result["message"] = "任务已取消" + except Exception as e: + result["message"] = str(e) + return json.dumps(result, ensure_ascii=False) + + +def pipeline_handlers(): + """查看已注册的步骤处理器。""" + return json.dumps(list_handlers(), ensure_ascii=False) + + +def load_pipeline_service(): + """注册所有函数到 ServerEnv。任何宿主应用调用此函数即可使用产线引擎。""" + env = ServerEnv() + + # Task lifecycle + env.pipeline_submit = pipeline_submit + env.pipeline_list = pipeline_list + env.pipeline_detail = pipeline_detail + env.pipeline_node = pipeline_node + env.pipeline_modify = pipeline_modify + env.pipeline_pause = pipeline_pause + env.pipeline_resume = pipeline_resume + env.pipeline_cancel = pipeline_cancel + + # Handler management + env.pipeline_register_handler = register_handler + env.pipeline_handlers = pipeline_handlers + + # Register default handler + register_default_handler() + + debug(f"[{MODULE_NAME}] v{MODULE_VERSION} loaded — generic pipeline execution engine") + return True diff --git a/pipeline_service/server.py b/pipeline_service/server.py deleted file mode 100644 index 95aad24..0000000 --- a/pipeline_service/server.py +++ /dev/null @@ -1,361 +0,0 @@ -"""Pipeline Service - aiohttp HTTP API server. - -Endpoints: - POST /v1/task/submit - Create and start a new pipeline - GET /v1/tasks - List pipelines for a user - GET /v1/task/{id} - Get pipeline detail with node tree - GET /v1/task/{id}/node/{step} - Get node input/output - POST /v1/task/update - Modify node artifact + cascade rerun - POST /v1/task/{id}/pause - Pause a running pipeline - POST /v1/task/{id}/resume - Resume a paused pipeline - GET /v1/health - Health check -""" - -import json -import logging -import os -from aiohttp import web - -from .state import ( - PIPELINE_SUBMITTED, PIPELINE_RUNNING, PIPELINE_COMPLETED, PIPELINE_FAILED, PIPELINE_PAUSED, - get_cascade_rerun_steps, get_rerun_from_next, build_dependency_map, -) -from .storage import ( - create_pipeline, get_manifest, save_manifest, - get_artifact, save_artifact, get_all_artifacts, - create_new_version, reset_steps, get_user_pipelines, -) -from .executor import start_pipeline, resume_pipeline, stop_pipeline, is_running - -logger = logging.getLogger("pipeline.api") - -JSON_HEADERS = {"Content-Type": "application/json; charset=utf-8"} - - -def json_response(data, status=200): - return web.Response( - text=json.dumps(data, ensure_ascii=False), - status=status, - headers=JSON_HEADERS, - ) - - -def error_response(message, status=400): - return json_response({"status": "error", "message": message}, status=status) - - -# === Health === - -async def handle_health(request): - return json_response({"status": "ok", "service": "pipeline-service", "version": "1.0.0"}) - - -# === Submit New Pipeline === - -async def handle_submit(request): - """POST /v1/task/submit - Body: {mode, title, lyrics?, input_audio?, input_video?, user_id, ...} - """ - try: - body = await request.json() - except Exception: - return error_response("Invalid JSON body") - - mode = body.get("mode") - title = body.get("title") - user_id = body.get("user_id") - - if not mode: - return error_response("Missing required field: mode") - if not title: - return error_response("Missing required field: title") - if not user_id: - return error_response("Missing required field: user_id") - - # Validate mode - valid_modes = ["audio_lyrics", "video_lyrics", "lyrics_only"] - if mode not in valid_modes: - return error_response(f"Invalid mode: {mode}. Must be one of: {valid_modes}") - - # Validate mode-specific requirements - if mode == "audio_lyrics" and not body.get("input_audio"): - return error_response("Mode audio_lyrics requires input_audio") - if mode == "video_lyrics" and not body.get("input_video"): - return error_response("Mode video_lyrics requires input_video") - - # Extract params (everything except mode/title/user_id) - params = {k: v for k, v in body.items() if k not in ("mode", "title", "user_id")} - - try: - manifest = create_pipeline(user_id, mode, title, params) - except Exception as e: - return error_response(f"Failed to create pipeline: {e}", 500) - - pipeline_id = manifest["pipeline_id"] - - # Start execution in background - await start_pipeline(pipeline_id) - - return json_response({ - "status": "ok", - "pipeline_id": pipeline_id, - "mode": mode, - "title": title, - "message": f"Pipeline created and started", - }) - - -# === List User Pipelines === - -async def handle_list(request): - """GET /v1/tasks?user_id=xxx""" - user_id = request.query.get("user_id") - if not user_id: - return error_response("Missing query param: user_id") - - pipeline_ids = get_user_pipelines(user_id) - tasks = [] - for pid in pipeline_ids: - manifest = get_manifest(pid) - if manifest: - tasks.append({ - "pipeline_id": manifest["pipeline_id"], - "mode": manifest["mode"], - "title": manifest["title"], - "state": manifest["state"], - "current_version": manifest["current_version"], - "created_at": manifest["created_at"], - "updated_at": manifest["updated_at"], - }) - - return json_response({"status": "ok", "tasks": tasks, "total": len(tasks)}) - - -# === Pipeline Detail === - -async def handle_detail(request): - """GET /v1/task/{id}""" - pipeline_id = request.match_info["id"] - manifest = get_manifest(pipeline_id) - if not manifest: - return error_response(f"Pipeline not found: {pipeline_id}", 404) - - version = manifest["current_version"] - artifacts = get_all_artifacts(pipeline_id, version) - - # Build artifact summary - artifact_summary = {} - for key, art in artifacts.items(): - artifact_summary[key] = { - "step": art.get("step"), - "type": art.get("type"), - "version": art.get("version"), - "saved_at": art.get("saved_at"), - } - - return json_response({ - "status": "ok", - "pipeline_id": manifest["pipeline_id"], - "user_id": manifest["user_id"], - "mode": manifest["mode"], - "title": manifest["title"], - "state": manifest["state"], - "current_version": version, - "created_at": manifest["created_at"], - "updated_at": manifest["updated_at"], - "steps": manifest["steps"], - "versions": manifest["versions"], - "artifacts": artifact_summary, - "is_running": is_running(pipeline_id), - }) - - -# === Node Input/Output === - -async def handle_node(request): - """GET /v1/task/{id}/node/{step}?version=N""" - pipeline_id = request.match_info["id"] - step = request.match_info["step"] - version = request.query.get("version") - - manifest = get_manifest(pipeline_id) - if not manifest: - return error_response(f"Pipeline not found: {pipeline_id}", 404) - - if step not in manifest["steps"]: - return error_response(f"Step not found: {step}") - - v = int(version) if version else manifest["current_version"] - - input_art = get_artifact(pipeline_id, v, step, "input") - output_art = get_artifact(pipeline_id, v, step, "output") - - step_info = manifest["steps"][step] - - return json_response({ - "status": "ok", - "pipeline_id": pipeline_id, - "step": step, - "display_name": step_info.get("display_name", step), - "version": v, - "state": step_info["state"], - "input": input_art["data"] if input_art else None, - "output": output_art["data"] if output_art else None, - "input_version": input_art.get("version") if input_art else None, - "output_version": output_art.get("version") if output_art else None, - }) - - -# === Modify Node + Cascade Rerun === - -async def handle_update(request): - """POST /v1/task/update - Body: {pipeline_id, updates: {step: {content: ...}}, rerun_from: "node"|"next"} - - rerun_from: - - "node": input changed, rerun from this step - - "next": output changed, rerun from next step(s) - """ - try: - body = await request.json() - except Exception: - return error_response("Invalid JSON body") - - pipeline_id = body.get("pipeline_id") - updates = body.get("updates", {}) - rerun_from = body.get("rerun_from", "node") - - if not pipeline_id: - return error_response("Missing pipeline_id") - if not updates: - return error_response("Missing or empty updates") - - manifest = get_manifest(pipeline_id) - if not manifest: - return error_response(f"Pipeline not found: {pipeline_id}", 404) - - if manifest["state"] in (PIPELINE_RUNNING,): - if is_running(pipeline_id): - return error_response("Pipeline is currently running. Pause it first.") - - mode = manifest["mode"] - - # Collect all steps that need rerunning - all_rerun = set() - for step_name, step_update in updates.items(): - if step_name not in manifest["steps"]: - return error_response(f"Unknown step: {step_name}") - - content = step_update.get("content", {}) - - if rerun_from == "node": - # Input modified - save as new input, rerun from this step - affected = get_cascade_rerun_steps(mode, step_name) - else: - # Output modified - save as new output, rerun from next steps - affected = get_rerun_from_next(mode, step_name) - - all_rerun.update(affected) - - # Create new version - change_desc = f"修改步骤: {', '.join(updates.keys())} (rerun_from={rerun_from})" - new_version = create_new_version(pipeline_id, change_desc) - - # Save modified artifacts - for step_name, step_update in updates.items(): - content = step_update.get("content", {}) - io_type = "input" if rerun_from == "node" else "output" - save_artifact(pipeline_id, new_version, step_name, io_type, content) - - # Reset affected steps to pending - all_rerun_list = sorted(all_rerun, key=lambda s: manifest["steps"].get(s, {}).get("order", 0)) - reset_steps(pipeline_id, all_rerun_list) - - # Update manifest - manifest = get_manifest(pipeline_id) - manifest["state"] = PIPELINE_RUNNING - save_manifest(pipeline_id, manifest) - - # Resume execution - await resume_pipeline(pipeline_id) - - return json_response({ - "status": "ok", - "pipeline_id": pipeline_id, - "new_version": new_version, - "rerun_steps": all_rerun_list, - "rerun_from": rerun_from, - "message": f"Created v{new_version}, rerunning {len(all_rerun_list)} steps", - }) - - -# === Pause/Resume === - -async def handle_pause(request): - """POST /v1/task/{id}/pause""" - pipeline_id = request.match_info["id"] - manifest = get_manifest(pipeline_id) - if not manifest: - return error_response(f"Pipeline not found: {pipeline_id}", 404) - - await stop_pipeline(pipeline_id) - manifest["state"] = PIPELINE_PAUSED - save_manifest(pipeline_id, manifest) - - return json_response({"status": "ok", "pipeline_id": pipeline_id, "state": PIPELINE_PAUSED}) - - -async def handle_resume(request): - """POST /v1/task/{id}/resume""" - pipeline_id = request.match_info["id"] - manifest = get_manifest(pipeline_id) - if not manifest: - return error_response(f"Pipeline not found: {pipeline_id}", 404) - - manifest["state"] = PIPELINE_RUNNING - save_manifest(pipeline_id, manifest) - await resume_pipeline(pipeline_id) - - return json_response({"status": "ok", "pipeline_id": pipeline_id, "state": PIPELINE_RUNNING}) - - -# === CORS Middleware === - -@web.middleware -async def cors_middleware(request, handler): - response = await handler(request) - response.headers["Access-Control-Allow-Origin"] = "*" - response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS" - response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization" - return response - - -# === App Setup === - -def create_app(): - app = web.Application(middlewares=[cors_middleware]) - - app.router.add_get("/v1/health", handle_health) - app.router.add_post("/v1/task/submit", handle_submit) - app.router.add_get("/v1/tasks", handle_list) - app.router.add_get("/v1/task/{id}", handle_detail) - app.router.add_get("/v1/task/{id}/node/{step}", handle_node) - app.router.add_post("/v1/task/update", handle_update) - app.router.add_post("/v1/task/{id}/pause", handle_pause) - app.router.add_post("/v1/task/{id}/resume", handle_resume) - - return app - - -def main(): - logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s") - port = int(os.environ.get("PIPELINE_PORT", "8190")) - host = os.environ.get("PIPELINE_HOST", "0.0.0.0") - - app = create_app() - logger.info(f"Starting Pipeline Service on {host}:{port}") - web.run_app(app, host=host, port=port) - - -if __name__ == "__main__": - main() diff --git a/pipeline_service/state.py b/pipeline_service/state.py index ee9b160..29b5267 100644 --- a/pipeline_service/state.py +++ b/pipeline_service/state.py @@ -1,62 +1,7 @@ -"""Pipeline state machine - step definitions, dependency graph, and ordering.""" +"""Pipeline state machine - dynamic DAG resolution from pipeline_steps table.""" -from typing import Dict, List, Optional, Tuple - -# Step definitions per mode: (step_name, dependencies, display_name) -MODE_STEPS: Dict[str, List[Tuple[str, List[str], str]]] = { - "audio_lyrics": [ # Mode A - ("audio_preparing", [], "音频准备"), - ("demucs_separating", ["audio_preparing"], "人声分离"), - ("lyric_generating", ["demucs_separating"], "歌词生成"), - ("lyric_evaluating", ["lyric_generating"], "歌词评估"), - ("music_generating", ["lyric_evaluating"], "音乐生成"), - ("music_polling", ["music_generating"], "音乐轮询"), - ("lyric_calibrating", ["music_polling", "demucs_separating"], "歌词校准"), - ("subtitle_rendering", ["lyric_calibrating"], "字幕渲染"), - ("subtitle_exporting", ["subtitle_rendering"], "字幕导出"), - ("character_designing", ["lyric_calibrating"], "角色设计"), - ("character_image_generating", ["character_designing"], "角色图生成"), - ("storyboard_generating", ["character_designing", "music_polling"], "分镜剧本"), - ("scene_video_generating", ["storyboard_generating", "character_image_generating"], "分镜视频生成"), - ("scene_video_evaluating", ["scene_video_generating"], "分镜视频评估"), - ("scene_video_concatenating", ["scene_video_evaluating"], "分镜视频拼接"), - ("ktv_synthesizing", ["scene_video_concatenating", "subtitle_rendering", "music_polling"], "KTV合成"), - ], - "video_lyrics": [ # Mode B - ("video_preparing", [], "视频准备"), - ("demucs_separating", ["video_preparing"], "人声分离"), - ("lyric_generating", ["demucs_separating"], "歌词生成"), - ("lyric_evaluating", ["lyric_generating"], "歌词评估"), - ("music_generating", ["lyric_evaluating"], "音乐生成"), - ("music_polling", ["music_generating"], "音乐轮询"), - ("lyric_calibrating", ["music_polling", "demucs_separating"], "歌词校准"), - ("subtitle_rendering", ["lyric_calibrating"], "字幕渲染"), - ("subtitle_exporting", ["subtitle_rendering"], "字幕导出"), - ("character_designing", ["lyric_calibrating"], "角色设计"), - ("character_image_generating", ["character_designing"], "角色图生成"), - ("storyboard_generating", ["character_designing", "music_polling"], "分镜剧本"), - ("scene_video_generating", ["storyboard_generating", "character_image_generating"], "分镜视频生成"), - ("scene_video_evaluating", ["scene_video_generating"], "分镜视频评估"), - ("scene_video_concatenating", ["scene_video_evaluating"], "分镜视频拼接"), - ("ktv_synthesizing", ["scene_video_concatenating", "subtitle_rendering", "music_polling"], "KTV合成"), - ], - "lyrics_only": [ # Mode C - ("lyric_generating", [], "歌词生成"), - ("lyric_evaluating", ["lyric_generating"], "歌词评估"), - ("music_generating", ["lyric_evaluating"], "音乐生成"), - ("music_polling", ["music_generating"], "音乐轮询"), - ("lyric_calibrating", ["music_polling"], "歌词校准"), - ("subtitle_rendering", ["lyric_calibrating"], "字幕渲染"), - ("subtitle_exporting", ["subtitle_rendering"], "字幕导出"), - ("character_designing", ["lyric_calibrating"], "角色设计"), - ("character_image_generating", ["character_designing"], "角色图生成"), - ("storyboard_generating", ["character_designing", "music_polling"], "分镜剧本"), - ("scene_video_generating", ["storyboard_generating", "character_image_generating"], "分镜视频生成"), - ("scene_video_evaluating", ["scene_video_generating"], "分镜视频评估"), - ("scene_video_concatenating", ["scene_video_evaluating"], "分镜视频拼接"), - ("ktv_synthesizing", ["scene_video_concatenating", "subtitle_rendering", "music_polling"], "KTV合成"), - ], -} +import json +from typing import Dict, List, Optional # Step states STATE_PENDING = "pending" @@ -65,49 +10,84 @@ STATE_COMPLETED = "completed" STATE_FAILED = "failed" STATE_SKIPPED = "skipped" -# Pipeline states -PIPELINE_SUBMITTED = "submitted" -PIPELINE_RUNNING = "running" -PIPELINE_COMPLETED = "completed" -PIPELINE_FAILED = "failed" -PIPELINE_PAUSED = "paused" # waiting for user modification +# Pipeline task states +TASK_SUBMITTED = "submitted" +TASK_RUNNING = "running" +TASK_COMPLETED = "completed" +TASK_FAILED = "failed" +TASK_PAUSED = "paused" +TASK_CANCELLED = "cancelled" -def get_step_graph(mode: str) -> List[Tuple[str, List[str], str]]: - """Get step definitions for a mode. Returns [(name, deps, display_name), ...]""" - if mode not in MODE_STEPS: - raise ValueError(f"Unknown mode: {mode}. Available: {list(MODE_STEPS.keys())}") - return MODE_STEPS[mode] - - -def build_dependency_map(mode: str) -> Dict[str, dict]: - """Build a dependency map for a mode. - Returns: {step_name: {"deps": [...], "dependents": [...], "display_name": "...", "order": int}} +def build_step_graph(step_records: list) -> Dict[str, dict]: + """Build dependency map from pipeline_steps table records. + + Args: + step_records: list of dicts from pipeline_steps table, each with: + - step_name, step_type, display_name, step_order, deps (JSON string) + + Returns: + {step_name: {"deps": [...], "dependents": [...], "step_type": "...", + "display_name": "...", "order": int}} """ - steps = get_step_graph(mode) dep_map = {} - for i, (name, deps, display) in enumerate(steps): + for rec in step_records: + name = rec['step_name'] + deps_raw = rec.get('deps', '[]') + if isinstance(deps_raw, str): + deps = json.loads(deps_raw) if deps_raw else [] + else: + deps = deps_raw if deps_raw else [] + dep_map[name] = { - "deps": list(deps), + "deps": deps, "dependents": [], - "display_name": display, - "order": i + 1, + "step_type": rec.get('step_type', name), + "display_name": rec.get('display_name', name), + "order": rec.get('step_order', 0), } - # Build reverse mapping + + # Build reverse mapping (dependents) for name, info in dep_map.items(): for dep in info["deps"]: if dep in dep_map: dep_map[dep]["dependents"].append(name) + return dep_map -def get_cascade_rerun_steps(mode: str, from_step: str) -> List[str]: - """Get all steps that need to be rerun when a step is modified. - BFS from the modified step through dependents. - Returns ordered list of step names. +def find_next_step(step_graph: Dict[str, dict], step_states: Dict[str, str]) -> Optional[str]: + """Find the next step to execute: pending + all deps completed. + + Args: + step_graph: from build_step_graph() + step_states: {step_name: current_state} + + Returns: + step_name or None if all done / blocked """ - dep_map = build_dependency_map(mode) - if from_step not in dep_map: + candidates = [] + for name, info in step_graph.items(): + if step_states.get(name) != STATE_PENDING: + continue + deps_ok = all( + step_states.get(dep) == STATE_COMPLETED + for dep in info["deps"] + ) + if deps_ok: + candidates.append(name) + + if not candidates: + return None + + # Sort by order, return first + candidates.sort(key=lambda s: step_graph[s]["order"]) + return candidates[0] + + +def get_cascade_rerun_steps(step_graph: Dict[str, dict], from_step: str) -> List[str]: + """BFS from modified step through dependents. Returns ordered list.""" + if from_step not in step_graph: return [] visited = set() @@ -120,26 +100,37 @@ def get_cascade_rerun_steps(mode: str, from_step: str) -> List[str]: continue visited.add(current) result.append(current) - for dep in dep_map.get(current, {}).get("dependents", []): + for dep in step_graph.get(current, {}).get("dependents", []): if dep not in visited: queue.append(dep) - # Sort by order - result.sort(key=lambda s: dep_map.get(s, {}).get("order", 999)) + result.sort(key=lambda s: step_graph.get(s, {}).get("order", 999)) return result -def get_rerun_from_next(mode: str, from_step: str) -> List[str]: - """When output is modified, rerun from the NEXT steps (dependents only, not the step itself).""" - dep_map = build_dependency_map(mode) - if from_step not in dep_map: +def get_rerun_from_next(step_graph: Dict[str, dict], from_step: str) -> List[str]: + """When output is modified, rerun from downstream dependents only.""" + if from_step not in step_graph: return [] - direct_dependents = dep_map[from_step]["dependents"] + direct_dependents = step_graph[from_step]["dependents"] all_steps = set() for d in direct_dependents: - all_steps.update(get_cascade_rerun_steps(mode, d)) + all_steps.update(get_cascade_rerun_steps(step_graph, d)) result = list(all_steps) - result.sort(key=lambda s: dep_map.get(s, {}).get("order", 999)) + result.sort(key=lambda s: step_graph.get(s, {}).get("order", 999)) return result + + +def check_all_completed(step_states: Dict[str, str]) -> bool: + """Check if all steps are completed or skipped.""" + for state in step_states.values(): + if state not in (STATE_COMPLETED, STATE_SKIPPED): + return False + return True + + +def check_any_failed(step_states: Dict[str, str]) -> bool: + """Check if any step has failed.""" + return any(s == STATE_FAILED for s in step_states.values()) diff --git a/pipeline_service/storage.py b/pipeline_service/storage.py index d30f0bf..50b51b5 100644 --- a/pipeline_service/storage.py +++ b/pipeline_service/storage.py @@ -1,258 +1,214 @@ -"""File-based storage for pipeline tasks, artifacts, and versions.""" +"""Pipeline storage layer - MySQL via sqlor.""" import json -import os -import shutil -import uuid -from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Dict, List, Optional +from sqlor.dbpools import DBPools +from appPublic.uniqueID import getID +from appPublic.log import debug -from .state import ( - STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, - PIPELINE_SUBMITTED, PIPELINE_RUNNING, - build_dependency_map, -) - -# Base data directory -DATA_DIR = os.environ.get("PIPELINE_DATA_DIR", os.path.expanduser("~/pipeline_data")) +DBNAME = "pipeline" -def _ensure_dir(path: str): - os.makedirs(path, exist_ok=True) +def _get_db(): + return DBPools(), DBNAME -def _pipeline_dir(pipeline_id: str) -> str: - return os.path.join(DATA_DIR, pipeline_id) +async def get_pipeline_steps(pipeline_id: str) -> list: + """Read step definitions from pipeline_steps table (defined by pipeline_core).""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + recs = await sor.R('pipeline_steps', {'pipeline_id': pipeline_id}, sort='step_order') + return list(recs) if recs else [] -def _version_dir(pipeline_id: str, version: int) -> str: - return os.path.join(_pipeline_dir(pipeline_id), f"v{version}") - - -def _manifest_path(pipeline_id: str) -> str: - return os.path.join(_pipeline_dir(pipeline_id), "manifest.json") - - -def _read_json(path: str) -> dict: - with open(path, "r", encoding="utf-8") as f: - return json.load(f) - - -def _write_json(path: str, data: dict): - _ensure_dir(os.path.dirname(path)) - with open(path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - -def generate_pipeline_id() -> str: - return f"ktv_{uuid.uuid4().hex[:12]}" - - -def create_pipeline(user_id: str, mode: str, title: str, params: dict) -> dict: - """Create a new pipeline task. Returns the manifest.""" - pipeline_id = generate_pipeline_id() - dep_map = build_dependency_map(mode) - now = datetime.now().isoformat() - - # Build steps - steps = {} - for name, info in dep_map.items(): - steps[name] = { - "order": info["order"], - "display_name": info["display_name"], - "deps": info["deps"], - "dependents": info["dependents"], - "state": STATE_PENDING, - "version": 1, - "started_at": None, - "completed_at": None, - "error": None, +async def create_task(tenant_id: str, pipeline_id: str, owner_id: str, title: str, params: dict) -> str: + """Create a new pipeline task. Returns task_id.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + task_id = getID() + data = { + "id": task_id, + "tenant_id": tenant_id, + "pipeline_id": pipeline_id, + "owner_id": owner_id, + "title": title, + "state": "submitted", + "current_version": 1, + "params": json.dumps(params, ensure_ascii=False, default=str), } + await sor.C('pipeline_tasks', data) + return task_id - manifest = { - "pipeline_id": pipeline_id, - "user_id": user_id, - "mode": mode, - "title": title, - "params": params, - "created_at": now, - "updated_at": now, - "current_version": 1, - "state": PIPELINE_SUBMITTED, - "steps": steps, - "versions": { - "1": { - "created_at": now, - "changes": "初始版本", + +async def init_task_steps(task_id: str, step_records: list): + """Create step execution records from pipeline step definitions.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + for rec in step_records: + step_id = getID() + data = { + "id": step_id, + "task_id": task_id, + "step_name": rec['step_name'], + "step_type": rec.get('step_type', rec['step_name']), + "display_name": rec.get('display_name', rec['step_name']), + "step_order": rec.get('step_order', 0), + "deps": rec.get('deps', '[]') if isinstance(rec.get('deps'), str) else json.dumps(rec.get('deps', [])), + "state": "pending", } - }, - } - - # Write to disk - pdir = _pipeline_dir(pipeline_id) - _ensure_dir(pdir) - _ensure_dir(_version_dir(pipeline_id, 1)) - _write_json(_manifest_path(pipeline_id), manifest) - - # Store initial params as artifact - save_artifact(pipeline_id, 1, "_params", "input", params) - - # Index by user - _add_to_user_index(user_id, pipeline_id) - - return manifest + await sor.C('pipeline_task_steps', data) -def get_manifest(pipeline_id: str) -> Optional[dict]: - """Read pipeline manifest.""" - path = _manifest_path(pipeline_id) - if not os.path.exists(path): - return None - return _read_json(path) +async def get_task(tenant_id: str, task_id: str) -> Optional[dict]: + """Get task record, filtered by tenant.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + recs = await sor.R('pipeline_tasks', {'id': task_id, 'tenant_id': tenant_id}) + if not recs: + return None + rec = recs[0] + if hasattr(rec, '__dict__'): + return {k: getattr(rec, k) for k in dir(rec) if not k.startswith('_')} + return dict(rec) -def save_manifest(pipeline_id: str, manifest: dict): - """Save pipeline manifest.""" - manifest["updated_at"] = datetime.now().isoformat() - _write_json(_manifest_path(pipeline_id), manifest) +async def get_task_steps(task_id: str) -> list: + """Get all step records for a task.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + recs = await sor.R('pipeline_task_steps', {'task_id': task_id}, sort='step_order') + result = [] + for rec in (recs or []): + if hasattr(rec, '__dict__'): + result.append({k: getattr(rec, k) for k in dir(rec) if not k.startswith('_')}) + else: + result.append(dict(rec)) + return result -def save_artifact(pipeline_id: str, version: int, step: str, io_type: str, data: Any): - """Save artifact data for a step. - io_type: 'input' or 'output' - """ - vdir = _version_dir(pipeline_id, version) - _ensure_dir(vdir) - path = os.path.join(vdir, f"{step}.{io_type}.json") - _write_json(path, {"step": step, "version": version, "type": io_type, "data": data, - "saved_at": datetime.now().isoformat()}) +async def get_step_states(task_id: str) -> Dict[str, str]: + """Get {step_name: state} for all steps of a task.""" + steps = await get_task_steps(task_id) + return {s['step_name']: s['state'] for s in steps} -def get_artifact(pipeline_id: str, version: int, step: str, io_type: str) -> Optional[dict]: - """Read artifact data for a step.""" - vdir = _version_dir(pipeline_id, version) - path = os.path.join(vdir, f"{step}.{io_type}.json") - if not os.path.exists(path): - # Try previous versions - for v in range(version, 0, -1): - path = os.path.join(_version_dir(pipeline_id, v), f"{step}.{io_type}.json") - if os.path.exists(path): - return _read_json(path) - return None - return _read_json(path) +async def update_task_state(task_id: str, state: str): + """Update task state.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + await sor.U('pipeline_tasks', {"id": task_id, "state": state}) -def get_all_artifacts(pipeline_id: str, version: int) -> Dict[str, dict]: - """Get all artifacts for a specific version.""" - vdir = _version_dir(pipeline_id, version) - if not os.path.exists(vdir): - return {} - artifacts = {} - for fname in os.listdir(vdir): - if fname.endswith(".json"): - fpath = os.path.join(vdir, fname) - try: - data = _read_json(fpath) - key = fname.replace(".json", "") - artifacts[key] = data - except Exception: - pass - return artifacts +async def update_task_version(task_id: str, version: int): + """Update task current_version.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + await sor.U('pipeline_tasks', {"id": task_id, "current_version": version}) -def create_new_version(pipeline_id: str, changes: str) -> int: - """Create a new version directory. Returns new version number.""" - manifest = get_manifest(pipeline_id) - if not manifest: - raise ValueError(f"Pipeline not found: {pipeline_id}") - - new_version = manifest["current_version"] + 1 - manifest["current_version"] = new_version - manifest["versions"][str(new_version)] = { - "created_at": datetime.now().isoformat(), - "changes": changes, - } - - # Copy previous version artifacts to new version (hard links) - prev_vdir = _version_dir(pipeline_id, new_version - 1) - new_vdir = _version_dir(pipeline_id, new_version) - _ensure_dir(new_vdir) - - if os.path.exists(prev_vdir): - for fname in os.listdir(prev_vdir): - src = os.path.join(prev_vdir, fname) - dst = os.path.join(new_vdir, fname) - if os.path.isfile(src) and not os.path.exists(dst): - try: - os.link(src, dst) # hard link - except OSError: - shutil.copy2(src, dst) - - save_manifest(pipeline_id, manifest) - return new_version +async def update_step_state(task_id: str, step_name: str, state: str, error_msg: str = None): + """Update step state.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + recs = await sor.R('pipeline_task_steps', {'task_id': task_id, 'step_name': step_name}) + if not recs: + return + rec = recs[0] + rec_id = rec.id if hasattr(rec, 'id') else rec['id'] + data = {"id": rec_id, "state": state} + if state == "running": + data["started_at"] = "NOW()" + elif state in ("completed", "failed"): + data["completed_at"] = "NOW()" + if error_msg: + data["error_msg"] = error_msg + await sor.U('pipeline_task_steps', data) -def reset_steps(pipeline_id: str, step_names: List[str]): +async def save_artifact(task_id: str, version: int, step_name: str, io_type: str, data: dict): + """Save or update an artifact.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + # Check if exists + existing = await sor.R('pipeline_artifacts', { + 'task_id': task_id, 'version': version, + 'step_name': step_name, 'io_type': io_type + }) + data_json = json.dumps(data, ensure_ascii=False, default=str) + if existing: + rec = existing[0] + rec_id = rec.id if hasattr(rec, 'id') else rec['id'] + await sor.U('pipeline_artifacts', {"id": rec_id, "data": data_json}) + else: + await sor.C('pipeline_artifacts', { + "id": getID(), + "task_id": task_id, + "version": version, + "step_name": step_name, + "io_type": io_type, + "data": data_json, + }) + + +async def get_artifact(task_id: str, version: int, step_name: str, io_type: str) -> Optional[dict]: + """Get artifact data.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + recs = await sor.R('pipeline_artifacts', { + 'task_id': task_id, 'version': version, + 'step_name': step_name, 'io_type': io_type + }) + if not recs: + return None + rec = recs[0] + raw = rec.data if hasattr(rec, 'data') else rec['data'] + if isinstance(raw, str): + return json.loads(raw) + return raw + + +async def get_all_artifacts(task_id: str, version: int) -> Dict[str, dict]: + """Get all artifacts for a task version. Returns {step_name_io_type: data}.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + recs = await sor.R('pipeline_artifacts', {'task_id': task_id, 'version': version}) + result = {} + for rec in (recs or []): + sn = rec.step_name if hasattr(rec, 'step_name') else rec['step_name'] + io = rec.io_type if hasattr(rec, 'io_type') else rec['io_type'] + raw = rec.data if hasattr(rec, 'data') else rec['data'] + key = f"{sn}_{io}" + result[key] = json.loads(raw) if isinstance(raw, str) else raw + return result + + +async def list_tasks(tenant_id: str, pipeline_id: str = None, limit: int = 100) -> list: + """List tasks for a tenant.""" + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + filters = {'tenant_id': tenant_id} + if pipeline_id: + filters['pipeline_id'] = pipeline_id + recs = await sor.R('pipeline_tasks', filters, sort='created_at desc') + result = [] + for rec in (recs or [])[:limit]: + if hasattr(rec, '__dict__'): + result.append({k: getattr(rec, k) for k in dir(rec) if not k.startswith('_')}) + else: + result.append(dict(rec)) + return result + + +async def reset_steps(task_id: str, step_names: list): """Reset specified steps to pending state.""" - manifest = get_manifest(pipeline_id) - if not manifest: - return - for name in step_names: - if name in manifest["steps"]: - manifest["steps"][name]["state"] = STATE_PENDING - manifest["steps"][name]["error"] = None - manifest["steps"][name]["started_at"] = None - manifest["steps"][name]["completed_at"] = None - save_manifest(pipeline_id, manifest) - - -def update_step_state(pipeline_id: str, step: str, state: str, error: str = None): - """Update a step's state.""" - manifest = get_manifest(pipeline_id) - if not manifest or step not in manifest["steps"]: - return - now = datetime.now().isoformat() - manifest["steps"][step]["state"] = state - if state == STATE_RUNNING: - manifest["steps"][step]["started_at"] = now - elif state in (STATE_COMPLETED, STATE_FAILED): - manifest["steps"][step]["completed_at"] = now - if error: - manifest["steps"][step]["error"] = error - - # Update pipeline state - all_states = [s["state"] for s in manifest["steps"].values()] - if all(s == STATE_COMPLETED for s in all_states): - manifest["state"] = "completed" - elif any(s == STATE_FAILED for s in all_states): - manifest["state"] = "failed" - elif any(s == STATE_RUNNING for s in all_states): - manifest["state"] = PIPELINE_RUNNING - - save_manifest(pipeline_id, manifest) - - -# === User Index === - -def _user_index_path(user_id: str) -> str: - return os.path.join(DATA_DIR, f"_user_{user_id}.json") - - -def _add_to_user_index(user_id: str, pipeline_id: str): - path = _user_index_path(user_id) - if os.path.exists(path): - data = _read_json(path) - else: - data = {"user_id": user_id, "pipelines": []} - if pipeline_id not in data["pipelines"]: - data["pipelines"].append(pipeline_id) - _write_json(path, data) - - -def get_user_pipelines(user_id: str) -> List[str]: - """Get all pipeline IDs for a user.""" - path = _user_index_path(user_id) - if not os.path.exists(path): - return [] - data = _read_json(path) - return data.get("pipelines", []) + db, dbname = _get_db() + async with db.sqlorContext(dbname) as sor: + for sn in step_names: + recs = await sor.R('pipeline_task_steps', {'task_id': task_id, 'step_name': sn}) + if recs: + rec = recs[0] + rec_id = rec.id if hasattr(rec, 'id') else rec['id'] + await sor.U('pipeline_task_steps', { + "id": rec_id, "state": "pending", + "error_msg": None, "started_at": None, "completed_at": None + }) diff --git a/pyproject.toml b/pyproject.toml index d74c297..8961ab9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,12 +3,12 @@ requires = ["setuptools>=45", "wheel"] build-backend = "setuptools.build_meta" [project] -name = "pipeline-service" -version = "1.0.0" -description = "Hermes Pipeline Backend - manages KTV/MV production pipeline tasks" +name = "pipeline_service" +version = "2.0.0" +description = "通用产线执行引擎模块 — DAG调度、多租户隔离、可插拔步骤处理器、artifact版本管理" requires-python = ">=3.8" dependencies = [ - "aiohttp>=3.8", + "sqlor", ] [tool.setuptools.packages.find] diff --git a/scripts/load_path.py b/scripts/load_path.py new file mode 100644 index 0000000..47c3b97 --- /dev/null +++ b/scripts/load_path.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +"""pipeline_service 模块 RBAC 权限注册。 + +注意: pipeline_service 是纯后端引擎模块,无 wwwroot 前端文件。 +此脚本仅注册模块 API 可能被引用的路径。 +宿主应用的 load_path.py 负责注册实际的 dspy 路径。 +""" + +import os +import sys +import subprocess + +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) + +# Find Sage root +SAGE_ROOT = None +for candidate in [ + os.path.join(SCRIPT_DIR, "..", ".."), + os.path.expanduser("~/repos/sage"), + os.path.expanduser("~/sage"), +]: + if os.path.isdir(os.path.join(candidate, "wwwroot")) and os.path.isdir(os.path.join(candidate, "py3")): + SAGE_ROOT = os.path.abspath(candidate) + break + +if not SAGE_ROOT: + print("ERROR: Cannot find Sage root directory") + sys.exit(1) + +SET_ROLE_PERM = os.path.join(SAGE_ROOT, "set_role_perm.py") +PYTHON = os.path.join(SAGE_ROOT, "py3", "bin", "python") + +# pipeline_service is a backend-only module, no wwwroot paths needed. +# The host app's load_path.py handles the dspy paths that call pipeline_submit etc. +# This script exists for consistency and future extensibility. + +PATHS_LOGINED = [] +PATHS_ANY = [] + + +def register_paths(): + for path in PATHS_ANY: + subprocess.run([PYTHON, SET_ROLE_PERM, "any", path], cwd=SAGE_ROOT) + print(f" any: {path}") + + for path in PATHS_LOGINED: + subprocess.run([PYTHON, SET_ROLE_PERM, "logined", path], cwd=SAGE_ROOT) + print(f" logined: {path}") + + +if __name__ == "__main__": + print(f"=== pipeline_service RBAC registration ===") + print(f"Sage root: {SAGE_ROOT}") + if not PATHS_LOGINED and not PATHS_ANY: + print("No paths to register (backend-only module)") + else: + register_paths() + print("Done.")