284 lines
9.6 KiB
Python
284 lines
9.6 KiB
Python
import asyncio
|
||
from traceback import format_exc
|
||
import os
|
||
import time
|
||
import base64
|
||
import crypt
|
||
from functools import partial
|
||
from aiohttp import web, ClientSession, FormData, ClientTimeout
|
||
from ahserver.configuredServer import add_cleanupctx
|
||
from ahserver.serverenv import ServerEnv
|
||
from appPublic.dictObject import DictObject
|
||
from appPublic.log import debug, exception, error
|
||
from appPublic.zmqapi import zmq_subcribe, zmq_publish
|
||
from appPublic.uniqueID import getID
|
||
from appPublic.jsonConfig import getConfig
|
||
|
||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||
from cryptography.hazmat.backends import default_backend
|
||
import base64
|
||
import random
|
||
import struct
|
||
import hashlib
|
||
import xml.etree.ElementTree as ET
|
||
import asyncio
|
||
import json
|
||
from typing import Optional, Dict, Any, Union
|
||
from pathlib import Path
|
||
from .media import WeChatMediaManager
|
||
from .reply import ReplyBuilder
|
||
|
||
class WOAHandler:
|
||
def __init__(self):
|
||
self.app_id = os.environ.get('WOA_APPID')
|
||
self.token = os.environ.get('WOA_TOKEN')
|
||
encoding_aes_key = os.environ.get('WOA_AESKEY')
|
||
self.secret = os.environ.get('WOA_SECRET', 'test')
|
||
self.aes_key = None
|
||
if encoding_aes_key:
|
||
# 微信的 AES Key 是 base64 编码的 32 字节字符串
|
||
self.aes_key = base64.b64decode(encoding_aes_key + "=")
|
||
self.encoding_aes_key = self.aes_key
|
||
self.media_manager = WeChatMediaManager(self.app_id, self.secret)
|
||
|
||
def _check_signature(self, signature: str, timestamp: str, nonce: str) -> bool:
|
||
"""验证签名"""
|
||
if not signature:
|
||
return False
|
||
check_list = sorted([self.token, timestamp, nonce])
|
||
check_str = "".join(check_list)
|
||
hash_code = hashlib.sha1(check_str.encode('utf-8')).hexdigest()
|
||
return hash_code == signature
|
||
|
||
def _decrypt_msg(self, encrypt_msg: str) -> str:
|
||
"""AES 解密消息体 (CBC 模式)"""
|
||
if not self.aes_key:
|
||
return encrypt_msg
|
||
|
||
# 1. Base64 解码
|
||
cipher_text = base64.b64decode(encrypt_msg)
|
||
|
||
# 2. AES 解密
|
||
iv = self.aes_key[:16]
|
||
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(iv), backend=default_backend())
|
||
decryptor = cipher.decryptor()
|
||
decrypted = decryptor.update(cipher_text) + decryptor.finalize()
|
||
|
||
# 3. 去除填充 (PKCS7)
|
||
pad_len = decrypted[-1]
|
||
if pad_len < 1 or pad_len > 32:
|
||
raise ValueError("Invalid padding")
|
||
content = decrypted[:-pad_len]
|
||
|
||
# 4. 去除前16字节随机字符串和后4字节长度字符串 (微信协议格式)
|
||
# 格式: [16字节随机串] + [消息内容] + [4字节网络序长度] + [AppID]
|
||
# 注意:这里简化处理,通常只需去掉头部随机串和尾部长度+AppID
|
||
# 实际微信协议:Random(16) + MsgLen(4) + Content + AppID
|
||
# 但解密后结构通常是:Random(16) + Content + Pad
|
||
# 让我们重新校准微信的解密逻辑:
|
||
# 解密后数据 = 16字节随机串 + 4字节内容长度(网络字节序) + 消息内容 + AppID
|
||
|
||
# 修正逻辑:
|
||
# 1. 去掉前16字节随机数
|
||
content_with_len = content[16:]
|
||
# 2. 读取接下来4字节作为长度
|
||
msg_len = struct.unpack(">I", content_with_len[:4])[0]
|
||
# 3. 截取消息内容
|
||
msg_content = content_with_len[4:4+msg_len]
|
||
|
||
return msg_content.decode('utf-8')
|
||
|
||
def _encrypt_msg(self, text: str, nonce: str, timestamp: str) -> str:
|
||
"""AES 加密消息体"""
|
||
if not self.aes_key:
|
||
return text
|
||
|
||
# 1. 生成随机16字节
|
||
random_bytes = bytes([random.randint(0, 255) for _ in range(16)])
|
||
# 2. 获取 AppID (这里假设你有,或者从配置读,加密必须用到 AppID 填充在末尾)
|
||
# 注意:原生实现需要知道 AppID 才能正确加密,否则微信无法解密
|
||
# 为了演示,这里假设有一个 app_id 属性,请自行补充
|
||
app_id = self.app_id
|
||
|
||
# 3. 拼接:Random(16) + Len(4) + Content + AppID
|
||
content_bytes = text.encode('utf-8')
|
||
app_id_bytes = app_id.encode('utf-8')
|
||
len_bytes = struct.pack(">I", len(content_bytes))
|
||
|
||
raw_data = random_bytes + len_bytes + content_bytes + app_id_bytes
|
||
|
||
# 4. PKCS7 填充
|
||
pad_len = 32 - (len(raw_data) % 32)
|
||
raw_data += bytes([pad_len] * pad_len)
|
||
|
||
# 5. AES 加密
|
||
iv = self.aes_key[:16]
|
||
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(iv), backend=default_backend())
|
||
encryptor = cipher.encryptor()
|
||
cipher_text = encryptor.update(raw_data) + encryptor.finalize()
|
||
|
||
# 6. Base64 编码
|
||
return base64.b64encode(cipher_text).decode('utf-8')
|
||
|
||
async def handle_get(self, request: web.Request) -> web.Response:
|
||
"""URL 验证"""
|
||
query = request.query
|
||
signature = query.get('signature', '')
|
||
timestamp = query.get('timestamp', '')
|
||
nonce = query.get('nonce', '')
|
||
echostr = query.get('echostr', '')
|
||
|
||
if self._check_signature(signature, timestamp, nonce):
|
||
# 如果开启加密,echostr 也需要解密
|
||
debug(f'handle_get():{request.query=}')
|
||
return web.Response(text=echostr)
|
||
if self.encoding_aes_key:
|
||
try:
|
||
echostr = self._decrypt_msg(echostr)
|
||
except Exception as e:
|
||
error(f"Echostr 解密失败: {e}")
|
||
return web.Response(text="failed", status=403)
|
||
return web.Response(text=echostr)
|
||
else:
|
||
return web.Response(text="failed", status=403)
|
||
|
||
async def msghandle(self, msg:DictObject):
|
||
"""
|
||
"""
|
||
config = getConfig()
|
||
if msg.MsgType in ['video', 'voice', 'image']:
|
||
msg.media_url = self.media_manager.get_media_url(msg.media_id)
|
||
msg.subscribe_id = getID()
|
||
rzt_dic = DictObject(**{
|
||
'msgtype':'text',
|
||
'content': '收到'
|
||
})
|
||
return rzt_dic
|
||
await zmq_publish(config.woa_handler_id, json.dumps(msg, ensure_ascii=False))
|
||
result = await zmq_subcribe(msg.subscribe_id)
|
||
rzt_dic = DictObject(**json.loads(result))
|
||
if rzt_dic.msgtype in ['video', 'image', 'audio']:
|
||
rzt_dic.media_id = await self.media_manager.upload_media(rzt_dic.msgtype, rzt_dic.media_filepath)
|
||
return rzt_dic
|
||
|
||
async def build_reply(self, msg:DictObject, rzt_msg:DictObject):
|
||
"""
|
||
ImageReply, VoiceReply, VideoReply,
|
||
ArticlesReply, MusicReply
|
||
"""
|
||
if rzt_msg.msgtype == 'text':
|
||
reply = ReplyBuilder.text(msg, content=rzt_msg.content)
|
||
return reply
|
||
if rzt_msg.msgtype in ['image', 'video', 'voice']:
|
||
rzt_msg.media_id = await self.upload_media(rzt_msg.msgtype,
|
||
rzt_msg.media_file)
|
||
if rzt_msg.msgtype == 'image':
|
||
reply = ReplyBuilder.image(msg, media_id=rzt_msg.media_id)
|
||
return reply
|
||
if rzt_msg.msgtype == 'voice':
|
||
reply = ReplyBuilder.voice(msg, media_id=rzt_msg.media_id)
|
||
return reply
|
||
if rzt_msg.msgtype == 'video':
|
||
reply = ReplyBuilder.video(msg, title=rzt_msg.title,
|
||
media_id=rzt_msg.media_id,
|
||
description=rzt_msg.description)
|
||
return reply
|
||
if rzt_msg.msgtype == 'music':
|
||
reply = ReplyBuilder.music(msg,
|
||
title=rzt_msg.title,
|
||
description=rzt_msg.description,
|
||
music_url=rzt_msg.music_url, # 核心:播放链接
|
||
hq_music_url=rzt_msg.hq_music_url, # 核心:高质量链接
|
||
thumb_media_id=media_id # 核心:封面图 media_id
|
||
)
|
||
return reply
|
||
if rzt_msg.msgtype == 'news':
|
||
reply = ReplyBuilder.news(msg, rzt_msg.articles)
|
||
return reply
|
||
|
||
async def handle_post(self, request: web.Request) -> web.Response:
|
||
"""接收消息"""
|
||
query = request.query
|
||
signature = query.get('signature', '')
|
||
timestamp = query.get('timestamp', '')
|
||
nonce = query.get('nonce', '')
|
||
|
||
# 1. 验签
|
||
if not self._check_signature(signature, timestamp, nonce):
|
||
error("签名验证失败")
|
||
return web.Response(text="failed", status=403)
|
||
|
||
# 2. 读取 Body
|
||
body = await request.read()
|
||
if not body:
|
||
return web.Response(text="success")
|
||
|
||
try:
|
||
xml_str = body.decode('utf-8')
|
||
|
||
# 3. 如果开启加密,先解密
|
||
if self.encoding_aes_key:
|
||
root = ET.fromstring(xml_str)
|
||
encrypt_node = root.find('Encrypt')
|
||
if encrypt_node is not None and encrypt_node.text:
|
||
xml_str = self._decrypt_msg(encrypt_node.text)
|
||
else:
|
||
error("加密模式下未找到 Encrypt 节点")
|
||
return web.Response(text="success")
|
||
|
||
# 4. 解析 XML 为字典
|
||
msg_dict = self._parse_xml(xml_str)
|
||
debug(f"收到消息: {msg_dict}")
|
||
|
||
# --- 业务逻辑 ---
|
||
reply_dic = await self.msghandle(msg_dict)
|
||
debug(f'返回的消息:{reply_dic}')
|
||
reply_xml = await self.build_reply(msg_dict, reply_dic)
|
||
debug(f'收到微信消息:{msg_dict} 返回:{reply_xml}')
|
||
|
||
# 5. 如果需要加密回复
|
||
if self.encoding_aes_key and reply_xml:
|
||
# 1. 生成新的时间戳和随机数 (不要用请求里的旧数据)
|
||
timestamp = str(int(time.time()))
|
||
nonce = ''.join([str(random.randint(0, 9)) for _ in range(10)])
|
||
encrypted_xml_str = self._encrypt_msg(reply_xml, nonce, timestamp)
|
||
# 构造加密后的返回 XML 包
|
||
final_xml = f"""
|
||
<xml>
|
||
<Encrypt><![CDATA[{encrypted_xml_str}]]></Encrypt>
|
||
<MsgSignature><![CDATA[{hashlib.sha1((self.token + timestamp + nonce + encrypted_xml_str).encode()).hexdigest()}]]></MsgSignature>
|
||
<TimeStamp>{timestamp}</TimeStamp>
|
||
<Nonce><![CDATA[{nonce}]]></Nonce>
|
||
</xml>
|
||
"""
|
||
debug(f'加密后的返回:{final_xml}')
|
||
return web.Response(text=final_xml, content_type='application/xml')
|
||
elif reply_xml:
|
||
debug(f'返回未加密:{reply_xml}')
|
||
return web.Response(text=reply_xml, content_type='application/xml')
|
||
else:
|
||
debug(f'仅通知,无信息返回')
|
||
return web.Response(text="success")
|
||
|
||
except Exception as e:
|
||
debug(f"处理消息异常: {e}:{format_exc()}")
|
||
return web.Response(text="success")
|
||
|
||
def _parse_xml(self, xml_str: str) -> Dict[str, Any]:
|
||
root = ET.fromstring(xml_str)
|
||
return DictObject(**{child.tag: child.text for child in root})
|
||
|
||
async def setup_route(woa, app):
|
||
app.router.add_get('/woa', woa.handle_get)
|
||
app.router.add_post('/woa', woa.handle_post)
|
||
yield
|
||
return
|
||
|
||
def load_woa():
|
||
env = ServerEnv()
|
||
woa = WOAHandler()
|
||
f = partial(setup_route, woa)
|
||
add_cleanupctx(f)
|
||
env.woa_handler = woa
|
||
|