This commit is contained in:
yumoqing 2025-11-13 13:16:33 +08:00
parent 73bb0f84c8
commit 3ac420984d
2 changed files with 94 additions and 20 deletions

View File

@ -1,4 +1,5 @@
import json
import time
import asyncio
from random import randint
from functools import partial
@ -7,7 +8,7 @@ from sqlor.dbpools import DBPools
from appPublic.log import debug, exception
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString
from appPublic.timeUtils import curDateString, timestampstr
from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv
@ -98,16 +99,41 @@ where a.upappid=b.id
i = randint(0, len(recs)-1)
return recs[i].userid
async def uapi_request(request, llm, sor):
async def write_llmusage(llm, userid, usage, params_kw, outdata, sor):
d = {
"id": getID(),
"llmid": llm.id,
"use_date": curDateString(),
"use_time": timestampstr(),
"userid": userid,
"useages": usages,
"ioinfo": json.dumps({
"input": params_kw,
"output": outdata
})
}
await sor.C('llmusage', d)
async def uapi_request(request, llm, sor, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
caller_orgid = await env.get_userorgid()
callerid = await env.get_user()
uapi = UAPI(request, sor=sor)
userid = await get_owner_userid(sor, llm)
outlines = []
txt = ''
try:
t1 = time.time()
t2 = t1
t3 = t1
first = True
async for l in uapi.stream_linify(llm.upappid, llm.apiname, userid,
params=env.params_kw):
params=params_kw):
if first:
first = False
t2 = time.time()
if isinstance(l, bytes):
l = l.decode('utf-8')
if l[-1] == '\n':
@ -128,68 +154,106 @@ async def uapi_request(request, llm, sor):
if d.get('content'):
txt = txt + d['content']
yield_it = True
outlines.append(d)
yield json.dumps(d) + '\n'
usage = outlines[-1].get('usage',{})
t3 = time.time()
usage['response_time'] = t2 - t1
usage['finish_time'] = t3 - t1
if not usage.get('completions_tokens'):
usage['completions_tokens'] = len(txt)
if not usage.get('input_tokens'):
cnt = 0
if params_kw.prompt:
cnt += len(params_kw.prompt)
if params_kw.negitive_prompt:
cnt += len(params_kw.negitive_promot)
usage['input_tokens'] = len
await write_llmusage(llm, callerid, usage, params_kw, outlines, sor)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
outlines.append({"error": "ERROR:{estr}", "status": "FAILED" })
yield f'{{"error": "ERROR:{estr}", "status": "SUCCEEDED" }}\n'
await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
return
debug(f'{txt=}')
async def sync_uapi_request(request, llm, sor):
async def sync_uapi_request(request, llm, sor, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
caller_orgid = await env.get_userorgid()
callerid = await env.get_user()
uapi = UAPI(request, sor=sor)
userid = await get_owner_userid(sor, llm)
outlines = []
b = None
d = None
t1 = t2 = t3 = time.time()
try:
b = await uapi.call(llm.upappid, llm.apiname, userid, params=env.params_kw)
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
if isinstance(b, bytes):
b = b.decode('utf-8')
d = json.loads(b)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
yield f'{{"error": "ERROR:{estr}", "status": "SUCCEEDED" }}\n'
outlines.append({"error": "ERROR:{estr}", "status": "FAILED" })
await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
return
if isinstance(b, bytes):
b = b.decode('utf-8')
outlines.append(d)
t2 = t3 = time.time()
usage = d.get('usage', {})
usage['response_time'] = t2 - t1
usage['finish_time'] = t3 - t1
await write_llmusage(llm, callerid, usage, params_kw, outlines, sor)
debug(f'finished:{b}')
yield b
async def async_uapi_request(request, llm, sor):
async def async_uapi_request(request, llm, sor, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
caller_orgid = await env.get_userorgid()
callerid = await env.get_user()
uapi = UAPI(request, sor=sor)
userid = await get_owner_userid(sor, llm)
outlines = []
b = None
t1 = t2 = t3 = time.time()
try:
b = await uapi.call(llm.upappid, llm.apiname, userid, params=env.params_kw)
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
yield f'{{"error": "ERROR:{estr}", "status": "SUCCEEDED" }}\n'
outlines.append({"error": "ERROR:{estr}", "status": "FAILED" })
await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
return
if isinstance(b, bytes):
b = b.decode('utf-8')
debug(f'task sumbited:{b}')
d = DictObject(**json.loads(b))
if not d.get('context'):
debug(f'{b} error')
yield '{"error":"server return no taskid", "status": "SUCCEEDED" }\n'
return
outlines.append(d)
t2 = time.time()
uapi = UAPI(request, sor=sor)
apinames = [ name.strip() for name in llm.query_apiname.split(',') ]
for apiname in apinames:
while True:
b = None
try:
b = await uapi.call(llm.upappid, apiname, userid, params=d.context)
b = await uapi.call(llm.upappid, apiname, userid, params=d)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
yield f'{{"error": "ERROR:{estr}", "status": "SUCCEEDED" }}\n'
break
outlines.append({"error": "ERROR:{estr}", "status": "FAILED" })
await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
return
if isinstance(b, bytes):
b = b.decode('utf-8')
@ -200,11 +264,19 @@ async def async_uapi_request(request, llm, sor):
if not rzt.status or rzt.status == 'FAILED':
debug(f'{b=} return error')
yield f'{{"error": "ERROR:upapp return failed", "status": "SUCCEEDED" }}\n'
outlines.append({"error": "ERROR:{estr}", "status": "FAILED" })
await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
return
if rzt.status == 'SUCCEEDED':
debug(f'{b=} return successed')
await asyncio.sleep(1)
d = rzt
outlines.append(d)
usage = d.get('usage', {})
t3 = time.time()
usage['response_time'] = t2 - t1
usage['finish_time'] = t3 -t1
await write_llmusage(llm, callerid, usage, params_kw, outlines, sor)
break
period = llm.query_period or 30
await asyncio.sleep(period)
@ -228,20 +300,22 @@ def b64media2url(request, mediafile):
url = entire_url('/idfile?path=') + env.quote(mediafile)
return url
async def inference(request, *args, **kw):
async def inference(request, *args, params_kw=None, **kw):
env = request._run_ns.copy()
llmid = env.params_kw.llmid
if not params_kw:
params_kw = env.params_kw
llmid = params_kw.llmid
dbname = env.get_module_dbname('llmage')
db = env.DBPools()
async with db.sqlorContext(dbname) as sor:
llm = await get_llm(llmid)
if llm.stream == 'async':
f = partial(async_uapi_request, request, llm, sor)
f = partial(async_uapi_request, request, llm, sor, params_kw=params_kw)
return await env.stream_response(request, f)
if llm.stream == 'sync':
f = partial(sync_uapi_request, request, llm, sor)
f = partial(sync_uapi_request, request, llm, sor, params_kw=params_kw)
return await env.stream_response(request, f)
# env.update(llm)
uapi = UAPI(request, sor=sor)
f = partial(uapi_request, request, llm, sor)
f = partial(uapi_request, request, llm, sor, params_kw=params_kw)
return await env.stream_response(request, f)

Binary file not shown.