Hermes Agent 396a27f43a feat: KTV pipeline adapter — 17 step handlers, decoupled from engine
Adapter pattern: pipeline-service is the stable engine,
pipeline-ktv registers its step_types and handlers independently.

- 17 KTV step handlers (audio/video/demucs/lyrics/music/subtitle/video)
- Step type metadata with categories (media/llm)
- load_ktv_adapter() one-call integration
- Depends on pipeline_service>=3.1.0
2026-06-16 11:19:13 +08:00

936 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""KTV pipeline step handlers.
Implements the 17 step types for KTV production pipelines.
Each handler: async def handler(tenant_id, task_id, step_name, input_data, config) -> dict
Architecture:
- Heavy compute (demucs, video gen) runs on GPU server via SSH
- ffmpeg/audio processing runs locally
- LLM calls via harnessed_agent (llm_chat)
- ASR via SenseVoice
- External APIs: Suno/MiniMax for music, wan2.7 for images/video
"""
import asyncio
import json
import logging
import os
import shlex
import tempfile
import time
# Module-wide logger shared by every handler in this file.
logger = logging.getLogger("pipeline.handlers.ktv")
# GPU server config (from memory: ymq@opencomputing.net, 8x4090)
# SSH/SCP target (user@host) used by _run_gpu, _copy_to_gpu and _copy_from_gpu.
GPU_HOST = "ymq@opencomputing.net"
# Virtualenv on the GPU server that is activated before running Demucs and the ASR job.
GPU_DEMUCS_VENV = "/data/ymq/demucs_venv"
# Directory on the GPU server the image/video generate*.py scripts are run from.
GPU_WAN22_DIR = "/data/ymq/wan22-service"
# Base directory on the GPU server; each task gets a sub-directory named after its task_id.
GPU_PIPELINE_DIR = "/data/pipeline/ktv"
# Local work directory
# Base directory on this machine; each task gets a sub-directory named after its task_id.
LOCAL_WORK_DIR = "/data/pipeline/ktv"
def _task_dir(task_id: str) -> str:
    """Return the local working directory of a task, creating it on first use."""
    task_path = os.path.join(LOCAL_WORK_DIR, task_id)
    os.makedirs(task_path, exist_ok=True)
    return task_path
def _gpu_task_dir(task_id: str) -> str:
    """Return the task's working directory path on the GPU server (not created here)."""
    return "{}/{}".format(GPU_PIPELINE_DIR, task_id)
async def _run_local(cmd: str, timeout: int = 300) -> tuple:
    """Run a shell command locally and capture its output.

    Args:
        cmd: Command line handed to the shell unchanged; the caller is
            responsible for quoting any arguments embedded in it.
        timeout: Seconds to wait for the command before it is killed.

    Returns:
        ``(stdout, stderr, returncode)`` with both streams decoded as UTF-8
        (undecodable bytes replaced). On timeout the process is killed and
        ``("", "timeout", -1)`` is returned.
    """
    proc = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    try:
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the timeout firing and the kill.
            pass
        try:
            # Give the killed child a bounded amount of time to exit so it is
            # not left behind; bounded because a grandchild that still holds
            # the pipes open must not turn a timeout into a hang.
            await asyncio.wait_for(proc.wait(), timeout=5)
        except asyncio.TimeoutError:
            pass
        return "", "timeout", -1
    return (
        stdout.decode("utf-8", errors="replace"),
        stderr.decode("utf-8", errors="replace"),
        proc.returncode,
    )
async def _run_gpu(cmd: str, timeout: int = 600) -> tuple:
    """Run a shell command on the GPU server over SSH.

    Args:
        cmd: Command line to execute remotely.
        timeout: Seconds to wait before the local ssh process is killed.

    Returns:
        The ``(stdout, stderr, returncode)`` tuple produced by ``_run_local``.
    """
    # shlex.quote keeps the remote command intact as a single argument even
    # when it contains single quotes; wrapping it in literal '...' did not.
    ssh_cmd = f"ssh -o StrictHostKeyChecking=no {GPU_HOST} {shlex.quote(cmd)}"
    return await _run_local(ssh_cmd, timeout=timeout)
async def _copy_to_gpu(local_path: str, remote_path: str):
    """Copy a local file to the GPU server with scp.

    A failed transfer is logged as a warning rather than raised, so callers
    keep their existing control flow; they should verify the result if the
    file is required.

    Args:
        local_path: Source file on this machine.
        remote_path: Destination path on the GPU server.
    """
    source = shlex.quote(local_path)
    target = shlex.quote(f"{GPU_HOST}:{remote_path}")
    _, stderr, rc = await _run_local(
        f"scp -o StrictHostKeyChecking=no {source} {target}"
    )
    if rc != 0:
        logger.warning(
            "scp to GPU failed (rc=%s): %s -> %s: %s", rc, local_path, remote_path, stderr
        )
async def _copy_from_gpu(remote_path: str, local_path: str):
    """Copy a file from the GPU server to this machine with scp.

    A failed transfer is logged as a warning rather than raised, so callers
    keep their existing control flow; they should check that ``local_path``
    exists if the file is required.

    Args:
        remote_path: Source path on the GPU server.
        local_path: Destination file on this machine.
    """
    source = shlex.quote(f"{GPU_HOST}:{remote_path}")
    target = shlex.quote(local_path)
    _, stderr, rc = await _run_local(
        f"scp -o StrictHostKeyChecking=no {source} {target}"
    )
    if rc != 0:
        logger.warning(
            "scp from GPU failed (rc=%s): %s -> %s: %s", rc, remote_path, local_path, stderr
        )
