chore: remove build/ from git tracking, add to .gitignore

This commit is contained in:
yumoqing 2026-05-30 13:54:44 +08:00
parent 5ec5946a90
commit 62dce1d3d7
12 changed files with 1 additions and 1452 deletions

1
.gitignore vendored
View File

@ -6,3 +6,4 @@ wwwroot/llmcatelog_list/
wwwroot/llmusage/
wwwroot/llmusage_accounting_failed/
wwwroot/llmusage_history/
build/

View File

@ -1,369 +0,0 @@
import asyncio
import json
import time
from datetime import datetime
from appPublic.log import exception, debug
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from sqlor.dbpools import get_sor_context
from ahserver.serverenv import ServerEnv
from accounting.consume import consume_accounting
from accounting.getaccount import getCustomerBalance
from .utils import *
async def llm_charging(ppid, llmusage):
env = ServerEnv()
usages = llmusage.usages
if isinstance(usages, str):
usages = json.loads(usages)
prices = await env.buffered_charging(ppid, usages)
if prices is None:
e = Exception(f'{ppid=}, {usages=}{llmusage.id=} env.buffered_charging() return None')
exception(f'{e}')
raise e
return None
amount = 0
cost = 0
for p in prices:
amount += p.amount
if p.cost:
cost += p.cost
discount = await env.get_customer_discount(llmusage.ownerid,
llmusage.userorgid)
return DictObject(**{
'original_amount': amount,
'amount': amount * discount,
'cost': cost
})
async def checkCustomerBalance(llmid, userid, userorgid, catelogid=None):
if llmid is None:
debug(f'checkCustomerBalance(): llmid is None')
return False
env = ServerEnv()
llm = await get_llm(llmid)
if llm.ownerid == userorgid:
debug(f'self orgid user')
return True
balance = 0.00
tpac = await get_user_tpac(userid)
if tpac:
debug(f'{tpac=}, get tpac balance')
balance = await get_tpac_balance(tpac, userid)
else:
debug(f'{tpac=}, get local balance')
async with get_sor_context(env, 'accounting') as sor:
balance = await getCustomerBalance(sor, userorgid)
bal = 0 if balance is None else balance
if llm.min_balance is None:
llm.min_balance = 0.00
ret = llm.ppid and llm.min_balance < bal
debug(f'{llm.ppid=}, {llm.min_balance=}, {bal=}')
return ret
async def llm_accounting(llmusage):
env = ServerEnv()
llmid = llmusage.llmid
async with get_sor_context(env, 'llmage') as sor:
sql = """select a.*, b.ppid from llm a, llm_api_map b
where a.id=${llmid}$
and a.id = b.llmid
and b.isdefaultcatelog = '1'
"""
recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid})
if len(recs) == 0:
ns = {
'id': llmusage.id,
'accounting_status': 'failed'
}
await sor.U('llmusage', ns)
e = Exception(f'llm not found({llmid})')
exception(f'{e}')
raise e
if recs[0].ppid is None:
ns = {
'id': llmusage.id,
'accounting_status': 'failed'
}
await sor.U('llmusage', ns)
e = Exception(f'llm ({llmid}) donot has a pricing_program')
exception(f'{e}')
raise e
customerid = llmusage.userorgid
userid = llmusage.userid
resellerid = recs[0].ownerid
providerid = recs[0].providerid
trans_amount = llmusage.amount
trans_cost = llmusage.cost
biz_date = await env.get_business_date(sor)
timestamp = env.timestampstr()
orderid = getID()
order = {
"id": orderid,
"customerid": customerid,
"resellerid": resellerid,
"order_date": biz_date,
"order_status": "1", # accounted
"business_op": "PAY",
"amount": trans_amount,
"userid": userid,
"productid": llmid
}
await sor.C('biz_order', order)
orderdetail = {
"id": getID(),
"orderid": orderid,
"productid": llmid,
"product_cnt": 1,
"trans_amount": trans_amount
}
await sor.C('biz_orderdetail', orderdetail)
ais = []
if customerid != resellerid:
ai0 = DictObject()
ai0.action = 'PAY'
ai0.customerid = customerid
ai0.resellerid = resellerid
ai0.providerid = providerid
ai0.biz_date = biz_date
ai0.timestamp = timestamp
ai0.productid = llmid
ai0.transamt = trans_amount
ai0.variable = {
"交易金额": trans_amount,
"交易手续费": 0
}
ais.append(ai0)
ai1 = DictObject()
ai1.action = 'PAY*'
ai1.customerid = customerid
ai1.resellerid = resellerid
ai1.providerid = providerid
ai1.biz_date = biz_date
ai1.timestamp = timestamp
ai1.providerid = providerid
ai1.productid = llmid
ai1.transamt = trans_cost
ai1.variable = {
"采购成本": trans_cost
}
ais.append(ai1)
await consume_accounting(sor, orderid, ais)
llmusage.accounting_status = 'accounted'
ns = {
'id': llmusage.id,
'accounting_status': 'accounted'
}
await sor.U('llmusage', ns)
async def get_accounting_llmusages(luid=None):
env = ServerEnv()
lus = []
t = time.time()
dt = datetime.fromtimestamp(t)
tsstr = dt.strftime('%Y-%m-%d %H:%M:%S.') + f'{dt.microsecond // 1000:03d}'
async with get_sor_context(env, 'llmage') as sor:
sql = """select a.*, c.ppid
from llmusage a, llm b, llm_api_map c
where a.llmid = b.id
and a.llmid = c.llmid
and c.isdefaultcatelog = '1'
and a.status = 'SUCCEEDED'
and a.use_time < ${tsstr}$
and a.accounting_status='created'"""
ns = {'tsstr': tsstr}
if luid:
sql += " and a.id=${luid}$"
ns['luid'] = luid
recs = await sor.sqlExe(sql, ns)
# debug(f'{sql=}, {ns=}, {len(recs)=}')
for r in recs:
if r.usages is None:
try:
output = await get_lastoutput(r.ioinfo)
except Exception as e:
continue
r.usages = output.get('usage')
if r.usages is None:
debug(f'{r.usages=} is None, accoiunting failed')
await llm_accoung_failed(r.id, reason='usages is None')
continue
d = None
try:
debug(f'{r.ppid=}, {r.usages=} {r.id=}')
d = await llm_charging(r.ppid, r)
except Exception as e:
exception(f'{r.ppid=}, {r.usages=} llm_charging() failed,{e}')
await llm_accoung_failed(r.id, reason=f'llm_charging failed: {e}')
continue
r.amount = d.amount
r.cost = d.cost
ns = {
'id': r.id,
'amount': r.amount,
'cost': r.cost,
'usage': json.dumps(r.usage, ensure_ascii=False, indent=4)
}
await sor.U('llmusage', ns)
lus.append(r)
return lus
async def llm_accoung_failed(luid, reason=None):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
await sor.U('llmusage', {
'id': luid,
'accounting_status': 'failed'
})
# Also record in the failed accounting table for tracking
recs = await sor.R('llmusage', {'id': luid})
if recs:
r = recs[0]
failed_id = getID()
failed_rec = {
'id': failed_id,
'llmusageid': luid,
'llmid': r.llmid,
'userid': r.userid,
'userorgid': r.userorgid,
'use_date': r.use_date,
'use_time': r.use_time,
'amount': r.amount,
'cost': r.cost,
'failed_reason': reason or 'accounting failed',
'failed_time': env.timestampstr(),
'retry_count': 0,
'handled': '0'
}
await sor.C('llmusage_accounting_failed', failed_rec)
async def backup_accounted_llmusage():
"""Backup yesterday's accounted records to history table and remove from llmusage."""
env = ServerEnv()
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
ts = env.timestampstr()
batched = 0
async with get_sor_context(env, 'llmage') as sor:
# Select yesterday's accounted records
sql = """select * from llmusage
where accounting_status='accounted'
and use_date < ${yesterday}$"""
recs = await sor.sqlExe(sql, {'yesterday': yesterday})
if not recs:
debug(f'backup_accounted_llmusage: no records to backup for use_date < {yesterday}')
return 0
debug(f'backup_accounted_llmusage: {len(recs)} records to backup')
for r in recs:
history_rec = {
'id': r.id,
'llmid': r.llmid,
'use_date': r.use_date,
'use_time': r.use_time,
'userid': r.userid,
'usages': r.usages,
'ioinfo': r.ioinfo,
'transno': r.transno,
'responsed_seconds': r.responsed_seconds,
'finish_seconds': r.finish_seconds,
'status': r.status,
'taskid': r.taskid,
'amount': r.amount,
'cost': r.cost,
'userorgid': r.userorgid,
'ownerid': r.ownerid,
'accounting_status': r.accounting_status,
'backup_time': ts
}
await sor.C('llmusage_history', history_rec)
# Delete from main table
await sor.D('llmusage', {'id': r.id})
batched += 1
debug(f'backup_accounted_llmusage: backed up {batched} records')
return batched
async def get_failed_accounting_records(filters=None, page=1, page_size=50):
"""Search failed accounting records with optional filters.
filters: dict with optional keys:
- userorgid: filter by user organization
- llmid: filter by model ID
- handled: '0' or '1'
- start_date: filter use_date >= start_date
- end_date: filter use_date <= end_date
"""
async with get_sor_context(ServerEnv(), 'llmage') as sor:
conditions = []
ns = {}
if filters:
if filters.get('userorgid'):
conditions.append("userorgid=${userorgid}$")
ns['userorgid'] = filters['userorgid']
if filters.get('llmid'):
conditions.append("llmid=${llmid}$")
ns['llmid'] = filters['llmid']
if filters.get('handled') is not None:
conditions.append("handled=${handled}$")
ns['handled'] = filters['handled']
if filters.get('start_date'):
conditions.append("use_date>=${start_date}$")
ns['start_date'] = filters['start_date']
if filters.get('end_date'):
conditions.append("use_date<=${end_date}$")
ns['end_date'] = filters['end_date']
where = ""
if conditions:
where = "where " + " and ".join(conditions)
# Count total
count_sql = f"select count(*) as cnt from llmusage_accounting_failed {where}"
count_recs = await sor.sqlExe(count_sql, ns)
total = count_recs[0].cnt if count_recs else 0
# Query with pagination
offset = (page - 1) * page_size
query_sql = f"""select * from llmusage_accounting_failed {where}
order by failed_time desc limit {page_size} offset {offset}"""
recs = await sor.sqlExe(query_sql, ns)
return {
'total': total,
'page': page,
'page_size': page_size,
'records': recs
}
async def backend_accounting():
env = ServerEnv()
debug(f'backend accounting started ...')
backup_counter = 0
while True:
try:
lus = await get_accounting_llmusages()
except Exception as e:
exception(f'{e}')
lus = []
# debug(f'{len(lus)=} need to accounting........')
for lu in lus:
try:
tpac = await get_user_tpac(lu.userid)
if tpac:
debug(f'{lu.id=},{lu.userid=}, {tpac=}, go tpac')
await tpac_accounting(tpac, lu.userid, lu.llmid, lu.amount, lu.usages, lu.id)
else:
debug(f'{lu.id=},{lu.userid=}, {tpac=}, go local')
await llm_accounting(lu)
except Exception as e:
exception(f'{e}, {lu.id=}')
await llm_accoung_failed(lu.id, reason=str(e))
# Run backup every 100 iterations (roughly every ~1000 seconds)
backup_counter += 1
if backup_counter >= 100:
backup_counter = 0
try:
await backup_accounted_llmusage()
except Exception as e:
exception(f'backup_accounted_llmusage failed: {e}')
await asyncio.sleep(10)

View File

@ -1,203 +0,0 @@
import json
import time
import asyncio
from random import randint
from functools import partial
from traceback import format_exc
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, error, critical
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString, timestampstr, timestampAdd
from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage
from .accounting import llm_accounting, llm_charging
from .utils import *
async def get_today_asynctask_list(userid):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
today = await env.get_business_date(sor)
sql = '''select * from llmusage
where userid=${userid}$
and use_date = ${date}$'''
recs = await sor.sqlExe(sql, {
'date': today,
'userid': userid
})
return recs
return []
async def get_asynctask_status(request, taskid):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
recs = await sor.R('llmusage', {'taskid': taskid})
if recs:
r = recs[0]
output = await get_lastoutput(r.ioinfo)
t = timestampAdd(r.use_time, 600)
now = time.time()
if r.status not in ['UNKNOWN', 'FAILED', 'SUCCEEDED'] and now > t:
asyncio.create_task(query_task_status(request, r.id))
return output
return {
'taskid': taskid,
'status': 'FAILED',
'error': f'taskid={taskid} not exist'
}
return {
'taskid': taskid,
'status': 'FAILED',
'error': f'system error'
}
async def async_uapi_request(request, llm,
callerid, callerorgid, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
# callerorgid = await env.get_userorgid()
# callerid = await env.get_user()
uapi = env.UpAppApi(request)
userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid)
b = None
luid = getID()
try:
start_timestamp = time.time()
if llm.callbackurl:
params_kw.callbackurl = llm.callbackurl
b = None
try:
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
except Exception as e:
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED"}
exception(f'{ed}')
yield f'{ed}\n'
return
if isinstance(b, bytes):
b = b.decode('utf-8')
debug(f'task submited:{b}')
d = DictObject(**json.loads(b))
responsed_seconds = time.time() - start_timestamp
finish_seconds = responsed_seconds
llmusage = DictObject()
llmusage.id = luid
llmusage.llmid = llm.id
llmusage.use_date = curDateString()
llmusage.use_time = timestampstr()
llmusage.userid = callerid
ioinfo = {
"input": params_kw,
'output': [d]
}
webpath = await write_llmio(llmusage.id, ioinfo)
llmusage.ioinfo = webpath
llmusage.taskid = d.taskid
llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds
llmusage.finish_seconds = finish_seconds
llmusage.status = d.status
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.ownerid
llmusage.accounting_status = 'created'
b = json.dumps(d, ensure_ascii=False)
yield b
await write_llmusage(llmusage)
# if llm.callbackurl:
# return
if d.status == 'FAILED':
e = Exception(f'resp={d} FFAILED')
return
asyncio.create_task(query_task_status(request, luid))
except Exception as e:
ed = {"error": f"ERROR:{e}", "status": "FAILED"}
s = json.dumps(ed, ensure_ascii=False)
s = ''.join(s.split('\n'))
exception(s)
yield f'{s}\n'
return
async def modify_llmusage(ns):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
await sor.U('llmusage', ns.copy())
async def get_llm_llmusage(luid):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
recs = await sor.R('llmusage', {'id': luid})
if len(recs) == 0:
e = Exception(f'{luid=} is not found in llmusage')
exception(f'{e}')
raise e
llmusage = recs[0]
if llmusage.status == 'UNKNOWN':
return
if llmusage.status == 'SUCCEEDED':
return
if llmusage.status == 'FAILED':
return
llms = await sor.R('llm', {'id': llmusage.llmid})
if len(llms) == 0:
e = Exception(f'{llmusage.llmid=} not found in llm')
exception(f'{e}')
raise e
llm = llms[0]
return llm, llmusage
async def query_task_status(request, luid, onetime=False):
env = ServerEnv()
uapi = env.UpAppApi(request)
llm, llmusage = await get_llm_llmusage(luid)
userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid)
taskid = llmusage.taskid
upappid = llm.upappid
apinames = llm.query_apiname.split(',')
for apiname in apinames:
while True:
lastoutout = await get_lastoutput(llmusage.ioinfo)
if lastoutout['status'] in ['UNKNOWN', 'FAILED', 'SUCCEEDED']:
critical(f"{lastoutout['status']=}")
return
ns = {'taskid': taskid}
new_output = b = d = None
try:
b = await uapi.call(upappid, apiname, userid, params=ns)
if isinstance(b, bytes):
b = b.decode('utf-8')
new_output = json.loads(b)
except Exception as e:
exception(f'{e}, {b=}')
new_output = {
'status': 'FAILED',
'error': f'{b},{e}'
}
if not new_output.get('status'):
e = Exception(f"{new_output=} {upappid=}, {apiname=} has not status field")
critical(f'{e}')
raise e
if lastoutout['status'] != new_output.get('status'):
llmusage.status = new_output['status']
ns = {
'id': llmusage.id,
'status': llmusage.status
}
if 'usage' in new_output.keys():
ns['usages'] = json.dumps(new_output['usage'])
await append_new_llmoutput(llmusage.ioinfo, new_output)
await modify_llmusage(ns)
if llmusage.status in ['UNKNOWN', 'FAILED', 'SUCCEEDED']:
critical(f'finished .. {llmusage.status=}')
return
if onetime:
critical(f'onetime is true, returned')
return
await asyncio.sleep(llm.query_period or 30)
critical(f'{llm.query_period=} seconds will retry, {new_output["status"]=}')

View File

@ -1,33 +0,0 @@
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from sqlor.dbpools import get_sor_context
async def asynctask_callback(appname, apiname, params_kw)
env = ServerEnv()
llmusage = None
async with get_sor_context(env, 'llmage') as sor:
uapi = await env.sor_get_uapi_by_appname_apiname(appname, apiname)
try:
dstr = await env.tmpl_engine.renders(uapi.response, params_kw)
d = DictObject(**json.loads(dstr))
llmus = await sor.R('llmusage', {'taskid': d.taskid})
if len(llmus) == 0:
e = Exception(f'{d=}, {taskid=} not found')
exception(f'{e}')
raise e
llmusage = llmus[0]
io = json.loads(llmusage.ioinfo)
out = io.get('output')
out.append(d)
llmusage.ioinfo = json.dumps(io, ensure_ascii=False)
llmusage.status = d.status
await sor.U('llmusage', llmusage)
except Exception as e:
e = Exception(f'{uapi.response=}, {params_kw=} render error')
exception(f'{e}')
raise e
if llmusage:
await llm_accounting(llmusage)

View File

@ -1,68 +0,0 @@
import asyncio
from appPublic.registerfunction import RegisterFunction
from sqlor.dbpools import DBPools
from ahserver.serverenv import ServerEnv
from appPublic.log import debug
from .keling import keling_token
from .jimeng import jimeng_auth_headers
from .utils import (
llm_query_orders,
read_webpath,
llm_query_price,
get_llm_by_model,
get_llms_by_catelog,
get_llms_sort_by_provider,
get_llmcatelogs,
get_llms_by_catelog_to_customer,
get_llmproviders,
get_llm,
)
from .llmclient import (
inference_generator,
inference
)
from .accounting import (
checkCustomerBalance,
llm_charging,
get_accounting_llmusages,
backend_accounting,
llm_accounting,
backup_accounted_llmusage,
get_failed_accounting_records,
llm_accoung_failed
)
from .asyncinference import (
get_asynctask_status,
query_task_status,
get_today_asynctask_list
)
def load_llmage():
env = ServerEnv()
env.llm_query_orders = llm_query_orders
env.read_webpath = read_webpath
env.get_llm_by_model = get_llm_by_model
env.llm_charging = llm_charging
env.get_accounting_llmusages = get_accounting_llmusages
env.llm_accounting = llm_accounting
env.get_today_asynctask_list = get_today_asynctask_list
env.get_asynctask_status = get_asynctask_status
env.query_task_status = query_task_status
env.get_llm = get_llm
env.inference = inference
env.inference_generator = inference_generator
env.get_llms_by_catelog = get_llms_by_catelog
env.get_llmcatelogs = get_llmcatelogs
env.checkCustomerBalance = checkCustomerBalance
env.get_llmproviders = get_llmproviders
env.get_llms_sort_by_provider = get_llms_sort_by_provider
env.keling_token = keling_token
env.llm_query_price = llm_query_price
env.get_llms_by_catelog_to_customer = get_llms_by_catelog_to_customer
env.backup_accounted_llmusage = backup_accounted_llmusage
env.get_failed_accounting_records = get_failed_accounting_records
rf = RegisterFunction()
rf.register('jimeng_auth_headers', jimeng_auth_headers)

View File

@ -1,98 +0,0 @@
import hashlib
import datetime
from datetime import timezone
import hmac
import json
from urllib.parse import quote
Service = "visual"
Version = "2022-08-31"
Region = "cn-north-1"
Host = "visual.volcengineapi.com"
ContentType = "application/json"
def utc_now():
try:
from datetime import timezone
return datetime.datetime.now(timezone.utc)
except ImportError:
class UTC(datetime.tzinfo):
def utcoffset(self, dt):
return datetime.timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return datetime.timedelta(0)
return datetime.datetime.now(UTC())
def jm_timestamp():
dt = utc_new()
return dt.strftime("%Y%m%dT%H%M%SZ")
# sha256 非对称加密
def hmac_sha256(key: bytes, content: str):
return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
# sha256 hash算法
def hash_sha256(content: str):
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def jimeng_auth_headers(opts):
apikey = opts.get('apikey')
secretkey = opts.get('secretkey')
path = opts.get('path')
method = opts.get('method')
params = opts.get('params')
body = opts.get('body')
headers = opts.get('headers')
content_type = headers.get('Content-Type')
x_date = jm_timestamp()
short_x_date = DT[:8]
credential = {
"access_key_id": apikey,
"secret_access_key": secretkey,
"service": Service,
"region": Region,
}
x_content_sha256 = hash_sha256(body)
sign_result = {
"Host": Host,
"X-Content-Sha256": x_content_sha256,
"X-Date": x_date,
"Content-Type": ContentType
}
headers.update(sign_result)
signed_headers_str = ";".join(
["content-type", "host", "x-content-sha256", "x-date"]
)
canonical_request_str = "\n".join(
[method.upper(),
path,
norm_query(params),
"\n".join(
[
"content-type:" + content_type,
"host:" + Host,
"x-content-sha256:" + x_content_sha256,
"x-date:" + x_date,
]
),
"",
signed_headers_str,
x_content_sha256,
]
)
hashed_canonical_request = hash_sha256(canonical_request_str)
credential_scope = "/".join([short_x_date, credential["region"], credential["service"], "request"])
string_to_sign = "\n".join(["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request])
k_date = hmac_sha256(secretkey.encode("utf-8"), short_x_date)
k_region = hmac_sha256(k_date, credential["region"])
k_service = hmac_sha256(k_region, credential["service"])
k_signing = hmac_sha256(k_service, "request")
signature = hmac_sha256(k_signing, string_to_sign).hex()
headers['Authorization'] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
apikey + "/" + credential_scope,
signed_headers_str,
signature,
)

View File

@ -1,18 +0,0 @@
import time
import jwt
ak = "" # 填写access key
sk = "" # 填写secret key
def keling_token(ak, sk):
headers = {
"alg": "HS256",
"typ": "JWT"
}
payload = {
"iss": ak,
"exp": int(time.time()) + 1800, # 有效时间,此处示例代表当前时间+1800s(30min)
"nbf": int(time.time()) - 5 # 开始生效的时间,此处示例代表当前时间-5秒
}
token = jwt.encode(payload, sk, headers=headers)
return token

View File

@ -1,148 +0,0 @@
import json
import time
import asyncio
from random import randint
from functools import partial
from traceback import format_exc
from appPublic.log import debug, exception, error
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString, timestampstr
from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage
from .asyncinference import async_uapi_request
from .syncinference import sync_uapi_request
from .accounting import llm_accounting, llm_charging
from .utils import *
async def uapi_request(request, llm, callerid, callerorgid, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
# callerorgid = await env.get_userorgid()
# callerid = await env.get_user()
uapi = env.UpAppApi(request)
userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid)
outlines = []
txt = ''
luid = getID()
try:
start_timestamp = time.time()
responsed_seconds = None
finish_seconds = None
first = True
usage = None
async for l in uapi.stream_linify(llm.upappid, llm.apiname, userid,
params=params_kw):
if first:
first = False
responsed_seconds = time.time() - start_timestamp
if isinstance(l, bytes):
l = l.decode('utf-8')
if l[-1] == '\n':
l = l[:-1]
debug(f'stream response line={l},{type(l)}')
l = ''.join(l.split('\n'))
if l and l != '[DONE]':
yield_it = False
d = {}
try:
d = json.loads(l)
except Exception as e:
debug(f'json.loads({l}) error({e})')
continue
if d.get('reasoning_content'):
txt += d.get('reasoning_content')
yield_it = True
if d.get('content'):
txt = txt + d['content']
yield_it = True
if d.get('usage'):
usage = d['usage']
d['llmusageid'] = luid
outlines.append(d)
yield json.dumps(d, ensure_ascii=False) + '\n'
if usage is None:
error(f'{llm=} response has not usage')
finish_seconds = time.time() - start_timestamp
if responsed_seconds is None:
responsed_seconds = finish_seconds
llmusage = DictObject()
llmusage.id = luid
llmusage.llmid = llm.id
llmusage.use_date = curDateString()
llmusage.use_time = timestampstr()
llmusage.userid = callerid
llmusage.usages = json.dumps(usage, ensure_ascii=False, indent=4)
debug(f' {usage=}, {type(usage)=}, {llmusage.usages=}')
ioinfo = {
"input": params_kw,
'output': outlines
}
webpath = await write_llmio(llmusage.id, ioinfo)
llmusage.ioinfo = webpath
llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds
llmusage.finish_seconds = finish_seconds
llmusage.status = 'SUCCEEDED'
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.ownerid
llmusage.accounting_status = 'created'
await write_llmusage(llmusage)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid}
s = json.dumps(ed, ensure_ascii=False)
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'
return
async def inference_generator(request, *args, params_kw=None, **kw):
env = request._run_ns.copy()
callerorgid = await env.get_userorgid()
callerid = await env.get_user()
async for d in _inference_generator(request, callerid,
callerorgid, params_kw=params_kw, **kw):
yield d
async def _inference_generator(request, callerid, callerorgid,
params_kw={}, **kw):
env = request._run_ns
if not params_kw:
params_kw = env.params_kw
if not params_kw.transno:
params_kw.transno = getID()
llmid = params_kw.llmid
f = None
llm = await get_llm(llmid)
if llm is None:
errmsg = f'{{"status": "FAILED", "error":"llmid:{llmid}没找到模型"}}\n'
exception(errmsg)
yield errmsg
return
if not params_kw.model:
params_kw.model = llm.model
if llm.stream == 'async':
if llm.callbackurl:
cb_url = env.entire_url(llm.callbackurl)
params_kw.callbackurl = cb_url
f = partial(async_uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw)
elif not params_kw.stream:
llm.stream = False
debug(f'---{params_kw.stream=}, {llm.stream=} ---use sync_uapi_request ')
f = partial(sync_uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw)
else:
llm.stream = True
debug(f'---{params_kw.stream=}, {llm.stream=} ---use uapi_request ')
f = partial(uapi_request, request, llm, callerid, callerorgid, params_kw=params_kw)
async for d in f():
yield d
async def inference(request, *args, params_kw=None, **kw):
env = request._run_ns.copy()
f = partial(inference_generator, request, *args, params_kw=params_kw, **kw)
return await env.stream_response(request, f)

View File

@ -1,69 +0,0 @@
from appPublic.myTE import MyTemplateEngine
def default_sysmessage():
return """{
"role":"system",
"content":"{{content}}"
}"""
def default_usrmessage():
return """{
"role":"user",
"content":"{{prompt}}"
}"""
def default_llmmessage():
return """{
"role":"assisant",
"content":"{{content}}"
}"""
class BaseMessages:
def __init__(self, request, llmid, sys_message, usr_message, llm_message):
self.request = request
self.llmid = llmid
self.sys_message = sys_message
self.usr_message = usr_message
self.llm_message = llm_message
self.te = MyTemplateEngine([])
async def append_meessages(self, msg_format, **kw):
m = self.te.renders(msg_format, kw)
msgs = await self.get_messages()
msgs.append(m)
await self.set_message(msgs)
return msgs
async def append_usr_messages(self, **kw):
return await self.append_messages(self.usr_message, **kw)
async def append_sys_messages(self, **kw):
return await self.append_messages(self.sys_message, **kw)
async def append_llm_messages(self, **kw):
return await self.append_messages(self.llm_message, **kw)
async def get_messages(self):
return []
async def set_messages(self, msgs):
pass
class SessionMessages(BaseMessages):
async def get_messages(self):
env = self.request._run_ns
s = await env.get_session()
userid = await env.get_user()
mk = f'{self.llmid}_{userid}_msgs'
msgs = s[mk]
if not msgs:
msgs = []
return msgs
async def set_messages(self, msgs):
env = self.request._run_ns
s = await env.get_session()
userid = await env.get_user()
mk = f'{self.llmid}_{userid}_msgs'
s[mk] = msgs

