diff --git a/llmage/accounting.py b/llmage/accounting.py index 15b6401..d3e955e 100644 --- a/llmage/accounting.py +++ b/llmage/accounting.py @@ -7,6 +7,8 @@ from ahserver.serverenv import ServerEnv from accounting.consume import consume_accounting from accounting.getaccount import getCustomerBalance +async def llm_charging(sor, ppid, userid, usage): + async def checkCustomerBalance(llmid, userorgid): env = ServerEnv() async with get_sor_context(env, 'llmage') as sor: diff --git a/llmage/llmclient.py b/llmage/llmclient.py index ad1f758..c46feb5 100644 --- a/llmage/llmclient.py +++ b/llmage/llmclient.py @@ -147,22 +147,10 @@ where a.upappid=b.id i = randint(0, len(recs)-1) return recs[i].userid -async def write_llmusage(id, llm, userid, usage, params_kw, outdata, sor): - d = { - "id": id, - "llmid": llm.id, - "use_date": curDateString(), - "use_time": timestampstr(), - "userid": userid, - "transno": params_kw.transno, - "evalvalue": 0, - "usages": json.dumps(usage), - "ioinfo": json.dumps({ - "input": params_kw, - "output": outdata - }, ensure_ascii=False) - } - await sor.C('llmusage', d) +async def write_llmusage(llmusage): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + await sor.C('llmusage', d) async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None): env = request._run_ns.copy() @@ -179,16 +167,13 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None) start_timestamp = time.time() responsed_seconds = None finish_seconds = None - t2 = start_timestamp - t3 = start_timestamp first = True usage = None async for l in uapi.stream_linify(llm.upappid, llm.apiname, userid, params=params_kw): if first: first = False - t2 = time.time() - responsed_seconds = t2 - start_timestamp + responsed_seconds = time.time() - start_timestamp if isinstance(l, bytes): l = l.decode('utf-8') if l[-1] == '\n': @@ -216,8 +201,7 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None) yield json.dumps(d) + '\n' if usage is None: error(f'{llm=} response has not usage') - t3 = time.time() - finish_seconds = t3 - start_timestamp + finish_seconds = time.time() - start_timestamp if responsed_seconds is None: responsed_seconds = finish_seconds if not usage.get('completion_tokens'): @@ -245,16 +229,22 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None) llmusage.finish_seconds = finish_seconds llmusage.status = 'SUCCEEDED' if llm.ppid and callerorgid: - chargings = await llm_query_price(llm.ppid, usage) + try: + chargings = await llm_charginng(sor, llm.ppid, callerid, usage) + llmusage.amount = chargings.amount + llmusage.cost = chargings.cost + except Exception as e: + e = Exception(f'{llm.pid} charging error{e}') + exception(f'{e}') else: llmusage.amount = 0 llmusage.cost = 0 llmusage.userorgid = callerorgid llmusage.ownerid = llm.orgid - await sor.C('llmusage', llmusage) - if llm.ppid and callerorgid != llm.ownerid: - debug(f'{usage=},{llm.ownerid=},{callerorgid=}') - await llm_accounting(request, llm.id, usage, callerorgid, callerid) + llmusage.accounting_status = 'created' + await write_llmusage(llmusage) + await llm_accounting(request, llmusage) + except Exception as e: exception(f'{e=},{format_exc()}') estr = erase_apikey(e) @@ -263,7 +253,7 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None) s = ''.join(s.split('\n')) outlines.append(ed) yield f'{s}\n' - await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) + # await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) return async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None): @@ -280,14 +270,52 @@ async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw= t1 = t2 = t3 = time.time() luid = getID() try: - + start_timestamp = time.time() + responsed_seconds = None + finish_seconds = None b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) if isinstance(b, bytes): b = b.decode('utf-8') d = json.loads(b) status = d.get('status') + usage = d.get('usage', {}) if status and status != 'SUCCEEDED': raise Exception(d['error']) + responsed_seconds = time.time() - start_timestamp + finish_seconds = response_seconds + llmusage = DictObject() + llmusage.id = luid + llmusage.llmid = llm.id + llmusage.use_date = curDateString() + llmusage.use_time = timestampstr() + llmusage.userid = callerid + llmusage.usage = json.dumps(usage) + llmusage.ioinfo = json.dumps({ + "input": params_kw, + "output": d + }) + llmusage.transno = params_kw.transno + llmusage.responsed_seconds = responsed_seconds + llmusage.finish_seconds = finish_seconds + llmusage.status = 'SUCCEEDED' + if llm.ppid: + try: + chargings = await llm_charginng(sor, llm.ppid, callerid, usage) + llmusage.amount = chargings.amount + llmusage.cost = chargings.cost + except Exception as e: + e = Exception(f'{llm.pid} charging error{e}') + exception(f'{e}') + else: + llmusage.amount = 0 + llmusage.cost = 0 + llmusage.userorgid = callerorgid + llmusage.ownerid = llm.orgid + llmusage.accounting_status = 'created' + b = json.dumps(d, ensure_ascii=False) + yield b + await write_llmusage(llmusage) + await llm_accounting(request, llmusage) except Exception as e: exception(f'{e=},{format_exc()}') estr = erase_apikey(e) @@ -296,20 +324,6 @@ async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw= s = ''.join(s.split('\n')) outlines.append(ed) yield f'{s}\n' - await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) - return - d['llmusageid'] = luid - outlines.append(d) - t2 = t3 = time.time() - usage = d.get('usage', {}) - usage['response_time'] = t2 - t1 - usage['finish_time'] = t3 - t1 - b = json.dumps(d, ensure_ascii=False) - yield b - await write_llmusage(luid, llm, callerid, usage, params_kw, outlines, sor) - if llm.ppid and callerorgid != llm.ownerid: - debug(f'{usage=},{llm.ownerid=},{callerorgid=}') - await llm_accounting(request, llm.id, usage, callerorgid, callerid) async def async_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None): env = request._run_ns.copy() @@ -319,26 +333,57 @@ async def async_uapi_request(request, llm, sor, callerid, callerorgid, params_kw # callerid = await env.get_user() uapi = UAPI(request, sor=sor) userid = await get_owner_userid(sor, llm) - outlines = [] b = None - t1 = t2 = t3 = time.time() luid = getID() try: + start_timestamp = time.time() + if llm.callbackurl: + params_kw.callbackurl = llm.callbackurl b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) + if isinstance(b, bytes): + b = b.decode('utf-8') + debug(f'task sumbited:{b}') + d = DictObject(**json.loads(b)) + if d.status != 'SUCCEEDED': + e = Exception(f'resp={d} not success') + raise e + responsed_seconds = time.time() - start_timestamp + finish_seconds = response_seconds + llmusage = DictObject() + llmusage.id = luid + llmusage.llmid = llm.id + llmusage.use_date = curDateString() + llmusage.use_time = timestampstr() + llmusage.userid = callerid + llmusage.usage = json.dumps(usage) + llmusage.ioinfo = json.dumps({ + "input": params_kw + }) + llmusage.taskid = d.taskid + llmusage.transno = params_kw.transno + llmusage.responsed_seconds = responsed_seconds + llmusage.finish_seconds = finish_seconds + llmusage.status = 'CREATED' + llmusage.userorgid = callerorgid + llmusage.ownerid = llm.orgid + b = json.dumps(d, ensure_ascii=False) + yield b + await write_llmusage(llmusage) + await llm_accounting(request, llmusage) + if llm.callbackurl: + return + apinames = [ name.strip() for name in llm.query_apiname.split(',') ] + asyncio.create_task(query_task_status(request, llm.upappid, + apinames, luid, userid, d.taskid)) + except Exception as e: exception(f'{e=},{format_exc()}') estr = erase_apikey(e) - ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid} + ed = {"error": f"ERROR:{estr}", "status": "FAILED"} s = json.dumps(ed) s = ''.join(s.split('\n')) - outlines.append(ed) yield f'{s}\n' - await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) return - if isinstance(b, bytes): - b = b.decode('utf-8') - debug(f'task sumbited:{b}') - d = DictObject(**json.loads(b)) outlines.append(d) t2 = time.time() uapi = UAPI(request, sor=sor) @@ -470,3 +515,30 @@ async def llm_query_price(llmid, config_data): raise e prices = await env.pricing_program_charging(sor, llm.ppid, config_data) return prices + +async def query_task_status(request, upappid, apinames, luid, userid, taskid): + async with get_sor_context(env, 'llmage') as sor: + uapi = UAPI(request, sor) + for apiname in apinames: + try: + ns = {'taskid': taskid} + b = await uapi.call(upappid, apiname, userid, params=ns) + if isinstance(b, bytes): + b = b.decode('utf-8') + d = json.loads(b) + rzt = DictObject(**d) + if rzt.status == 'FAILED': + except Exception as e: + exception(f'{e=},{format_exc()}') + estr = erase_apikey(e) + recs = sor.R('llmusage', {'id': luid}) + ed = {"error": f"ERROR:{estr}", "status": "FAILED"} + s = json.dumps(ed) + s = ''.join(s.split('\n')) + outlines.append(ed) + yield f'{s}\n' + await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) + return + + rzt['llmusageid'] = luid +