asr-service/workers/transcribe.py

145 lines
4.2 KiB
Python

"""
ASR Transcription Worker using faster-whisper.
Lazy-loads the model on first use and keeps it in GPU memory.
Processes transcription tasks from the Redis queue.
"""
import os
import json
import asyncio
import time
from appPublic.log import debug, error
# Module-level model cache: assigned once by load_model() on first use and
# then reused for every later call in this process.
_model = None
# asyncio.Lock that serialises the first model load; created lazily by
# _get_lock() rather than at import time.
_model_lock = None
# Directory of the CTranslate2-converted faster-whisper model.
# Can be overridden with the ASR_MODEL_PATH environment variable so the worker
# is not tied to one machine's filesystem layout; the default is unchanged.
MODEL_PATH = os.environ.get(
    'ASR_MODEL_PATH',
    '/data/ymq/models/deepdml/faster-whisper-large-v3-turbo-ct2',
)
def _get_lock():
    """Return the shared asyncio lock used to guard model loading.

    The lock lives in the module global ``_model_lock``. It is created on the
    first call and the same instance is handed back on every later call.

    Returns:
        asyncio.Lock: the single module-wide lock instance.
    """
    global _model_lock
    existing = _model_lock
    if existing is not None:
        return existing
    # First caller: create the lock and publish it for everyone else.
    created = asyncio.Lock()
    _model_lock = created
    return created
async def load_model():
    """Lazy-load the faster-whisper model and cache it in ``_model``.

    The model is built at most once per process. Concurrent coroutines are
    serialised by the asyncio lock from ``_get_lock()``; note that this makes
    the function coroutine-safe on one event loop, not safe to call from
    multiple OS threads.

    The import of ``faster_whisper`` and the ``WhisperModel`` constructor are
    blocking calls, so they run in the loop's default executor to keep the
    event loop responsive while the model loads.

    Returns:
        The cached ``WhisperModel`` instance.
    """
    global _model
    # Fast path: already loaded, no locking needed.
    if _model is not None:
        return _model

    async with _get_lock():
        # Re-check: another coroutine may have finished loading while we
        # were waiting for the lock.
        if _model is not None:
            return _model

        debug(f'Loading faster-whisper model from {MODEL_PATH}...')
        t0 = time.perf_counter()

        def _build_model():
            # Imported lazily so that merely importing this module does not
            # pull in faster_whisper.
            from faster_whisper import WhisperModel
            # CUDA device 0 — CUDA_VISIBLE_DEVICES already isolates the GPU
            return WhisperModel(
                MODEL_PATH,
                device='cuda',
                device_index=0,
                compute_type='float16',
                num_workers=1,
            )

        loop = asyncio.get_running_loop()
        # If construction raises, _model stays None and the next caller
        # retries the load.
        _model = await loop.run_in_executor(None, _build_model)

        elapsed = time.perf_counter() - t0
        debug(f'faster-whisper model loaded in {elapsed:.1f}s')
    return _model
def _segment_to_dict(seg, word_timestamps):
    """Convert one transcription segment into a JSON-serialisable dict."""
    seg_data = {
        'text': seg.text,
        'start': round(seg.start, 3),
        'end': round(seg.end, 3),
    }
    # Word entries are attached only when requested and present.
    if word_timestamps and seg.words:
        seg_data['words'] = [
            {
                'word': w.word,
                'start': round(w.start, 3),
                'end': round(w.end, 3),
                'probability': round(w.probability, 4),
            }
            for w in seg.words
        ]
    return seg_data


def _transcribe_blocking(model, audio_path, language, word_timestamps, vad_filter):
    """Run a full transcription synchronously; meant for a worker thread."""
    segments_gen, info = model.transcribe(
        audio_path,
        language=language,
        word_timestamps=word_timestamps,
        vad_filter=vad_filter,
    )
    # model.transcribe() returns a lazy generator: the actual decoding happens
    # while iterating it, so the iteration must stay in this thread too.
    segments = [_segment_to_dict(seg, word_timestamps) for seg in segments_gen]
    return segments, info


async def run_transcribe(tasks, payload):
    """
    Run transcription on an audio file.

    Args:
        tasks: Passed in by the caller; not used by this function.
        payload (dict): Task parameters:
            audio_path (str): Path to the audio file (required)
            language (str): Language code, default 'zh'
            word_timestamps (bool): Enable word-level timestamps, default True
            vad_filter (bool): Enable VAD filter, default True
            output_path (str): Optional path to save the result as JSON

    Returns:
        dict with keys 'status', 'text', 'language', 'language_probability',
        'duration', 'segments', 'processing_time' and 'audio_path'.
        'processing_time' covers everything from before the model is obtained
        (including a first-time model load) up to the end of decoding.

    Raises:
        ValueError: if 'audio_path' is missing or empty.
        FileNotFoundError: if 'audio_path' does not exist.
    """
    audio_path = payload.get('audio_path')
    if not audio_path:
        raise ValueError('audio_path is required')
    if not os.path.exists(audio_path):
        raise FileNotFoundError(f'Audio file not found: {audio_path}')

    language = payload.get('language', 'zh')
    word_timestamps = payload.get('word_timestamps', True)
    vad_filter = payload.get('vad_filter', True)
    output_path = payload.get('output_path')

    debug(f'Transcribing: {audio_path} (lang={language}, vad={vad_filter}, words={word_timestamps})')
    t0 = time.perf_counter()
    model = await load_model()

    # Run the whole synchronous transcription, including consumption of the
    # lazy segment generator, in a worker thread so the event loop stays free.
    loop = asyncio.get_running_loop()
    segments, info = await loop.run_in_executor(
        None,
        _transcribe_blocking,
        model,
        audio_path,
        language,
        word_timestamps,
        vad_filter,
    )

    elapsed = time.perf_counter() - t0
    result = {
        'status': 'ok',
        'text': ' '.join(s['text'] for s in segments),
        'language': info.language,
        'language_probability': round(info.language_probability, 4),
        'duration': round(info.duration, 3),
        'segments': segments,
        'processing_time': round(elapsed, 2),
        'audio_path': audio_path,
    }
    debug(f'Transcription done in {elapsed:.1f}s: {len(segments)} segments, '
          f'duration={info.duration:.1f}s, lang={info.language}')

    # Save result if output_path specified
    if output_path:
        out_dir = os.path.dirname(output_path)
        # A bare filename has an empty dirname, and os.makedirs('') raises,
        # so only create the directory when there is one.
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        debug(f'Result saved to {output_path}')
    return result