diff --git a/.gitignore b/.gitignore index 99f8b75..ec31262 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ wwwroot/llmcatelog_list/ wwwroot/llmusage/ wwwroot/llmusage_accounting_failed/ wwwroot/llmusage_history/ +build/ diff --git a/build/lib/llmage/__init__.py b/build/lib/llmage/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/build/lib/llmage/accounting.py b/build/lib/llmage/accounting.py deleted file mode 100644 index c3a0fd6..0000000 --- a/build/lib/llmage/accounting.py +++ /dev/null @@ -1,369 +0,0 @@ -import asyncio -import json -import time -from datetime import datetime -from appPublic.log import exception, debug -from appPublic.uniqueID import getID -from appPublic.dictObject import DictObject -from sqlor.dbpools import get_sor_context -from ahserver.serverenv import ServerEnv -from accounting.consume import consume_accounting -from accounting.getaccount import getCustomerBalance -from .utils import * - -async def llm_charging(ppid, llmusage): - env = ServerEnv() - usages = llmusage.usages - if isinstance(usages, str): - usages = json.loads(usages) - prices = await env.buffered_charging(ppid, usages) - if prices is None: - e = Exception(f'{ppid=}, {usages=}{llmusage.id=} env.buffered_charging() return None') - exception(f'{e}') - raise e - return None - amount = 0 - cost = 0 - for p in prices: - amount += p.amount - if p.cost: - cost += p.cost - discount = await env.get_customer_discount(llmusage.ownerid, - llmusage.userorgid) - return DictObject(**{ - 'original_amount': amount, - 'amount': amount * discount, - 'cost': cost - }) - -async def checkCustomerBalance(llmid, userid, userorgid, catelogid=None): - if llmid is None: - debug(f'checkCustomerBalance(): llmid is None') - return False - env = ServerEnv() - llm = await get_llm(llmid) - if llm.ownerid == userorgid: - debug(f'self orgid user') - return True - balance = 0.00 - tpac = await get_user_tpac(userid) - if tpac: - debug(f'{tpac=}, get tpac balance') - balance = await get_tpac_balance(tpac, userid) - else: - debug(f'{tpac=}, get local balance') - async with get_sor_context(env, 'accounting') as sor: - balance = await getCustomerBalance(sor, userorgid) - bal = 0 if balance is None else balance - if llm.min_balance is None: - llm.min_balance = 0.00 - ret = llm.ppid and llm.min_balance < bal - debug(f'{llm.ppid=}, {llm.min_balance=}, {bal=}') - return ret - -async def llm_accounting(llmusage): - env = ServerEnv() - llmid = llmusage.llmid - async with get_sor_context(env, 'llmage') as sor: - sql = """select a.*, b.ppid from llm a, llm_api_map b -where a.id=${llmid}$ - and a.id = b.llmid - and b.isdefaultcatelog = '1' -""" - recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid}) - if len(recs) == 0: - ns = { - 'id': llmusage.id, - 'accounting_status': 'failed' - } - await sor.U('llmusage', ns) - e = Exception(f'llm not found({llmid})') - exception(f'{e}') - raise e - if recs[0].ppid is None: - ns = { - 'id': llmusage.id, - 'accounting_status': 'failed' - } - await sor.U('llmusage', ns) - e = Exception(f'llm ({llmid}) donot has a pricing_program') - exception(f'{e}') - raise e - customerid = llmusage.userorgid - userid = llmusage.userid - resellerid = recs[0].ownerid - providerid = recs[0].providerid - trans_amount = llmusage.amount - trans_cost = llmusage.cost - biz_date = await env.get_business_date(sor) - timestamp = env.timestampstr() - orderid = getID() - order = { - "id": orderid, - "customerid": customerid, - "resellerid": resellerid, - "order_date": biz_date, - "order_status": "1", # accounted - "business_op": "PAY", - "amount": trans_amount, - "userid": userid, - "productid": llmid - } - await sor.C('biz_order', order) - orderdetail = { - "id": getID(), - "orderid": orderid, - "productid": llmid, - "product_cnt": 1, - "trans_amount": trans_amount - } - await sor.C('biz_orderdetail', orderdetail) - ais = [] - if customerid != resellerid: - ai0 = DictObject() - ai0.action = 'PAY' - ai0.customerid = customerid - ai0.resellerid = resellerid - ai0.providerid = providerid - ai0.biz_date = biz_date - ai0.timestamp = timestamp - ai0.productid = llmid - ai0.transamt = trans_amount - ai0.variable = { - "交易金额": trans_amount, - "交易手续费": 0 - } - ais.append(ai0) - ai1 = DictObject() - ai1.action = 'PAY*' - ai1.customerid = customerid - ai1.resellerid = resellerid - ai1.providerid = providerid - ai1.biz_date = biz_date - ai1.timestamp = timestamp - ai1.providerid = providerid - ai1.productid = llmid - ai1.transamt = trans_cost - ai1.variable = { - "采购成本": trans_cost - } - ais.append(ai1) - await consume_accounting(sor, orderid, ais) - llmusage.accounting_status = 'accounted' - ns = { - 'id': llmusage.id, - 'accounting_status': 'accounted' - } - await sor.U('llmusage', ns) - -async def get_accounting_llmusages(luid=None): - env = ServerEnv() - lus = [] - t = time.time() - dt = datetime.fromtimestamp(t) - tsstr = dt.strftime('%Y-%m-%d %H:%M:%S.') + f'{dt.microsecond // 1000:03d}' - async with get_sor_context(env, 'llmage') as sor: - sql = """select a.*, c.ppid -from llmusage a, llm b, llm_api_map c -where a.llmid = b.id - and a.llmid = c.llmid - and c.isdefaultcatelog = '1' - and a.status = 'SUCCEEDED' - and a.use_time < ${tsstr}$ - and a.accounting_status='created'""" - ns = {'tsstr': tsstr} - if luid: - sql += " and a.id=${luid}$" - ns['luid'] = luid - recs = await sor.sqlExe(sql, ns) - # debug(f'{sql=}, {ns=}, {len(recs)=}') - for r in recs: - if r.usages is None: - try: - output = await get_lastoutput(r.ioinfo) - except Exception as e: - continue - r.usages = output.get('usage') - if r.usages is None: - debug(f'{r.usages=} is None, accoiunting failed') - await llm_accoung_failed(r.id, reason='usages is None') - continue - d = None - try: - debug(f'{r.ppid=}, {r.usages=} {r.id=}') - d = await llm_charging(r.ppid, r) - - except Exception as e: - exception(f'{r.ppid=}, {r.usages=} llm_charging() failed,{e}') - await llm_accoung_failed(r.id, reason=f'llm_charging failed: {e}') - continue - r.amount = d.amount - r.cost = d.cost - ns = { - 'id': r.id, - 'amount': r.amount, - 'cost': r.cost, - 'usage': json.dumps(r.usage, ensure_ascii=False, indent=4) - } - await sor.U('llmusage', ns) - lus.append(r) - return lus - -async def llm_accoung_failed(luid, reason=None): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - await sor.U('llmusage', { - 'id': luid, - 'accounting_status': 'failed' - }) - # Also record in the failed accounting table for tracking - recs = await sor.R('llmusage', {'id': luid}) - if recs: - r = recs[0] - failed_id = getID() - failed_rec = { - 'id': failed_id, - 'llmusageid': luid, - 'llmid': r.llmid, - 'userid': r.userid, - 'userorgid': r.userorgid, - 'use_date': r.use_date, - 'use_time': r.use_time, - 'amount': r.amount, - 'cost': r.cost, - 'failed_reason': reason or 'accounting failed', - 'failed_time': env.timestampstr(), - 'retry_count': 0, - 'handled': '0' - } - await sor.C('llmusage_accounting_failed', failed_rec) - - -async def backup_accounted_llmusage(): - """Backup yesterday's accounted records to history table and remove from llmusage.""" - env = ServerEnv() - yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') - ts = env.timestampstr() - batched = 0 - async with get_sor_context(env, 'llmage') as sor: - # Select yesterday's accounted records - sql = """select * from llmusage -where accounting_status='accounted' - and use_date < ${yesterday}$""" - recs = await sor.sqlExe(sql, {'yesterday': yesterday}) - if not recs: - debug(f'backup_accounted_llmusage: no records to backup for use_date < {yesterday}') - return 0 - debug(f'backup_accounted_llmusage: {len(recs)} records to backup') - for r in recs: - history_rec = { - 'id': r.id, - 'llmid': r.llmid, - 'use_date': r.use_date, - 'use_time': r.use_time, - 'userid': r.userid, - 'usages': r.usages, - 'ioinfo': r.ioinfo, - 'transno': r.transno, - 'responsed_seconds': r.responsed_seconds, - 'finish_seconds': r.finish_seconds, - 'status': r.status, - 'taskid': r.taskid, - 'amount': r.amount, - 'cost': r.cost, - 'userorgid': r.userorgid, - 'ownerid': r.ownerid, - 'accounting_status': r.accounting_status, - 'backup_time': ts - } - await sor.C('llmusage_history', history_rec) - # Delete from main table - await sor.D('llmusage', {'id': r.id}) - batched += 1 - debug(f'backup_accounted_llmusage: backed up {batched} records') - return batched - - -async def get_failed_accounting_records(filters=None, page=1, page_size=50): - """Search failed accounting records with optional filters. - - filters: dict with optional keys: - - userorgid: filter by user organization - - llmid: filter by model ID - - handled: '0' or '1' - - start_date: filter use_date >= start_date - - end_date: filter use_date <= end_date - """ - async with get_sor_context(ServerEnv(), 'llmage') as sor: - conditions = [] - ns = {} - if filters: - if filters.get('userorgid'): - conditions.append("userorgid=${userorgid}$") - ns['userorgid'] = filters['userorgid'] - if filters.get('llmid'): - conditions.append("llmid=${llmid}$") - ns['llmid'] = filters['llmid'] - if filters.get('handled') is not None: - conditions.append("handled=${handled}$") - ns['handled'] = filters['handled'] - if filters.get('start_date'): - conditions.append("use_date>=${start_date}$") - ns['start_date'] = filters['start_date'] - if filters.get('end_date'): - conditions.append("use_date<=${end_date}$") - ns['end_date'] = filters['end_date'] - where = "" - if conditions: - where = "where " + " and ".join(conditions) - # Count total - count_sql = f"select count(*) as cnt from llmusage_accounting_failed {where}" - count_recs = await sor.sqlExe(count_sql, ns) - total = count_recs[0].cnt if count_recs else 0 - # Query with pagination - offset = (page - 1) * page_size - query_sql = f"""select * from llmusage_accounting_failed {where} -order by failed_time desc limit {page_size} offset {offset}""" - recs = await sor.sqlExe(query_sql, ns) - return { - 'total': total, - 'page': page, - 'page_size': page_size, - 'records': recs - } - -async def backend_accounting(): - env = ServerEnv() - debug(f'backend accounting started ...') - backup_counter = 0 - while True: - try: - lus = await get_accounting_llmusages() - except Exception as e: - exception(f'{e}') - lus = [] - # debug(f'{len(lus)=} need to accounting........') - for lu in lus: - try: - tpac = await get_user_tpac(lu.userid) - if tpac: - debug(f'{lu.id=},{lu.userid=}, {tpac=}, go tpac') - await tpac_accounting(tpac, lu.userid, lu.llmid, lu.amount, lu.usages, lu.id) - else: - debug(f'{lu.id=},{lu.userid=}, {tpac=}, go local') - await llm_accounting(lu) - except Exception as e: - exception(f'{e}, {lu.id=}') - await llm_accoung_failed(lu.id, reason=str(e)) - - # Run backup every 100 iterations (roughly every ~1000 seconds) - backup_counter += 1 - if backup_counter >= 100: - backup_counter = 0 - try: - await backup_accounted_llmusage() - except Exception as e: - exception(f'backup_accounted_llmusage failed: {e}') - - await asyncio.sleep(10) - diff --git a/build/lib/llmage/asyncinference.py b/build/lib/llmage/asyncinference.py deleted file mode 100644 index c21044d..0000000 --- a/build/lib/llmage/asyncinference.py +++ /dev/null @@ -1,203 +0,0 @@ -import json -import time -import asyncio -from random import randint -from functools import partial -from traceback import format_exc -from sqlor.dbpools import DBPools, get_sor_context -from appPublic.log import debug, exception, error, critical -from appPublic.uniqueID import getID -from appPublic.dictObject import DictObject -from appPublic.timeUtils import curDateString, timestampstr, timestampAdd -from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 -from ahserver.serverenv import get_serverenv, ServerEnv -from ahserver.filestorage import FileStorage -from .accounting import llm_accounting, llm_charging -from .utils import * - -async def get_today_asynctask_list(userid): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - today = await env.get_business_date(sor) - sql = '''select * from llmusage -where userid=${userid}$ - and use_date = ${date}$''' - recs = await sor.sqlExe(sql, { - 'date': today, - 'userid': userid - }) - return recs - return [] - -async def get_asynctask_status(request, taskid): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - recs = await sor.R('llmusage', {'taskid': taskid}) - if recs: - r = recs[0] - output = await get_lastoutput(r.ioinfo) - t = timestampAdd(r.use_time, 600) - now = time.time() - if r.status not in ['UNKNOWN', 'FAILED', 'SUCCEEDED'] and now > t: - asyncio.create_task(query_task_status(request, r.id)) - return output - return { - 'taskid': taskid, - 'status': 'FAILED', - 'error': f'taskid={taskid} not exist' - } - return { - 'taskid': taskid, - 'status': 'FAILED', - 'error': f'system error' - } - -async def async_uapi_request(request, llm, - callerid, callerorgid, params_kw=None): - env = request._run_ns.copy() - if not params_kw: - params_kw = env.params_kw - # callerorgid = await env.get_userorgid() - # callerid = await env.get_user() - uapi = env.UpAppApi(request) - userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid) - b = None - luid = getID() - try: - start_timestamp = time.time() - if llm.callbackurl: - params_kw.callbackurl = llm.callbackurl - - b = None - try: - b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) - except Exception as e: - estr = erase_apikey(e) - ed = {"error": f"ERROR:{estr}", "status": "FAILED"} - exception(f'{ed}') - yield f'{ed}\n' - return - if isinstance(b, bytes): - b = b.decode('utf-8') - debug(f'task submited:{b}') - d = DictObject(**json.loads(b)) - responsed_seconds = time.time() - start_timestamp - finish_seconds = responsed_seconds - llmusage = DictObject() - llmusage.id = luid - llmusage.llmid = llm.id - llmusage.use_date = curDateString() - llmusage.use_time = timestampstr() - llmusage.userid = callerid - ioinfo = { - "input": params_kw, - 'output': [d] - } - webpath = await write_llmio(llmusage.id, ioinfo) - llmusage.ioinfo = webpath - llmusage.taskid = d.taskid - llmusage.transno = params_kw.transno - llmusage.responsed_seconds = responsed_seconds - llmusage.finish_seconds = finish_seconds - llmusage.status = d.status - llmusage.userorgid = callerorgid - llmusage.ownerid = llm.ownerid - llmusage.accounting_status = 'created' - b = json.dumps(d, ensure_ascii=False) - yield b - await write_llmusage(llmusage) - # if llm.callbackurl: - # return - if d.status == 'FAILED': - e = Exception(f'resp={d} FFAILED') - return - asyncio.create_task(query_task_status(request, luid)) - - except Exception as e: - ed = {"error": f"ERROR:{e}", "status": "FAILED"} - s = json.dumps(ed, ensure_ascii=False) - s = ''.join(s.split('\n')) - exception(s) - yield f'{s}\n' - return - -async def modify_llmusage(ns): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - await sor.U('llmusage', ns.copy()) - -async def get_llm_llmusage(luid): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - recs = await sor.R('llmusage', {'id': luid}) - if len(recs) == 0: - e = Exception(f'{luid=} is not found in llmusage') - exception(f'{e}') - raise e - llmusage = recs[0] - if llmusage.status == 'UNKNOWN': - return - if llmusage.status == 'SUCCEEDED': - return - if llmusage.status == 'FAILED': - return - llms = await sor.R('llm', {'id': llmusage.llmid}) - if len(llms) == 0: - e = Exception(f'{llmusage.llmid=} not found in llm') - exception(f'{e}') - raise e - llm = llms[0] - return llm, llmusage - -async def query_task_status(request, luid, onetime=False): - env = ServerEnv() - uapi = env.UpAppApi(request) - llm, llmusage = await get_llm_llmusage(luid) - userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid) - taskid = llmusage.taskid - upappid = llm.upappid - apinames = llm.query_apiname.split(',') - - for apiname in apinames: - while True: - lastoutout = await get_lastoutput(llmusage.ioinfo) - if lastoutout['status'] in ['UNKNOWN', 'FAILED', 'SUCCEEDED']: - critical(f"{lastoutout['status']=}") - return - ns = {'taskid': taskid} - new_output = b = d = None - try: - b = await uapi.call(upappid, apiname, userid, params=ns) - if isinstance(b, bytes): - b = b.decode('utf-8') - new_output = json.loads(b) - except Exception as e: - exception(f'{e}, {b=}') - new_output = { - 'status': 'FAILED', - 'error': f'{b},{e}' - } - if not new_output.get('status'): - e = Exception(f"{new_output=} {upappid=}, {apiname=} has not status field") - critical(f'{e}') - raise e - if lastoutout['status'] != new_output.get('status'): - llmusage.status = new_output['status'] - ns = { - 'id': llmusage.id, - 'status': llmusage.status - } - if 'usage' in new_output.keys(): - ns['usages'] = json.dumps(new_output['usage']) - await append_new_llmoutput(llmusage.ioinfo, new_output) - await modify_llmusage(ns) - if llmusage.status in ['UNKNOWN', 'FAILED', 'SUCCEEDED']: - critical(f'finished .. {llmusage.status=}') - return - - if onetime: - critical(f'onetime is true, returned') - return - await asyncio.sleep(llm.query_period or 30) - critical(f'{llm.query_period=} seconds will retry, {new_output["status"]=}') - diff --git a/build/lib/llmage/callback.py b/build/lib/llmage/callback.py deleted file mode 100644 index afef55e..0000000 --- a/build/lib/llmage/callback.py +++ /dev/null @@ -1,33 +0,0 @@ -from ahserver.serverenv import ServerEnv -from appPublic.dictObject import DictObject -from sqlor.dbpools import get_sor_context - -async def asynctask_callback(appname, apiname, params_kw) - env = ServerEnv() - llmusage = None - async with get_sor_context(env, 'llmage') as sor: - uapi = await env.sor_get_uapi_by_appname_apiname(appname, apiname) - try: - dstr = await env.tmpl_engine.renders(uapi.response, params_kw) - d = DictObject(**json.loads(dstr)) - llmus = await sor.R('llmusage', {'taskid': d.taskid}) - if len(llmus) == 0: - e = Exception(f'{d=}, {taskid=} not found') - exception(f'{e}') - raise e - llmusage = llmus[0] - io = json.loads(llmusage.ioinfo) - out = io.get('output') - out.append(d) - llmusage.ioinfo = json.dumps(io, ensure_ascii=False) - llmusage.status = d.status - await sor.U('llmusage', llmusage) - - except Exception as e: - e = Exception(f'{uapi.response=}, {params_kw=} render error') - exception(f'{e}') - raise e - - if llmusage: - await llm_accounting(llmusage) - diff --git a/build/lib/llmage/init.py b/build/lib/llmage/init.py deleted file mode 100644 index e0f3080..0000000 --- a/build/lib/llmage/init.py +++ /dev/null @@ -1,68 +0,0 @@ -import asyncio -from appPublic.registerfunction import RegisterFunction -from sqlor.dbpools import DBPools -from ahserver.serverenv import ServerEnv -from appPublic.log import debug -from .keling import keling_token -from .jimeng import jimeng_auth_headers -from .utils import ( - llm_query_orders, - read_webpath, - llm_query_price, - get_llm_by_model, - get_llms_by_catelog, - get_llms_sort_by_provider, - get_llmcatelogs, - get_llms_by_catelog_to_customer, - get_llmproviders, - get_llm, -) - -from .llmclient import ( - inference_generator, - inference -) -from .accounting import ( - checkCustomerBalance, - llm_charging, - get_accounting_llmusages, - backend_accounting, - llm_accounting, - backup_accounted_llmusage, - get_failed_accounting_records, - llm_accoung_failed -) - -from .asyncinference import ( - get_asynctask_status, - query_task_status, - get_today_asynctask_list -) - -def load_llmage(): - env = ServerEnv() - env.llm_query_orders = llm_query_orders - env.read_webpath = read_webpath - env.get_llm_by_model = get_llm_by_model - env.llm_charging = llm_charging - env.get_accounting_llmusages = get_accounting_llmusages - env.llm_accounting = llm_accounting - env.get_today_asynctask_list = get_today_asynctask_list - env.get_asynctask_status = get_asynctask_status - env.query_task_status = query_task_status - env.get_llm = get_llm - env.inference = inference - env.inference_generator = inference_generator - env.get_llms_by_catelog = get_llms_by_catelog - env.get_llmcatelogs = get_llmcatelogs - env.checkCustomerBalance = checkCustomerBalance - env.get_llmproviders = get_llmproviders - env.get_llms_sort_by_provider = get_llms_sort_by_provider - env.keling_token = keling_token - env.llm_query_price = llm_query_price - env.get_llms_by_catelog_to_customer = get_llms_by_catelog_to_customer - env.backup_accounted_llmusage = backup_accounted_llmusage - env.get_failed_accounting_records = get_failed_accounting_records - rf = RegisterFunction() - rf.register('jimeng_auth_headers', jimeng_auth_headers) - diff --git a/build/lib/llmage/jimeng.py b/build/lib/llmage/jimeng.py deleted file mode 100644 index 31ef9cc..0000000 --- a/build/lib/llmage/jimeng.py +++ /dev/null @@ -1,98 +0,0 @@ -import hashlib -import datetime -from datetime import timezone -import hmac -import json -from urllib.parse import quote - -Service = "visual" -Version = "2022-08-31" -Region = "cn-north-1" -Host = "visual.volcengineapi.com" -ContentType = "application/json" - -def utc_now(): - try: - from datetime import timezone - return datetime.datetime.now(timezone.utc) - except ImportError: - class UTC(datetime.tzinfo): - def utcoffset(self, dt): - return datetime.timedelta(0) - def tzname(self, dt): - return "UTC" - def dst(self, dt): - return datetime.timedelta(0) - return datetime.datetime.now(UTC()) - -def jm_timestamp(): - dt = utc_new() - return dt.strftime("%Y%m%dT%H%M%SZ") - -# sha256 非对称加密 -def hmac_sha256(key: bytes, content: str): - return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest() - -# sha256 hash算法 -def hash_sha256(content: str): - return hashlib.sha256(content.encode("utf-8")).hexdigest() - -def jimeng_auth_headers(opts): - apikey = opts.get('apikey') - secretkey = opts.get('secretkey') - path = opts.get('path') - method = opts.get('method') - params = opts.get('params') - body = opts.get('body') - headers = opts.get('headers') - content_type = headers.get('Content-Type') - x_date = jm_timestamp() - short_x_date = DT[:8] - credential = { - "access_key_id": apikey, - "secret_access_key": secretkey, - "service": Service, - "region": Region, - } - x_content_sha256 = hash_sha256(body) - sign_result = { - "Host": Host, - "X-Content-Sha256": x_content_sha256, - "X-Date": x_date, - "Content-Type": ContentType - } - headers.update(sign_result) - signed_headers_str = ";".join( - ["content-type", "host", "x-content-sha256", "x-date"] - ) - canonical_request_str = "\n".join( - [method.upper(), - path, - norm_query(params), - "\n".join( - [ - "content-type:" + content_type, - "host:" + Host, - "x-content-sha256:" + x_content_sha256, - "x-date:" + x_date, - ] - ), - "", - signed_headers_str, - x_content_sha256, - ] - ) - hashed_canonical_request = hash_sha256(canonical_request_str) - credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"]) - string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]) - k_date = hmac_sha256(secretkey.encode("utf-8"), short_x_date) - k_region = hmac_sha256(k_date, credential["region"]) - k_service = hmac_sha256(k_region, credential["service"]) - k_signing = hmac_sha256(k_service, "request") - signature = hmac_sha256(k_signing, string_to_sign).hex() - headers['Authorization'] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format( - apikey + "/" + credential_scope, - signed_headers_str, - signature, - ) - diff --git a/build/lib/llmage/keling.py b/build/lib/llmage/keling.py deleted file mode 100644 index e7e5eeb..0000000 --- a/build/lib/llmage/keling.py +++ /dev/null @@ -1,18 +0,0 @@ -import time -import jwt - -ak = "" # 填写access key -sk = "" # 填写secret key - -def keling_token(ak, sk): - headers = { - "alg": "HS256", - "typ": "JWT" - } - payload = { - "iss": ak, - "exp": int(time.time()) + 1800, # 有效时间,此处示例代表当前时间+1800s(30min) - "nbf": int(time.time()) - 5 # 开始生效的时间,此处示例代表当前时间-5秒 - } - token = jwt.encode(payload, sk, headers=headers) - return token diff --git a/build/lib/llmage/llmclient.py b/build/lib/llmage/llmclient.py deleted file mode 100644 index 6ed461c..0000000 --- a/build/lib/llmage/llmclient.py +++ /dev/null @@ -1,148 +0,0 @@ -import json -import time -import asyncio -from random import randint -from functools import partial -from traceback import format_exc -from appPublic.log import debug, exception, error -from appPublic.uniqueID import getID -from appPublic.dictObject import DictObject -from appPublic.timeUtils import curDateString, timestampstr -from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 -from ahserver.serverenv import get_serverenv, ServerEnv -from ahserver.filestorage import FileStorage -from .asyncinference import async_uapi_request -from .syncinference import sync_uapi_request -from .accounting import llm_accounting, llm_charging -from .utils import * - -async def uapi_request(request, llm, callerid, callerorgid, params_kw=None): - env = request._run_ns.copy() - if not params_kw: - params_kw = env.params_kw - # callerorgid = await env.get_userorgid() - # callerid = await env.get_user() - uapi = env.UpAppApi(request) - userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid) - outlines = [] - txt = '' - luid = getID() - try: - start_timestamp = time.time() - responsed_seconds = None - finish_seconds = None - first = True - usage = None - async for l in uapi.stream_linify(llm.upappid, llm.apiname, userid, - params=params_kw): - if first: - first = False - responsed_seconds = time.time() - start_timestamp - if isinstance(l, bytes): - l = l.decode('utf-8') - if l[-1] == '\n': - l = l[:-1] - debug(f'stream response line={l},{type(l)}') - l = ''.join(l.split('\n')) - if l and l != '[DONE]': - yield_it = False - d = {} - try: - d = json.loads(l) - except Exception as e: - debug(f'json.loads({l}) error({e})') - continue - if d.get('reasoning_content'): - txt += d.get('reasoning_content') - yield_it = True - if d.get('content'): - txt = txt + d['content'] - yield_it = True - if d.get('usage'): - usage = d['usage'] - d['llmusageid'] = luid - outlines.append(d) - yield json.dumps(d, ensure_ascii=False) + '\n' - if usage is None: - error(f'{llm=} response has not usage') - finish_seconds = time.time() - start_timestamp - if responsed_seconds is None: - responsed_seconds = finish_seconds - llmusage = DictObject() - llmusage.id = luid - llmusage.llmid = llm.id - llmusage.use_date = curDateString() - llmusage.use_time = timestampstr() - llmusage.userid = callerid - llmusage.usages = json.dumps(usage, ensure_ascii=False, indent=4) - debug(f' {usage=}, {type(usage)=}, {llmusage.usages=}') - ioinfo = { - "input": params_kw, - 'output': outlines - } - webpath = await write_llmio(llmusage.id, ioinfo) - llmusage.ioinfo = webpath - llmusage.transno = params_kw.transno - llmusage.responsed_seconds = responsed_seconds - llmusage.finish_seconds = finish_seconds - llmusage.status = 'SUCCEEDED' - llmusage.userorgid = callerorgid - llmusage.ownerid = llm.ownerid - llmusage.accounting_status = 'created' - await write_llmusage(llmusage) - except Exception as e: - exception(f'{e=},{format_exc()}') - estr = erase_apikey(e) - ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid} - s = json.dumps(ed, ensure_ascii=False) - s = ''.join(s.split('\n')) - outlines.append(ed) - yield f'{s}\n' - return - -async def inference_generator(request, *args, params_kw=None, **kw): - env = request._run_ns.copy() - callerorgid = await env.get_userorgid() - callerid = await env.get_user() - async for d in _inference_generator(request, callerid, - callerorgid, params_kw=params_kw, **kw): - yield d - -async def _inference_generator(request, callerid, callerorgid, - params_kw={}, **kw): - env = request._run_ns - if not params_kw: - params_kw = env.params_kw - if not params_kw.transno: - params_kw.transno = getID() - llmid = params_kw.llmid - f = None - llm = await get_llm(llmid) - if llm is None: - errmsg = f'{{"status": "FAILED", "error":"llmid:{llmid}没找到模型"}}\n' - exception(errmsg) - yield errmsg - return - if not params_kw.model: - params_kw.model = llm.model - if llm.stream == 'async': - if llm.callbackurl: - cb_url = env.entire_url(llm.callbackurl) - params_kw.callbackurl = cb_url - f = partial(async_uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw) - elif not params_kw.stream: - llm.stream = False - debug(f'---{params_kw.stream=}, {llm.stream=} ---use sync_uapi_request ') - f = partial(sync_uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw) - else: - llm.stream = True - debug(f'---{params_kw.stream=}, {llm.stream=} ---use uapi_request ') - f = partial(uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw) - async for d in f(): - yield d - -async def inference(request, *args, params_kw=None, **kw): - env = request._run_ns.copy() - f = partial(inference_generator, request, *args, params_kw=params_kw, **kw) - return await env.stream_response(request, f) - diff --git a/build/lib/llmage/messages.py b/build/lib/llmage/messages.py deleted file mode 100644 index d1aad29..0000000 --- a/build/lib/llmage/messages.py +++ /dev/null @@ -1,69 +0,0 @@ -from appPublic.myTE import MyTemplateEngine - -def default_sysmessage(): - return """{ - "role":"system", - "content":"{{content}}" - }""" - -def default_usrmessage(): - return """{ - "role":"user", - "content":"{{prompt}}" - }""" - -def default_llmmessage(): - return """{ - "role":"assisant", - "content":"{{content}}" - }""" - -class BaseMessages: - def __init__(self, request, llmid, sys_message, usr_message, llm_message): - self.request = request - self.llmid = llmid - self.sys_message = sys_message - self.usr_message = usr_message - self.llm_message = llm_message - self.te = MyTemplateEngine([]) - - async def append_meessages(self, msg_format, **kw): - m = self.te.renders(msg_format, kw) - msgs = await self.get_messages() - msgs.append(m) - await self.set_message(msgs) - return msgs - - async def append_usr_messages(self, **kw): - return await self.append_messages(self.usr_message, **kw) - - async def append_sys_messages(self, **kw): - return await self.append_messages(self.sys_message, **kw) - - async def append_llm_messages(self, **kw): - return await self.append_messages(self.llm_message, **kw) - - async def get_messages(self): - return [] - - async def set_messages(self, msgs): - pass - -class SessionMessages(BaseMessages): - async def get_messages(self): - env = self.request._run_ns - s = await env.get_session() - userid = await env.get_user() - mk = f'{self.llmid}_{userid}_msgs' - msgs = s[mk] - if not msgs: - msgs = [] - return msgs - - async def set_messages(self, msgs): - env = self.request._run_ns - s = await env.get_session() - userid = await env.get_user() - mk = f'{self.llmid}_{userid}_msgs' - s[mk] = msgs - diff --git a/build/lib/llmage/syncinference.py b/build/lib/llmage/syncinference.py deleted file mode 100644 index a524be1..0000000 --- a/build/lib/llmage/syncinference.py +++ /dev/null @@ -1,98 +0,0 @@ -import json -import time -import asyncio -from random import randint -from functools import partial -from traceback import format_exc -from sqlor.dbpools import DBPools, get_sor_context -from appPublic.log import debug, exception, error -from appPublic.uniqueID import getID -from appPublic.dictObject import DictObject -from appPublic.timeUtils import curDateString, timestampstr -from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 -# from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi -from ahserver.serverenv import get_serverenv, ServerEnv -from ahserver.filestorage import FileStorage -from .accounting import llm_accounting, llm_charging -from .utils import * - -async def sync_uapi_request(request, llm, callerid, callerorgid, params_kw=None): - env = request._run_ns.copy() - if not params_kw: - params_kw = env.params_kw - # callerid = await env.get_user() - # callerorgid = await env.get_userorgid() - # uapi = UAPI(request, sor=sor) - uapi = env.UpAppApi(request) - userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid) - outlines = [] - b = None - d = None - luid = getID() - try: - start_timestamp = time.time() - responsed_seconds = None - finish_seconds = None - b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) - if isinstance(b, bytes): - b = b.decode('utf-8') - d = json.loads(b) - status = d.get('status') - usage = d.get('usage') - if status and status != 'SUCCEEDED': - raise Exception(d['error']) - responsed_seconds = time.time() - start_timestamp - finish_seconds = responsed_seconds - llmusage = DictObject() - llmusage.id = luid - llmusage.llmid = llm.id - llmusage.use_date = curDateString() - llmusage.use_time = timestampstr() - llmusage.userid = callerid - llmusage.usages = json.dumps(usage, ensure_ascii=False) - ioinfo = { - "input": params_kw, - 'output': [d] - } - webpath = await write_llmio(llmusage.id, ioinfo) - llmusage.ioinfo = webpath - llmusage.transno = params_kw.transno - llmusage.responsed_seconds = responsed_seconds - llmusage.finish_seconds = finish_seconds - llmusage.status = 'SUCCEEDED' - llmusage.amount = llmusage.cost = 0.00 - """ 联机不记账 - if llm.ppid: - try: - charging = await llm_charging(llm.ppid, llmusage) - if charging: - llmusage.amount = charging.amount - llmusage.cost = charging.cost - else: - llmusage.amount = llmusage.cost = 0.0 - except Exception as e: - e = Exception(f'{llm.pid} charging error{e}') - exception(f'{e}') - else: - llmusage.amount = 0 - llmusage.cost = 0 - """ - llmusage.userorgid = callerorgid - llmusage.ownerid = llm.ownerid - llmusage.accounting_status = 'created' - b = json.dumps(d, ensure_ascii=False) - yield b - await write_llmusage(llmusage) - """联机不记账 - if llmusage.amount > 0.0001: - await llm_accounting(llmusage) - """ - except Exception as e: - exception(f'{e=},{format_exc()}, {b=}') - estr = erase_apikey(e) - ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid} - s = json.dumps(ed, ensure_ascii=False) - s = ''.join(s.split('\n')) - outlines.append(ed) - yield f'{s}\n' - diff --git a/build/lib/llmage/utils.py b/build/lib/llmage/utils.py deleted file mode 100644 index 3d00cb6..0000000 --- a/build/lib/llmage/utils.py +++ /dev/null @@ -1,348 +0,0 @@ -import json -import time -import asyncio -import aiofiles -from random import randint -from functools import partial -from traceback import format_exc -from sqlor.dbpools import DBPools, get_sor_context -from appPublic.log import debug, exception, error, critical -from appPublic.uniqueID import getID -from appPublic.dictObject import DictObject -from appPublic.timeUtils import curDateString, timestampstr -from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi -from ahserver.serverenv import get_serverenv, ServerEnv -from ahserver.filestorage import FileStorage -from appPublic.jsonConfig import getConfig -from appPublic.streamhttpclient import StreamHttpClient - -async def update_llmusage(ns): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - await sor.U('llmusage', ns) - -async def get_user_tpac(userid): - env = ServerEnv() - config = getConfig() - async with get_sor_context(env, 'rbac') as sor: - recs = await sor.R('users', {'id': userid}) - if recs: - tpac = config.tpacs.get(recs[0].sync_from) - return tpac - return None - -async def get_tpac_balance(tpac, userid): - url = tpac.get_tpac_balance_url - hc = StreamHttpClient() - try: - b = await hc.request('GET', url, params={'userid': userid}) - if b: - d = json.loads(b.decode('utf-8')) - if d['status'] == 'ok': - return d['balance'] - exception(f'{url=}, {userid=}, {b} error') - return None - except Exception as e: - exception(f'{url=}, {userid=}, error:{e}') - return None - -async def tpac_accounting(tpac, userid, llmid, amount, usage, luid): - url = tpac.tpac_accounting_url - hc = StreamHttpClient() - d = { - 'userid': userid, - 'llmid': llmid, - 'amount': amount, - 'usage': usage - } - status = 'failed' - try: - b = await hc.request('POST', url, data=d) - d = json.loads(b.decode('utf-8')) - if d['status'] == 'ok': - debug(f'{d=}') - await update_llmusage({'id': luid, 'accounting_status': 'accounted'}) - return - raise Exception(f'{d} tpac accounting error') - except Exception as e: - exception(f'{userid=}, {llmid=}, {amount=}, {usage=} tpac accounting error:{e}') - raise e - -async def append_new_llmoutput(webpath, output): - fs = FileStorage() - p = fs.realPath(webpath) - if isinstance(output, str): - output = json.loads(output) - bin = await read_webpath(webpath) - io = json.loads(bin.decode('utf-8')) - io['output'].append(output) - async with aiofiles.open(p, 'wb') as f: - iostr = json.dumps(io, ensure_ascii=False, indent=4) - await f.write(iostr.encode('utf-8')) - -async def get_lastoutput(webpath): - bin = await read_webpath(webpath) - io = json.loads(bin.decode('utf-8')) - return io['output'][-1] - -async def read_webpath(webpath): - fs = FileStorage() - p = fs.realPath(webpath) - async with aiofiles.open(p,'rb') as f: - bin = await f.read() - return bin - -async def write_llmio(luid, io_dic): - fs = FileStorage() - s = io_dic - if not isinstance(io_dic, str): - s = json.dumps(io_dic, ensure_ascii=False, indent=4) - name = f'{luid}.json' - webpath = await fs.save(name, s, userid='llmio') - return webpath - -async def llm_query_orders(userorgid, page, pagerows=80): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - sql = """select a.llmid, -a.use_date, -a.use_time, -a.userid, -a.usages, -a.status, -a.amount, -a.userorgid, -a.accounting_status, -b.name, -b.model -from llmusage a, llm b -where userorgid = ${userorgid}$ - and a.llmid = b.id -""" - ns = dict( - page=page, - pagerows=pagerows, - sort="use_time desc", - userorgid=userorgid) - - data = await sor.sqlExe(sql, ns) - return data - return {'total': 0, 'rows':[]} - -async def get_llm_by_model(id, lctype=None): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - sql = 'select * from llm where model=${model}$' - recs = await sor.R('llm', {'model': model}) - return recs - -def erase_apikey(e): - e = str(e) - ss = e.split('Bearer ') - if len(ss) < 2: - return e - - for i, c in enumerate(ss[1]): - if c in ['"', "'"]: - newb = "XXXXXXXX" + ss[1][i:] - break - return ss[0] + 'Bearer ' + newb - -async def get_llmproviders(): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - sql = """select a.providerid, a.iconid, b.orgname -from llm a, organization b -where a.providerid = b.id -group by a.providerid, a.iconid, b.orgname""" - return await sor.sqlExe(sql, {}) - return [] - -async def get_llms_sort_by_provider(): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - today = curDateString() - sql = """select a.*, b.orgname from llm a, organization b -where a.enabled_date <= ${today}$ - and a.expired_date > ${today}$ - and a.providerid = b.id - order by a.providerid, a.id - """ - recs = await sor.sqlExe(sql, {'today': today}) - d = [] - x = None - oldpid = '-111' - for l in recs: - if l.providerid != oldpid: - x = { - 'id': l.providerid, - 'orgname': l.orgname, - 'llms': [l] - } - d.append(x) - oldpid = l.providerid - else: - x['llms'].append(l) - return d - return [] - -async def get_llmcatelogs(): - db = DBPools() - dbname = get_serverenv('get_module_dbname')('llmage') - async with db.sqlorContext(dbname) as sor: - recs = await sor.R('llmcatelog', {}) - return recs - - return [] - -async def get_llms_by_catelog_to_customer(catelogid=None, orderby='providerid'): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - today = curDateString() - # Join with llm_api_map to get catalog relationship - sql = """select distinct a.*, -b.name as catelogname, -m.llmcatelogid as catelog_id, -m.apiname, -m.query_apiname, -m.query_period, -m.ppid - from llm a - join llm_api_map m on a.id = m.llmid - join llmcatelog b on m.llmcatelogid = b.id - where a.enabled_date <= ${today}$ - and m.ppid is not null - and a.expired_date > ${today}$ - """ - sortstr='catelog_id, ' + orderby - params = {'today': today, 'sort': sortstr} - if catelogid: - sql += " and m.llmcatelogid = ${catelogid}$" - params['catelogid'] = catelogid - - debug(f'{sql=}') - recs = await sor.sqlExe(sql, params.copy()) - debug(f'{sql=}, {recs=}, {params=}') - d = [] - cid = '' - x = None - for r in recs: - if cid != r.catelog_id: - x = { - 'catelogid': r.catelog_id, - 'catelogname': r.catelogname, - 'llms': [r] - } - d.append(x) - cid = r.catelog_id - else: - x['llms'].append(r) - return d - return [] - -async def get_llms_by_catelog(catelogid=None, orderby='providerid'): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - today = curDateString() - # Join with llm_api_map to get catalog relationship - sql = """select distinct a.*, b.name as catelogname, m.llmcatelogid as catelog_id - from llm a - join llm_api_map m on a.id = m.llmid - join llmcatelog b on m.llmcatelogid = b.id - where a.enabled_date <= ${today}$ - and a.expired_date > ${today}$""" - params = {'today': today, 'sort': orderby} - if catelogid: - sql += " and m.llmcatelogid = ${catelogid}$" - params['catelogid'] = catelogid - - sql += " order by m.llmcatelogid, a.id" - - recs = await sor.sqlExe(sql, params) - d = [] - cid = '' - x = None - for r in recs: - if cid != r.catelog_id: - x = { - 'catelogid': r.catelog_id, - 'catelogname': r.catelogname, - 'llms': [r] - } - d.append(x) - cid = r.catelog_id - else: - x['llms'].append(r) - return d - return [] - -async def get_llm(llmid, catelogid=None): - today = curDateString() - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - sql = """select a.id, -a.name, -a.model, -a.providerid, -a.description, -a.iconid, -a.upappid, -a.ownerid, -a.min_balance, -m.llmcatelogid, -m.apiname, -m.query_apiname, -m.query_period, -m.ppid, -e.ioid, -e.stream, -e.callbackurl, -f.input_fields, -lc.name as catelogname -from llm a -,llm_api_map m -,llmcatelog lc -,upapp c -,uapi e -,uapiio f -where a.id = m.llmid -and a.upappid = c.id -and c.id = e.upappid -and m.apiname = e.name -and e.ioid = f.id -and a.id = ${llmid}$ -and a.expired_date > ${today}$ -and a.enabled_date <= ${today}$ -""" - ns = {'llmid': llmid, 'today': today} - if catelogid: - sql += ' and m.llmcatelogid = ${catelogid}$ ' - ns['catelogid'] = catelogid - else: - sql += " and m.isdefaultcatelog = '1'" - recs = await sor.sqlExe(sql, ns.copy()) - if len(recs) > 0: - r = recs[0] - return r - else: - debug(f'{llmid=} not found, {ns=}, {sql=}') - return None - exception(f'Error: {format_exc()}') - return None - - -async def write_llmusage(llmusage): - env = ServerEnv() - async with get_sor_context(env, 'llmage') as sor: - await sor.C('llmusage', llmusage) - -async def llm_query_price(llmid, config_data): - env = ServerEnv() - llm = await get_llm(llmid) - if llm.ppid is None: - e = Exception(f'{llm=} ppid is None') - exception(f'{e}') - raise e - prices = await env.buffered_charging(llm.ppid, config_data) - return prices -