diff --git a/pipeline_ktv/adapter.py b/pipeline_ktv/adapter.py index 53980d7..1c38a79 100644 --- a/pipeline_ktv/adapter.py +++ b/pipeline_ktv/adapter.py @@ -2,7 +2,7 @@ 本模块是pipeline-service的外部适配器,负责: 1. 注册KTV特有的step_types(含元数据) -2. 注册17个step handler函数 +2. 注册18个step handler函数(含asset_uploading) 3. 提供load_ktv_adapter()一键加载 宿主应用先调用 load_pipeline_service(),再调用 load_ktv_adapter()。 @@ -24,6 +24,7 @@ from .handlers import ( handle_music_polling, handle_character_designing, handle_character_image_generating, + handle_asset_uploading, handle_storyboard_generating, handle_scene_video_generating, handle_scene_video_evaluating, @@ -101,28 +102,34 @@ KTV_STEP_TYPES = { "description": "轮询音乐生成API直到完成", }, "character_designing": { - "display_name": "角色设计", + "display_name": "视觉素材设计", "category": "llm", "is_interactive": False, - "description": "LLM设计MV角色方案", + "description": "LLM设计角色(正面/左侧/右侧)、道具、服饰、场景图片prompt", }, "character_image_generating": { - "display_name": "角色图片生成", + "display_name": "素材图片生成", "category": "media", "is_interactive": False, - "description": "GPU生成角色参考图片", + "description": "通过llmage T2I API (wan2.7) 生成角色多视角、道具、服饰、场景图片", + }, + "asset_uploading": { + "display_name": "素材上传虚拟库", + "category": "media", + "is_interactive": False, + "description": "将生成的素材图片上传到虚拟素材库,返回asset://素材号引用", }, "storyboard_generating": { "display_name": "分镜脚本生成", "category": "llm", "is_interactive": False, - "description": "LLM生成MV分镜脚本", + "description": "LLM生成MV分镜脚本,引用asset://素材号,标记r2v/t2v模式", }, "scene_video_generating": { "display_name": "场景视频生成", "category": "media", "is_interactive": False, - "description": "GPU运行T2V/Ref2V生成场景视频", + "description": "通过Seedance 2.0生成场景视频,含角色场景用r2v模式+素材引用", }, "scene_video_evaluating": { "display_name": "场景视频评估", @@ -160,6 +167,7 @@ KTV_HANDLERS = { "music_polling": handle_music_polling, "character_designing": handle_character_designing, "character_image_generating": handle_character_image_generating, + "asset_uploading": handle_asset_uploading, "storyboard_generating": handle_storyboard_generating, "scene_video_generating": handle_scene_video_generating, "scene_video_evaluating": handle_scene_video_evaluating, diff --git a/pipeline_ktv/handlers.py b/pipeline_ktv/handlers.py index 02e978f..ec98e5a 100644 --- a/pipeline_ktv/handlers.py +++ b/pipeline_ktv/handlers.py @@ -514,10 +514,202 @@ async def handle_music_polling(tenant_id, task_id, step_name, input_data, config } -# ─── Character & Video Generation ──────────────────────────────────── +# ─── Llmage API Helpers (unified via token.opencomputing.cn) ───────── + +LLMAGE_BASE = "https://token.opencomputing.cn/llmage/v1" +LLMAGE_API_KEY = "0V4xNbIsR061JaYGt1f1L" # From media-server config + +# Virtual asset API base +ASSET_API_BASE = "https://token.opencomputing.cn/reallife_asset/api" + + +async def _llmage_t2i(prompt: str, model: str = "wan2.7-image-pro", + size: str = "1024*1024") -> str: + """Call llmage T2I API (synchronous), return image URL.""" + import urllib.request + body = json.dumps({ + "model": model, + "catelogid": "t2i", + "prompt": prompt, + "size": size, + "n": 1, + }).encode() + req = urllib.request.Request( + f"{LLMAGE_BASE}/image/generations", + data=body, + headers={ + "Authorization": f"Bearer {LLMAGE_API_KEY}", + "Content-Type": "application/json", + }, + ) + with urllib.request.urlopen(req, timeout=120) as resp: + data = json.loads(resp.read()) + urls = data.get("data", [{}]) + if urls and urls[0].get("url"): + return urls[0]["url"] + raise ValueError(f"T2I 生成失败: {json.dumps(data, ensure_ascii=False)}") + + +async def _llmage_video_submit(model: str, catelogid: str, prompt: str, + duration: int = 10, ratio: str = "16:9", + resolution: str = "720p", + image_files: list = None, # type: ignore + video_files: list = None, # type: ignore + audio_files: list = None) -> str: # type: ignore + """Submit video generation task via llmage API, return task_id.""" + import urllib.request + body = { + "model": model, + "catelogid": catelogid, + "prompt": prompt, + "duration": duration, + "ratio": ratio, + "resolution": resolution, + } + if image_files: + body["image_files"] = image_files + if video_files: + body["video_files"] = video_files + if audio_files: + body["audio_files"] = audio_files + + data_bytes = json.dumps(body, ensure_ascii=False).encode() + req = urllib.request.Request( + f"{LLMAGE_BASE}/video/generations", + data=data_bytes, + headers={ + "Authorization": f"Bearer {LLMAGE_API_KEY}", + "Content-Type": "application/json", + }, + ) + with urllib.request.urlopen(req, timeout=120) as resp: + result = json.loads(resp.read()) + task_id = result.get("taskid") or result.get("task_id") or result.get("id") + if not task_id: + raise ValueError(f"视频提交失败: {json.dumps(result, ensure_ascii=False)}") + return str(task_id) + + +async def _llmage_video_poll(task_id: str, timeout: int = 600) -> str: + """Poll video generation task until complete, return video URL.""" + import urllib.request + start = time.time() + while time.time() - start < timeout: + req = urllib.request.Request( + f"{LLMAGE_BASE}/tasks?taskid={task_id}", + headers={"Authorization": f"Bearer {LLMAGE_API_KEY}"}, + ) + with urllib.request.urlopen(req, timeout=30) as resp: + result = json.loads(resp.read()) + + # Handle both JSON and Python-dict responses + dd = result.get("data", result) + status = dd.get("status", result.get("status", "")) + + if status == "SUCCEEDED": + video_url = dd.get("video") or dd.get("video_url", "") + if video_url: + return video_url + raise ValueError(f"任务成功但无视频URL: {json.dumps(result)}") + elif status == "FAILED": + raise ValueError(f"视频生成失败: {json.dumps(result, ensure_ascii=False)}") + + await asyncio.sleep(15) + + raise ValueError(f"视频生成超时 ({timeout}s): task_id={task_id}") + + +async def _upload_to_asset_library(image_url: str, asset_name: str, + asset_type: str = "Image", + vendor_group_id: str = "") -> str: + """Upload image to virtual asset library, return vendor_asset_id.""" + import urllib.request + import urllib.parse + + # First: get or create an AIGC virtual group if not provided + if not vendor_group_id: + vendor_group_id = await _get_or_create_aigc_group() + + body = urllib.parse.urlencode({ + "vendor_group_id": vendor_group_id, + "source_url": image_url, + "asset_type": asset_type, + "name": asset_name, + }).encode() + + req = urllib.request.Request( + f"{ASSET_API_BASE}/rl_virtual_upload.dspy", + data=body, + headers={ + "Authorization": f"Bearer {LLMAGE_API_KEY}", + "Content-Type": "application/x-www-form-urlencoded", + }, + ) + with urllib.request.urlopen(req, timeout=120) as resp: + result = json.loads(resp.read()) + + if result.get("status") == "ok": + data = result.get("data", {}) + asset_id = data.get("vendor_asset_id") or data.get("id", "") + if asset_id: + return asset_id + raise ValueError(f"素材上传失败: {json.dumps(result, ensure_ascii=False)}") + + +async def _get_or_create_aigc_group() -> str: + """Get existing AIGC group or create new one, return vendor_group_id.""" + import urllib.request + + # Try to list existing groups first + req = urllib.request.Request( + f"{ASSET_API_BASE}/rl_virtual_groups.dspy", + headers={"Authorization": f"Bearer {LLMAGE_API_KEY}"}, + ) + with urllib.request.urlopen(req, timeout=30) as resp: + result = json.loads(resp.read()) + + groups = result.get("data", {}).get("groups", []) + if groups: + return groups[0].get("vendor_group_id", "") + + # Create new group + body = json.dumps({"name": "KTV产线虚拟素材"}).encode() + req = urllib.request.Request( + f"{ASSET_API_BASE}/rl_virtual_create_group.dspy", + data=body, + headers={ + "Authorization": f"Bearer {LLMAGE_API_KEY}", + "Content-Type": "application/json", + }, + ) + with urllib.request.urlopen(req, timeout=30) as resp: + result = json.loads(resp.read()) + + gid = result.get("data", {}).get("vendor_group_id", "") + if gid: + return gid + raise ValueError(f"创建AIGC素材组失败: {json.dumps(result, ensure_ascii=False)}") + + +async def _download_url(url: str, dest: str): + """Download a URL to local file.""" + stdout, stderr, rc = await _run_local(f"curl -sL -o '{dest}' '{url}'") + if rc != 0: + raise ValueError(f"下载失败 {url}: {stderr}") + + +# ─── Character & Asset Generation (v2 — Ali T2I + Virtual Asset Library) ── + +# Default Seedance 2.0 model for video generation +DEFAULT_VIDEO_MODEL = "doubao-seedance-2-0-260128" +DEFAULT_VIDEO_MODEL_FAST = "doubao-seedance-2-0-fast-260128" + async def handle_character_designing(tenant_id, task_id, step_name, input_data, config): - """LLM designs MV character descriptions.""" + """LLM designs complete MV visual assets: characters (3 views), props, costumes, scenes. + + Output includes image generation prompts for each asset type, ready for T2I generation. + """ lyrics = None params = input_data.get("task_params", {}) for dep_name, dep_output in input_data.items(): @@ -530,20 +722,54 @@ async def handle_character_designing(tenant_id, task_id, step_name, input_data, style = params.get("visual_style", "anime") - prompt = f"""根据以下歌词,设计MV角色方案。 + prompt = f"""根据以下歌词,设计MV的完整视觉素材方案。 歌词: {lyrics} 视觉风格: {style} -请设计1-3个角色,每个角色包含: -1. 角色名称 -2. 外貌描述(用于AI图像生成的详细prompt) -3. 性格特征 -4. 在MV中的角色定位 +请设计以下素材: -输出JSON数组。""" +## 1. 角色(characters)— 1-3个主角 +每个角色包含: +- id: 角色标识(如 char_01) +- name: 角色名 +- description: 中文外貌描述 +- prompts: 三个英文图像生成prompt: + - front: 正面半身肖像(用于角色主参考图) + - left: 左侧半身肖像(用于多角度参考) + - right: 右侧半身肖像(用于多角度参考) + 每个prompt格式: "portrait of [age]-year-old [gender], [hair], wearing [clothing], [expression], [lighting], [style], full body, photorealistic" + +## 2. 道具(props)— 3-5个关键道具 +每个道具包含: +- id: 道具标识(如 prop_01) +- name: 道具名 +- prompt: 英文图像生成prompt(产品级特写,白色背景) + +## 3. 服饰(costumes)— 1-3套服装 +每个服饰包含: +- id: 服饰标识(如 costume_01) +- name: 服饰名 +- for_character: 关联角色id +- prompt: 英文图像生成prompt(服装平铺展示) + +## 4. 场景(scenes_bg)— 2-3个主要场景背景 +每个场景包含: +- id: 场景标识(如 scene_bg_01) +- name: 场景名 +- prompt: 英文图像生成prompt(宽幅风景/环境图,无角色) + +输出JSON格式: +{{ + "characters": [...], + "props": [...], + "costumes": [...], + "scenes_bg": [...] +}} + +只输出JSON,不要其他内容。""" try: from pipeline_service.llm_bridge import llm_call @@ -551,62 +777,236 @@ async def handle_character_designing(tenant_id, task_id, step_name, input_data, result = result.strip() if result.startswith("```"): result = result.split("\n", 1)[1].rsplit("```", 1)[0] - characters = json.loads(result) + design = json.loads(result) except Exception as e: raise ValueError(f"角色设计失败: {e}") - return {"characters": characters, "visual_style": style} + return { + "character_design": design, + "visual_style": style, + "characters": design.get("characters", []), + "props": design.get("props", []), + "costumes": design.get("costumes", []), + "scenes_bg": design.get("scenes_bg", []), + } async def handle_character_image_generating(tenant_id, task_id, step_name, input_data, config): - """Generate character reference images using wan2.7 on GPU server.""" - work_dir = _task_dir(task_id) - gpu_dir = _gpu_task_dir(task_id) + """Generate all visual asset images via Ali T2I (wan2.7-image-pro) through llmage API. - characters = None + Generates: character images (front/left/right), props, costumes, scene backgrounds. + All images generated via https://token.opencomputing.cn/llmage/v1/image/generations + """ + work_dir = _task_dir(task_id) + assets_dir = os.path.join(work_dir, "assets") + os.makedirs(assets_dir, exist_ok=True) + + # Get design from upstream + design = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): - characters = dep_output.get("characters") - if characters: + design = dep_output.get("character_design") + if design: break - if not characters: - raise ValueError("上游步骤未提供角色设计") + if not design: + raise ValueError("上游步骤未提供角色设计方案") - await _run_gpu(f"mkdir -p {gpu_dir}/characters") - char_images = [] + model = config.get("t2i_model", "wan2.7-image-pro") + generated_assets = [] - for i, char in enumerate(characters): - prompt = char.get("prompt", char.get("description", "")) - if not prompt: + # Generate character images (3 views each) + for char in design.get("characters", []): + char_id = char.get("id", f"char_{len(generated_assets)}") + prompts = char.get("prompts", {}) + if isinstance(prompts, str): + prompts = {"front": prompts} + + for view in ["front", "left", "right"]: + prompt_text = prompts.get(view, "") + if not prompt_text: + continue + asset_name = f"{char_id}_{view}" + try: + img_url = await _llmage_t2i(prompt_text, model=model) + local_path = os.path.join(assets_dir, f"{asset_name}.png") + await _download_url(img_url, local_path) + generated_assets.append({ + "asset_type": "character", + "asset_id": char_id, + "view": view, + "name": f"{char.get('name', char_id)} ({view})", + "image_url": img_url, + "local_path": local_path, + "prompt": prompt_text, + }) + logger.info(f"Generated character image: {asset_name}") + except Exception as e: + logger.warning(f"Failed to generate {asset_name}: {e}") + + # Generate prop images + for prop in design.get("props", []): + prop_id = prop.get("id", f"prop_{len(generated_assets)}") + prompt_text = prop.get("prompt", "") + if not prompt_text: + continue + try: + img_url = await _llmage_t2i(prompt_text, model=model) + local_path = os.path.join(assets_dir, f"{prop_id}.png") + await _download_url(img_url, local_path) + generated_assets.append({ + "asset_type": "prop", + "asset_id": prop_id, + "name": prop.get("name", prop_id), + "image_url": img_url, + "local_path": local_path, + "prompt": prompt_text, + }) + except Exception as e: + logger.warning(f"Failed to generate prop {prop_id}: {e}") + + # Generate costume images + for costume in design.get("costumes", []): + costume_id = costume.get("id", f"costume_{len(generated_assets)}") + prompt_text = costume.get("prompt", "") + if not prompt_text: + continue + try: + img_url = await _llmage_t2i(prompt_text, model=model) + local_path = os.path.join(assets_dir, f"{costume_id}.png") + await _download_url(img_url, local_path) + generated_assets.append({ + "asset_type": "costume", + "asset_id": costume_id, + "for_character": costume.get("for_character", ""), + "name": costume.get("name", costume_id), + "image_url": img_url, + "local_path": local_path, + "prompt": prompt_text, + }) + except Exception as e: + logger.warning(f"Failed to generate costume {costume_id}: {e}") + + # Generate scene background images + for scene in design.get("scenes_bg", []): + scene_id = scene.get("id", f"scene_bg_{len(generated_assets)}") + prompt_text = scene.get("prompt", "") + if not prompt_text: + continue + try: + img_url = await _llmage_t2i(prompt_text, model=model, size="1280*720") + local_path = os.path.join(assets_dir, f"{scene_id}.png") + await _download_url(img_url, local_path) + generated_assets.append({ + "asset_type": "scene_bg", + "asset_id": scene_id, + "name": scene.get("name", scene_id), + "image_url": img_url, + "local_path": local_path, + "prompt": prompt_text, + }) + except Exception as e: + logger.warning(f"Failed to generate scene bg {scene_id}: {e}") + + if not generated_assets: + raise ValueError("所有素材图片生成失败") + + return { + "generated_assets": generated_assets, + "asset_count": len(generated_assets), + "character_design": design, + } + + +async def handle_asset_uploading(tenant_id, task_id, step_name, input_data, config): + """Upload generated asset images to virtual asset library. + + Returns asset references (asset://vendor_asset_id) for use in storyboard and video generation. + """ + generated_assets = None + for dep_name, dep_output in input_data.items(): + if isinstance(dep_output, dict): + generated_assets = dep_output.get("generated_assets") + if generated_assets: + break + + if not generated_assets: + raise ValueError("上游步骤未提供生成的素材") + + asset_refs = [] # List of {asset_type, asset_id, name, asset_ref, vendor_asset_id} + vendor_group_id = config.get("vendor_group_id") # Optional override + + for asset in generated_assets: + image_url = asset.get("image_url", "") + name = asset.get("name", asset.get("asset_id", "unknown")) + + if not image_url: + logger.warning(f"Asset {name} has no image_url, skipping upload") continue - # Generate image on GPU with wan2.7 - gen_cmd = ( - f"cd {GPU_WAN22_DIR} && " - f"source venv/bin/activate && " - f"python generate.py --prompt '{prompt}' " - f"--output {gpu_dir}/characters/char_{i}.png " - f"--width 512 --height 512" - ) - stdout, stderr, rc = await _run_gpu(gen_cmd, timeout=120) + try: + vendor_asset_id = await _upload_to_asset_library( + image_url=image_url, + asset_name=name, + asset_type="Image", + vendor_group_id=vendor_group_id, + ) + asset_ref = f"asset://{vendor_asset_id}" + asset["asset_ref"] = asset_ref + asset["vendor_asset_id"] = vendor_asset_id + asset_refs.append({ + "asset_type": asset.get("asset_type"), + "asset_id": asset.get("asset_id"), + "name": name, + "asset_ref": asset_ref, + "vendor_asset_id": vendor_asset_id, + "view": asset.get("view", ""), + }) + logger.info(f"Uploaded asset: {name} → {asset_ref}") + except Exception as e: + logger.warning(f"Failed to upload asset {name}: {e}") + # Fallback: use direct image URL as reference + asset["asset_ref"] = image_url + asset_refs.append({ + "asset_type": asset.get("asset_type"), + "asset_id": asset.get("asset_id"), + "name": name, + "asset_ref": image_url, + "vendor_asset_id": None, + "fallback": True, + }) - local_path = os.path.join(work_dir, f"char_{i}.png") - await _copy_from_gpu(f"{gpu_dir}/characters/char_{i}.png", local_path) + if not asset_refs: + raise ValueError("所有素材上传失败") - char_images.append({ - "name": char.get("name", f"char_{i}"), - "image_path": local_path, - "prompt": prompt, - }) + # Build asset lookup for storyboard generation + asset_lookup = {} + for ref in asset_refs: + key = f"{ref['asset_type']}:{ref['asset_id']}" + asset_lookup[key] = ref["asset_ref"] + if ref.get("view"): + view_key = f"{ref['asset_type']}:{ref['asset_id']}:{ref['view']}" + asset_lookup[view_key] = ref["asset_ref"] - return {"character_images": char_images} + return { + "asset_refs": asset_refs, + "asset_lookup": asset_lookup, + "asset_count": len(asset_refs), + "generated_assets": generated_assets, + "character_design": None, # Pass through from upstream if needed + } async def handle_storyboard_generating(tenant_id, task_id, step_name, input_data, config): - """LLM generates storyboard script from lyrics + characters.""" + """LLM generates storyboard using asset references (asset://素材号). + + Scenes with characters use r2v mode (reference-to-video with asset references). + Scenes without characters use t2v mode. + """ lyrics = None - char_images = None + asset_refs = None + asset_lookup = None + character_design = None params = input_data.get("task_params", {}) for dep_name, dep_output in input_data.items(): @@ -615,34 +1015,73 @@ async def handle_storyboard_generating(tenant_id, task_id, step_name, input_data lyrics = dep_output["calibrated_lyrics"] elif dep_output.get("lyrics"): lyrics = dep_output["lyrics"] - if dep_output.get("character_images"): - char_images = dep_output["character_images"] + if dep_output.get("asset_refs"): + asset_refs = dep_output["asset_refs"] + if dep_output.get("asset_lookup"): + asset_lookup = dep_output["asset_lookup"] + if dep_output.get("character_design"): + character_design = dep_output["character_design"] if not lyrics: raise ValueError("上游步骤未提供歌词") - duration = params.get("duration", 240) # Default 4 min + duration = params.get("duration", 240) - prompt = f"""根据歌词和角色,生成MV分镜脚本。 + # Build asset summary for LLM context + asset_summary = "" + if asset_refs: + chars = [r for r in asset_refs if r.get("asset_type") == "character"] + props = [r for r in asset_refs if r.get("asset_type") == "prop"] + costumes = [r for r in asset_refs if r.get("asset_type") == "costume"] + scenes_bg = [r for r in asset_refs if r.get("asset_type") == "scene_bg"] + + if chars: + asset_summary += "角色素材:\n" + for c in chars: + asset_summary += f" - {c['name']}: {c['asset_ref']}\n" + if props: + asset_summary += "道具素材:\n" + for p in props: + asset_summary += f" - {p['name']}: {p['asset_ref']}\n" + if costumes: + asset_summary += "服饰素材:\n" + for c in costumes: + asset_summary += f" - {c['name']}: {c['asset_ref']}\n" + if scenes_bg: + asset_summary += "场景素材:\n" + for s in scenes_bg: + asset_summary += f" - {s['name']}: {s['asset_ref']}\n" + + prompt = f"""根据歌词和已有素材,生成MV分镜脚本。 歌词: {json.dumps(lyrics, ensure_ascii=False) if isinstance(lyrics, list) else lyrics} -角色: -{json.dumps(char_images, ensure_ascii=False) if char_images else "无特定角色"} - 视频总时长: {duration}秒 +可用素材(用 asset://素材号 引用): +{asset_summary if asset_summary else "无特定素材,使用纯T2V模式"} + 请输出JSON数组,每个分镜包含: - scene_id: 分镜编号 - start_time: 开始秒数 - end_time: 结束秒数 -- description: 场景描述(英文,用于视频生成prompt) -- characters: 出现的角色 -- camera: 镜头运动描述 +- description: 场景中文描述 +- video_prompt: 英文视频生成prompt(详细描述画面内容、动作、镜头) +- characters: 出现的角色名列表(如 ["char_01"]) +- assets: 使用的素材引用列表(如 ["asset://xxx", "asset://yyy"]) +- use_r2v: 是否使用r2v模式(当场景包含角色时为true) - mood: 情绪/色调 +- camera: 镜头运动描述 -确保分镜覆盖整首歌,每个分镜5-15秒。""" +规则: +1. 包含角色的分镜必须设 use_r2v=true,并在assets中引用对应角色的asset:// +2. 无角色的分镜(纯风景/空镜)设 use_r2v=false +3. 确保分镜覆盖整首歌 +4. 每个分镜5-15秒 +5. video_prompt 必须纯英文,包含场景、光线、色调、镜头 + +只输出JSON数组。""" try: from pipeline_service.llm_bridge import llm_call @@ -654,85 +1093,172 @@ async def handle_storyboard_generating(tenant_id, task_id, step_name, input_data except Exception as e: raise ValueError(f"分镜生成失败: {e}") + # Enrich storyboard with asset references + if asset_lookup: + for scene in storyboard: + if scene.get("use_r2v") and scene.get("characters"): + enriched_assets = [] + for char_name in scene["characters"]: + # Look up character front view asset ref + for key, ref in asset_lookup.items(): + if char_name in key and "front" in key: + enriched_assets.append(ref) + break + else: + # Fallback: any view + for key, ref in asset_lookup.items(): + if char_name in key: + enriched_assets.append(ref) + break + if enriched_assets: + scene["assets"] = enriched_assets + work_dir = _task_dir(task_id) sb_path = os.path.join(work_dir, "storyboard.json") with open(sb_path, "w", encoding="utf-8") as f: json.dump(storyboard, f, ensure_ascii=False, indent=2) - return {"storyboard": storyboard, "storyboard_path": sb_path, "scene_count": len(storyboard)} + r2v_count = sum(1 for s in storyboard if s.get("use_r2v")) + t2v_count = len(storyboard) - r2v_count + + return { + "storyboard": storyboard, + "storyboard_path": sb_path, + "scene_count": len(storyboard), + "r2v_scene_count": r2v_count, + "t2v_scene_count": t2v_count, + "asset_refs": asset_refs, + } async def handle_scene_video_generating(tenant_id, task_id, step_name, input_data, config): - """Generate scene videos on GPU using T2V/Ref2V.""" + """Generate scene videos via Seedance 2.0 (default) through llmage API. + + - Scenes with use_r2v=true: Seedance 2.0 r2v mode with asset:// references + - Scenes without characters: Seedance 2.0 t2v mode + - All calls via https://token.opencomputing.cn/llmage/v1/video/generations + """ work_dir = _task_dir(task_id) - gpu_dir = _gpu_task_dir(task_id) + scenes_dir = os.path.join(work_dir, "scenes") + os.makedirs(scenes_dir, exist_ok=True) storyboard = None - char_images = None for dep_name, dep_output in input_data.items(): if isinstance(dep_output, dict): if dep_output.get("storyboard"): storyboard = dep_output["storyboard"] - if dep_output.get("character_images"): - char_images = dep_output["character_images"] if not storyboard: raise ValueError("上游步骤未提供分镜脚本") - await _run_gpu(f"mkdir -p {gpu_dir}/scenes") - - # Copy character images to GPU if available - if char_images: - for ci in char_images: - local = ci.get("image_path", "") - if local and os.path.exists(local): - remote = f"{gpu_dir}/characters/{os.path.basename(local)}" - await _copy_to_gpu(local, remote) + video_model = config.get("video_model", DEFAULT_VIDEO_MODEL) + resolution = config.get("resolution", "720p") + batch_size = config.get("batch_size", 5) scene_videos = [] - for i, scene in enumerate(storyboard): - desc = scene.get("description", "") - duration = scene.get("end_time", 10) - scene.get("start_time", 5) - frames = int(duration * 24) # 24fps + submitted_tasks = [] # [(scene_idx, task_id, scene_info)] - # Determine if we use Ref2V (with character ref) or T2V - ref_image = None - if char_images and scene.get("characters"): - # Find matching character image - for ci in char_images: - if ci.get("name") in str(scene.get("characters", [])): - ref_image = f"{gpu_dir}/characters/{os.path.basename(ci['image_path'])}" - break + # Phase 1: Submit all video generation tasks in batches + for batch_start in range(0, len(storyboard), batch_size): + batch = storyboard[batch_start:batch_start + batch_size] + for i, scene in enumerate(batch): + scene_idx = batch_start + i + video_prompt = scene.get("video_prompt", scene.get("description", "")) + duration = scene.get("end_time", 10) - scene.get("start_time", 0) - if ref_image: - gen_cmd = ( - f"cd {GPU_WAN22_DIR} && source venv/bin/activate && " - f"python generate_ref2v.py --prompt '{desc}' " - f"--ref_image '{ref_image}' " - f"--output {gpu_dir}/scenes/scene_{i:03d}.mp4 " - f"--frames {frames}" - ) - else: - gen_cmd = ( - f"cd {GPU_WAN22_DIR} && source venv/bin/activate && " - f"python generate_t2v.py --prompt '{desc}' " - f"--output {gpu_dir}/scenes/scene_{i:03d}.mp4 " - f"--frames {frames}" - ) + # Map duration to Seedance supported range [4, 15] + duration = max(4, min(15, duration)) - stdout, stderr, rc = await _run_gpu(gen_cmd, timeout=600) + try: + if scene.get("use_r2v") and scene.get("assets"): + # R2V mode: use asset references for character consistency + task_id_video = await _llmage_video_submit( + model=video_model, + catelogid="r2v", + prompt=video_prompt, + duration=duration, + ratio="16:9", + resolution=resolution, + image_files=scene["assets"], + ) + mode = "r2v" + else: + # T2V mode: pure text-to-video + task_id_video = await _llmage_video_submit( + model=video_model, + catelogid="t2v", + prompt=video_prompt, + duration=duration, + ratio="16:9", + resolution=resolution, + ) + mode = "t2v" - local_scene = os.path.join(work_dir, f"scene_{i:03d}.mp4") - await _copy_from_gpu(f"{gpu_dir}/scenes/scene_{i:03d}.mp4", local_scene) + submitted_tasks.append((scene_idx, task_id_video, { + "scene_id": scene.get("scene_id", scene_idx), + "description": scene.get("description", ""), + "video_prompt": video_prompt, + "duration": duration, + "mode": mode, + })) + logger.info(f"Submitted scene {scene_idx} ({mode}): task={task_id_video}") + except Exception as e: + logger.warning(f"Failed to submit scene {scene_idx}: {e}") + # Try fallback to fast model + try: + task_id_video = await _llmage_video_submit( + model=DEFAULT_VIDEO_MODEL_FAST, + catelogid="t2v", + prompt=video_prompt, + duration=duration, + ratio="16:9", + resolution="720p", + ) + submitted_tasks.append((scene_idx, task_id_video, { + "scene_id": scene.get("scene_id", scene_idx), + "description": scene.get("description", ""), + "video_prompt": video_prompt, + "duration": duration, + "mode": "t2v_fallback", + })) + logger.info(f"Scene {scene_idx} fallback submitted: task={task_id_video}") + except Exception as e2: + logger.error(f"Scene {scene_idx} fallback also failed: {e2}") - scene_videos.append({ - "scene_id": scene.get("scene_id", i), - "video_path": local_scene, - "description": desc, - "duration": duration, - }) + # Wait between batches to avoid rate limits + if batch_start + batch_size < len(storyboard): + await asyncio.sleep(2) - return {"scene_videos": scene_videos, "scene_count": len(scene_videos)} + # Phase 2: Poll all submitted tasks + logger.info(f"Polling {len(submitted_tasks)} video tasks...") + for scene_idx, vid_task_id, scene_info in submitted_tasks: + try: + video_url = await _llmage_video_poll(vid_task_id, timeout=600) + local_path = os.path.join(scenes_dir, f"scene_{scene_idx:03d}.mp4") + await _download_url(video_url, local_path) + + scene_videos.append({ + "scene_id": scene_info["scene_id"], + "video_path": local_path, + "video_url": video_url, + "description": scene_info["description"], + "video_prompt": scene_info["video_prompt"], + "duration": scene_info["duration"], + "mode": scene_info["mode"], + }) + logger.info(f"Scene {scene_idx} completed: {local_path}") + except Exception as e: + logger.warning(f"Scene {scene_idx} failed during polling: {e}") + + if not scene_videos: + raise ValueError("所有场景视频生成失败") + + return { + "scene_videos": scene_videos, + "scene_count": len(scene_videos), + "total_submitted": len(submitted_tasks), + "video_model": video_model, + } async def handle_scene_video_evaluating(tenant_id, task_id, step_name, input_data, config): @@ -927,6 +1453,7 @@ KTV_HANDLERS = { "music_polling": handle_music_polling, "character_designing": handle_character_designing, "character_image_generating": handle_character_image_generating, + "asset_uploading": handle_asset_uploading, "storyboard_generating": handle_storyboard_generating, "scene_video_generating": handle_scene_video_generating, "scene_video_evaluating": handle_scene_video_evaluating,