# -*- coding:utf-8 -*-
"""Redis-backed long-task engine.

Workers atomically move JSON task items from ``task_queue`` to
``processing_queue`` (BRPOPLPUSH) and record per-task state in the
Redis hash ``task:{task_id}``.  Clients use :meth:`LongTasks.submit_task`
to enqueue work and :meth:`LongTasks.get_status` to poll progress.
"""
import asyncio
import json
import time
from typing import Any, Dict

import aioredis

from appPublic.uniqueID import getID


class LongTask:
    """Bookkeeping wrapper around a single :class:`asyncio.Task`.

    Fixes vs. the original:
      * ``self.task`` was never stored although ``start()`` used it;
      * ``self.status = 'pending',`` (trailing comma) made the status a tuple;
      * a method named ``status`` was shadowed by the instance attribute of
        the same name and could never be called -- renamed to ``info()``.
    """

    def __init__(self, task: asyncio.Task):
        self.id = getID()
        self.task = task            # keep a reference to the wrapped task
        self.status = 'pending'     # was: 'pending',  (accidental tuple)
        self.result = None
        self.task_status = None
        self.start_time = time.time()

    def start(self):
        """Mark the task as running.

        ``asyncio.Task`` objects have no ``run()`` method -- they are
        scheduled as soon as they are created -- so only the status flag
        is updated here.
        """
        self.status = 'running'

    def info(self) -> Dict[str, Any]:
        """Return a status snapshot for clients."""
        return {
            'taskid': self.id,
            'status': self.status,
            'task_status': self.task_status,
            'result': self.result
        }


