feat: add KTV pipeline handlers (17 step types, 3 production modes)

- handlers_ktv.py: 17 async step handlers for KTV production
  - Audio/Video preparation (ffmpeg)
  - Demucs vocal separation (GPU server SSH)
  - Lyric calibration (SenseVoice ASR + LLM)
  - Subtitle rendering (ASS karaoke format)
  - Lyric generation & evaluation (Mode C)
  - Music generation (Suno/MiniMax API)
  - Character design & image generation (wan2.7)
  - Storyboard generation (LLM)
  - Scene video generation (T2V/Ref2V)
  - Scene video evaluation (quality threshold)
  - Scene video concatenation (ffmpeg loop)
  - KTV synthesis (dual-track + MTV)

- llm_bridge.py: async LLM call bridge (harnessed_agent / OpenAI API)
- storage.py: extract deps from step_config JSON
- init.py: auto-register KTV handlers on load
This commit is contained in:
yumoqing 2026-06-11 20:36:05 +08:00
parent 19cd63e719
commit 113eb7e040
5 changed files with 1033 additions and 3 deletions

View File

@ -12,7 +12,8 @@ from .init import (
pipeline_cancel,
pipeline_handlers,
)
from .handler import register_handler, list_handlers, get_handler
from .handler import register_handler, list_handlers, register_default_handler
from .handlers_ktv import register_ktv_handlers
from .state import (
STATE_PENDING, STATE_RUNNING, STATE_COMPLETED, STATE_FAILED, STATE_SKIPPED,
TASK_SUBMITTED, TASK_RUNNING, TASK_COMPLETED, TASK_FAILED, TASK_PAUSED, TASK_CANCELLED,

View File

@ -0,0 +1,943 @@
"""KTV pipeline step handlers.
Implements the 17 step types for KTV production pipelines.
Each handler: async def handler(tenant_id, task_id, step_name, input_data, config) -> dict
Architecture:
- Heavy compute (demucs, video gen) runs on GPU server via SSH
- ffmpeg/audio processing runs locally
- LLM calls via harnessed_agent (llm_chat)
- ASR via SenseVoice
- External APIs: Suno/MiniMax for music, wan2.7 for images/video
"""
import asyncio
import json
import os
import logging
import tempfile
import time
logger = logging.getLogger("pipeline.handlers.ktv")
# GPU server config (from memory: ymq@opencomputing.net, 8x4090)
GPU_HOST = "ymq@opencomputing.net"
GPU_DEMUCS_VENV = "/data/ymq/demucs_venv"
GPU_WAN22_DIR = "/data/ymq/wan22-service"
GPU_PIPELINE_DIR = "/data/pipeline/ktv"
# Local work directory
LOCAL_WORK_DIR = "/data/pipeline/ktv"
def _task_dir(task_id: str) -> str:
"""Get working directory for a task."""
d = os.path.join(LOCAL_WORK_DIR, task_id)
os.makedirs(d, exist_ok=True)
return d
def _gpu_task_dir(task_id: str) -> str:
"""Get GPU server working directory for a task."""
return f"{GPU_PIPELINE_DIR}/{task_id}"
async def _run_local(cmd: str, timeout: int = 300) -> tuple:
"""Run a local command, return (stdout, stderr, returncode)."""
proc = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
return stdout.decode("utf-8", errors="replace"), stderr.decode("utf-8", errors="replace"), proc.returncode
except asyncio.TimeoutError:
proc.kill()
return "", "timeout", -1
async def _run_gpu(cmd: str, timeout: int = 600) -> tuple:
"""Run a command on GPU server via SSH."""
ssh_cmd = f"ssh -o StrictHostKeyChecking=no {GPU_HOST} '{cmd}'"
return await _run_local(ssh_cmd, timeout=timeout)
async def _copy_to_gpu(local_path: str, remote_path: str):
"""SCP file to GPU server."""
await _run_local(f"scp -o StrictHostKeyChecking=no '{local_path}' {GPU_HOST}:{remote_path}")
async def _copy_from_gpu(remote_path: str, local_path: str):
"""SCP file from GPU server."""
await _run_local(f"scp -o StrictHostKeyChecking=no {GPU_HOST}:{remote_path} '{local_path}'")
# ─── Media Preparation ────────────────────────────────────────────────
async def handle_audio_preparing(tenant_id, task_id, step_name, input_data, config):
"""Download/copy audio file, extract duration with ffprobe."""
work_dir = _task_dir(task_id)
params = input_data.get("task_params", {})
audio_url = params.get("audio_url", params.get("audio_path", ""))
if not audio_url:
raise ValueError("缺少 audio_url 或 audio_path 参数")
# Download or copy audio
audio_path = os.path.join(work_dir, "original_audio.mp3")
if audio_url.startswith("http"):
stdout, stderr, rc = await _run_local(f"curl -sL -o '{audio_path}' '{audio_url}'")
if rc != 0:
raise ValueError(f"下载音频失败: {stderr}")
else:
await _run_local(f"cp '{audio_url}' '{audio_path}'")
# Extract duration
stdout, stderr, rc = await _run_local(
f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{audio_path}'"
)
duration = float(stdout.strip()) if rc == 0 else 0
return {
"audio_path": audio_path,
"duration": duration,
"format": os.path.splitext(audio_path)[1].lstrip("."),
}
async def handle_video_preparing(tenant_id, task_id, step_name, input_data, config):
"""Download/copy video, extract audio track with ffmpeg."""
work_dir = _task_dir(task_id)
params = input_data.get("task_params", {})
video_url = params.get("video_url", params.get("video_path", ""))
if not video_url:
raise ValueError("缺少 video_url 或 video_path 参数")
video_path = os.path.join(work_dir, "original_video.mp4")
audio_path = os.path.join(work_dir, "original_audio.mp3")
if video_url.startswith("http"):
await _run_local(f"curl -sL -o '{video_path}' '{video_url}'")
else:
await _run_local(f"cp '{video_url}' '{video_path}'")
# Extract audio
await _run_local(
f"ffmpeg -y -i '{video_path}' -vn -acodec libmp3lame -q:a 2 '{audio_path}'"
)
# Get duration
stdout, _, rc = await _run_local(
f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{video_path}'"
)
duration = float(stdout.strip()) if rc == 0 else 0
return {
"video_path": video_path,
"audio_path": audio_path,
"duration": duration,
}
# ─── Demucs Separation ───────────────────────────────────────────────
async def handle_demucs_separating(tenant_id, task_id, step_name, input_data, config):
"""Run Demucs on GPU server to separate vocals and accompaniment."""
work_dir = _task_dir(task_id)
gpu_dir = _gpu_task_dir(task_id)
# Find audio path from deps
audio_path = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
audio_path = dep_output.get("audio_path")
if audio_path:
break
if not audio_path:
raise ValueError("上游步骤未提供 audio_path")
# Prepare GPU directory
await _run_gpu(f"mkdir -p {gpu_dir}")
# Copy audio to GPU
remote_audio = f"{gpu_dir}/audio.mp3"
await _copy_to_gpu(audio_path, remote_audio)
# Run Demucs on GPU
demucs_cmd = (
f"cd {gpu_dir} && "
f"source {GPU_DEMUCS_VENV}/bin/activate && "
f"python -m demucs --two-stems vocals -n htdemucs --mp3 '{remote_audio}' && "
f"deactivate"
)
stdout, stderr, rc = await _run_gpu(demucs_cmd, timeout=600)
if rc != 0:
raise ValueError(f"Demucs 分离失败: {stderr}")
# Copy results back
vocals_local = os.path.join(work_dir, "vocals.wav")
no_vocals_local = os.path.join(work_dir, "no_vocals.wav")
base = os.path.splitext(os.path.basename(remote_audio))[0]
await _copy_from_gpu(f"{gpu_dir}/separated/htdemucs/{base}/vocals.wav", vocals_local)
await _copy_from_gpu(f"{gpu_dir}/separated/htdemuds/{base}/no_vocals.wav", no_vocals_local)
return {
"vocals_path": vocals_local,
"no_vocals_path": no_vocals_local,
}
# ─── Lyric Calibration ───────────────────────────────────────────────
async def handle_lyric_calibrating(tenant_id, task_id, step_name, input_data, config):
"""ASR timing recognition + LLM calibration against original lyrics."""
work_dir = _task_dir(task_id)
params = input_data.get("task_params", {})
lyrics_text = params.get("lyrics", params.get("lyrics_text", ""))
# Find vocals path from deps
vocals_path = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
vocals_path = dep_output.get("vocals_path")
if vocals_path:
break
if not vocals_path:
raise ValueError("上游步骤未提供 vocals_path")
if not lyrics_text:
raise ValueError("缺少 lyrics 参数")
# Step 1: Run SenseVoice ASR on vocals to get timing
asr_result_path = os.path.join(work_dir, "asr_timings.json")
# Copy vocals to GPU for ASR
gpu_dir = _gpu_task_dir(task_id)
await _run_gpu(f"mkdir -p {gpu_dir}")
remote_vocals = f"{gpu_dir}/vocals.wav"
await _copy_to_gpu(vocals_path, remote_vocals)
# Run SenseVoice ASR
asr_script = f"""
cd {gpu_dir}
source {GPU_DEMUCS_VENV}/bin/activate
python -c "
import json
from funasr import AutoModel
model = AutoModel(model='iic/SenseVoiceSmall', trust_remote_code=True)
res = model.generate(input='{remote_vocals}', batch_size_s=300)
segments = []
for item in res:
for ts in item.get('timestamp', []):
segments.append({{'text': item.get('text', ''), 'start': ts[0]/1000, 'end': ts[1]/1000}})
with open('asr_timings.json', 'w') as f:
json.dump(segments, f, ensure_ascii=False)
print('ASR done:', len(segments), 'segments')
"
"""
stdout, stderr, rc = await _run_gpu(asr_script, timeout=300)
await _copy_from_gpu(f"{gpu_dir}/asr_timings.json", asr_result_path)
with open(asr_result_path, "r") as f:
asr_timings = json.load(f)
# Step 2: LLM calibration — align ASR timings with original lyrics
from pipeline_service.handler import get_handler
calibrated = await _llm_calibrate(lyrics_text, asr_timings)
# Save calibrated lyrics
calibrated_path = os.path.join(work_dir, "calibrated_lyrics.json")
with open(calibrated_path, "w", encoding="utf-8") as f:
json.dump(calibrated, f, ensure_ascii=False, indent=2)
return {
"calibrated_lyrics_path": calibrated_path,
"calibrated_lyrics": calibrated,
"segment_count": len(calibrated),
}
async def _llm_calibrate(lyrics_text: str, asr_timings: list) -> list:
"""Use LLM to align raw lyrics text with ASR timings."""
prompt = f"""你是一个歌词时间轴校准专家。
原始歌词文本:
{lyrics_text}
ASR识别的时间戳:
{json.dumps(asr_timings, ensure_ascii=False)}
请将原始歌词的每一句与ASR时间戳对齐输出JSON数组每个元素包含:
- text: 歌词文本
- start: 开始时间浮点数
- end: 结束时间浮点数
要求:
1. 保持原始歌词的文本和顺序
2. 时间戳以ASR结果为基础进行微调
3. 确保时间不重叠每句之间留适当间隔
4. 只输出JSON不要其他内容"""
try:
from pipeline_service.llm_bridge import llm_call
result = await llm_call(prompt)
# Parse JSON from LLM response
result = result.strip()
if result.startswith("```"):
result = result.split("\n", 1)[1].rsplit("```", 1)[0]
return json.loads(result)
except Exception as e:
logger.warning(f"LLM calibration failed, using ASR timings directly: {e}")
return asr_timings
# ─── Subtitle Rendering ──────────────────────────────────────────────
async def handle_subtitle_rendering(tenant_id, task_id, step_name, input_data, config):
"""Generate ASS karaoke subtitle file from calibrated lyrics."""
work_dir = _task_dir(task_id)
# Find calibrated lyrics
calibrated = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
calibrated = dep_output.get("calibrated_lyrics")
if not calibrated and dep_output.get("calibrated_lyrics_path"):
with open(dep_output["calibrated_lyrics_path"], "r") as f:
calibrated = json.load(f)
if calibrated:
break
if not calibrated:
raise ValueError("上游步骤未提供 calibrated_lyrics")
# Generate ASS file
ass_path = os.path.join(work_dir, "karaoke.ass")
_write_ass_file(ass_path, calibrated)
return {"ass_path": ass_path}
def _write_ass_file(path: str, segments: list):
"""Write segments to ASS subtitle file with karaoke effect."""
header = """[Script Info]
Title: KTV Karaoke Subtitles
ScriptType: v4.00+
PlayResX: 1920
PlayResY: 1080
WrapStyle: 0
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: KTV,Source Han Sans SC,72,&H00FFFFFF,&H0000FFFF,&H00000000,&H80000000,-1,0,0,0,100,100,1,0,1,3,1,2,40,40,60,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
with open(path, "w", encoding="utf-8") as f:
f.write(header)
for seg in segments:
start = _seconds_to_ass_time(seg["start"])
end = _seconds_to_ass_time(seg["end"])
text = seg["text"].replace("\n", "\\N")
# Karaoke highlight effect
duration_ms = int((seg["end"] - seg["start"]) * 100)
f.write(f"Dialogue: 0,{start},{end},KTV,,0,0,0,,{{\\k{duration_ms}}}{text}\n")
def _seconds_to_ass_time(seconds: float) -> str:
"""Convert seconds to ASS time format H:MM:SS.CC"""
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
cs = int((seconds % 1) * 100)
return f"{h}:{m:02d}:{s:02d}.{cs:02d}"
# ─── Subtitle Exporting ──────────────────────────────────────────────
async def handle_subtitle_exporting(tenant_id, task_id, step_name, input_data, config):
"""Export ASS subtitle as standalone file (already done by rendering)."""
ass_path = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
ass_path = dep_output.get("ass_path")
if ass_path:
break
if not ass_path or not os.path.exists(ass_path):
raise ValueError("上游步骤未提供有效的 ass_path")
return {"subtitle_path": ass_path, "format": "ass"}
# ─── Lyric Generation & Evaluation (Mode C) ──────────────────────────
async def handle_lyric_generating(tenant_id, task_id, step_name, input_data, config):
"""LLM generates lyrics from topic/outline."""
params = input_data.get("task_params", {})
topic = params.get("topic", params.get("outline", ""))
style = params.get("style", "流行")
language = params.get("language", "zh")
if not topic:
raise ValueError("缺少 topic/outline参数")
prompt = f"""请创作一首{style}风格的{language}歌词。
主题/大纲: {topic}
要求:
1. 包含完整结构: 主歌(Verse)副歌(Chorus)桥段(Bridge)
2. 每句歌词节奏感强适合演唱
3. 押韵自然情感真实
4. 总长度适合3-5分钟歌曲
直接输出歌词文本标注段落结构"""
try:
from pipeline_service.llm_bridge import llm_call
lyrics = await llm_call(prompt)
return {"lyrics": lyrics.strip(), "topic": topic, "style": style}
except Exception as e:
raise ValueError(f"歌词生成失败: {e}")
async def handle_lyric_evaluating(tenant_id, task_id, step_name, input_data, config):
"""Evaluate lyric quality, retry if below threshold."""
threshold = config.get("threshold", 8.5)
lyrics = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
lyrics = dep_output.get("lyrics")
if lyrics:
break
if not lyrics:
raise ValueError("上游步骤未提供歌词")
# Evaluate via LLM
prompt = f"""请从以下维度评估这首歌词的质量(1-10分):
1. 韵律节奏: 押韵节奏感可唱性
2. 情感表达: 情感真实度共鸣力
3. 文学性: 用词意象修辞
4. 结构完整: 段落编排层次感
5. 商业潜力: 流行度记忆点
歌词:
{lyrics}
输出JSON: {{"score": 8.5, "dimensions": {{...}}, "suggestions": "..."}}
只输出JSON"""
try:
from pipeline_service.llm_bridge import llm_call
result = await llm_call(prompt)
result = result.strip()
if result.startswith("```"):
result = result.split("\n", 1)[1].rsplit("```", 1)[0]
evaluation = json.loads(result)
score = evaluation.get("score", 0)
except Exception:
score = 7.0 # Default pass if evaluation fails
evaluation = {"score": score, "note": "evaluation_parse_failed"}
if score < threshold:
raise ValueError(f"歌词评分 {score} 低于阈值 {threshold},需要重新生成")
return {
"lyrics": lyrics,
"evaluation": evaluation,
"score": score,
"passed": True,
}
# ─── Music Generation ────────────────────────────────────────────────
async def handle_music_generating(tenant_id, task_id, step_name, input_data, config):
"""Submit music generation job to Suno/MiniMax API."""
lyrics = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
lyrics = dep_output.get("lyrics")
if lyrics:
break
if not lyrics:
raise ValueError("上游步骤未提供歌词")
params = input_data.get("task_params", {})
music_service = params.get("music_service", "suno")
style = params.get("music_style", "pop")
# Submit to music generation API
# TODO: Implement actual API call to Suno/MiniMax
# For now, return a job_id placeholder
job_id = f"music_{task_id}_{int(time.time())}"
return {
"music_job_id": job_id,
"music_service": music_service,
"style": style,
"lyrics": lyrics,
"status": "submitted",
}
async def handle_music_polling(tenant_id, task_id, step_name, input_data, config):
"""Poll music generation API until complete."""
job_info = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
job_info = dep_output
if job_info and job_info.get("music_job_id"):
break
if not job_info:
raise ValueError("上游步骤未提供 music_job_id")
# TODO: Implement actual polling logic
# For now, simulate a wait and return
work_dir = _task_dir(task_id)
music_path = os.path.join(work_dir, "generated_music.mp3")
# Placeholder: the actual implementation would poll the API
# and download the result
return {
"music_path": music_path,
"music_job_id": job_info.get("music_job_id"),
"status": "completed",
}
# ─── Character & Video Generation ────────────────────────────────────
async def handle_character_designing(tenant_id, task_id, step_name, input_data, config):
"""LLM designs MV character descriptions."""
lyrics = None
params = input_data.get("task_params", {})
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
lyrics = dep_output.get("lyrics") or dep_output.get("calibrated_lyrics")
if isinstance(lyrics, list):
lyrics = " ".join(s.get("text", "") for s in lyrics)
if lyrics:
break
style = params.get("visual_style", "anime")
prompt = f"""根据以下歌词设计MV角色方案。
歌词:
{lyrics}
视觉风格: {style}
请设计1-3个角色每个角色包含:
1. 角色名称
2. 外貌描述用于AI图像生成的详细prompt
3. 性格特征
4. 在MV中的角色定位
输出JSON数组"""
try:
from pipeline_service.llm_bridge import llm_call
result = await llm_call(prompt)
result = result.strip()
if result.startswith("```"):
result = result.split("\n", 1)[1].rsplit("```", 1)[0]
characters = json.loads(result)
except Exception as e:
raise ValueError(f"角色设计失败: {e}")
return {"characters": characters, "visual_style": style}
async def handle_character_image_generating(tenant_id, task_id, step_name, input_data, config):
"""Generate character reference images using wan2.7 on GPU server."""
work_dir = _task_dir(task_id)
gpu_dir = _gpu_task_dir(task_id)
characters = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
characters = dep_output.get("characters")
if characters:
break
if not characters:
raise ValueError("上游步骤未提供角色设计")
await _run_gpu(f"mkdir -p {gpu_dir}/characters")
char_images = []
for i, char in enumerate(characters):
prompt = char.get("prompt", char.get("description", ""))
if not prompt:
continue
# Generate image on GPU with wan2.7
gen_cmd = (
f"cd {GPU_WAN22_DIR} && "
f"source venv/bin/activate && "
f"python generate.py --prompt '{prompt}' "
f"--output {gpu_dir}/characters/char_{i}.png "
f"--width 512 --height 512"
)
stdout, stderr, rc = await _run_gpu(gen_cmd, timeout=120)
local_path = os.path.join(work_dir, f"char_{i}.png")
await _copy_from_gpu(f"{gpu_dir}/characters/char_{i}.png", local_path)
char_images.append({
"name": char.get("name", f"char_{i}"),
"image_path": local_path,
"prompt": prompt,
})
return {"character_images": char_images}
async def handle_storyboard_generating(tenant_id, task_id, step_name, input_data, config):
"""LLM generates storyboard script from lyrics + characters."""
lyrics = None
char_images = None
params = input_data.get("task_params", {})
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
if dep_output.get("calibrated_lyrics"):
lyrics = dep_output["calibrated_lyrics"]
elif dep_output.get("lyrics"):
lyrics = dep_output["lyrics"]
if dep_output.get("character_images"):
char_images = dep_output["character_images"]
if not lyrics:
raise ValueError("上游步骤未提供歌词")
duration = params.get("duration", 240) # Default 4 min
prompt = f"""根据歌词和角色生成MV分镜脚本。
歌词:
{json.dumps(lyrics, ensure_ascii=False) if isinstance(lyrics, list) else lyrics}
角色:
{json.dumps(char_images, ensure_ascii=False) if char_images else "无特定角色"}
视频总时长: {duration}
请输出JSON数组每个分镜包含:
- scene_id: 分镜编号
- start_time: 开始秒数
- end_time: 结束秒数
- description: 场景描述英文用于视频生成prompt
- characters: 出现的角色
- camera: 镜头运动描述
- mood: 情绪/色调
确保分镜覆盖整首歌每个分镜5-15"""
try:
from pipeline_service.llm_bridge import llm_call
result = await llm_call(prompt)
result = result.strip()
if result.startswith("```"):
result = result.split("\n", 1)[1].rsplit("```", 1)[0]
storyboard = json.loads(result)
except Exception as e:
raise ValueError(f"分镜生成失败: {e}")
work_dir = _task_dir(task_id)
sb_path = os.path.join(work_dir, "storyboard.json")
with open(sb_path, "w", encoding="utf-8") as f:
json.dump(storyboard, f, ensure_ascii=False, indent=2)
return {"storyboard": storyboard, "storyboard_path": sb_path, "scene_count": len(storyboard)}
async def handle_scene_video_generating(tenant_id, task_id, step_name, input_data, config):
"""Generate scene videos on GPU using T2V/Ref2V."""
work_dir = _task_dir(task_id)
gpu_dir = _gpu_task_dir(task_id)
storyboard = None
char_images = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
if dep_output.get("storyboard"):
storyboard = dep_output["storyboard"]
if dep_output.get("character_images"):
char_images = dep_output["character_images"]
if not storyboard:
raise ValueError("上游步骤未提供分镜脚本")
await _run_gpu(f"mkdir -p {gpu_dir}/scenes")
# Copy character images to GPU if available
if char_images:
for ci in char_images:
local = ci.get("image_path", "")
if local and os.path.exists(local):
remote = f"{gpu_dir}/characters/{os.path.basename(local)}"
await _copy_to_gpu(local, remote)
scene_videos = []
for i, scene in enumerate(storyboard):
desc = scene.get("description", "")
duration = scene.get("end_time", 10) - scene.get("start_time", 5)
frames = int(duration * 24) # 24fps
# Determine if we use Ref2V (with character ref) or T2V
ref_image = None
if char_images and scene.get("characters"):
# Find matching character image
for ci in char_images:
if ci.get("name") in str(scene.get("characters", [])):
ref_image = f"{gpu_dir}/characters/{os.path.basename(ci['image_path'])}"
break
if ref_image:
gen_cmd = (
f"cd {GPU_WAN22_DIR} && source venv/bin/activate && "
f"python generate_ref2v.py --prompt '{desc}' "
f"--ref_image '{ref_image}' "
f"--output {gpu_dir}/scenes/scene_{i:03d}.mp4 "
f"--frames {frames}"
)
else:
gen_cmd = (
f"cd {GPU_WAN22_DIR} && source venv/bin/activate && "
f"python generate_t2v.py --prompt '{desc}' "
f"--output {gpu_dir}/scenes/scene_{i:03d}.mp4 "
f"--frames {frames}"
)
stdout, stderr, rc = await _run_gpu(gen_cmd, timeout=600)
local_scene = os.path.join(work_dir, f"scene_{i:03d}.mp4")
await _copy_from_gpu(f"{gpu_dir}/scenes/scene_{i:03d}.mp4", local_scene)
scene_videos.append({
"scene_id": scene.get("scene_id", i),
"video_path": local_scene,
"description": desc,
"duration": duration,
})
return {"scene_videos": scene_videos, "scene_count": len(scene_videos)}
async def handle_scene_video_evaluating(tenant_id, task_id, step_name, input_data, config):
"""Evaluate scene video quality via VLM, retry if below threshold."""
threshold = config.get("threshold", 7.0)
max_retry = config.get("max_retry", 3)
scene_videos = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
scene_videos = dep_output.get("scene_videos")
if scene_videos:
break
if not scene_videos:
raise ValueError("上游步骤未提供场景视频")
# Evaluate each scene (simplified: check file exists and has reasonable size)
valid_scenes = []
for sv in scene_videos:
path = sv.get("video_path", "")
if os.path.exists(path) and os.path.getsize(path) > 10000:
sv["quality_score"] = 8.0 # Placeholder
valid_scenes.append(sv)
else:
sv["quality_score"] = 0
logger.warning(f"Scene {sv.get('scene_id')} missing or too small: {path}")
if not valid_scenes:
raise ValueError("所有场景视频质量不合格")
avg_score = sum(s.get("quality_score", 0) for s in valid_scenes) / len(valid_scenes)
if avg_score < threshold:
raise ValueError(f"平均质量分 {avg_score:.1f} 低于阈值 {threshold}")
return {"scene_videos": valid_scenes, "avg_quality": avg_score}
async def handle_scene_video_concatenating(tenant_id, task_id, step_name, input_data, config):
"""Concatenate scene videos with ffmpeg, loop to match audio duration."""
work_dir = _task_dir(task_id)
scene_videos = None
audio_duration = None
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
if dep_output.get("scene_videos"):
scene_videos = dep_output["scene_videos"]
if dep_output.get("duration"):
audio_duration = dep_output["duration"]
if not scene_videos:
raise ValueError("上游步骤未提供场景视频")
# Create concat file
concat_list = os.path.join(work_dir, "concat_list.txt")
with open(concat_list, "w") as f:
for sv in scene_videos:
path = sv.get("video_path", "")
if os.path.exists(path):
f.write(f"file '{path}'\n")
# Concatenate
concat_path = os.path.join(work_dir, "concat_video.mp4")
await _run_local(
f"ffmpeg -y -f concat -safe 0 -i '{concat_list}' -c copy '{concat_path}'"
)
# Loop to match audio duration if needed
final_path = os.path.join(work_dir, "final_video.mp4")
if audio_duration and audio_duration > 0:
# Get concat duration
stdout, _, _ = await _run_local(
f"ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 '{concat_path}'"
)
concat_dur = float(stdout.strip()) if stdout.strip() else 0
if concat_dur > 0 and concat_dur < audio_duration:
loops = int(audio_duration / concat_dur) + 1
await _run_local(
f"ffmpeg -y -stream_loop {loops} -i '{concat_path}' "
f"-t {audio_duration} -c:v libx264 -preset fast '{final_path}'"
)
else:
await _run_local(f"cp '{concat_path}' '{final_path}'")
else:
await _run_local(f"cp '{concat_path}' '{final_path}'")
return {"final_video_path": final_path}
# ─── Final Synthesis ─────────────────────────────────────────────────
async def handle_ktv_synthesizing(tenant_id, task_id, step_name, input_data, config):
"""Synthesize final KTV (dual-track) + MTV (single-track) videos."""
work_dir = _task_dir(task_id)
video_path = None
ass_path = None
vocals_path = None
no_vocals_path = None
has_original_video = False
for dep_name, dep_output in input_data.items():
if isinstance(dep_output, dict):
if dep_output.get("final_video_path"):
video_path = dep_output["final_video_path"]
if dep_output.get("ass_path"):
ass_path = dep_output["ass_path"]
if dep_output.get("vocals_path"):
vocals_path = dep_output["vocals_path"]
if dep_output.get("no_vocals_path"):
no_vocals_path = dep_output["no_vocals_path"]
if dep_output.get("video_path") and not video_path:
# Mode B: use original video
video_path = dep_output["video_path"]
has_original_video = True
if not ass_path:
raise ValueError("缺少字幕文件")
# Determine audio tracks
if has_original_video and not vocals_path:
# Mode B: extract from video
vocals_path = os.path.join(work_dir, "vocals.wav")
no_vocals_path = os.path.join(work_dir, "no_vocals.wav")
if not os.path.exists(vocals_path):
raise ValueError("Demucs 步骤未提供人声轨道")
if not video_path:
raise ValueError("缺少视频源")
# KTV version: dual audio (vocals + no_vocals) with subtitle overlay
ktv_path = os.path.join(work_dir, "ktv_final.mp4")
mtv_path = os.path.join(work_dir, "mtv_final.mp4")
# KTV: video + vocals_track + no_vocals_track + subtitle burn
if vocals_path and no_vocals_path:
ktv_cmd = (
f"ffmpeg -y -i '{video_path}' -i '{vocals_path}' -i '{no_vocals_path}' "
f"-filter_complex \"[0:v]ass='{ass_path}'[v]\" "
f"-map '[v]' -map 1:a -map 2:a "
f"-c:v libx264 -preset fast -c:a aac -b:a 192k "
f"-metadata:s:a:0 title='Vocals' -metadata:s:a:1 title='Accompaniment' "
f"'{ktv_path}'"
)
else:
ktv_cmd = (
f"ffmpeg -y -i '{video_path}' "
f"-vf \"ass='{ass_path}'\" "
f"-c:v libx264 -preset fast -c:a aac -b:a 192k "
f"'{ktv_path}'"
)
stdout, stderr, rc = await _run_local(ktv_cmd, timeout=600)
if rc != 0:
raise ValueError(f"KTV合成失败: {stderr}")
# MTV: single audio (original/mix) with subtitle
mtv_cmd = (
f"ffmpeg -y -i '{video_path}' "
f"-vf \"ass='{ass_path}'\" "
f"-c:v libx264 -preset fast -c:a aac -b:a 192k "
f"'{mtv_path}'"
)
stdout, stderr, rc = await _run_local(mtv_cmd, timeout=600)
if rc != 0:
logger.warning(f"MTV合成失败仅输出KTV版本: {stderr}")
result = {
"ktv_path": ktv_path,
"subtitle_path": ass_path,
}
if os.path.exists(mtv_path):
result["mtv_path"] = mtv_path
return result
# ─── Registration ─────────────────────────────────────────────────────
KTV_HANDLERS = {
"audio_preparing": handle_audio_preparing,
"video_preparing": handle_video_preparing,
"demucs_separating": handle_demucs_separating,
"lyric_calibrating": handle_lyric_calibrating,
"subtitle_rendering": handle_subtitle_rendering,
"subtitle_exporting": handle_subtitle_exporting,
"lyric_generating": handle_lyric_generating,
"lyric_evaluating": handle_lyric_evaluating,
"music_generating": handle_music_generating,
"music_polling": handle_music_polling,
"character_designing": handle_character_designing,
"character_image_generating": handle_character_image_generating,
"storyboard_generating": handle_storyboard_generating,
"scene_video_generating": handle_scene_video_generating,
"scene_video_evaluating": handle_scene_video_evaluating,
"scene_video_concatenating": handle_scene_video_concatenating,
"ktv_synthesizing": handle_ktv_synthesizing,
}
def register_ktv_handlers():
"""Register all KTV step handlers."""
from .handler import register_handler
for step_type, fn in KTV_HANDLERS.items():
register_handler(step_type, fn)
logger.info(f"Registered {len(KTV_HANDLERS)} KTV handlers")

View File

@ -23,6 +23,7 @@ from .storage import (
)
from .executor import start_task, resume_task, stop_task, is_running
from .handler import register_handler, list_handlers, register_default_handler
from .handlers_ktv import register_ktv_handlers
MODULE_NAME = "pipeline_service"
MODULE_VERSION = "2.0.0"
@ -278,5 +279,8 @@ def load_pipeline_service():
# Register default handler
register_default_handler()
# Register KTV handlers
register_ktv_handlers()
debug(f"[{MODULE_NAME}] v{MODULE_VERSION} loaded — generic pipeline execution engine")
return True

View File

@ -0,0 +1,62 @@
"""LLM bridge for pipeline handlers.
Provides a simple async interface for handlers to call LLM APIs.
Uses harnessed_agent's llm_chat under the hood when available,
falls back to direct HTTP calls.
"""
import json
import logging
import os
logger = logging.getLogger("pipeline.llm_bridge")
async def llm_call(prompt: str, model: str = None, temperature: float = 0.7) -> str:
"""Call LLM and return text response.
Tries multiple backends:
1. harnessed_agent.llm_chat (if loaded in ServerEnv)
2. Direct OpenAI-compatible API call
"""
# Try harnessed_agent first
try:
from ahserver.serverenv import ServerEnv
env = ServerEnv()
if hasattr(env, 'llm_chat'):
result = await env.llm_chat(prompt, model=model, temperature=temperature)
if isinstance(result, dict):
return result.get("content", result.get("text", str(result)))
return str(result)
except Exception:
pass
# Fallback: direct HTTP call to OpenAI-compatible endpoint
import aiohttp
api_base = os.environ.get("LLM_API_BASE", "https://api.openai.com/v1")
api_key = os.environ.get("LLM_API_KEY", "")
model = model or os.environ.get("LLM_MODEL", "gpt-4o-mini")
if not api_key:
raise ValueError("No LLM API configured (set LLM_API_KEY env var)")
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": temperature,
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{api_base}/chat/completions", headers=headers, json=payload, timeout=aiohttp.ClientTimeout(total=120)
) as resp:
if resp.status != 200:
text = await resp.text()
raise ValueError(f"LLM API error {resp.status}: {text[:200]}")
data = await resp.json()
return data["choices"][0]["message"]["content"]

View File

@ -14,11 +14,31 @@ def _get_db():
async def get_pipeline_steps(pipeline_id: str) -> list:
"""Read step definitions from pipeline_steps table (defined by pipeline_core)."""
"""Read step definitions from pipeline_steps table (defined by pipeline_core).
Extracts 'deps' from step_config JSON and injects it as a top-level field
so that build_step_graph() can find it.
"""
db, dbname = _get_db()
async with db.sqlorContext(dbname) as sor:
recs = await sor.R('pipeline_steps', {'pipeline_id': pipeline_id}, sort='step_order')
return list(recs) if recs else []
if not recs:
return []
result = []
for rec in recs:
if hasattr(rec, '__dict__'):
d = {k: getattr(rec, k) for k in dir(rec) if not k.startswith('_')}
else:
d = dict(rec)
# Extract deps from step_config JSON
cfg_raw = d.get('step_config', '{}')
try:
cfg = json.loads(cfg_raw) if isinstance(cfg_raw, str) else cfg_raw
except (json.JSONDecodeError, TypeError):
cfg = {}
d['deps'] = cfg.get('deps', [])
result.append(d)
return result
async def create_task(tenant_id: str, pipeline_id: str, owner_id: str, title: str, params: dict) -> str: