- Wan2.2-TI2V-5B GPU 视频推理
- ahserver + longtasks 异步任务队列
- OpenAI 兼容 API: POST /api/submit, GET /api/task, GET /api/status
- 模型常驻内存,惰性加载
- 全局串行推理锁(GPU 安全)
- 支持 t2v/i2v/ti2v/s2v 四种任务类型
135 lines
4.0 KiB
Python
Executable File
135 lines
4.0 KiB
Python
Executable File
# -*- coding:utf-8 -*-
|
||
"""
|
||
Wan2.2-TI2V-5B 视频生成 worker(进程内推理,模型常驻内存)
|
||
|
||
使用 Wan22 类直接调用推理 pipeline,
|
||
替代原先每次任务启动子进程的方式。
|
||
"""
|
||
import asyncio
import json
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

from appPublic.log import debug, exception
|
||
|
||
# Deployment paths. Each one can be overridden through an environment
# variable (the same mechanism WAN22_GPU_ID uses to pick the GPU); the
# defaults are the original hard-coded locations.
OUTPUT_DIR = os.environ.get('WAN22_OUTPUT_DIR', '/data/ymq/wan22-outputs')  # generated .mp4 files
REPO_DIR = os.environ.get('WAN22_REPO_DIR', '/data/ymq/wan22-service/repo')  # put on sys.path so `wan` is importable
MODEL_PATH = os.environ.get('WAN22_MODEL_PATH', '/data/ymq/models/Wan-AI/Wan2.2-TI2V-5B')  # checkpoint directory

# Process-wide Wan22 instance. It is created lazily by _get_engine() on the
# first task and then kept resident in memory for every later task.
_engine = None
|
||
|
||
|
||
def _get_engine(longtasks):
    """Return the process-wide Wan22 engine, loading it on the first call.

    The model stays resident in memory: only the first call pays the load
    cost (logged as "may take 30-60s"); later calls return the cached
    instance.

    longtasks: the task-queue object. If it has a non-None ``gpu_id``
        attribute, that GPU is used; otherwise the WAN22_GPU_ID environment
        variable is used (default "2").

    Raises whatever parsing WAN22_GPU_ID, importing
    ``workers.wan22_wrapper`` or constructing ``Wan22`` raises. ``_engine``
    is only assigned after a successful construction, so a failed load is
    retried on the next call.
    """
    global _engine
    if _engine is not None:
        return _engine

    debug('Loading Wan22 engine (first call, may take 30-60s)...')

    # Parse WAN22_GPU_ID only when longtasks does not name a GPU. Passing it
    # as the getattr() default would evaluate it eagerly, so a malformed env
    # var would break even an explicitly configured worker.
    gpu_id = getattr(longtasks, 'gpu_id', None)
    if gpu_id is None:
        gpu_id = int(os.environ.get('WAN22_GPU_ID', '2'))

    # Restrict CUDA to the chosen GPU *before* importing the wrapper, in case
    # that import initialises CUDA.
    # NOTE(review): if CUDA was already initialised in this process, this
    # variable is ignored and device_id=0 below is physical GPU 0 - confirm
    # nothing touches CUDA earlier.
    os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_id)

    # Put the repo on sys.path so the `wan` package can be imported.
    import sys
    if REPO_DIR not in sys.path:
        sys.path.insert(0, REPO_DIR)

    from workers.wan22_wrapper import Wan22

    _engine = Wan22(
        ckpt_dir=MODEL_PATH,
        task='ti2v-5B',
        device_id=0,  # CUDA_VISIBLE_DEVICES isolates one GPU, so it is index 0
        use_prompt_extend=False,
        offload_model=True,
    )

    debug(f'Wan22 engine loaded, GPU: {gpu_id}')
    return _engine
