kboss/b/apv/callback_apv.dspy
2025-07-16 14:27:17 +08:00

233 lines
8.0 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

class DingCallbackCrypto3:
    """Encrypt, decrypt and sign DingTalk HTTP callback payloads.

    Plaintext layout used in both directions::

        16 random chars | 4-byte big-endian message length | message | key

    The plaintext is PKCS#7 padded to a 32-byte boundary and encrypted with
    AES-CBC, using the first 16 bytes of the AES key as the IV.
    """

    def __init__(self, token, encodingAesKey, key):
        """Store the callback credentials and derive the raw AES key.

        :param token: token mixed into every signature
        :param encodingAesKey: base64 text of the AES key without its final
            '=' (the '=' is re-added here before decoding)
        :param key: identifier appended to every plaintext and verified on
            decryption
        """
        self.encodingAesKey = encodingAesKey
        self.key = key
        self.token = token
        self.aesKey = base64.b64decode(self.encodingAesKey + '=')

    def getEncryptedMap(self, content):
        """Build the encrypted, signed reply sent back after handling a callback.

        :param content: plaintext reply
        :return: dict with 'msg_signature', 'encrypt', 'timeStamp', 'nonce'
        """
        encryptContent = self.encrypt(content)
        timeStamp = str(int(datetime.datetime.now().timestamp()))
        nonce = self.generateRandomKey(16)
        sign = self.generateSignature(nonce, timeStamp, self.token, encryptContent)
        return {'msg_signature': sign, 'encrypt': encryptContent, 'timeStamp': timeStamp, 'nonce': nonce}

    def getDecryptMsg(self, msg_signature, timeStamp, nonce, content):
        """Verify the signature of an incoming callback and decrypt its body.

        :param msg_signature: signature supplied with the request
        :param timeStamp: timestamp supplied with the request
        :param nonce: nonce supplied with the request
        :param content: base64 ciphertext
        :return: decrypted message text
        :raises ValueError: on signature mismatch, corrupt padding, a
            malformed plaintext, or a trailing key that differs from self.key
        """
        sign = self.generateSignature(nonce, timeStamp, self.token, content)
        if msg_signature != sign:
            raise ValueError('signature check error')
        # Base64-decode the ciphertext
        cipher_bytes = base64.decodebytes(content.encode('UTF-8'))
        iv = self.aesKey[:16]  # IV is the first 16 bytes of the AES key
        aesDecode = AES.new(self.aesKey, AES.MODE_CBC, iv)
        decodeRes = aesDecode.decrypt(cipher_bytes)
        # A valid pad is 1..32; a pad of 0 would make [:-0] wipe the plaintext.
        pad = decodeRes[-1] if decodeRes else 0
        if pad < 1 or pad > 32:
            raise ValueError('Input is not padded or padding is corrupt')
        decodeRes = decodeRes[:-pad]
        if len(decodeRes) < 20:
            raise ValueError('decrypted payload is too short')
        # Bytes 16..20 hold the message length; the message follows, then the key
        l = struct.unpack('!i', decodeRes[16:20])[0]
        if l < 0 or 20 + l > len(decodeRes):
            raise ValueError('decrypted payload has an invalid message length')
        if decodeRes[(20 + l):].decode() != self.key:
            raise ValueError('corpId 钉钉回调校验错误')
        return decodeRes[20:(20 + l)].decode()

    def encrypt(self, content):
        """Encrypt *content* and return the base64 text of the ciphertext.

        The plaintext is assembled as bytes so that the length field and the
        padding are computed on UTF-8 byte counts; this also avoids decoding
        the packed length as text, which fails for lengths containing a byte
        >= 0x80.

        :param content: message text (str)
        :return: base64 text as produced by base64.encodebytes
        """
        payload = content.encode('UTF-8')
        plain = b''.join([
            self.generateRandomKey(16).encode('UTF-8'),
            self.length(payload),
            payload,
            self.key.encode('UTF-8'),
        ])
        padded = self.pks7encode(plain)
        iv = self.aesKey[:16]
        aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
        aesEncrypt = aesEncode.encrypt(padded)
        return base64.encodebytes(aesEncrypt).decode('UTF-8')

    def generateSignature(self, nonce, timestamp, token, msg_encrypt):
        """Return the SHA-1 hex digest of the four values sorted and joined.

        :param nonce: nonce string
        :param timestamp: timestamp string
        :param token: callback token
        :param msg_encrypt: base64 ciphertext
        :return: hex signature
        """
        info(f"{type(nonce)=}, {type(timestamp)=}, {type(token)=}, {type(msg_encrypt)=}")
        signList = ''.join(sorted([nonce, timestamp, token, msg_encrypt]))
        return hashlib.sha1(signList.encode()).hexdigest()

    def length(self, content):
        """Pack len(content) as a 4-byte big-endian signed integer.

        :param content: any sized object; pass bytes to get a byte count
        :return: 4 bytes
        """
        return struct.pack('>l', len(content))

    def pks7encode(self, content):
        """Apply PKCS#7 padding up to a 32-byte boundary.

        The pad size is derived from the UTF-8 byte length, so the encoded
        result is always a multiple of 32 bytes.

        :param content: str or bytes
        :return: padded value of the same type as *content*
        """
        if isinstance(content, (bytes, bytearray)):
            val = 32 - (len(content) % 32)
            return bytes(content) + bytes([val]) * val
        val = 32 - (len(content.encode('UTF-8')) % 32)
        # chr(val) with val <= 32 encodes to exactly one UTF-8 byte
        return content + chr(val) * val

    def pks7decode(self, content):
        """Strip PKCS#7 padding added by pks7encode.

        :param content: padded str or bytes
        :return: *content* without its padding, same type as the input
        :raises ValueError: if the input is empty or the pad value is not 1..32
        """
        if not content:
            raise ValueError('Input is not padded or padding is corrupt')
        last = content[-1]
        # Indexing bytes yields an int, indexing str yields a 1-char string
        val = last if isinstance(last, int) else ord(last)
        if val < 1 or val > 32:
            raise ValueError('Input is not padded or padding is corrupt')
        return content[:len(content) - val]

    def generateRandomKey(self, size,
                          chars=string.ascii_letters + string.ascii_lowercase + string.ascii_uppercase + string.digits):
        """Return a random string of *size* characters drawn from *chars*.

        Characters are drawn from the OS entropy source because the value is
        used as a nonce and as the random prefix of encrypted messages.
        Note that the default alphabet lists every letter twice, so letters
        are twice as likely as digits.

        :param size: number of characters
        :param chars: alphabet to draw from
        :return: random string
        """
        rng = random.SystemRandom()
        return ''.join(rng.choice(chars) for _ in range(size))
