llmage/llmage/utils.py
2026-05-21 13:33:32 +08:00

352 lines
9.2 KiB
Python

import json
import time
import asyncio
import aiofiles
from random import randint
from functools import partial
from traceback import format_exc
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, error, critical
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString, timestampstr
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage
from appPublic.jsonConfig import getConfig
from appPublic.streamhttpclient import StreamHttpClient
async def get_user_tpac_apikey(userid):
env = ServerEnv()
config = getConfig()
if not config.tpac:
return None
apikey = await env.get_user_dapp_apikey(config.tpac.dappid, userid)
if apikey is None:
return None
return apikey
async def get_tpac_balance(apikey, userid):
config = getConfig()
if apikey is None:
return None
url = config.tpac.get_user_balance_url
hc = StreamHttpClient()
try:
b = hc.request('GET', url, params={"apikey": apikey, 'userid': userid})
if b:
d = json.loads(b.decode('utf-8'))
if d['status'] == 'ok':
return d['balance']
exception(f'{url=}, {userid=}, {apikey=}, error')
return None
except Exception as e:
exception(f'{url=}, {userid=}, {apikey=}, error:{e}')
return None
async def tpac_accounting(apikey, userid, llmid, amount, usage):
if apikey is None:
return
config = getConfig()
url = config.tpac.accounting_url
d = {
'apikey': apikey,
'userid': userid,
'llmid': llmid,
'amount': amount,
'usage': usage
}
hc = StreamHttpClient()
status = 'failed'
try:
b = hc.request('POST', url, data=d):
d = json.loads(b.decode('utf-8'))
if d['status'] == 'ok':
status = 'accounted'
exception(f'{apikey=}, {userid=}, {llmid=}, {amount=}, {usage=} tpac accounting error')
except Exception as e:
exception(f'{apikey=}, {userid=}, {llmid=}, {amount=}, {usage=} tpac accounting error:{e}')
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
await sor.U('llmusage', {
'id': llmid,
'accounting_status': status
})
return
exception(f'{apikey=}, {userid=}, {llmid=}, {amount=}, {usage=} tpac_accounting error:update llmusage error')
async def append_new_llmoutput(webpath, output):
fs = FileStorage()
p = fs.realPath(webpath)
if isinstance(output, str):
output = json.loads(output)
bin = await read_webpath(webpath)
io = json.loads(bin.decode('utf-8'))
io['output'].append(output)
async with aiofiles.open(p, 'wb') as f:
iostr = json.dumps(io, ensure_ascii=False, indent=4)
await f.write(iostr.encode('utf-8'))
async def get_lastoutput(webpath):
bin = await read_webpath(webpath)
io = json.loads(bin.decode('utf-8'))
return io['output'][-1]
async def read_webpath(webpath):
fs = FileStorage()
p = fs.realPath(webpath)
async with aiofiles.open(p,'rb') as f:
bin = await f.read()
return bin
async def write_llmio(luid, io_dic):
fs = FileStorage()
s = io_dic
if not isinstance(io_dic, str):
s = json.dumps(io_dic, ensure_ascii=False, indent=4)
name = f'{luid}.json'
webpath = await fs.save(name, s, userid='llmio')
return webpath
async def llm_query_orders(userorgid, page, pagerows=80):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
sql = """select a.llmid,
a.use_date,
a.use_time,
a.userid,
a.usages,
a.status,
a.amount,
a.userorgid,
a.accounting_status,
b.name,
b.model
from llmusage a, llm b
where userorgid = ${userorgid}$
and a.llmid = b.id
"""
ns = dict(
page=page,
pagerows=pagerows,
sort="use_time desc",
userorgid=userorgid)
data = await sor.sqlExe(sql, ns)
return data
return {'total': 0, 'rows':[]}
async def get_llm_by_model(id, lctype=None):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
sql = 'select * from llm where model=${model}$'
recs = await sor.R('llm', {'model': model})
return recs
def erase_apikey(e):
e = str(e)
ss = e.split('Bearer ')
if len(ss) < 2:
return e
for i, c in enumerate(ss[1]):
if c in ['"', "'"]:
newb = "XXXXXXXX" + ss[1][i:]
break
return ss[0] + 'Bearer ' + newb
async def get_llmproviders():
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
sql = """select a.providerid, a.iconid, b.orgname
from llm a, organization b
where a.providerid = b.id
group by a.providerid, a.iconid, b.orgname"""
return await sor.sqlExe(sql, {})
return []
async def get_llms_sort_by_provider():
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
today = curDateString()
sql = """select a.*, b.orgname from llm a, organization b
where a.enabled_date <= ${today}$
and a.expired_date > ${today}$
and a.providerid = b.id
order by a.providerid, a.id
"""
recs = await sor.sqlExe(sql, {'today': today})
d = []
x = None
oldpid = '-111'
for l in recs:
if l.providerid != oldpid:
x = {
'id': l.providerid,
'orgname': l.orgname,
'llms': [l]
}
d.append(x)
oldpid = l.providerid
else:
x['llms'].append(l)
return d
return []
async def get_llmcatelogs():
db = DBPools()
dbname = get_serverenv('get_module_dbname')('llmage')
async with db.sqlorContext(dbname) as sor:
recs = await sor.R('llmcatelog', {})
return recs
return []
async def get_llms_by_catelog_to_customer(catelogid=None, orderby='providerid'):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
today = curDateString()
# Join with llm_catalog_rel to support multiple catalogs per LLM
sql = """select a.*, b.name as catelogname, rel.llmcatelogid as catelog_id
from llm a
join llm_catalog_rel rel on a.id = rel.llmid
join llmcatelog b on rel.llmcatelogid = b.id
where a.enabled_date <= ${today}$
and a.ppid is not null
and a.expired_date > ${today}$
"""
sortstr='catelog_id, ' + orderby
params = {'today': today, 'sort': sortstr}
if catelogid:
sql += " and rel.llmcatelogid = ${catelogid}$"
params['catelogid'] = catelogid
debug(f'{sql=}')
recs = await sor.sqlExe(sql, params.copy())
debug(f'{sql=}, {recs=}, {params=}')
d = []
cid = ''
x = None
for r in recs:
if cid != r.catelog_id:
x = {
'catelogid': r.catelog_id,
'catelogname': r.catelogname,
'llms': [r]
}
d.append(x)
cid = r.catelog_id
else:
x['llms'].append(r)
return d
return []
async def get_llms_by_catelog(catelogid=None, orderby='providerid'):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
today = curDateString()
# Join with llm_catalog_rel to support multiple catalogs per LLM
sql = """select a.*, b.name as catelogname, rel.llmcatelogid as catelog_id
from llm a
join llm_catalog_rel rel on a.id = rel.llmid
join llmcatelog b on rel.llmcatelogid = b.id
where a.enabled_date <= ${today}$
and a.expired_date > ${today}$"""
params = {'today': today, 'sort': orderby}
if catelogid:
sql += " and rel.llmcatelogid = ${catelogid}$"
params['catelogid'] = catelogid
sql += " order by rel.llmcatelogid, a.id"
recs = await sor.sqlExe(sql, params)
d = []
cid = ''
x = None
for r in recs:
if cid != r.catelog_id:
x = {
'catelogid': r.catelog_id,
'catelogname': r.catelogname,
'llms': [r]
}
d.append(x)
cid = r.catelog_id
else:
x['llms'].append(r)
return d
return []
class BufferedLLMs:
llms = {}
@classmethod
def clear_cache(cls, data=None):
"""Clear all cached LLM configurations.
Called as EventDispatcher handler, so accepts optional data param.
"""
cls.llms.clear()
debug('BufferedLLMs cache cleared')
async def get_llm(self, llmid):
today = curDateString()
k = f'{llmid}.{today}'
d = BufferedLLMs.llms.get(k)
if d:
return d
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
sql = """select x.*,
z.input_fields
from (
select a.*, e.ioid, e.stream, e.callbackurl, f.input_fields as inputfields
from llm a
join llm_api_map m on a.id = m.llmid
join upapp c on a.upappid = c.id
join uapi e on c.apisetid = e.apisetid and m.apiname = e.name
join uapiio f on e.ioid = f.id
where a.expired_date > ${today}$
and a.enabled_date <= ${today}$
) x left join uapiio z on x.ioid = z.id
where x.id = ${llmid}$
"""
ns = {'llmid': llmid, 'today': today}
recs = await sor.sqlExe(sql, ns.copy())
if len(recs) > 0:
r = recs[0]
dates = BufferedLLMs.llms.get(llmid, [])
dates.append(today)
cnt = len(dates)
if cnt > 2:
for i in range(0, cnt -2):
dat = dates[i]
del BufferedLLMs.llms[f'{llmid}.{dat}']
dates = dates[-2:]
BufferedLLMs.llms[llmid] = dates
BufferedLLMs.llms[k] = r
return r
else:
debug(f'{llmid=} not found, {ns=}, {sql=}')
return None
exception(f'Error: {format_exc()}')
return None
async def get_llm(llmid):
bllms = BufferedLLMs()
return await bllms.get_llm(llmid)
async def write_llmusage(llmusage):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
await sor.C('llmusage', llmusage)
async def llm_query_price(llmid, config_data):
env = ServerEnv()
llm = await get_llm(llmid)
if llm.ppid is None:
e = Exception(f'{llm=} ppid is None')
exception(f'{e}')
raise e
prices = await env.buffered_charging(llm.ppid, config_data)
return prices