refactor: add get_llmage_llm() for non-API-call llm data access, replace get_llm() in accounting and pricing
This commit is contained in:
parent
cef4859574
commit
93ec47f198
0
build/lib/llmage/__init__.py
Normal file
0
build/lib/llmage/__init__.py
Normal file
369
build/lib/llmage/accounting.py
Normal file
369
build/lib/llmage/accounting.py
Normal file
@ -0,0 +1,369 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
from appPublic.log import exception, debug
|
||||||
|
from appPublic.uniqueID import getID
|
||||||
|
from appPublic.dictObject import DictObject
|
||||||
|
from sqlor.dbpools import get_sor_context
|
||||||
|
from ahserver.serverenv import ServerEnv
|
||||||
|
from accounting.consume import consume_accounting
|
||||||
|
from accounting.getaccount import getCustomerBalance
|
||||||
|
from .utils import *
|
||||||
|
|
||||||
|
async def llm_charging(ppid, llmusage):
|
||||||
|
env = ServerEnv()
|
||||||
|
usages = llmusage.usages
|
||||||
|
if isinstance(usages, str):
|
||||||
|
usages = json.loads(usages)
|
||||||
|
prices = await env.buffered_charging(ppid, usages)
|
||||||
|
if prices is None:
|
||||||
|
e = Exception(f'{ppid=}, {usages=}{llmusage.id=} env.buffered_charging() return None')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
return None
|
||||||
|
amount = 0
|
||||||
|
cost = 0
|
||||||
|
for p in prices:
|
||||||
|
amount += p.amount
|
||||||
|
if p.cost:
|
||||||
|
cost += p.cost
|
||||||
|
discount = await env.get_customer_discount(llmusage.ownerid,
|
||||||
|
llmusage.userorgid)
|
||||||
|
return DictObject(**{
|
||||||
|
'original_amount': amount,
|
||||||
|
'amount': amount * discount,
|
||||||
|
'cost': cost
|
||||||
|
})
|
||||||
|
|
||||||
|
async def checkCustomerBalance(llmid, userid, userorgid, catelogid=None):
|
||||||
|
if llmid is None:
|
||||||
|
debug(f'checkCustomerBalance(): llmid is None')
|
||||||
|
return False
|
||||||
|
env = ServerEnv()
|
||||||
|
llm = await get_llm(llmid)
|
||||||
|
if llm.ownerid == userorgid:
|
||||||
|
debug(f'self orgid user')
|
||||||
|
return True
|
||||||
|
balance = 0.00
|
||||||
|
tpac = await get_user_tpac(userid)
|
||||||
|
if tpac:
|
||||||
|
debug(f'{tpac=}, get tpac balance')
|
||||||
|
balance = await get_tpac_balance(tpac, userid)
|
||||||
|
else:
|
||||||
|
debug(f'{tpac=}, get local balance')
|
||||||
|
async with get_sor_context(env, 'accounting') as sor:
|
||||||
|
balance = await getCustomerBalance(sor, userorgid)
|
||||||
|
bal = 0 if balance is None else balance
|
||||||
|
if llm.min_balance is None:
|
||||||
|
llm.min_balance = 0.00
|
||||||
|
ret = llm.ppid and llm.min_balance < bal
|
||||||
|
debug(f'{llm.ppid=}, {llm.min_balance=}, {bal=}')
|
||||||
|
return ret
|
||||||
|
|
||||||
|
async def llm_accounting(llmusage):
|
||||||
|
env = ServerEnv()
|
||||||
|
llmid = llmusage.llmid
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
sql = """select a.*, b.ppid from llm a, llm_api_map b
|
||||||
|
where a.id=${llmid}$
|
||||||
|
and a.id = b.llmid
|
||||||
|
and b.isdefaultcatelog = '1'
|
||||||
|
"""
|
||||||
|
recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid})
|
||||||
|
if len(recs) == 0:
|
||||||
|
ns = {
|
||||||
|
'id': llmusage.id,
|
||||||
|
'accounting_status': 'failed'
|
||||||
|
}
|
||||||
|
await sor.U('llmusage', ns)
|
||||||
|
e = Exception(f'llm not found({llmid})')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
if recs[0].ppid is None:
|
||||||
|
ns = {
|
||||||
|
'id': llmusage.id,
|
||||||
|
'accounting_status': 'failed'
|
||||||
|
}
|
||||||
|
await sor.U('llmusage', ns)
|
||||||
|
e = Exception(f'llm ({llmid}) donot has a pricing_program')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
customerid = llmusage.userorgid
|
||||||
|
userid = llmusage.userid
|
||||||
|
resellerid = recs[0].ownerid
|
||||||
|
providerid = recs[0].providerid
|
||||||
|
trans_amount = llmusage.amount
|
||||||
|
trans_cost = llmusage.cost
|
||||||
|
biz_date = await env.get_business_date(sor)
|
||||||
|
timestamp = env.timestampstr()
|
||||||
|
orderid = getID()
|
||||||
|
order = {
|
||||||
|
"id": orderid,
|
||||||
|
"customerid": customerid,
|
||||||
|
"resellerid": resellerid,
|
||||||
|
"order_date": biz_date,
|
||||||
|
"order_status": "1", # accounted
|
||||||
|
"business_op": "PAY",
|
||||||
|
"amount": trans_amount,
|
||||||
|
"userid": userid,
|
||||||
|
"productid": llmid
|
||||||
|
}
|
||||||
|
await sor.C('biz_order', order)
|
||||||
|
orderdetail = {
|
||||||
|
"id": getID(),
|
||||||
|
"orderid": orderid,
|
||||||
|
"productid": llmid,
|
||||||
|
"product_cnt": 1,
|
||||||
|
"trans_amount": trans_amount
|
||||||
|
}
|
||||||
|
await sor.C('biz_orderdetail', orderdetail)
|
||||||
|
ais = []
|
||||||
|
if customerid != resellerid:
|
||||||
|
ai0 = DictObject()
|
||||||
|
ai0.action = 'PAY'
|
||||||
|
ai0.customerid = customerid
|
||||||
|
ai0.resellerid = resellerid
|
||||||
|
ai0.providerid = providerid
|
||||||
|
ai0.biz_date = biz_date
|
||||||
|
ai0.timestamp = timestamp
|
||||||
|
ai0.productid = llmid
|
||||||
|
ai0.transamt = trans_amount
|
||||||
|
ai0.variable = {
|
||||||
|
"交易金额": trans_amount,
|
||||||
|
"交易手续费": 0
|
||||||
|
}
|
||||||
|
ais.append(ai0)
|
||||||
|
ai1 = DictObject()
|
||||||
|
ai1.action = 'PAY*'
|
||||||
|
ai1.customerid = customerid
|
||||||
|
ai1.resellerid = resellerid
|
||||||
|
ai1.providerid = providerid
|
||||||
|
ai1.biz_date = biz_date
|
||||||
|
ai1.timestamp = timestamp
|
||||||
|
ai1.providerid = providerid
|
||||||
|
ai1.productid = llmid
|
||||||
|
ai1.transamt = trans_cost
|
||||||
|
ai1.variable = {
|
||||||
|
"采购成本": trans_cost
|
||||||
|
}
|
||||||
|
ais.append(ai1)
|
||||||
|
await consume_accounting(sor, orderid, ais)
|
||||||
|
llmusage.accounting_status = 'accounted'
|
||||||
|
ns = {
|
||||||
|
'id': llmusage.id,
|
||||||
|
'accounting_status': 'accounted'
|
||||||
|
}
|
||||||
|
await sor.U('llmusage', ns)
|
||||||
|
|
||||||
|
async def get_accounting_llmusages(luid=None):
|
||||||
|
env = ServerEnv()
|
||||||
|
lus = []
|
||||||
|
t = time.time()
|
||||||
|
dt = datetime.fromtimestamp(t)
|
||||||
|
tsstr = dt.strftime('%Y-%m-%d %H:%M:%S.') + f'{dt.microsecond // 1000:03d}'
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
sql = """select a.*, c.ppid
|
||||||
|
from llmusage a, llm b, llm_api_map c
|
||||||
|
where a.llmid = b.id
|
||||||
|
and a.llmid = c.llmid
|
||||||
|
and c.isdefaultcatelog = '1'
|
||||||
|
and a.status = 'SUCCEEDED'
|
||||||
|
and a.use_time < ${tsstr}$
|
||||||
|
and a.accounting_status='created'"""
|
||||||
|
ns = {'tsstr': tsstr}
|
||||||
|
if luid:
|
||||||
|
sql += " and a.id=${luid}$"
|
||||||
|
ns['luid'] = luid
|
||||||
|
recs = await sor.sqlExe(sql, ns)
|
||||||
|
# debug(f'{sql=}, {ns=}, {len(recs)=}')
|
||||||
|
for r in recs:
|
||||||
|
if r.usages is None:
|
||||||
|
try:
|
||||||
|
output = await get_lastoutput(r.ioinfo)
|
||||||
|
except Exception as e:
|
||||||
|
continue
|
||||||
|
r.usages = output.get('usage')
|
||||||
|
if r.usages is None:
|
||||||
|
debug(f'{r.usages=} is None, accoiunting failed')
|
||||||
|
await llm_accoung_failed(r.id, reason='usages is None')
|
||||||
|
continue
|
||||||
|
d = None
|
||||||
|
try:
|
||||||
|
debug(f'{r.ppid=}, {r.usages=} {r.id=}')
|
||||||
|
d = await llm_charging(r.ppid, r)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
exception(f'{r.ppid=}, {r.usages=} llm_charging() failed,{e}')
|
||||||
|
await llm_accoung_failed(r.id, reason=f'llm_charging failed: {e}')
|
||||||
|
continue
|
||||||
|
r.amount = d.amount
|
||||||
|
r.cost = d.cost
|
||||||
|
ns = {
|
||||||
|
'id': r.id,
|
||||||
|
'amount': r.amount,
|
||||||
|
'cost': r.cost,
|
||||||
|
'usage': json.dumps(r.usage, ensure_ascii=False, indent=4)
|
||||||
|
}
|
||||||
|
await sor.U('llmusage', ns)
|
||||||
|
lus.append(r)
|
||||||
|
return lus
|
||||||
|
|
||||||
|
async def llm_accoung_failed(luid, reason=None):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
await sor.U('llmusage', {
|
||||||
|
'id': luid,
|
||||||
|
'accounting_status': 'failed'
|
||||||
|
})
|
||||||
|
# Also record in the failed accounting table for tracking
|
||||||
|
recs = await sor.R('llmusage', {'id': luid})
|
||||||
|
if recs:
|
||||||
|
r = recs[0]
|
||||||
|
failed_id = getID()
|
||||||
|
failed_rec = {
|
||||||
|
'id': failed_id,
|
||||||
|
'llmusageid': luid,
|
||||||
|
'llmid': r.llmid,
|
||||||
|
'userid': r.userid,
|
||||||
|
'userorgid': r.userorgid,
|
||||||
|
'use_date': r.use_date,
|
||||||
|
'use_time': r.use_time,
|
||||||
|
'amount': r.amount,
|
||||||
|
'cost': r.cost,
|
||||||
|
'failed_reason': reason or 'accounting failed',
|
||||||
|
'failed_time': env.timestampstr(),
|
||||||
|
'retry_count': 0,
|
||||||
|
'handled': '0'
|
||||||
|
}
|
||||||
|
await sor.C('llmusage_accounting_failed', failed_rec)
|
||||||
|
|
||||||
|
|
||||||
|
async def backup_accounted_llmusage():
|
||||||
|
"""Backup yesterday's accounted records to history table and remove from llmusage."""
|
||||||
|
env = ServerEnv()
|
||||||
|
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
|
||||||
|
ts = env.timestampstr()
|
||||||
|
batched = 0
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
# Select yesterday's accounted records
|
||||||
|
sql = """select * from llmusage
|
||||||
|
where accounting_status='accounted'
|
||||||
|
and use_date < ${yesterday}$"""
|
||||||
|
recs = await sor.sqlExe(sql, {'yesterday': yesterday})
|
||||||
|
if not recs:
|
||||||
|
debug(f'backup_accounted_llmusage: no records to backup for use_date < {yesterday}')
|
||||||
|
return 0
|
||||||
|
debug(f'backup_accounted_llmusage: {len(recs)} records to backup')
|
||||||
|
for r in recs:
|
||||||
|
history_rec = {
|
||||||
|
'id': r.id,
|
||||||
|
'llmid': r.llmid,
|
||||||
|
'use_date': r.use_date,
|
||||||
|
'use_time': r.use_time,
|
||||||
|
'userid': r.userid,
|
||||||
|
'usages': r.usages,
|
||||||
|
'ioinfo': r.ioinfo,
|
||||||
|
'transno': r.transno,
|
||||||
|
'responsed_seconds': r.responsed_seconds,
|
||||||
|
'finish_seconds': r.finish_seconds,
|
||||||
|
'status': r.status,
|
||||||
|
'taskid': r.taskid,
|
||||||
|
'amount': r.amount,
|
||||||
|
'cost': r.cost,
|
||||||
|
'userorgid': r.userorgid,
|
||||||
|
'ownerid': r.ownerid,
|
||||||
|
'accounting_status': r.accounting_status,
|
||||||
|
'backup_time': ts
|
||||||
|
}
|
||||||
|
await sor.C('llmusage_history', history_rec)
|
||||||
|
# Delete from main table
|
||||||
|
await sor.D('llmusage', {'id': r.id})
|
||||||
|
batched += 1
|
||||||
|
debug(f'backup_accounted_llmusage: backed up {batched} records')
|
||||||
|
return batched
|
||||||
|
|
||||||
|
|
||||||
|
async def get_failed_accounting_records(filters=None, page=1, page_size=50):
|
||||||
|
"""Search failed accounting records with optional filters.
|
||||||
|
|
||||||
|
filters: dict with optional keys:
|
||||||
|
- userorgid: filter by user organization
|
||||||
|
- llmid: filter by model ID
|
||||||
|
- handled: '0' or '1'
|
||||||
|
- start_date: filter use_date >= start_date
|
||||||
|
- end_date: filter use_date <= end_date
|
||||||
|
"""
|
||||||
|
async with get_sor_context(ServerEnv(), 'llmage') as sor:
|
||||||
|
conditions = []
|
||||||
|
ns = {}
|
||||||
|
if filters:
|
||||||
|
if filters.get('userorgid'):
|
||||||
|
conditions.append("userorgid=${userorgid}$")
|
||||||
|
ns['userorgid'] = filters['userorgid']
|
||||||
|
if filters.get('llmid'):
|
||||||
|
conditions.append("llmid=${llmid}$")
|
||||||
|
ns['llmid'] = filters['llmid']
|
||||||
|
if filters.get('handled') is not None:
|
||||||
|
conditions.append("handled=${handled}$")
|
||||||
|
ns['handled'] = filters['handled']
|
||||||
|
if filters.get('start_date'):
|
||||||
|
conditions.append("use_date>=${start_date}$")
|
||||||
|
ns['start_date'] = filters['start_date']
|
||||||
|
if filters.get('end_date'):
|
||||||
|
conditions.append("use_date<=${end_date}$")
|
||||||
|
ns['end_date'] = filters['end_date']
|
||||||
|
where = ""
|
||||||
|
if conditions:
|
||||||
|
where = "where " + " and ".join(conditions)
|
||||||
|
# Count total
|
||||||
|
count_sql = f"select count(*) as cnt from llmusage_accounting_failed {where}"
|
||||||
|
count_recs = await sor.sqlExe(count_sql, ns)
|
||||||
|
total = count_recs[0].cnt if count_recs else 0
|
||||||
|
# Query with pagination
|
||||||
|
offset = (page - 1) * page_size
|
||||||
|
query_sql = f"""select * from llmusage_accounting_failed {where}
|
||||||
|
order by failed_time desc limit {page_size} offset {offset}"""
|
||||||
|
recs = await sor.sqlExe(query_sql, ns)
|
||||||
|
return {
|
||||||
|
'total': total,
|
||||||
|
'page': page,
|
||||||
|
'page_size': page_size,
|
||||||
|
'records': recs
|
||||||
|
}
|
||||||
|
|
||||||
|
async def backend_accounting():
|
||||||
|
env = ServerEnv()
|
||||||
|
debug(f'backend accounting started ...')
|
||||||
|
backup_counter = 0
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
lus = await get_accounting_llmusages()
|
||||||
|
except Exception as e:
|
||||||
|
exception(f'{e}')
|
||||||
|
lus = []
|
||||||
|
# debug(f'{len(lus)=} need to accounting........')
|
||||||
|
for lu in lus:
|
||||||
|
try:
|
||||||
|
tpac = await get_user_tpac(lu.userid)
|
||||||
|
if tpac:
|
||||||
|
debug(f'{lu.id=},{lu.userid=}, {tpac=}, go tpac')
|
||||||
|
await tpac_accounting(tpac, lu.userid, lu.llmid, lu.amount, lu.usages, lu.id)
|
||||||
|
else:
|
||||||
|
debug(f'{lu.id=},{lu.userid=}, {tpac=}, go local')
|
||||||
|
await llm_accounting(lu)
|
||||||
|
except Exception as e:
|
||||||
|
exception(f'{e}, {lu.id=}')
|
||||||
|
await llm_accoung_failed(lu.id, reason=str(e))
|
||||||
|
|
||||||
|
# Run backup every 100 iterations (roughly every ~1000 seconds)
|
||||||
|
backup_counter += 1
|
||||||
|
if backup_counter >= 100:
|
||||||
|
backup_counter = 0
|
||||||
|
try:
|
||||||
|
await backup_accounted_llmusage()
|
||||||
|
except Exception as e:
|
||||||
|
exception(f'backup_accounted_llmusage failed: {e}')
|
||||||
|
|
||||||
|
await asyncio.sleep(10)
|
||||||
|
|
||||||
203
build/lib/llmage/asyncinference.py
Normal file
203
build/lib/llmage/asyncinference.py
Normal file
@ -0,0 +1,203 @@
|
|||||||
|
import json
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
from random import randint
|
||||||
|
from functools import partial
|
||||||
|
from traceback import format_exc
|
||||||
|
from sqlor.dbpools import DBPools, get_sor_context
|
||||||
|
from appPublic.log import debug, exception, error, critical
|
||||||
|
from appPublic.uniqueID import getID
|
||||||
|
from appPublic.dictObject import DictObject
|
||||||
|
from appPublic.timeUtils import curDateString, timestampstr, timestampAdd
|
||||||
|
from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
|
||||||
|
from ahserver.serverenv import get_serverenv, ServerEnv
|
||||||
|
from ahserver.filestorage import FileStorage
|
||||||
|
from .accounting import llm_accounting, llm_charging
|
||||||
|
from .utils import *
|
||||||
|
|
||||||
|
async def get_today_asynctask_list(userid):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
today = await env.get_business_date(sor)
|
||||||
|
sql = '''select * from llmusage
|
||||||
|
where userid=${userid}$
|
||||||
|
and use_date = ${date}$'''
|
||||||
|
recs = await sor.sqlExe(sql, {
|
||||||
|
'date': today,
|
||||||
|
'userid': userid
|
||||||
|
})
|
||||||
|
return recs
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def get_asynctask_status(request, taskid):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
recs = await sor.R('llmusage', {'taskid': taskid})
|
||||||
|
if recs:
|
||||||
|
r = recs[0]
|
||||||
|
output = await get_lastoutput(r.ioinfo)
|
||||||
|
t = timestampAdd(r.use_time, 600)
|
||||||
|
now = time.time()
|
||||||
|
if r.status not in ['UNKNOWN', 'FAILED', 'SUCCEEDED'] and now > t:
|
||||||
|
asyncio.create_task(query_task_status(request, r.id))
|
||||||
|
return output
|
||||||
|
return {
|
||||||
|
'taskid': taskid,
|
||||||
|
'status': 'FAILED',
|
||||||
|
'error': f'taskid={taskid} not exist'
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
'taskid': taskid,
|
||||||
|
'status': 'FAILED',
|
||||||
|
'error': f'system error'
|
||||||
|
}
|
||||||
|
|
||||||
|
async def async_uapi_request(request, llm,
|
||||||
|
callerid, callerorgid, params_kw=None):
|
||||||
|
env = request._run_ns.copy()
|
||||||
|
if not params_kw:
|
||||||
|
params_kw = env.params_kw
|
||||||
|
# callerorgid = await env.get_userorgid()
|
||||||
|
# callerid = await env.get_user()
|
||||||
|
uapi = env.UpAppApi(request)
|
||||||
|
userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid)
|
||||||
|
b = None
|
||||||
|
luid = getID()
|
||||||
|
try:
|
||||||
|
start_timestamp = time.time()
|
||||||
|
if llm.callbackurl:
|
||||||
|
params_kw.callbackurl = llm.callbackurl
|
||||||
|
|
||||||
|
b = None
|
||||||
|
try:
|
||||||
|
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
|
||||||
|
except Exception as e:
|
||||||
|
estr = erase_apikey(e)
|
||||||
|
ed = {"error": f"ERROR:{estr}", "status": "FAILED"}
|
||||||
|
exception(f'{ed}')
|
||||||
|
yield f'{ed}\n'
|
||||||
|
return
|
||||||
|
if isinstance(b, bytes):
|
||||||
|
b = b.decode('utf-8')
|
||||||
|
debug(f'task submited:{b}')
|
||||||
|
d = DictObject(**json.loads(b))
|
||||||
|
responsed_seconds = time.time() - start_timestamp
|
||||||
|
finish_seconds = responsed_seconds
|
||||||
|
llmusage = DictObject()
|
||||||
|
llmusage.id = luid
|
||||||
|
llmusage.llmid = llm.id
|
||||||
|
llmusage.use_date = curDateString()
|
||||||
|
llmusage.use_time = timestampstr()
|
||||||
|
llmusage.userid = callerid
|
||||||
|
ioinfo = {
|
||||||
|
"input": params_kw,
|
||||||
|
'output': [d]
|
||||||
|
}
|
||||||
|
webpath = await write_llmio(llmusage.id, ioinfo)
|
||||||
|
llmusage.ioinfo = webpath
|
||||||
|
llmusage.taskid = d.taskid
|
||||||
|
llmusage.transno = params_kw.transno
|
||||||
|
llmusage.responsed_seconds = responsed_seconds
|
||||||
|
llmusage.finish_seconds = finish_seconds
|
||||||
|
llmusage.status = d.status
|
||||||
|
llmusage.userorgid = callerorgid
|
||||||
|
llmusage.ownerid = llm.ownerid
|
||||||
|
llmusage.accounting_status = 'created'
|
||||||
|
b = json.dumps(d, ensure_ascii=False)
|
||||||
|
yield b
|
||||||
|
await write_llmusage(llmusage)
|
||||||
|
# if llm.callbackurl:
|
||||||
|
# return
|
||||||
|
if d.status == 'FAILED':
|
||||||
|
e = Exception(f'resp={d} FFAILED')
|
||||||
|
return
|
||||||
|
asyncio.create_task(query_task_status(request, luid))
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
ed = {"error": f"ERROR:{e}", "status": "FAILED"}
|
||||||
|
s = json.dumps(ed, ensure_ascii=False)
|
||||||
|
s = ''.join(s.split('\n'))
|
||||||
|
exception(s)
|
||||||
|
yield f'{s}\n'
|
||||||
|
return
|
||||||
|
|
||||||
|
async def modify_llmusage(ns):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
await sor.U('llmusage', ns.copy())
|
||||||
|
|
||||||
|
async def get_llm_llmusage(luid):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
recs = await sor.R('llmusage', {'id': luid})
|
||||||
|
if len(recs) == 0:
|
||||||
|
e = Exception(f'{luid=} is not found in llmusage')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
llmusage = recs[0]
|
||||||
|
if llmusage.status == 'UNKNOWN':
|
||||||
|
return
|
||||||
|
if llmusage.status == 'SUCCEEDED':
|
||||||
|
return
|
||||||
|
if llmusage.status == 'FAILED':
|
||||||
|
return
|
||||||
|
llms = await sor.R('llm', {'id': llmusage.llmid})
|
||||||
|
if len(llms) == 0:
|
||||||
|
e = Exception(f'{llmusage.llmid=} not found in llm')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
llm = llms[0]
|
||||||
|
return llm, llmusage
|
||||||
|
|
||||||
|
async def query_task_status(request, luid, onetime=False):
|
||||||
|
env = ServerEnv()
|
||||||
|
uapi = env.UpAppApi(request)
|
||||||
|
llm, llmusage = await get_llm_llmusage(luid)
|
||||||
|
userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid)
|
||||||
|
taskid = llmusage.taskid
|
||||||
|
upappid = llm.upappid
|
||||||
|
apinames = llm.query_apiname.split(',')
|
||||||
|
|
||||||
|
for apiname in apinames:
|
||||||
|
while True:
|
||||||
|
lastoutout = await get_lastoutput(llmusage.ioinfo)
|
||||||
|
if lastoutout['status'] in ['UNKNOWN', 'FAILED', 'SUCCEEDED']:
|
||||||
|
critical(f"{lastoutout['status']=}")
|
||||||
|
return
|
||||||
|
ns = {'taskid': taskid}
|
||||||
|
new_output = b = d = None
|
||||||
|
try:
|
||||||
|
b = await uapi.call(upappid, apiname, userid, params=ns)
|
||||||
|
if isinstance(b, bytes):
|
||||||
|
b = b.decode('utf-8')
|
||||||
|
new_output = json.loads(b)
|
||||||
|
except Exception as e:
|
||||||
|
exception(f'{e}, {b=}')
|
||||||
|
new_output = {
|
||||||
|
'status': 'FAILED',
|
||||||
|
'error': f'{b},{e}'
|
||||||
|
}
|
||||||
|
if not new_output.get('status'):
|
||||||
|
e = Exception(f"{new_output=} {upappid=}, {apiname=} has not status field")
|
||||||
|
critical(f'{e}')
|
||||||
|
raise e
|
||||||
|
if lastoutout['status'] != new_output.get('status'):
|
||||||
|
llmusage.status = new_output['status']
|
||||||
|
ns = {
|
||||||
|
'id': llmusage.id,
|
||||||
|
'status': llmusage.status
|
||||||
|
}
|
||||||
|
if 'usage' in new_output.keys():
|
||||||
|
ns['usages'] = json.dumps(new_output['usage'])
|
||||||
|
await append_new_llmoutput(llmusage.ioinfo, new_output)
|
||||||
|
await modify_llmusage(ns)
|
||||||
|
if llmusage.status in ['UNKNOWN', 'FAILED', 'SUCCEEDED']:
|
||||||
|
critical(f'finished .. {llmusage.status=}')
|
||||||
|
return
|
||||||
|
|
||||||
|
if onetime:
|
||||||
|
critical(f'onetime is true, returned')
|
||||||
|
return
|
||||||
|
await asyncio.sleep(llm.query_period or 30)
|
||||||
|
critical(f'{llm.query_period=} seconds will retry, {new_output["status"]=}')
|
||||||
|
|
||||||
33
build/lib/llmage/callback.py
Normal file
33
build/lib/llmage/callback.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
from ahserver.serverenv import ServerEnv
|
||||||
|
from appPublic.dictObject import DictObject
|
||||||
|
from sqlor.dbpools import get_sor_context
|
||||||
|
|
||||||
|
async def asynctask_callback(appname, apiname, params_kw)
|
||||||
|
env = ServerEnv()
|
||||||
|
llmusage = None
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
uapi = await env.sor_get_uapi_by_appname_apiname(appname, apiname)
|
||||||
|
try:
|
||||||
|
dstr = await env.tmpl_engine.renders(uapi.response, params_kw)
|
||||||
|
d = DictObject(**json.loads(dstr))
|
||||||
|
llmus = await sor.R('llmusage', {'taskid': d.taskid})
|
||||||
|
if len(llmus) == 0:
|
||||||
|
e = Exception(f'{d=}, {taskid=} not found')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
llmusage = llmus[0]
|
||||||
|
io = json.loads(llmusage.ioinfo)
|
||||||
|
out = io.get('output')
|
||||||
|
out.append(d)
|
||||||
|
llmusage.ioinfo = json.dumps(io, ensure_ascii=False)
|
||||||
|
llmusage.status = d.status
|
||||||
|
await sor.U('llmusage', llmusage)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
e = Exception(f'{uapi.response=}, {params_kw=} render error')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
|
||||||
|
if llmusage:
|
||||||
|
await llm_accounting(llmusage)
|
||||||
|
|
||||||
68
build/lib/llmage/init.py
Normal file
68
build/lib/llmage/init.py
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
import asyncio
|
||||||
|
from appPublic.registerfunction import RegisterFunction
|
||||||
|
from sqlor.dbpools import DBPools
|
||||||
|
from ahserver.serverenv import ServerEnv
|
||||||
|
from appPublic.log import debug
|
||||||
|
from .keling import keling_token
|
||||||
|
from .jimeng import jimeng_auth_headers
|
||||||
|
from .utils import (
|
||||||
|
llm_query_orders,
|
||||||
|
read_webpath,
|
||||||
|
llm_query_price,
|
||||||
|
get_llm_by_model,
|
||||||
|
get_llms_by_catelog,
|
||||||
|
get_llms_sort_by_provider,
|
||||||
|
get_llmcatelogs,
|
||||||
|
get_llms_by_catelog_to_customer,
|
||||||
|
get_llmproviders,
|
||||||
|
get_llm,
|
||||||
|
)
|
||||||
|
|
||||||
|
from .llmclient import (
|
||||||
|
inference_generator,
|
||||||
|
inference
|
||||||
|
)
|
||||||
|
from .accounting import (
|
||||||
|
checkCustomerBalance,
|
||||||
|
llm_charging,
|
||||||
|
get_accounting_llmusages,
|
||||||
|
backend_accounting,
|
||||||
|
llm_accounting,
|
||||||
|
backup_accounted_llmusage,
|
||||||
|
get_failed_accounting_records,
|
||||||
|
llm_accoung_failed
|
||||||
|
)
|
||||||
|
|
||||||
|
from .asyncinference import (
|
||||||
|
get_asynctask_status,
|
||||||
|
query_task_status,
|
||||||
|
get_today_asynctask_list
|
||||||
|
)
|
||||||
|
|
||||||
|
def load_llmage():
|
||||||
|
env = ServerEnv()
|
||||||
|
env.llm_query_orders = llm_query_orders
|
||||||
|
env.read_webpath = read_webpath
|
||||||
|
env.get_llm_by_model = get_llm_by_model
|
||||||
|
env.llm_charging = llm_charging
|
||||||
|
env.get_accounting_llmusages = get_accounting_llmusages
|
||||||
|
env.llm_accounting = llm_accounting
|
||||||
|
env.get_today_asynctask_list = get_today_asynctask_list
|
||||||
|
env.get_asynctask_status = get_asynctask_status
|
||||||
|
env.query_task_status = query_task_status
|
||||||
|
env.get_llm = get_llm
|
||||||
|
env.inference = inference
|
||||||
|
env.inference_generator = inference_generator
|
||||||
|
env.get_llms_by_catelog = get_llms_by_catelog
|
||||||
|
env.get_llmcatelogs = get_llmcatelogs
|
||||||
|
env.checkCustomerBalance = checkCustomerBalance
|
||||||
|
env.get_llmproviders = get_llmproviders
|
||||||
|
env.get_llms_sort_by_provider = get_llms_sort_by_provider
|
||||||
|
env.keling_token = keling_token
|
||||||
|
env.llm_query_price = llm_query_price
|
||||||
|
env.get_llms_by_catelog_to_customer = get_llms_by_catelog_to_customer
|
||||||
|
env.backup_accounted_llmusage = backup_accounted_llmusage
|
||||||
|
env.get_failed_accounting_records = get_failed_accounting_records
|
||||||
|
rf = RegisterFunction()
|
||||||
|
rf.register('jimeng_auth_headers', jimeng_auth_headers)
|
||||||
|
|
||||||
98
build/lib/llmage/jimeng.py
Normal file
98
build/lib/llmage/jimeng.py
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
import hashlib
|
||||||
|
import datetime
|
||||||
|
from datetime import timezone
|
||||||
|
import hmac
|
||||||
|
import json
|
||||||
|
from urllib.parse import quote
|
||||||
|
|
||||||
|
Service = "visual"
|
||||||
|
Version = "2022-08-31"
|
||||||
|
Region = "cn-north-1"
|
||||||
|
Host = "visual.volcengineapi.com"
|
||||||
|
ContentType = "application/json"
|
||||||
|
|
||||||
|
def utc_now():
|
||||||
|
try:
|
||||||
|
from datetime import timezone
|
||||||
|
return datetime.datetime.now(timezone.utc)
|
||||||
|
except ImportError:
|
||||||
|
class UTC(datetime.tzinfo):
|
||||||
|
def utcoffset(self, dt):
|
||||||
|
return datetime.timedelta(0)
|
||||||
|
def tzname(self, dt):
|
||||||
|
return "UTC"
|
||||||
|
def dst(self, dt):
|
||||||
|
return datetime.timedelta(0)
|
||||||
|
return datetime.datetime.now(UTC())
|
||||||
|
|
||||||
|
def jm_timestamp():
|
||||||
|
dt = utc_new()
|
||||||
|
return dt.strftime("%Y%m%dT%H%M%SZ")
|
||||||
|
|
||||||
|
# sha256 非对称加密
|
||||||
|
def hmac_sha256(key: bytes, content: str):
|
||||||
|
return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
|
||||||
|
|
||||||
|
# sha256 hash算法
|
||||||
|
def hash_sha256(content: str):
|
||||||
|
return hashlib.sha256(content.encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
def jimeng_auth_headers(opts):
|
||||||
|
apikey = opts.get('apikey')
|
||||||
|
secretkey = opts.get('secretkey')
|
||||||
|
path = opts.get('path')
|
||||||
|
method = opts.get('method')
|
||||||
|
params = opts.get('params')
|
||||||
|
body = opts.get('body')
|
||||||
|
headers = opts.get('headers')
|
||||||
|
content_type = headers.get('Content-Type')
|
||||||
|
x_date = jm_timestamp()
|
||||||
|
short_x_date = DT[:8]
|
||||||
|
credential = {
|
||||||
|
"access_key_id": apikey,
|
||||||
|
"secret_access_key": secretkey,
|
||||||
|
"service": Service,
|
||||||
|
"region": Region,
|
||||||
|
}
|
||||||
|
x_content_sha256 = hash_sha256(body)
|
||||||
|
sign_result = {
|
||||||
|
"Host": Host,
|
||||||
|
"X-Content-Sha256": x_content_sha256,
|
||||||
|
"X-Date": x_date,
|
||||||
|
"Content-Type": ContentType
|
||||||
|
}
|
||||||
|
headers.update(sign_result)
|
||||||
|
signed_headers_str = ";".join(
|
||||||
|
["content-type", "host", "x-content-sha256", "x-date"]
|
||||||
|
)
|
||||||
|
canonical_request_str = "\n".join(
|
||||||
|
[method.upper(),
|
||||||
|
path,
|
||||||
|
norm_query(params),
|
||||||
|
"\n".join(
|
||||||
|
[
|
||||||
|
"content-type:" + content_type,
|
||||||
|
"host:" + Host,
|
||||||
|
"x-content-sha256:" + x_content_sha256,
|
||||||
|
"x-date:" + x_date,
|
||||||
|
]
|
||||||
|
),
|
||||||
|
"",
|
||||||
|
signed_headers_str,
|
||||||
|
x_content_sha256,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
hashed_canonical_request = hash_sha256(canonical_request_str)
|
||||||
|
credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"])
|
||||||
|
string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request])
|
||||||
|
k_date = hmac_sha256(secretkey.encode("utf-8"), short_x_date)
|
||||||
|
k_region = hmac_sha256(k_date, credential["region"])
|
||||||
|
k_service = hmac_sha256(k_region, credential["service"])
|
||||||
|
k_signing = hmac_sha256(k_service, "request")
|
||||||
|
signature = hmac_sha256(k_signing, string_to_sign).hex()
|
||||||
|
headers['Authorization'] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
|
||||||
|
apikey + "/" + credential_scope,
|
||||||
|
signed_headers_str,
|
||||||
|
signature,
|
||||||
|
)
|
||||||
|
|
||||||
18
build/lib/llmage/keling.py
Normal file
18
build/lib/llmage/keling.py
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
import time
|
||||||
|
import jwt
|
||||||
|
|
||||||
|
ak = "" # 填写access key
|
||||||
|
sk = "" # 填写secret key
|
||||||
|
|
||||||
|
def keling_token(ak, sk):
|
||||||
|
headers = {
|
||||||
|
"alg": "HS256",
|
||||||
|
"typ": "JWT"
|
||||||
|
}
|
||||||
|
payload = {
|
||||||
|
"iss": ak,
|
||||||
|
"exp": int(time.time()) + 1800, # 有效时间,此处示例代表当前时间+1800s(30min)
|
||||||
|
"nbf": int(time.time()) - 5 # 开始生效的时间,此处示例代表当前时间-5秒
|
||||||
|
}
|
||||||
|
token = jwt.encode(payload, sk, headers=headers)
|
||||||
|
return token
|
||||||
148
build/lib/llmage/llmclient.py
Normal file
148
build/lib/llmage/llmclient.py
Normal file
@ -0,0 +1,148 @@
|
|||||||
|
import json
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
from random import randint
|
||||||
|
from functools import partial
|
||||||
|
from traceback import format_exc
|
||||||
|
from appPublic.log import debug, exception, error
|
||||||
|
from appPublic.uniqueID import getID
|
||||||
|
from appPublic.dictObject import DictObject
|
||||||
|
from appPublic.timeUtils import curDateString, timestampstr
|
||||||
|
from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
|
||||||
|
from ahserver.serverenv import get_serverenv, ServerEnv
|
||||||
|
from ahserver.filestorage import FileStorage
|
||||||
|
from .asyncinference import async_uapi_request
|
||||||
|
from .syncinference import sync_uapi_request
|
||||||
|
from .accounting import llm_accounting, llm_charging
|
||||||
|
from .utils import *
|
||||||
|
|
||||||
|
async def uapi_request(request, llm, callerid, callerorgid, params_kw=None):
|
||||||
|
env = request._run_ns.copy()
|
||||||
|
if not params_kw:
|
||||||
|
params_kw = env.params_kw
|
||||||
|
# callerorgid = await env.get_userorgid()
|
||||||
|
# callerid = await env.get_user()
|
||||||
|
uapi = env.UpAppApi(request)
|
||||||
|
userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid)
|
||||||
|
outlines = []
|
||||||
|
txt = ''
|
||||||
|
luid = getID()
|
||||||
|
try:
|
||||||
|
start_timestamp = time.time()
|
||||||
|
responsed_seconds = None
|
||||||
|
finish_seconds = None
|
||||||
|
first = True
|
||||||
|
usage = None
|
||||||
|
async for l in uapi.stream_linify(llm.upappid, llm.apiname, userid,
|
||||||
|
params=params_kw):
|
||||||
|
if first:
|
||||||
|
first = False
|
||||||
|
responsed_seconds = time.time() - start_timestamp
|
||||||
|
if isinstance(l, bytes):
|
||||||
|
l = l.decode('utf-8')
|
||||||
|
if l[-1] == '\n':
|
||||||
|
l = l[:-1]
|
||||||
|
debug(f'stream response line={l},{type(l)}')
|
||||||
|
l = ''.join(l.split('\n'))
|
||||||
|
if l and l != '[DONE]':
|
||||||
|
yield_it = False
|
||||||
|
d = {}
|
||||||
|
try:
|
||||||
|
d = json.loads(l)
|
||||||
|
except Exception as e:
|
||||||
|
debug(f'json.loads({l}) error({e})')
|
||||||
|
continue
|
||||||
|
if d.get('reasoning_content'):
|
||||||
|
txt += d.get('reasoning_content')
|
||||||
|
yield_it = True
|
||||||
|
if d.get('content'):
|
||||||
|
txt = txt + d['content']
|
||||||
|
yield_it = True
|
||||||
|
if d.get('usage'):
|
||||||
|
usage = d['usage']
|
||||||
|
d['llmusageid'] = luid
|
||||||
|
outlines.append(d)
|
||||||
|
yield json.dumps(d, ensure_ascii=False) + '\n'
|
||||||
|
if usage is None:
|
||||||
|
error(f'{llm=} response has not usage')
|
||||||
|
finish_seconds = time.time() - start_timestamp
|
||||||
|
if responsed_seconds is None:
|
||||||
|
responsed_seconds = finish_seconds
|
||||||
|
llmusage = DictObject()
|
||||||
|
llmusage.id = luid
|
||||||
|
llmusage.llmid = llm.id
|
||||||
|
llmusage.use_date = curDateString()
|
||||||
|
llmusage.use_time = timestampstr()
|
||||||
|
llmusage.userid = callerid
|
||||||
|
llmusage.usages = json.dumps(usage, ensure_ascii=False, indent=4)
|
||||||
|
debug(f' {usage=}, {type(usage)=}, {llmusage.usages=}')
|
||||||
|
ioinfo = {
|
||||||
|
"input": params_kw,
|
||||||
|
'output': outlines
|
||||||
|
}
|
||||||
|
webpath = await write_llmio(llmusage.id, ioinfo)
|
||||||
|
llmusage.ioinfo = webpath
|
||||||
|
llmusage.transno = params_kw.transno
|
||||||
|
llmusage.responsed_seconds = responsed_seconds
|
||||||
|
llmusage.finish_seconds = finish_seconds
|
||||||
|
llmusage.status = 'SUCCEEDED'
|
||||||
|
llmusage.userorgid = callerorgid
|
||||||
|
llmusage.ownerid = llm.ownerid
|
||||||
|
llmusage.accounting_status = 'created'
|
||||||
|
await write_llmusage(llmusage)
|
||||||
|
except Exception as e:
|
||||||
|
exception(f'{e=},{format_exc()}')
|
||||||
|
estr = erase_apikey(e)
|
||||||
|
ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid}
|
||||||
|
s = json.dumps(ed, ensure_ascii=False)
|
||||||
|
s = ''.join(s.split('\n'))
|
||||||
|
outlines.append(ed)
|
||||||
|
yield f'{s}\n'
|
||||||
|
return
|
||||||
|
|
||||||
|
async def inference_generator(request, *args, params_kw=None, **kw):
|
||||||
|
env = request._run_ns.copy()
|
||||||
|
callerorgid = await env.get_userorgid()
|
||||||
|
callerid = await env.get_user()
|
||||||
|
async for d in _inference_generator(request, callerid,
|
||||||
|
callerorgid, params_kw=params_kw, **kw):
|
||||||
|
yield d
|
||||||
|
|
||||||
|
async def _inference_generator(request, callerid, callerorgid,
|
||||||
|
params_kw={}, **kw):
|
||||||
|
env = request._run_ns
|
||||||
|
if not params_kw:
|
||||||
|
params_kw = env.params_kw
|
||||||
|
if not params_kw.transno:
|
||||||
|
params_kw.transno = getID()
|
||||||
|
llmid = params_kw.llmid
|
||||||
|
f = None
|
||||||
|
llm = await get_llm(llmid)
|
||||||
|
if llm is None:
|
||||||
|
errmsg = f'{{"status": "FAILED", "error":"llmid:{llmid}没找到模型"}}\n'
|
||||||
|
exception(errmsg)
|
||||||
|
yield errmsg
|
||||||
|
return
|
||||||
|
if not params_kw.model:
|
||||||
|
params_kw.model = llm.model
|
||||||
|
if llm.stream == 'async':
|
||||||
|
if llm.callbackurl:
|
||||||
|
cb_url = env.entire_url(llm.callbackurl)
|
||||||
|
params_kw.callbackurl = cb_url
|
||||||
|
f = partial(async_uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw)
|
||||||
|
elif not params_kw.stream:
|
||||||
|
llm.stream = False
|
||||||
|
debug(f'---{params_kw.stream=}, {llm.stream=} ---use sync_uapi_request ')
|
||||||
|
f = partial(sync_uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw)
|
||||||
|
else:
|
||||||
|
llm.stream = True
|
||||||
|
debug(f'---{params_kw.stream=}, {llm.stream=} ---use uapi_request ')
|
||||||
|
f = partial(uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw)
|
||||||
|
async for d in f():
|
||||||
|
yield d
|
||||||
|
|
||||||
|
async def inference(request, *args, params_kw=None, **kw):
|
||||||
|
env = request._run_ns.copy()
|
||||||
|
f = partial(inference_generator, request, *args, params_kw=params_kw, **kw)
|
||||||
|
return await env.stream_response(request, f)
|
||||||
|
|
||||||
69
build/lib/llmage/messages.py
Normal file
69
build/lib/llmage/messages.py
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
from appPublic.myTE import MyTemplateEngine
|
||||||
|
|
||||||
|
def default_sysmessage():
|
||||||
|
return """{
|
||||||
|
"role":"system",
|
||||||
|
"content":"{{content}}"
|
||||||
|
}"""
|
||||||
|
|
||||||
|
def default_usrmessage():
|
||||||
|
return """{
|
||||||
|
"role":"user",
|
||||||
|
"content":"{{prompt}}"
|
||||||
|
}"""
|
||||||
|
|
||||||
|
def default_llmmessage():
|
||||||
|
return """{
|
||||||
|
"role":"assisant",
|
||||||
|
"content":"{{content}}"
|
||||||
|
}"""
|
||||||
|
|
||||||
|
class BaseMessages:
|
||||||
|
def __init__(self, request, llmid, sys_message, usr_message, llm_message):
|
||||||
|
self.request = request
|
||||||
|
self.llmid = llmid
|
||||||
|
self.sys_message = sys_message
|
||||||
|
self.usr_message = usr_message
|
||||||
|
self.llm_message = llm_message
|
||||||
|
self.te = MyTemplateEngine([])
|
||||||
|
|
||||||
|
async def append_meessages(self, msg_format, **kw):
|
||||||
|
m = self.te.renders(msg_format, kw)
|
||||||
|
msgs = await self.get_messages()
|
||||||
|
msgs.append(m)
|
||||||
|
await self.set_message(msgs)
|
||||||
|
return msgs
|
||||||
|
|
||||||
|
async def append_usr_messages(self, **kw):
|
||||||
|
return await self.append_messages(self.usr_message, **kw)
|
||||||
|
|
||||||
|
async def append_sys_messages(self, **kw):
|
||||||
|
return await self.append_messages(self.sys_message, **kw)
|
||||||
|
|
||||||
|
async def append_llm_messages(self, **kw):
|
||||||
|
return await self.append_messages(self.llm_message, **kw)
|
||||||
|
|
||||||
|
async def get_messages(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def set_messages(self, msgs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class SessionMessages(BaseMessages):
|
||||||
|
async def get_messages(self):
|
||||||
|
env = self.request._run_ns
|
||||||
|
s = await env.get_session()
|
||||||
|
userid = await env.get_user()
|
||||||
|
mk = f'{self.llmid}_{userid}_msgs'
|
||||||
|
msgs = s[mk]
|
||||||
|
if not msgs:
|
||||||
|
msgs = []
|
||||||
|
return msgs
|
||||||
|
|
||||||
|
async def set_messages(self, msgs):
|
||||||
|
env = self.request._run_ns
|
||||||
|
s = await env.get_session()
|
||||||
|
userid = await env.get_user()
|
||||||
|
mk = f'{self.llmid}_{userid}_msgs'
|
||||||
|
s[mk] = msgs
|
||||||
|
|
||||||
98
build/lib/llmage/syncinference.py
Normal file
98
build/lib/llmage/syncinference.py
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
import json
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
from random import randint
|
||||||
|
from functools import partial
|
||||||
|
from traceback import format_exc
|
||||||
|
from sqlor.dbpools import DBPools, get_sor_context
|
||||||
|
from appPublic.log import debug, exception, error
|
||||||
|
from appPublic.uniqueID import getID
|
||||||
|
from appPublic.dictObject import DictObject
|
||||||
|
from appPublic.timeUtils import curDateString, timestampstr
|
||||||
|
from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
|
||||||
|
# from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
|
||||||
|
from ahserver.serverenv import get_serverenv, ServerEnv
|
||||||
|
from ahserver.filestorage import FileStorage
|
||||||
|
from .accounting import llm_accounting, llm_charging
|
||||||
|
from .utils import *
|
||||||
|
|
||||||
|
async def sync_uapi_request(request, llm, callerid, callerorgid, params_kw=None):
|
||||||
|
env = request._run_ns.copy()
|
||||||
|
if not params_kw:
|
||||||
|
params_kw = env.params_kw
|
||||||
|
# callerid = await env.get_user()
|
||||||
|
# callerorgid = await env.get_userorgid()
|
||||||
|
# uapi = UAPI(request, sor=sor)
|
||||||
|
uapi = env.UpAppApi(request)
|
||||||
|
userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid)
|
||||||
|
outlines = []
|
||||||
|
b = None
|
||||||
|
d = None
|
||||||
|
luid = getID()
|
||||||
|
try:
|
||||||
|
start_timestamp = time.time()
|
||||||
|
responsed_seconds = None
|
||||||
|
finish_seconds = None
|
||||||
|
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
|
||||||
|
if isinstance(b, bytes):
|
||||||
|
b = b.decode('utf-8')
|
||||||
|
d = json.loads(b)
|
||||||
|
status = d.get('status')
|
||||||
|
usage = d.get('usage')
|
||||||
|
if status and status != 'SUCCEEDED':
|
||||||
|
raise Exception(d['error'])
|
||||||
|
responsed_seconds = time.time() - start_timestamp
|
||||||
|
finish_seconds = responsed_seconds
|
||||||
|
llmusage = DictObject()
|
||||||
|
llmusage.id = luid
|
||||||
|
llmusage.llmid = llm.id
|
||||||
|
llmusage.use_date = curDateString()
|
||||||
|
llmusage.use_time = timestampstr()
|
||||||
|
llmusage.userid = callerid
|
||||||
|
llmusage.usages = json.dumps(usage, ensure_ascii=False)
|
||||||
|
ioinfo = {
|
||||||
|
"input": params_kw,
|
||||||
|
'output': [d]
|
||||||
|
}
|
||||||
|
webpath = await write_llmio(llmusage.id, ioinfo)
|
||||||
|
llmusage.ioinfo = webpath
|
||||||
|
llmusage.transno = params_kw.transno
|
||||||
|
llmusage.responsed_seconds = responsed_seconds
|
||||||
|
llmusage.finish_seconds = finish_seconds
|
||||||
|
llmusage.status = 'SUCCEEDED'
|
||||||
|
llmusage.amount = llmusage.cost = 0.00
|
||||||
|
""" 联机不记账
|
||||||
|
if llm.ppid:
|
||||||
|
try:
|
||||||
|
charging = await llm_charging(llm.ppid, llmusage)
|
||||||
|
if charging:
|
||||||
|
llmusage.amount = charging.amount
|
||||||
|
llmusage.cost = charging.cost
|
||||||
|
else:
|
||||||
|
llmusage.amount = llmusage.cost = 0.0
|
||||||
|
except Exception as e:
|
||||||
|
e = Exception(f'{llm.pid} charging error{e}')
|
||||||
|
exception(f'{e}')
|
||||||
|
else:
|
||||||
|
llmusage.amount = 0
|
||||||
|
llmusage.cost = 0
|
||||||
|
"""
|
||||||
|
llmusage.userorgid = callerorgid
|
||||||
|
llmusage.ownerid = llm.ownerid
|
||||||
|
llmusage.accounting_status = 'created'
|
||||||
|
b = json.dumps(d, ensure_ascii=False)
|
||||||
|
yield b
|
||||||
|
await write_llmusage(llmusage)
|
||||||
|
"""联机不记账
|
||||||
|
if llmusage.amount > 0.0001:
|
||||||
|
await llm_accounting(llmusage)
|
||||||
|
"""
|
||||||
|
except Exception as e:
|
||||||
|
exception(f'{e=},{format_exc()}, {b=}')
|
||||||
|
estr = erase_apikey(e)
|
||||||
|
ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid}
|
||||||
|
s = json.dumps(ed, ensure_ascii=False)
|
||||||
|
s = ''.join(s.split('\n'))
|
||||||
|
outlines.append(ed)
|
||||||
|
yield f'{s}\n'
|
||||||
|
|
||||||
348
build/lib/llmage/utils.py
Normal file
348
build/lib/llmage/utils.py
Normal file
@ -0,0 +1,348 @@
|
|||||||
|
import json
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
import aiofiles
|
||||||
|
from random import randint
|
||||||
|
from functools import partial
|
||||||
|
from traceback import format_exc
|
||||||
|
from sqlor.dbpools import DBPools, get_sor_context
|
||||||
|
from appPublic.log import debug, exception, error, critical
|
||||||
|
from appPublic.uniqueID import getID
|
||||||
|
from appPublic.dictObject import DictObject
|
||||||
|
from appPublic.timeUtils import curDateString, timestampstr
|
||||||
|
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
|
||||||
|
from ahserver.serverenv import get_serverenv, ServerEnv
|
||||||
|
from ahserver.filestorage import FileStorage
|
||||||
|
from appPublic.jsonConfig import getConfig
|
||||||
|
from appPublic.streamhttpclient import StreamHttpClient
|
||||||
|
|
||||||
|
async def update_llmusage(ns):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
await sor.U('llmusage', ns)
|
||||||
|
|
||||||
|
async def get_user_tpac(userid):
|
||||||
|
env = ServerEnv()
|
||||||
|
config = getConfig()
|
||||||
|
async with get_sor_context(env, 'rbac') as sor:
|
||||||
|
recs = await sor.R('users', {'id': userid})
|
||||||
|
if recs:
|
||||||
|
tpac = config.tpacs.get(recs[0].sync_from)
|
||||||
|
return tpac
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_tpac_balance(tpac, userid):
|
||||||
|
url = tpac.get_tpac_balance_url
|
||||||
|
hc = StreamHttpClient()
|
||||||
|
try:
|
||||||
|
b = await hc.request('GET', url, params={'userid': userid})
|
||||||
|
if b:
|
||||||
|
d = json.loads(b.decode('utf-8'))
|
||||||
|
if d['status'] == 'ok':
|
||||||
|
return d['balance']
|
||||||
|
exception(f'{url=}, {userid=}, {b} error')
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
exception(f'{url=}, {userid=}, error:{e}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def tpac_accounting(tpac, userid, llmid, amount, usage, luid):
|
||||||
|
url = tpac.tpac_accounting_url
|
||||||
|
hc = StreamHttpClient()
|
||||||
|
d = {
|
||||||
|
'userid': userid,
|
||||||
|
'llmid': llmid,
|
||||||
|
'amount': amount,
|
||||||
|
'usage': usage
|
||||||
|
}
|
||||||
|
status = 'failed'
|
||||||
|
try:
|
||||||
|
b = await hc.request('POST', url, data=d)
|
||||||
|
d = json.loads(b.decode('utf-8'))
|
||||||
|
if d['status'] == 'ok':
|
||||||
|
debug(f'{d=}')
|
||||||
|
await update_llmusage({'id': luid, 'accounting_status': 'accounted'})
|
||||||
|
return
|
||||||
|
raise Exception(f'{d} tpac accounting error')
|
||||||
|
except Exception as e:
|
||||||
|
exception(f'{userid=}, {llmid=}, {amount=}, {usage=} tpac accounting error:{e}')
|
||||||
|
raise e
|
||||||
|
|
||||||
|
async def append_new_llmoutput(webpath, output):
|
||||||
|
fs = FileStorage()
|
||||||
|
p = fs.realPath(webpath)
|
||||||
|
if isinstance(output, str):
|
||||||
|
output = json.loads(output)
|
||||||
|
bin = await read_webpath(webpath)
|
||||||
|
io = json.loads(bin.decode('utf-8'))
|
||||||
|
io['output'].append(output)
|
||||||
|
async with aiofiles.open(p, 'wb') as f:
|
||||||
|
iostr = json.dumps(io, ensure_ascii=False, indent=4)
|
||||||
|
await f.write(iostr.encode('utf-8'))
|
||||||
|
|
||||||
|
async def get_lastoutput(webpath):
|
||||||
|
bin = await read_webpath(webpath)
|
||||||
|
io = json.loads(bin.decode('utf-8'))
|
||||||
|
return io['output'][-1]
|
||||||
|
|
||||||
|
async def read_webpath(webpath):
|
||||||
|
fs = FileStorage()
|
||||||
|
p = fs.realPath(webpath)
|
||||||
|
async with aiofiles.open(p,'rb') as f:
|
||||||
|
bin = await f.read()
|
||||||
|
return bin
|
||||||
|
|
||||||
|
async def write_llmio(luid, io_dic):
|
||||||
|
fs = FileStorage()
|
||||||
|
s = io_dic
|
||||||
|
if not isinstance(io_dic, str):
|
||||||
|
s = json.dumps(io_dic, ensure_ascii=False, indent=4)
|
||||||
|
name = f'{luid}.json'
|
||||||
|
webpath = await fs.save(name, s, userid='llmio')
|
||||||
|
return webpath
|
||||||
|
|
||||||
|
async def llm_query_orders(userorgid, page, pagerows=80):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
sql = """select a.llmid,
|
||||||
|
a.use_date,
|
||||||
|
a.use_time,
|
||||||
|
a.userid,
|
||||||
|
a.usages,
|
||||||
|
a.status,
|
||||||
|
a.amount,
|
||||||
|
a.userorgid,
|
||||||
|
a.accounting_status,
|
||||||
|
b.name,
|
||||||
|
b.model
|
||||||
|
from llmusage a, llm b
|
||||||
|
where userorgid = ${userorgid}$
|
||||||
|
and a.llmid = b.id
|
||||||
|
"""
|
||||||
|
ns = dict(
|
||||||
|
page=page,
|
||||||
|
pagerows=pagerows,
|
||||||
|
sort="use_time desc",
|
||||||
|
userorgid=userorgid)
|
||||||
|
|
||||||
|
data = await sor.sqlExe(sql, ns)
|
||||||
|
return data
|
||||||
|
return {'total': 0, 'rows':[]}
|
||||||
|
|
||||||
|
async def get_llm_by_model(id, lctype=None):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
sql = 'select * from llm where model=${model}$'
|
||||||
|
recs = await sor.R('llm', {'model': model})
|
||||||
|
return recs
|
||||||
|
|
||||||
|
def erase_apikey(e):
|
||||||
|
e = str(e)
|
||||||
|
ss = e.split('Bearer ')
|
||||||
|
if len(ss) < 2:
|
||||||
|
return e
|
||||||
|
|
||||||
|
for i, c in enumerate(ss[1]):
|
||||||
|
if c in ['"', "'"]:
|
||||||
|
newb = "XXXXXXXX" + ss[1][i:]
|
||||||
|
break
|
||||||
|
return ss[0] + 'Bearer ' + newb
|
||||||
|
|
||||||
|
async def get_llmproviders():
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
sql = """select a.providerid, a.iconid, b.orgname
|
||||||
|
from llm a, organization b
|
||||||
|
where a.providerid = b.id
|
||||||
|
group by a.providerid, a.iconid, b.orgname"""
|
||||||
|
return await sor.sqlExe(sql, {})
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def get_llms_sort_by_provider():
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
today = curDateString()
|
||||||
|
sql = """select a.*, b.orgname from llm a, organization b
|
||||||
|
where a.enabled_date <= ${today}$
|
||||||
|
and a.expired_date > ${today}$
|
||||||
|
and a.providerid = b.id
|
||||||
|
order by a.providerid, a.id
|
||||||
|
"""
|
||||||
|
recs = await sor.sqlExe(sql, {'today': today})
|
||||||
|
d = []
|
||||||
|
x = None
|
||||||
|
oldpid = '-111'
|
||||||
|
for l in recs:
|
||||||
|
if l.providerid != oldpid:
|
||||||
|
x = {
|
||||||
|
'id': l.providerid,
|
||||||
|
'orgname': l.orgname,
|
||||||
|
'llms': [l]
|
||||||
|
}
|
||||||
|
d.append(x)
|
||||||
|
oldpid = l.providerid
|
||||||
|
else:
|
||||||
|
x['llms'].append(l)
|
||||||
|
return d
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def get_llmcatelogs():
|
||||||
|
db = DBPools()
|
||||||
|
dbname = get_serverenv('get_module_dbname')('llmage')
|
||||||
|
async with db.sqlorContext(dbname) as sor:
|
||||||
|
recs = await sor.R('llmcatelog', {})
|
||||||
|
return recs
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def get_llms_by_catelog_to_customer(catelogid=None, orderby='providerid'):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
today = curDateString()
|
||||||
|
# Join with llm_api_map to get catalog relationship
|
||||||
|
sql = """select distinct a.*,
|
||||||
|
b.name as catelogname,
|
||||||
|
m.llmcatelogid as catelog_id,
|
||||||
|
m.apiname,
|
||||||
|
m.query_apiname,
|
||||||
|
m.query_period,
|
||||||
|
m.ppid
|
||||||
|
from llm a
|
||||||
|
join llm_api_map m on a.id = m.llmid
|
||||||
|
join llmcatelog b on m.llmcatelogid = b.id
|
||||||
|
where a.enabled_date <= ${today}$
|
||||||
|
and m.ppid is not null
|
||||||
|
and a.expired_date > ${today}$
|
||||||
|
"""
|
||||||
|
sortstr='catelog_id, ' + orderby
|
||||||
|
params = {'today': today, 'sort': sortstr}
|
||||||
|
if catelogid:
|
||||||
|
sql += " and m.llmcatelogid = ${catelogid}$"
|
||||||
|
params['catelogid'] = catelogid
|
||||||
|
|
||||||
|
debug(f'{sql=}')
|
||||||
|
recs = await sor.sqlExe(sql, params.copy())
|
||||||
|
debug(f'{sql=}, {recs=}, {params=}')
|
||||||
|
d = []
|
||||||
|
cid = ''
|
||||||
|
x = None
|
||||||
|
for r in recs:
|
||||||
|
if cid != r.catelog_id:
|
||||||
|
x = {
|
||||||
|
'catelogid': r.catelog_id,
|
||||||
|
'catelogname': r.catelogname,
|
||||||
|
'llms': [r]
|
||||||
|
}
|
||||||
|
d.append(x)
|
||||||
|
cid = r.catelog_id
|
||||||
|
else:
|
||||||
|
x['llms'].append(r)
|
||||||
|
return d
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def get_llms_by_catelog(catelogid=None, orderby='providerid'):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
today = curDateString()
|
||||||
|
# Join with llm_api_map to get catalog relationship
|
||||||
|
sql = """select distinct a.*, b.name as catelogname, m.llmcatelogid as catelog_id
|
||||||
|
from llm a
|
||||||
|
join llm_api_map m on a.id = m.llmid
|
||||||
|
join llmcatelog b on m.llmcatelogid = b.id
|
||||||
|
where a.enabled_date <= ${today}$
|
||||||
|
and a.expired_date > ${today}$"""
|
||||||
|
params = {'today': today, 'sort': orderby}
|
||||||
|
if catelogid:
|
||||||
|
sql += " and m.llmcatelogid = ${catelogid}$"
|
||||||
|
params['catelogid'] = catelogid
|
||||||
|
|
||||||
|
sql += " order by m.llmcatelogid, a.id"
|
||||||
|
|
||||||
|
recs = await sor.sqlExe(sql, params)
|
||||||
|
d = []
|
||||||
|
cid = ''
|
||||||
|
x = None
|
||||||
|
for r in recs:
|
||||||
|
if cid != r.catelog_id:
|
||||||
|
x = {
|
||||||
|
'catelogid': r.catelog_id,
|
||||||
|
'catelogname': r.catelogname,
|
||||||
|
'llms': [r]
|
||||||
|
}
|
||||||
|
d.append(x)
|
||||||
|
cid = r.catelog_id
|
||||||
|
else:
|
||||||
|
x['llms'].append(r)
|
||||||
|
return d
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def get_llm(llmid, catelogid=None):
|
||||||
|
today = curDateString()
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
sql = """select a.id,
|
||||||
|
a.name,
|
||||||
|
a.model,
|
||||||
|
a.providerid,
|
||||||
|
a.description,
|
||||||
|
a.iconid,
|
||||||
|
a.upappid,
|
||||||
|
a.ownerid,
|
||||||
|
a.min_balance,
|
||||||
|
m.llmcatelogid,
|
||||||
|
m.apiname,
|
||||||
|
m.query_apiname,
|
||||||
|
m.query_period,
|
||||||
|
m.ppid,
|
||||||
|
e.ioid,
|
||||||
|
e.stream,
|
||||||
|
e.callbackurl,
|
||||||
|
f.input_fields,
|
||||||
|
lc.name as catelogname
|
||||||
|
from llm a
|
||||||
|
,llm_api_map m
|
||||||
|
,llmcatelog lc
|
||||||
|
,upapp c
|
||||||
|
,uapi e
|
||||||
|
,uapiio f
|
||||||
|
where a.id = m.llmid
|
||||||
|
and a.upappid = c.id
|
||||||
|
and c.id = e.upappid
|
||||||
|
and m.apiname = e.name
|
||||||
|
and e.ioid = f.id
|
||||||
|
and a.id = ${llmid}$
|
||||||
|
and a.expired_date > ${today}$
|
||||||
|
and a.enabled_date <= ${today}$
|
||||||
|
"""
|
||||||
|
ns = {'llmid': llmid, 'today': today}
|
||||||
|
if catelogid:
|
||||||
|
sql += ' and m.llmcatelogid = ${catelogid}$ '
|
||||||
|
ns['catelogid'] = catelogid
|
||||||
|
else:
|
||||||
|
sql += " and m.isdefaultcatelog = '1'"
|
||||||
|
recs = await sor.sqlExe(sql, ns.copy())
|
||||||
|
if len(recs) > 0:
|
||||||
|
r = recs[0]
|
||||||
|
return r
|
||||||
|
else:
|
||||||
|
debug(f'{llmid=} not found, {ns=}, {sql=}')
|
||||||
|
return None
|
||||||
|
exception(f'Error: {format_exc()}')
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def write_llmusage(llmusage):
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
await sor.C('llmusage', llmusage)
|
||||||
|
|
||||||
|
async def llm_query_price(llmid, config_data):
|
||||||
|
env = ServerEnv()
|
||||||
|
llm = await get_llm(llmid)
|
||||||
|
if llm.ppid is None:
|
||||||
|
e = Exception(f'{llm=} ppid is None')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
prices = await env.buffered_charging(llm.ppid, config_data)
|
||||||
|
return prices
|
||||||
|
|
||||||
@ -41,7 +41,7 @@ async def checkCustomerBalance(llmid, userid, userorgid, catelogid=None):
|
|||||||
debug(f'checkCustomerBalance(): llmid is None')
|
debug(f'checkCustomerBalance(): llmid is None')
|
||||||
return False
|
return False
|
||||||
env = ServerEnv()
|
env = ServerEnv()
|
||||||
llm = await get_llm(llmid)
|
llm = await get_llmage_llm(llmid)
|
||||||
if llm.ownerid == userorgid:
|
if llm.ownerid == userorgid:
|
||||||
debug(f'self orgid user')
|
debug(f'self orgid user')
|
||||||
return True
|
return True
|
||||||
@ -64,37 +64,34 @@ async def checkCustomerBalance(llmid, userid, userorgid, catelogid=None):
|
|||||||
async def llm_accounting(llmusage):
|
async def llm_accounting(llmusage):
|
||||||
env = ServerEnv()
|
env = ServerEnv()
|
||||||
llmid = llmusage.llmid
|
llmid = llmusage.llmid
|
||||||
|
llm = await get_llmage_llm(llmid)
|
||||||
|
if llm is None:
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
ns = {
|
||||||
|
'id': llmusage.id,
|
||||||
|
'accounting_status': 'failed'
|
||||||
|
}
|
||||||
|
await sor.U('llmusage', ns)
|
||||||
|
e = Exception(f'llm not found({llmid})')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
if llm.ppid is None:
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
ns = {
|
||||||
|
'id': llmusage.id,
|
||||||
|
'accounting_status': 'failed'
|
||||||
|
}
|
||||||
|
await sor.U('llmusage', ns)
|
||||||
|
e = Exception(f'llm ({llmid}) donot has a pricing_program')
|
||||||
|
exception(f'{e}')
|
||||||
|
raise e
|
||||||
|
customerid = llmusage.userorgid
|
||||||
|
userid = llmusage.userid
|
||||||
|
resellerid = llm.ownerid
|
||||||
|
providerid = llm.providerid
|
||||||
|
trans_amount = llmusage.amount
|
||||||
|
trans_cost = llmusage.cost
|
||||||
async with get_sor_context(env, 'llmage') as sor:
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
sql = """select a.*, b.ppid from llm a, llm_api_map b
|
|
||||||
where a.id=${llmid}$
|
|
||||||
and a.id = b.llmid
|
|
||||||
and b.isdefaultcatelog = '1'
|
|
||||||
"""
|
|
||||||
recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid})
|
|
||||||
if len(recs) == 0:
|
|
||||||
ns = {
|
|
||||||
'id': llmusage.id,
|
|
||||||
'accounting_status': 'failed'
|
|
||||||
}
|
|
||||||
await sor.U('llmusage', ns)
|
|
||||||
e = Exception(f'llm not found({llmid})')
|
|
||||||
exception(f'{e}')
|
|
||||||
raise e
|
|
||||||
if recs[0].ppid is None:
|
|
||||||
ns = {
|
|
||||||
'id': llmusage.id,
|
|
||||||
'accounting_status': 'failed'
|
|
||||||
}
|
|
||||||
await sor.U('llmusage', ns)
|
|
||||||
e = Exception(f'llm ({llmid}) donot has a pricing_program')
|
|
||||||
exception(f'{e}')
|
|
||||||
raise e
|
|
||||||
customerid = llmusage.userorgid
|
|
||||||
userid = llmusage.userid
|
|
||||||
resellerid = recs[0].ownerid
|
|
||||||
providerid = recs[0].providerid
|
|
||||||
trans_amount = llmusage.amount
|
|
||||||
trans_cost = llmusage.cost
|
|
||||||
biz_date = await env.get_business_date(sor)
|
biz_date = await env.get_business_date(sor)
|
||||||
timestamp = env.timestampstr()
|
timestamp = env.timestampstr()
|
||||||
orderid = getID()
|
orderid = getID()
|
||||||
|
|||||||
@ -16,7 +16,8 @@ from .utils import (
|
|||||||
get_llmcatelogs,
|
get_llmcatelogs,
|
||||||
get_llms_by_catelog_to_customer,
|
get_llms_by_catelog_to_customer,
|
||||||
get_llmproviders,
|
get_llmproviders,
|
||||||
get_llm,
|
get_llm,
|
||||||
|
get_llmage_llm,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .llmclient import (
|
from .llmclient import (
|
||||||
@ -54,6 +55,7 @@ def load_llmage():
|
|||||||
env.get_asynctask_status = get_asynctask_status
|
env.get_asynctask_status = get_asynctask_status
|
||||||
env.query_task_status = query_task_status
|
env.query_task_status = query_task_status
|
||||||
env.get_llm = get_llm
|
env.get_llm = get_llm
|
||||||
|
env.get_llmage_llm = get_llmage_llm
|
||||||
env.inference = inference
|
env.inference = inference
|
||||||
env.inference_generator = inference_generator
|
env.inference_generator = inference_generator
|
||||||
env.get_llms_by_catelog = get_llms_by_catelog
|
env.get_llms_by_catelog = get_llms_by_catelog
|
||||||
|
|||||||
@ -188,6 +188,39 @@ where a.enabled_date <= ${today}$
|
|||||||
return d
|
return d
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
async def get_llmage_llm(llmid=None, catelogid=None):
|
||||||
|
"""Unified accessor for llm + llm_api_map + llmcatelog.
|
||||||
|
For non-API-call scenarios only (display, listing, querying, accounting).
|
||||||
|
Do NOT use for vendor model API calls — use get_llm() instead.
|
||||||
|
|
||||||
|
- llmid: get specific llm by id (returns single DictObject or None)
|
||||||
|
- catelogid: filter by catalog (returns list)
|
||||||
|
- neither: return all with catalog info (returns list)
|
||||||
|
"""
|
||||||
|
env = ServerEnv()
|
||||||
|
async with get_sor_context(env, 'llmage') as sor:
|
||||||
|
sql = """select a.id, a.name, a.model, a.providerid, a.description,
|
||||||
|
a.iconid, a.upappid, a.ownerid, a.min_balance, a.status,
|
||||||
|
m.llmcatelogid, m.apiname, m.query_apiname, m.query_period, m.ppid, m.isdefaultcatelog,
|
||||||
|
lc.name as catelogname
|
||||||
|
from llm a
|
||||||
|
join llm_api_map m on a.id = m.llmid
|
||||||
|
join llmcatelog lc on m.llmcatelogid = lc.id
|
||||||
|
where 1=1
|
||||||
|
"""
|
||||||
|
ns = {}
|
||||||
|
if llmid:
|
||||||
|
sql += " and a.id = ${llmid}$ and m.isdefaultcatelog = '1'"
|
||||||
|
ns['llmid'] = llmid
|
||||||
|
if catelogid:
|
||||||
|
sql += " and m.llmcatelogid = ${catelogid}$"
|
||||||
|
ns['catelogid'] = catelogid
|
||||||
|
sql += " order by m.llmcatelogid, a.id"
|
||||||
|
recs = await sor.sqlExe(sql, ns)
|
||||||
|
if llmid:
|
||||||
|
return recs[0] if recs else None
|
||||||
|
return recs
|
||||||
|
|
||||||
async def get_llmcatelogs():
|
async def get_llmcatelogs():
|
||||||
db = DBPools()
|
db = DBPools()
|
||||||
dbname = get_serverenv('get_module_dbname')('llmage')
|
dbname = get_serverenv('get_module_dbname')('llmage')
|
||||||
@ -375,7 +408,7 @@ async def write_llmusage(llmusage):
|
|||||||
|
|
||||||
async def llm_query_price(llmid, config_data):
|
async def llm_query_price(llmid, config_data):
|
||||||
env = ServerEnv()
|
env = ServerEnv()
|
||||||
llm = await get_llm(llmid)
|
llm = await get_llmage_llm(llmid)
|
||||||
if llm.ppid is None:
|
if llm.ppid is None:
|
||||||
e = Exception(f'{llm=} ppid is None')
|
e = Exception(f'{llm=} ppid is None')
|
||||||
exception(f'{e}')
|
exception(f'{e}')
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user