This commit is contained in:
yumoqing 2026-03-28 20:41:49 +08:00
parent a3867d25bb
commit eeb88c1621
2 changed files with 126 additions and 52 deletions

View File

@ -7,6 +7,8 @@ from ahserver.serverenv import ServerEnv
from accounting.consume import consume_accounting
from accounting.getaccount import getCustomerBalance
async def llm_charging(sor, ppid, userid, usage):
async def checkCustomerBalance(llmid, userorgid):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:

View File

@ -147,21 +147,9 @@ where a.upappid=b.id
i = randint(0, len(recs)-1)
return recs[i].userid
async def write_llmusage(id, llm, userid, usage, params_kw, outdata, sor):
d = {
"id": id,
"llmid": llm.id,
"use_date": curDateString(),
"use_time": timestampstr(),
"userid": userid,
"transno": params_kw.transno,
"evalvalue": 0,
"usages": json.dumps(usage),
"ioinfo": json.dumps({
"input": params_kw,
"output": outdata
}, ensure_ascii=False)
}
async def write_llmusage(llmusage):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
await sor.C('llmusage', d)
async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None):
@ -179,16 +167,13 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None)
start_timestamp = time.time()
responsed_seconds = None
finish_seconds = None
t2 = start_timestamp
t3 = start_timestamp
first = True
usage = None
async for l in uapi.stream_linify(llm.upappid, llm.apiname, userid,
params=params_kw):
if first:
first = False
t2 = time.time()
responsed_seconds = t2 - start_timestamp
responsed_seconds = time.time() - start_timestamp
if isinstance(l, bytes):
l = l.decode('utf-8')
if l[-1] == '\n':
@ -216,8 +201,7 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None)
yield json.dumps(d) + '\n'
if usage is None:
error(f'{llm=} response has not usage')
t3 = time.time()
finish_seconds = t3 - start_timestamp
finish_seconds = time.time() - start_timestamp
if responsed_seconds is None:
responsed_seconds = finish_seconds
if not usage.get('completion_tokens'):
@ -245,16 +229,22 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None)
llmusage.finish_seconds = finish_seconds
llmusage.status = 'SUCCEEDED'
if llm.ppid and callerorgid:
chargings = await llm_query_price(llm.ppid, usage)
try:
chargings = await llm_charginng(sor, llm.ppid, callerid, usage)
llmusage.amount = chargings.amount
llmusage.cost = chargings.cost
except Exception as e:
e = Exception(f'{llm.pid} charging error{e}')
exception(f'{e}')
else:
llmusage.amount = 0
llmusage.cost = 0
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.orgid
await sor.C('llmusage', llmusage)
if llm.ppid and callerorgid != llm.ownerid:
debug(f'{usage=},{llm.ownerid=},{callerorgid=}')
await llm_accounting(request, llm.id, usage, callerorgid, callerid)
llmusage.accounting_status = 'created'
await write_llmusage(llmusage)
await llm_accounting(request, llmusage)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
@ -263,7 +253,7 @@ async def uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None)
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'
await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor)
# await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor)
return
async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None):
@ -280,14 +270,52 @@ async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=
t1 = t2 = t3 = time.time()
luid = getID()
try:
start_timestamp = time.time()
responsed_seconds = None
finish_seconds = None
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
if isinstance(b, bytes):
b = b.decode('utf-8')
d = json.loads(b)
status = d.get('status')
usage = d.get('usage', {})
if status and status != 'SUCCEEDED':
raise Exception(d['error'])
responsed_seconds = time.time() - start_timestamp
finish_seconds = response_seconds
llmusage = DictObject()
llmusage.id = luid
llmusage.llmid = llm.id
llmusage.use_date = curDateString()
llmusage.use_time = timestampstr()
llmusage.userid = callerid
llmusage.usage = json.dumps(usage)
llmusage.ioinfo = json.dumps({
"input": params_kw,
"output": d
})
llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds
llmusage.finish_seconds = finish_seconds
llmusage.status = 'SUCCEEDED'
if llm.ppid:
try:
chargings = await llm_charginng(sor, llm.ppid, callerid, usage)
llmusage.amount = chargings.amount
llmusage.cost = chargings.cost
except Exception as e:
e = Exception(f'{llm.pid} charging error{e}')
exception(f'{e}')
else:
llmusage.amount = 0
llmusage.cost = 0
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.orgid
llmusage.accounting_status = 'created'
b = json.dumps(d, ensure_ascii=False)
yield b
await write_llmusage(llmusage)
await llm_accounting(request, llmusage)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
@ -296,20 +324,6 @@ async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'
await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor)
return
d['llmusageid'] = luid
outlines.append(d)
t2 = t3 = time.time()
usage = d.get('usage', {})
usage['response_time'] = t2 - t1
usage['finish_time'] = t3 - t1
b = json.dumps(d, ensure_ascii=False)
yield b
await write_llmusage(luid, llm, callerid, usage, params_kw, outlines, sor)
if llm.ppid and callerorgid != llm.ownerid:
debug(f'{usage=},{llm.ownerid=},{callerorgid=}')
await llm_accounting(request, llm.id, usage, callerorgid, callerid)
async def async_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None):
env = request._run_ns.copy()
@ -319,26 +333,57 @@ async def async_uapi_request(request, llm, sor, callerid, callerorgid, params_kw
# callerid = await env.get_user()
uapi = UAPI(request, sor=sor)
userid = await get_owner_userid(sor, llm)
outlines = []
b = None
t1 = t2 = t3 = time.time()
luid = getID()
try:
start_timestamp = time.time()
if llm.callbackurl:
params_kw.callbackurl = llm.callbackurl
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid}
s = json.dumps(ed)
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'
await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor)
return
if isinstance(b, bytes):
b = b.decode('utf-8')
debug(f'task sumbited:{b}')
d = DictObject(**json.loads(b))
if d.status != 'SUCCEEDED':
e = Exception(f'resp={d} not success')
raise e
responsed_seconds = time.time() - start_timestamp
finish_seconds = response_seconds
llmusage = DictObject()
llmusage.id = luid
llmusage.llmid = llm.id
llmusage.use_date = curDateString()
llmusage.use_time = timestampstr()
llmusage.userid = callerid
llmusage.usage = json.dumps(usage)
llmusage.ioinfo = json.dumps({
"input": params_kw
})
llmusage.taskid = d.taskid
llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds
llmusage.finish_seconds = finish_seconds
llmusage.status = 'CREATED'
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.orgid
b = json.dumps(d, ensure_ascii=False)
yield b
await write_llmusage(llmusage)
await llm_accounting(request, llmusage)
if llm.callbackurl:
return
apinames = [ name.strip() for name in llm.query_apiname.split(',') ]
asyncio.create_task(query_task_status(request, llm.upappid,
apinames, luid, userid, d.taskid))
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED"}
s = json.dumps(ed)
s = ''.join(s.split('\n'))
yield f'{s}\n'
return
outlines.append(d)
t2 = time.time()
uapi = UAPI(request, sor=sor)
@ -470,3 +515,30 @@ async def llm_query_price(llmid, config_data):
raise e
prices = await env.pricing_program_charging(sor, llm.ppid, config_data)
return prices
async def query_task_status(request, upappid, apinames, luid, userid, taskid):
async with get_sor_context(env, 'llmage') as sor:
uapi = UAPI(request, sor)
for apiname in apinames:
try:
ns = {'taskid': taskid}
b = await uapi.call(upappid, apiname, userid, params=ns)
if isinstance(b, bytes):
b = b.decode('utf-8')
d = json.loads(b)
rzt = DictObject(**d)
if rzt.status == 'FAILED':
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
recs = sor.R('llmusage', {'id': luid})
ed = {"error": f"ERROR:{estr}", "status": "FAILED"}
s = json.dumps(ed)
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'
await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor)
return
rzt['llmusageid'] = luid