From 9ff3f19e88262d41c0c1dedc50540172d1d21fac Mon Sep 17 00:00:00 2001 From: yumoqing Date: Sun, 14 Jun 2026 16:17:10 +0800 Subject: [PATCH] Fix LongTasks import and worker interface --- ah.py | 48 ++++++++++++++++++++++++++++++++++------------- workers/verify.py | 6 +----- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/ah.py b/ah.py index 80ed84a..848aa88 100644 --- a/ah.py +++ b/ah.py @@ -5,20 +5,42 @@ import sys, os sys.path.insert(0, os.path.dirname(__file__)) from ahserver.webapp import webapp -from longtasks import LongTasks from ahserver.serverenv import ServerEnv +from ahserver.configuredServer import add_startup +from longtasks.longtasks import LongTasks, schedule_once +from appPublic.log import debug from workers.verify import VerifyWorker -def init(app): - env = ServerEnv() - env.longtasks = LongTasks( - app=app, - worker_class=VerifyWorker, - queue_name="verify_delivery", - worker_cnt=1, - stuck_seconds=600, - max_age_hours=24 - ) - return env -webapp(init) +class VerifyTasks(LongTasks): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + async def process_task(self, payload, workid=None): + import json + if isinstance(payload, str): + payload = json.loads(payload) + worker = VerifyWorker(self) + result = await worker.run_task(payload) + return result + + +async def on_app_built(app): + env = ServerEnv() + lt = env.longtasks + if lt: + schedule_once(0.1, lt.run) + debug('Verify delivery longtasks worker started') + + +def init(): + env = ServerEnv() + env.longtasks = VerifyTasks( + 'redis://127.0.0.1:6379', 'verify_delivery', + worker_cnt=1, stuck_seconds=600, max_age_hours=24 + ) + add_startup(on_app_built) + + +if __name__ == '__main__': + webapp(init) diff --git a/workers/verify.py b/workers/verify.py index 707a859..d3dc27c 100644 --- a/workers/verify.py +++ b/workers/verify.py @@ -11,10 +11,7 @@ class VerifyWorker: def __init__(self, longtasks): self.lt = longtasks - async def run(self, task): - payload = task.payload - task_id = task.task_id - + async def run_task(self, payload): mtv_path = payload.get("mtv_path", "") ktv_path = payload.get("ktv_path", "") ass_path = payload.get("ass_path", "") @@ -67,7 +64,6 @@ class VerifyWorker: passed = len(all_errors) == 0 return { "status": "PASSED" if passed else "FAILED", - "task_id": task_id, "qa_results": qa_results, "total_errors": len(all_errors), "errors": all_errors