uapi/uapi/appapi.py
2025-08-14 15:05:07 +08:00

216 lines
5.8 KiB
Python

import json
from time import time
from traceback import format_exc
from functools import partial
from sqlor.dbpools import DBPools
from appPublic.streamhttpclient import StreamHttpClient, liner
from appPublic.dictObject import DictObject
from appPublic.myTE import MyTemplateEngine
from appPublic.log import debug, exception, error
from appPublic.aes import aes_encrypt_ecb
from ahserver.globalEnv import password_decode
from ahserver.serverenv import get_serverenv
from random import randint
async def get_callerid(orgid):
    """Pick a caller id for organization ``orgid`` from the uapi database.

    Opens a database context on the database configured for the ``uapi``
    module and delegates the lookup to ``sor_get_callerid``.

    Returns the value produced by ``sor_get_callerid``; ``None`` when the
    database context exits without the lookup having returned.
    """
    name = get_dbname()
    pools = DBPools()
    async with pools.sqlorContext(name) as sor:
        callerid = await sor_get_callerid(sor, orgid)
        return callerid
    # Only reachable if the context manager suppresses an exception.
    return None
async def sor_get_callerid(sor, orgid):
    """Return the owner id of a randomly chosen API key held in ``orgid``.

    Arguments:
        sor: database context exposing an awaitable ``sqlExe(sql, params)``.
        orgid: organization id matched against ``users.orgid``.

    Returns the ``ownerid`` of one matching ``upappkey`` row picked at
    random, or ``None`` when the query returns no rows.
    """
    sql = """select a.ownerid from upappkey a, users b
where b.orgid = ${orgid}$
and a.ownerid = b.id"""
    rows = await sor.sqlExe(sql, {'orgid': orgid})
    total = len(rows)
    if total > 0:
        # Any key owner of the organization is acceptable as the caller.
        return rows[randint(0, total - 1)].ownerid
    return None
async def get_uapi(upappid, apiname):
    """Look up the API definition ``apiname`` of upstream app ``upappid``.

    Opens a database context on the database configured for the ``uapi``
    module and delegates the lookup to ``sor_get_uapi``.

    Returns the value produced by ``sor_get_uapi``; ``None`` when the
    database context exits without the lookup having returned.
    """
    name = get_dbname()
    pools = DBPools()
    async with pools.sqlorContext(name) as sor:
        record = await sor_get_uapi(sor, upappid, apiname)
        return record
    # Only reachable if the context manager suppresses an exception.
    return None
async def sor_get_uapi(sor, upappid, apiname):
    """Fetch the ``uapi`` row named ``apiname`` for upstream app ``upappid``.

    The row is joined with its API set so that the set's ``auth_apiname``
    column is available on the result.

    Arguments:
        sor: database context exposing an awaitable ``sqlExe(sql, params)``.
        upappid: id of the ``upapp`` row.
        apiname: value matched against ``uapi.name``.

    Returns the first matching row, or ``None`` (after a debug log entry)
    when nothing matches.
    """
    sql = """select a.*,
c.auth_apiname
from uapi a, upapp b, uapiset c
where a.apisetid = b.apisetid
and b.apisetid = c.id
and a.name = ${apiname}$
and b.id = ${upappid}$"""
    rows = await sor.sqlExe(sql, {'upappid': upappid, 'apiname': apiname})
    if len(rows) > 0:
        return rows[0]
    debug(f'{sql=},{upappid=}, {apiname=} uapi not found')
    return None
def get_dbname():
    """Return the database name the server environment maps to ``uapi``.

    The resolver is fetched from the server environment under the key
    ``get_module_dbname`` and called with the module name ``'uapi'``.
    """
    resolve_dbname = get_serverenv('get_module_dbname')
    return resolve_dbname('uapi')
def deerer(myappid, apikey, secretkey):
    """Build a ``Deerer`` authorization header value.

    The current timestamp and ``apikey`` are joined as ``"<time>:<apikey>"``
    and encrypted with ``aes_encrypt_ecb`` using ``secretkey``.

    Arguments:
        myappid: application id placed in clear text in the header value.
        apikey: API key embedded in the encrypted part.
        secretkey: key handed to ``aes_encrypt_ecb``.

    Returns the string ``"Deerer <myappid>-:-<ciphertext>"``.
    """
    timestamp = time()
    plaintext = f'{timestamp}:{apikey}'
    cipher = aes_encrypt_ecb(secretkey, plaintext)
    # The original interpolated an undefined name ``appid`` here, which
    # raised NameError on every call; the parameter is ``myappid``.
    return f'Deerer {myappid}-:-{cipher}'
