109 lines
3.1 KiB
Python
109 lines
3.1 KiB
Python
import json
|
||
from appPublic.log import debug, exception
|
||
from appPublic.timeUtils import timestampstr
|
||
from appPublic.registerfunction import RegisterFunction
|
||
from appPublic.dictObject import DictObject
|
||
import inspect
|
||
from sqlor.dbpools import get_sor_context
|
||
from ahserver.serverenv import ServerEnv
|
||
|
||
## Logic for remote tasks that report their status back to us through a callback URL
|
||
async def uptask_started(taskid, userid, convert_func_name):
    """Record that a remote task has been started.

    Inserts one row into the ``uptask`` table with status ``started`` so
    that a later callback can be matched to it through ``executor_taskid``.

    Args:
        taskid: Unique id of the task on the executor side; stored in the
            ``executor_taskid`` column.
        userid: Stored in the ``userid`` column of the new row.
        convert_func_name: Name of the response-conversion function. That
            function is expected to validate the callback payload, convert
            it to the right format and return data carrying a ``status``;
            ``SUCCEEDED`` (success) and ``FAILED`` (failure) are the two
            terminal values.

    Returns:
        The id generated for the new row. ``None`` is returned only when
        control leaves the ``async with`` block without returning
        (presumably when the context manager suppresses an error --
        TODO confirm against get_sor_context).

    Note:
        Earlier notes on this function also describe a YAML response
        description (it is not a parameter here) that names the status
        field, its value mapping and the identify-code field, e.g.::

            status:
              name: status
              mapping:
                failed: FAILED
                success: SUCCEEDED
            identify: identify
    """
    server_env = ServerEnv()
    async with get_sor_context(server_env, 'longtasks') as sor:
        record_id = server_env.uuid()
        record = dict(
            id=record_id,
            userid=userid,
            executor_taskid=taskid,
            convert_func_name=convert_func_name,
            status='started',
            start_timestamp=timestampstr(),
        )
        await sor.C('uptask', record)
        return record_id
    # Only reached if the context manager exits without the return above.
    return None
|
||
|
||
async def uptask_feedback(task_id, resp_data):
    """Apply a status callback from the executor to the stored task row.

    Looks the task up by ``executor_taskid``, runs the conversion function
    registered under the row's ``convert_func_name`` on ``resp_data`` and
    writes the resulting status back to the ``uptask`` table.

    Args:
        task_id: Executor-side task id used to find the row.
        resp_data: Raw callback payload handed to the conversion function.

    Returns:
        None. Returns early, without touching the row, when the stored
        status is already ``SUCCEEDED`` or ``FAILED``.

    Raises:
        Exception: When no row matches ``task_id``, or when the conversion
            function fails or returns ``None`` (the message is prefixed
            with ``task_id``).
    """
    finished = ('SUCCEEDED', 'FAILED')
    server_env = ServerEnv()
    async with get_sor_context(server_env, 'longtasks') as sor:
        rows = await sor.R('uptask', {'executor_taskid': task_id})
        if not rows:
            message = f'{task_id} not found'
            exception(message)
            raise Exception(message)

        task = rows[0]
        if task.status in finished:
            # A finished task is never overwritten by a late callback.
            return

        registry = RegisterFunction()
        convert = registry.get(task.convert_func_name)
        try:
            # The converter may be registered as either a coroutine
            # function or a plain function.
            converted = (
                await convert(resp_data)
                if inspect.iscoroutinefunction(convert)
                else convert(resp_data)
            )
            if converted is None:
                raise Exception(f'{task_id} {task.convert_func_name}() return None')
            converted = DictObject(**converted)
        except Exception as err:
            # Every failure in the conversion step is logged and re-raised
            # as a plain Exception carrying the task id.
            message = f'{task_id}{err}'
            exception(message)
            raise Exception(message)

        task.status = converted.status
        if converted.status in finished:
            task.response_data = json.dumps(converted, ensure_ascii=False)
            task.end_timestamp = timestampstr()
            # NOTE(review): this removes the converter from the registry
            # once the task finishes -- confirm this is intended if several
            # tasks share one converter name.
            registry.delete(task.convert_func_name)
        await sor.U('uptask', task.copy())
|
||
|
||
async def get_my_uptasks(userid, biz_date):
    """Return the tasks a user started on one business date.

    Selects every ``uptask`` row whose ``userid`` matches and whose
    ``start_timestamp`` lies between ``<biz_date> 00:00:00.000`` and
    ``<biz_date> 23:59:59.999`` inclusive.

    Args:
        userid: Value matched against the ``userid`` column.
        biz_date: Date text placed in front of the time-of-day bounds
            (presumably ``YYYY-MM-DD``, matching what ``timestampstr()``
            writes into ``start_timestamp`` -- TODO confirm).

    Returns:
        The rows returned by the query. ``[]`` is returned only when control
        leaves the ``async with`` block without returning (presumably when
        the context manager suppresses an error -- TODO confirm).
    """
    env = ServerEnv()
    # Every ${name}$ placeholder in the SQL must have a matching key here.
    ns = {
        'userid': userid,
        'begin': f'{biz_date} 00:00:00.000',
        # 23:59:59.999 is the last valid millisecond of the day;
        # "24:00:00.999" is not a valid time of day.
        'end': f'{biz_date} 23:59:59.999',
    }
    sql = """select * from uptask
where userid=${userid}$
    and start_timestamp >= ${begin}$
    and start_timestamp <= ${end}$"""
    async with get_sor_context(env, 'longtasks') as sor:
        # Raw SQL goes through sqlExe(); R() is called with a table name
        # everywhere else in this file.
        recs = await sor.sqlExe(sql, ns)
        return recs
    return []
|
||
|
||
async def check_uptask_status(task_id):
    """Report the current state of a task.

    Args:
        task_id: Executor-side task id; matched against ``executor_taskid``
            in the ``uptask`` table.

    Returns:
        A ``DictObject``. For a task whose status is ``SUCCEEDED`` or
        ``FAILED`` it is built from the JSON stored in ``response_data``;
        otherwise it holds only the current ``status``.

    Raises:
        Exception: When no row matches ``task_id``.
    """
    server_env = ServerEnv()
    async with get_sor_context(server_env, 'longtasks') as sor:
        rows = await sor.R('uptask', {'executor_taskid': task_id})
        if not rows:
            message = f'{task_id} not found'
            exception(message)
            raise Exception(message)

        task = rows[0]
        if task.status not in ('SUCCEEDED', 'FAILED'):
            # Still in progress: only the status is known so far.
            return DictObject(status=task.status)

        # Finished: hand back the converted response saved by the callback.
        stored = json.loads(task.response_data)
        return DictObject(**stored)
|
||
|