This commit is contained in:
yumoqing 2026-04-09 13:21:38 +08:00
parent 9e9a6e4e02
commit 3659c75097
2 changed files with 63 additions and 91 deletions

View File

@ -88,10 +88,12 @@ async def async_uapi_request(request, llm,
llmusage.use_date = curDateString() llmusage.use_date = curDateString()
llmusage.use_time = timestampstr() llmusage.use_time = timestampstr()
llmusage.userid = callerid llmusage.userid = callerid
llmusage.ioinfo = json.dumps({ ioinfo = json.dumps({
"input": params_kw, "input": params_kw,
'output': [d] 'output': [d]
}, ensure_ascii=False) }, ensure_ascii=False)
webpath = await write_llmio(llmusage.id, ioinfo)
llmusage.ioinfo = webpath
llmusage.taskid = d.taskid llmusage.taskid = d.taskid
llmusage.transno = params_kw.transno llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds llmusage.responsed_seconds = responsed_seconds
@ -119,30 +121,13 @@ async def async_uapi_request(request, llm,
yield f'{s}\n' yield f'{s}\n'
return return
async def add_new_llmusage_output(luid, newd): async def modify_llmusage_status(llmusage):
env = ServerEnv() env = ServerEnv()
newd = newd.copy()
async with get_sor_context(env, 'llmage') as sor: async with get_sor_context(env, 'llmage') as sor:
recs = await sor.R('llmusage', {'id': luid}) await sor.U('llmusage', {
if recs: 'id': llmusage.id,
r = recs[0] 'status': llmusage.status
io = json.loads(r.ioinfo) })
out = io.get('output', [])
rzt = newd.get('output')
if rzt:
out.append(rzt)
newd = {k:v for k,v in newd.items() if k != 'output'}
io['output'] = out
r.ioinfo = json.dumps(io, ensure_ascii=False)
r.update(newd)
await sor.U('llmusage', r)
# debug(f'llmuasage update to {r}')
return
else:
exception(f'add_new_llmusage_output({luid}, {newd}) llmusage not found')
return
exception(f'add_new_llmusage_output({luid}, {newd}) Error')
def get_llmusage_last_output(r): def get_llmusage_last_output(r):
io = json.loads(r.ioinfo) io = json.loads(r.ioinfo)
@ -152,7 +137,7 @@ def get_llmusage_last_output(r):
d = DictObject(**outs[-1]) d = DictObject(**outs[-1])
return d return d
async def query_task_status(request, upappid, apiname, luid, userid, taskid): async def get_llm_llmusage(luid):
env = request._run_ns env = request._run_ns
async with get_sor_context(env, 'llmage') as sor: async with get_sor_context(env, 'llmage') as sor:
recs = await sor.R('llmusage', {'id': luid}) recs = await sor.R('llmusage', {'id': luid})
@ -171,68 +156,36 @@ async def query_task_status(request, upappid, apiname, luid, userid, taskid):
exception(f'{e}') exception(f'{e}')
raise e raise e
llm = llms[0] llm = llms[0]
lastoutout = get_llmusage_last_output(llmusage) return llm, llmusage
async def query_task_status(request, upappid, apiname, luid, userid, taskid):
uapi = UpAppApi(request) uapi = UpAppApi(request)
apinames = apiname.split(',') apinames = apiname.split(',')
llm, llmusage = await get_llm_llmusage(luid)
for apiname in apinames: for apiname in apinames:
status = 'unknown' while True
changed = None lastoutout = get_llmusage_last_output(llmusage)
while status != 'SUCCEEDED': if lastoutout['status'] in ['FAILED', 'SUCCEEDED']
return
ns = {'taskid': taskid} ns = {'taskid': taskid}
b = d = None new_output = b = d = None
try: try:
b = await uapi.call(upappid, apiname, userid, params=ns) b = await uapi.call(upappid, apiname, userid, params=ns)
if isinstance(b, bytes): if isinstance(b, bytes):
b = b.decode('utf-8') b = b.decode('utf-8')
d = json.loads(b) new_output = json.loads(b)
changed = DictObject(**{
'status': d['status'],
'output': d
})
except Exception as e: except Exception as e:
exception(f'{e}, {b=}') exception(f'{e}, {b=}')
changed = { new_output = {
'status': 'FAILED', 'status': 'FAILED',
'output': {'status': 'FAILED', 'error': str(e)} 'error': f'{b},{e}}'
} }
await add_new_llmusage_output(luid, changed) if lastoutout['status'] != new_output['status']:
return llmusage.status = new_output['status']
if changed.status == 'SUCCEEDED': await append_new_llmoutput(llmusage.id, new_output)
llmusage.usages = changed.output.usage if llmusage.status in ['FAILED', 'SUCCEEDED']:
"""联机不记账 await modify_llmusage_status(llmusage)
if llm.ppid:
try:
charging = await llm_charging(sor,
llm.ppid, llmusage)
if charging:
changed.amount = charging.amount
changed.cost = charging.cost
debug(f'{changed=},{charging=}')
else:
changed.amount = cost = 0.0
except Exception as e:
e1 = Exception(f'{llm.ppid} charging error{e}, {llm.ppid}, {llmusage=}')
exception(f'{e}')
changed.amount = changed.cost = 0
else:
changed.amount = 0
changed.cost = 0
llmusage.amount = changed.amount
llmusage.cost = changed.cost
"""
await add_new_llmusage_output(luid, changed)
if changed.status == 'FAILED':
return
if changed.status == 'SUCCEEDED':
"""联机不记账
if llmusage.accounting_status != 'accounted' \
and changed.amount > 0.00001:
try:
await llm_accounting(llmusage)
except Exception as e:
debug(f'{changed=} accounting failed,{e=} ')
"""
return return
await asyncio.sleep(llm.query_period or 30) await asyncio.sleep(llm.query_period or 30)

View File

@ -13,6 +13,25 @@ from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage from ahserver.filestorage import FileStorage
async def append_new_llmoutput(webpath, output):
fs = FileStorage()
p = fs.realPath(webpath)
if not isinstance(output, str):
output = json.loads(output)
bin = await read_webpath(webpath)
io = json.loads(bin.decode('utf-8'))
io['output'].append(output)
async with aiofiles.open(p, 'wb') as f:
iostr = json.dumps(io, ensure_ascii=False, indent=4)
f.write(iostr.encode('utf-8'))
async def read_webpath(webpath):
fs = FileStorage()
p = fs.realPath(webpath)
async with aiofiles.open(p,'rb') as f:
bin = f.read()
return bin
async def write_llmio(luid, io_dic): async def write_llmio(luid, io_dic):
fs = FileStorage() fs = FileStorage()
s = json.dumps(io_dic, ensure_ascii=False, indent=4) s = json.dumps(io_dic, ensure_ascii=False, indent=4)