# longtasks ## Usage Use with ahserver, you need to save a instance of LongTasks or its child class to ServerEnv, in child class, implements the process_task method for your business save LongTasks instace to ServerEnv: ``` from ahserver.serverenv import ServerEnv from longtasks.longtasks import Longtasks from appPublic.worker import schedule_once class MyTasks(LongTasks): # Child class async def process_task(self, payload, workerid=None): # use logic to execute task pass def load_longtasks() longtasks = MyTasks('redis://127.0.0.1:6379', 'example', worker_cnt=2, stuck_seconds=600, max_age_hours=3) ## arguments ## example: task name of this instance, each instance must has difference task name ## worker_cnt: backend job count ## stuck_seconds: tasks in processing_queue have waited longer than, will reenter the queue ## max_age_hours: task created longer the max_age_hours will deleted from the LongTasks instance env = ServerEnv() env.longtasks = longtasks # run the backend job schedule_once(0.1, longtasks.run) ``` submit a task in dspy ``` payload = { 'prompt':'gagagag' } x = await longtasks.submit_task(payload) # x is a dict with a 'task_id' key, it must be used to query the status of task return x ``` query task status ``` taskid = 'mytaskid' task_status = await longtasks.get_status(taskid) ## task_status is a dict # { # "status": one of "SUCCEEDED", "FAILED", "PENDING", "RUNNING" # "result": process_task method returned # "created_at": task created time # "started_at": task begin running time # "finished_at": task finished time, success or failed # } return task_status ```