From 3659c750976cbc889a428b76974d29a4d3269220 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Thu, 9 Apr 2026 13:21:38 +0800 Subject: [PATCH] bugfix --- llmage/asyncinference.py | 135 +++++++++++++-------------------------- llmage/utils.py | 19 ++++++ 2 files changed, 63 insertions(+), 91 deletions(-) diff --git a/llmage/asyncinference.py b/llmage/asyncinference.py index 20c6b7a..161e330 100644 --- a/llmage/asyncinference.py +++ b/llmage/asyncinference.py @@ -88,10 +88,12 @@ async def async_uapi_request(request, llm, llmusage.use_date = curDateString() llmusage.use_time = timestampstr() llmusage.userid = callerid - llmusage.ioinfo = json.dumps({ - "input": params_kw, - 'output': [d] - }, ensure_ascii=False) + ioinfo = json.dumps({ + "input": params_kw, + 'output': [d] + }, ensure_ascii=False) + webpath = await write_llmio(llmusage.id, ioinfo) + llmusage.ioinfo = webpath llmusage.taskid = d.taskid llmusage.transno = params_kw.transno llmusage.responsed_seconds = responsed_seconds @@ -119,30 +121,13 @@ async def async_uapi_request(request, llm, yield f'{s}\n' return -async def add_new_llmusage_output(luid, newd): +async def modify_llmusage_status(llmusage): env = ServerEnv() - newd = newd.copy() async with get_sor_context(env, 'llmage') as sor: - recs = await sor.R('llmusage', {'id': luid}) - if recs: - r = recs[0] - io = json.loads(r.ioinfo) - out = io.get('output', []) - rzt = newd.get('output') - if rzt: - out.append(rzt) - newd = {k:v for k,v in newd.items() if k != 'output'} - io['output'] = out - r.ioinfo = json.dumps(io, ensure_ascii=False) - r.update(newd) - await sor.U('llmusage', r) - # debug(f'llmuasage update to {r}') - return - else: - exception(f'add_new_llmusage_output({luid}, {newd}) llmusage not found') - return - - exception(f'add_new_llmusage_output({luid}, {newd}) Error') + await sor.U('llmusage', { + 'id': llmusage.id, + 'status': llmusage.status + }) def get_llmusage_last_output(r): io = json.loads(r.ioinfo) @@ -152,7 +137,7 @@ def get_llmusage_last_output(r): d = DictObject(**outs[-1]) return d -async def query_task_status(request, upappid, apiname, luid, userid, taskid): +async def get_llm_llmusage(luid): env = request._run_ns async with get_sor_context(env, 'llmage') as sor: recs = await sor.R('llmusage', {'id': luid}) @@ -171,70 +156,38 @@ async def query_task_status(request, upappid, apiname, luid, userid, taskid): exception(f'{e}') raise e llm = llms[0] - lastoutout = get_llmusage_last_output(llmusage) - uapi = UpAppApi(request) - apinames = apiname.split(',') - for apiname in apinames: - status = 'unknown' - changed = None - while status != 'SUCCEEDED': - ns = {'taskid': taskid} - b = d = None - try: - b = await uapi.call(upappid, apiname, userid, params=ns) - if isinstance(b, bytes): - b = b.decode('utf-8') - d = json.loads(b) - changed = DictObject(**{ - 'status': d['status'], - 'output': d - }) - except Exception as e: - exception(f'{e}, {b=}') - changed = { - 'status': 'FAILED', - 'output': {'status': 'FAILED', 'error': str(e)} - } - await add_new_llmusage_output(luid, changed) - return - if changed.status == 'SUCCEEDED': - llmusage.usages = changed.output.usage - """联机不记账 - if llm.ppid: - try: - charging = await llm_charging(sor, - llm.ppid, llmusage) - if charging: - changed.amount = charging.amount - changed.cost = charging.cost - debug(f'{changed=},{charging=}') - else: - changed.amount = cost = 0.0 - except Exception as e: - e1 = Exception(f'{llm.ppid} charging error{e}, {llm.ppid}, {llmusage=}') - exception(f'{e}') - changed.amount = changed.cost = 0 + return llm, llmusage - else: - changed.amount = 0 - changed.cost = 0 - llmusage.amount = changed.amount - llmusage.cost = changed.cost - """ - await add_new_llmusage_output(luid, changed) - if changed.status == 'FAILED': - return - if changed.status == 'SUCCEEDED': - """联机不记账 - if llmusage.accounting_status != 'accounted' \ - and changed.amount > 0.00001: - try: - await llm_accounting(llmusage) - except Exception as e: - debug(f'{changed=} accounting failed,{e=} ') - """ - return +async def query_task_status(request, upappid, apiname, luid, userid, taskid): + uapi = UpAppApi(request) + apinames = apiname.split(',') + llm, llmusage = await get_llm_llmusage(luid) - await asyncio.sleep(llm.query_period or 30) - debug(f'{llm.query_period=} seconds will retry, {changed.status=}') + for apiname in apinames: + while True + lastoutout = get_llmusage_last_output(llmusage) + if lastoutout['status'] in ['FAILED', 'SUCCEEDED'] + return + ns = {'taskid': taskid} + new_output = b = d = None + try: + b = await uapi.call(upappid, apiname, userid, params=ns) + if isinstance(b, bytes): + b = b.decode('utf-8') + new_output = json.loads(b) + except Exception as e: + exception(f'{e}, {b=}') + new_output = { + 'status': 'FAILED', + 'error': f'{b},{e}}' + } + if lastoutout['status'] != new_output['status']: + llmusage.status = new_output['status'] + await append_new_llmoutput(llmusage.id, new_output) + if llmusage.status in ['FAILED', 'SUCCEEDED']: + await modify_llmusage_status(llmusage) + return + + await asyncio.sleep(llm.query_period or 30) + debug(f'{llm.query_period=} seconds will retry, {changed.status=}') diff --git a/llmage/utils.py b/llmage/utils.py index fad6bb4..776c5af 100644 --- a/llmage/utils.py +++ b/llmage/utils.py @@ -13,6 +13,25 @@ from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.filestorage import FileStorage +async def append_new_llmoutput(webpath, output): + fs = FileStorage() + p = fs.realPath(webpath) + if not isinstance(output, str): + output = json.loads(output) + bin = await read_webpath(webpath) + io = json.loads(bin.decode('utf-8')) + io['output'].append(output) + async with aiofiles.open(p, 'wb') as f: + iostr = json.dumps(io, ensure_ascii=False, indent=4) + f.write(iostr.encode('utf-8')) + +async def read_webpath(webpath): + fs = FileStorage() + p = fs.realPath(webpath) + async with aiofiles.open(p,'rb') as f: + bin = f.read() + return bin + async def write_llmio(luid, io_dic): fs = FileStorage() s = json.dumps(io_dic, ensure_ascii=False, indent=4)