# longtasks
## Usage
To use longtasks with ahserver, save an instance of LongTasks, or of a child class, to ServerEnv. In the child class, implement the process_task method with your business logic.

Save a LongTasks instance to ServerEnv:
```
from ahserver.serverenv import ServerEnv
from longtasks.longtasks import LongTasks
from appPublic.worker import schedule_once

class MyTasks(LongTasks):
    # Child class: override process_task with your business logic
    async def process_task(self, payload):
        # use logic to execute task
        pass

def load_longtasks():
    longtasks = MyTasks('redis://127.0.0.1:6379', 'example')
    env = ServerEnv()
    env.longtasks = longtasks
    # run the backend job
    schedule_once(0.1, longtasks.run)
```
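
As a rough illustration of what a `process_task` override might contain, the sketch below validates the payload and returns a dict. It is self-contained so it can run without Redis or ahserver: `LongTasksStub` is a hypothetical stand-in for the real `LongTasks` base class, and `UppercaseTasks` and its `'text'` result key are invented for the example. How the real library reports an exception raised inside `process_task` is not covered here.

```python
import asyncio


class LongTasksStub:
    """Hypothetical stand-in for LongTasks so this sketch runs on its own."""

    def __init__(self, redis_url, name):
        self.redis_url = redis_url
        self.name = name

    async def process_task(self, payload):
        raise NotImplementedError


class UppercaseTasks(LongTasksStub):
    """Example child class (assumed name): upper-cases the submitted prompt."""

    async def process_task(self, payload):
        prompt = payload.get('prompt')
        if not prompt:
            # Reject payloads that lack the field this task needs
            raise ValueError("payload needs a non-empty 'prompt'")
        # Placeholder for the slow work a real task would do
        await asyncio.sleep(0)
        # The returned value is what a caller would expect to see as the result
        return {'text': prompt.upper()}
```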
Submit a task in dspy:
```
payload = {
    'prompt': 'gagagag'
}
x = await longtasks.submit_task(payload)
# x is a dict with a 'task_id' key; use it to query the status of the task
return x
```
Query the task status:
```
taskid = 'mytaskid'
task_status = await longtasks.get_status(taskid)
# task_status is a dict:
# {
#     "status": one of "SUCCEEDED", "FAILED", "PENDING", "RUNNING"
#     "result": the value returned by the process_task method
#     "created_at": time the task was created
#     "started_at": time the task started running
#     "finished_at": time the task finished, whether it succeeded or failed
# }
return task_status
```
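
Because a task moves through "PENDING" and "RUNNING" before it reaches "SUCCEEDED" or "FAILED", a caller usually has to query more than once. The sketch below shows one possible polling loop. `wait_for_task`, `interval`, and `timeout` are hypothetical names that are not part of longtasks, and the sketch assumes `get_status` always returns a dict with a "status" key, as described above.

```python
import asyncio

# Statuses after which the task will not change any more
TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


async def wait_for_task(get_status, task_id, interval=1.0, timeout=60.0):
    """Poll get_status(task_id) until the task finishes or the timeout expires.

    get_status is any async callable that returns a status dict,
    for example longtasks.get_status.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status(task_id)
        if status.get("status") in TERMINAL_STATES:
            return status
        if loop.time() >= deadline:
            raise TimeoutError(
                f"task {task_id} still {status.get('status')} after {timeout}s"
            )
        # Sleep between polls so the status store is not queried in a tight loop
        await asyncio.sleep(interval)
```

With the objects from the snippets above, this could be called as `final = await wait_for_task(longtasks.get_status, x['task_id'])`. A fixed interval keeps the sketch short; a growing delay between polls may suit very long tasks better.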