233 lines
8.0 KiB
Plaintext
233 lines
8.0 KiB
Plaintext
class DingCallbackCrypto3:
    """Sign, encrypt and decrypt DingTalk HTTP callback payloads.

    The plaintext layout used by ``encrypt`` / ``getDecryptMsg`` is::

        16 random bytes | 4-byte big-endian message length | message | key

    padded to a multiple of 32 bytes and encrypted with AES-CBC, using the
    first 16 bytes of the AES key as the IV.
    """

    def __init__(self, token, encodingAesKey, key):
        """Store the credentials and derive the raw AES key.

        :param token: token mixed into every signature
        :param encodingAesKey: base64 text of the AES key; one '=' is appended
            before decoding, so the value is expected to lack that padding
        :param key: identifier appended to the plaintext and checked on decrypt
        """
        self.encodingAesKey = encodingAesKey
        self.key = key
        self.token = token
        self.aesKey = base64.b64decode(self.encodingAesKey + '=')

    def getEncryptedMap(self, content):
        """Build the encrypted, signed response for a handled callback.

        :param content: plaintext to encrypt (str)
        :return: dict with ``msg_signature``, ``encrypt``, ``timeStamp`` and
            ``nonce`` keys
        """
        encryptContent = self.encrypt(content)
        timeStamp = str(int(datetime.datetime.now().timestamp()))
        nonce = self.generateRandomKey(16)
        sign = self.generateSignature(nonce, timeStamp, self.token, encryptContent)
        return {'msg_signature': sign, 'encrypt': encryptContent, 'timeStamp': timeStamp, 'nonce': nonce}

    def getDecryptMsg(self, msg_signature, timeStamp, nonce, content):
        """Verify the signature of an incoming callback and decrypt its body.

        :param msg_signature: signature received with the request
        :param timeStamp: timestamp received with the request (str)
        :param nonce: nonce received with the request (str)
        :param content: base64 ciphertext (str)
        :return: the decrypted message text
        :raises ValueError: on signature mismatch, bad padding, malformed
            payload, or when the trailing key does not match ``self.key``
        """
        import hmac

        sign = self.generateSignature(nonce, timeStamp, self.token, content)
        # Compare in constant time and never print/log the expected signature:
        # leaking it would let a caller forge a valid request.
        received = msg_signature if isinstance(msg_signature, str) else ''
        if not hmac.compare_digest(received.encode('UTF-8'), sign.encode('UTF-8')):
            raise ValueError('signature check error')

        # Base64-decode the ciphertext, then AES-CBC decrypt it.
        cipherBytes = base64.decodebytes(content.encode('UTF-8'))
        iv = self.aesKey[:16]
        aesDecode = AES.new(self.aesKey, AES.MODE_CBC, iv)
        plain = self.pks7decode(aesDecode.decrypt(cipherBytes))

        # Bytes 0-15 are the random prefix, 16-19 the message length.
        if len(plain) < 20:
            raise ValueError('decrypted payload is malformed')
        msgLen = struct.unpack('!i', plain[16:20])[0]
        if msgLen < 0 or 20 + msgLen > len(plain):
            raise ValueError('decrypted payload is malformed')

        # Whatever follows the message must equal the configured key.
        if plain[(20 + msgLen):].decode() != self.key:
            raise ValueError('corpId 钉钉回调校验错误')
        return plain[20:(20 + msgLen)].decode()

    def encrypt(self, content):
        """Encrypt ``content`` and return the base64 ciphertext as str.

        The plaintext is assembled as bytes so that the 4-byte length field
        and the padding are correct for any message length and for non-ASCII
        text.

        :param content: plaintext message (str)
        :return: base64 text produced by ``base64.encodebytes``
        """
        body = content.encode('UTF-8')
        plain = b''.join([
            self.generateRandomKey(16).encode('UTF-8'),
            self.length(body),
            body,
            self.key.encode('UTF-8'),
        ])
        padded = self.pks7encode(plain)
        iv = self.aesKey[:16]
        aesEncode = AES.new(self.aesKey, AES.MODE_CBC, iv)
        aesEncrypt = aesEncode.encrypt(padded)
        return base64.encodebytes(aesEncrypt).decode('UTF-8')

    def generateSignature(self, nonce, timestamp, token, msg_encrypt):
        """Return the SHA-1 hex digest of the four values sorted and joined.

        :param nonce: nonce (str)
        :param timestamp: timestamp (str)
        :param token: token (str)
        :param msg_encrypt: base64 ciphertext (str)
        :return: 40-character hexadecimal digest
        """
        parts = sorted([nonce, timestamp, token, msg_encrypt])
        return hashlib.sha1(''.join(parts).encode()).hexdigest()

    def length(self, content):
        """Pack the length of ``content`` as 4 big-endian bytes.

        For str input the UTF-8 byte length is used, because that is the
        length of the data that actually gets encrypted.

        :param content: str, bytes, or any sized object
        :return: 4 bytes
        """
        if isinstance(content, str):
            content = content.encode('UTF-8')
        return struct.pack('>l', len(content))

    def pks7encode(self, content):
        """Pad ``content`` to a multiple of 32 bytes, PKCS#7 style.

        :param content: str or bytes-like; for str the pad size is computed
            from the UTF-8 byte length
        :return: padded value of the same kind as the input (str for str,
            bytes otherwise); a full 32-byte pad is added when the input is
            already aligned
        """
        blockSize = 32
        if isinstance(content, str):
            val = blockSize - (len(content.encode('UTF-8')) % blockSize)
            return content + chr(val) * val
        data = bytes(content)
        val = blockSize - (len(data) % blockSize)
        return data + bytes([val]) * val

    def pks7decode(self, content):
        """Strip the padding added by ``pks7encode``.

        :param content: padded str or bytes
        :return: ``content`` without its trailing pad
        :raises ValueError: if the input is empty or the pad value is not in
            the range 1..32
        """
        if not content:
            raise ValueError('Input is not padded or padding is corrupt')
        last = content[-1]
        # Indexing bytes yields an int, indexing str yields a 1-char string.
        val = last if isinstance(last, int) else ord(last)
        if val < 1 or val > 32:
            raise ValueError('Input is not padded or padding is corrupt')
        return content[:len(content) - val]

    def generateRandomKey(self, size,
                          chars=string.ascii_letters + string.ascii_lowercase + string.ascii_uppercase + string.digits):
        """Return ``size`` characters drawn at random from ``chars``.

        Uses the operating system's random source, since the result is used
        as a nonce and as the random prefix of the encrypted plaintext.

        :param size: number of characters to generate
        :param chars: alphabet to draw from
        :return: random string of length ``size``
        """
        rng = random.SystemRandom()
        return ''.join(rng.choice(chars) for _ in range(size))
