uapi/uapi/appapi.py
2025-09-19 15:07:56 +08:00

251 lines
6.8 KiB
Python

import json
from time import time
from traceback import format_exc
from functools import partial
from sqlor.dbpools import DBPools
from appPublic.streamhttpclient import StreamHttpClient, liner
from appPublic.dictObject import DictObject
from appPublic.myTE import MyTemplateEngine
from appPublic.log import debug, exception, error
from appPublic.aes import aes_encode_b64
from ahserver.globalEnv import password_decode
from ahserver.serverenv import get_serverenv
from random import randint
async def get_callerid(orgid):
    """Pick a caller id for an organization using a pooled connection.

    Opens a sqlor context on the database named by ``get_dbname()`` and
    delegates the lookup to ``sor_get_callerid``.

    Args:
        orgid: organization id handed to ``sor_get_callerid``.

    Returns:
        Whatever ``sor_get_callerid`` returns (an owner id or ``None``).
        ``None`` is also the result when the ``async with`` body ends
        without producing a value (e.g. if the context manager suppresses
        an exception raised inside it).
    """
    dbname = get_dbname()
    pools = DBPools()
    callerid = None
    async with pools.sqlorContext(dbname) as sor:
        callerid = await sor_get_callerid(sor, orgid)
    return callerid
async def sor_get_callerid(sor, orgid):
    """Return the owner id of one randomly chosen ``upappkey`` holder.

    Joins ``upappkey`` with ``users`` and keeps the rows whose user belongs
    to ``orgid``; one of the matching rows is then selected with
    ``randint``.

    Args:
        sor: object exposing an awaitable ``sqlExe(sql, namespace)``.
        orgid: value bound to the ``${orgid}$`` placeholder.

    Returns:
        The ``ownerid`` attribute of the selected row, or ``None`` when the
        query returns no rows.
    """
    query = """select a.ownerid from upappkey a, users b
where b.orgid = ${orgid}$
and a.ownerid = b.id"""
    rows = await sor.sqlExe(query, {'orgid': orgid})
    total = len(rows)
    if total != 0:
        # randint is inclusive on both ends, hence total - 1.
        pick = randint(0, total - 1)
        return rows[pick].ownerid
    return None
async def get_deerer(upappid, callerid):
    """Build the credential part of a 'Deerer' header for a caller.

    Loads the caller's key info through ``get_userapikey`` and passes it to
    ``deerer``; the leading ``'Deerer '`` scheme prefix is then removed.

    Args:
        upappid: id of the upstream app.
        callerid: id of the calling user.

    Returns:
        The ``deerer`` result without its first 7 characters, or ``None``
        when ``deerer`` yields a falsy value or the ``async with`` body ends
        without returning.
    """
    pools = DBPools()
    dbname = get_dbname()
    async with pools.sqlorContext(dbname) as sor:
        keyinfo = await get_userapikey(sor, upappid, callerid)
        header = deerer(keyinfo.myappid, keyinfo.apikey, keyinfo.secretkey)
        # 'Deerer ' is 7 characters long; keep only what follows it.
        return header[7:] if header else None
    return None
async def get_uapi(upappid, apiname):
    """Fetch one ``uapi`` definition using a pooled connection.

    Opens a sqlor context on the database named by ``get_dbname()`` and
    delegates to ``sor_get_uapi``.

    Args:
        upappid: id of the upstream app.
        apiname: name of the API inside the app's API set.

    Returns:
        The record returned by ``sor_get_uapi`` or ``None``; ``None`` is
        also the result when the ``async with`` body ends without producing
        a value.
    """
    dbname = get_dbname()
    pools = DBPools()
    found = None
    async with pools.sqlorContext(dbname) as sor:
        found = await sor_get_uapi(sor, upappid, apiname)
    return found
