From 3cb0dc22f26c83f810e14f482b009d815cd7b747 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Tue, 4 Nov 2025 17:31:41 +0800 Subject: [PATCH] bugfix --- README.md | 9 +++++++-- longtasks/longtasks.py | 33 +++++++++++---------------------- 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index e2ee7b4..3848014 100644 --- a/README.md +++ b/README.md @@ -12,12 +12,17 @@ from appPublic.worker import schedule_once class MyTasks(LongTasks): # Child class - async def process_task(self, payload): + async def process_task(self, payload, workerid=None): # use logic to execute task pass def load_longtasks() - longtasks = MyTasks('redis://127.0.0.1:6379', 'example') + longtasks = MyTasks('redis://127.0.0.1:6379', 'example', worker_cnt=2, stuck_seconds=600, max_age_hours=3) + ## arguments + ## example: task name of this instance, each instance must has difference task name + ## worker_cnt: backend job count + ## stuck_seconds: tasks in processing_queue have waited longer than, will reenter the queue + ## max_age_hours: task created longer the max_age_hours will deleted from the LongTasks instance env = ServerEnv() env.longtasks = longtasks # run the backend job diff --git a/longtasks/longtasks.py b/longtasks/longtasks.py index 35ee1ca..f555f4e 100644 --- a/longtasks/longtasks.py +++ b/longtasks/longtasks.py @@ -7,13 +7,8 @@ import time from typing import Any, Dict from appPublic.worker import get_event_loop, schedule_interval, schedule_once from appPublic.uniqueID import getID +from appPublic.log import debug, exception -def work1(x, y=1): - print(f'{x} * {y} = {x*y}') - -async def work2(): - print('task2 ...') - class LongTasks: def __init__(self, redis_url, taskname, worker_cnt=2, stuck_seconds=600, max_age_hours=3): self.redis_url = redis_url @@ -37,15 +32,15 @@ class LongTasks: continue if created + self.max_age_seconds < now: - print(f"🧹 删除过期任务: {key}") + debug(f"🧹 删除过期任务: {key}") await self.redis.delete(key) await self.redis.lrem("task_queue", 0, key) # 从任务队列中移除(可选) - async def process_task(self, payload:dict): + async def process_task(self, payload:dict, workid:int=None): sec = randint(0,5) await asyncio.sleep(sec) - print(f'{payload=} done') + debug(f'{payload=} done') return { 'result': 'OK' } @@ -91,7 +86,7 @@ class LongTasks: # 这里我们选择重新入队并删除 processing entry await self.redis.rpush(self.task_queue, item) await self.redis.lrem(self.processing_queue, 1, item) - print(f"[recover] requeued missing-hash {task_id}") + debug(f"[recover] requeued missing-hash {task_id}") continue started_at = float(info.get("started_at") or 0) @@ -104,11 +99,11 @@ class LongTasks: await self.update_task_hash(task_id, {"status": "PENDING", "attempts": attempts}) await self.redis.rpush(self.task_queue, item) await self.redis.lrem(self.processing_queue, 1, item) - print(f"[recover] task {task_id} requeued due to stuck") + debug(f"[recover] task {task_id} requeued due to stuck") # else: 正常 running 或其他状态,不处理 async def worker_loop(self, worker_id: int): - print(f"[worker {worker_id}] start") + debug(f"[worker {worker_id}] start") while True: try: # BRPOPLPUSH: 从 task_queue 弹出(阻塞),并 push 到 processing_queue(原子) @@ -124,7 +119,7 @@ class LongTasks: task_id = task_obj["task_id"] payload = task_obj["payload"] except Exception as e: - print(f"[worker {worker_id}] bad item in queue, removing: {e}") + exception(f"[worker {worker_id}] bad item in queue, removing: {e}") # 异常数据从 processing_queue 中移除 await self.redis.lrem(self.processing_queue, 1, item) continue @@ -143,7 +138,7 @@ class LongTasks: # 2) 执行任务(catch exceptions) try: - result = await self.process_task(payload) + result = await self.process_task(worker_id, payload) except asyncio.CancelledError: # 若希望支持取消,可把 status 设为 cancelling 等 await self.update_task_hash(task_id, {"status": "FAILED", "error": "cancelled"}) @@ -169,12 +164,12 @@ class LongTasks: }) # 最后一步:从 processing_queue 中移除任务项(LREM) await self.redis.lrem(self.processing_queue, 1, item) - print(f"[worker {worker_id}] finished {task_id}") + debug(f"[worker {worker_id}] finished {task_id}") except asyncio.CancelledError: break except Exception as e: - print(f"[worker {worker_id}] loop error: {e}") + exception(f"[worker {worker_id}] loop error: {e}") await asyncio.sleep(1) async def submit_task(self, payload): @@ -208,12 +203,6 @@ class LongTasks: if __name__ == '__main__': async def main(lt): - x = schedule_interval(5, work1, 6) - print(f'interval worker {x}') - y = schedule_interval(3, work2) - print(f'interval worker {y}') - await asyncio.sleep(10000) - return tasks = [] for i in range(0, 10): payload = {