diff --git a/llmage/llmclient.py b/llmage/llmclient.py
index 1fa142e..0bd1860 100644
--- a/llmage/llmclient.py
+++ b/llmage/llmclient.py
@@ -1,4 +1,5 @@
 import json
+import time
 import asyncio
 from random import randint
 from functools import partial
@@ -7,7 +8,7 @@ from sqlor.dbpools import DBPools
 from appPublic.log import debug, exception
 from appPublic.uniqueID import getID
 from appPublic.dictObject import DictObject
-from appPublic.timeUtils import curDateString
+from appPublic.timeUtils import curDateString, timestampstr
 from appPublic.base64_to_file import base64_to_file, getFilenameFromBase64
 from uapi.appapi import UAPI, sor_get_callerid, sor_get_uapi
 from ahserver.serverenv import get_serverenv
@@ -98,16 +99,41 @@ where a.upappid=b.id
 	i = randint(0, len(recs)-1)
 	return recs[i].userid
 
-async def uapi_request(request, llm, sor):
+async def write_llmusage(llm, userid, usage, params_kw, outdata, sor):
+	d = {
+		"id": getID(),
+		"llmid": llm.id,
+		"use_date": curDateString(),
+		"use_time": timestampstr(),
+		"userid": userid,
+		"useages": usage,
+		"ioinfo": json.dumps({
+			"input": params_kw,
+			"output": outdata
+		})
+	}
+	await sor.C('llmusage', d)
+
+async def uapi_request(request, llm, sor, params_kw=None):
 	env = request._run_ns.copy()
+	if not params_kw:
+		params_kw = env.params_kw
 	caller_orgid = await env.get_userorgid()
 	callerid = await env.get_user()
 	uapi = UAPI(request, sor=sor)
 	userid = await get_owner_userid(sor, llm)
+	outlines = []
 	txt = ''
 	try:
+		t1 = time.time()
+		t2 = t1
+		t3 = t1
+		first = True
 		async for l in uapi.stream_linify(llm.upappid, llm.apiname, userid,
-					params=env.params_kw):
+					params=params_kw):
+			if first:
+				first = False
+				t2 = time.time()
 			if isinstance(l, bytes):
 				l = l.decode('utf-8')
 			if l[-1] == '\n':
@@ -128,68 +154,106 @@ async def uapi_request(request, llm, sor):
 			if d.get('content'):
 				txt = txt + d['content']
 				yield_it = True
+			outlines.append(d)
 			yield json.dumps(d) + '\n'
+		usage = outlines[-1].get('usage',{})
+		t3 = time.time()
+		usage['response_time'] = t2 - t1
+		usage['finish_time'] = t3 - t1
+		if not usage.get('completions_tokens'):
+			usage['completions_tokens'] = len(txt)
+		if not usage.get('input_tokens'):
+			cnt = 0
+			if params_kw.prompt:
+				cnt += len(params_kw.prompt)
+			if params_kw.negitive_prompt:
+				cnt += len(params_kw.negitive_prompt)
+			usage['input_tokens'] = cnt
+		await write_llmusage(llm, callerid, usage, params_kw, outlines, sor)
 	except Exception as e:
 		exception(f'{e=},{format_exc()}')
 		estr = erase_apikey(e)
+		outlines.append({"error": f"ERROR:{estr}", "status": "FAILED" })
 		yield f'{{"error": "ERROR:{estr}", "status": "SUCCEEDED" }}\n'
+		await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
 		return
 	debug(f'{txt=}')
 
 
-async def sync_uapi_request(request, llm, sor):
+async def sync_uapi_request(request, llm, sor, params_kw=None):
 	env = request._run_ns.copy()
+	if not params_kw:
+		params_kw = env.params_kw
 	caller_orgid = await env.get_userorgid()
 	callerid = await env.get_user()
 	uapi = UAPI(request, sor=sor)
 	userid = await get_owner_userid(sor, llm)
+	outlines = []
 	b = None
+	d = None
+	t1 = t2 = t3 = time.time()
 	try:
-		b = await uapi.call(llm.upappid, llm.apiname, userid, params=env.params_kw)
+
+		b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
+		if isinstance(b, bytes):
+			b = b.decode('utf-8')
+		d = json.loads(b)
 	except Exception as e:
 		exception(f'{e=},{format_exc()}')
 		estr = erase_apikey(e)
 		yield f'{{"error": "ERROR:{estr}", "status": "SUCCEEDED" }}\n'
+		outlines.append({"error": f"ERROR:{estr}", "status": "FAILED" })
+		await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
 		return
-	if isinstance(b, bytes):
-		b = b.decode('utf-8')
+	outlines.append(d)
+	t2 = t3 = time.time()
+	usage = d.get('usage', {})
+	usage['response_time'] = t2 - t1
+	usage['finish_time'] = t3 - t1
+	await write_llmusage(llm, callerid, usage, params_kw, outlines, sor)
 	debug(f'finished:{b}')
 	yield b
 