# ─── Media Preparation ────────────────────────────────────────────────
async def handle_audio_preparing(tenant_id, task_id, step_name, input_data, config):
    """Fetch the source audio into the task directory and probe its duration.

    Reads ``audio_url`` (or ``audio_path``) from ``input_data["task_params"]``.
    HTTP(S) sources are downloaded with curl, anything else is copied with cp.

    Returns:
        dict with ``audio_path``, ``duration`` (seconds, 0 when ffprobe cannot
        report it) and ``format``.

    Raises:
        ValueError: if no source is given or the download/copy fails.
    """
    work_dir = _task_dir(task_id)
    params = input_data.get("task_params", {})
    audio_url = params.get("audio_url") or params.get("audio_path") or ""
    if not audio_url:
        raise ValueError("缺少 audio_url 或 audio_path 参数")

    audio_path = os.path.join(work_dir, "original_audio.mp3")
    # The source comes from task parameters, so it must never reach the shell unquoted.
    src = shlex.quote(audio_url)
    dst = shlex.quote(audio_path)
    if audio_url.startswith(("http://", "https://")):
        # -f: treat HTTP errors as failures instead of saving the error page as audio.
        _, stderr, rc = await _run_local(f"curl -fsSL -o {dst} {src}")
        if rc != 0:
            raise ValueError(f"下载音频失败: {stderr}")
    else:
        _, stderr, rc = await _run_local(f"cp -- {src} {dst}")
        if rc != 0:
            raise ValueError(f"复制音频失败: {stderr}")

    stdout, _, rc = await _run_local(
        "ffprobe -v error -show_entries format=duration "
        f"-of default=noprint_wrappers=1:nokey=1 {dst}"
    )
    try:
        duration = float(stdout.strip()) if rc == 0 else 0
    except ValueError:
        # ffprobe printed something non-numeric (e.g. "N/A" or nothing).
        duration = 0
    return {
        "audio_path": audio_path,
        "duration": duration,
        "format": os.path.splitext(audio_path)[1].lstrip("."),
    }
async def handle_video_preparing(tenant_id, task_id, step_name, input_data, config):
    """Fetch the source video, extract its audio track and probe its duration.

    Reads ``video_url`` (or ``video_path``) from ``input_data["task_params"]``.
    HTTP(S) sources are downloaded with curl, anything else is copied with cp;
    the audio track is then extracted to MP3 with ffmpeg.

    Returns:
        dict with ``video_path``, ``audio_path`` and ``duration`` (seconds,
        0 when ffprobe cannot report it).

    Raises:
        ValueError: if no source is given, or the download/copy or the audio
            extraction fails.
    """
    work_dir = _task_dir(task_id)
    params = input_data.get("task_params", {})
    video_url = params.get("video_url") or params.get("video_path") or ""
    if not video_url:
        raise ValueError("缺少 video_url 或 video_path 参数")

    video_path = os.path.join(work_dir, "original_video.mp4")
    audio_path = os.path.join(work_dir, "original_audio.mp3")
    # The source comes from task parameters, so it must never reach the shell unquoted.
    src = shlex.quote(video_url)
    q_video = shlex.quote(video_path)
    q_audio = shlex.quote(audio_path)
    if video_url.startswith(("http://", "https://")):
        # -f: treat HTTP errors as failures instead of saving the error page as video.
        _, stderr, rc = await _run_local(f"curl -fsSL -o {q_video} {src}")
        if rc != 0:
            raise ValueError(f"下载视频失败: {stderr}")
    else:
        _, stderr, rc = await _run_local(f"cp -- {src} {q_video}")
        if rc != 0:
            raise ValueError(f"复制视频失败: {stderr}")

    _, stderr, rc = await _run_local(
        f"ffmpeg -y -i {q_video} -vn -acodec libmp3lame -q:a 2 {q_audio}"
    )
    if rc != 0:
        raise ValueError(f"提取音轨失败: {stderr}")

    stdout, _, rc = await _run_local(
        "ffprobe -v error -show_entries format=duration "
        f"-of default=noprint_wrappers=1:nokey=1 {q_video}"
    )
    try:
        duration = float(stdout.strip()) if rc == 0 else 0
    except ValueError:
        # ffprobe printed something non-numeric (e.g. "N/A" or nothing).
        duration = 0
    return {
        "video_path": video_path,
        "audio_path": audio_path,
        "duration": duration,
    }
# ─── Demucs Separation ───────────────────────────────────────────────
async def handle_demucs_separating(tenant_id, task_id, step_name, input_data, config):
    """Separate vocals and accompaniment with Demucs on the GPU server.

    Takes ``audio_path`` from the first upstream output that provides one,
    uploads it, runs ``demucs --two-stems vocals -n htdemucs`` remotely and
    copies both stems back into the local task directory.

    Returns:
        dict with ``vocals_path`` and ``no_vocals_path`` (local WAV files).

    Raises:
        ValueError: if no upstream ``audio_path`` exists, Demucs exits
            non-zero, or a stem is missing after the copy.
    """
    work_dir = _task_dir(task_id)
    gpu_dir = _gpu_task_dir(task_id)
    audio_path = next(
        (
            dep["audio_path"]
            for dep in input_data.values()
            if isinstance(dep, dict) and dep.get("audio_path")
        ),
        None,
    )
    if not audio_path:
        raise ValueError("上游步骤未提供 audio_path")

    q_gpu_dir = shlex.quote(gpu_dir)
    await _run_gpu(f"mkdir -p {q_gpu_dir}")
    remote_audio = f"{gpu_dir}/audio.mp3"
    await _copy_to_gpu(audio_path, remote_audio)

    # No --mp3 here: that flag makes Demucs write .mp3 stems, but the stems
    # fetched below (and consumed downstream) are the default .wav outputs.
    demucs_cmd = (
        f"cd {q_gpu_dir} && "
        f"source {GPU_DEMUCS_VENV}/bin/activate && "
        f"python -m demucs --two-stems vocals -n htdemucs {shlex.quote(remote_audio)} && "
        f"deactivate"
    )
    stdout, stderr, rc = await _run_gpu(demucs_cmd, timeout=600)
    if rc != 0:
        raise ValueError(f"Demucs 分离失败: {stderr}")

    # Demucs writes to separated/<model>/<track name>/ relative to the cwd.
    base = os.path.splitext(os.path.basename(remote_audio))[0]
    remote_stems = f"{gpu_dir}/separated/htdemucs/{base}"
    vocals_local = os.path.join(work_dir, "vocals.wav")
    no_vocals_local = os.path.join(work_dir, "no_vocals.wav")
    await _copy_from_gpu(f"{remote_stems}/vocals.wav", vocals_local)
    await _copy_from_gpu(f"{remote_stems}/no_vocals.wav", no_vocals_local)
    for stem in (vocals_local, no_vocals_local):
        if not os.path.exists(stem):
            raise ValueError(f"Demucs 输出文件缺失: {stem}")
    return {
        "vocals_path": vocals_local,
        "no_vocals_path": no_vocals_local,
    }
