171 lines
4.7 KiB
Python
171 lines
4.7 KiB
Python
|
|
import json
|
|
from time import time
|
|
from traceback import format_exc
|
|
from functools import partial
|
|
from sqlor.dbpools import DBPools
|
|
from appPublic.streamhttpclient import StreamHttpClient, liner
|
|
from appPublic.dictObject import DictObject
|
|
from appPublic.log import debug, exception, error
|
|
from appPublic.aes import aes_encode_b64
|
|
from ahserver.globalEnv import password_decode
|
|
from ahserver.serverenv import get_serverenv, ServerEnv
|
|
from random import randint
|
|
|
|
class UpAppApi:
    """Call an upstream application's HTTP API from stored, templated definitions.

    An API definition (``uapi``) is looked up through ``self.env.uapi_data``.
    Its ``path``, ``headers``, ``data`` and ``params`` fields are template
    strings rendered with ``self.env.tmpl_engine`` against the server
    environment, and the resulting request is streamed with
    ``StreamHttpClient``.  If the definition names an ``auth_apiname``, that
    API is called first and its JSON result is merged into the environment.
    """

    def __init__(self, request=None):
        """Bind to the server environment and optionally attach ``request``.

        Args:
            request: optional request object; when truthy it is stored on
                the environment as ``env.request``.
        """
        self.env = ServerEnv()
        if request:
            self.env.request = request
        self.uapi_data = self.env.uapi_data
        self.auth_api = None
        self.auth_ret = None
        # Populated by get_uapis(); initialised here so that reading them
        # before the first call yields None instead of AttributeError.
        self.uapi = None
        self.auth_uapi = None

    async def rendertmpl(self, tmplstr, params=None):
        """Render ``tmplstr`` with a copy of the environment plus ``params``.

        Args:
            tmplstr: template text, or None.
            params: extra names layered over the environment copy.

        Returns:
            The rendered text, or None when ``tmplstr`` is None.

        Raises:
            Exception: rendering failed; the message carries the template
                and params, and the original error is chained as the cause.
        """
        if tmplstr is None:
            return None
        if params is None:
            params = {}
        namespace = self.env.copy()
        namespace.update(params)
        engine = self.env.tmpl_engine
        try:
            return await engine.renders(tmplstr, namespace)
        except Exception as exc:
            err = Exception(f'{exc}:{tmplstr=}, {params=}')
            exception(f'{err}')
            raise err from exc

    async def get_uapis(self, upappid, apiname, callerid, params=None):
        """Load the API definition, its optional auth API and the caller's key info.

        Side effects: merges ``params`` and the caller's key info into
        ``self.env`` and stores the definitions on ``self.uapi`` /
        ``self.auth_uapi``.

        Returns:
            ``(auth_uapi, uapi)``; ``auth_uapi`` is None when the API
            declares no ``auth_apiname``.

        Raises:
            Exception: no API definition exists for ``upappid``/``apiname``.
        """
        if params is None:
            params = {}
        self.env.update(params)
        auth_uapi = None
        uapi = await self.uapi_data.get_api(upappid, apiname)
        if uapi is None:
            e = Exception(f'UAPI not found:{upappid=}, {apiname=}')
            # format_exc() only adds context if an exception is currently
            # being handled by a caller.
            exception(f'{e}\n{format_exc()}')
            raise e
        if uapi.auth_apiname:
            auth_uapi = await self.uapi_data.get_api(upappid, uapi.auth_apiname)
        self.uapi = uapi
        self.auth_uapi = auth_uapi
        kinfo = await self.uapi_data.get_userapikey(upappid, callerid)
        self.env.update(kinfo)
        return auth_uapi, uapi

    async def __call__(self, upappid, apiname, callerid, params=None):
        """Asynchronously yield the raw response chunks of the named API.

        Resolves the definitions with get_uapis(), runs the auth API first
        when one is configured, then streams the target API's response.
        """
        auth_uapi, uapi = await self.get_uapis(upappid, apiname,
                                               callerid, params=params)
        if uapi is None:
            return
        if auth_uapi:
            await self.do_auth(auth_uapi)
        async for chunk in self.stream_resp(uapi):
            yield chunk

    def filter_nl_cr(self, s):
        """Return ``s`` with every newline and carriage return removed."""
        return s.replace('\n', '').replace('\r', '')

    async def stream_linify(self, upappid, apiname, callerid, params=None):
        """Stream the API response line by line as newline-terminated text.

        Each line is UTF-8 decoded, stripped of the API's ``chunk_match``
        prefix when present and, if the API has a ``response`` template,
        parsed as JSON and rendered through that template (with CR/LF
        removed).  Lines that fail parsing/rendering are logged and skipped.
        """
        gen = liner(self.__call__(upappid, apiname, callerid, params=params))
        async for line in gen:
            debug(f'{line=},{type(line)=}')
            respline = line
            line = line.decode('utf-8')
            # self.uapi is only set once the underlying generator has
            # started, so it must be read inside the loop.
            prefix = self.uapi.chunk_match or ''
            if line.startswith(prefix):
                line = line[len(prefix):]
            # NOTE(review): the original indentation was lost; the blocks
            # below are reconstructed with `else` paired to the nearest
            # `if len(line)`. Confirm lines lacking the prefix should still
            # be processed rather than reported as invalid.
            if self.uapi.response:
                try:
                    dic = json.loads(line)
                    line = await self.rendertmpl(self.uapi.response, dic)
                    line = self.filter_nl_cr(line)
                except Exception as e:
                    debug(f'{respline=}, {line=}, {self.uapi.response=} error({e})\n{format_exc()}')
                    continue
            if len(line):
                yield f'{line}\n'
            else:
                debug(f'invalid line:{line}')

    async def call(self, upappid, apiname, callerid, params=None):
        """Call the API and return its whole response as bytes.

        When the API has a ``response`` template, the body is parsed as JSON
        and rendered through it; if that fails the error is logged and the
        raw body is returned unchanged (best effort).
        """
        chunks = []
        async for chunk in self.__call__(upappid, apiname, callerid, params=params):
            chunks.append(chunk)
        b = b''.join(chunks)
        if self.uapi.response:
            try:
                dic = json.loads(b.decode('utf-8'))
                s = await self.rendertmpl(self.uapi.response, dic)
                b = s.encode('utf-8')
            except Exception as e:
                debug(f'http.response={b}, response_template={self.uapi.response}, error={e}')
        return b

    async def do_auth(self, auth_uapi):
        """Call the auth API and merge its JSON result into the environment.

        The response body is parsed as JSON; when ``auth_uapi.response`` is
        set, the parsed data is rendered through that template and the
        rendered text is parsed as JSON again before being merged.
        """
        chunks = []
        async for chunk in self.stream_resp(auth_uapi):
            chunks.append(chunk)
        # The body is bytes: decode it (the original called .encode on bytes,
        # which raises AttributeError).
        d = json.loads(b''.join(chunks).decode('utf-8'))
        if auth_uapi.response:
            s = await self.rendertmpl(auth_uapi.response, d)
            d = json.loads(s)
        self.env.update(d)

    async def stream_resp(self, api):
        """Render ``api``'s request templates, send the request, yield chunks.

        ``api.path`` is used as the URL when it is already absolute
        (http/https); otherwise it is appended to ``env['baseurl']``.
        ``api.headers`` must render to JSON (errors are logged and
        re-raised); an ``api.data`` body that is not valid JSON is logged
        and dropped; ``api.params`` is parsed as JSON when non-empty.
        """
        path = await self.rendertmpl(api.path)
        if path.startswith(('https://', 'http://')):
            url = path
        else:
            url = self.env.get('baseurl') + path
        method = api.httpmethod
        headers = await self.rendertmpl(api.headers)
        try:
            headers = json.loads(headers)
        except Exception as e:
            exception(f'{e}, {headers=},{api.headers=}')
            raise
        body = await self.rendertmpl(api.data)
        if body:
            try:
                # Round-trip to validate the JSON and keep non-ASCII text
                # unescaped in the outgoing body.
                body = json.dumps(json.loads(body), ensure_ascii=False)
            except Exception as e:
                exception(f'{e}, {body=},{api.data=}')
                body = None
        _params = await self.rendertmpl(api.params)
        if _params:
            _params = json.loads(_params)
        debug(f'{headers=}, {body=}. {method=}, {url=}')
        if self.env.dynamic_func_name:
            # The file never imported RegisterFunction; import it where used.
            from appPublic.registerfunction import RegisterFunction
            f = RegisterFunction()
            opts = {
                'apikey': self.env.apikey,
                'secretkey': self.env.secretkey,
                'method': method,
                'path': path,
                'headers': headers,
                # was the undefined name `params` (NameError)
                'params': _params,
                'body': body,
            }
            # NOTE(review): the gate checks `dynamic_func_name` but the call
            # passes `dynamic_func`, and the result is discarded -- confirm
            # both are intended (presumably the function mutates `opts`).
            await f.exe(self.env.dynamic_func, opts)
        shc = StreamHttpClient()
        gen = shc(method, url,
                  headers=headers,
                  data=body,
                  params=_params)
        async for chunk in gen:
            yield chunk
|
|
|
|
if __name__ == '__main__':
    # Placeholder entry point: running this module directly only prints a marker.
    print('test')
|