bugfix
This commit is contained in:
parent
be70972ebf
commit
fd7fce82ec
0
longtasks/__init__.py
Normal file
0
longtasks/__init__.py
Normal file
180
longtasks/longtasks.py
Normal file
180
longtasks/longtasks.py
Normal file
@ -0,0 +1,180 @@
|
|||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
import asyncio
|
||||||
|
import aioredis
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from typing import Any, Dict
|
||||||
|
from appPublic.uniqueID import getID
|
||||||
|
|
||||||
|
|
||||||
|
class LongTask:
|
||||||
|
def __init__(self, task: asyncio.Task):
|
||||||
|
self.id = getID()
|
||||||
|
self.status = 'pending',
|
||||||
|
self.result = None
|
||||||
|
self.task_status = None
|
||||||
|
self.start_time = time.time()
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self.status = 'running'
|
||||||
|
self.task.run()
|
||||||
|
|
||||||
|
def status(self):
|
||||||
|
return {
|
||||||
|
'taskid': self.id,
|
||||||
|
'status':self.status,
|
||||||
|
'task_status': self.task_status,
|
||||||
|
'result': self.result
|
||||||
|
}
|
||||||
|
|
||||||
|
class LongTasks:
|
||||||
|
def __init__(self, redis_url, task_queue, processing_queue, worker_cnt=2 stuck_seconds=600):
|
||||||
|
self.redis_url = redis_url
|
||||||
|
self.worker_cnt = worker_cnt
|
||||||
|
self.task_queue = task_queue
|
||||||
|
self.processing_queue = processing_queue
|
||||||
|
self.stuck_seconds = stuck_seconds
|
||||||
|
|
||||||
|
async def init(self):
|
||||||
|
self.redis = await aioredis.from_url(self.redis_url, decode_responses=True)
|
||||||
|
await self.recover_stuck_tasks()
|
||||||
|
workers = [asyncio.create_task(self.worker_loop(redis, i)) for i in range(self.worker_cnt)]
|
||||||
|
try:
|
||||||
|
await asyncio.gather(*workers)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
for w in workers:
|
||||||
|
w.cancel()
|
||||||
|
finally:
|
||||||
|
await redis.close()
|
||||||
|
|
||||||
|
async def update_task_hash(self, task_id: str, mapping: Dict[str, Any]):
|
||||||
|
# all values must be str
|
||||||
|
str_map = {k: json.dumps(v) if not isinstance(v, str) else v for k, v in mapping.items()}
|
||||||
|
await self.redis.hset(f"task:{task_id}", mapping=str_map)
|
||||||
|
|
||||||
|
async def recover_stuck_tasks(self):
|
||||||
|
"""
|
||||||
|
启动时或定期调用,检查 processing_queue 中可能卡住的任务,
|
||||||
|
如果某任务的 task:{id}.started_at 距现在 > self.stuck_seconds,则认为卡住并重新入队或标记为 failed。
|
||||||
|
"""
|
||||||
|
# 读取整个 processing_queue(注意:当队列非常大时需改成分页)
|
||||||
|
items = await redis.lrange(self.processing_queue, 0, -1)
|
||||||
|
now = time.time()
|
||||||
|
for item in items:
|
||||||
|
try:
|
||||||
|
task_obj = json.loads(item)
|
||||||
|
task_id = task_obj["task_id"]
|
||||||
|
except Exception:
|
||||||
|
# 非法项直接清理
|
||||||
|
await redis.lrem(self.processing_queue, 1, item)
|
||||||
|
continue
|
||||||
|
|
||||||
|
task_key = f"task:{task_id}"
|
||||||
|
info = await redis.hgetall(task_key, encoding="utf-8")
|
||||||
|
if not info:
|
||||||
|
# 如果 task hash 不存在,可选择直接删除或重新 enqueue
|
||||||
|
# 这里我们选择重新入队并删除 processing entry
|
||||||
|
await redis.rpush(self.task_queue, item)
|
||||||
|
await redis.lrem(self.processing_queue, 1, item)
|
||||||
|
print(f"[recover] requeued missing-hash {task_id}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
started_at = float(info.get("started_at") or 0)
|
||||||
|
status = info.get("status")
|
||||||
|
if status == "running" and (now - started_at) > self.stuck_seconds:
|
||||||
|
# 任务卡住 -> 重新入队并更新 attempts 或直接标记失败
|
||||||
|
# 示例:重新入队并增加 attempts 字段
|
||||||
|
attempts = int(json.loads(info.get("attempts") or "0"))
|
||||||
|
attempts += 1
|
||||||
|
await self.update_task_hash(task_id, {"status": "queued", "attempts": attempts})
|
||||||
|
await redis.rpush(self.task_queue, item)
|
||||||
|
await redis.lrem(self.processing_queue, 1, item)
|
||||||
|
print(f"[recover] task {task_id} requeued due to stuck")
|
||||||
|
# else: 正常 running 或其他状态,不处理
|
||||||
|
|
||||||
|
async def worker_loop(self, worker_id: int):
|
||||||
|
print(f"[worker {worker_id}] start")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# BRPOPLPUSH: 从 task_queue 弹出(阻塞),并 push 到 processing_queue(原子)
|
||||||
|
# aioredis: brpoplpush(source, destination, timeout)
|
||||||
|
item = await self.redis.brpoplpush(self.task_queue, self.processing_queue, timeout=5)
|
||||||
|
if not item:
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# item 是字符串 JSON
|
||||||
|
try:
|
||||||
|
task_obj = json.loads(item)
|
||||||
|
task_id = task_obj["task_id"]
|
||||||
|
payload = task_obj["payload"]
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[worker {worker_id}] bad item in queue, removing: {e}")
|
||||||
|
# 异常数据从 processing_queue 中移除
|
||||||
|
await self.redis.lrem(self.processing_queue, 1, item)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 1) 更新 task hash 为 running(这一步很重要:客户端读取到状态)
|
||||||
|
started_at = time.time()
|
||||||
|
await self.update_task_hash(task_id, {
|
||||||
|
"status": "running",
|
||||||
|
"started_at": started_at,
|
||||||
|
# optional: increment attempts
|
||||||
|
})
|
||||||
|
|
||||||
|
# 2) 执行任务(catch exceptions)
|
||||||
|
try:
|
||||||
|
result = await process_task(payload)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
# 若希望支持取消,可把 status 设为 cancelling 等
|
||||||
|
await self.update_task_hash(task_id, {"status": "failed", "error": "cancelled"})
|
||||||
|
# 移除 processing_queue 项(已处理)
|
||||||
|
await self.redis.lrem(self.processing_queue, 1, item)
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
# 写回失败信息
|
||||||
|
await self.update_task_hash(task_id, {
|
||||||
|
"status": "failed",
|
||||||
|
"error": str(e),
|
||||||
|
"finished_at": time.time()
|
||||||
|
})
|
||||||
|
# 从 processing_queue 移除该项
|
||||||
|
await self.redis.lrem(self.processing_queue, 1, item)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# 3) 写回成功结果并移除 processing_queue 项
|
||||||
|
await self.update_task_hash(task_id, {
|
||||||
|
"status": "success",
|
||||||
|
"result": result,
|
||||||
|
"finished_at": time.time()
|
||||||
|
})
|
||||||
|
# 最后一步:从 processing_queue 中移除任务项(LREM)
|
||||||
|
await self.redis.lrem(self.processing_queue, 1, item)
|
||||||
|
print(f"[worker {worker_id}] finished {task_id}")
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[worker {worker_id}] loop error: {e}")
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
async def submit_task(self, payload):
|
||||||
|
taskid = getID()
|
||||||
|
task_data = {
|
||||||
|
"task_id": taskid,
|
||||||
|
"status": "pending",
|
||||||
|
"payload": json.dumps(payload)
|
||||||
|
}
|
||||||
|
await self.redis.hset(f'task:{taskid}', mapping=task_data)
|
||||||
|
await self.redis.rpush(self.task_queue, json.dumps({
|
||||||
|
"task_id": taskid,
|
||||||
|
"payload": payload
|
||||||
|
}))
|
||||||
|
return {'task_id': taskid}
|
||||||
|
|
||||||
|
async def get_status(taskid:str):
|
||||||
|
task = self.redis.hgetall(f'task:{taskid}', encoding="utf-8")
|
||||||
|
if not task:
|
||||||
|
return {'error': 'no task'}
|
||||||
|
return task
|
||||||
|
|
||||||
21
pyproject.toml
Normal file
21
pyproject.toml
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["setuptools>=61", "wheel"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "longtasks"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "long task engine with submit, get_status interface to client"
|
||||||
|
authors = [{name = "Yu Moqing", email = "yumoqing@gmail.com"}]
|
||||||
|
license = {text = "MIT"}
|
||||||
|
dependencies = ["ahserver", "sqlor", "appPublic"]
|
||||||
|
|
||||||
|
[tool.setuptools.packages.find]
|
||||||
|
where = ["."]
|
||||||
|
include = ["longtasks"]
|
||||||
|
|
||||||
|
# [tool.setuptools]
|
||||||
|
#include-package-data = true
|
||||||
|
|
||||||
|
#[tool.setuptools.package-data]
|
||||||
|
#checklang = ["lid.176.ftz"]
|
||||||
Loading…
x
Reference in New Issue
Block a user