Hermes Agent 83011d13d9 feat: KTV产线v2 — Ali T2I素材生成 + 虚拟素材库 + Seedance 2.0 R2V
- character_designing: 扩展为完整视觉素材设计(角色3视角+道具+服饰+场景)
- character_image_generating: 改用llmage API (wan2.7-image-pro)生成所有素材图片
- 新增asset_uploading: 素材图片上传到虚拟素材库,返回asset://素材号
- storyboard_generating: 用asset://引用素材,标记r2v/t2v模式
- scene_video_generating: 缺省Seedance 2.0,r2v模式+素材引用
- 所有模型统一通过token.opencomputing.cn接入
- adapter.py: 注册18个step handler(含asset_uploading)
2026-06-20 12:18:56 +08:00

1463 lines
54 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""KTV pipeline step handlers.

Implements the step types for KTV production pipelines (the v2 commit that added
asset_uploading notes 18 registered step handlers).
Each handler: async def handler(tenant_id, task_id, step_name, input_data, config) -> dict

Architecture:
- Heavy compute (Demucs separation, SenseVoice ASR) runs on the GPU server via SSH
- ffmpeg/audio processing runs locally
- LLM calls via pipeline_service.llm_bridge.llm_call
- ASR via SenseVoice (funasr)
- External APIs via token.opencomputing.cn: llmage for wan2.7 T2I images and
  Seedance 2.0 video, the virtual asset library for asset:// references;
  Suno/MiniMax music generation is still a TODO stub
"""
import asyncio
import json
import os
import logging
import tempfile
import time
logger = logging.getLogger("pipeline.handlers.ktv")
# GPU server config (from memory: ymq@opencomputing.net, 8x4090)
# SSH/scp target used by _run_gpu, _copy_to_gpu and _copy_from_gpu.
GPU_HOST = "ymq@opencomputing.net"
# Python venv activated on the GPU server for Demucs and for the SenseVoice (funasr) ASR script.
GPU_DEMUCS_VENV = "/data/ymq/demucs_venv"
# NOTE(review): not referenced in the visible part of this file — presumably used by handlers further down; verify.
GPU_WAN22_DIR = "/data/ymq/wan22-service"
# Root of per-task working directories on the GPU server (see _gpu_task_dir).
GPU_PIPELINE_DIR = "/data/pipeline/ktv"
# Local work directory
# Root of per-task working directories on this host (see _task_dir); same path string as GPU_PIPELINE_DIR.
LOCAL_WORK_DIR = "/data/pipeline/ktv"
def _task_dir(task_id: str) -> str:
    """Return the local per-task work directory, creating it if it is missing.

    The directory is ``LOCAL_WORK_DIR/<task_id>``; an existing directory is reused.
    """
    task_path = os.path.join(LOCAL_WORK_DIR, task_id)
    os.makedirs(name=task_path, exist_ok=True)
    return task_path
def _gpu_task_dir(task_id: str) -> str:
    """Return the per-task working directory path on the GPU server.

    Only the path string is built here; callers create it remotely (``mkdir -p``).
    """
    return "{}/{}".format(GPU_PIPELINE_DIR, task_id)
async def _run_local(cmd: str, timeout: int = 300) -> tuple:
"""Run a local command, return (stdout, stderr, returncode)."""
proc = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace"), proc.returncode
except asyncio.TimeoutError:
proc.kill()
return "", "timeout", -1
async def _run_gpu(cmd: str, timeout: int = 600) -> tuple:
    """Run a shell command on the GPU server over SSH.

    ``cmd`` reaches the remote shell verbatim. It is quoted with ``shlex.quote``
    for the local shell, so it may contain single/double quotes and newlines
    (e.g. inline ``python -c`` scripts). Previously it was wrapped in bare single
    quotes, which broke on any embedded ``'``.

    Returns:
        The ``(stdout, stderr, returncode)`` tuple from ``_run_local``.
    """
    import shlex

    ssh_cmd = f"ssh -o StrictHostKeyChecking=no {GPU_HOST} {shlex.quote(cmd)}"
    return await _run_local(ssh_cmd, timeout=timeout)
async def _copy_to_gpu(local_path: str, remote_path: str):
    """Copy a local file to ``remote_path`` on the GPU server with scp.

    The local path is shell-quoted.

    Raises:
        ValueError: if scp exits non-zero. Previously failures were silently
            ignored, which surfaced later as confusing remote errors.
    """
    import shlex

    _, stderr, rc = await _run_local(
        f"scp -o StrictHostKeyChecking=no {shlex.quote(local_path)} {GPU_HOST}:{remote_path}"
    )
    if rc != 0:
        raise ValueError(f"上传文件到GPU服务器失败 {local_path} -> {remote_path}: {stderr}")
async def _copy_from_gpu(remote_path: str, local_path: str):
    """Copy ``remote_path`` from the GPU server to a local file with scp.

    The local path is shell-quoted.

    Raises:
        ValueError: if scp exits non-zero (e.g. the remote file does not exist).
            Previously this was ignored and callers returned paths to missing files.
    """
    import shlex

    _, stderr, rc = await _run_local(
        f"scp -o StrictHostKeyChecking=no {GPU_HOST}:{remote_path} {shlex.quote(local_path)}"
    )
    if rc != 0:
        raise ValueError(f"从GPU服务器下载文件失败 {remote_path} -> {local_path}: {stderr}")
# ─── Media Preparation ────────────────────────────────────────────────
async def handle_audio_preparing(tenant_id, task_id, step_name, input_data, config):
    """Fetch the task's source audio and probe its duration.

    Reads ``task_params.audio_url`` (falling back to ``task_params.audio_path``).
    Values starting with "http" are downloaded with curl; anything else is copied
    as a local path. The file is stored as ``original_audio.mp3`` in the task
    work directory.

    Returns:
        dict with ``audio_path``, ``duration`` (seconds; 0 when ffprobe cannot
        report one) and ``format`` (extension of the stored file).

    Raises:
        ValueError: if no source is given or the download/copy fails.
    """
    import shlex

    work_dir = _task_dir(task_id)
    params = input_data.get("task_params", {})
    audio_url = params.get("audio_url", params.get("audio_path", ""))
    if not audio_url:
        raise ValueError("缺少 audio_url 或 audio_path 参数")

    audio_path = os.path.join(work_dir, "original_audio.mp3")
    # audio_url is user-supplied task input: shell-quote it to prevent injection.
    src, dst = shlex.quote(audio_url), shlex.quote(audio_path)
    if audio_url.startswith("http"):
        # -f: fail on HTTP errors instead of saving the error page as audio.
        _, stderr, rc = await _run_local(f"curl -fsSL -o {dst} {src}")
        if rc != 0:
            raise ValueError(f"下载音频失败: {stderr}")
    else:
        _, stderr, rc = await _run_local(f"cp -- {src} {dst}")
        if rc != 0:
            raise ValueError(f"复制音频失败: {stderr}")

    stdout, _, rc = await _run_local(
        "ffprobe -v error -show_entries format=duration "
        f"-of default=noprint_wrappers=1:nokey=1 {dst}"
    )
    try:
        duration = float(stdout.strip()) if rc == 0 else 0
    except ValueError:
        # ffprobe can print "N/A" (or nothing) when no container duration is known.
        duration = 0
    return {
        "audio_path": audio_path,
        "duration": duration,
        "format": os.path.splitext(audio_path)[1].lstrip("."),
    }
async def handle_video_preparing(tenant_id, task_id, step_name, input_data, config):
    """Fetch the task's source video, extract its audio track and probe its duration.

    Reads ``task_params.video_url`` (falling back to ``task_params.video_path``).
    Values starting with "http" are downloaded with curl; anything else is copied
    as a local path. The video is stored as ``original_video.mp4``. Its audio is
    re-encoded with libmp3lame (``-q:a 2``) to ``original_audio.mp3`` in the task
    work directory.

    Returns:
        dict with ``video_path``, ``audio_path`` and ``duration`` (seconds; 0
        when ffprobe cannot report one).

    Raises:
        ValueError: if no source is given, or download/copy/audio extraction fails.
    """
    import shlex

    work_dir = _task_dir(task_id)
    params = input_data.get("task_params", {})
    video_url = params.get("video_url", params.get("video_path", ""))
    if not video_url:
        raise ValueError("缺少 video_url 或 video_path 参数")

    video_path = os.path.join(work_dir, "original_video.mp4")
    audio_path = os.path.join(work_dir, "original_audio.mp3")
    # video_url is user-supplied task input: shell-quote everything passed to the shell.
    src = shlex.quote(video_url)
    video_q = shlex.quote(video_path)
    audio_q = shlex.quote(audio_path)
    if video_url.startswith("http"):
        # -f: fail on HTTP errors instead of saving the error page as video.
        _, stderr, rc = await _run_local(f"curl -fsSL -o {video_q} {src}")
        if rc != 0:
            raise ValueError(f"下载视频失败: {stderr}")
    else:
        _, stderr, rc = await _run_local(f"cp -- {src} {video_q}")
        if rc != 0:
            raise ValueError(f"复制视频失败: {stderr}")

    _, stderr, rc = await _run_local(
        f"ffmpeg -y -i {video_q} -vn -acodec libmp3lame -q:a 2 {audio_q}"
    )
    if rc != 0:
        # ffmpeg writes a long banner to stderr; keep only the tail with the error.
        raise ValueError(f"提取音轨失败: {stderr.strip()[-1000:]}")

    stdout, _, rc = await _run_local(
        "ffprobe -v error -show_entries format=duration "
        f"-of default=noprint_wrappers=1:nokey=1 {video_q}"
    )
    try:
        duration = float(stdout.strip()) if rc == 0 else 0
    except ValueError:
        # ffprobe can print "N/A" (or nothing) when no container duration is known.
        duration = 0
    return {
        "video_path": video_path,
        "audio_path": audio_path,
        "duration": duration,
    }
# ─── Demucs Separation ───────────────────────────────────────────────
async def handle_demucs_separating(tenant_id, task_id, step_name, input_data, config):
    """Separate vocals from accompaniment with Demucs on the GPU server.

    Steps:
    1. Take ``audio_path`` from the first upstream output that provides one.
    2. Copy it into the GPU task directory.
    3. Run ``htdemucs`` in two-stem (vocals) mode.
    4. Copy ``vocals.wav`` and ``no_vocals.wav`` back into the local task directory.

    Returns:
        dict with local ``vocals_path`` and ``no_vocals_path``.

    Raises:
        ValueError: if no upstream ``audio_path`` exists, Demucs fails, or the
            stems could not be copied back.
    """
    work_dir = _task_dir(task_id)
    gpu_dir = _gpu_task_dir(task_id)

    audio_path = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            audio_path = dep_output.get("audio_path")
            if audio_path:
                break
    if not audio_path:
        raise ValueError("上游步骤未提供 audio_path")

    await _run_gpu(f"mkdir -p {gpu_dir}")
    remote_audio = f"{gpu_dir}/audio.mp3"
    await _copy_to_gpu(audio_path, remote_audio)

    # No --mp3 flag: Demucs then writes WAV stems, matching the vocals.wav /
    # no_vocals.wav files copied back below (with --mp3 it writes .mp3 stems).
    demucs_cmd = (
        f"cd {gpu_dir} && "
        f"source {GPU_DEMUCS_VENV}/bin/activate && "
        f"python -m demucs --two-stems vocals -n htdemucs '{remote_audio}' && "
        f"deactivate"
    )
    stdout, stderr, rc = await _run_gpu(demucs_cmd, timeout=600)
    if rc != 0:
        raise ValueError(f"Demucs 分离失败: {stderr}")

    # Demucs writes stems to ./separated/<model>/<input basename>/ under the cwd.
    base = os.path.splitext(os.path.basename(remote_audio))[0]
    stems_dir = f"{gpu_dir}/separated/htdemucs/{base}"
    vocals_local = os.path.join(work_dir, "vocals.wav")
    no_vocals_local = os.path.join(work_dir, "no_vocals.wav")
    await _copy_from_gpu(f"{stems_dir}/vocals.wav", vocals_local)
    await _copy_from_gpu(f"{stems_dir}/no_vocals.wav", no_vocals_local)

    missing = [p for p in (vocals_local, no_vocals_local) if not os.path.exists(p)]
    if missing:
        raise ValueError(f"Demucs 分离结果拷贝失败: {missing}")
    return {
        "vocals_path": vocals_local,
        "no_vocals_path": no_vocals_local,
    }
# ─── Lyric Calibration ───────────────────────────────────────────────
async def handle_lyric_calibrating(tenant_id, task_id, step_name, input_data, config):
    """Time-align the original lyrics: SenseVoice ASR on the vocals, then LLM calibration.

    Steps:
    1. Take ``vocals_path`` from the first upstream output that provides one, and
       the lyrics from ``task_params.lyrics`` / ``task_params.lyrics_text``.
    2. Run SenseVoiceSmall (funasr) on the GPU server and copy back its timings.
    3. Align the lyric lines to those timings via ``_llm_calibrate``.
    4. Save the result as ``calibrated_lyrics.json`` in the task work directory.

    Returns:
        dict with ``calibrated_lyrics_path``, ``calibrated_lyrics`` and ``segment_count``.

    Raises:
        ValueError: if vocals or lyrics are missing, or the ASR run fails.
    """
    work_dir = _task_dir(task_id)
    params = input_data.get("task_params", {})
    lyrics_text = params.get("lyrics", params.get("lyrics_text", ""))

    vocals_path = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            vocals_path = dep_output.get("vocals_path")
            if vocals_path:
                break
    if not vocals_path:
        raise ValueError("上游步骤未提供 vocals_path")
    if not lyrics_text:
        raise ValueError("缺少 lyrics 参数")

    # Step 1: SenseVoice ASR on the GPU server to get timings.
    asr_result_path = os.path.join(work_dir, "asr_timings.json")
    gpu_dir = _gpu_task_dir(task_id)
    await _run_gpu(f"mkdir -p {gpu_dir}")
    remote_vocals = f"{gpu_dir}/vocals.wav"
    await _copy_to_gpu(vocals_path, remote_vocals)

    # The inline script writes its JSON explicitly as UTF-8 (ensure_ascii=False output).
    asr_script = f"""
