sage/app/rf.py
2025-07-16 14:28:41 +08:00

154 lines
4.1 KiB
Python

import hashlib
import hmac
import base64
import datetime
import time
import jwt
from appPublic.registerfunction import RegisterFunction
from appPublic.rc4 import password, unpassword
from appPublic.jsonConfig import getConfig
from appPublic.log import debug
# Module-level registry object; functions in this file are added to it by
# name through rf.register(<name>, <callable>).
rf = RegisterFunction()
def get_module_dbname(modulename):
    """Return the database name for ``modulename``.

    The argument is accepted but not consulted: every module resolves to
    the same database name, 'sage'.
    """
    dbname = 'sage'
    return dbname
# Make get_module_dbname reachable through the registry under its own name.
rf.register('get_module_dbname', get_module_dbname)
def zhipu_token(uk_dic: dict):
    """Sign a JWT from ``uk_dic['apikey']`` and store it under 'token'.

    The apikey must have the form ``"<id>.<secret>"``. The id goes into the
    payload as "api_key"; the secret is the HS256 signing key. "exp" and
    "timestamp" are expressed in milliseconds, with "exp" exactly 86400
    seconds after "timestamp".

    Args:
        uk_dic: mapping holding at least 'apikey'. It is updated in place.

    Returns:
        The same ``uk_dic`` object, now carrying the 'token' entry.

    Raises:
        Exception: ("invalid apikey", <cause>) when 'apikey' is missing, is
            not a string, or does not split into exactly two parts on ".".
    """
    apikey = uk_dic.get('apikey')
    exp_seconds = 86400
    try:
        key_id, secret = apikey.split(".")
    except (AttributeError, TypeError, ValueError) as e:
        # AttributeError: apikey is None / has no split();
        # TypeError: bytes split with a str separator;
        # ValueError: not exactly two dot-separated parts.
        raise Exception("invalid apikey", e) from e
    # Read the clock once so "exp" is exactly exp_seconds after "timestamp".
    now_ms = int(round(time.time() * 1000))
    payload = {
        "api_key": key_id,
        "exp": now_ms + exp_seconds * 1000,
        "timestamp": now_ms,
    }
    uk_dic['token'] = jwt.encode(
        payload,
        secret,
        algorithm="HS256",
        headers={"alg": "HS256", "sign_type": "SIGN"},
    )
    return uk_dic
# Make zhipu_token reachable through the registry under its own name.
rf.register('zhipu_token', zhipu_token)
def shangtang_apikey(uk_dic):
    """Build an HS256-signed JWT from an apikey/secretkey pair.

    The token carries "iss" = apikey, "exp" = now + 1800 seconds and
    "nbf" = now - 5 seconds, and is signed with the secretkey.

    Args:
        uk_dic: mapping read for 'apikey' and 'secretkey'. It is not modified.

    Returns:
        A new dict with 'apikey', 'secretkey' and the signed token under
        'authorization'.
    """
    issuer = uk_dic.get("apikey")
    signing_key = uk_dic.get("secretkey")
    now = int(time.time())

    jwt_headers = {"alg": "HS256", "typ": "JWT"}
    claims = {"iss": issuer, "exp": now + 1800, "nbf": now - 5}

    return {
        "apikey": issuer,
        "secretkey": signing_key,
        "authorization": jwt.encode(
            claims, signing_key, algorithm="HS256", headers=jwt_headers
        ),
    }
# Make shangtang_apikey reachable through the registry under its own name.
rf.register('shangtang_apikey', shangtang_apikey)
def mengzi_apikey(uk_dic):
    """Compute HMAC-SHA256 signing values for a JSON POST request.

    The string to sign is the newline-joined sequence: "POST",
    "application/json", Content-MD5, "application/json", date, "HMAC-SHA256",
    nonce, and a trailing empty field.

    Args:
        uk_dic: mapping with 'body_str' (the request body text), 'apikey'
            and 'secretkey'. It is not modified.

    Returns:
        A new dict with 'apikey', 'secretkey', 'content_md5' (base64 of the
        body's MD5 digest), 'date_header', 'nonce' (epoch milliseconds as
        text) and 'authorization' ("<apikey>:<base64 signature>").
    """
    body_str = uk_dic.get("body_str")
    access_key = uk_dic.get("apikey")
    access_secret = uk_dic.get("secretkey")

    md5 = hashlib.md5(body_str.encode('utf-8')).digest()
    content_md5 = base64.b64encode(md5).decode('utf-8')

    # Take one timezone-aware reading of the clock. A naive utcnow() would be
    # interpreted as *local* time by .timestamp(), shifting the nonce by the
    # machine's UTC offset; a single reading also keeps date and nonce aligned.
    now = datetime.datetime.now(datetime.timezone.utc)
    date_str = now.strftime('%a,%d %b %Y %H:%M:%S GMT')
    nonce = str(int(now.timestamp() * 1000))

    string_to_sign = "\n".join([
        "POST",
        "application/json",
        content_md5,
        "application/json",
        date_str,
        "HMAC-SHA256",
        nonce,
        "",
    ])
    signature = base64.b64encode(
        hmac.new(
            access_secret.encode('utf-8'),
            string_to_sign.encode('utf-8'),
            hashlib.sha256,
        ).digest()
    ).decode('utf-8')
    authorization = access_key + ":" + signature
    return {
        "apikey": access_key,
        "secretkey": access_secret,
        "content_md5": content_md5,
        "date_header": date_str,
        "nonce": nonce,
        "authorization": authorization,
    }
# Make mengzi_apikey reachable through the registry under its own name.
rf.register('mengzi_apikey', mengzi_apikey)
def tiangong_apikey(uk_dic):
    """Derive the MD5 request signature from an apikey/secretkey pair.

    Args:
        uk_dic: mapping read for 'apikey' and 'secretkey'. It is not modified.

    Returns:
        A new dict with 'apikey', 'secretkey', 'timestamp' (current Unix
        time in whole seconds, as text) and 'sign' (hex MD5 of
        apikey + secretkey + timestamp).
    """
    key = uk_dic.get("apikey")
    secret = uk_dic.get("secretkey")
    # Current timestamp, whole seconds, as a string.
    ts = str(int(time.time()))
    # The signature is the MD5 hex digest of key, secret and timestamp
    # concatenated in that order.
    digest = hashlib.md5((key + secret + ts).encode())
    return {
        "apikey": key,
        "secretkey": secret,
        "sign": digest.hexdigest(),
        "timestamp": ts,
    }
# Make tiangong_apikey reachable through the registry under its own name.
rf.register('tiangong_apikey', tiangong_apikey)
# Fallback key used by encode_password/decode_password when the configuration
# has no (or an empty) password_key.
# NOTE(review): a key hard-coded in source is readable by anyone with the
# code — confirm that deployments always set password_key.
default_pkey='456ft7ygubhinjlmkjiuyg7t65'
def encode_password(ns):
    """Replace ``ns['password']`` in place with its encoded form.

    The value is passed through ``appPublic.rc4.password`` using the
    configured ``password_key``, falling back to ``default_pkey`` when the
    configuration value is empty.

    Args:
        ns: mapping holding the plaintext under 'password'. It is modified
            in place.

    Returns:
        None.
    """
    config = getConfig()
    pwd = ns.get('password')
    pkey = config.password_key or default_pkey
    # Deliberately log no values: the plaintext password and the key must
    # never end up in log files.
    debug('encode_password(): called')
    ns['password'] = password(pwd, key=pkey)
def decode_password(ns):
    """Replace ``ns['password']`` in place with its decoded form.

    The stored value is passed through ``appPublic.rc4.unpassword`` using
    the configured ``password_key``, falling back to ``default_pkey`` when
    the configuration value is empty.

    Args:
        ns: mapping holding the encoded value under 'password'. It is
            modified in place.

    Returns:
        The decoded password, which is also stored back into ``ns``.
    """
    # Bind the configuration object; calling getConfig() on an unbound
    # ``config`` name raised NameError before any decoding could happen.
    config = getConfig()
    pkey = config.password_key or default_pkey
    crypt = ns.get('password')
    pwd = unpassword(crypt, key=pkey)
    ns['password'] = pwd
    return pwd
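# Naming pattern of the table-hook registrations (compare the commented-out
# rf.register lines at the end of this file): {dbname}:{tablename}:c:before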
def cut_b64_image_header(b64img):
    """Drop the prefix ending in ';base64,' from base64 image text.

    Only the text after the last ';base64,' marker is kept; text without
    the marker is returned unchanged.

    Args:
        b64img: a single string, or a list of strings.

    Returns:
        A string for string input, a new list of strings for list input,
        and None for any other input type.
    """
    debug('cut_b64_image_header() called .........')
    marker = ';base64,'
    if isinstance(b64img, list):
        return [item.split(marker)[-1] for item in b64img]
    if isinstance(b64img, str):
        return b64img.split(marker)[-1]
    return None
# Make cut_b64_image_header reachable through the registry under its own name.
rf.register('cut_b64_image_header', cut_b64_image_header)
# rf.register('password', encode_password)
# rf.register('sage:hostdev:c:before', encode_password)
# rf.register('sage:hostdev:u:before', encode_password)
# rf.register('sage:users:c:before', encode_password)
# rf.register('sage:users:u:before', encode_password)
# rf.register('unpassword', decode_password)