bugfix
This commit is contained in:
parent
ab60bebefd
commit
a9dac89119
@ -1,3 +1,6 @@
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import datetime
|
||||
from appPublic.log import exception, debug
|
||||
from appPublic.uniqueID import getID
|
||||
from appPublic.dictObject import DictObject
|
||||
@ -52,17 +55,27 @@ async def checkCustomerBalance(llmid, userorgid):
|
||||
debug(f'{userorgid=} checkCustomerBalance() failed')
|
||||
return False
|
||||
|
||||
async def llm_accounting(request, llmusage):
|
||||
env = request._run_ns
|
||||
async def llm_accounting(llmusage):
|
||||
env = ServerEnv()
|
||||
llmid = llmusage.llmid
|
||||
async with get_sor_context(request._run_ns, 'llmage') as sor:
|
||||
async with get_sor_context(env, 'llmage') as sor:
|
||||
sql = "select * from llm where id=${llmid}$"
|
||||
recs = await sor.sqlExe(sql, {'llmid': llmusage.llmid})
|
||||
if len(recs) == 0:
|
||||
ns = {
|
||||
'id': llmusage.id,
|
||||
'accounting_status': 'failed'
|
||||
}
|
||||
await sor.U('llmusage', ns)
|
||||
e = Exception(f'llm not found({llmid})')
|
||||
exception(f'{e}')
|
||||
raise e
|
||||
if recs[0].ppid is None:
|
||||
ns = {
|
||||
'id': llmusage.id,
|
||||
'accounting_status': 'failed'
|
||||
}
|
||||
await sor.U('llmusage', ns)
|
||||
e = Exception(f'llm ({llmid}) donot has a pricing_program')
|
||||
exception(f'{e}')
|
||||
raise e
|
||||
@ -133,3 +146,52 @@ async def llm_accounting(request, llmusage):
|
||||
}
|
||||
await sor.U('llmusage', ns)
|
||||
|
||||
async def get_accounting_llmusages(luid=None):
    """Collect succeeded llmusage rows that still await accounting and price them.

    Rows are selected when their status is 'SUCCEEDED', their
    accounting_status is 'created' and their use_time is more than 20
    seconds in the past. Each row is priced with llm_charging(); the
    resulting amount and cost are written back to the llmusage table.

    Args:
        luid: optional llmusage id; when given, only that row is considered.

    Returns:
        A list of the llmusage records that were priced successfully.
    """
    # Local import: json is not among the imports visible for this module.
    import json

    env = ServerEnv()
    lus = []
    # Only pick rows older than 20 seconds.
    t = time.time() - 20
    dt = datetime.fromtimestamp(t)
    # Timestamp string with millisecond precision.
    tsstr = dt.strftime('%Y-%m-%d %H:%M:%S.') + f'{dt.microsecond // 1000:03d}'
    async with get_sor_context(env, 'llmage') as sor:
        sql = """select a.*, b.ppid
from llmusage a, llm b
where a.llmid = b.id
    and a.status = 'SUCCEEDED'
    and a.use_time < ${tsstr}$
    and a.accounting_status='created'"""
        ns = {'tsstr': tsstr}
        if luid:
            sql += " and a.id=${luid}$"
            ns['luid'] = luid
        recs = await sor.sqlExe(sql, ns)
        for r in recs:
            if r.usages is None:
                # Fall back to the usage reported by the last output entry
                # recorded in ioinfo.
                try:
                    outputs = json.loads(r.ioinfo).get('output') or []
                except (TypeError, ValueError, AttributeError):
                    # ioinfo missing, not valid JSON, or not a JSON object.
                    outputs = []
                if outputs:
                    last = outputs[-1]
                    r.usages = last.get('usage') if isinstance(last, dict) else None
            if r.usages is None:
                # Nothing to price: mark the row failed so it is not
                # selected again.
                r.accounting_status = 'failed'
                await sor.U('llmusage', {'id': r.id, 'accounting_status': 'failed'})
                continue
            try:
                charged = await llm_charging(sor, r.ppid, r)
            except Exception as e:
                # Skip this row but leave a trace instead of failing silently.
                exception(f'llm_charging failed for llmusage {r.id}: {e=}')
                continue
            r.amount = charged.amount
            r.cost = charged.cost
            # NOTE(review): r also carries the joined ppid column; confirm
            # sor.U() ignores keys that are not llmusage columns.
            await sor.U('llmusage', r.copy())
            lus.append(r)
    return lus
|
||||
|
||||
async def backend_accounting():
    """Background loop that accounts pending llmusage records.

    Runs forever: fetches the records returned by
    get_accounting_llmusages(), calls llm_accounting() on each one, then
    sleeps 0.1 seconds before the next round. Errors are logged and the
    loop keeps running; cancelling the task is the way to stop it.
    """
    debug('backend accounting started ...')
    while True:
        try:
            lus = await get_accounting_llmusages()
        except Exception as e:
            # A failed query must not end the worker.
            exception(f'backend accounting: fetching llmusages failed, {e=}')
            lus = []
        for lu in lus:
            try:
                await llm_accounting(lu)
            except Exception as e:
                # llm_accounting raises for records it cannot account;
                # log and move on to the next record.
                exception(
                    f'backend accounting failed for llmusage '
                    f'{getattr(lu, "id", None)}: {e=}'
                )
        await asyncio.sleep(0.1)
|
||||
|
||||
|
||||
@ -226,7 +226,7 @@ async def query_task_status(request, upappid, apiname, luid, userid, taskid):
|
||||
if llmusage.accounting_status != 'accounted' \
|
||||
and changed.amount > 0.00001:
|
||||
try:
|
||||
await llm_accounting(request, llmusage)
|
||||
await llm_accounting(llmusage)
|
||||
except Exception as e:
|
||||
debug(f'{changed=} accounting failed,{e=} ')
|
||||
return
|
||||
|
||||
@ -49,6 +49,6 @@ async def asynctask_callbacka(appname, apiname, params_kw)
|
||||
raise e
|
||||
|
||||
if llmusage:
|
||||
await llm_accounting(request, llmusage)
|
||||
await llm_accounting(llmusage)
|
||||
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
from appPublic.base64_to_file import hex2base64
|
||||
from appPublic.registerfunction import RegisterFunction
|
||||
from ahserver.serverenv import ServerEnv
|
||||
from ahserver.configuredServer import add_cleanupctx
|
||||
from .keling import keling_token
|
||||
from .jimeng import jimeng_auth_headers
|
||||
from .utils import (
|
||||
@ -22,6 +23,7 @@ from .llmclient import (
|
||||
from .accounting import (
|
||||
checkCustomerBalance,
|
||||
llm_charging,
|
||||
backend_accounting,
|
||||
llm_accounting
|
||||
)
|
||||
|
||||
@ -31,6 +33,11 @@ from .asyncinference import (
|
||||
get_today_asynctask_list
|
||||
)
|
||||
|
||||
async def start_backend(app):
    """Cleanup-context generator that runs backend_accounting() as a task.

    The code before the yield starts the background accounting task; the
    code after the yield cancels it and waits for it to finish.

    Args:
        app: the application object passed in by the cleanup-context
            machinery; not used here.
    """
    # Local import: asyncio is not among the imports visible at the top of
    # this module.
    import asyncio

    task = asyncio.create_task(backend_accounting())
    yield
    task.cancel()
    # Wait for the cancellation to complete; return_exceptions=True keeps a
    # cancelled or already-failed task from raising during shutdown.
    await asyncio.gather(task, return_exceptions=True)
|
||||
|
||||
def load_llmage():
|
||||
env = ServerEnv()
|
||||
env.llm_query_orders = llm_query_orders
|
||||
@ -54,3 +61,4 @@ def load_llmage():
|
||||
env.llm_query_price = llm_query_price
|
||||
rf = RegisterFunction()
|
||||
rf.register('jimeng_auth_headers', jimeng_auth_headers)
|
||||
add_cleanupctx(start_backend)
|
||||
|
||||
@ -114,7 +114,7 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None)
|
||||
llmusage.accounting_status = 'created'
|
||||
await write_llmusage(llmusage)
|
||||
if llmusage.amount > 0.0001:
|
||||
await llm_accounting(request, llmusage)
|
||||
await llm_accounting(llmusage)
|
||||
|
||||
except Exception as e:
|
||||
exception(f'{e=},{format_exc()}')
|
||||
|
||||
@ -79,7 +79,7 @@ async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=
|
||||
yield b
|
||||
await write_llmusage(llmusage)
|
||||
if llmusage.amount > 0.0001:
|
||||
await llm_accounting(request, llmusage)
|
||||
await llm_accounting(llmusage)
|
||||
except Exception as e:
|
||||
exception(f'{e=},{format_exc()}')
|
||||
estr = erase_apikey(e)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user