|
||
|
||
|
||
# Single worker thread that runs every model load and inference call. Jobs
# run one at a time in submission order, so at most one task uses the GPU at
# once. A job that has already started runs to completion before the next
# begins, even if the coroutine awaiting it is cancelled. Queued jobs wait
# here without tying up threads of the loop's default executor.
_GPU_EXECUTOR = ThreadPoolExecutor(max_workers=1, thread_name_prefix='wan22-gpu')


def _normalize_frame_num(value, default=81):
    """Coerce *value* to an int of the form 4n+1 within [17, 129].

    ``None`` selects *default*. The value is clamped to the range and then
    rounded down to the nearest 4n+1 (17 and 129 are both 4n+1, so rounding
    never leaves the range). Raises TypeError/ValueError if ``int()`` cannot
    convert *value*.
    """
    frames = default if value is None else int(value)
    frames = max(17, min(frames, 129))
    return ((frames - 1) // 4) * 4 + 1


async def run_generate(longtasks, payload):
    """
    Generate one video in-process with the resident Wan22 engine.

    payload: {
        task_id: str (optional; a random 12-char id is used when missing/empty),
        prompt: str,
        image: str (optional),
        size: str (default "1280*720"),
        frame_num: int (default 81; clamped to [17, 129], rounded down to 4n+1),
        sample_steps: int (optional),
        sample_guide_scale: float (optional),
        base_seed: int (optional; falls back to engine.seed),
    }

    Returns a dict with 'task_id' and 'status'. On success, status is
    'completed' and the dict also has video_url, video_path, size,
    frame_num, file_size, the first 100 characters of the prompt, and the
    seed from the engine's result. On failure, status is 'failed' and
    'error' holds a message. Exceptions from loading/generation are logged
    and not re-raised.
    """
    task_id = payload.get('task_id')
    if task_id is None or task_id == '':
        task_id = str(uuid.uuid4())[:12]
    prompt = payload.get('prompt', '')
    image_path = payload.get('image', None)
    size = payload.get('size', '1280*720')
    sample_steps = payload.get('sample_steps', None)
    sample_guide_scale = payload.get('sample_guide_scale', None)
    base_seed = payload.get('base_seed', None)

    def _failed(error):
        return {
            'task_id': task_id,
            'status': 'failed',
            'error': error,
        }

    # task_id becomes a file name; reject anything that could resolve outside
    # OUTPUT_DIR (e.g. "../x" or "a/b").
    file_stem = str(task_id)
    if file_stem in ('.', '..') or os.path.basename(file_stem) != file_stem:
        return _failed(f'Invalid task_id: {task_id!r}')

    raw_frame_num = payload.get('frame_num')
    try:
        frame_num = _normalize_frame_num(raw_frame_num)
    except (TypeError, ValueError):
        return _failed(f'Invalid frame_num: {raw_frame_num!r}')

    output_file = os.path.join(OUTPUT_DIR, f'{file_stem}.mp4')

    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)

        def _load_and_infer():
            # Runs on the GPU worker thread. The first call loads the model
            # (30-60s), which must not block the asyncio event loop.
            engine = _get_engine(longtasks)
            return engine.generate(
                prompt=prompt,
                image_path=image_path,
                size=size,
                frame_num=frame_num,
                steps=sample_steps,
                guide_scale=sample_guide_scale,
                seed=base_seed if base_seed is not None else engine.seed,
                save_file=output_file,
            )

        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(_GPU_EXECUTOR, _load_and_infer)

        if not os.path.exists(output_file):
            return _failed('Output file not created by engine')

        file_size = os.path.getsize(output_file)
        debug(f'Video generated: {output_file} ({file_size} bytes)')

        return {
            'task_id': task_id,
            'status': 'completed',
            'video_url': f'/idfile?path={task_id}.mp4',
            'video_path': output_file,
            'size': size,
            'frame_num': frame_num,
            'file_size': file_size,
            'prompt': prompt[:100],
            'seed': result.get('seed'),
        }

    except Exception as e:
        exception(f'Generation error: {e}')
        return _failed(str(e))
|