# ─── Lyric Calibration ───────────────────────────────────────────────
async def handle_lyric_calibrating(tenant_id, task_id, step_name, input_data, config):
    """Recognise vocal timings with SenseVoice, then align them to the lyrics.

    The ASR job is uploaded to the GPU server as a script file and run there,
    rather than being passed inline over ssh, so no nested shell quoting is
    involved. Its JSON result is copied back and handed to ``_llm_calibrate``
    together with the original lyrics.

    Returns:
        dict with ``calibrated_lyrics_path``, ``calibrated_lyrics`` and
        ``segment_count``.

    Raises:
        ValueError: if ``vocals_path`` or the lyrics are missing, the ASR job
            exits non-zero, or its result file cannot be retrieved.
    """
    work_dir = _task_dir(task_id)
    params = input_data.get("task_params", {})
    lyrics_text = params.get("lyrics") or params.get("lyrics_text") or ""
    vocals_path = next(
        (
            dep["vocals_path"]
            for dep in input_data.values()
            if isinstance(dep, dict) and dep.get("vocals_path")
        ),
        None,
    )
    if not vocals_path:
        raise ValueError("上游步骤未提供 vocals_path")
    if not lyrics_text:
        raise ValueError("缺少 lyrics 参数")

    # Step 1: run SenseVoice ASR on the vocals to get timings.
    asr_job = '''import json
import sys

from funasr import AutoModel

audio_path, out_path = sys.argv[1], sys.argv[2]
model = AutoModel(model="iic/SenseVoiceSmall", trust_remote_code=True)
res = model.generate(input=audio_path, batch_size_s=300)
segments = []
for item in res:
    for ts in item.get("timestamp", []):
        segments.append({"text": item.get("text", ""), "start": ts[0] / 1000, "end": ts[1] / 1000})
with open(out_path, "w", encoding="utf-8") as f:
    json.dump(segments, f, ensure_ascii=False)
print("ASR done:", len(segments), "segments")
'''
    gpu_dir = _gpu_task_dir(task_id)
    remote_vocals = f"{gpu_dir}/vocals.wav"
    remote_script = f"{gpu_dir}/asr_job.py"
    remote_result = f"{gpu_dir}/asr_timings.json"
    asr_result_path = os.path.join(work_dir, "asr_timings.json")
    script_path = os.path.join(work_dir, "asr_job.py")
    with open(script_path, "w", encoding="utf-8") as f:
        f.write(asr_job)

    await _run_gpu(f"mkdir -p {shlex.quote(gpu_dir)}")
    await _copy_to_gpu(vocals_path, remote_vocals)
    await _copy_to_gpu(script_path, remote_script)
    asr_cmd = (
        f"cd {shlex.quote(gpu_dir)} && "
        f"source {GPU_DEMUCS_VENV}/bin/activate && "
        f"python {shlex.quote(remote_script)} "
        f"{shlex.quote(remote_vocals)} {shlex.quote(remote_result)}"
    )
    stdout, stderr, rc = await _run_gpu(asr_cmd, timeout=300)
    if rc != 0:
        raise ValueError(f"ASR 识别失败: {stderr}")
    await _copy_from_gpu(remote_result, asr_result_path)
    if not os.path.exists(asr_result_path):
        raise ValueError("ASR 结果文件缺失")
    # The remote job writes UTF-8 with ensure_ascii=False, so read it as UTF-8.
    with open(asr_result_path, "r", encoding="utf-8") as f:
        asr_timings = json.load(f)

    # Step 2: LLM calibration aligns the ASR timings with the original lyrics.
    calibrated = await _llm_calibrate(lyrics_text, asr_timings)
    calibrated_path = os.path.join(work_dir, "calibrated_lyrics.json")
    with open(calibrated_path, "w", encoding="utf-8") as f:
        json.dump(calibrated, f, ensure_ascii=False, indent=2)
    return {
        "calibrated_lyrics_path": calibrated_path,
        "calibrated_lyrics": calibrated,
        "segment_count": len(calibrated),
    }
async def _llm_calibrate(lyrics_text: str, asr_timings: list) -> list:
    """Ask the LLM to align the original lyrics with the ASR timings.

    Args:
        lyrics_text: Original lyrics as plain text.
        asr_timings: Segments produced by the ASR step.

    Returns:
        A non-empty list of dicts each carrying ``text``, ``start`` and
        ``end``. If the LLM call fails or its answer does not have that
        shape, ``asr_timings`` is returned unchanged (best-effort fallback).
    """
    prompt = f"""你是一个歌词时间轴校准专家。
原始歌词文本:
{lyrics_text}
ASR识别的时间戳:
{json.dumps(asr_timings, ensure_ascii=False)}
请将原始歌词的每一句与ASR时间戳对齐输出JSON数组每个元素包含:
- text: 歌词文本
- start: 开始时间(秒,浮点数)
- end: 结束时间(秒,浮点数)
要求:
1. 保持原始歌词的文本和顺序
2. 时间戳以ASR结果为基础进行微调
3. 确保时间不重叠,每句之间留适当间隔
4. 只输出JSON不要其他内容"""
    try:
        from pipeline_service.llm_bridge import llm_call

        result = (await llm_call(prompt)).strip()
        if result.startswith("```"):
            # Drop the opening fence line and everything from the closing fence on.
            result = result.split("\n", 1)[1].rsplit("```", 1)[0]
        calibrated = json.loads(result)
        # Downstream code indexes seg["text"] / seg["start"] / seg["end"], so
        # anything else is treated as a failed calibration.
        well_formed = (
            isinstance(calibrated, list)
            and bool(calibrated)
            and all(
                isinstance(seg, dict) and {"text", "start", "end"} <= seg.keys()
                for seg in calibrated
            )
        )
        if not well_formed:
            raise ValueError("LLM response is not a list of text/start/end segments")
        return calibrated
    except Exception as e:
        # Deliberate catch-all: calibration is an enhancement, the raw ASR
        # timings are still usable.
        logger.warning("LLM calibration failed, using ASR timings directly: %s", e)
        return asr_timings
