This commit is contained in:
yumoqing 2026-03-29 17:28:39 +08:00
parent eeb88c1621
commit 777b065a97
5 changed files with 356 additions and 10 deletions

169
llmage/asyncinference.py Normal file
View File

@ -0,0 +1,169 @@
import json
import time
import asyncio
from random import randint
from functools import partial
from traceback import format_exc
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, error
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString, timestampstr
from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage
from llmage.accounting import llm_accounting
from .utils immport *
async def get_today_asynctask_list(userid):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
today = getCurrentDate()
sql = '''select * from llmusage
where userid=${userid}$
and use_date = ${date}$'''
recs = await sor.sqlExe(sql, {
'date': today,
'userid': userid
}
return recs
return []
async def get_asynctask_status(taskid):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
recs = sor.R('llmusage', {'taskid', taskid})
if recs:
r = recs[0]
io = json.loads(r.ioinfo)
d = io.get('output', {})
if isinstance(d, list):
return d[-1]
return d
return {
'taskid': taskid,
'status': 'FAILED',
'error': f'taskid={taskid} not exist'
}
return {
'taskid': taskid,
'status': 'FAILED',
'error': f'system error'
}
async def async_uapi_request(request, llm, sor,
callerid, callerorgid, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
# callerorgid = await env.get_userorgid()
# callerid = await env.get_user()
uapi = UAPI(request, sor=sor)
userid = await get_owner_userid(sor, llm)
b = None
luid = getID()
try:
start_timestamp = time.time()
if llm.callbackurl:
params_kw.callbackurl = llm.callbackurl
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
if isinstance(b, bytes):
b = b.decode('utf-8')
debug(f'task sumbited:{b}')
d = DictObject(**json.loads(b))
if d.status != 'SUCCEEDED':
e = Exception(f'resp={d} not success')
raise e
responsed_seconds = time.time() - start_timestamp
finish_seconds = response_seconds
llmusage = DictObject()
llmusage.id = luid
llmusage.llmid = llm.id
llmusage.use_date = curDateString()
llmusage.use_time = timestampstr()
llmusage.userid = callerid
llmusage.usage = json.dumps(usage)
llmusage.ioinfo = json.dumps({
"input": params_kw
})
llmusage.taskid = d.taskid
llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds
llmusage.finish_seconds = finish_seconds
llmusage.status = 'CREATED'
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.orgid
b = json.dumps(d, ensure_ascii=False)
yield b
await write_llmusage(llmusage)
await llm_accounting(request, llmusage)
if llm.callbackurl:
return
apinames = [ name.strip() for name in llm.query_apiname.split(',') ]
asyncio.create_task(query_task_status(request, llm.upappid,
apinames, luid, userid, d.taskid))
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED"}
s = json.dumps(ed)
s = ''.join(s.split('\n'))
yield f'{s}\n'
return
async def add_new_llmusage_output(luid, rzt):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
recs = await sor.R('llmusage', {'id': luid})
if recs:
r = recs[0]
io = json.loads(r.ioinfo)
out = io.get('output', [])
out.append(out)
io['output'] = out
r.ioinfo = json.dumps({
'input': io.get('input',{}),
'output': out
})
await await sor.U('llmusage', r)
return
async def query_task_status(request, upappid, apinames, luid, userid, taskid):
async with get_sor_context(env, 'llmage') as sor:
uapi = UAPI(request, sor)
for apiname in apinames:
try:
ns = {'taskid': taskid}
b = await uapi.call(upappid, apiname, userid, params=ns)
if isinstance(b, bytes):
b = b.decode('utf-8')
d = json.loads(b)
rzt = DictObject(**d)
await add_new_llmusage_output(luid, rzt)
if rzt.status == 'FAILED':
return
if rzt.status == 'SUCCEEDED':
if llm.ppid:
try:
chargings = await llm_charginng(sor,
llm.ppid, callerid, usage)
llmusage.amount = chargings.amount
llmusage.cost = chargings.cost
except Exception as e:
e = Exception(f'{llm.pid} charging error{e}')
exception(f'{e}')
else:
llmusage.amount = 0
llmusage.cost = 0
await llm_accounting(request, llmusage)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
recs = sor.R('llmusage', {'id': luid})
ed = {"error": f"ERROR:{estr}", "status": "FAILED", 'taskid': taskid}
await add_new_llmusage_output(luid, ed)
return

54
llmage/callback.py Normal file
View File

@ -0,0 +1,54 @@
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from sqlor.dbpools import get_sor_context
from .accounting import llm_charging, llm_accounting
async def asynctask_callbacka(appname, apiname, params_kw)
env = ServerEnv()
llmusage = None
async with get_sor_context(env, 'llmage') as sor:
uapi = await env.sor_get_uapi_by_appname_apiname(appname, apiname)
try:
dstr = await env.tmpl_engine.renders(uapi.response, params_kw)
d = DictObject(**json.loads(dstr))
llmus = await sor.R('llmusage', {'taskid': d.taskid})
if len(llmus) == 0:
e = Exception(f'{d=}, {taskid=} not found')
exception(f'{e}')
raise e
llmusage = llmus[0]
io = json.loads(llmusage.ioinfo)
out = io.get('output')
out.append(d)
llmusage.status = d.status
if d.status == 'SUCCEEDED':
llms = await sor.R('llm', {'id': llmusage.llmid})
if len(llms) == 0:
e = Exception(f'{llmusage.llmid} llm not found')
exception(f'{e}')
raise e
llm = llms[0]
if llm.ppid:
try:
chargings = await llm_charging(sor, llm.ppid,
llmusage.userid, d.usage)
llmusage.amount = chargings.amount
llmusage.cost = chargings.cost
except Exception as e:
e = Exception(f'{llm.pid} charging error{e}')
exception(f'{e}')
else:
llmusage.amount = 0
llmusage.cost = 0
sor.U('llmusage', llmusage)
except Exception as e:
e = Exception(f'{uapi.response=}, {params_kw=} render error')
exception(f'{e}')
raise e
if llmusage:
await llm_accounting(request, llmusage)

View File

@ -5,7 +5,7 @@ from random import randint
from functools import partial from functools import partial
from traceback import format_exc from traceback import format_exc
from sqlor.dbpools import DBPools, get_sor_context from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception from appPublic.log import debug, exception, error
from appPublic.uniqueID import getID from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString, timestampstr from appPublic.timeUtils import curDateString, timestampstr
@ -13,7 +13,9 @@ from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage from ahserver.filestorage import FileStorage
from llmage.accounting import llm_accounting from .asyncinference import async_uapi_request
from .syncinference import syncinference
from .accounting import llm_accounting
def erase_apikey(e): def erase_apikey(e):
e = str(e) e = str(e)
@ -516,6 +518,23 @@ async def llm_query_price(llmid, config_data):
prices = await env.pricing_program_charging(sor, llm.ppid, config_data) prices = await env.pricing_program_charging(sor, llm.ppid, config_data)
return prices return prices
async def add_new_llmusage_output(luid, rzt):
env = ServerEnv()
async with get_sor_context(env, 'llmage') as sor:
recs = await sor.R('llmusage', {'id': luid})
if recs:
r = recs[0]
io = json.loads(r.ioinfo)
out = io.get('output', [])
out.append(out)
io['output'] = out
r.ioinfo = json.dumps({
'input': io.get('input',{}),
'output': out
})
await await sor.U('llmusage', r)
return
async def query_task_status(request, upappid, apinames, luid, userid, taskid): async def query_task_status(request, upappid, apinames, luid, userid, taskid):
async with get_sor_context(env, 'llmage') as sor: async with get_sor_context(env, 'llmage') as sor:
uapi = UAPI(request, sor) uapi = UAPI(request, sor)
@ -527,18 +546,29 @@ async def query_task_status(request, upappid, apinames, luid, userid, taskid):
b = b.decode('utf-8') b = b.decode('utf-8')
d = json.loads(b) d = json.loads(b)
rzt = DictObject(**d) rzt = DictObject(**d)
await add_new_llmusage_output(luid, rzt)
if rzt.status == 'FAILED': if rzt.status == 'FAILED':
return
if rzt.status == 'SUCCEEDED':
if llm.ppid:
try:
chargings = await llm_charginng(sor,
llm.ppid, callerid, usage)
llmusage.amount = chargings.amount
llmusage.cost = chargings.cost
except Exception as e:
e = Exception(f'{llm.pid} charging error{e}')
exception(f'{e}')
else:
llmusage.amount = 0
llmusage.cost = 0
await llm_accounting(request, llmusage)
except Exception as e: except Exception as e:
exception(f'{e=},{format_exc()}') exception(f'{e=},{format_exc()}')
estr = erase_apikey(e) estr = erase_apikey(e)
recs = sor.R('llmusage', {'id': luid}) recs = sor.R('llmusage', {'id': luid})
ed = {"error": f"ERROR:{estr}", "status": "FAILED"} ed = {"error": f"ERROR:{estr}", "status": "FAILED", 'taskid': taskid}
s = json.dumps(ed) await add_new_llmusage_output(luid, ed)
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'
await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor)
return return
rzt['llmusageid'] = luid

85
llmage/syncinference.py Normal file
View File

@ -0,0 +1,85 @@
import json
import time
import asyncio
from random import randint
from functools import partial
from traceback import format_exc
from sqlor.dbpools import DBPools, get_sor_context
from appPublic.log import debug, exception, error
from appPublic.uniqueID import getID
from appPublic.dictObject import DictObject
from appPublic.timeUtils import curDateString, timestampstr
from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
from ahserver.serverenv import get_serverenv, ServerEnv
from ahserver.filestorage import FileStorage
from llmage.accounting import llm_accounting, llm_charginng
async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None):
env = request._run_ns.copy()
if not params_kw:
params_kw = env.params_kw
# callerid = await env.get_user()
# callerorgid = await env.get_userorgid()
uapi = UAPI(request, sor=sor)
userid = await get_owner_userid(sor, llm)
outlines = []
b = None
d = None
luid = getID()
try:
start_timestamp = time.time()
responsed_seconds = None
finish_seconds = None
b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
if isinstance(b, bytes):
b = b.decode('utf-8')
d = json.loads(b)
status = d.get('status')
usage = d.get('usage', {})
if status and status != 'SUCCEEDED':
raise Exception(d['error'])
responsed_seconds = time.time() - start_timestamp
finish_seconds = response_seconds
llmusage = DictObject()
llmusage.id = luid
llmusage.llmid = llm.id
llmusage.use_date = curDateString()
llmusage.use_time = timestampstr()
llmusage.userid = callerid
llmusage.usage = json.dumps(usage)
llmusage.ioinfo = json.dumps({
"input": params_kw,
"output": d
})
llmusage.transno = params_kw.transno
llmusage.responsed_seconds = responsed_seconds
llmusage.finish_seconds = finish_seconds
llmusage.status = 'SUCCEEDED'
if llm.ppid:
try:
chargings = await llm_charginng(sor, llm.ppid, callerid, usage)
llmusage.amount = chargings.amount
llmusage.cost = chargings.cost
except Exception as e:
e = Exception(f'{llm.pid} charging error{e}')
exception(f'{e}')
else:
llmusage.amount = 0
llmusage.cost = 0
llmusage.userorgid = callerorgid
llmusage.ownerid = llm.orgid
llmusage.accounting_status = 'created'
b = json.dumps(d, ensure_ascii=False)
yield b
await write_llmusage(llmusage)
await llm_accounting(request, llmusage)
except Exception as e:
exception(f'{e=},{format_exc()}')
estr = erase_apikey(e)
ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid}
s = json.dumps(ed)
s = ''.join(s.split('\n'))
outlines.append(ed)
yield f'{s}\n'

View File

@ -0,0 +1,8 @@
try:
r = await asynctask_callbacka('vidu平台', 'taskSTatus', params_kw)
except Exception as e:
exception(f'{e}')
return json_response({
text:'success'
})