"""
ASR Transcription Worker using faster-whisper.

Lazy-loads the model on first use and keeps it in GPU memory.
Processes transcription tasks from the Redis queue.
"""
import os
import json
import asyncio
import time

from appPublic.log import debug, error

# Module-level model cache (lazy-loaded, stays in memory)
_model = None
_model_lock = None

MODEL_PATH = '/data/ymq/models/deepdml/faster-whisper-large-v3-turbo-ct2'


def _get_lock():
    """Return the module-wide asyncio lock guarding model loading.

    The lock is created on first call rather than at import time, so it is
    instantiated by code that is already running inside the event loop.
    """
    global _model_lock
    if _model_lock is None:
        _model_lock = asyncio.Lock()
    return _model_lock


def _build_model():
    """Construct the WhisperModel synchronously (blocking; run off-loop)."""
    # Imported lazily so that importing this module does not pull in
    # faster-whisper until a model is actually needed.
    from faster_whisper import WhisperModel

    # CUDA device 0 — CUDA_VISIBLE_DEVICES already isolates the GPU
    return WhisperModel(
        MODEL_PATH,
        device='cuda',
        device_index=0,
        compute_type='float16',
        num_workers=1,
    )


async def load_model():
    """Lazy-load the faster-whisper model and cache it at module level.

    Coroutine-safe (not thread-safe): concurrent callers on the same event
    loop wait on an asyncio lock, and the model is constructed only once.
    The blocking construction runs in the default executor so the event loop
    stays responsive while the model loads.

    Returns:
        The cached WhisperModel instance.
    """
    global _model
    if _model is not None:
        return _model

    async with _get_lock():
        # Re-check after acquiring the lock: another coroutine may have
        # finished loading while this one was waiting.
        if _model is not None:
            return _model

        debug(f'Loading faster-whisper model from {MODEL_PATH}...')
        t0 = time.time()

        loop = asyncio.get_running_loop()
        _model = await loop.run_in_executor(None, _build_model)

        elapsed = time.time() - t0
        debug(f'faster-whisper model loaded in {elapsed:.1f}s')
        return _model


def _segment_to_dict(seg, word_timestamps):
    """Convert one faster-whisper segment into a JSON-serializable dict."""
    seg_data = {
        'text': seg.text,
        'start': round(seg.start, 3),
        'end': round(seg.end, 3),
    }
    if word_timestamps and seg.words:
        seg_data['words'] = [
            {
                'word': w.word,
                'start': round(w.start, 3),
                'end': round(w.end, 3),
                'probability': round(w.probability, 4),
            }
            for w in seg.words
        ]
    return seg_data


def _transcribe_blocking(model, audio_path, language, word_timestamps,
                         vad_filter):
    """Run a full transcription synchronously and return (segments, info).

    model.transcribe() returns a lazy generator: the actual decoding only
    happens while it is iterated. The generator is therefore consumed here,
    so that the whole computation stays in the worker thread that calls this
    function instead of leaking back into the event loop.
    """
    segments_gen, info = model.transcribe(
        audio_path,
        language=language,
        word_timestamps=word_timestamps,
        vad_filter=vad_filter,
    )
    segments = [_segment_to_dict(seg, word_timestamps) for seg in segments_gen]
    return segments, info


def _save_result(result, output_path):
    """Write the result dict to output_path as UTF-8 JSON."""
    out_dir = os.path.dirname(output_path)
    # A bare filename has an empty dirname, and os.makedirs('') raises
    # FileNotFoundError, so only create the directory when there is one.
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)


async def run_transcribe(tasks, payload):
    """
    Run transcription on an audio file.

    Args:
        tasks: Not used by this function (presumably the task-queue handle
            supplied by the dispatcher — confirm against the caller).
        payload (dict): Task parameters:
            audio_path (str): Path to the audio file (required)
            language (str): Language code, default 'zh'
            word_timestamps (bool): Enable word-level timestamps, default True
            vad_filter (bool): Enable VAD filter, default True
            output_path (str): Optional path to save result JSON

    Returns:
        dict with keys: status, text, language, language_probability,
        duration, segments, processing_time, audio_path.

    Raises:
        ValueError: If audio_path is missing or empty.
        FileNotFoundError: If audio_path does not exist.
    """
    audio_path = payload.get('audio_path')
    if not audio_path:
        raise ValueError('audio_path is required')
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f'Audio file not found: {audio_path}')

    language = payload.get('language', 'zh')
    word_timestamps = payload.get('word_timestamps', True)
    vad_filter = payload.get('vad_filter', True)
    output_path = payload.get('output_path')

    debug(f'Transcribing: {audio_path} (lang={language}, vad={vad_filter}, words={word_timestamps})')
    # The timer starts before load_model(), so processing_time includes the
    # model load on the first call.
    t0 = time.time()

    model = await load_model()

    # Run the synchronous transcription (including consumption of the lazy
    # segment generator) in a thread to not block the event loop.
    loop = asyncio.get_running_loop()
    segments, info = await loop.run_in_executor(
        None,
        _transcribe_blocking,
        model,
        audio_path,
        language,
        word_timestamps,
        vad_filter,
    )

    elapsed = time.time() - t0

    result = {
        'status': 'ok',
        'text': ' '.join(s['text'] for s in segments),
        'language': info.language,
        'language_probability': round(info.language_probability, 4),
        'duration': round(info.duration, 3),
        'segments': segments,
        'processing_time': round(elapsed, 2),
        'audio_path': audio_path,
    }

    debug(f'Transcription done in {elapsed:.1f}s: {len(segments)} segments, '
          f'duration={info.duration:.1f}s, lang={info.language}')

    # Save result if output_path specified
    if output_path:
        _save_result(result, output_path)
        debug(f'Result saved to {output_path}')

    return result