cd {gpu_dir}
source {GPU_DEMUCS_VENV}/bin/activate
python -c "
import json
from funasr import AutoModel
model = AutoModel(model='iic/SenseVoiceSmall', trust_remote_code=True)
res = model.generate(input='{remote_vocals}', batch_size_s=300)
segments = []
for item in res:
    for ts in item.get('timestamp', []):
        segments.append({{'text': item.get('text', ''), 'start': ts[0]/1000, 'end': ts[1]/1000}})
with open('asr_timings.json', 'w', encoding='utf-8') as f:
    json.dump(segments, f, ensure_ascii=False)
print('ASR done:', len(segments), 'segments')
"
"""
    stdout, stderr, rc = await _run_gpu(asr_script, timeout=300)
    if rc != 0:
        raise ValueError(f"SenseVoice ASR 识别失败: {stderr}")
    await _copy_from_gpu(f"{gpu_dir}/asr_timings.json", asr_result_path)
    # The JSON holds non-ASCII text, so do not rely on the locale's default encoding.
    with open(asr_result_path, "r", encoding="utf-8") as f:
        asr_timings = json.load(f)

    # Step 2: LLM calibration — align ASR timings with the original lyrics.
    calibrated = await _llm_calibrate(lyrics_text, asr_timings)

    calibrated_path = os.path.join(work_dir, "calibrated_lyrics.json")
    with open(calibrated_path, "w", encoding="utf-8") as f:
        json.dump(calibrated, f, ensure_ascii=False, indent=2)
    return {
        "calibrated_lyrics_path": calibrated_path,
        "calibrated_lyrics": calibrated,
        "segment_count": len(calibrated),
    }
async def _llm_calibrate(lyrics_text: str, asr_timings: list) -> list:
    """Align the original lyric lines with ASR timestamps using the LLM.

    Args:
        lyrics_text: Original lyrics as plain text.
        asr_timings: ASR segments (``text``/``start``/``end``, seconds).

    Returns:
        A list of ``{"text", "start", "end"}`` segments. Falls back to
        ``asr_timings`` unchanged when the LLM call fails, or when the reply is
        not a non-empty JSON array of such segments. Subtitle rendering indexes
        these keys directly, so malformed output must not leak through.
    """
    prompt = f"""你是一个歌词时间轴校准专家。
原始歌词文本:
{lyrics_text}
ASR识别的时间戳:
{json.dumps(asr_timings, ensure_ascii=False)}
请将原始歌词的每一句与ASR时间戳对齐输出JSON数组每个元素包含:
- text: 歌词文本
- start: 开始时间(秒,浮点数)
- end: 结束时间(秒,浮点数)
要求:
1. 保持原始歌词的文本和顺序
2. 时间戳以ASR结果为基础进行微调
3. 确保时间不重叠,每句之间留适当间隔
4. 只输出JSON不要其他内容"""
    try:
        from pipeline_service.llm_bridge import llm_call
        result = await llm_call(prompt)
        result = result.strip()
        # Strip a Markdown code fence if the model wrapped its JSON in one.
        if result.startswith("```"):
            result = result.split("\n", 1)[1].rsplit("```", 1)[0]
        calibrated = json.loads(result)
    except Exception as e:
        logger.warning(f"LLM calibration failed, using ASR timings directly: {e}")
        return asr_timings

    valid = isinstance(calibrated, list) and bool(calibrated) and all(
        isinstance(seg, dict)
        and isinstance(seg.get("text"), str)
        and isinstance(seg.get("start"), (int, float))
        and isinstance(seg.get("end"), (int, float))
        for seg in calibrated
    )
    if not valid:
        logger.warning("LLM calibration returned malformed segments, using ASR timings directly")
        return asr_timings
    return calibrated
# ─── Subtitle Rendering ──────────────────────────────────────────────
async def handle_subtitle_rendering(tenant_id, task_id, step_name, input_data, config):
    """Render calibrated lyrics into an ASS karaoke subtitle file (``karaoke.ass``).

    Lyrics come from the first upstream output providing ``calibrated_lyrics``,
    or from the JSON file at its ``calibrated_lyrics_path``.

    Returns:
        dict with ``ass_path``.

    Raises:
        ValueError: if no upstream output provides calibrated lyrics.
    """
    calibrated = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            calibrated = dep_output.get("calibrated_lyrics")
            if not calibrated and dep_output.get("calibrated_lyrics_path"):
                # Written with ensure_ascii=False, so read it explicitly as UTF-8.
                with open(dep_output["calibrated_lyrics_path"], "r", encoding="utf-8") as f:
                    calibrated = json.load(f)
            if calibrated:
                break
    if not calibrated:
        raise ValueError("上游步骤未提供 calibrated_lyrics")

    # Create the work directory only once the input has been validated.
    ass_path = os.path.join(_task_dir(task_id), "karaoke.ass")
    _write_ass_file(ass_path, calibrated)
    return {"ass_path": ass_path}
def _write_ass_file(path: str, segments: list):
"""Write segments to ASS subtitle file with karaoke effect."""
header = """[Script Info]
Title: KTV Karaoke Subtitles
ScriptType: v4.00+
PlayResX: 1920
PlayResY: 1080
WrapStyle: 0
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: KTV,Source Han Sans SC,72,&H00FFFFFF,&H0000FFFF,&H00000000,&H80000000,-1,0,0,0,100,100,1,0,1,3,1,2,40,40,60,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
with open(path, "w", encoding="utf-8") as f:
f.write(header)
for seg in segments:
start = _seconds_to_ass_time(seg["start"])
end = _seconds_to_ass_time(seg["end"])
text = seg["text"].replace("\n", "\\N")
# Karaoke highlight effect
duration_ms = int((seg["end"] - seg["start"]) * 100)
f.write(f"Dialogue: 0,{start},{end},KTV,,0,0,0,,{{\\k{duration_ms}}}{text}\n")
def _seconds_to_ass_time(seconds: float) -> str:
"""Convert seconds to ASS time format H:MM:SS.CC"""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
cs = int((seconds % 1) * 100)
return f"{h}:{m:02d}:{s:02d}.{cs:02d}"
# ─── Subtitle Exporting ──────────────────────────────────────────────
async def handle_subtitle_exporting(tenant_id, task_id, step_name, input_data, config):
    """Publish the ASS file produced by subtitle rendering as the exported subtitle.

    No conversion happens. The first truthy upstream ``ass_path`` is checked for
    existence and returned as ``subtitle_path`` with ``format`` "ass".

    Raises:
        ValueError: when no upstream output carries an existing ``ass_path``.
    """
    candidates = (
        out.get("ass_path") for out in input_data.values() if isinstance(out, dict)
    )
    ass_path = next((p for p in candidates if p), None)
    if ass_path and os.path.exists(ass_path):
        return {"subtitle_path": ass_path, "format": "ass"}
    raise ValueError("上游步骤未提供有效的 ass_path")