# ─── Subtitle Rendering ──────────────────────────────────────────────
async def handle_subtitle_rendering(tenant_id, task_id, step_name, input_data, config):
    """Render the calibrated lyrics into an ASS karaoke subtitle file.

    Uses the first upstream output that provides ``calibrated_lyrics``
    inline, or, failing that, a ``calibrated_lyrics_path`` JSON file.

    Returns:
        dict with ``ass_path``, the subtitle file written into the task directory.

    Raises:
        ValueError: if no upstream output yields calibrated lyrics.
    """
    work_dir = _task_dir(task_id)
    calibrated = None
    for dep_output in input_data.values():
        if not isinstance(dep_output, dict):
            continue
        calibrated = dep_output.get("calibrated_lyrics")
        lyrics_file = dep_output.get("calibrated_lyrics_path")
        if not calibrated and lyrics_file:
            # The calibration step writes this file as UTF-8 with
            # ensure_ascii=False, so it must not be read with the locale default.
            with open(lyrics_file, "r", encoding="utf-8") as f:
                calibrated = json.load(f)
        if calibrated:
            break
    if not calibrated:
        raise ValueError("上游步骤未提供 calibrated_lyrics")

    ass_path = os.path.join(work_dir, "karaoke.ass")
    _write_ass_file(ass_path, calibrated)
    return {"ass_path": ass_path}
def _write_ass_file(path: str, segments: list):
    """Write lyric segments to an ASS subtitle file with a karaoke tag per line.

    Args:
        path: Destination file, written as UTF-8.
        segments: Dicts with ``text``, ``start`` and ``end`` (seconds; numeric
            strings are accepted).

    Raises:
        KeyError: if a segment lacks one of the three keys.
        ValueError: if ``start`` or ``end`` is not numeric.
    """
    header = """[Script Info]
Title: KTV Karaoke Subtitles
ScriptType: v4.00+
PlayResX: 1920
PlayResY: 1080
WrapStyle: 0
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: KTV,Source Han Sans SC,72,&H00FFFFFF,&H0000FFFF,&H00000000,&H80000000,-1,0,0,0,100,100,1,0,1,3,1,2,40,40,60,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    with open(path, "w", encoding="utf-8") as f:
        f.write(header)
        for seg in segments:
            # Timings may arrive as strings from the LLM; normalise once and
            # never let a line start before zero or end before it starts.
            start_s = max(0.0, float(seg["start"]))
            end_s = max(start_s, float(seg["end"]))
            start = _seconds_to_ass_time(start_s)
            end = _seconds_to_ass_time(end_s)
            text = str(seg["text"]).replace("\n", "\\N")
            # \k takes centiseconds; round() avoids losing one to float
            # error (e.g. 2.3 - 2.0 -> 29.99...).
            duration_cs = round((end_s - start_s) * 100)
            f.write(f"Dialogue: 0,{start},{end},KTV,,0,0,0,,{{\\k{duration_cs}}}{text}\n")
def _seconds_to_ass_time(seconds: float) -> str:
    """Convert seconds to the ASS timestamp format ``H:MM:SS.CC``.

    The value is rounded to the nearest centisecond first, so binary float
    error cannot drop a centisecond (0.57 used to render as ``.56``).
    Negative input is clamped to zero.
    """
    total_cs = max(0, round(float(seconds) * 100))
    total_seconds, cs = divmod(total_cs, 100)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h}:{m:02d}:{s:02d}.{cs:02d}"
# ─── Subtitle Exporting ──────────────────────────────────────────────
async def handle_subtitle_exporting(tenant_id, task_id, step_name, input_data, config):
    """Expose the ASS file produced upstream as the exported subtitle.

    Returns:
        dict with ``subtitle_path`` and ``format`` ("ass").

    Raises:
        ValueError: if no upstream output carries an ``ass_path`` that exists on disk.
    """
    candidates = (
        upstream.get("ass_path")
        for upstream in input_data.values()
        if isinstance(upstream, dict)
    )
    ass_path = next((candidate for candidate in candidates if candidate), None)
    if ass_path and os.path.exists(ass_path):
        return {"subtitle_path": ass_path, "format": "ass"}
    raise ValueError("上游步骤未提供有效的 ass_path")
# ─── Lyric Generation & Evaluation (Mode C) ──────────────────────────
async def handle_lyric_generating(tenant_id, task_id, step_name, input_data, config):
    """Have the LLM write lyrics for the requested topic or outline.

    Reads ``topic`` (or ``outline``), ``style`` and ``language`` from
    ``input_data["task_params"]``.

    Returns:
        dict with the stripped ``lyrics`` plus the ``topic`` and ``style`` used.

    Raises:
        ValueError: if no topic/outline is given or the LLM call fails.
    """
    task_params = input_data.get("task_params", {})
    topic = task_params.get("topic", task_params.get("outline", ""))
    style = task_params.get("style", "流行")
    language = task_params.get("language", "zh")
    if not topic:
        raise ValueError("缺少 topic/outline参数")

    prompt = f"""请创作一首{style}风格的{language}歌词。
主题/大纲: {topic}
要求:
1. 包含完整结构: 主歌(Verse)、副歌(Chorus)、桥段(Bridge)
2. 每句歌词节奏感强,适合演唱
3. 押韵自然,情感真实
4. 总长度适合3-5分钟歌曲
直接输出歌词文本,标注段落结构。"""
    try:
        from pipeline_service.llm_bridge import llm_call

        generated = await llm_call(prompt)
        payload = {"lyrics": generated.strip(), "topic": topic, "style": style}
    except Exception as e:
        raise ValueError(f"歌词生成失败: {e}")
    return payload
async def handle_lyric_evaluating(tenant_id, task_id, step_name, input_data, config):
    """Score the upstream lyrics with the LLM and reject them below a threshold.

    ``config["threshold"]`` defaults to 8.5. The lyrics come from the first
    upstream output that provides a non-empty ``lyrics`` value.

    Returns:
        dict with ``lyrics``, ``evaluation``, ``score`` (float) and ``passed``.

    Raises:
        ValueError: if no lyrics are provided, or the score is below the
            threshold (the message asks for regeneration).
    """
    threshold = config.get("threshold", 8.5)
    lyrics = next(
        (
            dep["lyrics"]
            for dep in input_data.values()
            if isinstance(dep, dict) and dep.get("lyrics")
        ),
        None,
    )
    if not lyrics:
        raise ValueError("上游步骤未提供歌词")

    prompt = f"""请从以下维度评估这首歌词的质量(1-10分):
