import asyncio
import os
import shutil
import subprocess
import tempfile
from pathlib import Path

import cv2

from appPublic.log import debug

# Location of the Real-ESRGAN x2 weights and where to fetch them if absent.
_MODEL_PATH = "/data/ymq/models/RealESRGAN_x2plus.pth"
_MODEL_URL = (
    "https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.1/"
    "RealESRGAN_x2plus.pth"
)

# How much of a failed tool's stderr is kept in the error message.
_STDERR_TAIL_CHARS = 2000


def _run_tool(cmd):
    """Run an external command with captured text output.

    Args:
        cmd: Argument list passed to ``subprocess.run`` (no shell).

    Returns:
        The ``subprocess.CompletedProcess`` of the successful run.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The message
            carries the tail of the command's stderr, which
            ``str(CalledProcessError)`` alone does not include.
    """
    try:
        return subprocess.run(
            cmd,
            check=True,
            capture_output=True,
            text=True,
            # Tool output may contain bytes that are not valid in the locale
            # encoding; never let decoding turn a result into a failure.
            errors="replace",
            # Keep the tool from reading (or waiting on) the service's stdin.
            stdin=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError as exc:
        stderr_tail = (exc.stderr or "").strip()[-_STDERR_TAIL_CHARS:]
        raise RuntimeError(
            f"{cmd[0]} exited with status {exc.returncode}: {stderr_tail}"
        ) from exc


def _load_model(tasks):
    """Lazy-load the Real-ESRGAN x2 model into ``tasks.upsampler``.

    Does nothing when ``tasks.upsampler`` is already set. Otherwise downloads
    the weights with ``wget`` if they are not on disk, builds the upsampler on
    ``cuda:0`` and stores it on ``tasks`` so later calls reuse it.

    Args:
        tasks: Object exposing a writable ``upsampler`` attribute.

    Raises:
        subprocess.CalledProcessError: If the weight download fails.
    """
    if tasks.upsampler is not None:
        return

    # Heavy imports are deferred until the model is actually needed.
    import torch  # noqa: F401  (kept from the original import sequence)
    from basicsr.archs.rrdbnet_arch import RRDBNet
    from realesrgan import RealESRGANer

    model_path = _MODEL_PATH
    if not os.path.exists(model_path):
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        # Download under a temporary name and move it into place only on
        # success: "wget -O" creates the target even when the transfer fails,
        # and a truncated file at model_path would pass the exists() check on
        # every later call.
        partial_path = f"{model_path}.{os.getpid()}.part"
        try:
            subprocess.run(
                ["wget", "-q", _MODEL_URL, "-O", partial_path],
                check=True,
            )
            os.replace(partial_path, model_path)
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)

    device = "cuda:0"
    model = RRDBNet(
        num_in_ch=3,
        num_out_ch=3,
        num_feat=64,
        num_block=23,
        num_grow_ch=32,
        scale=2,
    )
    tasks.upsampler = RealESRGANer(
        scale=2,
        model_path=model_path,
        model=model,
        tile=0,
        tile_pad=10,
        pre_pad=0,
        half=False,
        device=device,
    )
    debug(f"Real-ESRGAN x2 model loaded on {device}")