async def get_userapikey(sor, upappid, callerid):
    """
    Collect the key and endpoint information used to call an upstream app.

    arguments:
        sor: object exposing an awaitable ``R(table, filter)`` reader
        upappid: id of the ``upapp`` record the call targets
        callerid: owner id whose ``upappkey`` record is looked up
    return:
        DictObject with the keys apikey, secretkey, baseurl, appownerid and
        myappid. apikey and secretkey go through ``password_decode`` unless
        they are None. When the caller has an ``upappkey`` record, its
        apikey replaces the one read from the ``upapp`` record.
    raises:
        Exception: no ``upapp`` record matches ``upappid``
    """
    recs = await sor.R('upapp', {'id': upappid})
    if len(recs) == 0:
        e = Exception(f'upapp id={upappid} not found, result={recs}')
        exception(f'{e}, {format_exc()}')
        raise e
    r = recs[0]
    key_filter = {'upappid': upappid, 'ownerid': callerid}
    apikeys = await sor.R('upappkey', key_filter)
    if len(apikeys) == 0:
        debug(f'-------user({callerid}) has not upappkey -----')
    else:
        # The caller's own key takes precedence over the app-level value.
        r.apikey = apikeys[0].apikey
    debug(f'{r=}')
    apikey = r.apikey
    if apikey is not None:
        apikey = password_decode(apikey)
    secretkey = r.secretkey
    if secretkey is not None:
        secretkey = password_decode(secretkey)
    return DictObject(
        apikey=apikey,
        secretkey=secretkey,
        baseurl=r.baseurl,
        appownerid=r.appownerid,
        myappid=r.myappid,
    )
async def sor_get_uapi(sor, upappid, apiname):
    """Read one ``uapi`` row together with its API set's ``auth_apiname``.

    The query joins ``uapi``, ``upapp`` and ``uapiset`` and filters on the
    API name and the upstream app id.

    Args:
        sor: object exposing an awaitable ``sqlExe(sql, namespace)``.
        upappid: value bound to ``${upappid}$``.
        apiname: value bound to ``${apiname}$``.

    Returns:
        The first matching row, or ``None`` (after a debug log entry) when
        nothing matches.
    """
    sql = """select a.*,
c.auth_apiname
from uapi a, upapp b, uapiset c
where a.apisetid = b.apisetid
and b.apisetid = c.id
and a.name = ${apiname}$
and b.id = ${upappid}$"""
    bindings = {'upappid': upappid, 'apiname': apiname}
    recs = await sor.sqlExe(sql, bindings)
    if len(recs) > 0:
        return recs[0]
    debug(f'{sql=},{upappid=}, {apiname=} uapi not found')
    return None
def get_dbname():
    """Return the database name configured for the ``'uapi'`` module.

    Looks up the callable registered in the server environment under
    ``'get_module_dbname'`` and calls it with ``'uapi'``.
    """
    resolve_dbname = get_serverenv('get_module_dbname')
    return resolve_dbname('uapi')
def deerer(myappid, apikey, secretkey):
    """Build a ``'Deerer'`` authorization header value.

    The text ``'<time()>:<apikey>'`` is passed to ``aes_encode_b64``
    together with ``secretkey``; the result is appended to ``myappid``
    after the ``'-:-'`` separator.

    Returns:
        A string of the form ``'Deerer <myappid>-:-<encoded text>'``.
    """
    plain = '{}:{}'.format(time(), apikey)
    cyber = aes_encode_b64(secretkey, plain)
    return 'Deerer {}-:-{}'.format(myappid, cyber)
def bearer(apikey):
    """Return ``apikey`` formatted as a ``'Bearer <apikey>'`` header value."""
    header = 'Bearer {}'.format(apikey)
    return header
async def sync_users(request, upappid, userid):
    """Load the upstream app record for ``upappid``; nothing else is done yet.

    ``request`` and ``userid`` are accepted but not used, and the loaded
    record is discarded, so the function always returns ``None``.

    NOTE(review): ``get_upapp`` is not defined in the visible part of this
    module - confirm where it comes from before relying on this function.
    """
    pools = DBPools()
    dbname = get_dbname()
    async with pools.sqlorContext(dbname) as sor:
        # Result is currently unused; kept so the lookup still runs.
        upapp = await get_upapp(sor, upappid)