async def save_data(apv_id, finis_hTime, apv_status) -> None:
    """Persist an approval callback.

    Updates the matching ``apv_data`` row and appends a row to
    ``apv_status_history``.

    :param apv_id: approval instance id
    :param finis_hTime: accepted for interface compatibility but not used;
        the finish time written is the current local time
    :param apv_status: process status to store
    """
    # 'YYYY-MM-DD HH:MM:SS' in local time
    now_text = datetime.datetime.now().isoformat(sep=" ", timespec="seconds")
    update_params = {
        "apv_id": apv_id,
        "apv_status": apv_status,
        "update_at": now_text,
    }
    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        sql = "update apv_data set apv_status = ${apv_status}$ , apv_finish_time=${update_at}$,update_at=${update_at}$ where apv_id = ${apv_id}$"
        await sor.sqlExe(sql, update_params)
        history_row = {
            "id": uuid(),
            "apv_id": apv_id,
            "apv_status": apv_status,
        }
        await sor.C("apv_status_history", history_row)
        info("数据入库成功")
async def get_key(oid: str):
    """Fetch the stored callback credentials for an organisation.

    :param oid: organisation id matched against ``apv_key.orgid``
    :return: the first matching row, carrying ``http_token``,
        ``http_aes_key`` and ``app_key``
    :raises ValueError: if no active (``del_flg = '0'``) row exists
    """
    db = DBPools()
    rows = None
    async with db.sqlorContext('kboss') as sor:
        sql = "select http_token,http_aes_key,app_key from apv_key where del_flg = '0' and orgid = ${oid}$"
        rows = await sor.sqlExe(sql, {"oid": oid})
    if not rows:
        # Raising a bare string is itself a TypeError in Python 3
        raise ValueError(f"oid:{oid},get key is null.")
    return rows[0]