async def run_upscale(tasks, payload: dict):
    """Upscale a video file frame by frame using Real-ESRGAN.

    The video is split into JPEG frames with ffmpeg, each frame is enhanced by
    ``tasks.upsampler``, and the frames are re-encoded to
    ``<output_dir>/<stem>_upscaled.mp4`` with libx264 at the source frame rate.
    Only the video frames are re-encoded; no audio input is passed to the
    encoder.

    NOTE(review): every subprocess and inference call here is synchronous and
    the coroutine never awaits, so it occupies the event loop for the whole
    job. Left unchanged because moving the work to a thread would change how
    concurrent jobs share ``tasks.upsampler``.

    Args:
        tasks: Object exposing an ``upsampler`` attribute (see ``_load_model``).
        payload: Request dict. Keys read: ``video_path`` (required),
            ``output_dir`` (default ``/data/ymq/upscaled-outputs``),
            ``scale`` (default ``2``, forwarded as ``outscale``) and
            ``task_id`` (used for logging and echoed in the result).

    Returns:
        On success a dict with ``task_id``, ``status="success"``,
        ``video_path``, ``resolution`` (``"WxH"``) and ``frames``.
        On failure a dict with ``task_id``, ``status="failed"`` and ``error``.

    Raises:
        Exceptions from ``_load_model`` propagate; failures after the model is
        loaded are reported through the returned dict instead.
    """
    video_path = payload.get("video_path", "")
    output_dir = payload.get("output_dir", "/data/ymq/upscaled-outputs")
    scale = payload.get("scale", 2)

    if not video_path or not os.path.exists(video_path):
        return {
            "task_id": payload.get("task_id", ""),
            "status": "failed",
            "error": f"Video not found: {video_path}",
        }

    # Load model on first call
    _load_model(tasks)

    task_id = payload.get("task_id", "unknown")
    task_dir = None

    try:
        # Use a freshly created, uniquely named work directory. The task_id
        # comes from the request and must not be part of a path that is later
        # removed recursively: a value containing ".." could point rmtree at
        # an unrelated directory, and requests without a task_id would share
        # one directory.
        task_dir = Path(tempfile.mkdtemp(prefix="realesrgan_"))
        frames_dir = task_dir / "frames"
        upscaled_dir = task_dir / "upscaled"
        for work_dir in (frames_dir, upscaled_dir):
            work_dir.mkdir(parents=True, exist_ok=True)

        # Extract frames
        debug(f"[{task_id}] Extracting frames...")
        _run_tool([
            "ffmpeg", "-i", video_path,
            "-q:v", "1",
            str(frames_dir / "frame_%04d.jpg"),
        ])

        # Get video fps (kept as ffprobe's rational string, e.g. "30000/1001")
        probe = _run_tool([
            "ffprobe", "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=r_frame_rate",
            "-of", "csv=p=0",
            video_path,
        ])
        fps_tokens = probe.stdout.split()
        if not fps_tokens:
            raise RuntimeError(f"Could not determine frame rate of {video_path}")
        # Defensive: use only the first value and drop any trailing separator.
        fps = fps_tokens[0].strip(",")

        # Upscale frames
        frame_files = sorted(frames_dir.glob("*.jpg"))
        total = len(frame_files)
        if total == 0:
            raise RuntimeError(f"No frames could be extracted from {video_path}")
        debug(f"[{task_id}] Upscaling {total} frames...")

        for index, frame_path in enumerate(frame_files, 1):
            img = cv2.imread(str(frame_path), cv2.IMREAD_UNCHANGED)
            if img is None:
                # imread signals failure by returning None instead of raising.
                raise RuntimeError(f"Failed to read frame {frame_path.name}")
            output, _ = tasks.upsampler.enhance(img, outscale=scale)
            if not cv2.imwrite(str(upscaled_dir / frame_path.name), output):
                raise RuntimeError(
                    f"Failed to write upscaled frame {frame_path.name}"
                )
            if index % 10 == 0 or index == total:
                debug(f"[{task_id}] Frame {index}/{total}")

        # Re-encode video
        os.makedirs(output_dir, exist_ok=True)
        stem = Path(video_path).stem
        output_video = Path(output_dir) / f"{stem}_upscaled.mp4"

        debug(f"[{task_id}] Re-encoding video...")
        _run_tool([
            # -y is a global option; give it before the inputs rather than
            # after the output file.
            "ffmpeg", "-y",
            "-framerate", fps,
            "-i", str(upscaled_dir / "frame_%04d.jpg"),
            "-c:v", "libx264",
            "-preset", "slow",
            "-crf", "18",
            "-pix_fmt", "yuv420p",
            str(output_video),
        ])

        # Get output resolution
        probe = _run_tool([
            "ffprobe", "-v", "error",
            "-select_streams", "v:0",
            "-show_entries", "stream=width,height",
            "-of", "csv=p=0",
            str(output_video),
        ])
        size_lines = probe.stdout.split()
        size_fields = [
            field for field in (size_lines[0].split(",") if size_lines else [])
            if field
        ]
        if len(size_fields) < 2:
            raise RuntimeError(
                f"Could not determine resolution of {output_video}"
            )
        w, h = size_fields[:2]

        debug(f"[{task_id}] Done: {output_video} ({w}x{h})")
        return {
            "task_id": task_id,
            "status": "success",
            "video_path": str(output_video),
            "resolution": f"{w}x{h}",
            "frames": total,
        }

    except Exception as e:
        # Top-level boundary of the job: report the failure to the caller
        # through the result dict rather than raising.
        debug(f"[{task_id}] Error: {e}")
        return {"task_id": task_id, "status": "failed", "error": str(e)}

    finally:
        # Best-effort cleanup; a failure to delete temporary frames must not
        # replace the result being returned.
        if task_dir is not None:
            shutil.rmtree(task_dir, ignore_errors=True)