class UAPI:
    """Caller for upstream HTTP APIs described by ``uapi`` records.

    An instance keeps a template namespace (``self.env``) seeded from
    ``request._run_ns``. The path, headers, body and query parameters of a
    ``uapi`` record are rendered as templates against that namespace, and
    the resulting HTTP request is streamed through ``StreamHttpClient``.

    Attributes set by the lookup step:
        uapi: the API record being called (``None`` until resolved).
        auth_uapi: the API set's authentication record, if it names one.
    """

    def __init__(self, request, env=None, sor=None):
        """Create a caller bound to ``request``.

        Args:
            request: object providing ``_run_ns`` (copied into ``self.env``).
            env: environment handed to the instance's template engine; a
                fresh ``DictObject`` is created when omitted so instances do
                not share one default object.
            sor: optional sqlor context stored on the instance.
        """
        if env is None:
            env = DictObject()
        self.request = request
        self.te = MyTemplateEngine([], env=env)
        self.env = request._run_ns.copy()
        self.env.request = request
        self.auth_api = None
        self.auth_ret = None
        # Initialised here so readers never hit a missing attribute when a
        # lookup did not complete.
        self.uapi = None
        self.auth_uapi = None
        self.sor = sor

    def rendertmpl(self, tmplstr, params=None):
        """Render ``tmplstr`` against ``self.env`` overlaid with ``params``.

        Returns ``None`` when ``tmplstr`` is ``None``; otherwise the string
        produced by the template engine. ``self.env`` itself is not
        modified - the overlay is applied to a copy.
        """
        if tmplstr is None:
            return None
        ns = self.env.copy()
        if params:
            ns.update(params)
        te = MyTemplateEngine([], env=self.env)
        return te.renders(tmplstr, ns)

    async def get_uapis(self, sor, upappid, apiname, callerid, params=None):
        """Resolve the API record, its auth API and the caller's key info.

        ``params`` and the key info from ``get_userapikey`` are merged into
        ``self.env``; the records are stored on ``self.uapi`` and
        ``self.auth_uapi``.

        Returns:
            ``(auth_uapi, uapi)``; ``auth_uapi`` is ``None`` when the API
            set names no authentication API.

        Raises:
            Exception: no ``uapi`` record matches ``upappid``/``apiname``.
        """
        if params:
            self.env.update(params)
        uapi = await sor_get_uapi(sor, upappid, apiname)
        if uapi is None:
            e = Exception(f'UAPI not found:{upappid=}, {apiname=}')
            exception(f'{e}\n{format_exc()}')
            raise e
        auth_uapi = None
        if uapi.auth_apiname:
            # Fixed: the original referenced an undefined name ``iuapi``.
            auth_uapi = await sor_get_uapi(sor, upappid, uapi.auth_apiname)
        self.uapi = uapi
        self.auth_uapi = auth_uapi
        kinfo = await get_userapikey(sor, upappid, callerid)
        self.env.update(kinfo)
        return auth_uapi, uapi

    async def __call__(self, upappid, apiname, callerid, params=None):
        """Call the API and yield the raw response chunks.

        The records are looked up inside a sqlor context; the optional
        authentication call and the streamed request run afterwards.
        Nothing is yielded when the lookup leaves ``uapi`` unset.

        NOTE(review): the source indentation was lost; the auth/stream steps
        are placed after the ``async with`` block because ``uapi`` is
        pre-initialised to ``None`` - confirm against the original file.
        """
        # Forget records from an earlier call so a failed lookup cannot
        # leave stale state behind.
        self.uapi = None
        self.auth_uapi = None
        auth_uapi = uapi = None
        dbname = get_dbname()
        debug(f'{dbname=}')
        db = DBPools()
        async with db.sqlorContext(dbname) as sor:
            self.sor = sor
            auth_uapi, uapi = await self.get_uapis(
                sor, upappid, apiname, callerid, params=params)
        if uapi is None:
            return
        if auth_uapi:
            await self.do_auth(auth_uapi)
        async for chunk in self.stream_resp(uapi):
            yield chunk

    def filter_nl_cr(self, s):
        """Return ``s`` with every ``'\\n'`` and ``'\\r'`` removed."""
        return s.replace('\n', '').replace('\r', '')

    async def stream_linify(self, upappid, apiname, callerid, params=None):
        """Call the API and yield the response one text line at a time.

        Each line produced by ``liner`` is decoded as UTF-8. Lines starting
        with the record's ``chunk_match`` prefix have the prefix removed
        and, when the record has a ``response`` template, are parsed as JSON
        and rendered through it (lines that fail are logged and skipped).
        Non-empty results are yielded with a trailing newline; lines without
        the prefix are logged as invalid.

        NOTE(review): the source indentation was lost; the ``else`` branch
        is paired with the prefix test - confirm against the original file.
        """
        gen = liner(self.__call__(upappid, apiname, callerid, params=params))
        async for line in gen:
            debug(f'{line=},{type(line)=}')
            line = line.decode('utf-8')
            # An empty prefix matches every line.
            prefix = self.uapi.chunk_match or ''
            if not line.startswith(prefix):
                debug(f'invalid line:{line}')
                continue
            line = line[len(prefix):]
            if self.uapi.response:
                try:
                    dic = json.loads(line)
                    line = self.rendertmpl(self.uapi.response, dic)
                    line = self.filter_nl_cr(line)
                except Exception as e:
                    # Best effort: a line that cannot be converted is
                    # logged and dropped, the stream goes on.
                    debug(f'{line=}, {self.uapi.response=} error({e})\n{format_exc()}')
                    continue
            if len(line):
                yield f'{line}\n'

    async def call(self, upappid, apiname, callerid, params=None):
        """Call the API and return the whole response as ``bytes``.

        When the record has a ``response`` template, the body is parsed as
        JSON, rendered through the template and re-encoded as UTF-8.
        Returns ``b''`` when the API record could not be resolved.
        """
        chunks = []
        async for chunk in self.__call__(upappid, apiname, callerid, params=params):
            chunks.append(chunk)
        b = b''.join(chunks)
        if self.uapi is not None and self.uapi.response:
            dic = json.loads(b.decode('utf-8'))
            s = self.rendertmpl(self.uapi.response, dic)
            b = s.encode('utf-8')
        return b

    async def do_auth(self, auth_uapi):
        """Run the authentication API and merge its result into ``self.env``.

        The response body is parsed as JSON; when the record has a
        ``response`` template, the parsed data is rendered through it and
        the rendered text is parsed as JSON again.
        """
        chunks = []
        async for chunk in self.stream_resp(auth_uapi):
            chunks.append(chunk)
        # Fixed: the body is bytes and must be decoded (the original called
        # ``.encode`` on it, which bytes does not provide).
        d = json.loads(b''.join(chunks).decode('utf-8'))
        if auth_uapi.response:
            s = self.rendertmpl(auth_uapi.response, d)
            d = json.loads(s)
        self.env.update(d)
        return

    async def stream_resp(self, api):
        """Render the request described by ``api`` and stream its response.

        ``api.path``, ``api.headers``, ``api.data`` and ``api.params`` are
        rendered as templates. Headers and params are parsed as JSON; the
        body is normalised through a JSON round trip.

        Yields:
            Response chunks as produced by ``StreamHttpClient``.

        Raises:
            Exception: whatever ``json.loads`` raises when the rendered
                headers are not valid JSON (logged first).
        """
        path = self.rendertmpl(api.path)
        url = self.env.get('baseurl') + path
        method = api.httpmethod
        headers = self.rendertmpl(api.headers)
        try:
            headers = json.loads(headers)
        except Exception as e:
            exception(f'{e}, {headers=},{api.headers=}')
            raise
        body = self.rendertmpl(api.data)
        if body:
            try:
                bdy = json.loads(body)
                body = json.dumps(bdy, ensure_ascii=False)
            except Exception as e:
                # Best effort kept from the original: an unparsable body is
                # logged and the request is sent without one.
                exception(f'{e}, {body=},{api.data=}')
                body = None
        _params = self.rendertmpl(api.params)
        if _params:
            _params = json.loads(_params)
        debug(f'{headers=}, {body=}. {method=}, {url=}')
        shc = StreamHttpClient()
        gen = shc(method, url,
                  headers=headers,
                  data=body,
                  params=_params)
        async for chunk in gen:
            yield chunk
# Running the module directly only prints a placeholder marker.
if __name__ == '__main__':
    print('test')