"""aiohttp handlers for a WeChat message callback served at ``/woa``."""

import asyncio
import base64
import hashlib
import hmac
import json
import os
import random
import struct
import time
import xml.etree.ElementTree as ET
from functools import partial
from pathlib import Path
from typing import Optional, Dict, Any, Union

try:
    # ``crypt`` is not used in this module and was removed from the standard
    # library in Python 3.13; keep the name available without making the
    # whole module unimportable on newer interpreters.
    import crypt
except ImportError:  # pragma: no cover - depends on interpreter version
    crypt = None

from aiohttp import web, ClientSession, FormData, ClientTimeout
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.jsonConfig import getConfig
from appPublic.log import debug, exception, error
from appPublic.uniqueID import getID
from appPublic.zmqapi import zmq_subcribe, zmq_publish

from .media import WeChatMediaManager
from .reply import ReplyBuilder


class WOAHandler:
    """Verify, decrypt, dispatch and answer WeChat callback requests.

    Configuration is read from the environment:

    * ``WOA_APPID``  - application id, appended to every encrypted payload.
    * ``WOA_TOKEN``  - token used for the SHA-1 request signature.
    * ``WOA_AESKEY`` - base64 AES key (without the trailing ``=``); when it
      is unset, messages are handled in plaintext.
    * ``WOA_SECRET`` - application secret (defaults to ``'test'``).
    """

    def __init__(self):
        self.app_id = os.environ.get('WOA_APPID')
        self.token = os.environ.get('WOA_TOKEN')
        encoding_aes_key = os.environ.get('WOA_AESKEY')
        self.secret = os.environ.get('WOA_SECRET', 'test')
        self.aes_key = None
        if encoding_aes_key:
            # The configured key is base64 text that needs one "=" of padding
            # appended before it can be decoded into the raw AES key.
            self.aes_key = base64.b64decode(encoding_aes_key + "=")
        # Always defined (None in plaintext mode) because the request
        # handlers test this attribute unconditionally.
        self.encoding_aes_key = self.aes_key
        self.media_manager = WeChatMediaManager(self.app_id, self.secret)

    @staticmethod
    def _sha1_signature(*parts: str) -> str:
        """Return the SHA-1 hex digest of the sorted, concatenated *parts*."""
        return hashlib.sha1("".join(sorted(parts)).encode('utf-8')).hexdigest()

    def _check_signature(self, signature: str, timestamp: str, nonce: str) -> bool:
        """Return True when *signature* matches token + timestamp + nonce.

        Returns False (instead of raising) when the signature is empty or no
        token is configured.
        """
        if not signature or not self.token:
            return False
        expected = self._sha1_signature(self.token, timestamp, nonce)
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        return hmac.compare_digest(expected.encode('utf-8'),
                                   signature.encode('utf-8'))

    def _decrypt_msg(self, encrypt_msg: str) -> str:
        """Decrypt a base64 AES-CBC message body and return its text.

        The decrypted layout is ``random(16) + length(4, big-endian) +
        content + app_id`` followed by PKCS#7 padding with a block size of 32.
        When no AES key is configured the input is returned unchanged.

        Raises:
            ValueError: the padding, the declared length or the trailing
                app id is invalid.
        """
        if not self.aes_key:
            return encrypt_msg

        cipher_text = base64.b64decode(encrypt_msg)

        # The IV is the first 16 bytes of the key.
        cipher = Cipher(algorithms.AES(self.aes_key),
                        modes.CBC(self.aes_key[:16]),
                        backend=default_backend())
        decryptor = cipher.decryptor()
        decrypted = decryptor.update(cipher_text) + decryptor.finalize()

        # Strip the PKCS#7 padding.
        if not decrypted:
            raise ValueError("Invalid padding")
        pad_len = decrypted[-1]
        if pad_len < 1 or pad_len > 32:
            raise ValueError("Invalid padding")
        content = decrypted[:-pad_len]

        # Skip the 16 random bytes, then read the 4-byte content length.
        if len(content) < 20:
            raise ValueError("Decrypted payload too short")
        msg_len = struct.unpack_from(">I", content, 16)[0]
        body_end = 20 + msg_len
        if body_end > len(content):
            raise ValueError("Declared message length exceeds payload")

        # Whatever follows the content is the sender's app id; reject payloads
        # addressed to a different application when ours is configured.
        if self.app_id and content[body_end:] != self.app_id.encode('utf-8'):
            raise ValueError("AppID mismatch in decrypted payload")

        return content[20:body_end].decode('utf-8')

    def _encrypt_msg(self, text: str, nonce: str, timestamp: str) -> str:
        """Encrypt *text* with AES-CBC and return it base64 encoded.

        *nonce* and *timestamp* are accepted for interface compatibility and
        are not part of the ciphertext. When no AES key is configured the
        text is returned unchanged.

        Raises:
            ValueError: an AES key is configured but ``WOA_APPID`` is not.
        """
        if not self.aes_key:
            return text
        if not self.app_id:
            raise ValueError("WOA_APPID is required to encrypt replies")

        # random(16) + length(4, big-endian) + content + app_id
        content_bytes = text.encode('utf-8')
        raw_data = b"".join((
            os.urandom(16),
            struct.pack(">I", len(content_bytes)),
            content_bytes,
            self.app_id.encode('utf-8'),
        ))

        # PKCS#7 padding with a block size of 32 bytes.
        pad_len = 32 - (len(raw_data) % 32)
        raw_data += bytes([pad_len] * pad_len)

        cipher = Cipher(algorithms.AES(self.aes_key),
                        modes.CBC(self.aes_key[:16]),
                        backend=default_backend())
        encryptor = cipher.encryptor()
        cipher_text = encryptor.update(raw_data) + encryptor.finalize()

        return base64.b64encode(cipher_text).decode('utf-8')

    async def handle_get(self, request: web.Request) -> web.Response:
        """Answer the URL-verification GET request.

        Responds with the ``echostr`` query value when the signature is
        valid, otherwise with ``failed`` and status 403.
        """
        query = request.query
        signature = query.get('signature', '')
        timestamp = query.get('timestamp', '')
        nonce = query.get('nonce', '')
        echostr = query.get('echostr', '')

        if not self._check_signature(signature, timestamp, nonce):
            return web.Response(text="failed", status=403)

        if self.encoding_aes_key:
            # The echostr may be encrypted or plaintext depending on the
            # platform. The request is already authenticated by its
            # signature, so a value that does not decrypt is echoed back
            # unchanged instead of being rejected.
            try:
                echostr = self._decrypt_msg(echostr)
            except Exception as e:
                debug(f"Echostr 解密失败: {e}")
        return web.Response(text=echostr)

    async def msghandle(self, msg: DictObject):
        """Publish *msg* to the worker channel and return the worker's reply.

        For incoming media messages a ``media_url`` is attached first. When
        the reply refers to a local media file that has not been uploaded
        yet, it is uploaded and the resulting ``media_id`` stored on the
        reply.
        """
        config = getConfig()
        if msg.MsgType in ['video', 'voice', 'image']:
            # Parsed messages are keyed by their XML tag names, so the media
            # id arrives as ``MediaId``; ``media_id`` is kept as a fallback.
            incoming_media_id = msg.get('MediaId') or msg.get('media_id')
            # NOTE(review): get_media_url is called without await - confirm
            # it is a plain function and not a coroutine.
            msg.media_url = self.media_manager.get_media_url(incoming_media_id)
        msg.subscribe_id = getID()
        # NOTE(review): the reply channel is subscribed only after the
        # request is published - confirm zmq_subcribe cannot miss a reply
        # that is sent very quickly.
        await zmq_publish(config.woa_handler_id,
                          json.dumps(msg, ensure_ascii=False))
        result = await zmq_subcribe(msg.subscribe_id)
        rzt_dic = DictObject(**json.loads(result))
        if rzt_dic.msgtype in ['video', 'image', 'audio']:
            media_path = rzt_dic.get('media_filepath') or rzt_dic.get('media_file')
            if media_path and not rzt_dic.get('media_id'):
                rzt_dic.media_id = await self.media_manager.upload_media(
                    rzt_dic.msgtype, media_path)
        return rzt_dic

    async def _ensure_media_id(self, rzt_msg: DictObject):
        """Return the reply's media id, uploading its file when none is set."""
        media_id = rzt_msg.get('media_id')
        if not media_id:
            media_path = rzt_msg.get('media_file') or rzt_msg.get('media_filepath')
            media_id = await self.media_manager.upload_media(
                rzt_msg.msgtype, media_path)
            rzt_msg.media_id = media_id
        return media_id

    async def build_reply(self, msg: DictObject, rzt_msg: DictObject):
        """Build the reply for *msg* from the worker result *rzt_msg*.

        Supported ``msgtype`` values are ``text``, ``image``, ``voice``,
        ``video``, ``music`` and ``news``. Returns None for any other type.
        """
        msgtype = rzt_msg.msgtype
        if msgtype == 'text':
            return ReplyBuilder.text(msg, content=rzt_msg.content)

        if msgtype in ('image', 'video', 'voice'):
            media_id = await self._ensure_media_id(rzt_msg)
            if msgtype == 'image':
                return ReplyBuilder.image(msg, media_id=media_id)
            if msgtype == 'voice':
                return ReplyBuilder.voice(msg, media_id=media_id)
            return ReplyBuilder.video(msg,
                                      title=rzt_msg.title,
                                      media_id=media_id,
                                      description=rzt_msg.description)

        if msgtype == 'music':
            return ReplyBuilder.music(
                msg,
                title=rzt_msg.title,
                description=rzt_msg.description,
                music_url=rzt_msg.music_url,        # playback link
                hq_music_url=rzt_msg.hq_music_url,  # high-quality link
                # Cover image media id, taken from the worker result.
                thumb_media_id=(rzt_msg.get('thumb_media_id')
                                or rzt_msg.get('media_id')),
            )

        if msgtype == 'news':
            return ReplyBuilder.news(msg, rzt_msg.articles)

        return None

    async def handle_post(self, request: web.Request) -> web.Response:
        """Receive a pushed message and respond with the built reply.

        Responds with 403 on a bad signature. Any processing error is logged
        and answered with the plain text ``success``.
        """
        query = request.query
        signature = query.get('signature', '')
        timestamp = query.get('timestamp', '')
        nonce = query.get('nonce', '')

        # 1. Verify the request signature.
        if not self._check_signature(signature, timestamp, nonce):
            error("签名验证失败")
            return web.Response(text="failed", status=403)

        # 2. Read the body.
        body = await request.read()
        if not body:
            return web.Response(text="success")

        try:
            xml_str = body.decode('utf-8')

            # 3. In encrypted mode the payload sits in the Encrypt node.
            if self.encoding_aes_key:
                root = ET.fromstring(xml_str)
                encrypt_node = root.find('Encrypt')
                if encrypt_node is not None and encrypt_node.text:
                    xml_str = self._decrypt_msg(encrypt_node.text)
                else:
                    error("加密模式下未找到 Encrypt 节点")
                    return web.Response(text="success")

            # 4. Parse the XML into a dict-like object.
            msg_dict = self._parse_xml(xml_str)
            debug(f"收到消息: {msg_dict.get('MsgType')} from {msg_dict.get('FromUserName')}")

            # --- business logic ---
            reply_dic = await self.msghandle(msg_dict)
            reply_xml = await self.build_reply(msg_dict, reply_dic)

            if not reply_xml:
                return web.Response(text="success")

            # 5. Encrypt the reply when encryption is enabled.
            if self.encoding_aes_key:
                encrypted_xml_str = self._encrypt_msg(reply_xml, nonce, timestamp)
                # The envelope signature covers the token, timestamp, nonce
                # and the ciphertext itself.
                msg_signature = self._sha1_signature(
                    self.token, timestamp, nonce, encrypted_xml_str)
                final_xml = (
                    "<xml>"
                    f"<Encrypt><![CDATA[{encrypted_xml_str}]]></Encrypt>"
                    f"<MsgSignature><![CDATA[{msg_signature}]]></MsgSignature>"
                    f"<TimeStamp>{timestamp}</TimeStamp>"
                    f"<Nonce><![CDATA[{nonce}]]></Nonce>"
                    "</xml>"
                )
                return web.Response(text=final_xml,
                                    content_type='application/xml')

            return web.Response(text=reply_xml, content_type='application/xml')

        except Exception as e:
            exception(f"处理消息异常: {e}")
            return web.Response(text="success")

    def _parse_xml(self, xml_str: str) -> Dict[str, Any]:
        """Map each direct child tag of the XML root to its text."""
        root = ET.fromstring(xml_str)
        return DictObject(**{child.tag: child.text for child in root})


async def setup_route(woa, app):
    """Register the ``/woa`` GET and POST routes on *app*.

    Written as an async generator because it is registered through
    ``add_cleanupctx``; nothing needs to be released after the ``yield``.
    """
    app.router.add_get('/woa', woa.handle_get)
    app.router.add_post('/woa', woa.handle_post)
    yield
    return


def load_woa():
    """Create the handler, schedule its routes and expose it on ServerEnv."""
    env = ServerEnv()
    woa = WOAHandler()
    f = partial(setup_route, woa)
    add_cleanupctx(f)
    env.woa_handler = woa