class LongTasks:
    """Queue manager: submits tasks, runs workers, recovers stuck items."""

    def __init__(self, redis_url, task_queue, processing_queue,
                 worker_cnt=2, stuck_seconds=600):
        # was: "worker_cnt=2 stuck_seconds=600" -- missing comma (SyntaxError)
        self.redis_url = redis_url
        self.worker_cnt = worker_cnt
        self.task_queue = task_queue
        self.processing_queue = processing_queue
        self.stuck_seconds = stuck_seconds
        self.redis = None  # created in init()

    async def init(self):
        """Connect to Redis, recover stuck tasks and run the worker pool."""
        self.redis = await aioredis.from_url(self.redis_url, decode_responses=True)
        await self.recover_stuck_tasks()
        # was: self.worker_loop(redis, i) -- bare 'redis' was undefined and
        # worker_loop only takes worker_id
        workers = [asyncio.create_task(self.worker_loop(i))
                   for i in range(self.worker_cnt)]
        try:
            await asyncio.gather(*workers)
        except asyncio.CancelledError:
            for w in workers:
                w.cancel()
        finally:
            await self.redis.close()  # was: 'redis.close()' on undefined name

    async def process_task(self, payload):
        """Execute one task payload; subclasses must override.

        The original worker called a free function ``process_task`` that was
        never defined anywhere in the module; making it an overridable hook
        keeps the worker loop usable.
        """
        raise NotImplementedError('override process_task() in a subclass')

    async def update_task_hash(self, task_id: str, mapping: Dict[str, Any]):
        """Write *mapping* into ``task:{task_id}``; non-str values are JSON-encoded."""
        str_map = {k: v if isinstance(v, str) else json.dumps(v)
                   for k, v in mapping.items()}
        await self.redis.hset(f"task:{task_id}", mapping=str_map)

    async def recover_stuck_tasks(self):
        """Re-queue processing-queue items stuck longer than ``stuck_seconds``.

        Called on startup (or periodically).  NOTE: reads the whole list in
        one LRANGE; paginate when the queue can grow very large.
        """
        # was: every Redis call below used an undefined bare name 'redis'
        items = await self.redis.lrange(self.processing_queue, 0, -1)
        now = time.time()
        for item in items:
            try:
                task_obj = json.loads(item)
                task_id = task_obj["task_id"]
            except Exception:
                # malformed entry -- drop it from the processing queue
                await self.redis.lrem(self.processing_queue, 1, item)
                continue

            task_key = f"task:{task_id}"
            # was: hgetall(..., encoding="utf-8") -- aioredis 2.x hgetall takes
            # no 'encoding' kwarg; decode_responses=True already decodes
            info = await self.redis.hgetall(task_key)
            if not info:
                # task hash missing: re-queue the item and drop the
                # processing entry
                await self.redis.rpush(self.task_queue, item)
                await self.redis.lrem(self.processing_queue, 1, item)
                print(f"[recover] requeued missing-hash {task_id}")
                continue

            started_at = float(info.get("started_at") or 0)
            status = info.get("status")
            if status == "running" and (now - started_at) > self.stuck_seconds:
                # stuck: bump attempts and push back onto the task queue
                attempts = int(json.loads(info.get("attempts") or "0")) + 1
                await self.update_task_hash(
                    task_id, {"status": "queued", "attempts": attempts})
                await self.redis.rpush(self.task_queue, item)
                await self.redis.lrem(self.processing_queue, 1, item)
                print(f"[recover] task {task_id} requeued due to stuck")
            # else: healthy running (or other) state -- leave it alone

    async def worker_loop(self, worker_id: int):
        """Consume tasks forever: pop, mark running, execute, record result."""
        print(f"[worker {worker_id}] start")
        while True:
            try:
                # BRPOPLPUSH: blocking pop from task_queue + push to
                # processing_queue in one atomic step
                item = await self.redis.brpoplpush(
                    self.task_queue, self.processing_queue, timeout=5)
                if not item:
                    await asyncio.sleep(0.1)
                    continue

                # item is a JSON string
                try:
                    task_obj = json.loads(item)
                    task_id = task_obj["task_id"]
                    payload = task_obj["payload"]
                except Exception as e:
                    print(f"[worker {worker_id}] bad item in queue, removing: {e}")
                    await self.redis.lrem(self.processing_queue, 1, item)
                    continue

                # 1) mark running so clients polling get_status() see progress
                await self.update_task_hash(task_id, {
                    "status": "running",
                    "started_at": time.time(),
                })

                # 2) run the task, trapping cancellation and failures
                try:
                    # was: a call to an undefined module-level process_task()
                    result = await self.process_task(payload)
                except asyncio.CancelledError:
                    await self.update_task_hash(
                        task_id, {"status": "failed", "error": "cancelled"})
                    await self.redis.lrem(self.processing_queue, 1, item)
                    continue
                except Exception as e:
                    await self.update_task_hash(task_id, {
                        "status": "failed",
                        "error": str(e),
                        "finished_at": time.time()
                    })
                    await self.redis.lrem(self.processing_queue, 1, item)
                    continue

                # 3) success: persist result, then drop the processing entry
                await self.update_task_hash(task_id, {
                    "status": "success",
                    "result": result,
                    "finished_at": time.time()
                })
                await self.redis.lrem(self.processing_queue, 1, item)
                print(f"[worker {worker_id}] finished {task_id}")

            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"[worker {worker_id}] loop error: {e}")
                await asyncio.sleep(1)

    async def submit_task(self, payload):
        """Store a ``task:{id}`` hash and enqueue the task.

        Returns ``{'task_id': <id>}`` for the client to poll with.
        """
        taskid = getID()
        task_data = {
            "task_id": taskid,
            "status": "pending",
            "payload": json.dumps(payload)
        }
        await self.redis.hset(f'task:{taskid}', mapping=task_data)
        await self.redis.rpush(self.task_queue, json.dumps({
            "task_id": taskid,
            "payload": payload
        }))
        return {'task_id': taskid}

    async def get_status(self, taskid: str):
        """Return the stored hash for *taskid*, or ``{'error': 'no task'}``.

        Fixes vs. original: the ``self`` parameter was missing, the
        ``hgetall`` coroutine was never awaited, and the invalid
        ``encoding`` kwarg is removed.
        """
        task = await self.redis.hgetall(f'task:{taskid}')
        if not task:
            return {'error': 'no task'}
        return task
b/pyproject.toml new file mode 100644 index 0000000..7828a09 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["setuptools>=61", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "longtasks" +version = "0.1.0" +description = "long task engine with submit, get_status interface to client" +authors = [{name = "Yu Moqing", email = "yumoqing@gmail.com"}] +license = {text = "MIT"} +dependencies = ["ahserver", "sqlor", "appPublic", "aioredis"] + +[tool.setuptools.packages.find] +where = ["."] +include = ["longtasks"] + +# [tool.setuptools] +#include-package-data = true + +#[tool.setuptools.package-data] +#checklang = ["lid.176.ftz"]