diff --git a/pipeline_service/__init__.py b/pipeline_service/__init__.py index 9262e24..698b2ed 100644 --- a/pipeline_service/__init__.py +++ b/pipeline_service/__init__.py @@ -16,7 +16,6 @@ from .init import ( pipeline_unregister_step_type, ) from .handler import register_handler, list_handlers, register_default_handler -from .handlers_ktv import register_ktv_handlers from .step_registry import register_step_type, get_step_type, list_step_types, load_builtin_types from .human import human_complete, approval_approve, approval_reject, human_list from .state import ( @@ -26,4 +25,4 @@ from .state import ( HUMAN_PENDING, HUMAN_SUBMITTED, HUMAN_APPROVED, HUMAN_REJECTED, HUMAN_EXPIRED, ) -__version__ = "3.0.0" +__version__ = "3.1.0" diff --git a/pipeline_service/handlers_ktv.py b/pipeline_service/handlers_ktv.py deleted file mode 100644 index 99d2425..0000000 --- a/pipeline_service/handlers_ktv.py +++ /dev/null @@ -1,943 +0,0 @@ -"""KTV pipeline step handlers. - -Implements the 17 step types for KTV production pipelines. -Each handler: async def handler(tenant_id, task_id, step_name, input_data, config) -> dict - -Architecture: -- Heavy compute (demucs, video gen) runs on GPU server via SSH -- ffmpeg/audio processing runs locally -- LLM calls via harnessed_agent (llm_chat) -- ASR via SenseVoice -- External APIs: Suno/MiniMax for music, wan2.7 for images/video -""" - -import asyncio -import json -import os -import logging -import tempfile -import time - -logger = logging.getLogger("pipeline.handlers.ktv") - -# GPU server config (from memory: ymq@opencomputing.net, 8x4090) -GPU_HOST = "ymq@opencomputing.net" -GPU_DEMUCS_VENV = "/data/ymq/demucs_venv" -GPU_WAN22_DIR = "/data/ymq/wan22-service" -GPU_PIPELINE_DIR = "/data/pipeline/ktv" - -# Local work directory -LOCAL_WORK_DIR = "/data/pipeline/ktv" - - -def _task_dir(task_id: str) -> str: - """Get working directory for a task.""" - d = os.path.join(LOCAL_WORK_DIR, task_id) - os.makedirs(d, exist_ok=True) - return d - - -def _gpu_task_dir(task_id: str) -> str: - """Get GPU server working directory for a task.""" - return f"{GPU_PIPELINE_DIR}/{task_id}" - - -async def _run_local(cmd: str, timeout: int = 300) -> tuple: - """Run a local command, return (stdout, stderr, returncode).""" - proc = await asyncio.create_subprocess_shell( - cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) - return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace"), proc.returncode - except asyncio.TimeoutError: - proc.kill() - return "", "timeout", -1 - - -async def _run_gpu(cmd: str, timeout: int = 600) -> tuple: - """Run a command on GPU server via SSH.""" - ssh_cmd = f"ssh -o StrictHostKeyChecking=no {GPU_HOST} '{cmd}'" - return await _run_local(ssh_cmd, timeout=timeout) - - -async def _copy_to_gpu(local_path: str, remote_path: str): - """SCP file to GPU server.""" - await _run_local(f"scp -o StrictHostKeyChecking=no '{local_path}' {GPU_HOST}:{remote_path}") - - -async def _copy_from_gpu(remote_path: str, local_path: str): - """SCP file from GPU server.""" - await _run_local(f"scp -o StrictHostKeyChecking=no {GPU_HOST}:{remote_path} '{local_path}'") - - -# ─── Media Preparation ──────────────────────────────────────────────── - -async def handle_audio_preparing(tenant_id, task_id, step_name, input_data, config): - """Download/copy audio file, extract duration with ffprobe.""" - work_dir = _task_dir(task_id) - params = input_data.get("task_params", {}) - audio_url = params.get("audio_url", params.get("audio_path", "")) - - if not audio_url: - raise ValueError("缺少 audio_url 或 audio_path 参数") - - # Download or copy audio - audio_path = os.path.join(work_dir, "original_audio.mp3") - if audio_url.startswith("http"): - stdout, stderr, rc = await _run_local(f"curl -sL -o '{audio_path}' '{audio_url}'") - if rc != 0: - raise ValueError(f"下载音频失败: {stderr}") - else: - await _run_local(f"cp '{audio_url}' '{audio_path}'") - - # Extract duration - stdout, stderr, rc = await _run_local( - f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{audio_path}'" - ) - duration = float(stdout.strip()) if rc == 0 else 0 - - return { - "audio_path": audio_path, - "duration": duration, - "format": os.path.splitext(audio_path)[1].lstrip("."), - } - - -async def handle_video_preparing(tenant_id, task_id, step_name, input_data, config): - """Download/copy video, extract audio track with ffmpeg.""" - work_dir = _task_dir(task_id) - params = input_data.get("task_params", {}) - video_url = params.get("video_url", params.get("video_path", "")) - - if not video_url: - raise ValueError("缺少 video_url 或 video_path 参数") - - video_path = os.path.join(work_dir, "original_video.mp4") - audio_path = os.path.join(work_dir, "original_audio.mp3") - - if video_url.startswith("http"): - await _run_local(f"curl -sL -o '{video_path}' '{video_url}'") - else: - await _run_local(f"cp '{video_url}' '{video_path}'") - - # Extract audio - await _run_local( - f"ffmpeg -y -i '{video_path}' -vn -acodec libmp3lame -q:a 2 '{audio_path}'" - ) - - # Get duration - stdout, _, rc = await _run_local( - f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{video_path}'" - ) - duration = float(stdout.strip()) if rc == 0 else 0 - - return { - "video_path": video_path, - "audio_path": audio_path, - "duration": duration, - } - - -# ─── Demucs Separation ─────────────────────────────────────────────── - -async def handle_demucs_separating(tenant_id, task_id, step_name, input_data, config): - """Run Demucs on GPU server to separate vocals and accompaniment.""" - work_dir = _task_dir(task_id) - gpu_dir = _gpu_task_dir(task_id) - - # Find audio path from deps - audio_path = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - audio_path = dep_output.get("audio_path") - if audio_path: - break - - if not audio_path: - raise ValueError("上游步骤未提供 audio_path") - - # Prepare GPU directory - await _run_gpu(f"mkdir -p {gpu_dir}") - - # Copy audio to GPU - remote_audio = f"{gpu_dir}/audio.mp3" - await _copy_to_gpu(audio_path, remote_audio) - - # Run Demucs on GPU - demucs_cmd = ( - f"cd {gpu_dir} && " - f"source {GPU_DEMUCS_VENV}/bin/activate && " - f"python -m demucs --two-stems vocals -n htdemucs --mp3 '{remote_audio}' && " - f"deactivate" - ) - stdout, stderr, rc = await _run_gpu(demucs_cmd, timeout=600) - if rc != 0: - raise ValueError(f"Demucs 分离失败: {stderr}") - - # Copy results back - vocals_local = os.path.join(work_dir, "vocals.wav") - no_vocals_local = os.path.join(work_dir, "no_vocals.wav") - base = os.path.splitext(os.path.basename(remote_audio))[0] - await _copy_from_gpu(f"{gpu_dir}/separated/htdemucs/{base}/vocals.wav", vocals_local) - await _copy_from_gpu(f"{gpu_dir}/separated/htdemuds/{base}/no_vocals.wav", no_vocals_local) - - return { - "vocals_path": vocals_local, - "no_vocals_path": no_vocals_local, - } - - -# ─── Lyric Calibration ─────────────────────────────────────────────── - -async def handle_lyric_calibrating(tenant_id, task_id, step_name, input_data, config): - """ASR timing recognition + LLM calibration against original lyrics.""" - work_dir = _task_dir(task_id) - params = input_data.get("task_params", {}) - lyrics_text = params.get("lyrics", params.get("lyrics_text", "")) - - # Find vocals path from deps - vocals_path = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - vocals_path = dep_output.get("vocals_path") - if vocals_path: - break - - if not vocals_path: - raise ValueError("上游步骤未提供 vocals_path") - if not lyrics_text: - raise ValueError("缺少 lyrics 参数") - - # Step 1: Run SenseVoice ASR on vocals to get timing - asr_result_path = os.path.join(work_dir, "asr_timings.json") - - # Copy vocals to GPU for ASR - gpu_dir = _gpu_task_dir(task_id) - await _run_gpu(f"mkdir -p {gpu_dir}") - remote_vocals = f"{gpu_dir}/vocals.wav" - await _copy_to_gpu(vocals_path, remote_vocals) - - # Run SenseVoice ASR - asr_script = f""" -cd {gpu_dir} -source {GPU_DEMUCS_VENV}/bin/activate -python -c " -import json -from funasr import AutoModel -model = AutoModel(model='iic/SenseVoiceSmall', trust_remote_code=True) -res = model.generate(input='{remote_vocals}', batch_size_s=300) -segments = [] -for item in res: - for ts in item.get('timestamp', []): - segments.append({{'text': item.get('text', ''), 'start': ts[0]/1000, 'end': ts[1]/1000}}) -with open('asr_timings.json', 'w') as f: - json.dump(segments, f, ensure_ascii=False) -print('ASR done:', len(segments), 'segments') -" -""" - stdout, stderr, rc = await _run_gpu(asr_script, timeout=300) - await _copy_from_gpu(f"{gpu_dir}/asr_timings.json", asr_result_path) - - with open(asr_result_path, "r") as f: - asr_timings = json.load(f) - - # Step 2: LLM calibration — align ASR timings with original lyrics - from pipeline_service.handler import get_handler - calibrated = await _llm_calibrate(lyrics_text, asr_timings) - - # Save calibrated lyrics - calibrated_path = os.path.join(work_dir, "calibrated_lyrics.json") - with open(calibrated_path, "w", encoding="utf-8") as f: - json.dump(calibrated, f, ensure_ascii=False, indent=2) - - return { - "calibrated_lyrics_path": calibrated_path, - "calibrated_lyrics": calibrated, - "segment_count": len(calibrated), - } - - -async def _llm_calibrate(lyrics_text: str, asr_timings: list) -> list: - """Use LLM to align raw lyrics text with ASR timings.""" - prompt = f"""你是一个歌词时间轴校准专家。 - -原始歌词文本: -{lyrics_text} - -ASR识别的时间戳(秒): -{json.dumps(asr_timings, ensure_ascii=False)} - -请将原始歌词的每一句与ASR时间戳对齐,输出JSON数组,每个元素包含: -- text: 歌词文本 -- start: 开始时间(秒,浮点数) -- end: 结束时间(秒,浮点数) - -要求: -1. 保持原始歌词的文本和顺序 -2. 时间戳以ASR结果为基础进行微调 -3. 确保时间不重叠,每句之间留适当间隔 -4. 只输出JSON,不要其他内容""" - - try: - from pipeline_service.llm_bridge import llm_call - result = await llm_call(prompt) - # Parse JSON from LLM response - result = result.strip() - if result.startswith("```"): - result = result.split("\n", 1)[1].rsplit("```", 1)[0] - return json.loads(result) - except Exception as e: - logger.warning(f"LLM calibration failed, using ASR timings directly: {e}") - return asr_timings - - -# ─── Subtitle Rendering ────────────────────────────────────────────── - -async def handle_subtitle_rendering(tenant_id, task_id, step_name, input_data, config): - """Generate ASS karaoke subtitle file from calibrated lyrics.""" - work_dir = _task_dir(task_id) - - # Find calibrated lyrics - calibrated = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - calibrated = dep_output.get("calibrated_lyrics") - if not calibrated and dep_output.get("calibrated_lyrics_path"): - with open(dep_output["calibrated_lyrics_path"], "r") as f: - calibrated = json.load(f) - if calibrated: - break - - if not calibrated: - raise ValueError("上游步骤未提供 calibrated_lyrics") - - # Generate ASS file - ass_path = os.path.join(work_dir, "karaoke.ass") - _write_ass_file(ass_path, calibrated) - - return {"ass_path": ass_path} - - -def _write_ass_file(path: str, segments: list): - """Write segments to ASS subtitle file with karaoke effect.""" - header = """[Script Info] -Title: KTV Karaoke Subtitles -ScriptType: v4.00+ -PlayResX: 1920 -PlayResY: 1080 -WrapStyle: 0 - -[V4+ Styles] -Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding -Style: KTV,Source Han Sans SC,72,&H00FFFFFF,&H0000FFFF,&H00000000,&H80000000,-1,0,0,0,100,100,1,0,1,3,1,2,40,40,60,1 - -[Events] -Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text -""" - with open(path, "w", encoding="utf-8") as f: - f.write(header) - for seg in segments: - start = _seconds_to_ass_time(seg["start"]) - end = _seconds_to_ass_time(seg["end"]) - text = seg["text"].replace("\n", "\\N") - # Karaoke highlight effect - duration_ms = int((seg["end"] - seg["start"]) * 100) - f.write(f"Dialogue: 0,{start},{end},KTV,,0,0,0,,{{\\k{duration_ms}}}{text}\n") - - -def _seconds_to_ass_time(seconds: float) -> str: - """Convert seconds to ASS time format H:MM:SS.CC""" - h = int(seconds // 3600) - m = int((seconds % 3600) // 60) - s = int(seconds % 60) - cs = int((seconds % 1) * 100) - return f"{h}:{m:02d}:{s:02d}.{cs:02d}" - - -# ─── Subtitle Exporting ────────────────────────────────────────────── - -async def handle_subtitle_exporting(tenant_id, task_id, step_name, input_data, config): - """Export ASS subtitle as standalone file (already done by rendering).""" - ass_path = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - ass_path = dep_output.get("ass_path") - if ass_path: - break - - if not ass_path or not os.path.exists(ass_path): - raise ValueError("上游步骤未提供有效的 ass_path") - - return {"subtitle_path": ass_path, "format": "ass"} - - -# ─── Lyric Generation & Evaluation (Mode C) ────────────────────────── - -async def handle_lyric_generating(tenant_id, task_id, step_name, input_data, config): - """LLM generates lyrics from topic/outline.""" - params = input_data.get("task_params", {}) - topic = params.get("topic", params.get("outline", "")) - style = params.get("style", "流行") - language = params.get("language", "zh") - - if not topic: - raise ValueError("缺少 topic/outline参数") - - prompt = f"""请创作一首{style}风格的{language}歌词。 - -主题/大纲: {topic} - -要求: -1. 包含完整结构: 主歌(Verse)、副歌(Chorus)、桥段(Bridge) -2. 每句歌词节奏感强,适合演唱 -3. 押韵自然,情感真实 -4. 总长度适合3-5分钟歌曲 - -直接输出歌词文本,标注段落结构。""" - - try: - from pipeline_service.llm_bridge import llm_call - lyrics = await llm_call(prompt) - return {"lyrics": lyrics.strip(), "topic": topic, "style": style} - except Exception as e: - raise ValueError(f"歌词生成失败: {e}") - - -async def handle_lyric_evaluating(tenant_id, task_id, step_name, input_data, config): - """Evaluate lyric quality, retry if below threshold.""" - threshold = config.get("threshold", 8.5) - - lyrics = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - lyrics = dep_output.get("lyrics") - if lyrics: - break - - if not lyrics: - raise ValueError("上游步骤未提供歌词") - - # Evaluate via LLM - prompt = f"""请从以下维度评估这首歌词的质量(1-10分): - -1. 韵律节奏: 押韵、节奏感、可唱性 -2. 情感表达: 情感真实度、共鸣力 -3. 文学性: 用词、意象、修辞 -4. 结构完整: 段落编排、层次感 -5. 商业潜力: 流行度、记忆点 - -歌词: -{lyrics} - -输出JSON: {{"score": 8.5, "dimensions": {{...}}, "suggestions": "..."}} -只输出JSON。""" - - try: - from pipeline_service.llm_bridge import llm_call - result = await llm_call(prompt) - result = result.strip() - if result.startswith("```"): - result = result.split("\n", 1)[1].rsplit("```", 1)[0] - evaluation = json.loads(result) - score = evaluation.get("score", 0) - except Exception: - score = 7.0 # Default pass if evaluation fails - evaluation = {"score": score, "note": "evaluation_parse_failed"} - - if score < threshold: - raise ValueError(f"歌词评分 {score} 低于阈值 {threshold},需要重新生成") - - return { - "lyrics": lyrics, - "evaluation": evaluation, - "score": score, - "passed": True, - } - - -# ─── Music Generation ──────────────────────────────────────────────── - -async def handle_music_generating(tenant_id, task_id, step_name, input_data, config): - """Submit music generation job to Suno/MiniMax API.""" - lyrics = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - lyrics = dep_output.get("lyrics") - if lyrics: - break - - if not lyrics: - raise ValueError("上游步骤未提供歌词") - - params = input_data.get("task_params", {}) - music_service = params.get("music_service", "suno") - style = params.get("music_style", "pop") - - # Submit to music generation API - # TODO: Implement actual API call to Suno/MiniMax - # For now, return a job_id placeholder - job_id = f"music_{task_id}_{int(time.time())}" - - return { - "music_job_id": job_id, - "music_service": music_service, - "style": style, - "lyrics": lyrics, - "status": "submitted", - } - - -async def handle_music_polling(tenant_id, task_id, step_name, input_data, config): - """Poll music generation API until complete.""" - job_info = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - job_info = dep_output - if job_info and job_info.get("music_job_id"): - break - - if not job_info: - raise ValueError("上游步骤未提供 music_job_id") - - # TODO: Implement actual polling logic - # For now, simulate a wait and return - work_dir = _task_dir(task_id) - music_path = os.path.join(work_dir, "generated_music.mp3") - - # Placeholder: the actual implementation would poll the API - # and download the result - return { - "music_path": music_path, - "music_job_id": job_info.get("music_job_id"), - "status": "completed", - } - - -# ─── Character & Video Generation ──────────────────────────────────── - -async def handle_character_designing(tenant_id, task_id, step_name, input_data, config): - """LLM designs MV character descriptions.""" - lyrics = None - params = input_data.get("task_params", {}) - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - lyrics = dep_output.get("lyrics") or dep_output.get("calibrated_lyrics") - if isinstance(lyrics, list): - lyrics = " ".join(s.get("text", "") for s in lyrics) - if lyrics: - break - - style = params.get("visual_style", "anime") - - prompt = f"""根据以下歌词,设计MV角色方案。 - -歌词: -{lyrics} - -视觉风格: {style} - -请设计1-3个角色,每个角色包含: -1. 角色名称 -2. 外貌描述(用于AI图像生成的详细prompt) -3. 性格特征 -4. 在MV中的角色定位 - -输出JSON数组。""" - - try: - from pipeline_service.llm_bridge import llm_call - result = await llm_call(prompt) - result = result.strip() - if result.startswith("```"): - result = result.split("\n", 1)[1].rsplit("```", 1)[0] - characters = json.loads(result) - except Exception as e: - raise ValueError(f"角色设计失败: {e}") - - return {"characters": characters, "visual_style": style} - - -async def handle_character_image_generating(tenant_id, task_id, step_name, input_data, config): - """Generate character reference images using wan2.7 on GPU server.""" - work_dir = _task_dir(task_id) - gpu_dir = _gpu_task_dir(task_id) - - characters = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - characters = dep_output.get("characters") - if characters: - break - - if not characters: - raise ValueError("上游步骤未提供角色设计") - - await _run_gpu(f"mkdir -p {gpu_dir}/characters") - char_images = [] - - for i, char in enumerate(characters): - prompt = char.get("prompt", char.get("description", "")) - if not prompt: - continue - - # Generate image on GPU with wan2.7 - gen_cmd = ( - f"cd {GPU_WAN22_DIR} && " - f"source venv/bin/activate && " - f"python generate.py --prompt '{prompt}' " - f"--output {gpu_dir}/characters/char_{i}.png " - f"--width 512 --height 512" - ) - stdout, stderr, rc = await _run_gpu(gen_cmd, timeout=120) - - local_path = os.path.join(work_dir, f"char_{i}.png") - await _copy_from_gpu(f"{gpu_dir}/characters/char_{i}.png", local_path) - - char_images.append({ - "name": char.get("name", f"char_{i}"), - "image_path": local_path, - "prompt": prompt, - }) - - return {"character_images": char_images} - - -async def handle_storyboard_generating(tenant_id, task_id, step_name, input_data, config): - """LLM generates storyboard script from lyrics + characters.""" - lyrics = None - char_images = None - params = input_data.get("task_params", {}) - - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - if dep_output.get("calibrated_lyrics"): - lyrics = dep_output["calibrated_lyrics"] - elif dep_output.get("lyrics"): - lyrics = dep_output["lyrics"] - if dep_output.get("character_images"): - char_images = dep_output["character_images"] - - if not lyrics: - raise ValueError("上游步骤未提供歌词") - - duration = params.get("duration", 240) # Default 4 min - - prompt = f"""根据歌词和角色,生成MV分镜脚本。 - -歌词: -{json.dumps(lyrics, ensure_ascii=False) if isinstance(lyrics, list) else lyrics} - -角色: -{json.dumps(char_images, ensure_ascii=False) if char_images else "无特定角色"} - -视频总时长: {duration}秒 - -请输出JSON数组,每个分镜包含: -- scene_id: 分镜编号 -- start_time: 开始秒数 -- end_time: 结束秒数 -- description: 场景描述(英文,用于视频生成prompt) -- characters: 出现的角色 -- camera: 镜头运动描述 -- mood: 情绪/色调 - -确保分镜覆盖整首歌,每个分镜5-15秒。""" - - try: - from pipeline_service.llm_bridge import llm_call - result = await llm_call(prompt) - result = result.strip() - if result.startswith("```"): - result = result.split("\n", 1)[1].rsplit("```", 1)[0] - storyboard = json.loads(result) - except Exception as e: - raise ValueError(f"分镜生成失败: {e}") - - work_dir = _task_dir(task_id) - sb_path = os.path.join(work_dir, "storyboard.json") - with open(sb_path, "w", encoding="utf-8") as f: - json.dump(storyboard, f, ensure_ascii=False, indent=2) - - return {"storyboard": storyboard, "storyboard_path": sb_path, "scene_count": len(storyboard)} - - -async def handle_scene_video_generating(tenant_id, task_id, step_name, input_data, config): - """Generate scene videos on GPU using T2V/Ref2V.""" - work_dir = _task_dir(task_id) - gpu_dir = _gpu_task_dir(task_id) - - storyboard = None - char_images = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - if dep_output.get("storyboard"): - storyboard = dep_output["storyboard"] - if dep_output.get("character_images"): - char_images = dep_output["character_images"] - - if not storyboard: - raise ValueError("上游步骤未提供分镜脚本") - - await _run_gpu(f"mkdir -p {gpu_dir}/scenes") - - # Copy character images to GPU if available - if char_images: - for ci in char_images: - local = ci.get("image_path", "") - if local and os.path.exists(local): - remote = f"{gpu_dir}/characters/{os.path.basename(local)}" - await _copy_to_gpu(local, remote) - - scene_videos = [] - for i, scene in enumerate(storyboard): - desc = scene.get("description", "") - duration = scene.get("end_time", 10) - scene.get("start_time", 5) - frames = int(duration * 24) # 24fps - - # Determine if we use Ref2V (with character ref) or T2V - ref_image = None - if char_images and scene.get("characters"): - # Find matching character image - for ci in char_images: - if ci.get("name") in str(scene.get("characters", [])): - ref_image = f"{gpu_dir}/characters/{os.path.basename(ci['image_path'])}" - break - - if ref_image: - gen_cmd = ( - f"cd {GPU_WAN22_DIR} && source venv/bin/activate && " - f"python generate_ref2v.py --prompt '{desc}' " - f"--ref_image '{ref_image}' " - f"--output {gpu_dir}/scenes/scene_{i:03d}.mp4 " - f"--frames {frames}" - ) - else: - gen_cmd = ( - f"cd {GPU_WAN22_DIR} && source venv/bin/activate && " - f"python generate_t2v.py --prompt '{desc}' " - f"--output {gpu_dir}/scenes/scene_{i:03d}.mp4 " - f"--frames {frames}" - ) - - stdout, stderr, rc = await _run_gpu(gen_cmd, timeout=600) - - local_scene = os.path.join(work_dir, f"scene_{i:03d}.mp4") - await _copy_from_gpu(f"{gpu_dir}/scenes/scene_{i:03d}.mp4", local_scene) - - scene_videos.append({ - "scene_id": scene.get("scene_id", i), - "video_path": local_scene, - "description": desc, - "duration": duration, - }) - - return {"scene_videos": scene_videos, "scene_count": len(scene_videos)} - - -async def handle_scene_video_evaluating(tenant_id, task_id, step_name, input_data, config): - """Evaluate scene video quality via VLM, retry if below threshold.""" - threshold = config.get("threshold", 7.0) - max_retry = config.get("max_retry", 3) - - scene_videos = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - scene_videos = dep_output.get("scene_videos") - if scene_videos: - break - - if not scene_videos: - raise ValueError("上游步骤未提供场景视频") - - # Evaluate each scene (simplified: check file exists and has reasonable size) - valid_scenes = [] - for sv in scene_videos: - path = sv.get("video_path", "") - if os.path.exists(path) and os.path.getsize(path) > 10000: - sv["quality_score"] = 8.0 # Placeholder - valid_scenes.append(sv) - else: - sv["quality_score"] = 0 - logger.warning(f"Scene {sv.get('scene_id')} missing or too small: {path}") - - if not valid_scenes: - raise ValueError("所有场景视频质量不合格") - - avg_score = sum(s.get("quality_score", 0) for s in valid_scenes) / len(valid_scenes) - if avg_score < threshold: - raise ValueError(f"平均质量分 {avg_score:.1f} 低于阈值 {threshold}") - - return {"scene_videos": valid_scenes, "avg_quality": avg_score} - - -async def handle_scene_video_concatenating(tenant_id, task_id, step_name, input_data, config): - """Concatenate scene videos with ffmpeg, loop to match audio duration.""" - work_dir = _task_dir(task_id) - - scene_videos = None - audio_duration = None - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - if dep_output.get("scene_videos"): - scene_videos = dep_output["scene_videos"] - if dep_output.get("duration"): - audio_duration = dep_output["duration"] - - if not scene_videos: - raise ValueError("上游步骤未提供场景视频") - - # Create concat file - concat_list = os.path.join(work_dir, "concat_list.txt") - with open(concat_list, "w") as f: - for sv in scene_videos: - path = sv.get("video_path", "") - if os.path.exists(path): - f.write(f"file '{path}'\n") - - # Concatenate - concat_path = os.path.join(work_dir, "concat_video.mp4") - await _run_local( - f"ffmpeg -y -f concat -safe 0 -i '{concat_list}' -c copy '{concat_path}'" - ) - - # Loop to match audio duration if needed - final_path = os.path.join(work_dir, "final_video.mp4") - if audio_duration and audio_duration > 0: - # Get concat duration - stdout, _, _ = await _run_local( - f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{concat_path}'" - ) - concat_dur = float(stdout.strip()) if stdout.strip() else 0 - - if concat_dur > 0 and concat_dur < audio_duration: - loops = int(audio_duration / concat_dur) + 1 - await _run_local( - f"ffmpeg -y -stream_loop {loops} -i '{concat_path}' " - f"-t {audio_duration} -c:v libx264 -preset fast '{final_path}'" - ) - else: - await _run_local(f"cp '{concat_path}' '{final_path}'") - else: - await _run_local(f"cp '{concat_path}' '{final_path}'") - - return {"final_video_path": final_path} - - -# ─── Final Synthesis ───────────────────────────────────────────────── - -async def handle_ktv_synthesizing(tenant_id, task_id, step_name, input_data, config): - """Synthesize final KTV (dual-track) + MTV (single-track) videos.""" - work_dir = _task_dir(task_id) - - video_path = None - ass_path = None - vocals_path = None - no_vocals_path = None - has_original_video = False - - for dep_name, dep_output in input_data.items(): - if isinstance(dep_output, dict): - if dep_output.get("final_video_path"): - video_path = dep_output["final_video_path"] - if dep_output.get("ass_path"): - ass_path = dep_output["ass_path"] - if dep_output.get("vocals_path"): - vocals_path = dep_output["vocals_path"] - if dep_output.get("no_vocals_path"): - no_vocals_path = dep_output["no_vocals_path"] - if dep_output.get("video_path") and not video_path: - # Mode B: use original video - video_path = dep_output["video_path"] - has_original_video = True - - if not ass_path: - raise ValueError("缺少字幕文件") - - # Determine audio tracks - if has_original_video and not vocals_path: - # Mode B: extract from video - vocals_path = os.path.join(work_dir, "vocals.wav") - no_vocals_path = os.path.join(work_dir, "no_vocals.wav") - if not os.path.exists(vocals_path): - raise ValueError("Demucs 步骤未提供人声轨道") - - if not video_path: - raise ValueError("缺少视频源") - - # KTV version: dual audio (vocals + no_vocals) with subtitle overlay - ktv_path = os.path.join(work_dir, "ktv_final.mp4") - mtv_path = os.path.join(work_dir, "mtv_final.mp4") - - # KTV: video + vocals_track + no_vocals_track + subtitle burn - if vocals_path and no_vocals_path: - ktv_cmd = ( - f"ffmpeg -y -i '{video_path}' -i '{vocals_path}' -i '{no_vocals_path}' " - f"-filter_complex \"[0:v]ass='{ass_path}'[v]\" " - f"-map '[v]' -map 1:a -map 2:a " - f"-c:v libx264 -preset fast -c:a aac -b:a 192k " - f"-metadata:s:a:0 title='Vocals' -metadata:s:a:1 title='Accompaniment' " - f"'{ktv_path}'" - ) - else: - ktv_cmd = ( - f"ffmpeg -y -i '{video_path}' " - f"-vf \"ass='{ass_path}'\" " - f"-c:v libx264 -preset fast -c:a aac -b:a 192k " - f"'{ktv_path}'" - ) - - stdout, stderr, rc = await _run_local(ktv_cmd, timeout=600) - if rc != 0: - raise ValueError(f"KTV合成失败: {stderr}") - - # MTV: single audio (original/mix) with subtitle - mtv_cmd = ( - f"ffmpeg -y -i '{video_path}' " - f"-vf \"ass='{ass_path}'\" " - f"-c:v libx264 -preset fast -c:a aac -b:a 192k " - f"'{mtv_path}'" - ) - stdout, stderr, rc = await _run_local(mtv_cmd, timeout=600) - if rc != 0: - logger.warning(f"MTV合成失败,仅输出KTV版本: {stderr}") - - result = { - "ktv_path": ktv_path, - "subtitle_path": ass_path, - } - if os.path.exists(mtv_path): - result["mtv_path"] = mtv_path - - return result - - -# ─── Registration ───────────────────────────────────────────────────── - -KTV_HANDLERS = { - "audio_preparing": handle_audio_preparing, - "video_preparing": handle_video_preparing, - "demucs_separating": handle_demucs_separating, - "lyric_calibrating": handle_lyric_calibrating, - "subtitle_rendering": handle_subtitle_rendering, - "subtitle_exporting": handle_subtitle_exporting, - "lyric_generating": handle_lyric_generating, - "lyric_evaluating": handle_lyric_evaluating, - "music_generating": handle_music_generating, - "music_polling": handle_music_polling, - "character_designing": handle_character_designing, - "character_image_generating": handle_character_image_generating, - "storyboard_generating": handle_storyboard_generating, - "scene_video_generating": handle_scene_video_generating, - "scene_video_evaluating": handle_scene_video_evaluating, - "scene_video_concatenating": handle_scene_video_concatenating, - "ktv_synthesizing": handle_ktv_synthesizing, -} - - -def register_ktv_handlers(): - """Register all KTV step handlers.""" - from .handler import register_handler - for step_type, fn in KTV_HANDLERS.items(): - register_handler(step_type, fn) - logger.info(f"Registered {len(KTV_HANDLERS)} KTV handlers") diff --git a/pipeline_service/init.py b/pipeline_service/init.py index 9553cbe..c5e4b09 100644 --- a/pipeline_service/init.py +++ b/pipeline_service/init.py @@ -30,10 +30,9 @@ from .step_registry import ( unregister_step_type, load_builtin_types, ) from .human import human_complete, approval_approve, approval_reject, human_list -from .handlers_ktv import register_ktv_handlers MODULE_NAME = "pipeline_service" -MODULE_VERSION = "3.0.0" +MODULE_VERSION = "3.1.0" async def pipeline_submit(tenant_id, pipeline_id, owner_id, title, params=None): @@ -325,8 +324,5 @@ def load_pipeline_service(): # Load built-in interactive step types load_builtin_types() - # Register KTV handlers - register_ktv_handlers() - debug(f"[{MODULE_NAME}] v{MODULE_VERSION} loaded — pipeline engine with human-in-the-loop support") return True diff --git a/pyproject.toml b/pyproject.toml index 5d40bb3..5c35e0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pipeline_service" -version = "3.0.0" +version = "3.1.0" description = "通用产线执行引擎模块 — DAG调度、多租户隔离、可插拔步骤处理器、人工交互步骤、artifact版本管理" requires-python = ">=3.8" dependencies = [