From f04917363d426003ebd5e05767b00af2ccefa508 Mon Sep 17 00:00:00 2001 From: yumoqing Date: Tue, 7 Apr 2026 14:06:28 +0800 Subject: [PATCH] bugfix --- uapi/apidata.py | 137 +++++++++++++++++++++++++++++++++++++++ uapi/uapi.py | 168 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 305 insertions(+) create mode 100644 uapi/apidata.py create mode 100644 uapi/uapi.py diff --git a/uapi/apidata.py b/uapi/apidata.py new file mode 100644 index 0000000..88ac760 --- /dev/null +++ b/uapi/apidata.py @@ -0,0 +1,137 @@ +import json +from time import time +from traceback import format_exc +from functools import partial +from sqlor.dbpools import DBPools, get_sor_context +from appPublic.Singleton import SingletonDecorator +from appPublic.streamhttpclient import StreamHttpClient, liner +from appPublic.dictObject import DictObject +from appPublic.log import debug, exception, error +from appPublic.aes import aes_encode_b64 +from ahserver.globalEnv import password_decode +from ahserver.serverenv import get_serverenv, ServerEnv +from random import randint + +async def get_deerer(upappid, callerid): + db = DBPools() + dbname = get_dbname() + async with db.sqlorContext(dbname) as sor: + ki = await get_userapikey(sor, upappid, callerid) + d = deerer(ki.myappid, ki.apikey, ki.secretkey) + if not d: + return None + return d[7:] + return None + +async def sor_get_uapi(sor, upappid, apiname): + sql = """select a.*, +c.auth_apiname +from uapi a, upapp b, uapiset c +where a.apisetid = b.apisetid + and b.apisetid = c.id + and a.name = ${apiname}$ + and b.id = ${upappid}$""" + recs = await sor.sqlExe(sql, {'upappid': upappid, 'apiname': apiname}) + if len(recs) == 0: + debug(f'{sql=},{upappid=}, {apiname=} uapi not found') + return None + return recs[0] + +def deerer(myappid, apikey, secretkey): + t = time() + txt = f'{t}:{apikey}' + cyber = aes_encode_b64(secretkey, txt) + return f'Deerer {myappid}-:-{cyber}' + +def bearer(apikey): + return f'Bearer {apikey}' + +@SingletonDecorator +class UAPIData: + def __init__(self): + self.apidata = {} + self.apikeys = {} + self.org_users = {} + + async def get_userapikey(self, appid, callerid): + users = await self.get_apiusers(appid) + for u in users: + if u.userid == callerid: + return DictObject(**{ + 'apikey':u.apikey, + 'secretkey':u.secretkey, + 'baseurl':u.baseurl, + 'appownerid': u.ownerid, + 'dynamic_func_name': u.dynamic_func, + 'myappid': u.myappid + + }) + return None + + async def get_apiusers(self, appid, orgid=None): + key = appid + d = self.org_users.get(key) + if d: + return d + env = ServerEnv() + async with get_sor_context(env, 'uapi') as sor: + sql = """select +c.id, +c.ownerid, +c.myappid, +c.baseurl, +c.secretkey, +c.dynamic_func, +a.ownerid as userid, +a.orgid as userorgid, +a.apikey +from upappkey a, users b, upapp c +where b.orgid = c.ownerid + and a.ownerid = b.id + and a.upappid = c.id + and a.upappid = ${appid}$ +""" + ns = {'appid': appid, 'orgid': orgid} + if orgid: + sql += " and a.orgid = ${orgid}$" + else: + sql += " and a.orgid = c.ownerid" + + d = await sor.sqlExe(sql, ns) + for r in d: + r.apikey = None if r.apikey is None else password_decode(r.apikey) + r.secretkey = None if r.secretkey is None else password_decode(r.secretkey) + if d is None: + e = Exception(f'{appid=} {orgid=} get none user') + exception(f'{e}') + raise e + self.apidata[key] = d + return d + e = Exception(f'{appid=} {orgid=} get none user') + exception(f'{e}') + raise e + + async def get_calluserid(self, appid, orgid=None): + users = await get_apiusers(appid, orgid=orgid) + cnt = len(users) + i = randint(0, cnt - 1) + return users[i].userid + + async def get_api(self, appid, apiname): + key = f'{appid}.{apiname}' + api = self.apidata.get(key) + if api: + return api + env = ServerEnv() + async with get_sor_context(env, 'uapi') as sor: + d = await sor_get_uapi(sor, appid, apiname) + if d is None: + e = Exception(f'{appid=}, {apiname=} get none api') + exception(f'{e}') + raise e + self.apidata[key] = d + return d + e = Exception(f'{appid=}, {apiname=} get none api') + exception(f'{e}') + raise e + diff --git a/uapi/uapi.py b/uapi/uapi.py new file mode 100644 index 0000000..bc9f68e --- /dev/null +++ b/uapi/uapi.py @@ -0,0 +1,168 @@ + +import json +from time import time +from traceback import format_exc +from functools import partial +from sqlor.dbpools import DBPools +from appPublic.streamhttpclient import StreamHttpClient, liner +from appPublic.dictObject import DictObject +from appPublic.log import debug, exception, error +from appPublic.aes import aes_encode_b64 +from ahserver.globalEnv import password_decode +from ahserver.serverenv import get_serverenv, ServerEnv +from random import randint + +class UpAppApi: + def __init__(self): + self.env = ServerEnv() + self.uapi_data = self.env.uapi_data + self.auth_api = None + self.auth_ret = None + + async def rendertmpl(self, tmplstr, params={}): + if tmplstr is None: + return None + ns = self.env.copy() + ns.update(params) + te = self.env.tmpl_engine + try: + ret = await te.renders(tmplstr, ns) + return ret + except Exception as e: + e = Exception(f'{e}:{tmplstr=}, {params=}') + exception(f'{e}') + raise e + + async def get_uapis(self, upappid, apiname, callerid, params={}): + self.env.update(params) + uapi = None + auth_uapi = None + uapi = await self.uapi_data.get_api(upappid, apiname) + # uapi = await sor_get_uapi(sor, upappid, apiname) + if uapi is None: + e = Exception(f'UAPI not found:{upappid=}, {apiname=}') + exception(f'{e}\n{format_exc()}') + raise e + if uapi.auth_apiname: + auth_uapi = await self.uapi_data.get_api(upappid, uapi.auth_apiname) + self.uapi = uapi + self.auth_uapi = auth_uapi + kinfo = await self.uapi_data.get_userapikey(upappid, callerid) + self.env.update(kinfo) + return auth_uapi, uapi + + async def __call__(self, upappid, apiname, callerid, params={}): + """ + """ + auth_uapi = uapi = None + auth_uapi, uapi = await self.get_uapis(upappid, apiname, + callerid, params=params) + + if uapi is None: + return + if auth_uapi: + await self.do_auth(auth_uapi) + async for chunk in self.stream_resp(uapi): + yield chunk + + def filter_nl_cr(self, s): + s = ''.join(s.split('\n')) + s = ''.join(s.split('\r')) + return s + + async def stream_linify(self, upappid, apiname, callerid, params={}): + gen = liner(self.__call__(upappid, apiname, callerid, params=params)) + async for line in gen: + debug(f'{line=},{type(line)=}') + respline = line + line = line.decode('utf-8') + filter = self.uapi.chunk_match + if not filter: + filter = '' + if line.startswith(filter): + line = line[len(filter):] + if self.uapi.response: + try: + dic = json.loads(line) + line = await self.rendertmpl(self.uapi.response, dic) + line = self.filter_nl_cr(line) + except Exception as e: + debug(f'{respline=}, {line=}, {self.uapi.response=} error({e})\n{format_exc()}') + continue + if len(line): + yield f'{line}\n' + else: + debug(f'invalid line:{line}') + + async def call(self, upappid, apiname, callerid, params={}): + b = b'' + async for chunk in self.__call__(upappid, apiname, callerid, params=params): + b += chunk + if self.uapi.response: + try: + dic = json.loads(b.decode('utf-8')) + s = await self.rendertmpl(self.uapi.response, dic) + b = s.encode('utf-8') + except Exception as e: + debug(f'http.response={b}, response_template={self.uapi.response}, error={e}') + return b + + async def do_auth(self, auth_uapi): + b = b'' + async for chunk in self.stream_resp(auth_uapi): + b+= chunk + d = json.loads(b.encode('utf-8')) + if auth_uapi.response: + s = await self.rendertmpl(auth_uapi.response, d) + d = json.loads(s) + self.env.update(d) + return + + async def stream_resp(self, api): + path = await self.rendertmpl(api.path) + url = '' + if path.startswith('https://') or path.startswith('http://'): + url = path + else: + url = self.env.get('baseurl') + path + method = api.httpmethod + headers = await self.rendertmpl(api.headers) + try: + headers = json.loads(headers) + except Exception as e: + exception(f'{e}, {headers=},{api.headers=}') + raise e + body = await self.rendertmpl(api.data) + if body: + try: + bdy = json.loads(body) + body = json.dumps(bdy, ensure_ascii=False) + except Exception as e: + exception(f'{e}, {body=},{api.data=}') + body = None + _params = await self.rendertmpl(api.params) + if _params: + _params = json.loads(_params) + debug(f'{headers=}, {body=}. {method=}, {url=}') + if self.env.dynamic_func_name: + f = RegisterFunction() + opts = { + 'apikey': self.env.apikey, + 'secretkey': self.env.secretkey, + 'method':method, + 'path':path, + 'headers': headers, + 'params':params, + 'body':body + } + await f.exe(self.env.dynamic_func, opts) + shc = StreamHttpClient() + gen = shc(method, url, + headers=headers, + data=body, + params=_params) + async for chunk in gen: + yield chunk + +if __name__ == '__main__': + print('test')