# -*- coding:utf-8 -*-
"""
Wan2.2-TI2V-5B video generation worker (in-process inference, model kept
resident in memory).

Calls the inference pipeline directly through the Wan22 class instead of
spawning a subprocess for every task.
"""
import asyncio
import json
import os
import sys
import threading
import uuid
from datetime import datetime

from appPublic.log import debug, exception

OUTPUT_DIR = '/data/ymq/wan22-outputs'
REPO_DIR = '/data/ymq/wan22-service/repo'
MODEL_PATH = '/data/ymq/models/Wan-AI/Wan2.2-TI2V-5B'

# frame_num must have the form 4n+1 and is clamped to this range.
DEFAULT_FRAME_NUM = 81
MIN_FRAME_NUM = 17
MAX_FRAME_NUM = 129

# Process-wide Wan22 instance, created lazily by the first _get_engine() call.
_engine = None
# Guards the one-time construction of _engine: _get_engine() is invoked from
# executor threads, so two first tasks could otherwise both load the model.
_engine_lock = threading.Lock()


def _get_engine(longtasks):
    """Return the shared Wan22 engine, constructing it on first use.

    The GPU is taken from ``longtasks.gpu_id`` when that attribute is set,
    otherwise from the ``WAN22_GPU_ID`` environment variable (default ``2``).

    Args:
        longtasks: Object that may carry a ``gpu_id`` attribute.

    Returns:
        The module-level Wan22 instance (the same object on every call).

    Raises:
        ValueError: If the fallback ``WAN22_GPU_ID`` value is not an integer.
        Exception: Whatever importing or constructing Wan22 raises.
    """
    global _engine
    if _engine is not None:
        return _engine

    with _engine_lock:
        # Re-check under the lock: another thread may have finished loading
        # while this one was waiting.
        if _engine is not None:
            return _engine

        debug('Loading Wan22 engine (first call, may take 30-60s)...')

        # Only parse the environment fallback when it is actually needed, so
        # a malformed WAN22_GPU_ID cannot break an explicit longtasks.gpu_id.
        gpu_id = getattr(longtasks, 'gpu_id', None)
        if gpu_id is None:
            gpu_id = int(os.environ.get('WAN22_GPU_ID', '2'))

        # Set device visibility BEFORE importing the wrapper: the variable
        # only takes effect if it is in place when CUDA gets initialised.
        os.environ['CUDA_VISIBLE_DEVICES'] = str(gpu_id)

        # Put the repo on sys.path so the wan package can be imported.
        if REPO_DIR not in sys.path:
            sys.path.insert(0, REPO_DIR)
        from workers.wan22_wrapper import Wan22

        engine = Wan22(
            ckpt_dir=MODEL_PATH,
            task='ti2v-5B',
            device_id=0,  # CUDA_VISIBLE_DEVICES isolates the GPU, so index from 0
            use_prompt_extend=False,
            offload_model=True,
        )
        # Publish only after construction succeeded, so a failed load is
        # retried by the next task instead of leaving a broken engine behind.
        _engine = engine
        debug(f'Wan22 engine loaded, GPU: {gpu_id}')
        return _engine


def _normalize_frame_num(value):
    """Coerce *value* to a frame count of the form 4n+1 within the allowed range.

    ``None`` selects the default. Values are clamped to
    [MIN_FRAME_NUM, MAX_FRAME_NUM] and then rounded down to the nearest 4n+1.

    Raises:
        ValueError: If *value* cannot be interpreted as an integer.
    """
    if value is None:
        return DEFAULT_FRAME_NUM
    try:
        frames = int(value)
    except (TypeError, ValueError, OverflowError):
        raise ValueError(f'Invalid frame_num: {value!r}') from None
    frames = max(MIN_FRAME_NUM, min(frames, MAX_FRAME_NUM))
    return ((frames - 1) // 4) * 4 + 1


def _output_path_for(task_id):
    """Return the mp4 path for *task_id* inside OUTPUT_DIR.

    Raises:
        ValueError: If *task_id* contains a path separator, which would let
            the output escape OUTPUT_DIR.
    """
    name = str(task_id)
    if os.path.basename(name) != name:
        raise ValueError(f'Invalid task_id: {task_id!r}')
    return os.path.join(OUTPUT_DIR, f'{name}.mp4')


def _failed(task_id, error):
    """Build the result dict reported for a failed task."""
    return {
        'task_id': task_id,
        'status': 'failed',
        'error': error,
    }


async def run_generate(longtasks, payload):
    """Run one video generation task with in-process inference.

    Engine loading and inference both run in the default executor so the
    asyncio event loop stays responsive, including during the first call
    when the model is loaded. Calls to ``engine.generate`` are not
    serialized here.

    Args:
        longtasks: Passed to ``_get_engine`` (may carry ``gpu_id``).
        payload: Dict with the keys
            task_id: str (optional, a random 12-char id is generated)
            prompt: str
            image: str (optional)
            size: str (default "1280*720")
            frame_num: int (default 81, must be 4n+1; clamped and rounded down)
            sample_steps: int (optional)
            sample_guide_scale: float (optional)
            base_seed: int (optional, falls back to ``engine.seed``)

    Returns:
        A dict with ``task_id`` and ``status``. On ``'completed'`` it also has
        ``video_url``, ``video_path``, ``size``, ``frame_num``, ``file_size``,
        ``prompt`` (first 100 chars) and ``seed``. On ``'failed'`` it has
        ``error``. Failures are reported through the dict, not raised.
    """
    task_id = payload.get('task_id') or str(uuid.uuid4())[:12]
    prompt = payload.get('prompt') or ''
    image_path = payload.get('image')
    size = payload.get('size') or '1280*720'
    sample_steps = payload.get('sample_steps')
    sample_guide_scale = payload.get('sample_guide_scale')
    base_seed = payload.get('base_seed')

    # Reject malformed input up front with a normal failed result instead of
    # letting a TypeError escape to the caller.
    try:
        frame_num = _normalize_frame_num(payload.get('frame_num'))
        output_file = _output_path_for(task_id)
    except ValueError as e:
        return _failed(task_id, str(e))

    def _infer():
        # Load (or reuse) the engine inside the worker thread: the first load
        # can take 30-60s and must not stall the event loop.
        engine = _get_engine(longtasks)
        return engine.generate(
            prompt=prompt,
            image_path=image_path,
            size=size,
            frame_num=frame_num,
            steps=sample_steps,
            guide_scale=sample_guide_scale,
            seed=base_seed if base_seed is not None else engine.seed,
            save_file=output_file,
        )

    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)

        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, _infer)

        if not os.path.exists(output_file):
            return _failed(task_id, 'Output file not created by engine')

        file_size = os.path.getsize(output_file)
        debug(f'Video generated: {output_file} ({file_size} bytes)')

        # Tolerate engines that return something other than a mapping: the
        # video exists, so a missing seed must not turn success into failure.
        seed = result.get('seed') if hasattr(result, 'get') else None

        return {
            'task_id': task_id,
            'status': 'completed',
            'video_url': f'/idfile?path={task_id}.mp4',
            'video_path': output_file,
            'size': size,
            'frame_num': frame_num,
            'file_size': file_size,
            'prompt': prompt[:100],
            'seed': seed,
        }
    except Exception as e:
        # Top-level task boundary: log with traceback and report the failure
        # through the result dict rather than raising into the task runner.
        exception(f'Generation error: {e}')
        return _failed(task_id, str(e))