2026-03-03 16:14:08 +08:00

303 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from traceback import format_exc
import os
import time
import base64
import crypt
from functools import partial
from aiohttp import web, ClientSession, FormData, ClientTimeout
from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import debug, exception, error
from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import base64
import random
import struct
import hashlib
import xml.etree.ElementTree as ET
import asyncio
import json
from typing import Optional, Dict, Any, Union
from pathlib import Path
from .media import WeChatMediaManager
from .reply import ReplyBuilder
class WOAHandler:
def __init__(self):
self.app_id = os.environ.get('WOA_APPID')
self.token = os.environ.get('WOA_TOKEN')
encoding_aes_key = os.environ.get('WOA_AESKEY')
self.secret = os.environ.get('WOA_SECRET', 'test')
self.aes_key = None
if encoding_aes_key:
# 微信的 AES Key 是 base64 编码的 32 字节字符串
def add_padding(s):
return s + '=' * (4 - len(s) % 4) if len(s) % 4 != 0 else s
self.aes_key = base64.b64decode(add_padding(encoding_aes_key))
debug(f'{self.app_id=}::{self.token=}::{encoding_aes_key=}::{len(encoding_aes_key)=}')
self.media_manager = WeChatMediaManager(self.app_id, self.secret)
def _signature(self, timestamp: str, nonce: str, encrypt:str=None) -> str:
lst = [self.token, timestamp, nonce]
if encrypt:
lst.append(encrypt)
check_list = sorted(lst)
check_str = "".join(check_list)
hash_code = hashlib.sha1(check_str.encode('utf-8')).hexdigest()
return hash_code
def _check_signature(self, signature: str, timestamp: str, nonce: str) -> bool:
"""验证签名"""
if not signature:
return False
hash_code = self._signature(timestamp, nonce)
return hash_code == signature
def _decrypt_msg(self, encrypt_msg: str) -> str:
"""AES 解密消息体 (CBC 模式)"""
if not self.aes_key:
return encrypt_msg
# 1. Base64 解码
cipher_text = base64.b64decode(encrypt_msg)
# 2. AES 解密
iv = self.aes_key[:16]
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
decrypted = decryptor.update(cipher_text) + decryptor.finalize()
# 3. 去除填充 (PKCS7)
pad_len = decrypted[-1]
if pad_len < 1 or pad_len > 32:
raise ValueError("Invalid padding")
content = decrypted[:-pad_len]
# 4. 去除前16字节随机字符串和后4字节长度字符串 (微信协议格式)
# 格式: [16字节随机串] + [消息内容] + [4字节网络序长度] + [AppID]
# 注意:这里简化处理,通常只需去掉头部随机串和尾部长度+AppID
# 实际微信协议Random(16) + MsgLen(4) + Content + AppID
# 但解密后结构通常是Random(16) + Content + Pad
# 让我们重新校准微信的解密逻辑:
# 解密后数据 = 16字节随机串 + 4字节内容长度(网络字节序) + 消息内容 + AppID
# 修正逻辑:
# 1. 去掉前16字节随机数
content_with_len = content[16:]
# 2. 读取接下来4字节作为长度
msg_len = struct.unpack(">I", content_with_len[:4])[0]
# 3. 截取消息内容
msg_content = content_with_len[4:4+msg_len]
return msg_content.decode('utf-8')
def _encrypt_msg(self, text: str, nonce: str, timestamp: str) -> str:
"""AES 加密消息体"""
if not self.aes_key:
return text
# 1. 生成随机16字节
random_bytes = bytes([random.randint(0, 255) for _ in range(16)])
# 2. 获取 AppID (这里假设你有,或者从配置读,加密必须用到 AppID 填充在末尾)
# 注意:原生实现需要知道 AppID 才能正确加密,否则微信无法解密
# 为了演示,这里假设有一个 app_id 属性,请自行补充
app_id = self.app_id
# 3. 拼接Random(16) + Len(4) + Content + AppID
content_bytes = text.encode('utf-8')
app_id_bytes = app_id.encode('utf-8')
len_bytes = struct.pack(">I", len(content_bytes))
raw_data = random_bytes + len_bytes + content_bytes + app_id_bytes
# 4. PKCS7 填充
pad_len = 32 - (len(raw_data) % 32)
raw_data += bytes([pad_len] * pad_len)
# 5. AES 加密
iv = self.aes_key[:16]
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
cipher_text = encryptor.update(raw_data) + encryptor.finalize()
# 6. Base64 编码
return base64.b64encode(cipher_text).decode('utf-8')
async def handle_get(self, request: web.Request) -> web.Response:
"""URL 验证"""
query = request.query
signature = query.get('signature', '')
timestamp = query.get('timestamp', '')
nonce = query.get('nonce', '')
echostr = query.get('echostr', '')
if self._check_signature(signature, timestamp, nonce):
# 如果开启加密echostr 也需要解密
debug(f'handle_get():{request.query=}')
return web.Response(text=echostr)
if self.aes_key:
try:
echostr = self._decrypt_msg(echostr)
except Exception as e:
error(f"Echostr 解密失败: {e}")
return web.Response(text="failed", status=403)
return web.Response(text=echostr)
else:
return web.Response(text="failed", status=403)
async def msghandle(self, msg:DictObject):
"""
"""
config = getConfig()
if msg.MsgType in ['video', 'voice', 'image']:
msg.media_url = await self.media_manager.get_media_url(msg.MediaId)
msg.subscribe_id = getID()
# 文本测试, 已通过
"""
rzt_dic = DictObject(**{
'msgtype':'text',
'content': '收到'
})
"""
fs = FileStorage()
rzt_dic = DictObject(**{
"msgtype": "video",
"media_file": fs.realPath('/UiEi7hKqAmU1-jqQEVhZe/5/171/99/17/vidu-1.mp4')
})
return rzt_dic
await zmq_publish(config.woa_handler_id, json.dumps(msg, ensure_ascii=False))
result = await zmq_subcribe(msg.subscribe_id)
rzt_dic = DictObject(**json.loads(result))
if rzt_dic.msgtype in ['video', 'image', 'audio']:
rzt_dic.media_id = await self.media_manager.upload_media(rzt_dic.msgtype, rzt_dic.media_filepath)
return rzt_dic
async def build_reply(self, msg:DictObject, rzt_msg:DictObject, timestamp):
"""
ImageReply, VoiceReply, VideoReply,
ArticlesReply, MusicReply
"""
if rzt_msg.msgtype == 'text':
reply = ReplyBuilder.text(msg, content=rzt_msg.content, create_time=timestamp)
return reply
if rzt_msg.msgtype in ['image', 'video', 'voice']:
rzt_msg.media_id = await self.upload_media(rzt_msg.msgtype,
rzt_msg.media_file)
if rzt_msg.msgtype == 'image':
reply = ReplyBuilder.image(msg, media_id=rzt_msg.media_id, create_time=timestamp)
return reply
if rzt_msg.msgtype == 'voice':
reply = ReplyBuilder.voice(msg, media_id=rzt_msg.media_id, create_time=timestamp)
return reply
if rzt_msg.msgtype == 'video':
reply = ReplyBuilder.video(msg, title=rzt_msg.title,
media_id=rzt_msg.media_id,
description=rzt_msg.description, create_time=timestamp)
return reply
if rzt_msg.msgtype == 'music':
reply = ReplyBuilder.music(msg,
title=rzt_msg.title,
description=rzt_msg.description,
music_url=rzt_msg.music_url, # 核心:播放链接
hq_music_url=rzt_msg.hq_music_url, # 核心:高质量链接
thumb_media_id=media_id # 核心:封面图 media_id
, create_time=timestamp)
return reply
if rzt_msg.msgtype == 'news':
reply = ReplyBuilder.news(msg, rzt_msg.articles, create_time=timestamp)
return reply
async def handle_post(self, request: web.Request) -> web.Response:
"""接收消息"""
query = request.query
signature = query.get('signature', '')
timestamp = query.get('timestamp', '')
nonce = query.get('nonce', '')
# 1. 验签
if not self._check_signature(signature, timestamp, nonce):
error("签名验证失败")
return web.Response(text="failed", status=403)
# 2. 读取 Body
body = await request.read()
if not body:
return web.Response(text="success")
try:
xml_str = body.decode('utf-8')
# 3. 如果开启加密,先解密
if self.aes_key:
root = ET.fromstring(xml_str)
encrypt_node = root.find('Encrypt')
if encrypt_node is not None and encrypt_node.text:
xml_str = self._decrypt_msg(encrypt_node.text)
else:
error("加密模式下未找到 Encrypt 节点")
return web.Response(text="success")
# 4. 解析 XML 为字典
msg_dict = self._parse_xml(xml_str)
debug(f"收到消息: {msg_dict}")
# --- 业务逻辑 ---
reply_dic = await self.msghandle(msg_dict)
debug(f'返回的消息:{reply_dic}')
timestamp = str(int(time.time()))
nonce = ''.join([str(random.randint(0, 9)) for _ in range(10)])
reply_xml = await self.build_reply(msg_dict, reply_dic, timestamp)
debug(f'收到微信消息:{msg_dict} 返回:{reply_xml}')
# 5. 如果需要加密回复
if self.aes_key and reply_xml:
# 1. 生成新的时间戳和随机数 (不要用请求里的旧数据)
encrypted_xml_str = self._encrypt_msg(reply_xml, nonce, timestamp)
try:
decode_data = self._decrypt_msg(encrypted_xml_str)
# NEwxml = base64.b64decode(decode_data)
debug(f'{decode_data=},{reply_xml=}')
except Exception as e:
exception(f'{e}')
# 构造加密后的返回 XML 包
sign = self._signature(timestamp, nonce, encrypt=encrypted_xml_str)
final_xml = f"""
<xml><Encrypt><![CDATA[{encrypted_xml_str}]]></Encrypt><MsgSignature><![CDATA[{sign}]]></MsgSignature><TimeStamp>{timestamp}</TimeStamp><Nonce><![CDATA[{nonce}]]></Nonce></xml>"""
debug(f'加密后的返回:{final_xml}')
return web.Response(text=final_xml, content_type='application/xml')
elif reply_xml:
debug(f'返回未加密:{reply_xml}')
return web.Response(text=reply_xml, content_type='application/xml')
else:
debug(f'仅通知,无信息返回')
return web.Response(text="success")
except Exception as e:
debug(f"处理消息异常: {e}:{format_exc()}")
return web.Response(text="success")
def _parse_xml(self, xml_str: str) -> Dict[str, Any]:
root = ET.fromstring(xml_str)
return DictObject(**{child.tag: child.text for child in root})
async def setup_route(woa, app):
app.router.add_get('/woa', woa.handle_get)
app.router.add_post('/woa', woa.handle_post)
yield
return
def load_woa():
env = ServerEnv()
woa = WOAHandler()
f = partial(setup_route, woa)
add_cleanupctx(f)
env.woa_handler = woa