|
||
|
||
|
||
async def save_data(apv_id, finis_hTime, apv_status) -> str:
    """
    Store an approval callback result.

    Updates the matching ``apv_data`` row (status, finish time, update time)
    and then inserts a row into ``apv_status_history``.

    :param apv_id: approval id used to locate the ``apv_data`` row
    :param finis_hTime: accepted but not read here; the finish time written is
        the current local time
    :param apv_status: approval status to store
    :return: nothing is returned explicitly, despite the ``-> str`` annotation
    """
    # Current local time truncated to seconds: 'YYYY-MM-DD HH:MM:SS'.
    now_text = datetime.datetime.now().isoformat(sep=" ", timespec="seconds")
    update_args = {
        "apv_id": apv_id,
        "apv_status": apv_status,
        "update_at": now_text,
    }
    pools = DBPools()
    async with pools.sqlorContext('kboss') as sor:
        # The same timestamp is used for both the finish time and update time.
        update_sql = "update apv_data set apv_status = ${apv_status}$ , apv_finish_time=${update_at}$,update_at=${update_at}$ where apv_id = ${apv_id}$"
        await sor.sqlExe(update_sql, update_args)

        history_row = {
            "id": uuid(),
            "apv_id": apv_id,
            "apv_status": apv_status,
        }
        await sor.C("apv_status_history", history_row)
        info("数据入库成功")
|
||
|
||
|
||
async def get_key(oid: str) -> str:
    """
    Load the callback credentials configured for an organisation.

    :param oid: organisation id matched against ``apv_key.orgid``
    :return: the first non-deleted ``apv_key`` row, carrying ``http_token``,
        ``http_aes_key`` and ``app_key``
        (NOTE(review): a row is returned, not a str as the annotation says)
    :raises ValueError: when no row exists for ``oid``
    """
    rows = None
    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        sql = "select http_token,http_aes_key,app_key from apv_key where del_flg = '0' and orgid = ${oid}$"
        rows = await sor.sqlExe(sql, {"oid": oid})

    # Decide after the DB context is closed so the error always reaches the
    # caller. A real exception is required: raising a plain str is itself a
    # TypeError in Python 3.
    if not rows:
        raise ValueError(f"oid:{oid},get key is null.")
    return rows[0]