# ─── Lyric Generation & Evaluation (Mode C) ──────────────────────────
async def handle_lyric_generating(tenant_id, task_id, step_name, input_data, config):
    """Ask the LLM to write song lyrics from ``task_params.topic`` (or ``outline``).

    Optional ``task_params``: ``style`` (default "流行") and ``language`` (default "zh").

    Returns:
        dict with the stripped ``lyrics``, ``topic`` and ``style``.

    Raises:
        ValueError: if no topic is given, the LLM call fails (original exception
            chained), or the LLM returns empty text.
    """
    params = input_data.get("task_params", {})
    topic = params.get("topic", params.get("outline", ""))
    style = params.get("style", "流行")
    language = params.get("language", "zh")
    if not topic:
        raise ValueError("缺少 topic/outline参数")
    prompt = f"""请创作一首{style}风格的{language}歌词。
主题/大纲: {topic}
要求:
1. 包含完整结构: 主歌(Verse)、副歌(Chorus)、桥段(Bridge)
2. 每句歌词节奏感强,适合演唱
3. 押韵自然,情感真实
4. 总长度适合3-5分钟歌曲
直接输出歌词文本,标注段落结构。"""
    try:
        from pipeline_service.llm_bridge import llm_call
        lyrics = await llm_call(prompt)
    except Exception as e:
        raise ValueError(f"歌词生成失败: {e}") from e
    lyrics = (lyrics or "").strip()
    if not lyrics:
        # An empty reply would otherwise only fail later, in the evaluation step.
        raise ValueError("歌词生成失败: LLM 返回空内容")
    return {"lyrics": lyrics, "topic": topic, "style": style}
async def handle_lyric_evaluating(tenant_id, task_id, step_name, input_data, config):
    """Score the upstream lyrics with the LLM and gate them on ``config["threshold"]``.

    The threshold defaults to 8.5. When the evaluation cannot be obtained or
    parsed, the lyrics pass, as the original "default pass" comment intended.
    The recorded fallback score stays 7.0 with note ``evaluation_parse_failed``.
    Previously that 7.0 was still compared to the threshold, so the fallback
    always failed.

    Returns:
        dict with ``lyrics``, ``evaluation``, ``score`` and ``passed`` (always True).

    Raises:
        ValueError: if no upstream lyrics exist, or the parsed score is below the
            threshold (signalling that the lyrics should be regenerated).
    """
    threshold = config.get("threshold", 8.5)
    lyrics = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            lyrics = dep_output.get("lyrics")
            if lyrics:
                break
    if not lyrics:
        raise ValueError("上游步骤未提供歌词")
    prompt = f"""请从以下维度评估这首歌词的质量(1-10分):
1. 韵律节奏: 押韵、节奏感、可唱性
2. 情感表达: 情感真实度、共鸣力
3. 文学性: 用词、意象、修辞
4. 结构完整: 段落编排、层次感
5. 商业潜力: 流行度、记忆点
歌词:
{lyrics}
输出JSON: {{"score": 8.5, "dimensions": {{...}}, "suggestions": "..."}}
只输出JSON。"""
    evaluation_failed = False
    try:
        from pipeline_service.llm_bridge import llm_call
        result = await llm_call(prompt)
        result = result.strip()
        if result.startswith("```"):
            result = result.split("\n", 1)[1].rsplit("```", 1)[0]
        evaluation = json.loads(result)
        # The model may emit the score as a string ("8.5"); normalise it here so the
        # comparison below cannot raise TypeError.
        score = float(evaluation.get("score", 0))
    except Exception:
        evaluation_failed = True
        score = 7.0
        evaluation = {"score": score, "note": "evaluation_parse_failed"}
    if not evaluation_failed and score < threshold:
        raise ValueError(f"歌词评分 {score} 低于阈值 {threshold},需要重新生成")
    return {
        "lyrics": lyrics,
        "evaluation": evaluation,
        "score": score,
        "passed": True,
    }
# ─── Music Generation ────────────────────────────────────────────────
async def handle_music_generating(tenant_id, task_id, step_name, input_data, config):
    """Create a music-generation job for the upstream lyrics.

    NOTE: still a stub — no Suno/MiniMax request is made yet (see TODO). A
    synthetic ``music_<task_id>_<unix seconds>`` job id is returned with status
    "submitted". ``task_params.music_service`` (default "suno") and
    ``task_params.music_style`` (default "pop") are echoed back.

    Raises:
        ValueError: when no upstream output provides ``lyrics``.
    """
    lyrics = next(
        (
            out.get("lyrics")
            for out in input_data.values()
            if isinstance(out, dict) and out.get("lyrics")
        ),
        None,
    )
    if not lyrics:
        raise ValueError("上游步骤未提供歌词")

    params = input_data.get("task_params", {})
    # TODO: Implement actual API call to Suno/MiniMax
    job_id = "music_{}_{}".format(task_id, int(time.time()))
    return {
        "music_job_id": job_id,
        "music_service": params.get("music_service", "suno"),
        "style": params.get("music_style", "pop"),
        "lyrics": lyrics,
        "status": "submitted",
    }
async def handle_music_polling(tenant_id, task_id, step_name, input_data, config):
    """Wait for the music-generation job and return the generated track path.

    NOTE: still a stub — nothing is polled or downloaded yet (see TODO). The
    returned ``music_path`` (``generated_music.mp3`` in the task directory) is not
    created here.

    Raises:
        ValueError: when no upstream output carries a ``music_job_id``. Previously
            any dict (e.g. ``task_params``) satisfied the check, and
            ``music_job_id`` came back as None.
    """
    job_info = next(
        (
            out
            for out in input_data.values()
            if isinstance(out, dict) and out.get("music_job_id")
        ),
        None,
    )
    if job_info is None:
        raise ValueError("上游步骤未提供 music_job_id")

    # TODO: Implement actual polling logic (poll the API and download the result)
    work_dir = _task_dir(task_id)
    music_path = os.path.join(work_dir, "generated_music.mp3")
    return {
        "music_path": music_path,
        "music_job_id": job_info.get("music_job_id"),
        "status": "completed",
    }
# ─── Llmage API Helpers (unified via token.opencomputing.cn) ─────────
# Base URL of the llmage gateway (T2I image and video generation endpoints).
LLMAGE_BASE = "https://token.opencomputing.cn/llmage/v1"
# Bearer token for llmage and the virtual asset API; override with $LLMAGE_API_KEY.
# SECURITY NOTE(review): a credential is committed in source (from media-server
# config). Move it to the environment/secret storage and rotate it; the literal
# fallback is kept only so existing deployments keep working.
LLMAGE_API_KEY = os.environ.get("LLMAGE_API_KEY", "0V4xNbIsR061JaYGt1f1L")
# Virtual asset API base
ASSET_API_BASE = "https://token.opencomputing.cn/reallife_asset/api"
async def _llmage_t2i(prompt: str, model: str = "wan2.7-image-pro",
                      size: str = "1024*1024") -> str:
    """Generate one image via the llmage T2I endpoint and return its URL.

    The HTTP call is blocking (urllib, 120 s timeout). It runs in the default
    thread-pool executor so it does not stall the event loop and every other
    pipeline task sharing it.

    Raises:
        ValueError: if the response does not contain ``data[0].url``.
        urllib.error.URLError: on transport or HTTP errors.
    """
    import urllib.request

    body = json.dumps({
        "model": model,
        "catelogid": "t2i",
        "prompt": prompt,
        "size": size,
        "n": 1,
    }).encode()
    req = urllib.request.Request(
        f"{LLMAGE_BASE}/image/generations",
        data=body,
        headers={
            "Authorization": f"Bearer {LLMAGE_API_KEY}",
            "Content-Type": "application/json",
        },
    )

    def _send() -> dict:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read())

    data = await asyncio.get_running_loop().run_in_executor(None, _send)
    items = data.get("data") if isinstance(data, dict) else None
    first = items[0] if isinstance(items, list) and items else None
    if isinstance(first, dict) and first.get("url"):
        return first["url"]
    raise ValueError(f"T2I 生成失败: {json.dumps(data, ensure_ascii=False)}")
async def _llmage_video_submit(model: str, catelogid: str, prompt: str,
                               duration: int = 10, ratio: str = "16:9",
                               resolution: str = "720p",
                               image_files: list = None,  # type: ignore
                               video_files: list = None,  # type: ignore
                               audio_files: list = None) -> str:  # type: ignore
    """Submit an llmage video-generation task and return its task id.

    Args:
        model: llmage model name.
        catelogid: llmage task category, passed through unchanged.
        prompt: Text prompt.
        duration, ratio, resolution: Output video parameters.
        image_files, video_files, audio_files: Optional reference media lists;
            each is sent only when non-empty.

    The blocking HTTP request (120 s timeout) runs in the default executor so it
    does not stall the event loop.

    Raises:
        ValueError: if the response has no ``taskid``/``task_id``/``id``.
    """
    import urllib.request

    body = {
        "model": model,
        "catelogid": catelogid,
        "prompt": prompt,
        "duration": duration,
        "ratio": ratio,
        "resolution": resolution,
    }
    for key, files in (("image_files", image_files),
                       ("video_files", video_files),
                       ("audio_files", audio_files)):
        if files:
            body[key] = files
    req = urllib.request.Request(
        f"{LLMAGE_BASE}/video/generations",
        data=json.dumps(body, ensure_ascii=False).encode(),
        headers={
            "Authorization": f"Bearer {LLMAGE_API_KEY}",
            "Content-Type": "application/json",
        },
    )

    def _send() -> dict:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read())

    result = await asyncio.get_running_loop().run_in_executor(None, _send)
    task_id = None
    if isinstance(result, dict):
        task_id = result.get("taskid") or result.get("task_id") or result.get("id")
    if not task_id:
        raise ValueError(f"视频提交失败: {json.dumps(result, ensure_ascii=False)}")
    return str(task_id)
async def _llmage_video_poll(task_id: str, timeout: int = 600) -> str:
    """Poll an llmage video task every 15 s until it finishes; return the video URL.

    Blocking HTTP calls run in the default executor. Transient failures (network
    errors, HTTP 5xx, unparseable bodies) are logged and retried until
    ``timeout``; HTTP 4xx errors propagate immediately. The task id is
    URL-encoded into the query string.

    Raises:
        ValueError: on status FAILED, on SUCCEEDED without a video URL, or when
            ``timeout`` seconds elapse.
    """
    import urllib.error
    import urllib.parse
    import urllib.request

    url = f"{LLMAGE_BASE}/tasks?" + urllib.parse.urlencode({"taskid": task_id})
    headers = {"Authorization": f"Bearer {LLMAGE_API_KEY}"}

    def _fetch() -> dict:
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())

    loop = asyncio.get_running_loop()
    start = time.time()
    while time.time() - start < timeout:
        result = None
        try:
            result = await loop.run_in_executor(None, _fetch)
        except urllib.error.HTTPError as e:
            if e.code < 500:
                raise
            logger.warning(f"Video task {task_id} poll got HTTP {e.code}, retrying")
        except (OSError, json.JSONDecodeError) as e:
            logger.warning(f"Video task {task_id} poll failed, retrying: {e}")
        if isinstance(result, dict):
            # Status may be nested under "data" or sit at the top level.
            dd = result.get("data", result)
            if not isinstance(dd, dict):
                dd = result
            status = dd.get("status", result.get("status", ""))
            if status == "SUCCEEDED":
                video_url = dd.get("video") or dd.get("video_url", "")
                if video_url:
                    return video_url
                raise ValueError(f"任务成功但无视频URL: {json.dumps(result)}")
            elif status == "FAILED":
                raise ValueError(f"视频生成失败: {json.dumps(result, ensure_ascii=False)}")
        await asyncio.sleep(15)
    raise ValueError(f"视频生成超时 ({timeout}s): task_id={task_id}")
async def _upload_to_asset_library(image_url: str, asset_name: str,
                                   asset_type: str = "Image",
                                   vendor_group_id: str = "") -> str:
    """Register a remote image in the virtual asset library and return its asset id.

    The asset API fetches ``source_url`` itself; the request is form-encoded.
    When ``vendor_group_id`` is empty, a group is resolved via
    ``_get_or_create_aigc_group``. The blocking HTTP call runs in the default
    executor.

    Raises:
        ValueError: if the API does not report ``status == "ok"`` with an asset id.
    """
    import urllib.parse
    import urllib.request

    if not vendor_group_id:
        vendor_group_id = await _get_or_create_aigc_group()
    body = urllib.parse.urlencode({
        "vendor_group_id": vendor_group_id,
        "source_url": image_url,
        "asset_type": asset_type,
        "name": asset_name,
    }).encode()
    req = urllib.request.Request(
        f"{ASSET_API_BASE}/rl_virtual_upload.dspy",
        data=body,
        headers={
            "Authorization": f"Bearer {LLMAGE_API_KEY}",
            "Content-Type": "application/x-www-form-urlencoded",
        },
    )

    def _send() -> dict:
        with urllib.request.urlopen(req, timeout=120) as resp:
            return json.loads(resp.read())

    result = await asyncio.get_running_loop().run_in_executor(None, _send)
    if isinstance(result, dict) and result.get("status") == "ok":
        # "data" may be null in some responses.
        data = result.get("data") or {}
        asset_id = data.get("vendor_asset_id") or data.get("id", "") if isinstance(data, dict) else ""
        if asset_id:
            return asset_id
    raise ValueError(f"素材上传失败: {json.dumps(result, ensure_ascii=False)}")
async def _get_or_create_aigc_group() -> str:
    """Return a virtual-asset group id, creating "KTV产线虚拟素材" if none is usable.

    Selection order:
    1. Among listed groups that carry a ``vendor_group_id``, prefer one named
       "KTV产线虚拟素材" (when the listing exposes ``name``).
    2. Otherwise take the first such group.
    3. If there is none, create the group. Previously the first listed group was
       returned even when its id was empty.

    HTTP calls run in the default executor.

    Raises:
        ValueError: if group creation returns no ``vendor_group_id``.
    """
    import urllib.request

    group_name = "KTV产线虚拟素材"
    loop = asyncio.get_running_loop()

    def _request(req) -> dict:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())

    list_req = urllib.request.Request(
        f"{ASSET_API_BASE}/rl_virtual_groups.dspy",
        headers={"Authorization": f"Bearer {LLMAGE_API_KEY}"},
    )
    listed = await loop.run_in_executor(None, _request, list_req)
    data = listed.get("data") if isinstance(listed, dict) else None
    groups = data.get("groups") if isinstance(data, dict) else None
    usable = [g for g in (groups or []) if isinstance(g, dict) and g.get("vendor_group_id")]
    if usable:
        chosen = next((g for g in usable if g.get("name") == group_name), usable[0])
        return chosen["vendor_group_id"]

    create_req = urllib.request.Request(
        f"{ASSET_API_BASE}/rl_virtual_create_group.dspy",
        data=json.dumps({"name": group_name}).encode(),
        headers={
            "Authorization": f"Bearer {LLMAGE_API_KEY}",
            "Content-Type": "application/json",
        },
    )
    result = await loop.run_in_executor(None, _request, create_req)
    created = result.get("data") if isinstance(result, dict) else None
    gid = created.get("vendor_group_id", "") if isinstance(created, dict) else ""
    if gid:
        return gid
    raise ValueError(f"创建AIGC素材组失败: {json.dumps(result, ensure_ascii=False)}")
async def _download_url(url: str, dest: str):
    """Download ``url`` to the local file ``dest`` with curl.

    ``-f`` makes curl exit non-zero on HTTP errors (>= 400). Without it, the
    error body was saved as if it were the image. Both arguments are
    shell-quoted.

    Raises:
        ValueError: if curl exits non-zero.
    """
    import shlex

    _, stderr, rc = await _run_local(
        f"curl -fsSL -o {shlex.quote(dest)} {shlex.quote(url)}"
    )
    if rc != 0:
        raise ValueError(f"下载失败 {url}: {stderr}")
# ─── Character & Asset Generation (v2 — Ali T2I + Virtual Asset Library) ──
# Default Seedance 2.0 model for video generation
# Fallback for config["video_model"] in handle_scene_video_generating.
DEFAULT_VIDEO_MODEL = "doubao-seedance-2-0-260128"
# Fast Seedance 2.0 variant. NOTE(review): not referenced in the visible part of
# this file — presumably selected further down or via config; verify.
DEFAULT_VIDEO_MODEL_FAST = "doubao-seedance-2-0-fast-260128"
async def handle_character_designing(tenant_id, task_id, step_name, input_data, config):
    """Have the LLM design the complete MV visual asset set from the lyrics.

    The design covers:
    - characters, with front/left/right prompts;
    - props;
    - costumes;
    - scene backgrounds.
    Each asset carries an English T2I prompt for the image-generation step.

    Lyrics come from the first upstream ``lyrics`` or ``calibrated_lyrics``;
    segment lists are joined by their ``text``. The style comes from
    ``task_params.visual_style`` (default "anime").

    Returns:
        dict with ``character_design`` (the raw JSON object), ``visual_style``
        and the ``characters``/``props``/``costumes``/``scenes_bg`` lists (empty
        when absent).

    Raises:
        ValueError: if no lyrics are available, the LLM call/parse fails (cause
            chained), or the reply is not a JSON object.
    """
    lyrics = None
    params = input_data.get("task_params", {})
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            lyrics = dep_output.get("lyrics") or dep_output.get("calibrated_lyrics")
            if isinstance(lyrics, list):
                lyrics = " ".join(
                    s.get("text", "") if isinstance(s, dict) else str(s) for s in lyrics
                )
            if lyrics:
                break
    if not lyrics:
        # Without lyrics the prompt would literally contain "None".
        raise ValueError("上游步骤未提供歌词")
    style = params.get("visual_style", "anime")
    prompt = f"""根据以下歌词设计MV的完整视觉素材方案。
歌词:
{lyrics}
视觉风格: {style}
请设计以下素材:
## 1. 角色characters— 1-3个主角
每个角色包含:
- id: 角色标识(如 char_01
- name: 角色名
- description: 中文外貌描述
- prompts: 三个英文图像生成prompt:
- front: 正面半身肖像(用于角色主参考图)
- left: 左侧半身肖像(用于多角度参考)
- right: 右侧半身肖像(用于多角度参考)
每个prompt格式: "portrait of [age]-year-old [gender], [hair], wearing [clothing], [expression], [lighting], [style], full body, photorealistic"
## 2. 道具props— 3-5个关键道具
每个道具包含:
- id: 道具标识(如 prop_01
- name: 道具名
- prompt: 英文图像生成prompt产品级特写白色背景
## 3. 服饰costumes— 1-3套服装
每个服饰包含:
- id: 服饰标识(如 costume_01
- name: 服饰名
- for_character: 关联角色id
- prompt: 英文图像生成prompt服装平铺展示
## 4. 场景scenes_bg— 2-3个主要场景背景
每个场景包含:
- id: 场景标识(如 scene_bg_01
- name: 场景名
- prompt: 英文图像生成prompt宽幅风景/环境图,无角色)
输出JSON格式:
{{
"characters": [...],
"props": [...],
"costumes": [...],
"scenes_bg": [...]
}}
只输出JSON不要其他内容。"""
    try:
        from pipeline_service.llm_bridge import llm_call
        result = await llm_call(prompt)
        result = result.strip()
        if result.startswith("```"):
            result = result.split("\n", 1)[1].rsplit("```", 1)[0]
        design = json.loads(result)
    except Exception as e:
        raise ValueError(f"角色设计失败: {e}") from e
    if not isinstance(design, dict):
        raise ValueError("角色设计失败: LLM 未返回 JSON 对象")
    return {
        "character_design": design,
        "visual_style": style,
        # LLM JSON may contain explicit nulls; expose empty lists instead.
        "characters": design.get("characters") or [],
        "props": design.get("props") or [],
        "costumes": design.get("costumes") or [],
        "scenes_bg": design.get("scenes_bg") or [],
    }
async def _ktv_generate_asset_image(prompt_text: str, model: str, size: str,
                                    local_path: str) -> str:
    """Generate one T2I image via llmage, download it to ``local_path``, return its URL."""
    img_url = await _llmage_t2i(prompt_text, model=model, size=size)
    await _download_url(img_url, local_path)
    return img_url


async def handle_character_image_generating(tenant_id, task_id, step_name, input_data, config):
    """Generate every designed visual asset image via llmage T2I.

    The model is ``config["t2i_model"]``, default wan2.7-image-pro. Images cover:
    - character views (front/left/right, 1024*1024);
    - props and costumes (1024*1024);
    - scene backgrounds (1280*720).

    Each image is downloaded to ``<task dir>/assets/<name>.png``. Individual
    failures are logged and skipped. Entries without an ``id`` get
    index-based ids (``char_01``, ``prop_01``, ...) that are unique per section.

    Returns:
        dict with ``generated_assets`` (one record per image), ``asset_count``
        and the pass-through ``character_design``.

    Raises:
        ValueError: if no design is provided upstream or no image could be generated.
    """
    design = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            design = dep_output.get("character_design")
            if design:
                break
    # Validate before touching the filesystem.
    if not design or not isinstance(design, dict):
        raise ValueError("上游步骤未提供角色设计方案")

    assets_dir = os.path.join(_task_dir(task_id), "assets")
    os.makedirs(assets_dir, exist_ok=True)
    model = config.get("t2i_model", "wan2.7-image-pro")
    generated_assets = []

    def _items(section):
        # The design is LLM JSON: tolerate null sections and non-object entries.
        return [item for item in (design.get(section) or []) if isinstance(item, dict)]

    async def _generate(file_stem, prompt_text, size, record, label):
        # Generate + download one image; append its record on success.
        local_path = os.path.join(assets_dir, f"{file_stem}.png")
        try:
            img_url = await _ktv_generate_asset_image(prompt_text, model, size, local_path)
        except Exception as e:
            logger.warning(f"Failed to generate {label}: {e}")
            return False
        generated_assets.append({
            **record,
            "image_url": img_url,
            "local_path": local_path,
            "prompt": prompt_text,
        })
        return True

    # Characters: up to three views each.
    for idx, char in enumerate(_items("characters"), start=1):
        char_id = char.get("id") or f"char_{idx:02d}"
        prompts = char.get("prompts") or {}
        if isinstance(prompts, str):
            prompts = {"front": prompts}
        if not isinstance(prompts, dict):
            prompts = {}
        for view in ("front", "left", "right"):
            prompt_text = prompts.get(view, "")
            if not prompt_text:
                continue
            asset_name = f"{char_id}_{view}"
            record = {
                "asset_type": "character",
                "asset_id": char_id,
                "view": view,
                "name": f"{char.get('name', char_id)} ({view})",
            }
            if await _generate(asset_name, prompt_text, "1024*1024", record, asset_name):
                logger.info(f"Generated character image: {asset_name}")

    for idx, prop in enumerate(_items("props"), start=1):
        prop_id = prop.get("id") or f"prop_{idx:02d}"
        prompt_text = prop.get("prompt", "")
        if prompt_text:
            await _generate(
                prop_id, prompt_text, "1024*1024",
                {"asset_type": "prop", "asset_id": prop_id, "name": prop.get("name", prop_id)},
                f"prop {prop_id}",
            )

    for idx, costume in enumerate(_items("costumes"), start=1):
        costume_id = costume.get("id") or f"costume_{idx:02d}"
        prompt_text = costume.get("prompt", "")
        if prompt_text:
            await _generate(
                costume_id, prompt_text, "1024*1024",
                {
                    "asset_type": "costume",
                    "asset_id": costume_id,
                    "for_character": costume.get("for_character", ""),
                    "name": costume.get("name", costume_id),
                },
                f"costume {costume_id}",
            )

    for idx, scene in enumerate(_items("scenes_bg"), start=1):
        scene_id = scene.get("id") or f"scene_bg_{idx:02d}"
        prompt_text = scene.get("prompt", "")
        if prompt_text:
            # Widescreen size for environment backgrounds.
            await _generate(
                scene_id, prompt_text, "1280*720",
                {"asset_type": "scene_bg", "asset_id": scene_id, "name": scene.get("name", scene_id)},
                f"scene bg {scene_id}",
            )

    if not generated_assets:
        raise ValueError("所有素材图片生成失败")
    return {
        "generated_assets": generated_assets,
        "asset_count": len(generated_assets),
        "character_design": design,
    }
async def handle_asset_uploading(tenant_id, task_id, step_name, input_data, config):
    """Upload generated asset images to the virtual asset library.

    Each asset becomes an ``asset://<vendor_asset_id>`` reference for storyboard
    and video generation. When an upload fails, the asset falls back to its
    direct image URL and is flagged ``fallback``.

    Group resolution: ``config["vendor_group_id"]`` is used when given.
    Otherwise the AIGC group is resolved once up front rather than once per
    asset; if that fails, each upload resolves it itself.

    Returns:
        dict with:
        - ``asset_refs``;
        - ``asset_lookup``, keyed ``"<type>:<id>"`` and ``"<type>:<id>:<view>"``.
          The base key keeps the first uploaded view; the image step emits
          front before left/right;
        - ``asset_count``;
        - ``generated_assets``: copies annotated with ``asset_ref``, so the
          upstream output is not mutated;
        - the upstream ``character_design`` (None if absent).

    Raises:
        ValueError: if there are no generated assets or nothing could be referenced.
    """
    generated_assets = None
    character_design = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            if character_design is None and dep_output.get("character_design"):
                character_design = dep_output["character_design"]
            generated_assets = dep_output.get("generated_assets")
            if generated_assets:
                break
    if not generated_assets:
        raise ValueError("上游步骤未提供生成的素材")

    vendor_group_id = config.get("vendor_group_id")  # Optional override
    if not vendor_group_id:
        try:
            vendor_group_id = await _get_or_create_aigc_group()
        except Exception as e:
            logger.warning(f"Failed to resolve AIGC asset group, deferring to per-upload lookup: {e}")
            vendor_group_id = ""

    annotated_assets = []
    asset_refs = []  # List of {asset_type, asset_id, name, asset_ref, vendor_asset_id, view}
    for source_asset in generated_assets:
        asset = dict(source_asset)  # copy: leave the upstream step's output untouched
        annotated_assets.append(asset)
        image_url = asset.get("image_url", "")
        name = asset.get("name", asset.get("asset_id", "unknown"))
        if not image_url:
            logger.warning(f"Asset {name} has no image_url, skipping upload")
            continue
        try:
            vendor_asset_id = await _upload_to_asset_library(
                image_url=image_url,
                asset_name=name,
                asset_type="Image",
                vendor_group_id=vendor_group_id,
            )
        except Exception as e:
            logger.warning(f"Failed to upload asset {name}: {e}")
            # Fallback: use direct image URL as reference (keep the view so the
            # per-view lookup keys are still built).
            asset["asset_ref"] = image_url
            asset_refs.append({
                "asset_type": asset.get("asset_type"),
                "asset_id": asset.get("asset_id"),
                "name": name,
                "asset_ref": image_url,
                "vendor_asset_id": None,
                "view": asset.get("view", ""),
                "fallback": True,
            })
            continue
        asset_ref = f"asset://{vendor_asset_id}"
        asset["asset_ref"] = asset_ref
        asset["vendor_asset_id"] = vendor_asset_id
        asset_refs.append({
            "asset_type": asset.get("asset_type"),
            "asset_id": asset.get("asset_id"),
            "name": name,
            "asset_ref": asset_ref,
            "vendor_asset_id": vendor_asset_id,
            "view": asset.get("view", ""),
        })
        logger.info(f"Uploaded asset: {name} → {asset_ref}")

    if not asset_refs:
        raise ValueError("所有素材上传失败")

    asset_lookup = {}
    for ref in asset_refs:
        base_key = f"{ref['asset_type']}:{ref['asset_id']}"
        # setdefault: the base key keeps the first view rather than the last one.
        asset_lookup.setdefault(base_key, ref["asset_ref"])
        if ref.get("view"):
            asset_lookup[f"{base_key}:{ref['view']}"] = ref["asset_ref"]

    return {
        "asset_refs": asset_refs,
        "asset_lookup": asset_lookup,
        "asset_count": len(asset_refs),
        "generated_assets": annotated_assets,
        "character_design": character_design,
    }
def _ktv_asset_summary(asset_refs) -> str:
    """Render uploaded asset refs as the grouped "name: ref" list shown to the storyboard LLM."""
    sections = (
        ("character", "角色素材:\n"),
        ("prop", "道具素材:\n"),
        ("costume", "服饰素材:\n"),
        ("scene_bg", "场景素材:\n"),
    )
    summary = ""
    for asset_type, heading in sections:
        group = [r for r in (asset_refs or []) if r.get("asset_type") == asset_type]
        if not group:
            continue
        summary += heading
        for r in group:
            if asset_type == "character" and r.get("asset_id"):
                # Show the character id so the LLM's `characters` entries match the
                # "character:<id>" keys used for r2v enrichment below.
                summary += f" - {r['name']} [{r['asset_id']}]: {r['asset_ref']}\n"
            else:
                summary += f" - {r['name']}: {r['asset_ref']}\n"
    return summary


def _ktv_character_asset_refs(character_ids, asset_lookup: dict) -> list:
    """Resolve storyboard character ids to asset refs, preferring the front view.

    Keys are matched exactly, in this order:
    1. ``character:<id>:front``
    2. ``character:<id>``
    3. any ``character:<id>:<view>``
    Exact matching means ``char_1`` can no longer match ``char_10``.
    """
    refs = []
    for char_id in character_ids:
        if not isinstance(char_id, str) or not char_id:
            continue
        base = f"character:{char_id}"
        ref = asset_lookup.get(f"{base}:front") or asset_lookup.get(base)
        if not ref:
            ref = next((v for k, v in asset_lookup.items() if k.startswith(base + ":")), None)
        if ref:
            refs.append(ref)
    return refs


async def handle_storyboard_generating(tenant_id, task_id, step_name, input_data, config):
    """Have the LLM write the MV storyboard, referencing uploaded assets (asset://素材号).

    Scenes with characters are marked for r2v (reference-to-video) and get their
    ``assets`` replaced by the matching character refs from ``asset_lookup``.
    Scenes without characters use t2v. Calibrated (timed) lyrics are preferred
    over plain lyrics regardless of dependency order. The storyboard is saved
    as ``storyboard.json`` in the task work directory.

    Returns:
        dict with ``storyboard``, ``storyboard_path``, ``scene_count``,
        ``r2v_scene_count``, ``t2v_scene_count`` and the pass-through ``asset_refs``.

    Raises:
        ValueError: if no lyrics exist, the LLM call/parse fails (cause chained),
            or the reply is not a JSON array of scene objects.
    """
    calibrated_lyrics = None
    plain_lyrics = None
    asset_refs = None
    asset_lookup = None
    params = input_data.get("task_params", {})
    for dep_output in input_data.values():
        if not isinstance(dep_output, dict):
            continue
        if dep_output.get("calibrated_lyrics"):
            calibrated_lyrics = dep_output["calibrated_lyrics"]
        if dep_output.get("lyrics"):
            plain_lyrics = dep_output["lyrics"]
        if dep_output.get("asset_refs"):
            asset_refs = dep_output["asset_refs"]
        if dep_output.get("asset_lookup"):
            asset_lookup = dep_output["asset_lookup"]
    lyrics = calibrated_lyrics or plain_lyrics
    if not lyrics:
        raise ValueError("上游步骤未提供歌词")
    duration = params.get("duration", 240)
    asset_summary = _ktv_asset_summary(asset_refs)
    prompt = f"""根据歌词和已有素材生成MV分镜脚本。
歌词:
{json.dumps(lyrics, ensure_ascii=False) if isinstance(lyrics, list) else lyrics}
视频总时长: {duration}
可用素材(用 asset://素材号 引用):
{asset_summary if asset_summary else "无特定素材使用纯T2V模式"}
请输出JSON数组每个分镜包含:
- scene_id: 分镜编号
- start_time: 开始秒数
- end_time: 结束秒数
- description: 场景中文描述
- video_prompt: 英文视频生成prompt详细描述画面内容、动作、镜头
- characters: 出现的角色名列表(如 ["char_01"]
- assets: 使用的素材引用列表(如 ["asset://xxx", "asset://yyy"]
- use_r2v: 是否使用r2v模式当场景包含角色时为true
- mood: 情绪/色调
- camera: 镜头运动描述
规则:
1. 包含角色的分镜必须设 use_r2v=true并在assets中引用对应角色的asset://
2. 无角色的分镜(纯风景/空镜)设 use_r2v=false
3. 确保分镜覆盖整首歌
4. 每个分镜5-15秒
5. video_prompt 必须纯英文,包含场景、光线、色调、镜头
只输出JSON数组。"""
    try:
        from pipeline_service.llm_bridge import llm_call
        result = await llm_call(prompt)
        result = result.strip()
        if result.startswith("```"):
            result = result.split("\n", 1)[1].rsplit("```", 1)[0]
        storyboard = json.loads(result)
    except Exception as e:
        raise ValueError(f"分镜生成失败: {e}") from e
    if not isinstance(storyboard, list) or not all(isinstance(s, dict) for s in storyboard):
        raise ValueError("分镜生成失败: LLM 未返回分镜对象数组")

    # Enrich r2v scenes with the uploaded character reference images.
    if asset_lookup:
        for scene in storyboard:
            characters = scene.get("characters")
            if not (scene.get("use_r2v") and characters):
                continue
            if isinstance(characters, str):
                characters = [characters]
            refs = _ktv_character_asset_refs(characters, asset_lookup)
            if refs:
                scene["assets"] = refs

    work_dir = _task_dir(task_id)
    sb_path = os.path.join(work_dir, "storyboard.json")
    with open(sb_path, "w", encoding="utf-8") as f:
        json.dump(storyboard, f, ensure_ascii=False, indent=2)
    r2v_count = sum(1 for s in storyboard if s.get("use_r2v"))
    return {
        "storyboard": storyboard,
        "storyboard_path": sb_path,
        "scene_count": len(storyboard),
        "r2v_scene_count": r2v_count,
        "t2v_scene_count": len(storyboard) - r2v_count,
        "asset_refs": asset_refs,
    }
def _seedance_scene_duration(scene, min_seconds=4, max_seconds=15):
    """Return a storyboard scene's length in whole seconds, clamped for Seedance.

    ``end_time - start_time`` is rounded to an integer and clamped to
    ``[min_seconds, max_seconds]``. A missing or null ``start_time`` counts as
    0 and a missing or null ``end_time`` counts as 10 (the historical defaults).

    Raises:
        TypeError, ValueError: if a timestamp cannot be converted to float.
    """
    start = scene.get("start_time")
    end = scene.get("end_time")
    start = 0.0 if start is None else float(start)
    end = 10.0 if end is None else float(end)
    return max(min_seconds, min(max_seconds, int(round(end - start))))


def _scene_video_info(scene, scene_idx, video_prompt, duration, mode):
    """Build the per-scene metadata carried from submission to polling."""
    return {
        "scene_id": scene.get("scene_id", scene_idx),
        "description": scene.get("description", ""),
        "video_prompt": video_prompt,
        "duration": duration,
        "mode": mode,
    }