def bearer(apikey):
    """Return the ``Bearer`` authorization header value for ``apikey``."""
    return 'Bearer {}'.format(apikey)
async def sync_users(request, upappid, userid):
    """Open a uapi database context and load the upapp record ``upappid``.

    NOTE(review): this looks unfinished -- the fetched record is discarded,
    ``request`` and ``userid`` are never used, and ``get_upapp`` is not
    defined or imported in the visible part of this module. Confirm the
    intended behaviour before relying on it.
    """
    pools = DBPools()
    name = get_dbname()
    async with pools.sqlorContext(name) as sor:
        upapp = await get_upapp(sor, upappid)
class UAPI:
    """Caller for upstream application APIs described by ``uapi`` records.

    An instance resolves the API definition and the caller's credentials
    from the database, renders the definition's header, body and query
    templates against ``self.env`` and streams the HTTP response back.
    """

    def __init__(self, request, env=None, sor=None):
        """Bind the caller to a request and a template namespace.

        Arguments:
            request: request object; stored on the instance and published
                to templates as ``env.request``.
            env: mapping used as the template namespace. A fresh
                ``DictObject`` is created when omitted.
            sor: optional database context; ``__call__`` overwrites it.
        """
        # A ``DictObject()`` default in the signature is created once and
        # shared by every instance; since it is mutated below, request and
        # credential data would leak between calls. Build it per instance.
        if env is None:
            env = DictObject()
        self.request = request
        self.te = MyTemplateEngine([], env=env)
        self.env = env
        self.env.request = request
        self.auth_api = None
        self.auth_ret = None
        # API record of the call in progress; set by ``get_uapis`` and read
        # by ``stream_linify``.
        self.uapi = None
        self.sor = sor

    async def rendertmpl(self, tmplstr, params=None):
        """Render ``tmplstr`` with ``self.env`` overlaid by ``params``.

        Returns ``None`` when ``tmplstr`` is ``None``, otherwise the result
        of the template engine's ``renders``.
        """
        if tmplstr is None:
            return None
        ns = self.env.copy()
        if params:
            ns.update(params)
        te = MyTemplateEngine([], env=self.env)
        return te.renders(tmplstr, ns)

    async def get_uapis(self, sor, upappid, apiname, callerid, params=None):
        """Load the API record, its auth API record and the caller's keys.

        ``params`` and the key information returned by ``get_userapikey``
        are merged into ``self.env``; the API record is remembered as
        ``self.uapi``.

        Returns:
            ``(auth_uapi, uapi)``; ``auth_uapi`` is ``None`` when the API
            set names no auth API or that API cannot be found.

        Raises:
            Exception: when the API record or the caller's key is missing.
        """
        if params:
            self.env.update(params)
        auth_uapi = None
        uapi = await sor_get_uapi(sor, upappid, apiname)
        if uapi is None:
            e = Exception(f'UAPI not found:{upappid=}, {apiname=}')
            exception(f'{e}\n{format_exc()}')
            raise e
        if uapi.auth_apiname:
            # The original referenced an undefined name ``iuapi`` here.
            auth_uapi = await sor_get_uapi(sor, upappid, uapi.auth_apiname)
        kinfo = await self.get_userapikey(sor, upappid, callerid)
        self.env.update(kinfo)
        self.uapi = uapi
        return auth_uapi, uapi

    async def __call__(self, upappid, apiname, callerid, params=None):
        """Call API ``apiname`` of ``upappid`` and yield response chunks.

        Arguments:
            upappid: id of the upstream application.
            apiname: name of the API within the application's API set.
            callerid: owner id whose API key is used for the call.
            params: extra values merged into the template namespace.

        When an auth API is configured it is called first and its JSON
        result is merged into ``self.env``.
        """
        auth_uapi = uapi = None
        dbname = get_dbname()
        debug(f'{dbname=}')
        db = DBPools()
        async with db.sqlorContext(dbname) as sor:
            self.sor = sor
            auth_uapi, uapi = await self.get_uapis(sor,
                                                   upappid, apiname,
                                                   callerid, params=params)
        # The HTTP work below does not touch the database context.
        if uapi is None:
            return
        if auth_uapi:
            await self.do_auth(auth_uapi)
        async for chunk in self.stream_resp(uapi):
            yield chunk

    async def stream_linify(self, upappid, apiname, callerid, params=None):
        """Yield the response line by line, keeping only matching lines.

        Lines starting with the API record's ``chunk_match`` prefix have
        the prefix removed; when the record has a ``streamresponse``
        template the remainder is parsed as JSON and rendered through it.
        Each yielded line ends with a newline. Other lines are logged and
        dropped.
        """
        gen = liner(self.__call__(upappid, apiname, callerid, params=params))
        async for line in gen:
            # ``self.uapi`` is populated once the underlying call has
            # resolved its API record, i.e. before the first line arrives.
            prefix = self.uapi.chunk_match
            if line.startswith(prefix):
                line = line[len(prefix):]
                if self.uapi.streamresponse:
                    dic = json.loads(line)
                    # ``rendertmpl`` is a coroutine and must be awaited.
                    line = await self.rendertmpl(self.uapi.streamresponse, dic)
                yield line + '\n'
            else:
                debug(f'invalid line:{line}')

    async def call(self, upappid, apiname, callerid, params=None):
        """Perform the call and return the whole response body as bytes."""
        chunks = [
            chunk
            async for chunk in self.__call__(upappid, apiname, callerid,
                                             params=params)
        ]
        return b''.join(chunks)

    async def do_auth(self, auth_uapi):
        """Call the auth API and merge its JSON result into ``self.env``."""
        chunks = [chunk async for chunk in self.stream_resp(auth_uapi)]
        raw = b''.join(chunks)
        # ``raw`` is bytes, so it is decoded; the original called
        # ``.encode`` on it, which bytes objects do not have.
        d = json.loads(raw.decode('utf-8'))
        self.env.update(d)
        return

    async def stream_resp(self, api):
        """Issue the HTTP request described by ``api`` and yield its chunks.

        The URL is ``env['baseurl'] + api.path``. ``api.headers``,
        ``api.data`` and ``api.params`` are templates rendered against
        ``self.env``; rendered headers and params are parsed as JSON and
        the body is normalised through a JSON round trip. A body that is
        not valid JSON is logged and dropped.
        """
        url = self.env.get('baseurl') + api.path
        method = api.httpmethod
        headers = await self.rendertmpl(api.headers)
        # Treat a missing headers template like a missing body or params
        # template instead of passing ``None`` to ``json.loads``.
        headers = json.loads(headers) if headers else None
        body = await self.rendertmpl(api.data)
        if body:
            try:
                bdy = json.loads(body)
                body = json.dumps(bdy, ensure_ascii=False)
            except (TypeError, ValueError) as e:
                # NOTE(review): this message includes the whole namespace,
                # which holds the decoded apikey/secretkey.
                exception(f'{e}, {body=},{api.data=}, {self.env=}')
                body = None
        _params = await self.rendertmpl(api.params)
        if _params:
            _params = json.loads(_params)
        debug(f'{headers=}, {body=}. {method=}, {url=}')
        shc = StreamHttpClient()
        gen = shc(method, url,
                  headers=headers,
                  data=body,
                  params=_params)
        async for chunk in gen:
            yield chunk

    async def get_userapikey(self, sor, upappid, callerid):
        """Load the key material ``callerid`` holds for ``upappid``.

        Arguments:
            sor: database context exposing an awaitable ``sqlExe``.
            upappid: id of the upstream application that will be called.
            callerid: owner id of the ``upappkey`` row to use.

        Returns:
            ``DictObject`` with ``apikey`` and ``secretkey`` (both passed
            through ``password_decode``), ``baseurl``, ``appownerid`` and
            ``myappid``.

        Raises:
            Exception: when the caller has no key for this application.
        """
        sql = """select
a.myappid,
a.ownerid as appownerid,
a.baseurl,
b.apikey,
a.secretkey
from upapp a, upappkey b
where a.id = b.upappid
and a.id = ${appid}$
and b.ownerid = ${ownerid}$"""
        recs = await sor.sqlExe(sql, {'appid': upappid, 'ownerid': callerid})
        if len(recs) < 1:
            e = Exception(f'{upappid=}, {callerid=} has not apikey')
            exception(f'{e}, {format_exc()}')
            raise e
        r = recs[0]
        debug(f'{r=}')
        return DictObject(**{
            'apikey': password_decode(r.apikey),
            'secretkey': password_decode(r.secretkey),
            'baseurl': r.baseurl,
            'appownerid': r.appownerid,
            'myappid': r.myappid
        })
if __name__ == '__main__':
    # Runs only when the file is executed as a script; prints a fixed marker.
    print('test')