diff --git a/llmage/asyncinference.py b/llmage/asyncinference.py new file mode 100644 index 0000000..4cc9070 --- /dev/null +++ b/llmage/asyncinference.py @@ -0,0 +1,169 @@ +import json +import time +import asyncio +from random import randint +from functools import partial +from traceback import format_exc +from sqlor.dbpools import DBPools, get_sor_context +from appPublic.log import debug, exception, error +from appPublic.uniqueID import getID +from appPublic.dictObject import DictObject +from appPublic.timeUtils import curDateString, timestampstr +from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 +from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi +from ahserver.serverenv import get_serverenv, ServerEnv +from ahserver.filestorage import FileStorage +from llmage.accounting import llm_accounting +from .utils immport * + +async def get_today_asynctask_list(userid): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + today = getCurrentDate() + sql = '''select * from llmusage +where userid=${userid}$ + and use_date = ${date}$''' + recs = await sor.sqlExe(sql, { + 'date': today, + 'userid': userid + } + return recs + return [] + +async def get_asynctask_status(taskid): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + recs = sor.R('llmusage', {'taskid', taskid}) + if recs: + r = recs[0] + io = json.loads(r.ioinfo) + d = io.get('output', {}) + if isinstance(d, list): + return d[-1] + return d + return { + 'taskid': taskid, + 'status': 'FAILED', + 'error': f'taskid={taskid} not exist' + } + return { + 'taskid': taskid, + 'status': 'FAILED', + 'error': f'system error' + } + +async def async_uapi_request(request, llm, sor, + callerid, callerorgid, params_kw=None): + env = request._run_ns.copy() + if not params_kw: + params_kw = env.params_kw + # callerorgid = await env.get_userorgid() + # callerid = await env.get_user() + uapi = UAPI(request, sor=sor) + userid = await get_owner_userid(sor, llm) + b = None + luid = getID() + try: + start_timestamp = time.time() + if llm.callbackurl: + params_kw.callbackurl = llm.callbackurl + b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) + if isinstance(b, bytes): + b = b.decode('utf-8') + debug(f'task sumbited:{b}') + d = DictObject(**json.loads(b)) + if d.status != 'SUCCEEDED': + e = Exception(f'resp={d} not success') + raise e + responsed_seconds = time.time() - start_timestamp + finish_seconds = response_seconds + llmusage = DictObject() + llmusage.id = luid + llmusage.llmid = llm.id + llmusage.use_date = curDateString() + llmusage.use_time = timestampstr() + llmusage.userid = callerid + llmusage.usage = json.dumps(usage) + llmusage.ioinfo = json.dumps({ + "input": params_kw + }) + llmusage.taskid = d.taskid + llmusage.transno = params_kw.transno + llmusage.responsed_seconds = responsed_seconds + llmusage.finish_seconds = finish_seconds + llmusage.status = 'CREATED' + llmusage.userorgid = callerorgid + llmusage.ownerid = llm.orgid + b = json.dumps(d, ensure_ascii=False) + yield b + await write_llmusage(llmusage) + await llm_accounting(request, llmusage) + if llm.callbackurl: + return + apinames = [ name.strip() for name in llm.query_apiname.split(',') ] + asyncio.create_task(query_task_status(request, llm.upappid, + apinames, luid, userid, d.taskid)) + + except Exception as e: + exception(f'{e=},{format_exc()}') + estr = erase_apikey(e) + ed = {"error": f"ERROR:{estr}", "status": "FAILED"} + s = json.dumps(ed) + s = ''.join(s.split('\n')) + yield f'{s}\n' + return + +async def add_new_llmusage_output(luid, rzt): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + recs = await sor.R('llmusage', {'id': luid}) + if recs: + r = recs[0] + io = json.loads(r.ioinfo) + out = io.get('output', []) + out.append(out) + io['output'] = out + r.ioinfo = json.dumps({ + 'input': io.get('input',{}), + 'output': out + }) + await await sor.U('llmusage', r) + return + +async def query_task_status(request, upappid, apinames, luid, userid, taskid): + async with get_sor_context(env, 'llmage') as sor: + uapi = UAPI(request, sor) + for apiname in apinames: + try: + ns = {'taskid': taskid} + b = await uapi.call(upappid, apiname, userid, params=ns) + if isinstance(b, bytes): + b = b.decode('utf-8') + d = json.loads(b) + rzt = DictObject(**d) + await add_new_llmusage_output(luid, rzt) + if rzt.status == 'FAILED': + return + if rzt.status == 'SUCCEEDED': + if llm.ppid: + try: + chargings = await llm_charginng(sor, + llm.ppid, callerid, usage) + llmusage.amount = chargings.amount + llmusage.cost = chargings.cost + except Exception as e: + e = Exception(f'{llm.pid} charging error{e}') + exception(f'{e}') + else: + llmusage.amount = 0 + llmusage.cost = 0 + await llm_accounting(request, llmusage) + + except Exception as e: + exception(f'{e=},{format_exc()}') + estr = erase_apikey(e) + recs = sor.R('llmusage', {'id': luid}) + ed = {"error": f"ERROR:{estr}", "status": "FAILED", 'taskid': taskid} + await add_new_llmusage_output(luid, ed) + return + diff --git a/llmage/callback.py b/llmage/callback.py new file mode 100644 index 0000000..57dbbec --- /dev/null +++ b/llmage/callback.py @@ -0,0 +1,54 @@ +from ahserver.serverenv import ServerEnv +from appPublic.dictObject import DictObject +from sqlor.dbpools import get_sor_context +from .accounting import llm_charging, llm_accounting + +async def asynctask_callbacka(appname, apiname, params_kw) + env = ServerEnv() + llmusage = None + async with get_sor_context(env, 'llmage') as sor: + uapi = await env.sor_get_uapi_by_appname_apiname(appname, apiname) + try: + dstr = await env.tmpl_engine.renders(uapi.response, params_kw) + d = DictObject(**json.loads(dstr)) + llmus = await sor.R('llmusage', {'taskid': d.taskid}) + if len(llmus) == 0: + e = Exception(f'{d=}, {taskid=} not found') + exception(f'{e}') + raise e + llmusage = llmus[0] + io = json.loads(llmusage.ioinfo) + out = io.get('output') + out.append(d) + llmusage.status = d.status + if d.status == 'SUCCEEDED': + llms = await sor.R('llm', {'id': llmusage.llmid}) + if len(llms) == 0: + e = Exception(f'{llmusage.llmid} llm not found') + exception(f'{e}') + raise e + llm = llms[0] + if llm.ppid: + try: + chargings = await llm_charging(sor, llm.ppid, + llmusage.userid, d.usage) + llmusage.amount = chargings.amount + llmusage.cost = chargings.cost + except Exception as e: + e = Exception(f'{llm.pid} charging error{e}') + exception(f'{e}') + else: + llmusage.amount = 0 + llmusage.cost = 0 + sor.U('llmusage', llmusage) + + + except Exception as e: + e = Exception(f'{uapi.response=}, {params_kw=} render error') + exception(f'{e}') + raise e + + if llmusage: + await llm_accounting(request, llmusage) + + diff --git a/llmage/llmclient.py b/llmage/llmclient.py index c46feb5..0b857d6 100644 --- a/llmage/llmclient.py +++ b/llmage/llmclient.py @@ -5,7 +5,7 @@ from random import randint from functools import partial from traceback import format_exc from sqlor.dbpools import DBPools, get_sor_context -from appPublic.log import debug, exception +from appPublic.log import debug, exception, error from appPublic.uniqueID import getID from appPublic.dictObject import DictObject from appPublic.timeUtils import curDateString, timestampstr @@ -13,7 +13,9 @@ from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi from ahserver.serverenv import get_serverenv, ServerEnv from ahserver.filestorage import FileStorage -from llmage.accounting import llm_accounting +from .asyncinference import async_uapi_request +from .syncinference import syncinference +from .accounting import llm_accounting def erase_apikey(e): e = str(e) @@ -516,6 +518,23 @@ async def llm_query_price(llmid, config_data): prices = await env.pricing_program_charging(sor, llm.ppid, config_data) return prices +async def add_new_llmusage_output(luid, rzt): + env = ServerEnv() + async with get_sor_context(env, 'llmage') as sor: + recs = await sor.R('llmusage', {'id': luid}) + if recs: + r = recs[0] + io = json.loads(r.ioinfo) + out = io.get('output', []) + out.append(out) + io['output'] = out + r.ioinfo = json.dumps({ + 'input': io.get('input',{}), + 'output': out + }) + await await sor.U('llmusage', r) + return + async def query_task_status(request, upappid, apinames, luid, userid, taskid): async with get_sor_context(env, 'llmage') as sor: uapi = UAPI(request, sor) @@ -527,18 +546,29 @@ async def query_task_status(request, upappid, apinames, luid, userid, taskid): b = b.decode('utf-8') d = json.loads(b) rzt = DictObject(**d) + await add_new_llmusage_output(luid, rzt) if rzt.status == 'FAILED': + return + if rzt.status == 'SUCCEEDED': + if llm.ppid: + try: + chargings = await llm_charginng(sor, + llm.ppid, callerid, usage) + llmusage.amount = chargings.amount + llmusage.cost = chargings.cost + except Exception as e: + e = Exception(f'{llm.pid} charging error{e}') + exception(f'{e}') + else: + llmusage.amount = 0 + llmusage.cost = 0 + await llm_accounting(request, llmusage) + except Exception as e: exception(f'{e=},{format_exc()}') estr = erase_apikey(e) recs = sor.R('llmusage', {'id': luid}) - ed = {"error": f"ERROR:{estr}", "status": "FAILED"} - s = json.dumps(ed) - s = ''.join(s.split('\n')) - outlines.append(ed) - yield f'{s}\n' - await write_llmusage(luid, llm, callerid, None, params_kw, outlines, sor) + ed = {"error": f"ERROR:{estr}", "status": "FAILED", 'taskid': taskid} + await add_new_llmusage_output(luid, ed) return - rzt['llmusageid'] = luid - diff --git a/llmage/syncinference.py b/llmage/syncinference.py new file mode 100644 index 0000000..d8cadf2 --- /dev/null +++ b/llmage/syncinference.py @@ -0,0 +1,85 @@ +import json +import time +import asyncio +from random import randint +from functools import partial +from traceback import format_exc +from sqlor.dbpools import DBPools, get_sor_context +from appPublic.log import debug, exception, error +from appPublic.uniqueID import getID +from appPublic.dictObject import DictObject +from appPublic.timeUtils import curDateString, timestampstr +from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64 +from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi +from ahserver.serverenv import get_serverenv, ServerEnv +from ahserver.filestorage import FileStorage +from llmage.accounting import llm_accounting, llm_charginng + +async def sync_uapi_request(request, llm, sor, callerid, callerorgid, params_kw=None): + env = request._run_ns.copy() + if not params_kw: + params_kw = env.params_kw + # callerid = await env.get_user() + # callerorgid = await env.get_userorgid() + uapi = UAPI(request, sor=sor) + userid = await get_owner_userid(sor, llm) + outlines = [] + b = None + d = None + luid = getID() + try: + start_timestamp = time.time() + responsed_seconds = None + finish_seconds = None + b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw) + if isinstance(b, bytes): + b = b.decode('utf-8') + d = json.loads(b) + status = d.get('status') + usage = d.get('usage', {}) + if status and status != 'SUCCEEDED': + raise Exception(d['error']) + responsed_seconds = time.time() - start_timestamp + finish_seconds = response_seconds + llmusage = DictObject() + llmusage.id = luid + llmusage.llmid = llm.id + llmusage.use_date = curDateString() + llmusage.use_time = timestampstr() + llmusage.userid = callerid + llmusage.usage = json.dumps(usage) + llmusage.ioinfo = json.dumps({ + "input": params_kw, + "output": d + }) + llmusage.transno = params_kw.transno + llmusage.responsed_seconds = responsed_seconds + llmusage.finish_seconds = finish_seconds + llmusage.status = 'SUCCEEDED' + if llm.ppid: + try: + chargings = await llm_charginng(sor, llm.ppid, callerid, usage) + llmusage.amount = chargings.amount + llmusage.cost = chargings.cost + except Exception as e: + e = Exception(f'{llm.pid} charging error{e}') + exception(f'{e}') + else: + llmusage.amount = 0 + llmusage.cost = 0 + llmusage.userorgid = callerorgid + llmusage.ownerid = llm.orgid + llmusage.accounting_status = 'created' + b = json.dumps(d, ensure_ascii=False) + yield b + await write_llmusage(llmusage) + await llm_accounting(request, llmusage) + except Exception as e: + exception(f'{e=},{format_exc()}') + estr = erase_apikey(e) + ed = {"error": f"ERROR:{estr}", "status": "FAILED" ,"llmusageid": luid} + s = json.dumps(ed) + s = ''.join(s.split('\n')) + outlines.append(ed) + yield f'{s}\n' + diff --git a/wwwroot/vidu_callback.dspy b/wwwroot/vidu_callback.dspy new file mode 100644 index 0000000..4b729f0 --- /dev/null +++ b/wwwroot/vidu_callback.dspy @@ -0,0 +1,8 @@ +try: + r = await asynctask_callbacka('vidu平台', 'taskSTatus', params_kw) +except Exception as e: + exception(f'{e}') +return json_response({ + text:'success' +}) +