async def handle_scene_video_generating(tenant_id, task_id, step_name, input_data, config):
    """Generate one video clip per storyboard scene via Seedance 2.0 (llmage API).

    - Scenes with ``use_r2v`` set and non-empty ``assets``: r2v mode. The
      ``asset://`` references are passed as ``image_files`` for character
      consistency.
    - All other scenes: t2v mode.
    - If a submission fails, the scene is retried once in t2v mode with
      ``DEFAULT_VIDEO_MODEL_FAST`` at 720p.
    - All calls go via https://token.opencomputing.cn/llmage/v1/video/generations

    Work happens in two phases. First every scene is submitted (``batch_size``
    per batch, with a 2 s pause between batches). Then each task is polled in
    storyboard order and its clip is downloaded to
    ``<task_dir>/scenes/scene_NNN.mp4``. Individual scene failures (malformed
    scene, submit error, poll/download error) are logged and skipped.

    Args:
        tenant_id: unused here; part of the common handler signature.
        task_id: pipeline task id; selects the local work directory.
        step_name: unused here; part of the common handler signature.
        input_data: upstream step outputs. The last dict with a truthy
            ``storyboard`` supplies the scene list.
        config: optional ``video_model``, ``resolution`` (default "720p") and
            ``batch_size`` (default 5, never less than 1).

    Returns:
        dict with ``scene_videos`` (successful clips), ``scene_count``,
        ``total_submitted`` and ``video_model``.

    Raises:
        ValueError: if no storyboard is provided or no scene video succeeds.
    """
    work_dir = _task_dir(task_id)
    scenes_dir = os.path.join(work_dir, "scenes")
    os.makedirs(scenes_dir, exist_ok=True)

    storyboard = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict) and dep_output.get("storyboard"):
            storyboard = dep_output["storyboard"]
    if not storyboard:
        raise ValueError("上游步骤未提供分镜脚本")

    video_model = config.get("video_model", DEFAULT_VIDEO_MODEL)
    resolution = config.get("resolution", "720p")
    # range() rejects a step of 0, so never let the batch size drop below 1.
    batch_size = max(1, int(config.get("batch_size") or 5))

    scene_videos = []
    submitted_tasks = []  # [(scene_idx, llmage task id, scene info)]

    # Phase 1: submit all video generation tasks in batches.
    for batch_start in range(0, len(storyboard), batch_size):
        batch = storyboard[batch_start:batch_start + batch_size]
        for offset, scene in enumerate(batch):
            scene_idx = batch_start + offset
            if not isinstance(scene, dict):
                logger.warning(
                    f"Skipping scene {scene_idx}: expected a dict, got {type(scene).__name__}"
                )
                continue
            # Fall back to the description when the LLM left video_prompt empty.
            video_prompt = scene.get("video_prompt") or scene.get("description") or ""
            try:
                # Seedance accepts a bounded clip length; malformed LLM
                # timestamps only skip this scene instead of the whole step.
                duration = _seedance_scene_duration(scene)
            except (TypeError, ValueError) as e:
                logger.warning(f"Skipping scene {scene_idx}: invalid start/end time: {e}")
                continue

            try:
                if scene.get("use_r2v") and scene.get("assets"):
                    # R2V mode: use asset references for character consistency.
                    task_id_video = await _llmage_video_submit(
                        model=video_model,
                        catelogid="r2v",
                        prompt=video_prompt,
                        duration=duration,
                        ratio="16:9",
                        resolution=resolution,
                        image_files=scene["assets"],
                    )
                    mode = "r2v"
                else:
                    # T2V mode: pure text-to-video.
                    task_id_video = await _llmage_video_submit(
                        model=video_model,
                        catelogid="t2v",
                        prompt=video_prompt,
                        duration=duration,
                        ratio="16:9",
                        resolution=resolution,
                    )
                    mode = "t2v"
            except Exception as e:  # best-effort: one failing scene must not sink the step
                logger.warning(f"Failed to submit scene {scene_idx}: {e}")
                # Retry once as plain t2v on the fast model.
                try:
                    task_id_video = await _llmage_video_submit(
                        model=DEFAULT_VIDEO_MODEL_FAST,
                        catelogid="t2v",
                        prompt=video_prompt,
                        duration=duration,
                        ratio="16:9",
                        resolution="720p",
                    )
                except Exception as e2:
                    logger.error(f"Scene {scene_idx} fallback also failed: {e2}")
                    continue
                mode = "t2v_fallback"
                logger.info(f"Scene {scene_idx} fallback submitted: task={task_id_video}")
            else:
                logger.info(f"Submitted scene {scene_idx} ({mode}): task={task_id_video}")

            submitted_tasks.append(
                (scene_idx, task_id_video,
                 _scene_video_info(scene, scene_idx, video_prompt, duration, mode))
            )

        # Wait between batches to avoid rate limits.
        if batch_start + batch_size < len(storyboard):
            await asyncio.sleep(2)

    # Phase 2: poll all submitted tasks and download the finished clips.
    logger.info(f"Polling {len(submitted_tasks)} video tasks...")
    for scene_idx, vid_task_id, scene_info in submitted_tasks:
        try:
            video_url = await _llmage_video_poll(vid_task_id, timeout=600)
            local_path = os.path.join(scenes_dir, f"scene_{scene_idx:03d}.mp4")
            await _download_url(video_url, local_path)
            scene_videos.append({
                "scene_id": scene_info["scene_id"],
                "video_path": local_path,
                "video_url": video_url,
                "description": scene_info["description"],
                "video_prompt": scene_info["video_prompt"],
                "duration": scene_info["duration"],
                "mode": scene_info["mode"],
            })
            logger.info(f"Scene {scene_idx} completed: {local_path}")
        except Exception as e:
            logger.warning(f"Scene {scene_idx} failed during polling: {e}")

    if not scene_videos:
        raise ValueError("所有场景视频生成失败")
    return {
        "scene_videos": scene_videos,
        "scene_count": len(scene_videos),
        "total_submitted": len(submitted_tasks),
        "video_model": video_model,
    }
async def handle_scene_video_evaluating(tenant_id, task_id, step_name, input_data, config):
    """Score scene videos and fail the step if average quality is below threshold.

    Scoring is currently a placeholder rather than a VLM call. A scene whose
    file exists and is larger than 10 000 bytes scores 8.0. Any other scene is
    logged and dropped. Upstream scene dicts are not modified; the kept scenes
    are returned as copies carrying a ``quality_score`` key.

    Retrying is not done here. This handler only raises ValueError.
    NOTE(review): presumably the pipeline engine retries on failure (config
    ``max_retry``); confirm against the runner.

    Args:
        tenant_id: unused here; part of the common handler signature.
        task_id: unused here; part of the common handler signature.
        step_name: unused here; part of the common handler signature.
        input_data: upstream step outputs. The first dict with a truthy
            ``scene_videos`` list is used.
        config: optional ``threshold`` (default 7.0), the minimum average score.

    Returns:
        ``{"scene_videos": [...kept scenes...], "avg_quality": float}``

    Raises:
        ValueError: if no scene videos are provided, none are valid, or the
            average score is below ``threshold``.
    """
    threshold = config.get("threshold", 7.0)

    scene_videos = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            scene_videos = dep_output.get("scene_videos")
            if scene_videos:
                break
    if not scene_videos:
        raise ValueError("上游步骤未提供场景视频")

    # Evaluate each scene (simplified: file exists and has a reasonable size).
    valid_scenes = []
    for sv in scene_videos:
        path = sv.get("video_path") or ""  # a null path must not reach os.path
        if os.path.exists(path) and os.path.getsize(path) > 10000:
            # Copy rather than mutate: the input dicts belong to the upstream step.
            valid_scenes.append({**sv, "quality_score": 8.0})  # Placeholder score
        else:
            logger.warning(f"Scene {sv.get('scene_id')} missing or too small: {path}")
    if not valid_scenes:
        raise ValueError("所有场景视频质量不合格")

    avg_score = sum(s["quality_score"] for s in valid_scenes) / len(valid_scenes)
    if avg_score < threshold:
        raise ValueError(f"平均质量分 {avg_score:.1f} 低于阈值 {threshold}")
    return {"scene_videos": valid_scenes, "avg_quality": avg_score}