# Internal callback
async def call_back_inner(processInstanceId, status):
    """Notify the business callback URL registered for an approval instance.

    Looks up ``apv_business.callback_url`` for the instance, then issues a
    GET request with ``apv_id`` and ``status`` as query parameters and logs
    the response.

    :param processInstanceId: approval instance id (apv_id)
    :param status: approval status to forward
    :return: 0 when no callback URL can be found, otherwise None
    """
    db = DBPools()
    rows = None
    async with db.sqlorContext('kboss') as sor:
        sql = "SELECT b.callback_url FROM apv_data d LEFT JOIN apv_business b ON d.business_id=b.id WHERE d.apv_id=${processInstanceId}$"
        rows = await sor.sqlExe(sql, {"processInstanceId": processInstanceId})
    if not rows:
        info(f"钉钉内部回调获取失败apv_id:{processInstanceId}")
        return 0
    # The LEFT JOIN yields a NULL callback_url when no business row matches
    callback_url = rows[0].get("callback_url")
    if not callback_url:
        info(f"钉钉内部回调获取失败apv_id:{processInstanceId}")
        return 0
    url = callback_url + f"?apv_id={processInstanceId}&status={status}"
    async with aiohttp_client.request("GET", url) as res:
        try:
            json_data = await res.json()
            info(f"apv_id:{processInstanceId},回调内部响应:{res.status}resp json:{json_data}")
        except Exception:
            # Response was not JSON: log the raw body instead.
            # NOTE(review): assumes aiohttp semantics where text() is a
            # coroutine method — confirm what aiohttp_client is bound to.
            body = await res.text()
            info(f"apv_id:{processInstanceId},回调内部响应:{res.status}resp:{body}")
async def get_sender_by_apv_id(apv_id):
    """Return the ``user_id`` stored on the ``apv_data`` row for *apv_id*.

    :param apv_id: approval instance id
    :return: the ``user_id`` of the first matching row, or None when no row
        matches
    """
    pools = DBPools()
    async with pools.sqlorContext('kboss') as sor:
        rows = await sor.R("apv_data", {"apv_id": apv_id})
        if rows:
            return rows[0]["user_id"]
    return None
async def callback_apv(ns=None):
    """Handle an approval callback request.

    Decrypts the request, derives the approval status, stores it and
    notifies the internal callback URL, then returns the encrypted
    ``success`` reply.

    :param ns: request parameters; reads ``oid``, ``signature``,
        ``timestamp``, ``nonce`` and ``encrypt``
    :return: the string ``"oid is None"`` when no oid is supplied, otherwise
        the dict produced by ``DingCallbackCrypto3.getEncryptedMap``
    """
    if ns is None:
        ns = {}
    oid = ns.get("oid")
    if not oid:
        return "oid is None"
    key_row = await get_key(oid)
    token = password_decode(key_row["http_token"])
    aes_key = password_decode(key_row["http_aes_key"])
    key = password_decode(key_row["app_key"])
    crypto = DingCallbackCrypto3(token, aes_key, key)
    # Decrypt the request parameters
    text = crypto.getDecryptMsg(
        msg_signature=ns.get("signature"),
        timeStamp=ns.get("timestamp"),
        nonce=ns.get("nonce"),
        content=ns.get("encrypt"),
    )
    text = json.loads(text)
    info(f"回调数据:{text}")
    event_type = text.get("type", None)
    # For a "finish" event the status is its "result"; otherwise the type itself
    if event_type == "finish":
        status = text.get("result", event_type)
    else:
        status = event_type
    processInstanceId = text.get("processInstanceId", None)
    # Encrypted reply
    res = crypto.getEncryptedMap("success")
    if not processInstanceId:
        return res
    # Only instances that have a recorded sender in apv_data are processed
    sender = await get_sender_by_apv_id(processInstanceId)
    if not sender:
        info(f"apv_id:{processInstanceId} is not use api")
        return res
    # Persist the status
    await save_data(apv_id=processInstanceId, finis_hTime=text.get("finishTime"), apv_status=status)
    try:
        # Notify the internal callback endpoint; failures are logged and do
        # not change the reply
        info(f"开始回调内部接口:{processInstanceId}")
        await call_back_inner(processInstanceId=processInstanceId, status=status)
    except Exception as e:
        info(f"回调内部接口失败:{e}")
    return res
# Script entry point: run the approval callback handler and return its result.
# NOTE(review): `params_kw` is not defined in this file and `await`/`return`
# appear at top level — presumably the .dspy host wraps this script in an
# async function and injects `params_kw`; confirm against the runtime.
ret = await callback_apv(params_kw)
return ret