|
||
|
||
|
||
# 内部回调
|
||
async def call_back_inner(processInstanceId, status):
    """
    Notify the business system's callback URL about an approval status.

    Looks up ``callback_url`` for the approval, then issues a GET request with
    ``apv_id`` and ``status`` as query parameters and logs the response.

    :param processInstanceId: approval id (``apv_data.apv_id``)
    :param status: approval status to report
    :return: ``0`` when no callback URL can be found, otherwise ``None``
    """
    from urllib.parse import urlencode

    rows = None
    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        sql = "SELECT b.callback_url FROM apv_data d LEFT JOIN apv_business b ON d.business_id=b.id WHERE d.apv_id=${processInstanceId}$"
        rows = await sor.sqlExe(sql, {"processInstanceId": processInstanceId})

    # The LEFT JOIN can yield a row whose callback_url is NULL, so treat a
    # missing URL the same way as a missing row.
    callback_url = rows[0].get("callback_url") if rows else None
    if not callback_url:
        info(f"钉钉内部回调获取失败:apv_id:{processInstanceId}")
        return 0

    # URL-encode the parameters and respect a query string already present.
    separator = "&" if "?" in callback_url else "?"
    url = callback_url + separator + urlencode({"apv_id": processInstanceId, "status": status})

    # The HTTP request runs after the DB context has been left, so no database
    # connection is held while waiting on the remote service.
    async with aiohttp_client.request("GET", url) as res:
        try:
            json_data = await res.json()
        except Exception:
            # Body was not JSON: log the raw text instead.
            # NOTE(review): assumes aiohttp_client follows the aiohttp client
            # API, where ``text`` is a coroutine method - confirm.
            body = await res.text()
            info(f"apv_id:{processInstanceId},回调内部响应:{res.status},resp:{body}")
        else:
            info(f"apv_id:{processInstanceId},回调内部响应:{res.status},resp json:{json_data}")
|
||
|
||
async def get_sender_by_apv_id(apv_id):
    """
    Return the ``user_id`` stored on the ``apv_data`` row for ``apv_id``.

    :param apv_id: approval id to look up
    :return: ``user_id`` of the first matching row, or ``None`` when no row
        matches
    """
    pools = DBPools()
    async with pools.sqlorContext('kboss') as sor:
        rows = await sor.R("apv_data", {"apv_id": apv_id})
        return rows[0]["user_id"] if rows else None
    # Fallback for the case where the context block exits without returning.
    return None
|
||
async def callback_apv(ns=None):
    """
    Handle an approval callback request.

    Verifies and decrypts the request, stores the reported status, notifies
    the internal callback URL, and returns the encrypted "success" response.

    :param ns: request parameters; reads ``oid``, ``signature``,
        ``timestamp``, ``nonce`` and ``encrypt``
    :return: the string ``"oid is None"`` when ``oid`` is missing, otherwise
        the dict produced by ``DingCallbackCrypto3.getEncryptedMap``
    """
    # Avoid a shared mutable default; a missing or None argument means "empty".
    if ns is None:
        ns = {}

    oid = ns.get("oid")
    if not oid:
        return "oid is None"

    data = await get_key(oid)

    token = password_decode(data["http_token"])
    aes_key = password_decode(data["http_aes_key"])
    key = password_decode(data["app_key"])

    crypto = DingCallbackCrypto3(token, aes_key, key)

    # Verify the signature and decrypt the request body.
    text = crypto.getDecryptMsg(
        msg_signature=ns.get("signature"),
        timeStamp=ns.get("timestamp"),
        nonce=ns.get("nonce"),
        content=ns.get("encrypt"),
    )

    text = json.loads(text)
    info(f"回调数据:{text}")

    # For "finish" events the stored status is the result field; for every
    # other event type the type itself is stored.
    t = text.get("type", None)
    if t != "finish":
        status = t
    else:
        status = text.get("result", t)

    processInstanceId = text.get("processInstanceId", None)

    # Encrypted acknowledgement returned on every path below.
    res = crypto.getEncryptedMap("success")

    if processInstanceId:
        # Only approvals with a recorded sender are stored and forwarded.
        sender = await get_sender_by_apv_id(processInstanceId)
        if not sender:
            info(f"apv_id:{processInstanceId} is not use api")
            return res

        await save_data(apv_id=processInstanceId, finis_hTime=text.get("finishTime"), apv_status=status)
        try:
            info(f"开始回调内部接口:{processInstanceId}")
            await call_back_inner(processInstanceId=processInstanceId, status=status)
        except Exception as e:
            # Best effort: a failing internal callback must not stop the
            # acknowledgement from being returned.
            info(f"回调内部接口失败:{e}")
    return res
|
||
|
||
|
||
ret = await callback_apv(params_kw)
|
||
return ret |