View File

@ -1,98 +0,0 @@
import json
import time
import asyncio
from random import randint
from functools import partial
from traceback import format_exc
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, error
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString, timestampstr
from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
# from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage
from .accounting import llm_accounting, llm_charging
from .utils import *
async def sync_uapi_request(request, llm, callerid, callerorgid, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
# callerid = await env.get_user()
# callerorgid = await env.get_userorgid()
# uapi = UAPI(request, sor=sor)
uapi = env.UpAppApi(request)
userid = await env.uapi_data.get_calluserid(llm.upappid, orgid=llm.ownerid)
outlines = []
b = None
d = None
luid = getID()
try:
start_timestamp = time.time()
responsed_seconds = None
finish_seconds = None
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
if isinstance(b, bytes):
b = b.decode('utf-8')
d = json.loads(b)
status = d.get('status')
usage = d.get('usage')
if status and status != 'SUCCEEDED':
raise Exception(d['error'])
responsed_seconds = time.time() - start_timestamp
finish_seconds = responsed_seconds
llmusage = DictObject()
llmusage.id = luid
llmusage.llmid = llm.id
llmusage.use_date = curDateString()
llmusage.use_time = timestampstr()
llmusage.userid = callerid
llmusage.usages = json.dumps(usage, ensure_ascii=False)
ioinfo = {
"input": params_kw,
'output': [d]
}
webpath = await write_llmio(llmusage.id, ioinfo)
llmusage.ioinfo = webpath
llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds
llmusage.finish_seconds = finish_seconds
llmusage.status = 'SUCCEEDED'
llmusage.amount = llmusage.cost = 0.00
""" 联机不记账
if llm.ppid:
try:
charging = await llm_charging(llm.ppid, llmusage)
if charging:
llmusage.amount = charging.amount
llmusage.cost = charging.cost
else:
llmusage.amount = llmusage.cost = 0.0
except Exception as e:
e = Exception(f'{llm.pid} charging error{e}')
exception(f'{e}')
else:
llmusage.amount = 0
llmusage.cost = 0
"""
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.ownerid
llmusage.accounting_status = 'created'
b = json.dumps(d, ensure_ascii=False)
yield b
await write_llmusage(llmusage)
"""联机不记账
if llmusage.amount > 0.0001:
await llm_accounting(llmusage)
"""
except Exception as e:
exception(f'{e=},{format_exc()}, {b=}')
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid}
s = json.dumps(ed, ensure_ascii=False)
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'

