This commit is contained in:
yumoqing 2026-03-29 18:37:05 +08:00
parent 777b065a97
commit 5e01606bf4
4 changed files with 40 additions and 211 deletions

View File

@ -7,7 +7,23 @@ from ahserver.serverenv import ServerEnv
from accounting.consume import consume_accounting from accounting.consume import consume_accounting
from accounting.getaccount import getCustomerBalance from accounting.getaccount import getCustomerBalance
async def llm_charging(sor, ppid, userid, usage): async def llm_charging(sor, ppid, llmusage):
env = ServerEnv()
prices = await env.pricing_program_charging(sor, ppid, llmusage.usage)
amount = 0
cost = 0
for p in prices:
amount += p.amount
if p.cost:
cost += p.cost
discount = await env.sor_get_customer_discount(sor,
llmusage.ownerid,
llmusage.userorgid)
return DictObject(**{
'original_amount': amount,
'amount': amount * discount,
'cost': cost
})
async def checkCustomerBalance(llmid, userorgid): async def checkCustomerBalance(llmid, userorgid):
env = ServerEnv() env = ServerEnv()
@ -29,12 +45,11 @@ async def checkCustomerBalance(llmid, userorgid):
return ret return ret
return False return False
async def llm_accounting(request, llmid, async def llm_accounting(request, llmusage):
usage, customerid, userid, orderid=None):
env = request._run_ns env = request._run_ns
async with get_sor_context(request._run_ns, 'llmage') as sor: async with get_sor_context(request._run_ns, 'llmage') as sor:
sql = "select * from llm where id=${llmid}$" sql = "select * from llm where id=${llmid}$"
recs = await sor.sqlExe(sql, {'llmid': llmid}) recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid})
if len(recs) == 0: if len(recs) == 0:
e = Exception(f'llm not found({llmid})') e = Exception(f'llm not found({llmid})')
exception(f'{e}') exception(f'{e}')
@ -45,29 +60,23 @@ async def llm_accounting(request, llmid,
raise e raise e
resellerid = recs[0].ownerid resellerid = recs[0].ownerid
providerid = recs[0].providerid providerid = recs[0].providerid
charges = await env.pricing_program_charging(sor, recs[0].ppid, usage) trans_amount = llmusage.amount
trans_amount = trans_cost = 0 trans_cost = llmusage.cost
for c in charges:
trans_amount += c.amount
trans_cost += c.cost
if trans_amount < 0.00001:
return
biz_date = await env.get_business_date(sor) biz_date = await env.get_business_date(sor)
timestamp = env.timestampstr() timestamp = env.timestampstr()
if orderid is None: orderid = getID()
orderid = getID() order = {
order = { "id": orderid,
"id": orderid, "customerid": customerid,
"customerid": customerid, "resellerid": resellerid,
"resellerid": resellerid, "order_date": biz_date,
"order_date": biz_date, "order_status": "1", # accounted
"order_status": "1", # accounted "business_op": "PAY",
"business_op": "PAY", "amount": trans_amount,
"amount": trans_amount, "userid": userid,
"userid": userid, "productid": llmid
"productid": llmid }
} await sor.C('biz_order', order)
await sor.C('biz_order', order)
orderdetail = { orderdetail = {
"id": getID(), "id": getID(),
"orderid": orderid, "orderid": orderid,

View File

@ -13,8 +13,7 @@ from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage from ahserver.filestorage import FileStorage
from llmage.accounting import llm_accounting from .accounting import llm_accounting, llm_charging
from .utils immport *
async def get_today_asynctask_list(userid): async def get_today_asynctask_list(userid):
env = ServerEnv() env = ServerEnv()
@ -147,8 +146,8 @@ async def query_task_status(request, upappid, apinames, luid, userid, taskid):
if rzt.status == 'SUCCEEDED': if rzt.status == 'SUCCEEDED':
if llm.ppid: if llm.ppid:
try: try:
chargings = await llm_charginng(sor, chargings = await llm_charging(sor,
llm.ppid, callerid, usage) llm.ppid, llmusage)
llmusage.amount = chargings.amount llmusage.amount = chargings.amount
llmusage.cost = chargings.cost llmusage.cost = chargings.cost
except Exception as e: except Exception as e:

View File

@ -232,7 +232,7 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None)
llmusage.status = 'SUCCEEDED' llmusage.status = 'SUCCEEDED'
if llm.ppid and callerorgid: if llm.ppid and callerorgid:
try: try:
chargings = await llm_charginng(sor, llm.ppid, callerid, usage) chargings = await llm_charging(sor, llm.ppid, llmusage)
llmusage.amount = chargings.amount llmusage.amount = chargings.amount
llmusage.cost = chargings.cost llmusage.cost = chargings.cost
except Exception as e: except Exception as e:
@ -258,185 +258,6 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None)
# await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) # await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor)
return return
async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
# callerid = await env.get_user()
# callerorgid = await env.get_userorgid()
uapi = UAPI(request, sor=sor)
userid = await get_owner_userid(sor, llm)
outlines = []
b = None
d = None
t1 = t2 = t3 = time.time()
luid = getID()
try:
start_timestamp = time.time()
responsed_seconds = None
finish_seconds = None
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
if isinstance(b, bytes):
b = b.decode('utf-8')
d = json.loads(b)
status = d.get('status')
usage = d.get('usage', {})
if status and status != 'SUCCEEDED':
raise Exception(d['error'])
responsed_seconds = time.time() - start_timestamp
finish_seconds = response_seconds
llmusage = DictObject()
llmusage.id = luid
llmusage.llmid = llm.id
llmusage.use_date = curDateString()
llmusage.use_time = timestampstr()
llmusage.userid = callerid
llmusage.usage = json.dumps(usage)
llmusage.ioinfo = json.dumps({
"input": params_kw,
"output": d
})
llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds
llmusage.finish_seconds = finish_seconds
llmusage.status = 'SUCCEEDED'
if llm.ppid:
try:
chargings = await llm_charginng(sor, llm.ppid, callerid, usage)
llmusage.amount = chargings.amount
llmusage.cost = chargings.cost
except Exception as e:
e = Exception(f'{llm.pid} charging error{e}')
exception(f'{e}')
else:
llmusage.amount = 0
llmusage.cost = 0
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.orgid
llmusage.accounting_status = 'created'
b = json.dumps(d, ensure_ascii=False)
yield b
await write_llmusage(llmusage)
await llm_accounting(request, llmusage)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid}
s = json.dumps(ed)
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'
async def async_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
# callerorgid = await env.get_userorgid()
# callerid = await env.get_user()
uapi = UAPI(request, sor=sor)
userid = await get_owner_userid(sor, llm)
b = None
luid = getID()
try:
start_timestamp = time.time()
if llm.callbackurl:
params_kw.callbackurl = llm.callbackurl
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
if isinstance(b, bytes):
b = b.decode('utf-8')
debug(f'task sumbited:{b}')
d = DictObject(**json.loads(b))
if d.status != 'SUCCEEDED':
e = Exception(f'resp={d} not success')
raise e
responsed_seconds = time.time() - start_timestamp
finish_seconds = response_seconds
llmusage = DictObject()
llmusage.id = luid
llmusage.llmid = llm.id
llmusage.use_date = curDateString()
llmusage.use_time = timestampstr()
llmusage.userid = callerid
llmusage.usage = json.dumps(usage)
llmusage.ioinfo = json.dumps({
"input": params_kw
})
llmusage.taskid = d.taskid
llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds
llmusage.finish_seconds = finish_seconds
llmusage.status = 'CREATED'
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.orgid
b = json.dumps(d, ensure_ascii=False)
yield b
await write_llmusage(llmusage)
await llm_accounting(request, llmusage)
if llm.callbackurl:
return
apinames = [ name.strip() for name in llm.query_apiname.split(',') ]
asyncio.create_task(query_task_status(request, llm.upappid,
apinames, luid, userid, d.taskid))
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED"}
s = json.dumps(ed)
s = ''.join(s.split('\n'))
yield f'{s}\n'
return
outlines.append(d)
t2 = time.time()
uapi = UAPI(request, sor=sor)
apinames = [ name.strip() for name in llm.query_apiname.split(',') ]
for apiname in apinames:
while True:
b = None
try:
b = await uapi.call(llm.upappid, apiname, userid, params=d)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid}
s = json.dumps(ed)
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'
await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor)
return
if isinstance(b, bytes):
b = b.decode('utf-8')
d = json.loads(b)
rzt = DictObject(**d)
rzt['llmusageid'] = luid
b = json.dumps(rzt, ensure_ascii=False)
b = ''.join(b.split('\n'))
debug(f'response line = {b}')
yield b + '\n'
if not rzt.status or rzt.status == 'FAILED':
debug(f'{b=} {rzt=} has not status, return error')
outlines.append(rzt)
await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor)
return
if rzt.status == 'SUCCEEDED':
await asyncio.sleep(1)
outlines.append(rzt)
usage = rzt.get('usage', {})
t3 = time.time()
usage['response_time'] = t2 - t1
usage['finish_time'] = t3 -t1
await write_llmusage(luid, llm, callerid, usage, params_kw, outlines, sor)
if llm.ppid and callerorgid != llm.ownerid:
debug(f'{usage=},{llm.ownerid=},{callerorgid=}')
await llm_accounting(request, llm.id, usage, callerorgid, callerid)
d = rzt
break
period = llm.query_period or 30
await asyncio.sleep(period)
def b64media2url(request, mediafile): def b64media2url(request, mediafile):
env = request._run_ns env = request._run_ns
entire_url = env.entire_url entire_url = env.entire_url

View File

@ -13,7 +13,7 @@ from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage from ahserver.filestorage import FileStorage
from llmage.accounting import llm_accounting, llm_charginng from llmage.accounting import llm_accounting, llm_charging
async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None): async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None):
env = request._run_ns.copy() env = request._run_ns.copy()
@ -58,7 +58,7 @@ async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=
llmusage.status = 'SUCCEEDED' llmusage.status = 'SUCCEEDED'
if llm.ppid: if llm.ppid:
try: try:
chargings = await llm_charginng(sor, llm.ppid, callerid, usage) chargings = await llm_charging(sor, llm.ppid, llmusage)
llmusage.amount = chargings.amount llmusage.amount = chargings.amount
llmusage.cost = chargings.cost llmusage.cost = chargings.cost
except Exception as e: except Exception as e: