From accb61a3987837d59a2997ec33a77e797173fa6e Mon Sep 17 00:00:00 2001 From: yumoqing Date: Fri, 27 Mar 2026 12:04:13 +0800 Subject: [PATCH] bugfix --- uapi/uptask.py | 108 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 uapi/uptask.py diff --git a/uapi/uptask.py b/uapi/uptask.py new file mode 100644 index 0000000..0dbb427 --- /dev/null +++ b/uapi/uptask.py @@ -0,0 +1,108 @@ +import json +from appPublic.log import debug, exception +from appPublic.timeUtils import timestampstr +from appPublic.registerfunction import RegisterFunction +from appPublic.dictObject import DictObject +import inspect +from sqlor.dbpools import get_sor_context +from ahserver.serverenv import ServerEnv + +## 针对发起远端任务通过callback url 告知任务状态的逻辑 +async def uptask_started(taskid, userid, convert_func_name): + """ + taskid 任务id,唯一码 + convert_func_name: 响应数据转换函数,要完成数据校验,并将数据转换为正确的格式 + 并返回status, resp_data + status必须是:SUCCEEDED:成功, FAILED:失败, 其他状态可以不 + resp_resp 一个yaml格式的响应数据说明,这里只关注 状态(status)和识别码(idnentify_code)在返回数据中的字段名, 状态对照关系 + 例子: + ``` + status: + name: status + mapping: + failed: FAILED + success: SUCCEEDED + identify: identify + identify_code 识别码字段名字 + ``` + """ + + env = ServerEnv() + async with get_sor_context(env, 'longtasks') as sor: + id = env.uuid() + ns = { + 'id': id, + 'userid': userid, + 'executor_taskid': taskid, + 'convert_func_name': convert_func_name, + 'status': 'started', + 'start_timestamp': timestampstr() + } + d = await sor.C('uptask', ns) + return id + return None + +async def uptask_feedback(task_id, resp_data): + env = ServerEnv() + async with get_sor_context(env, 'longtasks') as sor: + recs = await sor.R('uptask', {'executor_taskid': task_id}) + if not recs: + e = f'{task_id} not found' + exception(e) + raise Exception(e) + lt = recs[0] + if lt.status in ['SUCCEEDED', 'FAILED']: + return + + rf = RegisterFunction() + resp = None + f = rf.get(lt.convert_func_name) + try: + if inspect.iscoroutinefunction(f): + resp = await f(resp_data) + else: + resp = f(resp_data) + if resp is None: + raise Exception(f'{task_id} {lt.convert_func_name}() return None') + resp = DictObject(**resp) + except Exception as e: + e = f'{task_id}{e}' + exception(e) + raise Exception(e) + + lt.status = resp.status + if resp.status in ['SUCCEEDED', 'FAILED']: + lt.response_data = json.dumps(resp, ensure_ascii=False) + lt.end_timestamp = timestampstr() + rf.delete(lt.convert_func_name) + await sor.U('uptask', lt.copy()) + +async def get_my_uptasks(userid, biz_date): + env = ServerEnv() + begin = f'{biz_date} 00:00:00.000' + end = f'{biz_date} 24:00:00.999 + sql = """select * from uptask +where userid=${userid} + and start_timestamp >= ${begin}$ + and start_timestamp ${end}$""" + async with get_sor_context(env, 'longtasks') as sor: + recs = await sor.R(sql, {'userid': userid, 'biz_date': biz_date}) + return recs + return [] + +async def check_uptask_status(task_id): + env = ServerEnv() + async with get_sor_context(env, 'longtasks') as sor: + recs = await sor.R('uptask', {'executor_taskid': task_id}) + if not recs: + e = f'{task_id} not found' + exception(e) + raise Exception(e) + lt = recs[0] + if lt.status in ['SUCCEEDED', 'FAILED']: + return DictObject(**json.loads(lt.response_data)) + + return DictObject(**{ + 'status': lt.status + }) +