View File

@ -1,348 +0,0 @@
import json
import time
import asyncio
import aiofiles
from random import randint
from functools import partial
from traceback import format_exc
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, error, critical
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString, timestampstr
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage
from appPublic.jsonConfig import getConfig
from appPublic.streamhttpclient import StreamHttpClient
async def update_llmusage(ns):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
await sor.U('llmusage', ns)
async def get_user_tpac(userid):
env = ServerEnv()
config = getConfig()
async with get_sor_context(env, 'rbac') as sor:
recs = await sor.R('users', {'id': userid})
if recs:
tpac = config.tpacs.get(recs[0].sync_from)
return tpac
return None
async def get_tpac_balance(tpac, userid):
url = tpac.get_tpac_balance_url
hc = StreamHttpClient()
try:
b = await hc.request('GET', url, params={'userid': userid})
if b:
d = json.loads(b.decode('utf-8'))
if d['status'] == 'ok':
return d['balance']
exception(f'{url=}, {userid=}, {b} error')
return None
except Exception as e:
exception(f'{url=}, {userid=}, error:{e}')
return None
async def tpac_accounting(tpac, userid, llmid, amount, usage, luid):
url = tpac.tpac_accounting_url
hc = StreamHttpClient()
d = {
'userid': userid,
'llmid': llmid,
'amount': amount,
'usage': usage
}
status = 'failed'
try:
b = await hc.request('POST', url, data=d)
d = json.loads(b.decode('utf-8'))
if d['status'] == 'ok':
debug(f'{d=}')
await update_llmusage({'id': luid, 'accounting_status': 'accounted'})
return
raise Exception(f'{d} tpac accounting error')
except Exception as e:
exception(f'{userid=}, {llmid=}, {amount=}, {usage=} tpac accounting error:{e}')
raise e
async def append_new_llmoutput(webpath, output):
fs = FileStorage()
p = fs.realPath(webpath)
if isinstance(output, str):
output = json.loads(output)
bin = await read_webpath(webpath)
io = json.loads(bin.decode('utf-8'))
io['output'].append(output)
async with aiofiles.open(p, 'wb') as f:
iostr = json.dumps(io, ensure_ascii=False, indent=4)
await f.write(iostr.encode('utf-8'))
async def get_lastoutput(webpath):
bin = await read_webpath(webpath)
io = json.loads(bin.decode('utf-8'))
return io['output'][-1]
async def read_webpath(webpath):
fs = FileStorage()
p = fs.realPath(webpath)
async with aiofiles.open(p,'rb') as f:
bin = await f.read()
return bin
async def write_llmio(luid, io_dic):
fs = FileStorage()
s = io_dic
if not isinstance(io_dic, str):
s = json.dumps(io_dic, ensure_ascii=False, indent=4)
name = f'{luid}.json'
webpath = await fs.save(name, s, userid='llmio')
return webpath
async def llm_query_orders(userorgid, page, pagerows=80):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
sql = """select a.llmid,
a.use_date,
a.use_time,
a.userid,
a.usages,
a.status,
a.amount,
a.userorgid,
a.accounting_status,
b.name,
b.model
from llmusage a, llm b
where userorgid = ${userorgid}$
and a.llmid = b.id
"""
ns = dict(
page=page,
pagerows=pagerows,
sort="use_time desc",
userorgid=userorgid)
data = await sor.sqlExe(sql, ns)
return data
return {'total': 0, 'rows':[]}
async def get_llm_by_model(id, lctype=None):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
sql = 'select * from llm where model=${model}$'
recs = await sor.R('llm', {'model': model})
return recs
def erase_apikey(e):
e = str(e)
ss = e.split('Bearer ')
if len(ss) < 2:
return e
for i, c in enumerate(ss[1]):
if c in ['"', "'"]:
newb = "XXXXXXXX" + ss[1][i:]
break
return ss[0] + 'Bearer ' + newb
async def get_llmproviders():
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
sql = """select a.providerid, a.iconid, b.orgname
from llm a, organization b
where a.providerid = b.id
group by a.providerid, a.iconid, b.orgname"""
return await sor.sqlExe(sql, {})
return []
async def get_llms_sort_by_provider():
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
today = curDateString()
sql = """select a.*, b.orgname from llm a, organization b
where a.enabled_date <= ${today}$
and a.expired_date > ${today}$
and a.providerid = b.id
order by a.providerid, a.id
"""
recs = await sor.sqlExe(sql, {'today': today})
d = []
x = None
oldpid = '-111'
for l in recs:
if l.providerid != oldpid:
x = {
'id': l.providerid,
'orgname': l.orgname,
'llms': [l]
}
d.append(x)
oldpid = l.providerid
else:
x['llms'].append(l)
return d
return []
async def get_llmcatelogs():
db = DBPools()
dbname = get_serverenv('get_module_dbname')('llmage')
async with db.sqlorContext(dbname) as sor:
recs = await sor.R('llmcatelog', {})
return recs
return []
async def get_llms_by_catelog_to_customer(catelogid=None, orderby='providerid'):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
today = curDateString()
# Join with llm_api_map to get catalog relationship
sql = """select distinct a.*,
b.name as catelogname,
m.llmcatelogid as catelog_id,
m.apiname,
m.query_apiname,
m.query_period,
m.ppid
from llm a
join llm_api_map m on a.id = m.llmid
join llmcatelog b on m.llmcatelogid = b.id
where a.enabled_date <= ${today}$
and m.ppid is not null
and a.expired_date > ${today}$
"""
sortstr='catelog_id, ' + orderby
params = {'today': today, 'sort': sortstr}
if catelogid:
sql += " and m.llmcatelogid = ${catelogid}$"
params['catelogid'] = catelogid
debug(f'{sql=}')
recs = await sor.sqlExe(sql, params.copy())
debug(f'{sql=}, {recs=}, {params=}')
d = []
cid = ''
x = None
for r in recs:
if cid != r.catelog_id:
x = {
'catelogid': r.catelog_id,
'catelogname': r.catelogname,
'llms': [r]
}
d.append(x)
cid = r.catelog_id
else:
x['llms'].append(r)
return d
return []
async def get_llms_by_catelog(catelogid=None, orderby='providerid'):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
today = curDateString()
# Join with llm_api_map to get catalog relationship
sql = """select distinct a.*, b.name as catelogname, m.llmcatelogid as catelog_id
from llm a
join llm_api_map m on a.id = m.llmid
join llmcatelog b on m.llmcatelogid = b.id
where a.enabled_date <= ${today}$
and a.expired_date > ${today}$"""
params = {'today': today, 'sort': orderby}
if catelogid:
sql += " and m.llmcatelogid = ${catelogid}$"
params['catelogid'] = catelogid
sql += " order by m.llmcatelogid, a.id"
recs = await sor.sqlExe(sql, params)
d = []
cid = ''
x = None
for r in recs:
if cid != r.catelog_id:
x = {
'catelogid': r.catelog_id,
'catelogname': r.catelogname,
'llms': [r]
}
d.append(x)
cid = r.catelog_id
else:
x['llms'].append(r)
return d
return []
async def get_llm(llmid, catelogid=None):
today = curDateString()
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
sql = """select a.id,
a.name,
a.model,
a.providerid,
a.description,
a.iconid,
a.upappid,
a.ownerid,
a.min_balance,
m.llmcatelogid,
m.apiname,
m.query_apiname,
m.query_period,
m.ppid,
e.ioid,
e.stream,
e.callbackurl,
f.input_fields,
lc.name as catelogname
from llm a
,llm_api_map m
,llmcatelog lc
,upapp c
,uapi e
,uapiio f
where a.id = m.llmid
and a.upappid = c.id
and c.id = e.upappid
and m.apiname = e.name
and e.ioid = f.id
and a.id = ${llmid}$
and a.expired_date > ${today}$
and a.enabled_date <= ${today}$
"""
ns = {'llmid': llmid, 'today': today}
if catelogid:
sql += ' and m.llmcatelogid = ${catelogid}$ '
ns['catelogid'] = catelogid
else:
sql += " and m.isdefaultcatelog = '1'"
recs = await sor.sqlExe(sql, ns.copy())
if len(recs) > 0:
r = recs[0]
return r
else:
debug(f'{llmid=} not found, {ns=}, {sql=}')
return None
exception(f'Error: {format_exc()}')
return None
async def write_llmusage(llmusage):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
await sor.C('llmusage', llmusage)
async def llm_query_price(llmid, config_data):
env = ServerEnv()
llm = await get_llm(llmid)
if llm.ppid is None:
e = Exception(f'{llm=} ppid is None')
exception(f'{e}')
raise e
prices = await env.buffered_charging(llm.ppid, config_data)
return prices