diff --git a/llmage/accounting.py b/llmage/accounting.py index d3e955e..d9a4fbc 100644 --- a/llmage/accounting.py +++ b/llmage/accounting.py @@ -7,7 +7,23 @@ from ahserver.serverenv import ServerEnv from accounting.consume import consume_accounting from accounting.getaccount import getCustomerBalance -async def llm_charging(sor, ppid, userid, usage): +async def llm_charging(sor, ppid, llmusage): + env = ServerEnv() + prices = await env.pricing_program_charging(sor, ppid, llmusage.usage) + amount = 0 + cost = 0 + for p in prices: + amount += p.amount + if p.cost: + cost += p.cost + discount = await env.sor_get_customer_discount(sor, + llmusage.ownerid, + llmusage.userorgid) + return DictObject(**{ + 'original_amount': amount, + 'amount': amount * discount, + 'cost': cost + }) async def checkCustomerBalance(llmid, userorgid): env = ServerEnv() @@ -29,12 +45,11 @@ async def checkCustomerBalance(llmid, userorgid): return ret return False -async def llm_accounting(request, llmid, - usage, customerid, userid, orderid=None): +async def llm_accounting(request, llmusage): env = request._run_ns async with get_sor_context(request._run_ns, 'llmage') as sor: sql = "select * from llm where id=${llmid}$" - recs = await sor.sqlExe(sql, {'llmid': llmid}) + recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid}) if len(recs) == 0: e = Exception(f'llm not found({llmid})') exception(f'{e}') @@ -45,29 +60,23 @@ async def llm_accounting(request, llmid, raise e resellerid = recs[0].ownerid providerid = recs[0].providerid - charges = await env.pricing_program_charging(sor, recs[0].ppid, usage) - trans_amount = trans_cost = 0 - for c in charges: - trans_amount += c.amount - trans_cost += c.cost - if trans_amount < 0.00001: - return + trans_amount = llmusage.amount + trans_cost = llmusage.cost biz_date = await env.get_business_date(sor) timestamp = env.timestampstr() - if orderid is None: - orderid = getID() - order = { - "id": orderid, - "customerid": customerid, - "resellerid": resellerid, - "order_date": biz_date, - "order_status": "1", # accounted - "business_op": "PAY", - "amount": trans_amount, - "userid": userid, - "productid": llmid - } - await sor.C('biz_order', order) + orderid = getID() + order = { + "id": orderid, + "customerid": customerid, + "resellerid": resellerid, + "order_date": biz_date, + "order_status": "1", # accounted + "business_op": "PAY", + "amount": trans_amount, + "userid": userid, + "productid": llmid + } + await sor.C('biz_order', order) orderdetail = { "id": getID(), "orderid": orderid, diff --git a/llmage/asyncinference.py b/llmage/asyncinference.py index 4cc9070..24341a9 100644 --- a/llmage/asyncinference.py +++ b/llmage/asyncinference.py @@ -13,8 +13,7 @@ from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.filestorage import FileStorage -from llmage.accounting import llm_accounting -from .utils immport * +from .accounting import llm_accounting, llm_charging async def get_today_asynctask_list(userid): env = ServerEnv() @@ -147,8 +146,8 @@ async def query_task_status(request, upappid, apinames, luid, userid, taskid): if rzt.status == 'SUCCEEDED': if llm.ppid: try: - chargings = await llm_charginng(sor, - llm.ppid, callerid, usage) + chargings = await llm_charging(sor, + llm.ppid, llmusage) llmusage.amount = chargings.amount llmusage.cost = chargings.cost except Exception as e: diff --git a/llmage/llmclient.py b/llmage/llmclient.py index 0b857d6..5e9fc57 100644 --- a/llmage/llmclient.py +++ b/llmage/llmclient.py @@ -232,7 +232,7 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None) llmusage.status = 'SUCCEEDED' if llm.ppid and callerorgid: try: - chargings = await llm_charginng(sor, llm.ppid, callerid, usage) + chargings = await llm_charging(sor, llm.ppid, llmusage) llmusage.amount = chargings.amount llmusage.cost = chargings.cost except Exception as e: @@ -258,185 +258,6 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None) # await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) return -async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None): - env = request._run_ns.copy() - if not params_kw: - params_kw = env.params_kw - # callerid = await env.get_user() - # callerorgid = await env.get_userorgid() - uapi = UAPI(request, sor=sor) - userid = await get_owner_userid(sor, llm) - outlines = [] - b = None - d = None - t1 = t2 = t3 = time.time() - luid = getID() - try: - start_timestamp = time.time() - responsed_seconds = None - finish_seconds = None - b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) - if isinstance(b, bytes): - b = b.decode('utf-8') - d = json.loads(b) - status = d.get('status') - usage = d.get('usage', {}) - if status and status != 'SUCCEEDED': - raise Exception(d['error']) - responsed_seconds = time.time() - start_timestamp - finish_seconds = response_seconds - llmusage = DictObject() - llmusage.id = luid - llmusage.llmid = llm.id - llmusage.use_date = curDateString() - llmusage.use_time = timestampstr() - llmusage.userid = callerid - llmusage.usage = json.dumps(usage) - llmusage.ioinfo = json.dumps({ - "input": params_kw, - "output": d - }) - llmusage.transno = params_kw.transno - llmusage.responsed_seconds = responsed_seconds - llmusage.finish_seconds = finish_seconds - llmusage.status = 'SUCCEEDED' - if llm.ppid: - try: - chargings = await llm_charginng(sor, llm.ppid, callerid, usage) - llmusage.amount = chargings.amount - llmusage.cost = chargings.cost - except Exception as e: - e = Exception(f'{llm.pid} charging error{e}') - exception(f'{e}') - else: - llmusage.amount = 0 - llmusage.cost = 0 - llmusage.userorgid = callerorgid - llmusage.ownerid = llm.orgid - llmusage.accounting_status = 'created' - b = json.dumps(d, ensure_ascii=False) - yield b - await write_llmusage(llmusage) - await llm_accounting(request, llmusage) - except Exception as e: - exception(f'{e=},{format_exc()}') - estr = erase_apikey(e) - ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid} - s = json.dumps(ed) - s = ''.join(s.split('\n')) - outlines.append(ed) - yield f'{s}\n' - -async def async_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None): - env = request._run_ns.copy() - if not params_kw: - params_kw = env.params_kw - # callerorgid = await env.get_userorgid() - # callerid = await env.get_user() - uapi = UAPI(request, sor=sor) - userid = await get_owner_userid(sor, llm) - b = None - luid = getID() - try: - start_timestamp = time.time() - if llm.callbackurl: - params_kw.callbackurl = llm.callbackurl - b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) - if isinstance(b, bytes): - b = b.decode('utf-8') - debug(f'task sumbited:{b}') - d = DictObject(**json.loads(b)) - if d.status != 'SUCCEEDED': - e = Exception(f'resp={d} not success') - raise e - responsed_seconds = time.time() - start_timestamp - finish_seconds = response_seconds - llmusage = DictObject() - llmusage.id = luid - llmusage.llmid = llm.id - llmusage.use_date = curDateString() - llmusage.use_time = timestampstr() - llmusage.userid = callerid - llmusage.usage = json.dumps(usage) - llmusage.ioinfo = json.dumps({ - "input": params_kw - }) - llmusage.taskid = d.taskid - llmusage.transno = params_kw.transno - llmusage.responsed_seconds = responsed_seconds - llmusage.finish_seconds = finish_seconds - llmusage.status = 'CREATED' - llmusage.userorgid = callerorgid - llmusage.ownerid = llm.orgid - b = json.dumps(d, ensure_ascii=False) - yield b - await write_llmusage(llmusage) - await llm_accounting(request, llmusage) - if llm.callbackurl: - return - apinames = [ name.strip() for name in llm.query_apiname.split(',') ] - asyncio.create_task(query_task_status(request, llm.upappid, - apinames, luid, userid, d.taskid)) - - except Exception as e: - exception(f'{e=},{format_exc()}') - estr = erase_apikey(e) - ed = {"error": f"ERROR:{estr}", "status": "FAILED"} - s = json.dumps(ed) - s = ''.join(s.split('\n')) - yield f'{s}\n' - return - outlines.append(d) - t2 = time.time() - uapi = UAPI(request, sor=sor) - apinames = [ name.strip() for name in llm.query_apiname.split(',') ] - for apiname in apinames: - while True: - b = None - try: - b = await uapi.call(llm.upappid, apiname, userid, params=d) - except Exception as e: - exception(f'{e=},{format_exc()}') - estr = erase_apikey(e) - ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid} - s = json.dumps(ed) - s = ''.join(s.split('\n')) - outlines.append(ed) - yield f'{s}\n' - await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) - return - - if isinstance(b, bytes): - b = b.decode('utf-8') - d = json.loads(b) - rzt = DictObject(**d) - rzt['llmusageid'] = luid - b = json.dumps(rzt, ensure_ascii=False) - b = ''.join(b.split('\n')) - debug(f'response line = {b}') - yield b + '\n' - if not rzt.status or rzt.status == 'FAILED': - debug(f'{b=} {rzt=} has not status, return error') - outlines.append(rzt) - await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) - return - if rzt.status == 'SUCCEEDED': - await asyncio.sleep(1) - outlines.append(rzt) - usage = rzt.get('usage', {}) - t3 = time.time() - usage['response_time'] = t2 - t1 - usage['finish_time'] = t3 -t1 - await write_llmusage(luid, llm, callerid, usage, params_kw, outlines, sor) - if llm.ppid and callerorgid != llm.ownerid: - debug(f'{usage=},{llm.ownerid=},{callerorgid=}') - await llm_accounting(request, llm.id, usage, callerorgid, callerid) - - d = rzt - break - period = llm.query_period or 30 - await asyncio.sleep(period) - def b64media2url(request, mediafile): env = request._run_ns entire_url = env.entire_url diff --git a/llmage/syncinference.py b/llmage/syncinference.py index d8cadf2..0dea879 100644 --- a/llmage/syncinference.py +++ b/llmage/syncinference.py @@ -13,7 +13,7 @@ from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.filestorage import FileStorage -from llmage.accounting import llm_accounting, llm_charginng +from llmage.accounting import llm_accounting, llm_charging async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None): env = request._run_ns.copy() @@ -58,7 +58,7 @@ async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw= llmusage.status = 'SUCCEEDED' if llm.ppid: try: - chargings = await llm_charginng(sor, llm.ppid, callerid, usage) + chargings = await llm_charging(sor, llm.ppid, llmusage) llmusage.amount = chargings.amount llmusage.cost = chargings.cost except Exception as e: