Fix LongTasks import and worker interface
This commit is contained in:
parent
38213396f5
commit
9ff3f19e88
46
ah.py
46
ah.py
@ -5,20 +5,42 @@ import sys, os
|
|||||||
sys.path.insert(0, os.path.dirname(__file__))
|
sys.path.insert(0, os.path.dirname(__file__))
|
||||||
|
|
||||||
from ahserver.webapp import webapp
|
from ahserver.webapp import webapp
|
||||||
from longtasks import LongTasks
|
|
||||||
from ahserver.serverenv import ServerEnv
|
from ahserver.serverenv import ServerEnv
|
||||||
|
from ahserver.configuredServer import add_startup
|
||||||
|
from longtasks.longtasks import LongTasks, schedule_once
|
||||||
|
from appPublic.log import debug
|
||||||
from workers.verify import VerifyWorker
|
from workers.verify import VerifyWorker
|
||||||
|
|
||||||
def init(app):
|
|
||||||
env = ServerEnv()
|
|
||||||
env.longtasks = LongTasks(
|
|
||||||
app=app,
|
|
||||||
worker_class=VerifyWorker,
|
|
||||||
queue_name="verify_delivery",
|
|
||||||
worker_cnt=1,
|
|
||||||
stuck_seconds=600,
|
|
||||||
max_age_hours=24
|
|
||||||
)
|
|
||||||
return env
|
|
||||||
|
|
||||||
|
class VerifyTasks(LongTasks):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
async def process_task(self, payload, workid=None):
|
||||||
|
import json
|
||||||
|
if isinstance(payload, str):
|
||||||
|
payload = json.loads(payload)
|
||||||
|
worker = VerifyWorker(self)
|
||||||
|
result = await worker.run_task(payload)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def on_app_built(app):
|
||||||
|
env = ServerEnv()
|
||||||
|
lt = env.longtasks
|
||||||
|
if lt:
|
||||||
|
schedule_once(0.1, lt.run)
|
||||||
|
debug('Verify delivery longtasks worker started')
|
||||||
|
|
||||||
|
|
||||||
|
def init():
|
||||||
|
env = ServerEnv()
|
||||||
|
env.longtasks = VerifyTasks(
|
||||||
|
'redis://127.0.0.1:6379', 'verify_delivery',
|
||||||
|
worker_cnt=1, stuck_seconds=600, max_age_hours=24
|
||||||
|
)
|
||||||
|
add_startup(on_app_built)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
webapp(init)
|
webapp(init)
|
||||||
|
|||||||
@ -11,10 +11,7 @@ class VerifyWorker:
|
|||||||
def __init__(self, longtasks):
|
def __init__(self, longtasks):
|
||||||
self.lt = longtasks
|
self.lt = longtasks
|
||||||
|
|
||||||
async def run(self, task):
|
async def run_task(self, payload):
|
||||||
payload = task.payload
|
|
||||||
task_id = task.task_id
|
|
||||||
|
|
||||||
mtv_path = payload.get("mtv_path", "")
|
mtv_path = payload.get("mtv_path", "")
|
||||||
ktv_path = payload.get("ktv_path", "")
|
ktv_path = payload.get("ktv_path", "")
|
||||||
ass_path = payload.get("ass_path", "")
|
ass_path = payload.get("ass_path", "")
|
||||||
@ -67,7 +64,6 @@ class VerifyWorker:
|
|||||||
passed = len(all_errors) == 0
|
passed = len(all_errors) == 0
|
||||||
return {
|
return {
|
||||||
"status": "PASSED" if passed else "FAILED",
|
"status": "PASSED" if passed else "FAILED",
|
||||||
"task_id": task_id,
|
|
||||||
"qa_results": qa_results,
|
"qa_results": qa_results,
|
||||||
"total_errors": len(all_errors),
|
"total_errors": len(all_errors),
|
||||||
"errors": all_errors
|
"errors": all_errors
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user