diff --git a/llmage/accounting.py b/llmage/accounting.py index 0a018c5..cbd3af8 100644 --- a/llmage/accounting.py +++ b/llmage/accounting.py @@ -1,3 +1,6 @@ +import asyncio +import time +from datetime import datetime from appPublic.log import exception, debug from appPublic.uniqueID import getID from appPublic.dictObject import DictObject @@ -52,17 +55,27 @@ async def checkCustomerBalance(llmid, userorgid): debug(f'{userorgid=} checkCustomerBalance() failed') return False -async def llm_accounting(request, llmusage): - env = request._run_ns +async def llm_accounting(llmusage): + env = ServerEnv() llmid = llmusage.llmid - async with get_sor_context(request._run_ns, 'llmage') as sor: + async with get_sor_context(env, 'llmage') as sor: sql = "select * from llm where id=${llmid}$" recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid}) if len(recs) == 0: + ns = { + 'id': llmusage.id, + 'accounting_status': 'failed' + } + await sor.U('llmusage', ns) e = Exception(f'llm not found({llmid})') exception(f'{e}') raise e if recs[0].ppid is None: + ns = { + 'id': llmusage.id, + 'accounting_status': 'failed' + } + await sor.U('llmusage', ns) e = Exception(f'llm ({llmid}) donot has a pricing_program') exception(f'{e}') raise e @@ -133,3 +146,52 @@ async def llm_accounting(request, llmusage): } await sor.U('llmusage', ns) +async def get_accounting_llmusages(luid=None): + env = ServerEnv() + lus = [] + t = time.time - 20 + dt = datetime.fromtimestamp(t) + tsstr = dt.strftime('%Y-%m-%d %H:%M:%S.') + f'{dt.microsecond // 1000:03d}' + async with get_sor_context(env, 'llmage') as sor: + sql = """select a.*. b.ppid +from llmusage a, llm b +where a.llmid = b.id + and a.status = 'SUCCEEDED' + and a.use_time < ${tsstr}$ + and a.accounting_status='created'""" + ns = {'tsstr': tsstr} + if luid: + sql += " and a.id=${luid}$" + ns['luid'] = luid + recs = await sor.sqlExe(sql, ns) + for r in recs: + if r.usages is None: + io = json.loads(r.ioinfo) + if len(io['output']) == 0: + llmusage.accounting_status = 'failed' + await sor.U('llmusage', {'id': llmusage.id, 'accounting_status': 'failed'}) + continue + r.usages = io['output'][-1]['usage'] + if r.usages is None: + llmusage.accounting_status = 'failed' + await sor.U('llmusage', {'id': llmusage.id, 'accounting_status': 'failed'}) + continue + try: + r = await llm_charging(sor, llmusage.ppid, llmusage) + except Exception as e: + continue + llmusage.amount = r.amount + llmusage.cost = r.cost + await sor.U('llmusage', llmusage.copy()) + lus.append(r) + return lus + + async def backend_accounting(): + env = ServerEnv() + debug(f'backend accounting started ...') + while True: + lus = await get_accounting_llmusages() + for lu in lus: + await llm_accounting(lu) + await asyncio.sleep(0.1) + diff --git a/llmage/asyncinference.py b/llmage/asyncinference.py index fd0cda0..3a00c94 100644 --- a/llmage/asyncinference.py +++ b/llmage/asyncinference.py @@ -226,7 +226,7 @@ async def query_task_status(request, upappid, apiname, luid, userid, taskid): if llmusage.accounting_status != 'accounted' \ and changed.amount > 0.00001: try: - await llm_accounting(request, llmusage) + await llm_accounting(llmusage) except Exception as e: debug(f'{changed=} accounting failed,{e=} ') return diff --git a/llmage/callback.py b/llmage/callback.py index 57dbbec..c09ed68 100644 --- a/llmage/callback.py +++ b/llmage/callback.py @@ -49,6 +49,6 @@ async def asynctask_callbacka(appname, apiname, params_kw) raise e if llmusage: - await llm_accounting(request, llmusage) + await llm_accounting(llmusage) diff --git a/llmage/init.py b/llmage/init.py index 04c5af7..9eab240 100644 --- a/llmage/init.py +++ b/llmage/init.py @@ -1,6 +1,7 @@ from appPublic.base64_to_file import hex2base64 from appPublic.registerfunction import RegisterFunction from ahserver.serverenv import ServerEnv +from ahserver.configuredServer import add_cleanupctx from .keling import keling_token from .jimeng import jimeng_auth_headers from .utils import ( @@ -22,6 +23,7 @@ from .llmclient import ( from .accounting import ( checkCustomerBalance, llm_charging, + backend_accounting, llm_accounting ) @@ -31,6 +33,11 @@ from .asyncinference import ( get_today_asynctask_list ) +async def start_backend(app): + task = asyncio.create_task(abackend_accounting()) + yield + task.cancel() + def load_llmage(): env = ServerEnv() env.llm_query_orders = llm_query_orders @@ -54,3 +61,4 @@ def load_llmage(): env.llm_query_price = llm_query_price rf = RegisterFunction() rf.register('jimeng_auth_headers', jimeng_auth_headers) + add_cleanupctx(start_backend) diff --git a/llmage/llmclient.py b/llmage/llmclient.py index ae1acfc..9ead916 100644 --- a/llmage/llmclient.py +++ b/llmage/llmclient.py @@ -114,7 +114,7 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None) llmusage.accounting_status = 'created' await write_llmusage(llmusage) if llmusage.amount > 0.0001: - await llm_accounting(request, llmusage) + await llm_accounting(llmusage) except Exception as e: exception(f'{e=},{format_exc()}') diff --git a/llmage/syncinference.py b/llmage/syncinference.py index 625569c..737d6f4 100644 --- a/llmage/syncinference.py +++ b/llmage/syncinference.py @@ -79,7 +79,7 @@ async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw= yield b await write_llmusage(llmusage) if llmusage.amount > 0.0001: - await llm_accounting(request, llmusage) + await llm_accounting(llmusage) except Exception as e: exception(f'{e=},{format_exc()}') estr = erase_apikey(e)