diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..45eef94 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +*.pyc +*.egg-info/ +build/ diff --git a/pipeline_service/__init__.py b/pipeline_service/__init__.py new file mode 100644 index 0000000..cdeb3ca --- /dev/null +++ b/pipeline_service/__init__.py @@ -0,0 +1,3 @@ +"""Pipeline Service - Hermes backend for KTV/MV production pipelines""" + +__version__ = "1.0.0" diff --git a/pipeline_service/executor.py b/pipeline_service/executor.py new file mode 100644 index 0000000..e9b0c1d --- /dev/null +++ b/pipeline_service/executor.py @@ -0,0 +1,182 @@ +"""Pipeline step executor - runs steps asynchronously. + +Each step function receives (pipeline_id, manifest, params) and returns output data. +The executor handles state transitions, artifact storage, and dependency resolution. +""" + +import asyncio +import logging +from typing import Dict, Optional + +from .state import ( + STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, + PIPELINE_RUNNING, PIPELINE_COMPLETED, PIPELINE_FAILED, PIPELINE_PAUSED, + build_dependency_map, +) +from .storage import ( + get_manifest, save_manifest, save_artifact, + update_step_state, create_new_version, reset_steps, +) + +logger = logging.getLogger("pipeline.executor") + +# Active pipeline execution tasks (pipeline_id -> asyncio.Task) +_active_tasks: Dict[str, asyncio.Task] = {} + + +async def start_pipeline(pipeline_id: str): + """Start executing a pipeline from the beginning.""" + task = asyncio.create_task(_run_pipeline(pipeline_id)) + _active_tasks[pipeline_id] = task + return task + + +async def resume_pipeline(pipeline_id: str): + """Resume a paused pipeline.""" + task = asyncio.create_task(_run_pipeline(pipeline_id)) + _active_tasks[pipeline_id] = task + return task + + +async def stop_pipeline(pipeline_id: str): + """Cancel a running pipeline.""" + task = _active_tasks.get(pipeline_id) + if task and not task.done(): + task.cancel() + del _active_tasks[pipeline_id] + return True + return False + + +def is_running(pipeline_id: str) -> bool: + """Check if a pipeline is currently running.""" + task = _active_tasks.get(pipeline_id) + return task is not None and not task.done() + + +async def _run_pipeline(pipeline_id: str): + """Main pipeline execution loop.""" + try: + while True: + manifest = get_manifest(pipeline_id) + if not manifest: + logger.error(f"Pipeline {pipeline_id} not found") + break + + if manifest["state"] == PIPELINE_PAUSED: + logger.info(f"Pipeline {pipeline_id} paused, waiting for user action") + break + + if manifest["state"] == PIPELINE_COMPLETED: + logger.info(f"Pipeline {pipeline_id} already completed") + break + + # Find next pending step with all deps completed + next_step = _find_next_step(manifest) + if not next_step: + # All steps done + manifest["state"] = PIPELINE_COMPLETED + save_manifest(pipeline_id, manifest) + logger.info(f"Pipeline {pipeline_id} completed") + break + + # Execute the step + await _execute_step(pipeline_id, next_step) + + except asyncio.CancelledError: + logger.info(f"Pipeline {pipeline_id} cancelled") + except Exception as e: + logger.error(f"Pipeline {pipeline_id} error: {e}") + finally: + _active_tasks.pop(pipeline_id, None) + + +def _find_next_step(manifest: dict) -> Optional[str]: + """Find the next step to execute (pending, all deps completed).""" + steps = manifest.get("steps", {}) + for name, info in sorted(steps.items(), key=lambda x: x[1].get("order", 0)): + if info["state"] != STATE_PENDING: + continue + # Check all dependencies are completed + deps_ok = all( + steps.get(dep, {}).get("state") == STATE_COMPLETED + for dep in info.get("deps", []) + ) + if deps_ok: + return name + return None + + +async def _execute_step(pipeline_id: str, step_name: str): + """Execute a single pipeline step.""" + update_step_state(pipeline_id, step_name, STATE_RUNNING) + + try: + # Load input artifacts from dependencies + manifest = get_manifest(pipeline_id) + version = manifest["current_version"] + input_data = await _gather_step_inputs(pipeline_id, step_name, version, manifest) + + # Save input artifact + save_artifact(pipeline_id, version, step_name, "input", input_data) + + # Execute step handler + handler = STEP_HANDLERS.get(step_name, _default_handler) + output_data = await handler(pipeline_id, step_name, input_data, manifest) + + # Save output artifact + save_artifact(pipeline_id, version, step_name, "output", output_data) + update_step_state(pipeline_id, step_name, STATE_COMPLETED) + + logger.info(f"Step {step_name} completed for {pipeline_id}") + + except Exception as e: + logger.error(f"Step {step_name} failed for {pipeline_id}: {e}") + update_step_state(pipeline_id, step_name, STATE_FAILED, str(e)) + + +async def _gather_step_inputs(pipeline_id: str, step_name: str, version: int, manifest: dict) -> dict: + """Gather input data for a step from its dependencies' outputs.""" + from .storage import get_artifact + steps = manifest.get("steps", {}) + step_info = steps.get(step_name, {}) + deps = step_info.get("deps", []) + + inputs = {} + for dep in deps: + dep_artifact = get_artifact(pipeline_id, version, dep, "output") + if dep_artifact: + inputs[dep] = dep_artifact.get("data", {}) + + # Also include initial params + if "_params" in inputs: + inputs["params"] = inputs.pop("_params") + + # Check if there's a user-modified input for this step + user_input = get_artifact(pipeline_id, version, step_name, "input") + if user_input and user_input.get("data"): + # User modified input exists, use it + return user_input["data"] + + return inputs + + +# === Step Handlers === + +async def _default_handler(pipeline_id: str, step_name: str, input_data: dict, manifest: dict) -> dict: + """Default handler - stub that returns input as output (for testing).""" + logger.info(f"Default handler for {step_name}, pipeline {pipeline_id}") + # Simulate some processing time + await asyncio.sleep(0.5) + return {"step": step_name, "status": "completed", "input_summary": str(input_data)[:200]} + + +# Real handlers will be registered here +STEP_HANDLERS = { + # Will be populated by actual KTV pipeline step implementations +} + + +def register_handler(step_name: str, handler): + """Register a step handler function.""" + STEP_HANDLERS[step_name] = handler diff --git a/pipeline_service/server.py b/pipeline_service/server.py new file mode 100644 index 0000000..95aad24 --- /dev/null +++ b/pipeline_service/server.py @@ -0,0 +1,361 @@ +"""Pipeline Service - aiohttp HTTP API server. + +Endpoints: + POST /v1/task/submit - Create and start a new pipeline + GET /v1/tasks - List pipelines for a user + GET /v1/task/{id} - Get pipeline detail with node tree + GET /v1/task/{id}/node/{step} - Get node input/output + POST /v1/task/update - Modify node artifact + cascade rerun + POST /v1/task/{id}/pause - Pause a running pipeline + POST /v1/task/{id}/resume - Resume a paused pipeline + GET /v1/health - Health check +""" + +import json +import logging +import os +from aiohttp import web + +from .state import ( + PIPELINE_SUBMITTED, PIPELINE_RUNNING, PIPELINE_COMPLETED, PIPELINE_FAILED, PIPELINE_PAUSED, + get_cascade_rerun_steps, get_rerun_from_next, build_dependency_map, +) +from .storage import ( + create_pipeline, get_manifest, save_manifest, + get_artifact, save_artifact, get_all_artifacts, + create_new_version, reset_steps, get_user_pipelines, +) +from .executor import start_pipeline, resume_pipeline, stop_pipeline, is_running + +logger = logging.getLogger("pipeline.api") + +JSON_HEADERS = {"Content-Type": "application/json; charset=utf-8"} + + +def json_response(data, status=200): + return web.Response( + text=json.dumps(data, ensure_ascii=False), + status=status, + headers=JSON_HEADERS, + ) + + +def error_response(message, status=400): + return json_response({"status": "error", "message": message}, status=status) + + +# === Health === + +async def handle_health(request): + return json_response({"status": "ok", "service": "pipeline-service", "version": "1.0.0"}) + + +# === Submit New Pipeline === + +async def handle_submit(request): + """POST /v1/task/submit + Body: {mode, title, lyrics?, input_audio?, input_video?, user_id, ...} + """ + try: + body = await request.json() + except Exception: + return error_response("Invalid JSON body") + + mode = body.get("mode") + title = body.get("title") + user_id = body.get("user_id") + + if not mode: + return error_response("Missing required field: mode") + if not title: + return error_response("Missing required field: title") + if not user_id: + return error_response("Missing required field: user_id") + + # Validate mode + valid_modes = ["audio_lyrics", "video_lyrics", "lyrics_only"] + if mode not in valid_modes: + return error_response(f"Invalid mode: {mode}. Must be one of: {valid_modes}") + + # Validate mode-specific requirements + if mode == "audio_lyrics" and not body.get("input_audio"): + return error_response("Mode audio_lyrics requires input_audio") + if mode == "video_lyrics" and not body.get("input_video"): + return error_response("Mode video_lyrics requires input_video") + + # Extract params (everything except mode/title/user_id) + params = {k: v for k, v in body.items() if k not in ("mode", "title", "user_id")} + + try: + manifest = create_pipeline(user_id, mode, title, params) + except Exception as e: + return error_response(f"Failed to create pipeline: {e}", 500) + + pipeline_id = manifest["pipeline_id"] + + # Start execution in background + await start_pipeline(pipeline_id) + + return json_response({ + "status": "ok", + "pipeline_id": pipeline_id, + "mode": mode, + "title": title, + "message": f"Pipeline created and started", + }) + + +# === List User Pipelines === + +async def handle_list(request): + """GET /v1/tasks?user_id=xxx""" + user_id = request.query.get("user_id") + if not user_id: + return error_response("Missing query param: user_id") + + pipeline_ids = get_user_pipelines(user_id) + tasks = [] + for pid in pipeline_ids: + manifest = get_manifest(pid) + if manifest: + tasks.append({ + "pipeline_id": manifest["pipeline_id"], + "mode": manifest["mode"], + "title": manifest["title"], + "state": manifest["state"], + "current_version": manifest["current_version"], + "created_at": manifest["created_at"], + "updated_at": manifest["updated_at"], + }) + + return json_response({"status": "ok", "tasks": tasks, "total": len(tasks)}) + + +# === Pipeline Detail === + +async def handle_detail(request): + """GET /v1/task/{id}""" + pipeline_id = request.match_info["id"] + manifest = get_manifest(pipeline_id) + if not manifest: + return error_response(f"Pipeline not found: {pipeline_id}", 404) + + version = manifest["current_version"] + artifacts = get_all_artifacts(pipeline_id, version) + + # Build artifact summary + artifact_summary = {} + for key, art in artifacts.items(): + artifact_summary[key] = { + "step": art.get("step"), + "type": art.get("type"), + "version": art.get("version"), + "saved_at": art.get("saved_at"), + } + + return json_response({ + "status": "ok", + "pipeline_id": manifest["pipeline_id"], + "user_id": manifest["user_id"], + "mode": manifest["mode"], + "title": manifest["title"], + "state": manifest["state"], + "current_version": version, + "created_at": manifest["created_at"], + "updated_at": manifest["updated_at"], + "steps": manifest["steps"], + "versions": manifest["versions"], + "artifacts": artifact_summary, + "is_running": is_running(pipeline_id), + }) + + +# === Node Input/Output === + +async def handle_node(request): + """GET /v1/task/{id}/node/{step}?version=N""" + pipeline_id = request.match_info["id"] + step = request.match_info["step"] + version = request.query.get("version") + + manifest = get_manifest(pipeline_id) + if not manifest: + return error_response(f"Pipeline not found: {pipeline_id}", 404) + + if step not in manifest["steps"]: + return error_response(f"Step not found: {step}") + + v = int(version) if version else manifest["current_version"] + + input_art = get_artifact(pipeline_id, v, step, "input") + output_art = get_artifact(pipeline_id, v, step, "output") + + step_info = manifest["steps"][step] + + return json_response({ + "status": "ok", + "pipeline_id": pipeline_id, + "step": step, + "display_name": step_info.get("display_name", step), + "version": v, + "state": step_info["state"], + "input": input_art["data"] if input_art else None, + "output": output_art["data"] if output_art else None, + "input_version": input_art.get("version") if input_art else None, + "output_version": output_art.get("version") if output_art else None, + }) + + +# === Modify Node + Cascade Rerun === + +async def handle_update(request): + """POST /v1/task/update + Body: {pipeline_id, updates: {step: {content: ...}}, rerun_from: "node"|"next"} + + rerun_from: + - "node": input changed, rerun from this step + - "next": output changed, rerun from next step(s) + """ + try: + body = await request.json() + except Exception: + return error_response("Invalid JSON body") + + pipeline_id = body.get("pipeline_id") + updates = body.get("updates", {}) + rerun_from = body.get("rerun_from", "node") + + if not pipeline_id: + return error_response("Missing pipeline_id") + if not updates: + return error_response("Missing or empty updates") + + manifest = get_manifest(pipeline_id) + if not manifest: + return error_response(f"Pipeline not found: {pipeline_id}", 404) + + if manifest["state"] in (PIPELINE_RUNNING,): + if is_running(pipeline_id): + return error_response("Pipeline is currently running. Pause it first.") + + mode = manifest["mode"] + + # Collect all steps that need rerunning + all_rerun = set() + for step_name, step_update in updates.items(): + if step_name not in manifest["steps"]: + return error_response(f"Unknown step: {step_name}") + + content = step_update.get("content", {}) + + if rerun_from == "node": + # Input modified - save as new input, rerun from this step + affected = get_cascade_rerun_steps(mode, step_name) + else: + # Output modified - save as new output, rerun from next steps + affected = get_rerun_from_next(mode, step_name) + + all_rerun.update(affected) + + # Create new version + change_desc = f"修改步骤: {', '.join(updates.keys())} (rerun_from={rerun_from})" + new_version = create_new_version(pipeline_id, change_desc) + + # Save modified artifacts + for step_name, step_update in updates.items(): + content = step_update.get("content", {}) + io_type = "input" if rerun_from == "node" else "output" + save_artifact(pipeline_id, new_version, step_name, io_type, content) + + # Reset affected steps to pending + all_rerun_list = sorted(all_rerun, key=lambda s: manifest["steps"].get(s, {}).get("order", 0)) + reset_steps(pipeline_id, all_rerun_list) + + # Update manifest + manifest = get_manifest(pipeline_id) + manifest["state"] = PIPELINE_RUNNING + save_manifest(pipeline_id, manifest) + + # Resume execution + await resume_pipeline(pipeline_id) + + return json_response({ + "status": "ok", + "pipeline_id": pipeline_id, + "new_version": new_version, + "rerun_steps": all_rerun_list, + "rerun_from": rerun_from, + "message": f"Created v{new_version}, rerunning {len(all_rerun_list)} steps", + }) + + +# === Pause/Resume === + +async def handle_pause(request): + """POST /v1/task/{id}/pause""" + pipeline_id = request.match_info["id"] + manifest = get_manifest(pipeline_id) + if not manifest: + return error_response(f"Pipeline not found: {pipeline_id}", 404) + + await stop_pipeline(pipeline_id) + manifest["state"] = PIPELINE_PAUSED + save_manifest(pipeline_id, manifest) + + return json_response({"status": "ok", "pipeline_id": pipeline_id, "state": PIPELINE_PAUSED}) + + +async def handle_resume(request): + """POST /v1/task/{id}/resume""" + pipeline_id = request.match_info["id"] + manifest = get_manifest(pipeline_id) + if not manifest: + return error_response(f"Pipeline not found: {pipeline_id}", 404) + + manifest["state"] = PIPELINE_RUNNING + save_manifest(pipeline_id, manifest) + await resume_pipeline(pipeline_id) + + return json_response({"status": "ok", "pipeline_id": pipeline_id, "state": PIPELINE_RUNNING}) + + +# === CORS Middleware === + +@web.middleware +async def cors_middleware(request, handler): + response = await handler(request) + response.headers["Access-Control-Allow-Origin"] = "*" + response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS" + response.headers["Access-Control-Allow-Headers"] = "Content-Type, Authorization" + return response + + +# === App Setup === + +def create_app(): + app = web.Application(middlewares=[cors_middleware]) + + app.router.add_get("/v1/health", handle_health) + app.router.add_post("/v1/task/submit", handle_submit) + app.router.add_get("/v1/tasks", handle_list) + app.router.add_get("/v1/task/{id}", handle_detail) + app.router.add_get("/v1/task/{id}/node/{step}", handle_node) + app.router.add_post("/v1/task/update", handle_update) + app.router.add_post("/v1/task/{id}/pause", handle_pause) + app.router.add_post("/v1/task/{id}/resume", handle_resume) + + return app + + +def main(): + logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(levelname)s: %(message)s") + port = int(os.environ.get("PIPELINE_PORT", "8190")) + host = os.environ.get("PIPELINE_HOST", "0.0.0.0") + + app = create_app() + logger.info(f"Starting Pipeline Service on {host}:{port}") + web.run_app(app, host=host, port=port) + + +if __name__ == "__main__": + main() diff --git a/pipeline_service/state.py b/pipeline_service/state.py new file mode 100644 index 0000000..ee9b160 --- /dev/null +++ b/pipeline_service/state.py @@ -0,0 +1,145 @@ +"""Pipeline state machine - step definitions, dependency graph, and ordering.""" + +from typing import Dict, List, Optional, Tuple + +# Step definitions per mode: (step_name, dependencies, display_name) +MODE_STEPS: Dict[str, List[Tuple[str, List[str], str]]] = { + "audio_lyrics": [ # Mode A + ("audio_preparing", [], "音频准备"), + ("demucs_separating", ["audio_preparing"], "人声分离"), + ("lyric_generating", ["demucs_separating"], "歌词生成"), + ("lyric_evaluating", ["lyric_generating"], "歌词评估"), + ("music_generating", ["lyric_evaluating"], "音乐生成"), + ("music_polling", ["music_generating"], "音乐轮询"), + ("lyric_calibrating", ["music_polling", "demucs_separating"], "歌词校准"), + ("subtitle_rendering", ["lyric_calibrating"], "字幕渲染"), + ("subtitle_exporting", ["subtitle_rendering"], "字幕导出"), + ("character_designing", ["lyric_calibrating"], "角色设计"), + ("character_image_generating", ["character_designing"], "角色图生成"), + ("storyboard_generating", ["character_designing", "music_polling"], "分镜剧本"), + ("scene_video_generating", ["storyboard_generating", "character_image_generating"], "分镜视频生成"), + ("scene_video_evaluating", ["scene_video_generating"], "分镜视频评估"), + ("scene_video_concatenating", ["scene_video_evaluating"], "分镜视频拼接"), + ("ktv_synthesizing", ["scene_video_concatenating", "subtitle_rendering", "music_polling"], "KTV合成"), + ], + "video_lyrics": [ # Mode B + ("video_preparing", [], "视频准备"), + ("demucs_separating", ["video_preparing"], "人声分离"), + ("lyric_generating", ["demucs_separating"], "歌词生成"), + ("lyric_evaluating", ["lyric_generating"], "歌词评估"), + ("music_generating", ["lyric_evaluating"], "音乐生成"), + ("music_polling", ["music_generating"], "音乐轮询"), + ("lyric_calibrating", ["music_polling", "demucs_separating"], "歌词校准"), + ("subtitle_rendering", ["lyric_calibrating"], "字幕渲染"), + ("subtitle_exporting", ["subtitle_rendering"], "字幕导出"), + ("character_designing", ["lyric_calibrating"], "角色设计"), + ("character_image_generating", ["character_designing"], "角色图生成"), + ("storyboard_generating", ["character_designing", "music_polling"], "分镜剧本"), + ("scene_video_generating", ["storyboard_generating", "character_image_generating"], "分镜视频生成"), + ("scene_video_evaluating", ["scene_video_generating"], "分镜视频评估"), + ("scene_video_concatenating", ["scene_video_evaluating"], "分镜视频拼接"), + ("ktv_synthesizing", ["scene_video_concatenating", "subtitle_rendering", "music_polling"], "KTV合成"), + ], + "lyrics_only": [ # Mode C + ("lyric_generating", [], "歌词生成"), + ("lyric_evaluating", ["lyric_generating"], "歌词评估"), + ("music_generating", ["lyric_evaluating"], "音乐生成"), + ("music_polling", ["music_generating"], "音乐轮询"), + ("lyric_calibrating", ["music_polling"], "歌词校准"), + ("subtitle_rendering", ["lyric_calibrating"], "字幕渲染"), + ("subtitle_exporting", ["subtitle_rendering"], "字幕导出"), + ("character_designing", ["lyric_calibrating"], "角色设计"), + ("character_image_generating", ["character_designing"], "角色图生成"), + ("storyboard_generating", ["character_designing", "music_polling"], "分镜剧本"), + ("scene_video_generating", ["storyboard_generating", "character_image_generating"], "分镜视频生成"), + ("scene_video_evaluating", ["scene_video_generating"], "分镜视频评估"), + ("scene_video_concatenating", ["scene_video_evaluating"], "分镜视频拼接"), + ("ktv_synthesizing", ["scene_video_concatenating", "subtitle_rendering", "music_polling"], "KTV合成"), + ], +} + +# Step states +STATE_PENDING = "pending" +STATE_RUNNING = "running" +STATE_COMPLETED = "completed" +STATE_FAILED = "failed" +STATE_SKIPPED = "skipped" + +# Pipeline states +PIPELINE_SUBMITTED = "submitted" +PIPELINE_RUNNING = "running" +PIPELINE_COMPLETED = "completed" +PIPELINE_FAILED = "failed" +PIPELINE_PAUSED = "paused" # waiting for user modification + + +def get_step_graph(mode: str) -> List[Tuple[str, List[str], str]]: + """Get step definitions for a mode. Returns [(name, deps, display_name), ...]""" + if mode not in MODE_STEPS: + raise ValueError(f"Unknown mode: {mode}. Available: {list(MODE_STEPS.keys())}") + return MODE_STEPS[mode] + + +def build_dependency_map(mode: str) -> Dict[str, dict]: + """Build a dependency map for a mode. + Returns: {step_name: {"deps": [...], "dependents": [...], "display_name": "...", "order": int}} + """ + steps = get_step_graph(mode) + dep_map = {} + for i, (name, deps, display) in enumerate(steps): + dep_map[name] = { + "deps": list(deps), + "dependents": [], + "display_name": display, + "order": i + 1, + } + # Build reverse mapping + for name, info in dep_map.items(): + for dep in info["deps"]: + if dep in dep_map: + dep_map[dep]["dependents"].append(name) + return dep_map + + +def get_cascade_rerun_steps(mode: str, from_step: str) -> List[str]: + """Get all steps that need to be rerun when a step is modified. + BFS from the modified step through dependents. + Returns ordered list of step names. + """ + dep_map = build_dependency_map(mode) + if from_step not in dep_map: + return [] + + visited = set() + queue = [from_step] + result = [] + + while queue: + current = queue.pop(0) + if current in visited: + continue + visited.add(current) + result.append(current) + for dep in dep_map.get(current, {}).get("dependents", []): + if dep not in visited: + queue.append(dep) + + # Sort by order + result.sort(key=lambda s: dep_map.get(s, {}).get("order", 999)) + return result + + +def get_rerun_from_next(mode: str, from_step: str) -> List[str]: + """When output is modified, rerun from the NEXT steps (dependents only, not the step itself).""" + dep_map = build_dependency_map(mode) + if from_step not in dep_map: + return [] + + direct_dependents = dep_map[from_step]["dependents"] + all_steps = set() + for d in direct_dependents: + all_steps.update(get_cascade_rerun_steps(mode, d)) + + result = list(all_steps) + result.sort(key=lambda s: dep_map.get(s, {}).get("order", 999)) + return result diff --git a/pipeline_service/storage.py b/pipeline_service/storage.py new file mode 100644 index 0000000..d30f0bf --- /dev/null +++ b/pipeline_service/storage.py @@ -0,0 +1,258 @@ +"""File-based storage for pipeline tasks, artifacts, and versions.""" + +import json +import os +import shutil +import uuid +from datetime import datetime +from typing import Any, Dict, List, Optional + +from .state import ( + STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, + PIPELINE_SUBMITTED, PIPELINE_RUNNING, + build_dependency_map, +) + +# Base data directory +DATA_DIR = os.environ.get("PIPELINE_DATA_DIR", os.path.expanduser("~/pipeline_data")) + + +def _ensure_dir(path: str): + os.makedirs(path, exist_ok=True) + + +def _pipeline_dir(pipeline_id: str) -> str: + return os.path.join(DATA_DIR, pipeline_id) + + +def _version_dir(pipeline_id: str, version: int) -> str: + return os.path.join(_pipeline_dir(pipeline_id), f"v{version}") + + +def _manifest_path(pipeline_id: str) -> str: + return os.path.join(_pipeline_dir(pipeline_id), "manifest.json") + + +def _read_json(path: str) -> dict: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +def _write_json(path: str, data: dict): + _ensure_dir(os.path.dirname(path)) + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +def generate_pipeline_id() -> str: + return f"ktv_{uuid.uuid4().hex[:12]}" + + +def create_pipeline(user_id: str, mode: str, title: str, params: dict) -> dict: + """Create a new pipeline task. Returns the manifest.""" + pipeline_id = generate_pipeline_id() + dep_map = build_dependency_map(mode) + now = datetime.now().isoformat() + + # Build steps + steps = {} + for name, info in dep_map.items(): + steps[name] = { + "order": info["order"], + "display_name": info["display_name"], + "deps": info["deps"], + "dependents": info["dependents"], + "state": STATE_PENDING, + "version": 1, + "started_at": None, + "completed_at": None, + "error": None, + } + + manifest = { + "pipeline_id": pipeline_id, + "user_id": user_id, + "mode": mode, + "title": title, + "params": params, + "created_at": now, + "updated_at": now, + "current_version": 1, + "state": PIPELINE_SUBMITTED, + "steps": steps, + "versions": { + "1": { + "created_at": now, + "changes": "初始版本", + } + }, + } + + # Write to disk + pdir = _pipeline_dir(pipeline_id) + _ensure_dir(pdir) + _ensure_dir(_version_dir(pipeline_id, 1)) + _write_json(_manifest_path(pipeline_id), manifest) + + # Store initial params as artifact + save_artifact(pipeline_id, 1, "_params", "input", params) + + # Index by user + _add_to_user_index(user_id, pipeline_id) + + return manifest + + +def get_manifest(pipeline_id: str) -> Optional[dict]: + """Read pipeline manifest.""" + path = _manifest_path(pipeline_id) + if not os.path.exists(path): + return None + return _read_json(path) + + +def save_manifest(pipeline_id: str, manifest: dict): + """Save pipeline manifest.""" + manifest["updated_at"] = datetime.now().isoformat() + _write_json(_manifest_path(pipeline_id), manifest) + + +def save_artifact(pipeline_id: str, version: int, step: str, io_type: str, data: Any): + """Save artifact data for a step. + io_type: 'input' or 'output' + """ + vdir = _version_dir(pipeline_id, version) + _ensure_dir(vdir) + path = os.path.join(vdir, f"{step}.{io_type}.json") + _write_json(path, {"step": step, "version": version, "type": io_type, "data": data, + "saved_at": datetime.now().isoformat()}) + + +def get_artifact(pipeline_id: str, version: int, step: str, io_type: str) -> Optional[dict]: + """Read artifact data for a step.""" + vdir = _version_dir(pipeline_id, version) + path = os.path.join(vdir, f"{step}.{io_type}.json") + if not os.path.exists(path): + # Try previous versions + for v in range(version, 0, -1): + path = os.path.join(_version_dir(pipeline_id, v), f"{step}.{io_type}.json") + if os.path.exists(path): + return _read_json(path) + return None + return _read_json(path) + + +def get_all_artifacts(pipeline_id: str, version: int) -> Dict[str, dict]: + """Get all artifacts for a specific version.""" + vdir = _version_dir(pipeline_id, version) + if not os.path.exists(vdir): + return {} + artifacts = {} + for fname in os.listdir(vdir): + if fname.endswith(".json"): + fpath = os.path.join(vdir, fname) + try: + data = _read_json(fpath) + key = fname.replace(".json", "") + artifacts[key] = data + except Exception: + pass + return artifacts + + +def create_new_version(pipeline_id: str, changes: str) -> int: + """Create a new version directory. Returns new version number.""" + manifest = get_manifest(pipeline_id) + if not manifest: + raise ValueError(f"Pipeline not found: {pipeline_id}") + + new_version = manifest["current_version"] + 1 + manifest["current_version"] = new_version + manifest["versions"][str(new_version)] = { + "created_at": datetime.now().isoformat(), + "changes": changes, + } + + # Copy previous version artifacts to new version (hard links) + prev_vdir = _version_dir(pipeline_id, new_version - 1) + new_vdir = _version_dir(pipeline_id, new_version) + _ensure_dir(new_vdir) + + if os.path.exists(prev_vdir): + for fname in os.listdir(prev_vdir): + src = os.path.join(prev_vdir, fname) + dst = os.path.join(new_vdir, fname) + if os.path.isfile(src) and not os.path.exists(dst): + try: + os.link(src, dst) # hard link + except OSError: + shutil.copy2(src, dst) + + save_manifest(pipeline_id, manifest) + return new_version + + +def reset_steps(pipeline_id: str, step_names: List[str]): + """Reset specified steps to pending state.""" + manifest = get_manifest(pipeline_id) + if not manifest: + return + for name in step_names: + if name in manifest["steps"]: + manifest["steps"][name]["state"] = STATE_PENDING + manifest["steps"][name]["error"] = None + manifest["steps"][name]["started_at"] = None + manifest["steps"][name]["completed_at"] = None + save_manifest(pipeline_id, manifest) + + +def update_step_state(pipeline_id: str, step: str, state: str, error: str = None): + """Update a step's state.""" + manifest = get_manifest(pipeline_id) + if not manifest or step not in manifest["steps"]: + return + now = datetime.now().isoformat() + manifest["steps"][step]["state"] = state + if state == STATE_RUNNING: + manifest["steps"][step]["started_at"] = now + elif state in (STATE_COMPLETED, STATE_FAILED): + manifest["steps"][step]["completed_at"] = now + if error: + manifest["steps"][step]["error"] = error + + # Update pipeline state + all_states = [s["state"] for s in manifest["steps"].values()] + if all(s == STATE_COMPLETED for s in all_states): + manifest["state"] = "completed" + elif any(s == STATE_FAILED for s in all_states): + manifest["state"] = "failed" + elif any(s == STATE_RUNNING for s in all_states): + manifest["state"] = PIPELINE_RUNNING + + save_manifest(pipeline_id, manifest) + + +# === User Index === + +def _user_index_path(user_id: str) -> str: + return os.path.join(DATA_DIR, f"_user_{user_id}.json") + + +def _add_to_user_index(user_id: str, pipeline_id: str): + path = _user_index_path(user_id) + if os.path.exists(path): + data = _read_json(path) + else: + data = {"user_id": user_id, "pipelines": []} + if pipeline_id not in data["pipelines"]: + data["pipelines"].append(pipeline_id) + _write_json(path, data) + + +def get_user_pipelines(user_id: str) -> List[str]: + """Get all pipeline IDs for a user.""" + path = _user_index_path(user_id) + if not os.path.exists(path): + return [] + data = _read_json(path) + return data.get("pipelines", []) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..d74c297 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[build-system] +requires = ["setuptools>=45", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "pipeline-service" +version = "1.0.0" +description = "Hermes Pipeline Backend - manages KTV/MV production pipeline tasks" +requires-python = ">=3.8" +dependencies = [ + "aiohttp>=3.8", +] + +[tool.setuptools.packages.find] +where = ["."] +include = ["pipeline_service*"] diff --git a/run.py b/run.py new file mode 100644 index 0000000..912b90d --- /dev/null +++ b/run.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +"""Pipeline Service entry point.""" +from pipeline_service.server import main + +if __name__ == "__main__": + main()