yumoqing 769fc4968e feat: initial wan22 video generation service
- Wan2.2-TI2V-5B GPU 视频推理
- ahserver + longtasks 异步任务队列
- OpenAI 兼容 API: POST /api/submit, GET /api/task, GET /api/status
- 模型常驻内存,惰性加载
- 全局串行推理锁(GPU 安全)
- 支持 t2v/i2v/ti2v/s2v 四种任务类型
2026-06-09 22:00:22 +08:00

135 lines
4.0 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
"""
Wan2.2-TI2V-5B 视频生成 worker进程内推理模型常驻内存
使用 Wan22 类直接调用推理 pipeline
替代原先每次任务启动子进程的方式。
"""
import asyncio
import json
import os
import threading
import uuid
from datetime import datetime

from appPublic.log import debug, exception
# Filesystem locations. Each one can be overridden by an environment variable
# (same WAN22_* prefix as the WAN22_GPU_ID setting) and otherwise falls back to
# the original hard-coded path.
# Directory where generated .mp4 files are written.
OUTPUT_DIR = os.environ.get('WAN22_OUTPUT_DIR', '/data/ymq/wan22-outputs')
# Checkout of the Wan2.2 code; prepended to sys.path so `workers`/`wan` import.
REPO_DIR = os.environ.get('WAN22_REPO_DIR', '/data/ymq/wan22-service/repo')
# Wan2.2-TI2V-5B checkpoint directory (passed to Wan22 as ckpt_dir).
MODEL_PATH = os.environ.get(
    'WAN22_MODEL_PATH', '/data/ymq/models/Wan-AI/Wan2.2-TI2V-5B'
)
# Process-wide Wan22 instance; created lazily on first use by _get_engine and
# then kept resident.
_engine = None
# Guards the one-time engine construction so concurrent callers (for example
# executor threads) cannot load the model onto the GPU twice.
_engine_lock = threading.Lock()


def _get_engine(longtasks):
    """
    Return the process-wide Wan22 engine, creating it on the first call.

    The model stays resident after the first (slow, ~30-60 s per the log
    message) load; later calls return the cached instance immediately.

    GPU selection: ``longtasks.gpu_id`` when that attribute exists and is not
    None, otherwise the ``WAN22_GPU_ID`` environment variable (default '2').
    The chosen GPU is made the only visible device through
    ``CUDA_VISIBLE_DEVICES``, so the engine is built with ``device_id=0``.

    Args:
        longtasks: task-queue object; only its optional ``gpu_id`` is read.

    Returns:
        The cached ``Wan22`` instance.

    Raises:
        ValueError: if WAN22_GPU_ID is consulted and is not an integer.
        Anything raised while importing or constructing ``Wan22``; in that
        case ``_engine`` stays None, so a later call retries the load.
    """
    global _engine
    if _engine is not None:
        return _engine
    with _engine_lock:
        # Re-check: another thread may have finished loading while we waited.
        if _engine is not None:
            return _engine
        debug('Loading Wan22 engine (first call, may take 30-60s)...')
        # Put the repo on sys.path so the wrapper and `wan` package import.
        import sys
        if REPO_DIR not in sys.path:
            sys.path.insert(0, REPO_DIR)
        from workers.wan22_wrapper import Wan22
        # Consult the env var only as a fallback. Then a malformed
        # WAN22_GPU_ID cannot break a caller that supplies gpu_id, and a None
        # attribute does not turn into CUDA_VISIBLE_DEVICES='None'.
        gpu_id = getattr(longtasks, 'gpu_id', None)
        if gpu_id is None:
            gpu_id = int(os.environ.get('WAN22_GPU_ID', '2'))
        # NOTE(review): CUDA_VISIBLE_DEVICES is only honored if CUDA has not
        # been initialized in this process yet. Confirm nothing touches the
        # GPU before the first task.
        os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_id)
        _engine = Wan22(
            ckpt_dir=MODEL_PATH,
            task='ti2v-5B',
            device_id=0,  # only one GPU is visible, renumbered as device 0
            use_prompt_extend=False,
            offload_model=True,
        )
        debug(f'Wan22 engine loaded, GPU: {gpu_id}')
        return _engine