-async def async_uapi_request(request, llm, sor):
+async def async_uapi_request(request, llm, sor, params_kw=None):
 	env = request._run_ns.copy()
+	if not params_kw:
+		params_kw = env.params_kw
 	caller_orgid = await env.get_userorgid()
 	callerid = await env.get_user()
 	uapi = UAPI(request, sor=sor)
 	userid = await get_owner_userid(sor, llm)
+	outlines = []
 	b = None
+	t1 = t2 = t3 = time.time()
 	try:
-		b = await uapi.call(llm.upappid, llm.apiname, userid, params=env.params_kw)
+		b = await uapi.call(llm.upappid, llm.apiname, userid, params=params_kw)
 	except Exception as e:
 		exception(f'{e=},{format_exc()}')
 		estr = erase_apikey(e)
 		yield f'{{"error": "ERROR:{estr}", "status": "SUCCEEDED" }}\n'
+		outlines.append({"error": f"ERROR:{estr}", "status": "FAILED" })
+		await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
 		return
 	if isinstance(b, bytes):
 		b = b.decode('utf-8')
 	debug(f'task sumbited:{b}')
 	d = DictObject(**json.loads(b))
-	if not d.get('context'):
-		debug(f'{b} error')
-		yield '{"error":"server return no taskid", "status": "SUCCEEDED" }\n'
-		return
+	outlines.append(d)
+	t2 = time.time()
 	uapi = UAPI(request, sor=sor)
 	apinames = [ name.strip() for name in llm.query_apiname.split(',') ]
 	for apiname in apinames:
 		while True:
 			b = None
			try:
-				b = await uapi.call(llm.upappid, apiname, userid, params=d.context)
+				b = await uapi.call(llm.upappid, apiname, userid, params=d)
 			except Exception as e:
 				exception(f'{e=},{format_exc()}')
 				estr = erase_apikey(e)
 				yield f'{{"error": "ERROR:{estr}", "status": "SUCCEEDED" }}\n'
-				break
+				outlines.append({"error": f"ERROR:{estr}", "status": "FAILED" })
+				await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
+				return
 			if isinstance(b, bytes):
 				b = b.decode('utf-8')
 
@@ -200,11 +264,19 @@ async def async_uapi_request(request, llm, sor):
 			if not rzt.status or rzt.status == 'FAILED':
 				debug(f'{b=} return error')
 				yield f'{{"error": "ERROR:upapp return failed", "status": "SUCCEEDED" }}\n'
+				outlines.append({"error": "ERROR:upapp return failed", "status": "FAILED" })
+				await write_llmusage(llm, callerid, None, params_kw, outlines, sor)
 				return
 			if rzt.status == 'SUCCEEDED':
 				debug(f'{b=} return successed')
 				await asyncio.sleep(1)
 				d = rzt
+				outlines.append(d)
+				usage = d.get('usage', {})
+				t3 = time.time()
+				usage['response_time'] = t2 - t1
+				usage['finish_time'] = t3 - t1
+				await write_llmusage(llm, callerid, usage, params_kw, outlines, sor)
 				break
 			period = llm.query_period or 30
 			await asyncio.sleep(period)
@@ -228,20 +300,22 @@ def b64media2url(request, mediafile):
 	url = entire_url('/idfile?path=') + env.quote(mediafile)
 	return url
 
-async def inference(request, *args, **kw):
+async def inference(request, *args, params_kw=None, **kw):
 	env = request._run_ns.copy()
-	llmid = env.params_kw.llmid
+	if not params_kw:
+		params_kw = env.params_kw
+	llmid = params_kw.llmid
 	dbname = env.get_module_dbname('llmage')
 	db = env.DBPools()
 	async with db.sqlorContext(dbname) as sor:
 		llm = await get_llm(llmid)
 		if llm.stream == 'async':
-			f = partial(async_uapi_request, request, llm, sor)
+			f = partial(async_uapi_request, request, llm, sor, params_kw=params_kw)
 			return await env.stream_response(request, f)
 		if llm.stream == 'sync':
-			f = partial(sync_uapi_request, request, llm, sor)
+			f = partial(sync_uapi_request, request, llm, sor, params_kw=params_kw)
 			return await env.stream_response(request, f)
 		# env.update(llm)
 		uapi = UAPI(request, sor=sor)
-		f = partial(uapi_request, request, llm, sor)
+		f = partial(uapi_request, request, llm, sor, params_kw=params_kw)
 		return await env.stream_response(request, f)
diff --git a/models/llm.xlsx b/models/llm.xlsx
index 67bb365..8731c84 100644
Binary files a/models/llm.xlsx and b/models/llm.xlsx differ