1. 韵律节奏: 押韵、节奏感、可唱性
2. 情感表达: 情感真实度、共鸣力
3. 文学性: 用词、意象、修辞
4. 结构完整: 段落编排、层次感
5. 商业潜力: 流行度、记忆点
歌词:
{lyrics}
输出JSON: {{"score": 8.5, "dimensions": {{...}}, "suggestions": "..."}}
只输出JSON。"""
    try:
        from pipeline_service.llm_bridge import llm_call

        result = (await llm_call(prompt)).strip()
        if result.startswith("```"):
            # Drop the opening fence line and everything from the closing fence on.
            result = result.split("\n", 1)[1].rsplit("```", 1)[0]
        evaluation = json.loads(result)
        # Coerce inside the try: a string or null score from the LLM would
        # otherwise crash the comparison below with a TypeError.
        score = float(evaluation.get("score", 0))
    except Exception as e:
        # Same logger as the module-level one, looked up by name.
        logging.getLogger("pipeline.handlers.ktv").warning(
            "Lyric evaluation failed, using fallback score: %s", e
        )
        # Fallback score for an unavailable or unparsable evaluation. It only
        # passes when the threshold is <= 7.0; the default 8.5 rejects it.
        score = 7.0
        evaluation = {"score": score, "note": "evaluation_parse_failed"}
    if score < threshold:
        raise ValueError(f"歌词评分 {score} 低于阈值 {threshold},需要重新生成")
    return {
        "lyrics": lyrics,
        "evaluation": evaluation,
        "score": score,
        "passed": True,
    }
# ─── Music Generation ────────────────────────────────────────────────
async def handle_music_generating(tenant_id, task_id, step_name, input_data, config):
    """Create a music generation job for the upstream lyrics (placeholder).

    No external API is called yet; a synthetic job id built from the task id
    and the current time is returned.

    Returns:
        dict with ``music_job_id``, ``music_service``, ``style``, ``lyrics``
        and ``status`` ("submitted").

    Raises:
        ValueError: if no upstream output provides lyrics.
    """
    lyrics = next(
        (
            upstream["lyrics"]
            for upstream in input_data.values()
            if isinstance(upstream, dict) and upstream.get("lyrics")
        ),
        None,
    )
    if lyrics is None:
        raise ValueError("上游步骤未提供歌词")

    task_params = input_data.get("task_params", {})
    # TODO: Implement actual API call to Suno/MiniMax; until then the job id
    # is only a placeholder.
    submitted_at = int(time.time())
    return {
        "music_job_id": f"music_{task_id}_{submitted_at}",
        "music_service": task_params.get("music_service", "suno"),
        "style": task_params.get("music_style", "pop"),
        "lyrics": lyrics,
        "status": "submitted",
    }
async def handle_music_polling(tenant_id, task_id, step_name, input_data, config):
    """Resolve the result of an upstream music generation job (placeholder).

    Polling is not implemented yet: the handler only reports the path where
    the generated track is expected; it does not create that file.

    Returns:
        dict with ``music_path``, ``music_job_id`` and ``status`` ("completed").

    Raises:
        ValueError: if no upstream output carries a ``music_job_id``.
    """
    # Only an output that actually carries a job id counts; previously any
    # upstream dict (e.g. task_params) was accepted as job info.
    job_info = next(
        (
            dep
            for dep in input_data.values()
            if isinstance(dep, dict) and dep.get("music_job_id")
        ),
        None,
    )
    if job_info is None:
        raise ValueError("上游步骤未提供 music_job_id")

    # TODO: Implement actual polling logic and download the result.
    work_dir = _task_dir(task_id)
    music_path = os.path.join(work_dir, "generated_music.mp3")
    return {
        "music_path": music_path,
        "music_job_id": job_info.get("music_job_id"),
        "status": "completed",
    }
# ─── Character & Video Generation ────────────────────────────────────
async def handle_character_designing(tenant_id, task_id, step_name, input_data, config):
    """Have the LLM design MV characters from the lyrics.

    Lyrics come from the first upstream output providing ``lyrics`` or
    ``calibrated_lyrics``; a list of segments is flattened to its texts.
    ``visual_style`` is read from the task parameters (default "anime").

    Returns:
        dict with ``characters`` (the parsed JSON list) and ``visual_style``.

    Raises:
        ValueError: if no lyrics are available, the LLM call fails, or its
            answer is not a JSON array.
    """
    params = input_data.get("task_params", {})
    lyrics = None
    for dep_output in input_data.values():
        if not isinstance(dep_output, dict):
            continue
        candidate = dep_output.get("lyrics") or dep_output.get("calibrated_lyrics")
        if isinstance(candidate, list):
            candidate = " ".join(
                str(seg.get("text", "")) for seg in candidate if isinstance(seg, dict)
            )
        if candidate:
            lyrics = candidate
            break
    if not lyrics:
        # Without this guard the literal text "None" was sent to the LLM.
        raise ValueError("上游步骤未提供歌词")

    style = params.get("visual_style", "anime")
    prompt = f"""根据以下歌词设计MV角色方案。
歌词:
{lyrics}
视觉风格: {style}
请设计1-3个角色每个角色包含:
1. 角色名称
2. 外貌描述用于AI图像生成的详细prompt
3. 性格特征
4. 在MV中的角色定位
输出JSON数组。"""
    try:
        from pipeline_service.llm_bridge import llm_call

        result = (await llm_call(prompt)).strip()
        if result.startswith("```"):
            # Drop the opening fence line and everything from the closing fence on.
            result = result.split("\n", 1)[1].rsplit("```", 1)[0]
        characters = json.loads(result)
        if not isinstance(characters, list):
            # Downstream steps enumerate the characters, so insist on an array.
            raise TypeError("LLM response is not a JSON array")
    except Exception as e:
        raise ValueError(f"角色设计失败: {e}") from e
    return {"characters": characters, "visual_style": style}
async def handle_character_image_generating(tenant_id, task_id, step_name, input_data, config):
    """Generate one reference image per designed character on the GPU server.

    Each character's ``prompt`` (or ``description``) is passed to
    ``generate.py`` in the wan22 service directory; the resulting PNG is
    copied into the local task directory. Characters without a prompt, and
    characters whose generation or download fails, are skipped with a warning.

    Returns:
        dict with ``character_images``: a list of ``name`` / ``image_path`` /
        ``prompt`` entries for the images that actually exist locally.

    Raises:
        ValueError: if no upstream output provides ``characters``.
    """
    work_dir = _task_dir(task_id)
    gpu_dir = _gpu_task_dir(task_id)
    characters = next(
        (
            dep["characters"]
            for dep in input_data.values()
            if isinstance(dep, dict) and dep.get("characters")
        ),
        None,
    )
    if not characters:
        raise ValueError("上游步骤未提供角色设计")

    remote_chars_dir = f"{gpu_dir}/characters"
    await _run_gpu(f"mkdir -p {shlex.quote(remote_chars_dir)}")
    char_images = []
    for i, char in enumerate(characters):
        if not isinstance(char, dict):
            continue
        prompt = char.get("prompt", char.get("description", ""))
        if not prompt:
            continue
        prompt = str(prompt)
        remote_image = f"{remote_chars_dir}/char_{i}.png"
        # The prompt is LLM-generated free text (apostrophes are common), so
        # it has to be shell-quoted rather than wrapped in literal '...'.
        gen_cmd = (
            f"cd {GPU_WAN22_DIR} && "
            f"source venv/bin/activate && "
            f"python generate.py --prompt {shlex.quote(prompt)} "
            f"--output {shlex.quote(remote_image)} "
            f"--width 512 --height 512"
        )
        _, stderr, rc = await _run_gpu(gen_cmd, timeout=120)
        if rc != 0:
            logger.warning("Character image %s generation failed (rc=%s): %s", i, rc, stderr)
            continue
        local_path = os.path.join(work_dir, f"char_{i}.png")
        await _copy_from_gpu(remote_image, local_path)
        if not os.path.exists(local_path):
            # Never hand a path to downstream steps that does not exist.
            logger.warning("Character image %s was not retrieved: %s", i, local_path)
            continue
        char_images.append({
            "name": char.get("name", f"char_{i}"),
            "image_path": local_path,
            "prompt": prompt,
        })
    return {"character_images": char_images}
async def handle_storyboard_generating(tenant_id, task_id, step_name, input_data, config):
    """Have the LLM turn lyrics and character images into a storyboard.

    Upstream outputs are scanned in order and later ones override earlier
    ones; within one output ``calibrated_lyrics`` wins over ``lyrics``. The
    parsed storyboard is also saved as ``storyboard.json`` in the task directory.

    Returns:
        dict with ``storyboard``, ``storyboard_path`` and ``scene_count``.

    Raises:
        ValueError: if no lyrics are available or the LLM call/parsing fails.
    """
    task_params = input_data.get("task_params", {})
    lyrics, char_images = None, None
    for upstream in input_data.values():
        if not isinstance(upstream, dict):
            continue
        picked_lyrics = upstream.get("calibrated_lyrics") or upstream.get("lyrics")
        if picked_lyrics:
            lyrics = picked_lyrics
        picked_images = upstream.get("character_images")
        if picked_images:
            char_images = picked_images
    if not lyrics:
        raise ValueError("上游步骤未提供歌词")

    duration = task_params.get("duration", 240)  # default: 4 minutes
    # Segment lists are serialised as JSON; plain text is embedded as-is.
    lyrics_block = json.dumps(lyrics, ensure_ascii=False) if isinstance(lyrics, list) else lyrics
    roles_block = json.dumps(char_images, ensure_ascii=False) if char_images else "无特定角色"
    prompt = f"""根据歌词和角色生成MV分镜脚本。