# There is one model instance on one GPU, but run_in_executor(None, ...) uses
# the loop's default thread pool, which can run several jobs at once. This
# lock serializes engine loading and inference across those threads.
_infer_lock = threading.Lock()


async def run_generate(longtasks, payload):
    """
    Generate one video in-process with the resident Wan22 engine.

    Args:
        longtasks: task-queue object, forwarded to ``_get_engine``.
        payload: request dict with keys
            task_id (optional): output file stem; a random 12-char id is
                used when missing or empty. Must not contain a path
                separator.
            prompt (str): text prompt.
            image (str, optional): input image path.
            size (str, default "1280*720").
            frame_num (int, default 81): coerced with int(), clamped to
                [17, 129] and rounded down to the nearest 4n+1.
            sample_steps (int, optional).
            sample_guide_scale (float, optional).
            base_seed (int, optional): falls back to ``engine.seed``.

    Returns:
        A dict with 'task_id' and 'status'. On success, status is
        'completed' and the dict also carries video_url, video_path, size,
        frame_num, file_size, prompt (first 100 chars) and seed. On failure,
        status is 'failed' with an 'error' message. Errors are reported in
        the returned dict rather than raised.
    """
    task_id = payload.get('task_id') or str(uuid.uuid4())[:12]

    def _failed(error):
        # Uniform failure result.
        return {
            'task_id': task_id,
            'status': 'failed',
            'error': error,
        }

    # task_id comes from the request and becomes a file name. Refuse anything
    # containing a path separator so the output cannot escape OUTPUT_DIR.
    file_stem = str(task_id)
    if os.path.basename(file_stem) != file_stem:
        return _failed(f'Invalid task_id: {file_stem!r}')

    prompt = payload.get('prompt') or ''
    image_path = payload.get('image', None)
    size = payload.get('size', '1280*720')
    sample_steps = payload.get('sample_steps', None)
    sample_guide_scale = payload.get('sample_guide_scale', None)
    base_seed = payload.get('base_seed', None)

    # frame_num must be an integer of the form 4n+1: clamp to [17, 129]
    # (both are 4n+1), then round down to the nearest valid value.
    raw_frame_num = payload.get('frame_num', 81)
    try:
        frame_num = int(raw_frame_num)
    except (TypeError, ValueError):
        return _failed(f'Invalid frame_num: {raw_frame_num!r}')
    frame_num = max(17, min(frame_num, 129))
    frame_num -= (frame_num - 1) % 4

    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        output_file = os.path.join(OUTPUT_DIR, f'{file_stem}.mp4')

        def _infer():
            # Runs in a worker thread. The first call also loads the model,
            # which takes tens of seconds, so it must not run on the loop.
            with _infer_lock:
                engine = _get_engine(longtasks)
                return engine.generate(
                    prompt=prompt,
                    image_path=image_path,
                    size=size,
                    frame_num=frame_num,
                    steps=sample_steps,
                    guide_scale=sample_guide_scale,
                    seed=base_seed if base_seed is not None else engine.seed,
                    save_file=output_file,
                )

        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, _infer)
        if not os.path.exists(output_file):
            return _failed('Output file not created by engine')
        file_size = os.path.getsize(output_file)
        debug(f'Video generated: {output_file} ({file_size} bytes)')
        # The wrapper's result is expected to be dict-like. Don't fail an
        # already-written video just because it is not.
        seed = result.get('seed') if hasattr(result, 'get') else None
        return {
            'task_id': task_id,
            'status': 'completed',
            'video_url': f'/idfile?path={file_stem}.mp4',
            'video_path': output_file,
            'size': size,
            'frame_num': frame_num,
            'file_size': file_size,
            'prompt': prompt[:100],
            'seed': seed,
        }
    except Exception as e:
        exception(f'Generation error: {e}')
        return _failed(str(e))