Compare commits
No commits in common. "56b4fdc439a667350082e0f152287d3546880acd" and "151440964ff382a9072aa5bcb2690afc37ed2b1f" have entirely different histories.
56b4fdc439
...
151440964f
@ -104,8 +104,9 @@ async def async_uapi_request(request, llm, sor,
|
||||
await write_llmusage(llmusage)
|
||||
# if llm.callbackurl:
|
||||
# return
|
||||
apinames = [ name.strip() for name in llm.query_apiname.split(',') ]
|
||||
asyncio.create_task(query_task_status(request, llm.upappid,
|
||||
llm.query_apiname, luid, userid, d.taskid))
|
||||
apinames, luid, userid, d.taskid))
|
||||
|
||||
except Exception as e:
|
||||
exception(f'{e=},{format_exc()}')
|
||||
@ -151,12 +152,6 @@ async def query_task_status(request, upappid, apiname, luid, userid, taskid):
|
||||
exception(f'{e}')
|
||||
raise e
|
||||
llmusage = recs[0]
|
||||
llms = await sor.R('llm', {'id': llmusage.llmid})
|
||||
if len(llms) == 0:
|
||||
e = Exception(f'{llmusage.llmid=} not found in llm')
|
||||
exception(f'{e}')
|
||||
raise e
|
||||
llm = llms[0]
|
||||
lastoutout = get_llmusage_last_output(llmusage)
|
||||
if lastoutout and lastoutout.status == 'SUCCEEDED':
|
||||
return
|
||||
@ -187,6 +182,12 @@ async def query_task_status(request, upappid, apiname, luid, userid, taskid):
|
||||
return
|
||||
if changed.status == 'SUCCEEDED':
|
||||
llmusage.usage = changed.output.usage
|
||||
llms = await sor.R('llm', {'id': llmusage.llmid})
|
||||
if len(llms) == 0:
|
||||
e = Exception(f'{llmusage.llmid=} not found in llm')
|
||||
exception(f'{e}')
|
||||
raise e
|
||||
llm = llms[0]
|
||||
if llm.ppid:
|
||||
try:
|
||||
charging = await llm_charging(sor,
|
||||
@ -209,6 +210,5 @@ async def query_task_status(request, upappid, apiname, luid, userid, taskid):
|
||||
debug(f'{changed=} accounted ')
|
||||
if changed.status in ['FAILED', 'SUCCEEDED']:
|
||||
return
|
||||
await asyncio.sleep(llm.query_period or 30)
|
||||
debug(f'{llm.query_period=} seconds will retry, {changed.status=}')
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
|
||||
@ -209,3 +209,57 @@ async def llm_query_price(llmid, config_data):
|
||||
prices = await env.pricing_program_charging(sor, llm.ppid, config_data)
|
||||
return prices
|
||||
|
||||
async def add_new_llmusage_output(luid, rzt):
|
||||
env = ServerEnv()
|
||||
async with get_sor_context(env, 'llmage') as sor:
|
||||
recs = await sor.R('llmusage', {'id': luid})
|
||||
if recs:
|
||||
r = recs[0]
|
||||
io = json.loads(r.ioinfo)
|
||||
out = io.get('output', [])
|
||||
out.append(out)
|
||||
io['output'] = out
|
||||
r.ioinfo = json.dumps({
|
||||
'input': io.get('input',{}),
|
||||
'output': out
|
||||
})
|
||||
await sor.U('llmusage', r)
|
||||
return
|
||||
|
||||
async def query_task_status(request, upappid, apinames, luid, userid, taskid):
|
||||
async with get_sor_context(env, 'llmage') as sor:
|
||||
uapi = UAPI(request, sor)
|
||||
for apiname in apinames:
|
||||
try:
|
||||
ns = {'taskid': taskid}
|
||||
b = await uapi.call(upappid, apiname, userid, params=ns)
|
||||
if isinstance(b, bytes):
|
||||
b = b.decode('utf-8')
|
||||
d = json.loads(b)
|
||||
rzt = DictObject(**d)
|
||||
await add_new_llmusage_output(luid, rzt)
|
||||
if rzt.status == 'FAILED':
|
||||
return
|
||||
if rzt.status == 'SUCCEEDED':
|
||||
if llm.ppid:
|
||||
try:
|
||||
chargings = await llm_charging(sor,
|
||||
llm.ppid, callerid, usage)
|
||||
llmusage.amount = chargings.amount
|
||||
llmusage.cost = chargings.cost
|
||||
except Exception as e:
|
||||
e = Exception(f'{llm.pid} charging error{e}')
|
||||
exception(f'{e}')
|
||||
else:
|
||||
llmusage.amount = 0
|
||||
llmusage.cost = 0
|
||||
await llm_accounting(request, llmusage)
|
||||
|
||||
except Exception as e:
|
||||
exception(f'{e=},{format_exc()}')
|
||||
estr = erase_apikey(e)
|
||||
recs = sor.R('llmusage', {'id': luid})
|
||||
ed = {"error": f"ERROR:{estr}", "status": "FAILED", 'taskid': taskid}
|
||||
await add_new_llmusage_output(luid, ed)
|
||||
return
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user