from longtasks.longtasks import LongTasks
from faster_whisper import WhisperModel
from appPublic.worker import awaitify
from appPublic.jsonConfig import getConfig
from appPublic.log import debug, exception
from ahserver.filestorage import FileStorage


class MyLongTask(LongTasks):
    """Long-running speech-to-text task backed by faster-whisper."""
    def __init__(self):
        self.config = getConfig()
        redis_url = self.config.redis_url
        taskname = 'fastwhisper'
        worker_cnt = self.config.worker_cnt
        super().__init__(redis_url, taskname, worker_cnt=worker_cnt)
        self.load_models()

    def load_models(self):
        # One model instance per worker so workers never share a model.
        self.models = []
        for i in range(self.worker_cnt):
            model = WhisperModel(self.config.model_path,
                                 device="cuda",
                                 compute_type="float16")
            self.models.append(model)

    async def process_task(self, payload: dict, workerid: int = None):
        debug(f'process_task():{payload=}, {workerid=}')
        if workerid is None:
            return {
                "task_status": "error",
                "message": "workerid is None"
            }
        model = self.models[workerid]
        webpath = payload.get('audio_file')
        if webpath is None:
            return {
                "task_status": "error",
                "message": "no audio_file provided"
            }
        fs = FileStorage()
        fpath = fs.realPath(webpath)
        # transcribe() blocks; awaitify() runs it off the event loop.
        f = awaitify(self.transcribe)
        return await f(model, fpath)

    def transcribe(self, model, fpath):
        # word_timestamps=True is required; otherwise segment.words is None
        # and the 'words' field below would always be empty.
        segments, info = model.transcribe(fpath, beam_size=5, word_timestamps=True)
        segments = list(segments)
        debug(f'{segments=}')
        words = []
        for s in segments:
            if s.words:
                words.append([[word.start, word.end, word.word] for word in s.words])
        return {
            'language': info.language,
            'language_probability': info.language_probability,
            'content': ' '.join([s.text for s in segments]),
            'words': words,
            'segments': [[segment.start, segment.end, segment.text] for segment in segments]
        }
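

# Optional local smoke test: a minimal sketch, not part of the service.
# It assumes a local faster-whisper model directory and a sample audio file
# (the command-line paths below are placeholders), and it bypasses the
# LongTasks/redis machinery by constructing the object with __new__ and
# calling transcribe() directly on a single WhisperModel.
if __name__ == '__main__':
    import sys
    model_path, audio_path = sys.argv[1], sys.argv[2]
    model = WhisperModel(model_path, device="cuda", compute_type="float16")
    task = MyLongTask.__new__(MyLongTask)   # skip __init__ (needs config and redis)
    result = task.transcribe(model, audio_path)
    print(result['language'], result['language_probability'])
    print(result['content'])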