2025-11-06 11:48:11 +08:00
2025-11-06 11:48:11 +08:00
2025-11-05 14:06:52 +08:00
2025-11-04 16:34:32 +08:00
2025-11-04 17:31:41 +08:00

longtasks

Usage

Use with ahserver, you need to save a instance of LongTasks or its child class to ServerEnv, in child class, implements the process_task method for your business

save LongTasks instace to ServerEnv:

from ahserver.serverenv import ServerEnv
from longtasks.longtasks import Longtasks
from appPublic.worker import schedule_once

class MyTasks(LongTasks):
	# Child class
    async def process_task(self, payload, workerid=None):
        # use logic to execute task
		pass

def load_longtasks()
    longtasks = MyTasks('redis://127.0.0.1:6379', 'example', worker_cnt=2, stuck_seconds=600, max_age_hours=3)
	## arguments 
	## example: task name of this instance, each instance must has difference task name
	## worker_cnt: backend job count
	## stuck_seconds: tasks in processing_queue have waited longer than, will reenter the queue
	## max_age_hours: task created longer the max_age_hours will deleted from the LongTasks instance
    env = ServerEnv()
    env.longtasks = longtasks
    # run the backend job 
	schedule_once(0.1, longtasks.run)

submit a task in dspy

payload = {
	'prompt':'gagagag'
}
x = await longtasks.submit_task(payload)
# x is a dict with a 'task_id' key, it must be used to query the status of task
return x

query task status

taskid = 'mytaskid'
task_status = await longtasks.get_status(taskid)
## task_status is a dict
# {
#	"status": one of "SUCCEEDED", "FAILED", "PENDING", "RUNNING"
#	"result": process_task method returned
#	"created_at": task created time
#	"started_at": task begin running time
#	"finished_at": task finished time, success or failed
# }
return task_status
Description
No description provided
Readme 81 KiB
Languages
Python 100%