async def handle_scene_video_concatenating(tenant_id, task_id, step_name, input_data, config):
    """Concatenate scene videos with ffmpeg and fit the result to the audio length.

    The clips that exist on disk are joined with ffmpeg's concat demuxer
    (stream copy). When an audio duration is known, the result is fitted to it:
    - shorter than the audio: looped with ``-stream_loop`` and re-encoded,
      trimmed with ``-t``;
    - longer than the audio: the tail is trimmed (stream copy);
    - otherwise, or with no audio duration: copied unchanged.

    NOTE(review): ``-c copy`` concat assumes all clips share codec and
    resolution. Fallback clips are generated at 720p regardless of the
    configured resolution, so mixed inputs may need a re-encoding concat.

    Args:
        tenant_id: unused here; part of the common handler signature.
        task_id: pipeline task id; selects the local work directory.
        step_name: unused here; part of the common handler signature.
        input_data: upstream step outputs. The last truthy ``scene_videos``
            supplies the clips. The last truthy ``duration`` is used as the
            audio length in seconds (presumably from the audio/music step;
            confirm).
        config: unused.

    Returns:
        ``{"final_video_path": <work_dir>/final_video.mp4}``

    Raises:
        ValueError: if no scene videos are provided, none exist on disk, or an
            ffmpeg invocation fails.
    """
    import shutil

    work_dir = _task_dir(task_id)
    scene_videos = None
    audio_duration = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            if dep_output.get("scene_videos"):
                scene_videos = dep_output["scene_videos"]
            if dep_output.get("duration"):
                audio_duration = dep_output["duration"]
    if not scene_videos:
        raise ValueError("上游步骤未提供场景视频")

    clip_paths = []
    for sv in scene_videos:
        path = sv.get("video_path") or ""
        if path and os.path.exists(path):
            clip_paths.append(path)
    if not clip_paths:
        raise ValueError("场景视频文件均不存在")

    # Create the concat demuxer list. Inside a quoted entry, a single quote
    # must be written as '\'' .
    concat_list = os.path.join(work_dir, "concat_list.txt")
    with open(concat_list, "w", encoding="utf-8") as f:
        for path in clip_paths:
            escaped = path.replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")

    concat_path = os.path.join(work_dir, "concat_video.mp4")
    _, stderr, rc = await _run_local(
        f"ffmpeg -y -f concat -safe 0 -i '{concat_list}' -c copy '{concat_path}'"
    )
    if rc != 0:
        raise ValueError(f"场景视频拼接失败: {stderr}")

    try:
        audio_duration = float(audio_duration) if audio_duration else 0.0
    except (TypeError, ValueError):
        logger.warning(f"Ignoring non-numeric audio duration: {audio_duration!r}")
        audio_duration = 0.0

    concat_dur = 0.0
    if audio_duration > 0:
        stdout, _, _ = await _run_local(
            f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{concat_path}'"
        )
        try:
            concat_dur = float(stdout.strip())
        except (TypeError, ValueError):
            # Empty or "N/A" output: treat the duration as unknown.
            concat_dur = 0.0

    final_path = os.path.join(work_dir, "final_video.mp4")
    fit_cmd = None
    if audio_duration > 0 and 0 < concat_dur < audio_duration:
        # -stream_loop N plays the input N extra times; overshoot and let -t trim.
        loops = int(audio_duration / concat_dur) + 1
        fit_cmd = (
            f"ffmpeg -y -stream_loop {loops} -i '{concat_path}' "
            f"-t {audio_duration} -c:v libx264 -preset fast '{final_path}'"
        )
    elif audio_duration > 0 and concat_dur > audio_duration:
        # Trim the tail so the video does not outlast the song.
        fit_cmd = f"ffmpeg -y -i '{concat_path}' -t {audio_duration} -c copy '{final_path}'"

    if fit_cmd:
        _, stderr, rc = await _run_local(fit_cmd, timeout=600)
        if rc != 0:
            raise ValueError(f"视频时长对齐失败: {stderr}")
    else:
        shutil.copyfile(concat_path, final_path)

    return {"final_video_path": final_path}
# ─── Final Synthesis ─────────────────────────────────────────────────
async def handle_ktv_synthesizing(tenant_id, task_id, step_name, input_data, config):
    """Synthesize the final KTV (dual-track) and MTV (single-track) videos.

    Two source modes are supported:
    - Mode A (generated MV): an upstream step supplies ``final_video_path``.
      The song audio comes from the ``vocals_path`` / ``no_vocals_path`` stems.
    - Mode B (original video): an upstream step supplies ``video_path``. Its
      own audio track is the song. If no ``vocals_path`` is provided, the
      stems are expected at ``<work_dir>/vocals.wav`` / ``no_vocals.wav``.
    ``final_video_path`` always takes precedence over ``video_path``, whatever
    the dependency order.

    The KTV output burns in the ASS subtitles. With both stems it carries two
    audio tracks ("Vocals", "Accompaniment"); otherwise it keeps the video's
    own audio. The MTV output burns in the subtitles with a single audio
    track. In Mode A with stems, that track is a mix of the two stems, because
    the generated video does not carry the song.

    Args:
        tenant_id: unused here; part of the common handler signature.
        task_id: pipeline task id; selects the local work directory.
        step_name: unused here; part of the common handler signature.
        input_data: upstream step outputs (see keys above plus ``ass_path``).
        config: unused.

    Returns:
        dict with ``ktv_path`` and ``subtitle_path``, plus ``mtv_path`` only
        when the MTV encode succeeded.

    Raises:
        ValueError: if the subtitle file, the vocal stem (Mode B) or the video
            source is missing, or the KTV encode fails.
    """
    work_dir = _task_dir(task_id)
    final_video_path = None
    original_video_path = None
    ass_path = None
    vocals_path = None
    no_vocals_path = None
    for dep_output in input_data.values():
        if not isinstance(dep_output, dict):
            continue
        if dep_output.get("final_video_path"):
            final_video_path = dep_output["final_video_path"]
        if dep_output.get("ass_path"):
            ass_path = dep_output["ass_path"]
        if dep_output.get("vocals_path"):
            vocals_path = dep_output["vocals_path"]
        if dep_output.get("no_vocals_path"):
            no_vocals_path = dep_output["no_vocals_path"]
        if dep_output.get("video_path") and not original_video_path:
            # Mode B: original video (first one wins).
            original_video_path = dep_output["video_path"]

    # Decide the mode after the loop so it cannot depend on dependency order.
    video_path = final_video_path or original_video_path
    has_original_video = not final_video_path and bool(original_video_path)

    if not ass_path:
        raise ValueError("缺少字幕文件")
    if has_original_video and not vocals_path:
        # Mode B: stems were written to the task work dir by the Demucs step.
        vocals_path = os.path.join(work_dir, "vocals.wav")
        no_vocals_path = os.path.join(work_dir, "no_vocals.wav")
        if not os.path.exists(vocals_path):
            raise ValueError("Demucs 步骤未提供人声轨道")
    if not video_path:
        raise ValueError("缺少视频源")

    ktv_path = os.path.join(work_dir, "ktv_final.mp4")
    mtv_path = os.path.join(work_dir, "mtv_final.mp4")

    # KTV: video + vocals track + accompaniment track + subtitle burn-in.
    if vocals_path and no_vocals_path:
        ktv_cmd = (
            f"ffmpeg -y -i '{video_path}' -i '{vocals_path}' -i '{no_vocals_path}' "
            f"-filter_complex \"[0:v]ass='{ass_path}'[v]\" "
            f"-map '[v]' -map 1:a -map 2:a "
            f"-c:v libx264 -preset fast -c:a aac -b:a 192k "
            f"-metadata:s:a:0 title='Vocals' -metadata:s:a:1 title='Accompaniment' "
            f"'{ktv_path}'"
        )
    else:
        ktv_cmd = (
            f"ffmpeg -y -i '{video_path}' "
            f"-vf \"ass='{ass_path}'\" "
            f"-c:v libx264 -preset fast -c:a aac -b:a 192k "
            f"'{ktv_path}'"
        )
    _, stderr, rc = await _run_local(ktv_cmd, timeout=600)
    if rc != 0:
        raise ValueError(f"KTV合成失败: {stderr}")

    # MTV: single audio track (original or mix) with subtitle burn-in.
    if not has_original_video and vocals_path and no_vocals_path:
        # Mode A: the generated video has no song audio, so rebuild the mix
        # from the stems. amix averages its inputs; volume=2 restores the sum
        # (the stems are assumed to be the same length).
        mtv_cmd = (
            f"ffmpeg -y -i '{video_path}' -i '{vocals_path}' -i '{no_vocals_path}' "
            f"-filter_complex \"[0:v]ass='{ass_path}'[v];"
            f"[1:a][2:a]amix=inputs=2:duration=longest,volume=2[a]\" "
            f"-map '[v]' -map '[a]' "
            f"-c:v libx264 -preset fast -c:a aac -b:a 192k "
            f"'{mtv_path}'"
        )
    else:
        mtv_cmd = (
            f"ffmpeg -y -i '{video_path}' "
            f"-vf \"ass='{ass_path}'\" "
            f"-c:v libx264 -preset fast -c:a aac -b:a 192k "
            f"'{mtv_path}'"
        )
    _, stderr, rc = await _run_local(mtv_cmd, timeout=600)

    result = {
        "ktv_path": ktv_path,
        "subtitle_path": ass_path,
    }
    if rc != 0:
        # A failed encode may leave a partial (or stale) file; do not report it.
        logger.warning(f"MTV合成失败仅输出KTV版本: {stderr}")
    elif os.path.exists(mtv_path):
        result["mtv_path"] = mtv_path
    return result
# ─── Registration ─────────────────────────────────────────────────────
# Step-type name -> async handler for every KTV pipeline step (18 entries).
# Keys are the step type strings used by the pipeline definitions.
KTV_HANDLERS = dict(
    # Input preparation and source separation
    audio_preparing=handle_audio_preparing,
    video_preparing=handle_video_preparing,
    demucs_separating=handle_demucs_separating,
    # Lyrics and subtitles
    lyric_calibrating=handle_lyric_calibrating,
    subtitle_rendering=handle_subtitle_rendering,
    subtitle_exporting=handle_subtitle_exporting,
    lyric_generating=handle_lyric_generating,
    lyric_evaluating=handle_lyric_evaluating,
    # Music generation
    music_generating=handle_music_generating,
    music_polling=handle_music_polling,
    # Visual assets and storyboard
    character_designing=handle_character_designing,
    character_image_generating=handle_character_image_generating,
    asset_uploading=handle_asset_uploading,
    storyboard_generating=handle_storyboard_generating,
    # Scene video generation and assembly
    scene_video_generating=handle_scene_video_generating,
    scene_video_evaluating=handle_scene_video_evaluating,
    scene_video_concatenating=handle_scene_video_concatenating,
    # Final output
    ktv_synthesizing=handle_ktv_synthesizing,
)