"""KTV pipeline step handlers. Implements the 17 step types for KTV production pipelines. Each handler: async def handler(tenant_id, task_id, step_name, input_data, config) -> dict Architecture: - Heavy compute (demucs, video gen) runs on GPU server via SSH - ffmpeg/audio processing runs locally - LLM calls via harnessed_agent (llm_chat) - ASR via SenseVoice - External APIs: Suno/MiniMax for music, wan2.7 for images/video """ import asyncio import json import os import logging import tempfile import time logger = logging.getLogger("pipeline.handlers.ktv") # GPU server config (from memory: ymq@opencomputing.net, 8x4090) GPU_HOST = "ymq@opencomputing.net" GPU_DEMUCS_VENV = "/data/ymq/demucs_venv" GPU_WAN22_DIR = "/data/ymq/wan22-service" GPU_PIPELINE_DIR = "/data/pipeline/ktv" # Local work directory LOCAL_WORK_DIR = "/data/pipeline/ktv" def _task_dir(task_id: str) -> str: """Get working directory for a task.""" d = os.path.join(LOCAL_WORK_DIR, task_id) os.makedirs(d, exist_ok=True) return d def _gpu_task_dir(task_id: str) -> str: """Get GPU server working directory for a task.""" return f"{GPU_PIPELINE_DIR}/{task_id}" async def _run_local(cmd: str, timeout: int = 300) -> tuple: """Run a local command, return (stdout, stderr, returncode).""" proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace"), proc.returncode except asyncio.TimeoutError: proc.kill() return "", "timeout", -1 async def _run_gpu(cmd: str, timeout: int = 600) -> tuple: """Run a command on GPU server via SSH.""" ssh_cmd = f"ssh -o StrictHostKeyChecking=no {GPU_HOST} '{cmd}'" return await _run_local(ssh_cmd, timeout=timeout) async def _copy_to_gpu(local_path: str, remote_path: str): """SCP file to GPU server.""" await _run_local(f"scp -o StrictHostKeyChecking=no '{local_path}' {GPU_HOST}:{remote_path}") async def _copy_from_gpu(remote_path: str, local_path: str): """SCP file from GPU server.""" await _run_local(f"scp -o StrictHostKeyChecking=no {GPU_HOST}:{remote_path} '{local_path}'") # ─── Media Preparation ──────────────────────────────────────────────── async def handle_audio_preparing(tenant_id, task_id, step_name, input_data, config): """Download/copy audio file, extract duration with ffprobe.""" work_dir = _task_dir(task_id) params = input_data.get("task_params", {}) audio_url = params.get("audio_url", params.get("audio_path", "")) if not audio_url: raise ValueError("缺少 audio_url 或 audio_path 参数") # Download or copy audio audio_path = os.path.join(work_dir, "original_audio.mp3") if audio_url.startswith("http"): stdout, stderr, rc = await _run_local(f"curl -sL -o '{audio_path}' '{audio_url}'") if rc != 0: raise ValueError(f"下载音频失败: {stderr}") else: await _run_local(f"cp '{audio_url}' '{audio_path}'") # Extract duration stdout, stderr, rc = await _run_local( f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{audio_path}'" ) duration = float(stdout.strip()) if rc == 0 else 0 return { "audio_path": audio_path, "duration": duration, "format": os.path.splitext(audio_path)[1].lstrip("."), } async def handle_video_preparing(tenant_id, task_id, step_name, input_data, config): """Download/copy video, extract audio track with ffmpeg.""" work_dir = _task_dir(task_id) params = input_data.get("task_params", {}) video_url = params.get("video_url", params.get("video_path", "")) if not video_url: raise ValueError("缺少 video_url 或 video_path 参数") video_path = os.path.join(work_dir, "original_video.mp4") audio_path = os.path.join(work_dir, "original_audio.mp3") if video_url.startswith("http"): await _run_local(f"curl -sL -o '{video_path}' '{video_url}'") else: await _run_local(f"cp '{video_url}' '{video_path}'") # Extract audio await _run_local( f"ffmpeg -y -i '{video_path}' -vn -acodec libmp3lame -q:a 2 '{audio_path}'" ) # Get duration stdout, _, rc = await _run_local( f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{video_path}'" ) duration = float(stdout.strip()) if rc == 0 else 0 return { "video_path": video_path, "audio_path": audio_path, "duration": duration, } # ─── Demucs Separation ─────────────────────────────────────────────── async def handle_demucs_separating(tenant_id, task_id, step_name, input_data, config): """Run Demucs on GPU server to separate vocals and accompaniment.""" work_dir = _task_dir(task_id) gpu_dir = _gpu_task_dir(task_id) # Find audio path from deps audio_path = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): audio_path = dep_output.get("audio_path") if audio_path: break if not audio_path: raise ValueError("上游步骤未提供 audio_path") # Prepare GPU directory await _run_gpu(f"mkdir -p {gpu_dir}") # Copy audio to GPU remote_audio = f"{gpu_dir}/audio.mp3" await _copy_to_gpu(audio_path, remote_audio) # Run Demucs on GPU demucs_cmd = ( f"cd {gpu_dir} && " f"source {GPU_DEMUCS_VENV}/bin/activate && " f"python -m demucs --two-stems vocals -n htdemucs --mp3 '{remote_audio}' && " f"deactivate" ) stdout, stderr, rc = await _run_gpu(demucs_cmd, timeout=600) if rc != 0: raise ValueError(f"Demucs 分离失败: {stderr}") # Copy results back vocals_local = os.path.join(work_dir, "vocals.wav") no_vocals_local = os.path.join(work_dir, "no_vocals.wav") base = os.path.splitext(os.path.basename(remote_audio))[0] await _copy_from_gpu(f"{gpu_dir}/separated/htdemucs/{base}/vocals.wav", vocals_local) await _copy_from_gpu(f"{gpu_dir}/separated/htdemuds/{base}/no_vocals.wav", no_vocals_local) return { "vocals_path": vocals_local, "no_vocals_path": no_vocals_local, } # ─── Lyric Calibration ─────────────────────────────────────────────── async def handle_lyric_calibrating(tenant_id, task_id, step_name, input_data, config): """ASR timing recognition + LLM calibration against original lyrics.""" work_dir = _task_dir(task_id) params = input_data.get("task_params", {}) lyrics_text = params.get("lyrics", params.get("lyrics_text", "")) # Find vocals path from deps vocals_path = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): vocals_path = dep_output.get("vocals_path") if vocals_path: break if not vocals_path: raise ValueError("上游步骤未提供 vocals_path") if not lyrics_text: raise ValueError("缺少 lyrics 参数") # Step 1: Run SenseVoice ASR on vocals to get timing asr_result_path = os.path.join(work_dir, "asr_timings.json") # Copy vocals to GPU for ASR gpu_dir = _gpu_task_dir(task_id) await _run_gpu(f"mkdir -p {gpu_dir}") remote_vocals = f"{gpu_dir}/vocals.wav" await _copy_to_gpu(vocals_path, remote_vocals) # Run SenseVoice ASR asr_script = f""" cd {gpu_dir} source {GPU_DEMUCS_VENV}/bin/activate python -c " import json from funasr import AutoModel model = AutoModel(model='iic/SenseVoiceSmall', trust_remote_code=True) res = model.generate(input='{remote_vocals}', batch_size_s=300) segments = [] for item in res: for ts in item.get('timestamp', []): segments.append({{'text': item.get('text', ''), 'start': ts[0]/1000, 'end': ts[1]/1000}}) with open('asr_timings.json', 'w') as f: json.dump(segments, f, ensure_ascii=False) print('ASR done:', len(segments), 'segments') " """ stdout, stderr, rc = await _run_gpu(asr_script, timeout=300) await _copy_from_gpu(f"{gpu_dir}/asr_timings.json", asr_result_path) with open(asr_result_path, "r") as f: asr_timings = json.load(f) # Step 2: LLM calibration — align ASR timings with original lyrics from pipeline_service.handler import get_handler calibrated = await _llm_calibrate(lyrics_text, asr_timings) # Save calibrated lyrics calibrated_path = os.path.join(work_dir, "calibrated_lyrics.json") with open(calibrated_path, "w", encoding="utf-8") as f: json.dump(calibrated, f, ensure_ascii=False, indent=2) return { "calibrated_lyrics_path": calibrated_path, "calibrated_lyrics": calibrated, "segment_count": len(calibrated), } async def _llm_calibrate(lyrics_text: str, asr_timings: list) -> list: """Use LLM to align raw lyrics text with ASR timings.""" prompt = f"""你是一个歌词时间轴校准专家。 原始歌词文本: {lyrics_text} ASR识别的时间戳(秒): {json.dumps(asr_timings, ensure_ascii=False)} 请将原始歌词的每一句与ASR时间戳对齐,输出JSON数组,每个元素包含: - text: 歌词文本 - start: 开始时间(秒,浮点数) - end: 结束时间(秒,浮点数) 要求: 1. 保持原始歌词的文本和顺序 2. 时间戳以ASR结果为基础进行微调 3. 确保时间不重叠,每句之间留适当间隔 4. 只输出JSON,不要其他内容""" try: from pipeline_service.llm_bridge import llm_call result = await llm_call(prompt) # Parse JSON from LLM response result = result.strip() if result.startswith("```"): result = result.split("\n", 1)[1].rsplit("```", 1)[0] return json.loads(result) except Exception as e: logger.warning(f"LLM calibration failed, using ASR timings directly: {e}") return asr_timings # ─── Subtitle Rendering ────────────────────────────────────────────── async def handle_subtitle_rendering(tenant_id, task_id, step_name, input_data, config): """Generate ASS karaoke subtitle file from calibrated lyrics.""" work_dir = _task_dir(task_id) # Find calibrated lyrics calibrated = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): calibrated = dep_output.get("calibrated_lyrics") if not calibrated and dep_output.get("calibrated_lyrics_path"): with open(dep_output["calibrated_lyrics_path"], "r") as f: calibrated = json.load(f) if calibrated: break if not calibrated: raise ValueError("上游步骤未提供 calibrated_lyrics") # Generate ASS file ass_path = os.path.join(work_dir, "karaoke.ass") _write_ass_file(ass_path, calibrated) return {"ass_path": ass_path} def _write_ass_file(path: str, segments: list): """Write segments to ASS subtitle file with karaoke effect.""" header = """[Script Info] Title: KTV Karaoke Subtitles ScriptType: v4.00+ PlayResX: 1920 PlayResY: 1080 WrapStyle: 0 [V4+ Styles] Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding Style: KTV,Source Han Sans SC,72,&H00FFFFFF,&H0000FFFF,&H00000000,&H80000000,-1,0,0,0,100,100,1,0,1,3,1,2,40,40,60,1 [Events] Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text """ with open(path, "w", encoding="utf-8") as f: f.write(header) for seg in segments: start = _seconds_to_ass_time(seg["start"]) end = _seconds_to_ass_time(seg["end"]) text = seg["text"].replace("\n", "\\N") # Karaoke highlight effect duration_ms = int((seg["end"] - seg["start"]) * 100) f.write(f"Dialogue: 0,{start},{end},KTV,,0,0,0,,{{\\k{duration_ms}}}{text}\n") def _seconds_to_ass_time(seconds: float) -> str: """Convert seconds to ASS time format H:MM:SS.CC""" h = int(seconds // 3600) m = int((seconds % 3600) // 60) s = int(seconds % 60) cs = int((seconds % 1) * 100) return f"{h}:{m:02d}:{s:02d}.{cs:02d}" # ─── Subtitle Exporting ────────────────────────────────────────────── async def handle_subtitle_exporting(tenant_id, task_id, step_name, input_data, config): """Export ASS subtitle as standalone file (already done by rendering).""" ass_path = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): ass_path = dep_output.get("ass_path") if ass_path: break if not ass_path or not os.path.exists(ass_path): raise ValueError("上游步骤未提供有效的 ass_path") return {"subtitle_path": ass_path, "format": "ass"} # ─── Lyric Generation & Evaluation (Mode C) ────────────────────────── async def handle_lyric_generating(tenant_id, task_id, step_name, input_data, config): """LLM generates lyrics from topic/outline.""" params = input_data.get("task_params", {}) topic = params.get("topic", params.get("outline", "")) style = params.get("style", "流行") language = params.get("language", "zh") if not topic: raise ValueError("缺少 topic/outline参数") prompt = f"""请创作一首{style}风格的{language}歌词。 主题/大纲: {topic} 要求: 1. 包含完整结构: 主歌(Verse)、副歌(Chorus)、桥段(Bridge) 2. 每句歌词节奏感强,适合演唱 3. 押韵自然,情感真实 4. 总长度适合3-5分钟歌曲 直接输出歌词文本,标注段落结构。""" try: from pipeline_service.llm_bridge import llm_call lyrics = await llm_call(prompt) return {"lyrics": lyrics.strip(), "topic": topic, "style": style} except Exception as e: raise ValueError(f"歌词生成失败: {e}") async def handle_lyric_evaluating(tenant_id, task_id, step_name, input_data, config): """Evaluate lyric quality, retry if below threshold.""" threshold = config.get("threshold", 8.5) lyrics = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): lyrics = dep_output.get("lyrics") if lyrics: break if not lyrics: raise ValueError("上游步骤未提供歌词") # Evaluate via LLM prompt = f"""请从以下维度评估这首歌词的质量(1-10分): 1. 韵律节奏: 押韵、节奏感、可唱性 2. 情感表达: 情感真实度、共鸣力 3. 文学性: 用词、意象、修辞 4. 结构完整: 段落编排、层次感 5. 商业潜力: 流行度、记忆点 歌词: {lyrics} 输出JSON: {{"score": 8.5, "dimensions": {{...}}, "suggestions": "..."}} 只输出JSON。""" try: from pipeline_service.llm_bridge import llm_call result = await llm_call(prompt) result = result.strip() if result.startswith("```"): result = result.split("\n", 1)[1].rsplit("```", 1)[0] evaluation = json.loads(result) score = evaluation.get("score", 0) except Exception: score = 7.0 # Default pass if evaluation fails evaluation = {"score": score, "note": "evaluation_parse_failed"} if score < threshold: raise ValueError(f"歌词评分 {score} 低于阈值 {threshold},需要重新生成") return { "lyrics": lyrics, "evaluation": evaluation, "score": score, "passed": True, } # ─── Music Generation ──────────────────────────────────────────────── async def handle_music_generating(tenant_id, task_id, step_name, input_data, config): """Submit music generation job to Suno/MiniMax API.""" lyrics = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): lyrics = dep_output.get("lyrics") if lyrics: break if not lyrics: raise ValueError("上游步骤未提供歌词") params = input_data.get("task_params", {}) music_service = params.get("music_service", "suno") style = params.get("music_style", "pop") # Submit to music generation API # TODO: Implement actual API call to Suno/MiniMax # For now, return a job_id placeholder job_id = f"music_{task_id}_{int(time.time())}" return { "music_job_id": job_id, "music_service": music_service, "style": style, "lyrics": lyrics, "status": "submitted", } async def handle_music_polling(tenant_id, task_id, step_name, input_data, config): """Poll music generation API until complete.""" job_info = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): job_info = dep_output if job_info and job_info.get("music_job_id"): break if not job_info: raise ValueError("上游步骤未提供 music_job_id") # TODO: Implement actual polling logic # For now, simulate a wait and return work_dir = _task_dir(task_id) music_path = os.path.join(work_dir, "generated_music.mp3") # Placeholder: the actual implementation would poll the API # and download the result return { "music_path": music_path, "music_job_id": job_info.get("music_job_id"), "status": "completed", } # ─── Llmage API Helpers (unified via token.opencomputing.cn) ───────── LLMAGE_BASE = "https://token.opencomputing.cn/llmage/v1" LLMAGE_API_KEY = "0V4xNbIsR061JaYGt1f1L" # From media-server config # Virtual asset API base ASSET_API_BASE = "https://token.opencomputing.cn/reallife_asset/api" async def _llmage_t2i(prompt: str, model: str = "wan2.7-image-pro", size: str = "1024*1024") -> str: """Call llmage T2I API (synchronous), return image URL.""" import urllib.request body = json.dumps({ "model": model, "catelogid": "t2i", "prompt": prompt, "size": size, "n": 1, }).encode() req = urllib.request.Request( f"{LLMAGE_BASE}/image/generations", data=body, headers={ "Authorization": f"Bearer {LLMAGE_API_KEY}", "Content-Type": "application/json", }, ) with urllib.request.urlopen(req, timeout=120) as resp: data = json.loads(resp.read()) urls = data.get("data", [{}]) if urls and urls[0].get("url"): return urls[0]["url"] raise ValueError(f"T2I 生成失败: {json.dumps(data, ensure_ascii=False)}") async def _llmage_video_submit(model: str, catelogid: str, prompt: str, duration: int = 10, ratio: str = "16:9", resolution: str = "720p", image_files: list = None, # type: ignore video_files: list = None, # type: ignore audio_files: list = None) -> str: # type: ignore """Submit video generation task via llmage API, return task_id.""" import urllib.request body = { "model": model, "catelogid": catelogid, "prompt": prompt, "duration": duration, "ratio": ratio, "resolution": resolution, } if image_files: body["image_files"] = image_files if video_files: body["video_files"] = video_files if audio_files: body["audio_files"] = audio_files data_bytes = json.dumps(body, ensure_ascii=False).encode() req = urllib.request.Request( f"{LLMAGE_BASE}/video/generations", data=data_bytes, headers={ "Authorization": f"Bearer {LLMAGE_API_KEY}", "Content-Type": "application/json", }, ) with urllib.request.urlopen(req, timeout=120) as resp: result = json.loads(resp.read()) task_id = result.get("taskid") or result.get("task_id") or result.get("id") if not task_id: raise ValueError(f"视频提交失败: {json.dumps(result, ensure_ascii=False)}") return str(task_id) async def _llmage_video_poll(task_id: str, timeout: int = 600) -> str: """Poll video generation task until complete, return video URL.""" import urllib.request start = time.time() while time.time() - start < timeout: req = urllib.request.Request( f"{LLMAGE_BASE}/tasks?taskid={task_id}", headers={"Authorization": f"Bearer {LLMAGE_API_KEY}"}, ) with urllib.request.urlopen(req, timeout=30) as resp: result = json.loads(resp.read()) # Handle both JSON and Python-dict responses dd = result.get("data", result) status = dd.get("status", result.get("status", "")) if status == "SUCCEEDED": video_url = dd.get("video") or dd.get("video_url", "") if video_url: return video_url raise ValueError(f"任务成功但无视频URL: {json.dumps(result)}") elif status == "FAILED": raise ValueError(f"视频生成失败: {json.dumps(result, ensure_ascii=False)}") await asyncio.sleep(15) raise ValueError(f"视频生成超时 ({timeout}s): task_id={task_id}") async def _upload_to_asset_library(image_url: str, asset_name: str, asset_type: str = "Image", vendor_group_id: str = "") -> str: """Upload image to virtual asset library, return vendor_asset_id.""" import urllib.request import urllib.parse # First: get or create an AIGC virtual group if not provided if not vendor_group_id: vendor_group_id = await _get_or_create_aigc_group() body = urllib.parse.urlencode({ "vendor_group_id": vendor_group_id, "source_url": image_url, "asset_type": asset_type, "name": asset_name, }).encode() req = urllib.request.Request( f"{ASSET_API_BASE}/rl_virtual_upload.dspy", data=body, headers={ "Authorization": f"Bearer {LLMAGE_API_KEY}", "Content-Type": "application/x-www-form-urlencoded", }, ) with urllib.request.urlopen(req, timeout=120) as resp: result = json.loads(resp.read()) if result.get("status") == "ok": data = result.get("data", {}) asset_id = data.get("vendor_asset_id") or data.get("id", "") if asset_id: return asset_id raise ValueError(f"素材上传失败: {json.dumps(result, ensure_ascii=False)}") async def _get_or_create_aigc_group() -> str: """Get existing AIGC group or create new one, return vendor_group_id.""" import urllib.request # Try to list existing groups first req = urllib.request.Request( f"{ASSET_API_BASE}/rl_virtual_groups.dspy", headers={"Authorization": f"Bearer {LLMAGE_API_KEY}"}, ) with urllib.request.urlopen(req, timeout=30) as resp: result = json.loads(resp.read()) groups = result.get("data", {}).get("groups", []) if groups: return groups[0].get("vendor_group_id", "") # Create new group body = json.dumps({"name": "KTV产线虚拟素材"}).encode() req = urllib.request.Request( f"{ASSET_API_BASE}/rl_virtual_create_group.dspy", data=body, headers={ "Authorization": f"Bearer {LLMAGE_API_KEY}", "Content-Type": "application/json", }, ) with urllib.request.urlopen(req, timeout=30) as resp: result = json.loads(resp.read()) gid = result.get("data", {}).get("vendor_group_id", "") if gid: return gid raise ValueError(f"创建AIGC素材组失败: {json.dumps(result, ensure_ascii=False)}") async def _download_url(url: str, dest: str): """Download a URL to local file.""" stdout, stderr, rc = await _run_local(f"curl -sL -o '{dest}' '{url}'") if rc != 0: raise ValueError(f"下载失败 {url}: {stderr}") # ─── Character & Asset Generation (v2 — Ali T2I + Virtual Asset Library) ── # Default Seedance 2.0 model for video generation DEFAULT_VIDEO_MODEL = "doubao-seedance-2-0-260128" DEFAULT_VIDEO_MODEL_FAST = "doubao-seedance-2-0-fast-260128" async def handle_character_designing(tenant_id, task_id, step_name, input_data, config): """LLM designs complete MV visual assets: characters (3 views), props, costumes, scenes. Output includes image generation prompts for each asset type, ready for T2I generation. """ lyrics = None params = input_data.get("task_params", {}) for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): lyrics = dep_output.get("lyrics") or dep_output.get("calibrated_lyrics") if isinstance(lyrics, list): lyrics = " ".join(s.get("text", "") for s in lyrics) if lyrics: break style = params.get("visual_style", "anime") prompt = f"""根据以下歌词,设计MV的完整视觉素材方案。 歌词: {lyrics} 视觉风格: {style} 请设计以下素材: ## 1. 角色(characters)— 1-3个主角 每个角色包含: - id: 角色标识(如 char_01) - name: 角色名 - description: 中文外貌描述 - prompts: 三个英文图像生成prompt: - front: 正面半身肖像(用于角色主参考图) - left: 左侧半身肖像(用于多角度参考) - right: 右侧半身肖像(用于多角度参考) 每个prompt格式: "portrait of [age]-year-old [gender], [hair], wearing [clothing], [expression], [lighting], [style], full body, photorealistic" ## 2. 道具(props)— 3-5个关键道具 每个道具包含: - id: 道具标识(如 prop_01) - name: 道具名 - prompt: 英文图像生成prompt(产品级特写,白色背景) ## 3. 服饰(costumes)— 1-3套服装 每个服饰包含: - id: 服饰标识(如 costume_01) - name: 服饰名 - for_character: 关联角色id - prompt: 英文图像生成prompt(服装平铺展示) ## 4. 场景(scenes_bg)— 2-3个主要场景背景 每个场景包含: - id: 场景标识(如 scene_bg_01) - name: 场景名 - prompt: 英文图像生成prompt(宽幅风景/环境图,无角色) 输出JSON格式: {{ "characters": [...], "props": [...], "costumes": [...], "scenes_bg": [...] }} 只输出JSON,不要其他内容。""" try: from pipeline_service.llm_bridge import llm_call result = await llm_call(prompt) result = result.strip() if result.startswith("```"): result = result.split("\n", 1)[1].rsplit("```", 1)[0] design = json.loads(result) except Exception as e: raise ValueError(f"角色设计失败: {e}") return { "character_design": design, "visual_style": style, "characters": design.get("characters", []), "props": design.get("props", []), "costumes": design.get("costumes", []), "scenes_bg": design.get("scenes_bg", []), } async def handle_character_image_generating(tenant_id, task_id, step_name, input_data, config): """Generate all visual asset images via Ali T2I (wan2.7-image-pro) through llmage API. Generates: character images (front/left/right), props, costumes, scene backgrounds. All images generated via https://token.opencomputing.cn/llmage/v1/image/generations """ work_dir = _task_dir(task_id) assets_dir = os.path.join(work_dir, "assets") os.makedirs(assets_dir, exist_ok=True) # Get design from upstream design = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): design = dep_output.get("character_design") if design: break if not design: raise ValueError("上游步骤未提供角色设计方案") model = config.get("t2i_model", "wan2.7-image-pro") generated_assets = [] # Generate character images (3 views each) for char in design.get("characters", []): char_id = char.get("id", f"char_{len(generated_assets)}") prompts = char.get("prompts", {}) if isinstance(prompts, str): prompts = {"front": prompts} for view in ["front", "left", "right"]: prompt_text = prompts.get(view, "") if not prompt_text: continue asset_name = f"{char_id}_{view}" try: img_url = await _llmage_t2i(prompt_text, model=model) local_path = os.path.join(assets_dir, f"{asset_name}.png") await _download_url(img_url, local_path) generated_assets.append({ "asset_type": "character", "asset_id": char_id, "view": view, "name": f"{char.get('name', char_id)} ({view})", "image_url": img_url, "local_path": local_path, "prompt": prompt_text, }) logger.info(f"Generated character image: {asset_name}") except Exception as e: logger.warning(f"Failed to generate {asset_name}: {e}") # Generate prop images for prop in design.get("props", []): prop_id = prop.get("id", f"prop_{len(generated_assets)}") prompt_text = prop.get("prompt", "") if not prompt_text: continue try: img_url = await _llmage_t2i(prompt_text, model=model) local_path = os.path.join(assets_dir, f"{prop_id}.png") await _download_url(img_url, local_path) generated_assets.append({ "asset_type": "prop", "asset_id": prop_id, "name": prop.get("name", prop_id), "image_url": img_url, "local_path": local_path, "prompt": prompt_text, }) except Exception as e: logger.warning(f"Failed to generate prop {prop_id}: {e}") # Generate costume images for costume in design.get("costumes", []): costume_id = costume.get("id", f"costume_{len(generated_assets)}") prompt_text = costume.get("prompt", "") if not prompt_text: continue try: img_url = await _llmage_t2i(prompt_text, model=model) local_path = os.path.join(assets_dir, f"{costume_id}.png") await _download_url(img_url, local_path) generated_assets.append({ "asset_type": "costume", "asset_id": costume_id, "for_character": costume.get("for_character", ""), "name": costume.get("name", costume_id), "image_url": img_url, "local_path": local_path, "prompt": prompt_text, }) except Exception as e: logger.warning(f"Failed to generate costume {costume_id}: {e}") # Generate scene background images for scene in design.get("scenes_bg", []): scene_id = scene.get("id", f"scene_bg_{len(generated_assets)}") prompt_text = scene.get("prompt", "") if not prompt_text: continue try: img_url = await _llmage_t2i(prompt_text, model=model, size="1280*720") local_path = os.path.join(assets_dir, f"{scene_id}.png") await _download_url(img_url, local_path) generated_assets.append({ "asset_type": "scene_bg", "asset_id": scene_id, "name": scene.get("name", scene_id), "image_url": img_url, "local_path": local_path, "prompt": prompt_text, }) except Exception as e: logger.warning(f"Failed to generate scene bg {scene_id}: {e}") if not generated_assets: raise ValueError("所有素材图片生成失败") return { "generated_assets": generated_assets, "asset_count": len(generated_assets), "character_design": design, } async def handle_asset_uploading(tenant_id, task_id, step_name, input_data, config): """Upload generated asset images to virtual asset library. Returns asset references (asset://vendor_asset_id) for use in storyboard and video generation. """ generated_assets = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): generated_assets = dep_output.get("generated_assets") if generated_assets: break if not generated_assets: raise ValueError("上游步骤未提供生成的素材") asset_refs = [] # List of {asset_type, asset_id, name, asset_ref, vendor_asset_id} vendor_group_id = config.get("vendor_group_id") # Optional override for asset in generated_assets: image_url = asset.get("image_url", "") name = asset.get("name", asset.get("asset_id", "unknown")) if not image_url: logger.warning(f"Asset {name} has no image_url, skipping upload") continue try: vendor_asset_id = await _upload_to_asset_library( image_url=image_url, asset_name=name, asset_type="Image", vendor_group_id=vendor_group_id, ) asset_ref = f"asset://{vendor_asset_id}" asset["asset_ref"] = asset_ref asset["vendor_asset_id"] = vendor_asset_id asset_refs.append({ "asset_type": asset.get("asset_type"), "asset_id": asset.get("asset_id"), "name": name, "asset_ref": asset_ref, "vendor_asset_id": vendor_asset_id, "view": asset.get("view", ""), }) logger.info(f"Uploaded asset: {name} → {asset_ref}") except Exception as e: logger.warning(f"Failed to upload asset {name}: {e}") # Fallback: use direct image URL as reference asset["asset_ref"] = image_url asset_refs.append({ "asset_type": asset.get("asset_type"), "asset_id": asset.get("asset_id"), "name": name, "asset_ref": image_url, "vendor_asset_id": None, "fallback": True, }) if not asset_refs: raise ValueError("所有素材上传失败") # Build asset lookup for storyboard generation asset_lookup = {} for ref in asset_refs: key = f"{ref['asset_type']}:{ref['asset_id']}" asset_lookup[key] = ref["asset_ref"] if ref.get("view"): view_key = f"{ref['asset_type']}:{ref['asset_id']}:{ref['view']}" asset_lookup[view_key] = ref["asset_ref"] return { "asset_refs": asset_refs, "asset_lookup": asset_lookup, "asset_count": len(asset_refs), "generated_assets": generated_assets, "character_design": None, # Pass through from upstream if needed } async def handle_storyboard_generating(tenant_id, task_id, step_name, input_data, config): """LLM generates storyboard using asset references (asset://素材号). Scenes with characters use r2v mode (reference-to-video with asset references). Scenes without characters use t2v mode. """ lyrics = None asset_refs = None asset_lookup = None character_design = None params = input_data.get("task_params", {}) for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): if dep_output.get("calibrated_lyrics"): lyrics = dep_output["calibrated_lyrics"] elif dep_output.get("lyrics"): lyrics = dep_output["lyrics"] if dep_output.get("asset_refs"): asset_refs = dep_output["asset_refs"] if dep_output.get("asset_lookup"): asset_lookup = dep_output["asset_lookup"] if dep_output.get("character_design"): character_design = dep_output["character_design"] if not lyrics: raise ValueError("上游步骤未提供歌词") duration = params.get("duration", 240) # Build asset summary for LLM context asset_summary = "" if asset_refs: chars = [r for r in asset_refs if r.get("asset_type") == "character"] props = [r for r in asset_refs if r.get("asset_type") == "prop"] costumes = [r for r in asset_refs if r.get("asset_type") == "costume"] scenes_bg = [r for r in asset_refs if r.get("asset_type") == "scene_bg"] if chars: asset_summary += "角色素材:\n" for c in chars: asset_summary += f" - {c['name']}: {c['asset_ref']}\n" if props: asset_summary += "道具素材:\n" for p in props: asset_summary += f" - {p['name']}: {p['asset_ref']}\n" if costumes: asset_summary += "服饰素材:\n" for c in costumes: asset_summary += f" - {c['name']}: {c['asset_ref']}\n" if scenes_bg: asset_summary += "场景素材:\n" for s in scenes_bg: asset_summary += f" - {s['name']}: {s['asset_ref']}\n" prompt = f"""根据歌词和已有素材,生成MV分镜脚本。 歌词: {json.dumps(lyrics, ensure_ascii=False) if isinstance(lyrics, list) else lyrics} 视频总时长: {duration}秒 可用素材(用 asset://素材号 引用): {asset_summary if asset_summary else "无特定素材,使用纯T2V模式"} 请输出JSON数组,每个分镜包含: - scene_id: 分镜编号 - start_time: 开始秒数 - end_time: 结束秒数 - description: 场景中文描述 - video_prompt: 英文视频生成prompt(详细描述画面内容、动作、镜头) - characters: 出现的角色名列表(如 ["char_01"]) - assets: 使用的素材引用列表(如 ["asset://xxx", "asset://yyy"]) - use_r2v: 是否使用r2v模式(当场景包含角色时为true) - mood: 情绪/色调 - camera: 镜头运动描述 规则: 1. 包含角色的分镜必须设 use_r2v=true,并在assets中引用对应角色的asset:// 2. 无角色的分镜(纯风景/空镜)设 use_r2v=false 3. 确保分镜覆盖整首歌 4. 每个分镜5-15秒 5. video_prompt 必须纯英文,包含场景、光线、色调、镜头 只输出JSON数组。""" try: from pipeline_service.llm_bridge import llm_call result = await llm_call(prompt) result = result.strip() if result.startswith("```"): result = result.split("\n", 1)[1].rsplit("```", 1)[0] storyboard = json.loads(result) except Exception as e: raise ValueError(f"分镜生成失败: {e}") # Enrich storyboard with asset references if asset_lookup: for scene in storyboard: if scene.get("use_r2v") and scene.get("characters"): enriched_assets = [] for char_name in scene["characters"]: # Look up character front view asset ref for key, ref in asset_lookup.items(): if char_name in key and "front" in key: enriched_assets.append(ref) break else: # Fallback: any view for key, ref in asset_lookup.items(): if char_name in key: enriched_assets.append(ref) break if enriched_assets: scene["assets"] = enriched_assets work_dir = _task_dir(task_id) sb_path = os.path.join(work_dir, "storyboard.json") with open(sb_path, "w", encoding="utf-8") as f: json.dump(storyboard, f, ensure_ascii=False, indent=2) r2v_count = sum(1 for s in storyboard if s.get("use_r2v")) t2v_count = len(storyboard) - r2v_count return { "storyboard": storyboard, "storyboard_path": sb_path, "scene_count": len(storyboard), "r2v_scene_count": r2v_count, "t2v_scene_count": t2v_count, "asset_refs": asset_refs, } async def handle_scene_video_generating(tenant_id, task_id, step_name, input_data, config): """Generate scene videos via Seedance 2.0 (default) through llmage API. - Scenes with use_r2v=true: Seedance 2.0 r2v mode with asset:// references - Scenes without characters: Seedance 2.0 t2v mode - All calls via https://token.opencomputing.cn/llmage/v1/video/generations """ work_dir = _task_dir(task_id) scenes_dir = os.path.join(work_dir, "scenes") os.makedirs(scenes_dir, exist_ok=True) storyboard = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): if dep_output.get("storyboard"): storyboard = dep_output["storyboard"] if not storyboard: raise ValueError("上游步骤未提供分镜脚本") video_model = config.get("video_model", DEFAULT_VIDEO_MODEL) resolution = config.get("resolution", "720p") batch_size = config.get("batch_size", 5) scene_videos = [] submitted_tasks = [] # [(scene_idx, task_id, scene_info)] # Phase 1: Submit all video generation tasks in batches for batch_start in range(0, len(storyboard), batch_size): batch = storyboard[batch_start:batch_start + batch_size] for i, scene in enumerate(batch): scene_idx = batch_start + i video_prompt = scene.get("video_prompt", scene.get("description", "")) duration = scene.get("end_time", 10) - scene.get("start_time", 0) # Map duration to Seedance supported range [4, 15] duration = max(4, min(15, duration)) try: if scene.get("use_r2v") and scene.get("assets"): # R2V mode: use asset references for character consistency task_id_video = await _llmage_video_submit( model=video_model, catelogid="r2v", prompt=video_prompt, duration=duration, ratio="16:9", resolution=resolution, image_files=scene["assets"], ) mode = "r2v" else: # T2V mode: pure text-to-video task_id_video = await _llmage_video_submit( model=video_model, catelogid="t2v", prompt=video_prompt, duration=duration, ratio="16:9", resolution=resolution, ) mode = "t2v" submitted_tasks.append((scene_idx, task_id_video, { "scene_id": scene.get("scene_id", scene_idx), "description": scene.get("description", ""), "video_prompt": video_prompt, "duration": duration, "mode": mode, })) logger.info(f"Submitted scene {scene_idx} ({mode}): task={task_id_video}") except Exception as e: logger.warning(f"Failed to submit scene {scene_idx}: {e}") # Try fallback to fast model try: task_id_video = await _llmage_video_submit( model=DEFAULT_VIDEO_MODEL_FAST, catelogid="t2v", prompt=video_prompt, duration=duration, ratio="16:9", resolution="720p", ) submitted_tasks.append((scene_idx, task_id_video, { "scene_id": scene.get("scene_id", scene_idx), "description": scene.get("description", ""), "video_prompt": video_prompt, "duration": duration, "mode": "t2v_fallback", })) logger.info(f"Scene {scene_idx} fallback submitted: task={task_id_video}") except Exception as e2: logger.error(f"Scene {scene_idx} fallback also failed: {e2}") # Wait between batches to avoid rate limits if batch_start + batch_size < len(storyboard): await asyncio.sleep(2) # Phase 2: Poll all submitted tasks logger.info(f"Polling {len(submitted_tasks)} video tasks...") for scene_idx, vid_task_id, scene_info in submitted_tasks: try: video_url = await _llmage_video_poll(vid_task_id, timeout=600) local_path = os.path.join(scenes_dir, f"scene_{scene_idx:03d}.mp4") await _download_url(video_url, local_path) scene_videos.append({ "scene_id": scene_info["scene_id"], "video_path": local_path, "video_url": video_url, "description": scene_info["description"], "video_prompt": scene_info["video_prompt"], "duration": scene_info["duration"], "mode": scene_info["mode"], }) logger.info(f"Scene {scene_idx} completed: {local_path}") except Exception as e: logger.warning(f"Scene {scene_idx} failed during polling: {e}") if not scene_videos: raise ValueError("所有场景视频生成失败") return { "scene_videos": scene_videos, "scene_count": len(scene_videos), "total_submitted": len(submitted_tasks), "video_model": video_model, } async def handle_scene_video_evaluating(tenant_id, task_id, step_name, input_data, config): """Evaluate scene video quality via VLM, retry if below threshold.""" threshold = config.get("threshold", 7.0) max_retry = config.get("max_retry", 3) scene_videos = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): scene_videos = dep_output.get("scene_videos") if scene_videos: break if not scene_videos: raise ValueError("上游步骤未提供场景视频") # Evaluate each scene (simplified: check file exists and has reasonable size) valid_scenes = [] for sv in scene_videos: path = sv.get("video_path", "") if os.path.exists(path) and os.path.getsize(path) > 10000: sv["quality_score"] = 8.0 # Placeholder valid_scenes.append(sv) else: sv["quality_score"] = 0 logger.warning(f"Scene {sv.get('scene_id')} missing or too small: {path}") if not valid_scenes: raise ValueError("所有场景视频质量不合格") avg_score = sum(s.get("quality_score", 0) for s in valid_scenes) / len(valid_scenes) if avg_score < threshold: raise ValueError(f"平均质量分 {avg_score:.1f} 低于阈值 {threshold}") return {"scene_videos": valid_scenes, "avg_quality": avg_score} async def handle_scene_video_concatenating(tenant_id, task_id, step_name, input_data, config): """Concatenate scene videos with ffmpeg, loop to match audio duration.""" work_dir = _task_dir(task_id) scene_videos = None audio_duration = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): if dep_output.get("scene_videos"): scene_videos = dep_output["scene_videos"] if dep_output.get("duration"): audio_duration = dep_output["duration"] if not scene_videos: raise ValueError("上游步骤未提供场景视频") # Create concat file concat_list = os.path.join(work_dir, "concat_list.txt") with open(concat_list, "w") as f: for sv in scene_videos: path = sv.get("video_path", "") if os.path.exists(path): f.write(f"file '{path}'\n") # Concatenate concat_path = os.path.join(work_dir, "concat_video.mp4") await _run_local( f"ffmpeg -y -f concat -safe 0 -i '{concat_list}' -c copy '{concat_path}'" ) # Loop to match audio duration if needed final_path = os.path.join(work_dir, "final_video.mp4") if audio_duration and audio_duration > 0: # Get concat duration stdout, _, _ = await _run_local( f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{concat_path}'" ) concat_dur = float(stdout.strip()) if stdout.strip() else 0 if concat_dur > 0 and concat_dur < audio_duration: loops = int(audio_duration / concat_dur) + 1 await _run_local( f"ffmpeg -y -stream_loop {loops} -i '{concat_path}' " f"-t {audio_duration} -c:v libx264 -preset fast '{final_path}'" ) else: await _run_local(f"cp '{concat_path}' '{final_path}'") else: await _run_local(f"cp '{concat_path}' '{final_path}'") return {"final_video_path": final_path} # ─── Final Synthesis ───────────────────────────────────────────────── async def handle_ktv_synthesizing(tenant_id, task_id, step_name, input_data, config): """Synthesize final KTV (dual-track) + MTV (single-track) videos.""" work_dir = _task_dir(task_id) video_path = None ass_path = None vocals_path = None no_vocals_path = None has_original_video = False for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): if dep_output.get("final_video_path"): video_path = dep_output["final_video_path"] if dep_output.get("ass_path"): ass_path = dep_output["ass_path"] if dep_output.get("vocals_path"): vocals_path = dep_output["vocals_path"] if dep_output.get("no_vocals_path"): no_vocals_path = dep_output["no_vocals_path"] if dep_output.get("video_path") and not video_path: # Mode B: use original video video_path = dep_output["video_path"] has_original_video = True if not ass_path: raise ValueError("缺少字幕文件") # Determine audio tracks if has_original_video and not vocals_path: # Mode B: extract from video vocals_path = os.path.join(work_dir, "vocals.wav") no_vocals_path = os.path.join(work_dir, "no_vocals.wav") if not os.path.exists(vocals_path): raise ValueError("Demucs 步骤未提供人声轨道") if not video_path: raise ValueError("缺少视频源") # KTV version: dual audio (vocals + no_vocals) with subtitle overlay ktv_path = os.path.join(work_dir, "ktv_final.mp4") mtv_path = os.path.join(work_dir, "mtv_final.mp4") # KTV: video + vocals_track + no_vocals_track + subtitle burn if vocals_path and no_vocals_path: ktv_cmd = ( f"ffmpeg -y -i '{video_path}' -i '{vocals_path}' -i '{no_vocals_path}' " f"-filter_complex \"[0:v]ass='{ass_path}'[v]\" " f"-map '[v]' -map 1:a -map 2:a " f"-c:v libx264 -preset fast -c:a aac -b:a 192k " f"-metadata:s:a:0 title='Vocals' -metadata:s:a:1 title='Accompaniment' " f"'{ktv_path}'" ) else: ktv_cmd = ( f"ffmpeg -y -i '{video_path}' " f"-vf \"ass='{ass_path}'\" " f"-c:v libx264 -preset fast -c:a aac -b:a 192k " f"'{ktv_path}'" ) stdout, stderr, rc = await _run_local(ktv_cmd, timeout=600) if rc != 0: raise ValueError(f"KTV合成失败: {stderr}") # MTV: single audio (original/mix) with subtitle mtv_cmd = ( f"ffmpeg -y -i '{video_path}' " f"-vf \"ass='{ass_path}'\" " f"-c:v libx264 -preset fast -c:a aac -b:a 192k " f"'{mtv_path}'" ) stdout, stderr, rc = await _run_local(mtv_cmd, timeout=600) if rc != 0: logger.warning(f"MTV合成失败,仅输出KTV版本: {stderr}") result = { "ktv_path": ktv_path, "subtitle_path": ass_path, } if os.path.exists(mtv_path): result["mtv_path"] = mtv_path return result # ─── Registration ───────────────────────────────────────────────────── KTV_HANDLERS = { "audio_preparing": handle_audio_preparing, "video_preparing": handle_video_preparing, "demucs_separating": handle_demucs_separating, "lyric_calibrating": handle_lyric_calibrating, "subtitle_rendering": handle_subtitle_rendering, "subtitle_exporting": handle_subtitle_exporting, "lyric_generating": handle_lyric_generating, "lyric_evaluating": handle_lyric_evaluating, "music_generating": handle_music_generating, "music_polling": handle_music_polling, "character_designing": handle_character_designing, "character_image_generating": handle_character_image_generating, "asset_uploading": handle_asset_uploading, "storyboard_generating": handle_storyboard_generating, "scene_video_generating": handle_scene_video_generating, "scene_video_evaluating": handle_scene_video_evaluating, "scene_video_concatenating": handle_scene_video_concatenating, "ktv_synthesizing": handle_ktv_synthesizing, }