From 93ec47f198c09b35bad7dfc1a0933dca2d12641f Mon Sep 17 00:00:00 2001 From: yumoqing Date: Fri, 29 May 2026 17:41:35 +0800 Subject: [PATCH] refactor: add get_llmage_llm() for non-API-call llm data access, replace get_llm() in accounting and pricing --- build/lib/llmage/__init__.py | 0 build/lib/llmage/accounting.py | 369 +++++++++++++++++++++++++++++ build/lib/llmage/asyncinference.py | 203 ++++++++++++++++ build/lib/llmage/callback.py | 33 +++ build/lib/llmage/init.py | 68 ++++++ build/lib/llmage/jimeng.py | 98 ++++++++ build/lib/llmage/keling.py | 18 ++ build/lib/llmage/llmclient.py | 148 ++++++++++++ build/lib/llmage/messages.py | 69 ++++++ build/lib/llmage/syncinference.py | 98 ++++++++ build/lib/llmage/utils.py | 348 +++++++++++++++++++++++++++ llmage/accounting.py | 59 +++-- llmage/init.py | 4 +- llmage/utils.py | 35 ++- 14 files changed, 1517 insertions(+), 33 deletions(-) create mode 100644 build/lib/llmage/__init__.py create mode 100644 build/lib/llmage/accounting.py create mode 100644 build/lib/llmage/asyncinference.py create mode 100644 build/lib/llmage/callback.py create mode 100644 build/lib/llmage/init.py create mode 100644 build/lib/llmage/jimeng.py create mode 100644 build/lib/llmage/keling.py create mode 100644 build/lib/llmage/llmclient.py create mode 100644 build/lib/llmage/messages.py create mode 100644 build/lib/llmage/syncinference.py create mode 100644 build/lib/llmage/utils.py diff --git a/build/lib/llmage/__init__.py b/build/lib/llmage/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/build/lib/llmage/accounting.py b/build/lib/llmage/accounting.py new file mode 100644 index 0000000..c3a0fd6 --- /dev/null +++ b/build/lib/llmage/accounting.py @@ -0,0 +1,369 @@ +import asyncio +import json +import time +from datetime import datetime +from appPublic.log import exception, debug +from appPublic.uniqueID import getID +from appPublic.dictObject import DictObject +from sqlor.dbpools import get_sor_context +from ahserver.serverenv import ServerEnv +from accounting.consume import consume_accounting +from accounting.getaccount import getCustomerBalance +from .utils import * + +async def llm_charging(ppid, llmusage): + env = ServerEnv() + usages = llmusage.usages + if isinstance(usages, str): + usages = json.loads(usages) + prices = await env.buffered_charging(ppid, usages) + if prices is None: + e = Exception(f'{ppid=}, {usages=}{llmusage.id=} env.buffered_charging() return None') + exception(f'{e}') + raise e + return None + amount = 0 + cost = 0 + for p in prices: + amount += p.amount + if p.cost: + cost += p.cost + discount = await env.get_customer_discount(llmusage.ownerid, + llmusage.userorgid) + return DictObject(**{ + 'original_amount': amount, + 'amount': amount * discount, + 'cost': cost + }) + +async def checkCustomerBalance(llmid, userid, userorgid, catelogid=None): + if llmid is None: + debug(f'checkCustomerBalance(): llmid is None') + return False + env = ServerEnv() + llm = await get_llm(llmid) + if llm.ownerid == userorgid: + debug(f'self orgid user') + return True + balance = 0.00 + tpac = await get_user_tpac(userid) + if tpac: + debug(f'{tpac=}, get tpac balance') + balance = await get_tpac_balance(tpac, userid) + else: + debug(f'{tpac=}, get local balance') + async with get_sor_context(env, 'accounting') as sor: + balance = await getCustomerBalance(sor, userorgid) + bal = 0 if balance is None else balance + if llm.min_balance is None: + llm.min_balance = 0.00 + ret = llm.ppid and llm.min_balance < bal + debug(f'{llm.ppid=}, {llm.min_balance=}, {bal=}') + return ret + +async def llm_accounting(llmusage): + env = ServerEnv() + llmid = llmusage.llmid + async with get_sor_context(env, 'llmage') as sor: + sql = """select a.*, b.ppid from llm a, llm_api_map b +where a.id=${llmid}$ + and a.id = b.llmid + and b.isdefaultcatelog = '1' +""" + recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid}) + if len(recs) == 0: + ns = { + 'id': llmusage.id, + 'accounting_status': 'failed' + } + await sor.U('llmusage', ns) + e = Exception(f'llm not found({llmid})') + exception(f'{e}') + raise e + if recs[0].ppid is None: + ns = { + 'id': llmusage.id, + 'accounting_status': 'failed' + } + await sor.U('llmusage', ns) + e = Exception(f'llm ({llmid}) donot has a pricing_program') + exception(f'{e}') + raise e + customerid = llmusage.userorgid + userid = llmusage.userid + resellerid = recs[0].ownerid + providerid = recs[0].providerid + trans_amount = llmusage.amount + trans_cost = llmusage.cost + biz_date = await env.get_business_date(sor) + timestamp = env.timestampstr() + orderid = getID() + order = { + "id": orderid, + "customerid": customerid, + "resellerid": resellerid, + "order_date": biz_date, + "order_status": "1", # accounted + "business_op": "PAY", + "amount": trans_amount, + "userid": userid, + "productid": llmid + } + await sor.C('biz_order', order) + orderdetail = { + "id": getID(), + "orderid": orderid, + "productid": llmid, + "product_cnt": 1, + "trans_amount": trans_amount + } + await sor.C('biz_orderdetail', orderdetail) + ais = [] + if customerid != resellerid: + ai0 = DictObject() + ai0.action = 'PAY' + ai0.customerid = customerid + ai0.resellerid = resellerid + ai0.providerid = providerid + ai0.biz_date = biz_date + ai0.timestamp = timestamp + ai0.productid = llmid + ai0.transamt = trans_amount + ai0.variable = { + "交易金额": trans_amount, + "交易手续费": 0 + } + ais.append(ai0) + ai1 = DictObject() + ai1.action = 'PAY*' + ai1.customerid = customerid + ai1.resellerid = resellerid + ai1.providerid = providerid + ai1.biz_date = biz_date + ai1.timestamp = timestamp + ai1.providerid = providerid + ai1.productid = llmid + ai1.transamt = trans_cost + ai1.variable = { + "采购成本": trans_cost + } + ais.append(ai1) + await consume_accounting(sor, orderid, ais) + llmusage.accounting_status = 'accounted' + ns = { + 'id': llmusage.id, + 'accounting_status': 'accounted' + } + await sor.U('llmusage', ns) + +async def get_accounting_llmusages(luid=None): + env = ServerEnv() + lus = [] + t = time.time() + dt = datetime.fromtimestamp(t) + tsstr = dt.strftime('%Y-%m-%d %H:%M:%S.') + f'{dt.microsecond // 1000:03d}' + async with get_sor_context(env, 'llmage') as sor: + sql = """select a.*, c.ppid +from llmusage a, llm b, llm_api_map c +where a.llmid = b.id + and a.llmid = c.llmid + and c.isdefaultcatelog = '1' + and a.status = 'SUCCEEDED' + and a.use_time < ${tsstr}$ + and a.accounting_status='created'""" + ns = {'tsstr': tsstr} + if luid: + sql += " and a.id=${luid}$" + ns['luid'] = luid + recs = await sor.sqlExe(sql, ns) + # debug(f'{sql=}, {ns=}, {len(recs)=}') + for r in recs: + if r.usages is None: + try: + output = await get_lastoutput(r.ioinfo) + except Exception as e: + continue + r.usages = output.get('usage') + if r.usages is None: + debug(f'{r.usages=} is None, accoiunting failed') + await llm_accoung_failed(r.id, reason='usages is None') + continue + d = None + try: + debug(f'{r.ppid=}, {r.usages=} {r.id=}') + d = await llm_charging(r.ppid, r) + + except Exception as e: + exception(f'{r.ppid=}, {r.usages=} llm_charging() failed,{e}') + await llm_accoung_failed(r.id, reason=f'llm_charging failed: {e}') + continue + r.amount = d.amount + r.cost = d.cost + ns = { + 'id': r.id, + 'amount': r.amount, + 'cost': r.cost, + 'usage': json.dumps(r.usage, ensure_ascii=False, indent=4) + } + await sor.U('llmusage', ns) + lus.append(r) + return lus + +async def llm_accoung_failed(luid, reason=None): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + await sor.U('llmusage', { + 'id': luid, + 'accounting_status': 'failed' + }) + # Also record in the failed accounting table for tracking + recs = await sor.R('llmusage', {'id': luid}) + if recs: + r = recs[0] + failed_id = getID() + failed_rec = { + 'id': failed_id, + 'llmusageid': luid, + 'llmid': r.llmid, + 'userid': r.userid, + 'userorgid': r.userorgid, + 'use_date': r.use_date, + 'use_time': r.use_time, + 'amount': r.amount, + 'cost': r.cost, + 'failed_reason': reason or 'accounting failed', + 'failed_time': env.timestampstr(), + 'retry_count': 0, + 'handled': '0' + } + await sor.C('llmusage_accounting_failed', failed_rec) + + +async def backup_accounted_llmusage(): + """Backup yesterday's accounted records to history table and remove from llmusage.""" + env = ServerEnv() + yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') + ts = env.timestampstr() + batched = 0 + async with get_sor_context(env, 'llmage') as sor: + # Select yesterday's accounted records + sql = """select * from llmusage +where accounting_status='accounted' + and use_date < ${yesterday}$""" + recs = await sor.sqlExe(sql, {'yesterday': yesterday}) + if not recs: + debug(f'backup_accounted_llmusage: no records to backup for use_date < {yesterday}') + return 0 + debug(f'backup_accounted_llmusage: {len(recs)} records to backup') + for r in recs: + history_rec = { + 'id': r.id, + 'llmid': r.llmid, + 'use_date': r.use_date, + 'use_time': r.use_time, + 'userid': r.userid, + 'usages': r.usages, + 'ioinfo': r.ioinfo, + 'transno': r.transno, + 'responsed_seconds': r.responsed_seconds, + 'finish_seconds': r.finish_seconds, + 'status': r.status, + 'taskid': r.taskid, + 'amount': r.amount, + 'cost': r.cost, + 'userorgid': r.userorgid, + 'ownerid': r.ownerid, + 'accounting_status': r.accounting_status, + 'backup_time': ts + } + await sor.C('llmusage_history', history_rec) + # Delete from main table + await sor.D('llmusage', {'id': r.id}) + batched += 1 + debug(f'backup_accounted_llmusage: backed up {batched} records') + return batched + + +async def get_failed_accounting_records(filters=None, page=1, page_size=50): + """Search failed accounting records with optional filters. + + filters: dict with optional keys: + - userorgid: filter by user organization + - llmid: filter by model ID + - handled: '0' or '1' + - start_date: filter use_date >= start_date + - end_date: filter use_date <= end_date + """ + async with get_sor_context(ServerEnv(), 'llmage') as sor: + conditions = [] + ns = {} + if filters: + if filters.get('userorgid'): + conditions.append("userorgid=${userorgid}$") + ns['userorgid'] = filters['userorgid'] + if filters.get('llmid'): + conditions.append("llmid=${llmid}$") + ns['llmid'] = filters['llmid'] + if filters.get('handled') is not None: + conditions.append("handled=${handled}$") + ns['handled'] = filters['handled'] + if filters.get('start_date'): + conditions.append("use_date>=${start_date}$") + ns['start_date'] = filters['start_date'] + if filters.get('end_date'): + conditions.append("use_date<=${end_date}$") + ns['end_date'] = filters['end_date'] + where = "" + if conditions: + where = "where " + " and ".join(conditions) + # Count total + count_sql = f"select count(*) as cnt from llmusage_accounting_failed {where}" + count_recs = await sor.sqlExe(count_sql, ns) + total = count_recs[0].cnt if count_recs else 0 + # Query with pagination + offset = (page - 1) * page_size + query_sql = f"""select * from llmusage_accounting_failed {where} +order by failed_time desc limit {page_size} offset {offset}""" + recs = await sor.sqlExe(query_sql, ns) + return { + 'total': total, + 'page': page, + 'page_size': page_size, + 'records': recs + } + +async def backend_accounting(): + env = ServerEnv() + debug(f'backend accounting started ...') + backup_counter = 0 + while True: + try: + lus = await get_accounting_llmusages() + except Exception as e: + exception(f'{e}') + lus = [] + # debug(f'{len(lus)=} need to accounting........') + for lu in lus: + try: + tpac = await get_user_tpac(lu.userid) + if tpac: + debug(f'{lu.id=},{lu.userid=}, {tpac=}, go tpac') + await tpac_accounting(tpac, lu.userid, lu.llmid, lu.amount, lu.usages, lu.id) + else: + debug(f'{lu.id=},{lu.userid=}, {tpac=}, go local') + await llm_accounting(lu) + except Exception as e: + exception(f'{e}, {lu.id=}') + await llm_accoung_failed(lu.id, reason=str(e)) + + # Run backup every 100 iterations (roughly every ~1000 seconds) + backup_counter += 1 + if backup_counter >= 100: + backup_counter = 0 + try: + await backup_accounted_llmusage() + except Exception as e: + exception(f'backup_accounted_llmusage failed: {e}') + + await asyncio.sleep(10) + diff --git a/build/lib/llmage/asyncinference.py b/build/lib/llmage/asyncinference.py new file mode 100644 index 0000000..c21044d --- /dev/null +++ b/build/lib/llmage/asyncinference.py @@ -0,0 +1,203 @@ +import json +import time +import asyncio +from random import randint +from functools import partial +from traceback import format_exc +from sqlor.dbpools import DBPools, get_sor_context +from appPublic.log import debug, exception, error, critical +from appPublic.uniqueID import getID +from appPublic.dictObject import DictObject +from appPublic.timeUtils import curDateString, timestampstr, timestampAdd +from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 +from ahserver.serverenv import get_serverenv, ServerEnv +from ahserver.filestorage import FileStorage +from .accounting import llm_accounting, llm_charging +from .utils import * + +async def get_today_asynctask_list(userid): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + today = await env.get_business_date(sor) + sql = '''select * from llmusage +where userid=${userid}$ + and use_date = ${date}$''' + recs = await sor.sqlExe(sql, { + 'date': today, + 'userid': userid + }) + return recs + return [] + +async def get_asynctask_status(request, taskid): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + recs = await sor.R('llmusage', {'taskid': taskid}) + if recs: + r = recs[0] + output = await get_lastoutput(r.ioinfo) + t = timestampAdd(r.use_time, 600) + now = time.time() + if r.status not in ['UNKNOWN', 'FAILED', 'SUCCEEDED'] and now > t: + asyncio.create_task(query_task_status(request, r.id)) + return output + return { + 'taskid': taskid, + 'status': 'FAILED', + 'error': f'taskid={taskid} not exist' + } + return { + 'taskid': taskid, + 'status': 'FAILED', + 'error': f'system error' + } + +async def async_uapi_request(request, llm, + callerid, callerorgid, params_kw=None): + env = request._run_ns.copy() + if not params_kw: + params_kw = env.params_kw + # callerorgid = await env.get_userorgid() + # callerid = await env.get_user() + uapi = env.UpAppApi(request) + userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid) + b = None + luid = getID() + try: + start_timestamp = time.time() + if llm.callbackurl: + params_kw.callbackurl = llm.callbackurl + + b = None + try: + b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) + except Exception as e: + estr = erase_apikey(e) + ed = {"error": f"ERROR:{estr}", "status": "FAILED"} + exception(f'{ed}') + yield f'{ed}\n' + return + if isinstance(b, bytes): + b = b.decode('utf-8') + debug(f'task submited:{b}') + d = DictObject(**json.loads(b)) + responsed_seconds = time.time() - start_timestamp + finish_seconds = responsed_seconds + llmusage = DictObject() + llmusage.id = luid + llmusage.llmid = llm.id + llmusage.use_date = curDateString() + llmusage.use_time = timestampstr() + llmusage.userid = callerid + ioinfo = { + "input": params_kw, + 'output': [d] + } + webpath = await write_llmio(llmusage.id, ioinfo) + llmusage.ioinfo = webpath + llmusage.taskid = d.taskid + llmusage.transno = params_kw.transno + llmusage.responsed_seconds = responsed_seconds + llmusage.finish_seconds = finish_seconds + llmusage.status = d.status + llmusage.userorgid = callerorgid + llmusage.ownerid = llm.ownerid + llmusage.accounting_status = 'created' + b = json.dumps(d, ensure_ascii=False) + yield b + await write_llmusage(llmusage) + # if llm.callbackurl: + # return + if d.status == 'FAILED': + e = Exception(f'resp={d} FFAILED') + return + asyncio.create_task(query_task_status(request, luid)) + + except Exception as e: + ed = {"error": f"ERROR:{e}", "status": "FAILED"} + s = json.dumps(ed, ensure_ascii=False) + s = ''.join(s.split('\n')) + exception(s) + yield f'{s}\n' + return + +async def modify_llmusage(ns): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + await sor.U('llmusage', ns.copy()) + +async def get_llm_llmusage(luid): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + recs = await sor.R('llmusage', {'id': luid}) + if len(recs) == 0: + e = Exception(f'{luid=} is not found in llmusage') + exception(f'{e}') + raise e + llmusage = recs[0] + if llmusage.status == 'UNKNOWN': + return + if llmusage.status == 'SUCCEEDED': + return + if llmusage.status == 'FAILED': + return + llms = await sor.R('llm', {'id': llmusage.llmid}) + if len(llms) == 0: + e = Exception(f'{llmusage.llmid=} not found in llm') + exception(f'{e}') + raise e + llm = llms[0] + return llm, llmusage + +async def query_task_status(request, luid, onetime=False): + env = ServerEnv() + uapi = env.UpAppApi(request) + llm, llmusage = await get_llm_llmusage(luid) + userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid) + taskid = llmusage.taskid + upappid = llm.upappid + apinames = llm.query_apiname.split(',') + + for apiname in apinames: + while True: + lastoutout = await get_lastoutput(llmusage.ioinfo) + if lastoutout['status'] in ['UNKNOWN', 'FAILED', 'SUCCEEDED']: + critical(f"{lastoutout['status']=}") + return + ns = {'taskid': taskid} + new_output = b = d = None + try: + b = await uapi.call(upappid, apiname, userid, params=ns) + if isinstance(b, bytes): + b = b.decode('utf-8') + new_output = json.loads(b) + except Exception as e: + exception(f'{e}, {b=}') + new_output = { + 'status': 'FAILED', + 'error': f'{b},{e}' + } + if not new_output.get('status'): + e = Exception(f"{new_output=} {upappid=}, {apiname=} has not status field") + critical(f'{e}') + raise e + if lastoutout['status'] != new_output.get('status'): + llmusage.status = new_output['status'] + ns = { + 'id': llmusage.id, + 'status': llmusage.status + } + if 'usage' in new_output.keys(): + ns['usages'] = json.dumps(new_output['usage']) + await append_new_llmoutput(llmusage.ioinfo, new_output) + await modify_llmusage(ns) + if llmusage.status in ['UNKNOWN', 'FAILED', 'SUCCEEDED']: + critical(f'finished .. {llmusage.status=}') + return + + if onetime: + critical(f'onetime is true, returned') + return + await asyncio.sleep(llm.query_period or 30) + critical(f'{llm.query_period=} seconds will retry, {new_output["status"]=}') + diff --git a/build/lib/llmage/callback.py b/build/lib/llmage/callback.py new file mode 100644 index 0000000..afef55e --- /dev/null +++ b/build/lib/llmage/callback.py @@ -0,0 +1,33 @@ +from ahserver.serverenv import ServerEnv +from appPublic.dictObject import DictObject +from sqlor.dbpools import get_sor_context + +async def asynctask_callback(appname, apiname, params_kw) + env = ServerEnv() + llmusage = None + async with get_sor_context(env, 'llmage') as sor: + uapi = await env.sor_get_uapi_by_appname_apiname(appname, apiname) + try: + dstr = await env.tmpl_engine.renders(uapi.response, params_kw) + d = DictObject(**json.loads(dstr)) + llmus = await sor.R('llmusage', {'taskid': d.taskid}) + if len(llmus) == 0: + e = Exception(f'{d=}, {taskid=} not found') + exception(f'{e}') + raise e + llmusage = llmus[0] + io = json.loads(llmusage.ioinfo) + out = io.get('output') + out.append(d) + llmusage.ioinfo = json.dumps(io, ensure_ascii=False) + llmusage.status = d.status + await sor.U('llmusage', llmusage) + + except Exception as e: + e = Exception(f'{uapi.response=}, {params_kw=} render error') + exception(f'{e}') + raise e + + if llmusage: + await llm_accounting(llmusage) + diff --git a/build/lib/llmage/init.py b/build/lib/llmage/init.py new file mode 100644 index 0000000..e0f3080 --- /dev/null +++ b/build/lib/llmage/init.py @@ -0,0 +1,68 @@ +import asyncio +from appPublic.registerfunction import RegisterFunction +from sqlor.dbpools import DBPools +from ahserver.serverenv import ServerEnv +from appPublic.log import debug +from .keling import keling_token +from .jimeng import jimeng_auth_headers +from .utils import ( + llm_query_orders, + read_webpath, + llm_query_price, + get_llm_by_model, + get_llms_by_catelog, + get_llms_sort_by_provider, + get_llmcatelogs, + get_llms_by_catelog_to_customer, + get_llmproviders, + get_llm, +) + +from .llmclient import ( + inference_generator, + inference +) +from .accounting import ( + checkCustomerBalance, + llm_charging, + get_accounting_llmusages, + backend_accounting, + llm_accounting, + backup_accounted_llmusage, + get_failed_accounting_records, + llm_accoung_failed +) + +from .asyncinference import ( + get_asynctask_status, + query_task_status, + get_today_asynctask_list +) + +def load_llmage(): + env = ServerEnv() + env.llm_query_orders = llm_query_orders + env.read_webpath = read_webpath + env.get_llm_by_model = get_llm_by_model + env.llm_charging = llm_charging + env.get_accounting_llmusages = get_accounting_llmusages + env.llm_accounting = llm_accounting + env.get_today_asynctask_list = get_today_asynctask_list + env.get_asynctask_status = get_asynctask_status + env.query_task_status = query_task_status + env.get_llm = get_llm + env.inference = inference + env.inference_generator = inference_generator + env.get_llms_by_catelog = get_llms_by_catelog + env.get_llmcatelogs = get_llmcatelogs + env.checkCustomerBalance = checkCustomerBalance + env.get_llmproviders = get_llmproviders + env.get_llms_sort_by_provider = get_llms_sort_by_provider + env.keling_token = keling_token + env.llm_query_price = llm_query_price + env.get_llms_by_catelog_to_customer = get_llms_by_catelog_to_customer + env.backup_accounted_llmusage = backup_accounted_llmusage + env.get_failed_accounting_records = get_failed_accounting_records + rf = RegisterFunction() + rf.register('jimeng_auth_headers', jimeng_auth_headers) + diff --git a/build/lib/llmage/jimeng.py b/build/lib/llmage/jimeng.py new file mode 100644 index 0000000..31ef9cc --- /dev/null +++ b/build/lib/llmage/jimeng.py @@ -0,0 +1,98 @@ +import hashlib +import datetime +from datetime import timezone +import hmac +import json +from urllib.parse import quote + +Service = "visual" +Version = "2022-08-31" +Region = "cn-north-1" +Host = "visual.volcengineapi.com" +ContentType = "application/json" + +def utc_now(): + try: + from datetime import timezone + return datetime.datetime.now(timezone.utc) + except ImportError: + class UTC(datetime.tzinfo): + def utcoffset(self, dt): + return datetime.timedelta(0) + def tzname(self, dt): + return "UTC" + def dst(self, dt): + return datetime.timedelta(0) + return datetime.datetime.now(UTC()) + +def jm_timestamp(): + dt = utc_new() + return dt.strftime("%Y%m%dT%H%M%SZ") + +# sha256 非对称加密 +def hmac_sha256(key: bytes, content: str): + return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest() + +# sha256 hash算法 +def hash_sha256(content: str): + return hashlib.sha256(content.encode("utf-8")).hexdigest() + +def jimeng_auth_headers(opts): + apikey = opts.get('apikey') + secretkey = opts.get('secretkey') + path = opts.get('path') + method = opts.get('method') + params = opts.get('params') + body = opts.get('body') + headers = opts.get('headers') + content_type = headers.get('Content-Type') + x_date = jm_timestamp() + short_x_date = DT[:8] + credential = { + "access_key_id": apikey, + "secret_access_key": secretkey, + "service": Service, + "region": Region, + } + x_content_sha256 = hash_sha256(body) + sign_result = { + "Host": Host, + "X-Content-Sha256": x_content_sha256, + "X-Date": x_date, + "Content-Type": ContentType + } + headers.update(sign_result) + signed_headers_str = ";".join( + ["content-type", "host", "x-content-sha256", "x-date"] + ) + canonical_request_str = "\n".join( + [method.upper(), + path, + norm_query(params), + "\n".join( + [ + "content-type:" + content_type, + "host:" + Host, + "x-content-sha256:" + x_content_sha256, + "x-date:" + x_date, + ] + ), + "", + signed_headers_str, + x_content_sha256, + ] + ) + hashed_canonical_request = hash_sha256(canonical_request_str) + credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"]) + string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]) + k_date = hmac_sha256(secretkey.encode("utf-8"), short_x_date) + k_region = hmac_sha256(k_date, credential["region"]) + k_service = hmac_sha256(k_region, credential["service"]) + k_signing = hmac_sha256(k_service, "request") + signature = hmac_sha256(k_signing, string_to_sign).hex() + headers['Authorization'] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format( + apikey + "/" + credential_scope, + signed_headers_str, + signature, + ) + diff --git a/build/lib/llmage/keling.py b/build/lib/llmage/keling.py new file mode 100644 index 0000000..e7e5eeb --- /dev/null +++ b/build/lib/llmage/keling.py @@ -0,0 +1,18 @@ +import time +import jwt + +ak = "" # 填写access key +sk = "" # 填写secret key + +def keling_token(ak, sk): + headers = { + "alg": "HS256", + "typ": "JWT" + } + payload = { + "iss": ak, + "exp": int(time.time()) + 1800, # 有效时间,此处示例代表当前时间+1800s(30min) + "nbf": int(time.time()) - 5 # 开始生效的时间,此处示例代表当前时间-5秒 + } + token = jwt.encode(payload, sk, headers=headers) + return token diff --git a/build/lib/llmage/llmclient.py b/build/lib/llmage/llmclient.py new file mode 100644 index 0000000..6ed461c --- /dev/null +++ b/build/lib/llmage/llmclient.py @@ -0,0 +1,148 @@ +import json +import time +import asyncio +from random import randint +from functools import partial +from traceback import format_exc +from appPublic.log import debug, exception, error +from appPublic.uniqueID import getID +from appPublic.dictObject import DictObject +from appPublic.timeUtils import curDateString, timestampstr +from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 +from ahserver.serverenv import get_serverenv, ServerEnv +from ahserver.filestorage import FileStorage +from .asyncinference import async_uapi_request +from .syncinference import sync_uapi_request +from .accounting import llm_accounting, llm_charging +from .utils import * + +async def uapi_request(request, llm, callerid, callerorgid, params_kw=None): + env = request._run_ns.copy() + if not params_kw: + params_kw = env.params_kw + # callerorgid = await env.get_userorgid() + # callerid = await env.get_user() + uapi = env.UpAppApi(request) + userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid) + outlines = [] + txt = '' + luid = getID() + try: + start_timestamp = time.time() + responsed_seconds = None + finish_seconds = None + first = True + usage = None + async for l in uapi.stream_linify(llm.upappid, llm.apiname, userid, + params=params_kw): + if first: + first = False + responsed_seconds = time.time() - start_timestamp + if isinstance(l, bytes): + l = l.decode('utf-8') + if l[-1] == '\n': + l = l[:-1] + debug(f'stream response line={l},{type(l)}') + l = ''.join(l.split('\n')) + if l and l != '[DONE]': + yield_it = False + d = {} + try: + d = json.loads(l) + except Exception as e: + debug(f'json.loads({l}) error({e})') + continue + if d.get('reasoning_content'): + txt += d.get('reasoning_content') + yield_it = True + if d.get('content'): + txt = txt + d['content'] + yield_it = True + if d.get('usage'): + usage = d['usage'] + d['llmusageid'] = luid + outlines.append(d) + yield json.dumps(d, ensure_ascii=False) + '\n' + if usage is None: + error(f'{llm=} response has not usage') + finish_seconds = time.time() - start_timestamp + if responsed_seconds is None: + responsed_seconds = finish_seconds + llmusage = DictObject() + llmusage.id = luid + llmusage.llmid = llm.id + llmusage.use_date = curDateString() + llmusage.use_time = timestampstr() + llmusage.userid = callerid + llmusage.usages = json.dumps(usage, ensure_ascii=False, indent=4) + debug(f' {usage=}, {type(usage)=}, {llmusage.usages=}') + ioinfo = { + "input": params_kw, + 'output': outlines + } + webpath = await write_llmio(llmusage.id, ioinfo) + llmusage.ioinfo = webpath + llmusage.transno = params_kw.transno + llmusage.responsed_seconds = responsed_seconds + llmusage.finish_seconds = finish_seconds + llmusage.status = 'SUCCEEDED' + llmusage.userorgid = callerorgid + llmusage.ownerid = llm.ownerid + llmusage.accounting_status = 'created' + await write_llmusage(llmusage) + except Exception as e: + exception(f'{e=},{format_exc()}') + estr = erase_apikey(e) + ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid} + s = json.dumps(ed, ensure_ascii=False) + s = ''.join(s.split('\n')) + outlines.append(ed) + yield f'{s}\n' + return + +async def inference_generator(request, *args, params_kw=None, **kw): + env = request._run_ns.copy() + callerorgid = await env.get_userorgid() + callerid = await env.get_user() + async for d in _inference_generator(request, callerid, + callerorgid, params_kw=params_kw, **kw): + yield d + +async def _inference_generator(request, callerid, callerorgid, + params_kw={}, **kw): + env = request._run_ns + if not params_kw: + params_kw = env.params_kw + if not params_kw.transno: + params_kw.transno = getID() + llmid = params_kw.llmid + f = None + llm = await get_llm(llmid) + if llm is None: + errmsg = f'{{"status": "FAILED", "error":"llmid:{llmid}没找到模型"}}\n' + exception(errmsg) + yield errmsg + return + if not params_kw.model: + params_kw.model = llm.model + if llm.stream == 'async': + if llm.callbackurl: + cb_url = env.entire_url(llm.callbackurl) + params_kw.callbackurl = cb_url + f = partial(async_uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw) + elif not params_kw.stream: + llm.stream = False + debug(f'---{params_kw.stream=}, {llm.stream=} ---use sync_uapi_request ') + f = partial(sync_uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw) + else: + llm.stream = True + debug(f'---{params_kw.stream=}, {llm.stream=} ---use uapi_request ') + f = partial(uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw) + async for d in f(): + yield d + +async def inference(request, *args, params_kw=None, **kw): + env = request._run_ns.copy() + f = partial(inference_generator, request, *args, params_kw=params_kw, **kw) + return await env.stream_response(request, f) + diff --git a/build/lib/llmage/messages.py b/build/lib/llmage/messages.py new file mode 100644 index 0000000..d1aad29 --- /dev/null +++ b/build/lib/llmage/messages.py @@ -0,0 +1,69 @@ +from appPublic.myTE import MyTemplateEngine + +def default_sysmessage(): + return """{ + "role":"system", + "content":"{{content}}" + }""" + +def default_usrmessage(): + return """{ + "role":"user", + "content":"{{prompt}}" + }""" + +def default_llmmessage(): + return """{ + "role":"assisant", + "content":"{{content}}" + }""" + +class BaseMessages: + def __init__(self, request, llmid, sys_message, usr_message, llm_message): + self.request = request + self.llmid = llmid + self.sys_message = sys_message + self.usr_message = usr_message + self.llm_message = llm_message + self.te = MyTemplateEngine([]) + + async def append_meessages(self, msg_format, **kw): + m = self.te.renders(msg_format, kw) + msgs = await self.get_messages() + msgs.append(m) + await self.set_message(msgs) + return msgs + + async def append_usr_messages(self, **kw): + return await self.append_messages(self.usr_message, **kw) + + async def append_sys_messages(self, **kw): + return await self.append_messages(self.sys_message, **kw) + + async def append_llm_messages(self, **kw): + return await self.append_messages(self.llm_message, **kw) + + async def get_messages(self): + return [] + + async def set_messages(self, msgs): + pass + +class SessionMessages(BaseMessages): + async def get_messages(self): + env = self.request._run_ns + s = await env.get_session() + userid = await env.get_user() + mk = f'{self.llmid}_{userid}_msgs' + msgs = s[mk] + if not msgs: + msgs = [] + return msgs + + async def set_messages(self, msgs): + env = self.request._run_ns + s = await env.get_session() + userid = await env.get_user() + mk = f'{self.llmid}_{userid}_msgs' + s[mk] = msgs + diff --git a/build/lib/llmage/syncinference.py b/build/lib/llmage/syncinference.py new file mode 100644 index 0000000..a524be1 --- /dev/null +++ b/build/lib/llmage/syncinference.py @@ -0,0 +1,98 @@ +import json +import time +import asyncio +from random import randint +from functools import partial +from traceback import format_exc +from sqlor.dbpools import DBPools, get_sor_context +from appPublic.log import debug, exception, error +from appPublic.uniqueID import getID +from appPublic.dictObject import DictObject +from appPublic.timeUtils import curDateString, timestampstr +from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 +# from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi +from ahserver.serverenv import get_serverenv, ServerEnv +from ahserver.filestorage import FileStorage +from .accounting import llm_accounting, llm_charging +from .utils import * + +async def sync_uapi_request(request, llm, callerid, callerorgid, params_kw=None): + env = request._run_ns.copy() + if not params_kw: + params_kw = env.params_kw + # callerid = await env.get_user() + # callerorgid = await env.get_userorgid() + # uapi = UAPI(request, sor=sor) + uapi = env.UpAppApi(request) + userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid) + outlines = [] + b = None + d = None + luid = getID() + try: + start_timestamp = time.time() + responsed_seconds = None + finish_seconds = None + b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) + if isinstance(b, bytes): + b = b.decode('utf-8') + d = json.loads(b) + status = d.get('status') + usage = d.get('usage') + if status and status != 'SUCCEEDED': + raise Exception(d['error']) + responsed_seconds = time.time() - start_timestamp + finish_seconds = responsed_seconds + llmusage = DictObject() + llmusage.id = luid + llmusage.llmid = llm.id + llmusage.use_date = curDateString() + llmusage.use_time = timestampstr() + llmusage.userid = callerid + llmusage.usages = json.dumps(usage, ensure_ascii=False) + ioinfo = { + "input": params_kw, + 'output': [d] + } + webpath = await write_llmio(llmusage.id, ioinfo) + llmusage.ioinfo = webpath + llmusage.transno = params_kw.transno + llmusage.responsed_seconds = responsed_seconds + llmusage.finish_seconds = finish_seconds + llmusage.status = 'SUCCEEDED' + llmusage.amount = llmusage.cost = 0.00 + """ 联机不记账 + if llm.ppid: + try: + charging = await llm_charging(llm.ppid, llmusage) + if charging: + llmusage.amount = charging.amount + llmusage.cost = charging.cost + else: + llmusage.amount = llmusage.cost = 0.0 + except Exception as e: + e = Exception(f'{llm.pid} charging error{e}') + exception(f'{e}') + else: + llmusage.amount = 0 + llmusage.cost = 0 + """ + llmusage.userorgid = callerorgid + llmusage.ownerid = llm.ownerid + llmusage.accounting_status = 'created' + b = json.dumps(d, ensure_ascii=False) + yield b + await write_llmusage(llmusage) + """联机不记账 + if llmusage.amount > 0.0001: + await llm_accounting(llmusage) + """ + except Exception as e: + exception(f'{e=},{format_exc()}, {b=}') + estr = erase_apikey(e) + ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid} + s = json.dumps(ed, ensure_ascii=False) + s = ''.join(s.split('\n')) + outlines.append(ed) + yield f'{s}\n' + diff --git a/build/lib/llmage/utils.py b/build/lib/llmage/utils.py new file mode 100644 index 0000000..3d00cb6 --- /dev/null +++ b/build/lib/llmage/utils.py @@ -0,0 +1,348 @@ +import json +import time +import asyncio +import aiofiles +from random import randint +from functools import partial +from traceback import format_exc +from sqlor.dbpools import DBPools, get_sor_context +from appPublic.log import debug, exception, error, critical +from appPublic.uniqueID import getID +from appPublic.dictObject import DictObject +from appPublic.timeUtils import curDateString, timestampstr +from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi +from ahserver.serverenv import get_serverenv, ServerEnv +from ahserver.filestorage import FileStorage +from appPublic.jsonConfig import getConfig +from appPublic.streamhttpclient import StreamHttpClient + +async def update_llmusage(ns): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + await sor.U('llmusage', ns) + +async def get_user_tpac(userid): + env = ServerEnv() + config = getConfig() + async with get_sor_context(env, 'rbac') as sor: + recs = await sor.R('users', {'id': userid}) + if recs: + tpac = config.tpacs.get(recs[0].sync_from) + return tpac + return None + +async def get_tpac_balance(tpac, userid): + url = tpac.get_tpac_balance_url + hc = StreamHttpClient() + try: + b = await hc.request('GET', url, params={'userid': userid}) + if b: + d = json.loads(b.decode('utf-8')) + if d['status'] == 'ok': + return d['balance'] + exception(f'{url=}, {userid=}, {b} error') + return None + except Exception as e: + exception(f'{url=}, {userid=}, error:{e}') + return None + +async def tpac_accounting(tpac, userid, llmid, amount, usage, luid): + url = tpac.tpac_accounting_url + hc = StreamHttpClient() + d = { + 'userid': userid, + 'llmid': llmid, + 'amount': amount, + 'usage': usage + } + status = 'failed' + try: + b = await hc.request('POST', url, data=d) + d = json.loads(b.decode('utf-8')) + if d['status'] == 'ok': + debug(f'{d=}') + await update_llmusage({'id': luid, 'accounting_status': 'accounted'}) + return + raise Exception(f'{d} tpac accounting error') + except Exception as e: + exception(f'{userid=}, {llmid=}, {amount=}, {usage=} tpac accounting error:{e}') + raise e + +async def append_new_llmoutput(webpath, output): + fs = FileStorage() + p = fs.realPath(webpath) + if isinstance(output, str): + output = json.loads(output) + bin = await read_webpath(webpath) + io = json.loads(bin.decode('utf-8')) + io['output'].append(output) + async with aiofiles.open(p, 'wb') as f: + iostr = json.dumps(io, ensure_ascii=False, indent=4) + await f.write(iostr.encode('utf-8')) + +async def get_lastoutput(webpath): + bin = await read_webpath(webpath) + io = json.loads(bin.decode('utf-8')) + return io['output'][-1] + +async def read_webpath(webpath): + fs = FileStorage() + p = fs.realPath(webpath) + async with aiofiles.open(p,'rb') as f: + bin = await f.read() + return bin + +async def write_llmio(luid, io_dic): + fs = FileStorage() + s = io_dic + if not isinstance(io_dic, str): + s = json.dumps(io_dic, ensure_ascii=False, indent=4) + name = f'{luid}.json' + webpath = await fs.save(name, s, userid='llmio') + return webpath + +async def llm_query_orders(userorgid, page, pagerows=80): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + sql = """select a.llmid, +a.use_date, +a.use_time, +a.userid, +a.usages, +a.status, +a.amount, +a.userorgid, +a.accounting_status, +b.name, +b.model +from llmusage a, llm b +where userorgid = ${userorgid}$ + and a.llmid = b.id +""" + ns = dict( + page=page, + pagerows=pagerows, + sort="use_time desc", + userorgid=userorgid) + + data = await sor.sqlExe(sql, ns) + return data + return {'total': 0, 'rows':[]} + +async def get_llm_by_model(id, lctype=None): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + sql = 'select * from llm where model=${model}$' + recs = await sor.R('llm', {'model': model}) + return recs + +def erase_apikey(e): + e = str(e) + ss = e.split('Bearer ') + if len(ss) < 2: + return e + + for i, c in enumerate(ss[1]): + if c in ['"', "'"]: + newb = "XXXXXXXX" + ss[1][i:] + break + return ss[0] + 'Bearer ' + newb + +async def get_llmproviders(): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + sql = """select a.providerid, a.iconid, b.orgname +from llm a, organization b +where a.providerid = b.id +group by a.providerid, a.iconid, b.orgname""" + return await sor.sqlExe(sql, {}) + return [] + +async def get_llms_sort_by_provider(): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + today = curDateString() + sql = """select a.*, b.orgname from llm a, organization b +where a.enabled_date <= ${today}$ + and a.expired_date > ${today}$ + and a.providerid = b.id + order by a.providerid, a.id + """ + recs = await sor.sqlExe(sql, {'today': today}) + d = [] + x = None + oldpid = '-111' + for l in recs: + if l.providerid != oldpid: + x = { + 'id': l.providerid, + 'orgname': l.orgname, + 'llms': [l] + } + d.append(x) + oldpid = l.providerid + else: + x['llms'].append(l) + return d + return [] + +async def get_llmcatelogs(): + db = DBPools() + dbname = get_serverenv('get_module_dbname')('llmage') + async with db.sqlorContext(dbname) as sor: + recs = await sor.R('llmcatelog', {}) + return recs + + return [] + +async def get_llms_by_catelog_to_customer(catelogid=None, orderby='providerid'): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + today = curDateString() + # Join with llm_api_map to get catalog relationship + sql = """select distinct a.*, +b.name as catelogname, +m.llmcatelogid as catelog_id, +m.apiname, +m.query_apiname, +m.query_period, +m.ppid + from llm a + join llm_api_map m on a.id = m.llmid + join llmcatelog b on m.llmcatelogid = b.id + where a.enabled_date <= ${today}$ + and m.ppid is not null + and a.expired_date > ${today}$ + """ + sortstr='catelog_id, ' + orderby + params = {'today': today, 'sort': sortstr} + if catelogid: + sql += " and m.llmcatelogid = ${catelogid}$" + params['catelogid'] = catelogid + + debug(f'{sql=}') + recs = await sor.sqlExe(sql, params.copy()) + debug(f'{sql=}, {recs=}, {params=}') + d = [] + cid = '' + x = None + for r in recs: + if cid != r.catelog_id: + x = { + 'catelogid': r.catelog_id, + 'catelogname': r.catelogname, + 'llms': [r] + } + d.append(x) + cid = r.catelog_id + else: + x['llms'].append(r) + return d + return [] + +async def get_llms_by_catelog(catelogid=None, orderby='providerid'): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + today = curDateString() + # Join with llm_api_map to get catalog relationship + sql = """select distinct a.*, b.name as catelogname, m.llmcatelogid as catelog_id + from llm a + join llm_api_map m on a.id = m.llmid + join llmcatelog b on m.llmcatelogid = b.id + where a.enabled_date <= ${today}$ + and a.expired_date > ${today}$""" + params = {'today': today, 'sort': orderby} + if catelogid: + sql += " and m.llmcatelogid = ${catelogid}$" + params['catelogid'] = catelogid + + sql += " order by m.llmcatelogid, a.id" + + recs = await sor.sqlExe(sql, params) + d = [] + cid = '' + x = None + for r in recs: + if cid != r.catelog_id: + x = { + 'catelogid': r.catelog_id, + 'catelogname': r.catelogname, + 'llms': [r] + } + d.append(x) + cid = r.catelog_id + else: + x['llms'].append(r) + return d + return [] + +async def get_llm(llmid, catelogid=None): + today = curDateString() + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + sql = """select a.id, +a.name, +a.model, +a.providerid, +a.description, +a.iconid, +a.upappid, +a.ownerid, +a.min_balance, +m.llmcatelogid, +m.apiname, +m.query_apiname, +m.query_period, +m.ppid, +e.ioid, +e.stream, +e.callbackurl, +f.input_fields, +lc.name as catelogname +from llm a +,llm_api_map m +,llmcatelog lc +,upapp c +,uapi e +,uapiio f +where a.id = m.llmid +and a.upappid = c.id +and c.id = e.upappid +and m.apiname = e.name +and e.ioid = f.id +and a.id = ${llmid}$ +and a.expired_date > ${today}$ +and a.enabled_date <= ${today}$ +""" + ns = {'llmid': llmid, 'today': today} + if catelogid: + sql += ' and m.llmcatelogid = ${catelogid}$ ' + ns['catelogid'] = catelogid + else: + sql += " and m.isdefaultcatelog = '1'" + recs = await sor.sqlExe(sql, ns.copy()) + if len(recs) > 0: + r = recs[0] + return r + else: + debug(f'{llmid=} not found, {ns=}, {sql=}') + return None + exception(f'Error: {format_exc()}') + return None + + +async def write_llmusage(llmusage): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + await sor.C('llmusage', llmusage) + +async def llm_query_price(llmid, config_data): + env = ServerEnv() + llm = await get_llm(llmid) + if llm.ppid is None: + e = Exception(f'{llm=} ppid is None') + exception(f'{e}') + raise e + prices = await env.buffered_charging(llm.ppid, config_data) + return prices + diff --git a/llmage/accounting.py b/llmage/accounting.py index c34d42b..8ab56a9 100644 --- a/llmage/accounting.py +++ b/llmage/accounting.py @@ -41,7 +41,7 @@ async def checkCustomerBalance(llmid, userid, userorgid, catelogid=None): debug(f'checkCustomerBalance(): llmid is None') return False env = ServerEnv() - llm = await get_llm(llmid) + llm = await get_llmage_llm(llmid) if llm.ownerid == userorgid: debug(f'self orgid user') return True @@ -64,37 +64,34 @@ async def checkCustomerBalance(llmid, userid, userorgid, catelogid=None): async def llm_accounting(llmusage): env = ServerEnv() llmid = llmusage.llmid + llm = await get_llmage_llm(llmid) + if llm is None: + async with get_sor_context(env, 'llmage') as sor: + ns = { + 'id': llmusage.id, + 'accounting_status': 'failed' + } + await sor.U('llmusage', ns) + e = Exception(f'llm not found({llmid})') + exception(f'{e}') + raise e + if llm.ppid is None: + async with get_sor_context(env, 'llmage') as sor: + ns = { + 'id': llmusage.id, + 'accounting_status': 'failed' + } + await sor.U('llmusage', ns) + e = Exception(f'llm ({llmid}) donot has a pricing_program') + exception(f'{e}') + raise e + customerid = llmusage.userorgid + userid = llmusage.userid + resellerid = llm.ownerid + providerid = llm.providerid + trans_amount = llmusage.amount + trans_cost = llmusage.cost async with get_sor_context(env, 'llmage') as sor: - sql = """select a.*, b.ppid from llm a, llm_api_map b -where a.id=${llmid}$ - and a.id = b.llmid - and b.isdefaultcatelog = '1' -""" - recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid}) - if len(recs) == 0: - ns = { - 'id': llmusage.id, - 'accounting_status': 'failed' - } - await sor.U('llmusage', ns) - e = Exception(f'llm not found({llmid})') - exception(f'{e}') - raise e - if recs[0].ppid is None: - ns = { - 'id': llmusage.id, - 'accounting_status': 'failed' - } - await sor.U('llmusage', ns) - e = Exception(f'llm ({llmid}) donot has a pricing_program') - exception(f'{e}') - raise e - customerid = llmusage.userorgid - userid = llmusage.userid - resellerid = recs[0].ownerid - providerid = recs[0].providerid - trans_amount = llmusage.amount - trans_cost = llmusage.cost biz_date = await env.get_business_date(sor) timestamp = env.timestampstr() orderid = getID() diff --git a/llmage/init.py b/llmage/init.py index d1b92a6..0e0637d 100644 --- a/llmage/init.py +++ b/llmage/init.py @@ -16,7 +16,8 @@ from .utils import ( get_llmcatelogs, get_llms_by_catelog_to_customer, get_llmproviders, - get_llm, + get_llm, + get_llmage_llm, ) from .llmclient import ( @@ -54,6 +55,7 @@ def load_llmage(): env.get_asynctask_status = get_asynctask_status env.query_task_status = query_task_status env.get_llm = get_llm + env.get_llmage_llm = get_llmage_llm env.inference = inference env.inference_generator = inference_generator env.get_llms_by_catelog = get_llms_by_catelog diff --git a/llmage/utils.py b/llmage/utils.py index 10f4a25..85b59da 100644 --- a/llmage/utils.py +++ b/llmage/utils.py @@ -188,6 +188,39 @@ where a.enabled_date <= ${today}$ return d return [] +async def get_llmage_llm(llmid=None, catelogid=None): + """Unified accessor for llm + llm_api_map + llmcatelog. + For non-API-call scenarios only (display, listing, querying, accounting). + Do NOT use for vendor model API calls — use get_llm() instead. + + - llmid: get specific llm by id (returns single DictObject or None) + - catelogid: filter by catalog (returns list) + - neither: return all with catalog info (returns list) + """ + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + sql = """select a.id, a.name, a.model, a.providerid, a.description, +a.iconid, a.upappid, a.ownerid, a.min_balance, a.status, +m.llmcatelogid, m.apiname, m.query_apiname, m.query_period, m.ppid, m.isdefaultcatelog, +lc.name as catelogname +from llm a +join llm_api_map m on a.id = m.llmid +join llmcatelog lc on m.llmcatelogid = lc.id +where 1=1 +""" + ns = {} + if llmid: + sql += " and a.id = ${llmid}$ and m.isdefaultcatelog = '1'" + ns['llmid'] = llmid + if catelogid: + sql += " and m.llmcatelogid = ${catelogid}$" + ns['catelogid'] = catelogid + sql += " order by m.llmcatelogid, a.id" + recs = await sor.sqlExe(sql, ns) + if llmid: + return recs[0] if recs else None + return recs + async def get_llmcatelogs(): db = DBPools() dbname = get_serverenv('get_module_dbname')('llmage') @@ -375,7 +408,7 @@ async def write_llmusage(llmusage): async def llm_query_price(llmid, config_data): env = ServerEnv() - llm = await get_llm(llmid) + llm = await get_llmage_llm(llmid) if llm.ppid is None: e = Exception(f'{llm=} ppid is None') exception(f'{e}')