154 lines
4.1 KiB
Python
154 lines
4.1 KiB
Python
import hashlib
|
|
import hmac
|
|
import base64
|
|
import datetime
|
|
import time
|
|
import jwt
|
|
|
|
from appPublic.registerfunction import RegisterFunction
|
|
from appPublic.rc4 import password, unpassword
|
|
from appPublic.jsonConfig import getConfig
|
|
from appPublic.log import debug
|
|
|
|
rf = RegisterFunction()
|
|
def get_module_dbname(modulename):
|
|
return 'sage'
|
|
|
|
rf.register('get_module_dbname', get_module_dbname)
|
|
|
|
def zhipu_token(uk_dic:dict):
|
|
apikey = uk_dic.get('apikey')
|
|
exp_seconds = 86400
|
|
try:
|
|
id, secret = apikey.split(".")
|
|
except Exception as e:
|
|
raise Exception("invalid apikey", e)
|
|
|
|
payload = {
|
|
"api_key": id,
|
|
"exp": int(round(time.time() * 1000)) + exp_seconds * 1000,
|
|
"timestamp": int(round(time.time() * 1000)),
|
|
}
|
|
|
|
uk_dic.update({'token':jwt.encode(
|
|
payload,
|
|
secret,
|
|
algorithm="HS256",
|
|
headers={"alg": "HS256", "sign_type": "SIGN"},
|
|
)})
|
|
return uk_dic
|
|
|
|
rf.register('zhipu_token', zhipu_token)
|
|
|
|
def shangtang_apikey(uk_dic):
|
|
apikey = uk_dic.get("apikey")
|
|
secretkey = uk_dic.get("secretkey")
|
|
current_time = int(time.time())
|
|
headers = {
|
|
"alg": "HS256",
|
|
"typ": "JWT"
|
|
}
|
|
payload = {
|
|
"iss": apikey,
|
|
"exp": current_time + 1800,
|
|
"nbf": current_time - 5
|
|
}
|
|
token = jwt.encode(payload, secretkey, algorithm="HS256", headers=headers)
|
|
return {
|
|
"apikey": apikey,
|
|
"secretkey": secretkey,
|
|
"authorization": token
|
|
}
|
|
rf.register('shangtang_apikey', shangtang_apikey)
|
|
|
|
# body_str body请求体
|
|
def mengzi_apikey(uk_dic):
|
|
body_str = uk_dic.get("body_str")
|
|
access_key = uk_dic.get("apikey")
|
|
access_secret = uk_dic.get("secretkey")
|
|
md5 = hashlib.md5(body_str.encode('utf-8')).digest()
|
|
content_md5 = base64.b64encode(md5).decode('utf-8')
|
|
|
|
date_str = datetime.datetime.utcnow().strftime('%a,%d %b %Y %H:%M:%S GMT')
|
|
|
|
nonce = str(int(datetime.datetime.utcnow().timestamp() * 1000))
|
|
|
|
string_to_sign = "\n".join([
|
|
"POST",
|
|
"application/json",
|
|
content_md5,
|
|
"application/json",
|
|
date_str,
|
|
"HMAC-SHA256",
|
|
nonce,
|
|
""
|
|
])
|
|
|
|
signature = base64.b64encode(
|
|
hmac.new(access_secret.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8')
|
|
authorization = access_key + ":" + signature
|
|
|
|
return {
|
|
"apikey": access_key,
|
|
"secretkey": access_secret,
|
|
"content_md5": content_md5,
|
|
"date_header": date_str,
|
|
"nonce": nonce,
|
|
"authorization": authorization,
|
|
}
|
|
rf.register('mengzi_apikey', mengzi_apikey)
|
|
|
|
def tiangong_apikey(uk_dic):
|
|
app_key = uk_dic.get("apikey")
|
|
app_secret = uk_dic.get("secretkey")
|
|
# 获取当前的时间戳
|
|
timestamp = str(int(time.time()))
|
|
# 计算sign
|
|
sign_str = app_key + app_secret + timestamp
|
|
sign = hashlib.md5(sign_str.encode()).hexdigest()
|
|
return {
|
|
"apikey": app_key,
|
|
"secretkey": app_secret,
|
|
"sign": sign,
|
|
"timestamp": timestamp,
|
|
}
|
|
|
|
rf.register('tiangong_apikey', tiangong_apikey)
|
|
|
|
default_pkey='456ft7ygubhinjlmkjiuyg7t65'
|
|
def encode_password(ns):
|
|
config = getConfig()
|
|
pwd = ns.get('password');
|
|
pkey = config.password_key or default_pkey
|
|
debug(f'encode_password(): {pwd=}, {pkey=}')
|
|
crypt = password(pwd, key=pkey)
|
|
ns['password'] = crypt
|
|
|
|
def decode_password(ns):
|
|
config.getConfig()
|
|
pkey = config.password_key or default_pkey
|
|
crypt = ns.get('password')
|
|
pwd = unpassword(crypt, key=pkey)
|
|
ns['password'] = pwd;
|
|
return pwd
|
|
|
|
# {dbname}:{tablename}:c:before
|
|
|
|
def cut_b64_image_header(b64img):
|
|
debug('cut_b64_image_header() called .........')
|
|
if isinstance(b64img, str):
|
|
return b64img.split(';base64,')[-1]
|
|
if isinstance(b64img, list):
|
|
ret = []
|
|
for s in b64img:
|
|
ret.append(s.split(';base64,')[-1])
|
|
return ret
|
|
|
|
rf.register('cut_b64_image_header', cut_b64_image_header)
|
|
# rf.register('password', encode_password)
|
|
# rf.register('sage:hostdev:c:before', encode_password)
|
|
# rf.register('sage:hostdev:u:before', encode_password)
|
|
# rf.register('sage:users:c:before', encode_password)
|
|
# rf.register('sage:users:u:before', encode_password)
|
|
# rf.register('unpassword', decode_password)
|