import hashlib import hmac import base64 import datetime import time import jwt from appPublic.registerfunction import RegisterFunction from appPublic.rc4 import password, unpassword from appPublic.jsonConfig import getConfig from appPublic.log import debug rf = RegisterFunction() def get_module_dbname(modulename): return 'sage' rf.register('get_module_dbname', get_module_dbname) def zhipu_token(uk_dic:dict): apikey = uk_dic.get('apikey') exp_seconds = 86400 try: id, secret = apikey.split(".") except Exception as e: raise Exception("invalid apikey", e) payload = { "api_key": id, "exp": int(round(time.time() * 1000)) + exp_seconds * 1000, "timestamp": int(round(time.time() * 1000)), } uk_dic.update({'token':jwt.encode( payload, secret, algorithm="HS256", headers={"alg": "HS256", "sign_type": "SIGN"}, )}) return uk_dic rf.register('zhipu_token', zhipu_token) def shangtang_apikey(uk_dic): apikey = uk_dic.get("apikey") secretkey = uk_dic.get("secretkey") current_time = int(time.time()) headers = { "alg": "HS256", "typ": "JWT" } payload = { "iss": apikey, "exp": current_time + 1800, "nbf": current_time - 5 } token = jwt.encode(payload, secretkey, algorithm="HS256", headers=headers) return { "apikey": apikey, "secretkey": secretkey, "authorization": token } rf.register('shangtang_apikey', shangtang_apikey) # body_str body请求体 def mengzi_apikey(uk_dic): body_str = uk_dic.get("body_str") access_key = uk_dic.get("apikey") access_secret = uk_dic.get("secretkey") md5 = hashlib.md5(body_str.encode('utf-8')).digest() content_md5 = base64.b64encode(md5).decode('utf-8') date_str = datetime.datetime.utcnow().strftime('%a,%d %b %Y %H:%M:%S GMT') nonce = str(int(datetime.datetime.utcnow().timestamp() * 1000)) string_to_sign = "\n".join([ "POST", "application/json", content_md5, "application/json", date_str, "HMAC-SHA256", nonce, "" ]) signature = base64.b64encode( hmac.new(access_secret.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha256).digest()).decode('utf-8') authorization = access_key + ":" + signature return { "apikey": access_key, "secretkey": access_secret, "content_md5": content_md5, "date_header": date_str, "nonce": nonce, "authorization": authorization, } rf.register('mengzi_apikey', mengzi_apikey) def tiangong_apikey(uk_dic): app_key = uk_dic.get("apikey") app_secret = uk_dic.get("secretkey") # 获取当前的时间戳 timestamp = str(int(time.time())) # 计算sign sign_str = app_key + app_secret + timestamp sign = hashlib.md5(sign_str.encode()).hexdigest() return { "apikey": app_key, "secretkey": app_secret, "sign": sign, "timestamp": timestamp, } rf.register('tiangong_apikey', tiangong_apikey) default_pkey='456ft7ygubhinjlmkjiuyg7t65' def encode_password(ns): config = getConfig() pwd = ns.get('password'); pkey = config.password_key or default_pkey debug(f'encode_password(): {pwd=}, {pkey=}') crypt = password(pwd, key=pkey) ns['password'] = crypt def decode_password(ns): config.getConfig() pkey = config.password_key or default_pkey crypt = ns.get('password') pwd = unpassword(crypt, key=pkey) ns['password'] = pwd; return pwd # {dbname}:{tablename}:c:before def cut_b64_image_header(b64img): debug('cut_b64_image_header() called .........') if isinstance(b64img, str): return b64img.split(';base64,')[-1] if isinstance(b64img, list): ret = [] for s in b64img: ret.append(s.split(';base64,')[-1]) return ret rf.register('cut_b64_image_header', cut_b64_image_header) # rf.register('password', encode_password) # rf.register('sage:hostdev:c:before', encode_password) # rf.register('sage:hostdev:u:before', encode_password) # rf.register('sage:users:c:before', encode_password) # rf.register('sage:users:u:before', encode_password) # rf.register('unpassword', decode_password)