歌词:
{lyrics_block}
角色:
{roles_block}
视频总时长: {duration}
请输出JSON数组每个分镜包含:
- scene_id: 分镜编号
- start_time: 开始秒数
- end_time: 结束秒数
- description: 场景描述英文用于视频生成prompt
- characters: 出现的角色
- camera: 镜头运动描述
- mood: 情绪/色调
确保分镜覆盖整首歌每个分镜5-15秒。"""
    try:
        from pipeline_service.llm_bridge import llm_call

        reply = await llm_call(prompt)
        reply = reply.strip()
        if reply.startswith("```"):
            # Drop the opening fence line and everything from the closing fence on.
            reply = reply.split("\n", 1)[1].rsplit("```", 1)[0]
        storyboard = json.loads(reply)
    except Exception as e:
        raise ValueError(f"分镜生成失败: {e}")

    sb_path = os.path.join(_task_dir(task_id), "storyboard.json")
    with open(sb_path, "w", encoding="utf-8") as f:
        json.dump(storyboard, f, ensure_ascii=False, indent=2)
    return {
        "storyboard": storyboard,
        "storyboard_path": sb_path,
        "scene_count": len(storyboard),
    }
async def handle_scene_video_generating(tenant_id, task_id, step_name, input_data, config):
    """Generate one video clip per storyboard scene on the GPU server.

    A scene that names a character with an uploaded reference image is
    rendered with ``generate_ref2v.py``, any other scene with
    ``generate_t2v.py``. Clips are copied into the local task directory. A
    failed generation is logged and the scene is still listed; its file will
    simply be missing.

    Returns:
        dict with ``scene_videos`` (``scene_id`` / ``video_path`` /
        ``description`` / ``duration`` per scene) and ``scene_count``.

    Raises:
        ValueError: if no upstream output provides a storyboard.
    """
    work_dir = _task_dir(task_id)
    gpu_dir = _gpu_task_dir(task_id)
    storyboard = None
    char_images = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            if dep_output.get("storyboard"):
                storyboard = dep_output["storyboard"]
            if dep_output.get("character_images"):
                char_images = dep_output["character_images"]
    if not storyboard:
        raise ValueError("上游步骤未提供分镜脚本")

    scenes_dir = f"{gpu_dir}/scenes"
    chars_dir = f"{gpu_dir}/characters"
    await _run_gpu(f"mkdir -p {shlex.quote(scenes_dir)} {shlex.quote(chars_dir)}")

    # (name, remote path) for every reference image that exists locally and
    # was uploaded; only these may be used as a Ref2V reference.
    remote_refs = []
    for ci in char_images or []:
        local = ci.get("image_path", "")
        if local and os.path.exists(local):
            remote = f"{chars_dir}/{os.path.basename(local)}"
            await _copy_to_gpu(local, remote)
            name = ci.get("name")
            if isinstance(name, str) and name:
                remote_refs.append((name, remote))

    scene_videos = []
    for i, scene in enumerate(storyboard):
        desc = str(scene.get("description", ""))
        try:
            duration = float(scene["end_time"]) - float(scene["start_time"])
        except (KeyError, TypeError, ValueError):
            duration = 0
        if duration <= 0:
            # Missing or inconsistent timing: fall back to 5 seconds, the
            # value the original defaults (10 - 5) implied.
            duration = 5
        frames = int(duration * 24)  # 24fps

        ref_image = None
        scene_chars = scene.get("characters")
        if scene_chars:
            chars_text = str(scene_chars)
            ref_image = next(
                (remote for name, remote in remote_refs if name in chars_text), None
            )

        remote_scene = f"{scenes_dir}/scene_{i:03d}.mp4"
        script = "generate_ref2v.py" if ref_image else "generate_t2v.py"
        # The description is LLM-generated free text and must be shell-quoted.
        gen_cmd = (
            f"cd {GPU_WAN22_DIR} && source venv/bin/activate && "
            f"python {script} --prompt {shlex.quote(desc)} "
        )
        if ref_image:
            gen_cmd += f"--ref_image {shlex.quote(ref_image)} "
        gen_cmd += f"--output {shlex.quote(remote_scene)} --frames {frames}"

        _, stderr, rc = await _run_gpu(gen_cmd, timeout=600)
        local_scene = os.path.join(work_dir, f"scene_{i:03d}.mp4")
        if rc == 0:
            await _copy_from_gpu(remote_scene, local_scene)
        else:
            logger.warning("Scene %s generation failed (rc=%s): %s", i, rc, stderr)
        scene_videos.append({
            "scene_id": scene.get("scene_id", i),
            "video_path": local_scene,
            "description": desc,
            "duration": duration,
        })
    return {"scene_videos": scene_videos, "scene_count": len(scene_videos)}
async def handle_scene_video_evaluating(tenant_id, task_id, step_name, input_data, config):
    """Filter scene videos by a simplified quality check.

    A scene counts as valid when its file exists and is larger than 10000
    bytes; valid scenes get the placeholder ``quality_score`` 8.0. The
    upstream scene dicts are left untouched: scored copies are returned.

    Returns:
        dict with ``scene_videos`` (valid scenes only) and ``avg_quality``.

    Raises:
        ValueError: if no scene videos are provided, none is valid, or the
            average score is below ``config["threshold"]`` (default 7.0).
    """
    threshold = config.get("threshold", 7.0)
    scene_videos = next(
        (
            dep["scene_videos"]
            for dep in input_data.values()
            if isinstance(dep, dict) and dep.get("scene_videos")
        ),
        None,
    )
    if not scene_videos:
        raise ValueError("上游步骤未提供场景视频")

    valid_scenes = []
    for sv in scene_videos:
        path = sv.get("video_path", "")
        if path and os.path.exists(path) and os.path.getsize(path) > 10000:
            # Copy instead of mutating the upstream step's output in place.
            valid_scenes.append({**sv, "quality_score": 8.0})  # placeholder score
        else:
            logger.warning("Scene %s missing or too small: %s", sv.get("scene_id"), path)
    if not valid_scenes:
        raise ValueError("所有场景视频质量不合格")

    avg_score = sum(s["quality_score"] for s in valid_scenes) / len(valid_scenes)
    if avg_score < threshold:
        raise ValueError(f"平均质量分 {avg_score:.1f} 低于阈值 {threshold}")
    return {"scene_videos": valid_scenes, "avg_quality": avg_score}
async def handle_scene_video_concatenating(tenant_id, task_id, step_name, input_data, config):
    """Concatenate the scene clips and loop the result up to the audio length.

    Existing scene files are joined with ffmpeg's concat demuxer (stream
    copy). If an upstream ``duration`` is known and the joined video is
    shorter, it is looped and re-encoded to exactly that length; otherwise
    the joined video is copied as the final video.

    Returns:
        dict with ``final_video_path``.

    Raises:
        ValueError: if no scene videos are provided, none of their files
            exists, or an ffmpeg/cp step fails.
    """
    work_dir = _task_dir(task_id)
    scene_videos = None
    audio_duration = None
    for dep_output in input_data.values():
        if isinstance(dep_output, dict):
            if dep_output.get("scene_videos"):
                scene_videos = dep_output["scene_videos"]
            if dep_output.get("duration"):
                audio_duration = dep_output["duration"]
    if not scene_videos:
        raise ValueError("上游步骤未提供场景视频")

    scene_paths = [sv.get("video_path", "") for sv in scene_videos]
    scene_paths = [p for p in scene_paths if p and os.path.exists(p)]
    if not scene_paths:
        # An empty concat list would only make ffmpeg fail without explanation.
        raise ValueError("没有可用于拼接的场景视频文件")

    concat_list = os.path.join(work_dir, "concat_list.txt")
    with open(concat_list, "w", encoding="utf-8") as f:
        for p in scene_paths:
            # Concat-demuxer quoting: close the quote, add an escaped quote, reopen.
            escaped = p.replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")

    concat_path = os.path.join(work_dir, "concat_video.mp4")
    final_path = os.path.join(work_dir, "final_video.mp4")
    q_concat = shlex.quote(concat_path)
    q_final = shlex.quote(final_path)
    _, stderr, rc = await _run_local(
        f"ffmpeg -y -f concat -safe 0 -i {shlex.quote(concat_list)} -c copy {q_concat}"
    )
    if rc != 0:
        raise ValueError(f"场景视频拼接失败: {stderr}")

    try:
        target = float(audio_duration) if audio_duration else 0
    except (TypeError, ValueError):
        target = 0

    looped = False
    if target > 0:
        stdout, _, _ = await _run_local(
            "ffprobe -v error -show_entries format=duration "
            f"-of default=noprint_wrappers=1:nokey=1 {q_concat}"
        )
        try:
            concat_dur = float(stdout.strip())
        except ValueError:
            concat_dur = 0
        if 0 < concat_dur < target:
            # More repeats than strictly needed; -t trims to the target length.
            loops = int(target / concat_dur) + 1
            _, stderr, rc = await _run_local(
                f"ffmpeg -y -stream_loop {loops} -i {q_concat} "
                f"-t {target} -c:v libx264 -preset fast {q_final}"
            )
            if rc != 0:
                raise ValueError(f"视频循环失败: {stderr}")
            looped = True
    if not looped:
        _, stderr, rc = await _run_local(f"cp -- {q_concat} {q_final}")
        if rc != 0:
            raise ValueError(f"复制视频失败: {stderr}")
    return {"final_video_path": final_path}
# ─── Final Synthesis ─────────────────────────────────────────────────
async def handle_ktv_synthesizing(tenant_id, task_id, step_name, input_data, config):
    """Render the final KTV (dual audio track) and MTV (single track) videos.

    The video source is the upstream ``final_video_path`` if any step
    provides one, otherwise the first upstream ``video_path`` (original
    video). Subtitles are burned in with ffmpeg's ``ass`` filter. When both
    stems are available the KTV output carries vocals and accompaniment as
    separate audio tracks.

    Returns:
        dict with ``ktv_path`` and ``subtitle_path``, plus ``mtv_path`` when
        the MTV render succeeded.

    Raises:
        ValueError: if the subtitle file or video source is missing, the
            vocals stem cannot be found for an original video, or the KTV
            render fails.
    """
    work_dir = _task_dir(task_id)
    final_video = None
    original_video = None
    ass_path = None
    vocals_path = None
    no_vocals_path = None
    for dep_output in input_data.values():
        if not isinstance(dep_output, dict):
            continue
        final_video = dep_output.get("final_video_path") or final_video
        ass_path = dep_output.get("ass_path") or ass_path
        vocals_path = dep_output.get("vocals_path") or vocals_path
        no_vocals_path = dep_output.get("no_vocals_path") or no_vocals_path
        # First original video wins, as before.
        original_video = original_video or dep_output.get("video_path")

    # Decide after the scan, so the outcome no longer depends on the order
    # in which upstream outputs are iterated.
    video_path = final_video or original_video
    has_original_video = final_video is None and original_video is not None

    if not ass_path:
        raise ValueError("缺少字幕文件")
    if has_original_video and not vocals_path:
        # Mode B: fall back to the stems in the task directory.
        vocals_path = os.path.join(work_dir, "vocals.wav")
        no_vocals_path = no_vocals_path or os.path.join(work_dir, "no_vocals.wav")
        if not os.path.exists(vocals_path):
            raise ValueError("Demucs 步骤未提供人声轨道")
    if not video_path:
        raise ValueError("缺少视频源")

    ktv_path = os.path.join(work_dir, "ktv_final.mp4")
    mtv_path = os.path.join(work_dir, "mtv_final.mp4")
    q_video = shlex.quote(video_path)
    ass_filter = f"ass='{ass_path}'"
    encode_opts = "-c:v libx264 -preset fast -c:a aac -b:a 192k"

    if vocals_path and no_vocals_path:
        complex_filter = f"[0:v]{ass_filter}[v]"
        ktv_cmd = (
            f"ffmpeg -y -i {q_video} -i {shlex.quote(vocals_path)} "
            f"-i {shlex.quote(no_vocals_path)} "
            f"-filter_complex {shlex.quote(complex_filter)} "
            f"-map '[v]' -map 1:a -map 2:a "
            f"{encode_opts} "
            f"-metadata:s:a:0 title='Vocals' -metadata:s:a:1 title='Accompaniment' "
            f"{shlex.quote(ktv_path)}"
        )
    else:
        ktv_cmd = (
            f"ffmpeg -y -i {q_video} "
            f"-vf {shlex.quote(ass_filter)} "
            f"{encode_opts} "
            f"{shlex.quote(ktv_path)}"
        )
    _, stderr, rc = await _run_local(ktv_cmd, timeout=600)
    if rc != 0:
        raise ValueError(f"KTV合成失败: {stderr}")

    # NOTE(review): the MTV render keeps whatever audio the video source
    # already has; a generated video without an audio track would yield a
    # silent MTV. Confirm the intended audio source.
    mtv_cmd = (
        f"ffmpeg -y -i {q_video} "
        f"-vf {shlex.quote(ass_filter)} "
        f"{encode_opts} "
        f"{shlex.quote(mtv_path)}"
    )
    _, stderr, mtv_rc = await _run_local(mtv_cmd, timeout=600)
    if mtv_rc != 0:
        logger.warning("MTV合成失败仅输出KTV版本: %s", stderr)

    result = {
        "ktv_path": ktv_path,
        "subtitle_path": ass_path,
    }
    # Require a successful run: a stale or partial file must not be reported.
    if mtv_rc == 0 and os.path.exists(mtv_path):
        result["mtv_path"] = mtv_path
    return result
# ─── Registration ─────────────────────────────────────────────────────
# Registry mapping each KTV step type name to the coroutine that handles it.
KTV_HANDLERS = dict(
    audio_preparing=handle_audio_preparing,
    video_preparing=handle_video_preparing,
    demucs_separating=handle_demucs_separating,
    lyric_calibrating=handle_lyric_calibrating,
    subtitle_rendering=handle_subtitle_rendering,
    subtitle_exporting=handle_subtitle_exporting,
    lyric_generating=handle_lyric_generating,
    lyric_evaluating=handle_lyric_evaluating,
    music_generating=handle_music_generating,
    music_polling=handle_music_polling,
    character_designing=handle_character_designing,
    character_image_generating=handle_character_image_generating,
    storyboard_generating=handle_storyboard_generating,
    scene_video_generating=handle_scene_video_generating,
    scene_video_evaluating=handle_scene_video_evaluating,
    scene_video_concatenating=handle_scene_video_concatenating,
    ktv_synthesizing=handle_ktv_synthesizing,
)