class DingCallbackCrypto3: def __init__(self, token, encodingAesKey, key): self.encodingAesKey = encodingAesKey self.key = key self.token = token self.aesKey = base64.b64decode(self.encodingAesKey + '=') ## 生成回调处理完成后的success加密数据 def getEncryptedMap(self, content): encryptContent = self.encrypt(content) timeStamp = str(int(datetime.datetime.now().timestamp())) nonce = self.generateRandomKey(16) sign = self.generateSignature(nonce, timeStamp, self.token, encryptContent) return {'msg_signature': sign, 'encrypt': encryptContent, 'timeStamp': timeStamp, 'nonce': nonce} ##解密钉钉发送的数据 def getDecryptMsg(self, msg_signature, timeStamp, nonce, content): """ 解密 :param content: :return: """ sign = self.generateSignature(nonce, timeStamp, self.token, content) print(sign, msg_signature) if msg_signature != sign: raise ValueError('signature check error') # 对密文BASE64解码 content = base64.decodebytes(content.encode('UTF-8')) ##钉钉返回的消息体 iv = self.aesKey[:16] ##初始向量 aesDecode = AES.new(self.aesKey, AES.MODE_CBC, iv) decodeRes = aesDecode.decrypt(content) pad = int(decodeRes[-1]) if pad > 32: raise ValueError('Input is not padded or padding is corrupt') decodeRes = decodeRes[:-pad] l = struct.unpack('!i', decodeRes[16:20])[0] ##获取去除初始向量,四位msg长度以及尾部corpid nl = len(decodeRes) if decodeRes[(20 + l):].decode() != self.key: raise ValueError('corpId 钉钉回调校验错误') return decodeRes[20:(20 + l)].decode() def encrypt(self, content): """ 加密 :param content: :return: """ msg_len = self.length(content) content = ''.join([self.generateRandomKey(16), msg_len.decode(), content, self.key]) contentEncode = self.pks7encode(content) iv = self.aesKey[:16] aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv) aesEncrypt = aesEncode.encrypt(contentEncode.encode('UTF-8')) return base64.encodebytes(aesEncrypt).decode('UTF-8') ### 生成回调返回使用的签名值 def generateSignature(self, nonce, timestamp, token, msg_encrypt): info(f"{type(nonce)=}, {type(timestamp)=}, {type(token)=}, {type(msg_encrypt)=}") v = msg_encrypt signList = ''.join(sorted([nonce, timestamp, token, v])) return hashlib.sha1(signList.encode()).hexdigest() def length(self, content): """ 将msg_len转为符合要求的四位字节长度 :param content: :return: """ l = len(content) return struct.pack('>l', l) def pks7encode(self, content): """ 安装 PKCS#7 标准填充字符串 :param text: str :return: str """ l = len(content) output = io.StringIO() val = 32 - (l % 32) for _ in range(val): output.write('%02x' % val) # print "pks7encode",content,"pks7encode", val, "pks7encode", output.getvalue() return content + binascii.unhexlify(output.getvalue()).decode() def pks7decode(self, content): nl = len(content) val = int(binascii.hexlify(content[-1]), 16) if val > 32: raise ValueError('Input is not padded or padding is corrupt') l = nl - val return content[:l] def generateRandomKey(self, size, chars=string.ascii_letters + string.ascii_lowercase + string.ascii_uppercase + string.digits): """ 生成加密所需要的随机字符串 :param size: :param chars: :return: """ return ''.join(random.choice(chars) for i in range(size)) async def save_data(apv_id, finis_hTime, apv_status) -> str: """ 保存审批回调数据 :param apv_id: 审批id :param finis_hTime: 创建时间 :param apv_status: 流程状态 """ data = { "apv_id": apv_id, "apv_status": apv_status, "update_at": str(datetime.datetime.now())[:19], } db = DBPools() async with db.sqlorContext('kboss') as sor: sql = "update apv_data set apv_status = ${apv_status}$ , apv_finish_time=${update_at}$,update_at=${update_at}$ where apv_id = ${apv_id}$" await sor.sqlExe(sql, data) data = { "id": uuid(), "apv_id": apv_id, "apv_status": apv_status, } await sor.C("apv_status_history", data) info("数据入库成功") async def get_key(oid: str) -> str: db = DBPools() async with db.sqlorContext('kboss') as sor: sql = "select http_token,http_aes_key,app_key from apv_key where del_flg = '0' and orgid = ${oid}$" data = await sor.sqlExe(sql, {"oid": oid}) if data: return data[0] else: raise f"oid:{oid},get key is null." # 内部回调 async def call_back_inner(processInstanceId, status): """ 内部回调 :param processInstanceId: apv_id :param status: 审批状态 :return: """ # get callback url db = DBPools() async with db.sqlorContext('kboss') as sor: sql = "SELECT b.callback_url FROM apv_data d LEFT JOIN apv_business b ON d.business_id=b.id WHERE d.apv_id=${processInstanceId}$" data = await sor.sqlExe(sql, {"processInstanceId": processInstanceId}) if len(data) == 0: info(f"钉钉内部回调获取失败:apv_id:{processInstanceId}") return 0 data = data[0] url = data.get("callback_url") + f"?apv_id={processInstanceId}&status={status}" # url = "https://dev.kaiyuancloud.cn/account/addledgers.dspy?apv_id=ehV03fARQjqtT69piTFtzw07441693812206&status=agree" async with aiohttp_client.request("GET",url) as res: try: json_data = await res.json() info(f"apv_id:{processInstanceId},回调内部响应:{res.status},resp json:{json_data}") except Exception as e: info(f"apv_id:{processInstanceId},回调内部响应:{res.status},resp:{res.text}") async def get_sender_by_apv_id(apv_id): db = DBPools() async with db.sqlorContext('kboss') as sor: data = await sor.R("apv_data", {"apv_id": apv_id}) if not data: return None else: return data[0]["user_id"] return None async def callback_apv(ns={}): """ 审批回调 :param ns: :return: """ oid = ns.get("oid") if not oid: return "oid is None" data = await get_key(oid) token = password_decode(data["http_token"]) aes_key = password_decode(data["http_aes_key"]) key = password_decode(data["app_key"]) test = DingCallbackCrypto3(token, aes_key, key) # 解密参数 text = test.getDecryptMsg(msg_signature=ns.get("signature"), timeStamp=ns.get("timestamp"), nonce=ns.get("nonce"), content=ns.get("encrypt")) text = json.loads(text) info(f"回调数据:{text}") t = text.get("type", None) if t != "finish": status = t else: status = text.get("result", t) processInstanceId = text.get("processInstanceId",None) # 加密返回 res = test.getEncryptedMap("success") if processInstanceId: # 判断是否是 api 发起 f = await get_sender_by_apv_id(processInstanceId) if not f: info(f"apv_id:{processInstanceId} is not use api") return res # 保存数据 await save_data(apv_id=processInstanceId, finis_hTime=text.get("finishTime"), apv_status=status) try: # 回调内部接口 info(f"开始回调内部接口:{processInstanceId}") await call_back_inner(processInstanceId=processInstanceId, status=status) except Exception as e: info(f"回调内部接口失败:{e}") return res ret = await callback_apv(params_kw) return ret