"""aiohttp webhook handler for a WeChat Official Account (WOA).

Incoming WeChat messages are verified, optionally decrypted, forwarded to a
backend over ZeroMQ, and the backend's answer is rendered as a passive reply.
"""

import asyncio
import json
import os
import time
from functools import partial

from aiohttp import web, ClientSession, FormData, ClientTimeout
from wechatpy import parse_message
from wechatpy.client import WeChatClient
from wechatpy.crypto import WeChatCrypto
from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
from wechatpy.messages import (
    BaseMessage,
    TextMessage,
    ImageMessage,
    VoiceMessage,
    VideoMessage,
    LocationMessage,
    LinkMessage
)
from wechatpy.replies import (
    TextReply,
    ImageReply,
    VoiceReply,
    VideoReply,
    ArticlesReply,
    MusicReply
)
from wechatpy.utils import check_signature

from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import debug, exception, error
from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig


def _field(obj, name):
    """Read ``name`` from a dict-like or attribute-style object; None if absent."""
    if isinstance(obj, dict):
        return obj.get(name)
    return getattr(obj, name, None)


class WOAHandler:
    """Receive WeChat Official Account callbacks and produce passive replies.

    Configuration is read from the environment: ``WOA_APPID``, ``WOA_TOKEN``,
    ``WOA_AESKEY`` (optional, enables encrypted mode) and ``WOA_SECRET``.
    """

    # Media types accepted by the temporary-material upload endpoint.
    media_types = ['image', 'voice', 'video', 'thumb']

    def __init__(self):
        self.app_id = os.environ.get('WOA_APPID')
        self.token = os.environ.get('WOA_TOKEN')
        self.aes_key = os.environ.get('WOA_AESKEY')
        self.secret = os.environ.get('WOA_SECRET', 'test')
        # WeChatCrypto cannot be constructed without an AES key, so the
        # crypto component only exists when encrypted mode is configured.
        self.crypto = None
        if self.aes_key:
            self.crypto = WeChatCrypto(self.token, self.aes_key, self.app_id)
        self.client = WeChatClient(self.app_id, self.secret)
        self.client_token = None
        self.client_token_expire_in = 0

    async def get_client_access_token(self):
        """Return a cached API access token, refreshing it shortly before expiry.

        The token is refreshed when fewer than 20 seconds of validity remain.
        """
        now = time.time()
        if self.client_token is None or now > self.client_token_expire_in - 20:
            # fetch_access_token performs a blocking HTTP request; run it in
            # the default executor so the event loop is not stalled.
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(
                None, self.client.fetch_access_token
            )
            self.client_token = data['access_token']
            # The WeChat API names the lifetime field "expires_in".
            self.client_token_expire_in = time.time() + data['expires_in']
        return self.client_token

    async def upload_media(self, media_type: str, file_path: str) -> str:
        """Upload a file as temporary material and return its media id.

        Args:
            media_type: one of ``WOAHandler.media_types``.
            file_path: path of the local file to upload.

        Raises:
            ValueError: if ``media_type`` is not a supported type.
            Exception: if the WeChat API does not return a ``media_id``.
        """
        if media_type not in self.media_types:
            raise ValueError(f"unsupported media type: {media_type!r}")
        access_token = await self.get_client_access_token()
        url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
        # Uploading large files needs a longer timeout than the default.
        timeout = ClientTimeout(total=60)
        # The file must stay open until the request has been sent, because
        # aiohttp streams the handle while posting the form.
        with open(file_path, 'rb') as f:
            form = FormData()
            # 'media' is the field name required by the WeChat endpoint.
            form.add_field('media', f, filename=os.path.basename(file_path))
            async with ClientSession(timeout=timeout) as session:
                async with session.post(url, data=form) as resp:
                    status = resp.status
                    # Skip the content-type check: the endpoint does not
                    # reliably answer with "application/json".
                    result = await resp.json(content_type=None)
        if status == 200 and isinstance(result, dict) and 'media_id' in result:
            debug(f"临时素材上传成功: {result['media_id']}")
            return result['media_id']
        error(f"{file_path}:上传失败: {result}")
        raise Exception(f"WeChat API Error: {result}")

    async def media_id2url(self, media_id: str) -> str:
        """Build the download URL for a temporary material ``media_id``."""
        access_token = await self.get_client_access_token()
        url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={access_token}&media_id={media_id}"
        return url

    async def messagehandler(self, request: web.Request, msg: BaseMessage):
        """Forward an incoming message to the backend and return its answer.

        The message is flattened into a dict, published on the ZeroMQ topic
        ``config.woa_handler_id``, and the answer is awaited on a one-off
        topic named by ``subscribe_id``.

        Returns:
            DictObject built from the backend's JSON answer.
        """
        dic = DictObject()
        dic.received_at = time.time()
        dic.openid = msg.source
        dic.msgtype = msg.type
        dic.subscribe_id = getID()
        if msg.type == 'text':
            dic.content = msg.content
        if msg.type in ['video', 'voice', 'image']:
            dic.media_url = await self.media_id2url(msg.media_id)
        if msg.type == 'link':
            dic.title = msg.title
            dic.url = msg.url
            dic.description = msg.description
        if msg.type == 'location':
            dic.location = {
                'latitude': msg.location_x,
                'longitude': msg.location_y,
                'label': msg.label
            }
        if msg.type == 'event':
            dic.event = msg.event
        config = getConfig()
        await zmq_publish(config.woa_handler_id,
                          json.dumps(dic, ensure_ascii=False))
        result = await zmq_subcribe(dic.subscribe_id)
        # Media referenced by the answer is uploaded once, in build_reply;
        # uploading here as well would duplicate the temporary material.
        return DictObject(**json.loads(result))

    async def _resolve_media_id(self, rzt_msg, media_type, id_key='media_id'):
        """Return a media id for the reply, uploading the local file if needed.

        A media id already present under ``id_key`` wins; otherwise the file
        named by ``media_file`` (or ``media_filepath``) is uploaded as
        ``media_type``. Returns None when neither is available.
        """
        media_id = _field(rzt_msg, id_key)
        if media_id:
            return media_id
        file_path = (_field(rzt_msg, 'media_file')
                     or _field(rzt_msg, 'media_filepath'))
        if not file_path:
            return None
        return await self.upload_media(media_type, file_path)

    async def build_reply(self, msg: BaseMessage, rzt_msg: DictObject):
        """Build the wechatpy reply object matching ``rzt_msg.msgtype``.

        Supported types: text, image, voice, video, music, article.

        Returns:
            The reply object, or None when ``msgtype`` is not supported.

        Raises:
            ValueError: if an image/voice/video answer carries no media.
        """
        msgtype = rzt_msg.msgtype
        if msgtype == 'text':
            return TextReply(message=msg, content=rzt_msg.content)
        if msgtype in ['image', 'video', 'voice']:
            media_id = await self._resolve_media_id(rzt_msg, msgtype)
            if not media_id:
                raise ValueError(f"{msgtype} reply has no media to send")
            if msgtype == 'image':
                return ImageReply(message=msg, media_id=media_id)
            if msgtype == 'voice':
                return VoiceReply(message=msg, media_id=media_id)
            return VideoReply(message=msg,
                              title=rzt_msg.title,
                              media_id=media_id,
                              description=rzt_msg.description)
        if msgtype == 'music':
            # The cover image is uploaded as a 'thumb' material unless the
            # backend already supplied a thumb_media_id.
            thumb_media_id = await self._resolve_media_id(
                rzt_msg, 'thumb', id_key='thumb_media_id'
            )
            return MusicReply(
                message=msg,
                title=rzt_msg.title,
                description=rzt_msg.description,
                music_url=rzt_msg.music_url,        # playback link
                hq_music_url=rzt_msg.hq_music_url,  # high-quality link
                thumb_media_id=thumb_media_id       # cover image media id
            )
        if msgtype == 'article':
            reply = ArticlesReply(message=msg)
            for article in rzt_msg.articles:
                reply.add_article({
                    'title': article.title,
                    'description': article.description,
                    'image': article.image_url,
                    'url': article.url
                })
            return reply
        return None

    def _verify_signature(self, request):
        """Check the ``signature`` query parameter against the configured token.

        Returns:
            ``(True, echostr)`` when the signature is valid (``echostr`` is
            the value to echo back during URL verification), otherwise
            ``(False, None)``.
        """
        signature = request.query.get('signature', '')
        timestamp = request.query.get('timestamp', '')
        nonce = request.query.get('nonce', '')
        echo_str = request.query.get('echostr', '')
        debug(f'{request.query=}')
        if not self.token:
            error("WOA_TOKEN is not configured")
            return False, None
        try:
            check_signature(self.token, signature, timestamp, nonce)
        except InvalidSignatureException:
            error("签名验证失败")
            return False, None
        return True, echo_str

    def _parse_request_message(self, request, body):
        """Decrypt (when the request is AES-encrypted) and parse the XML body."""
        encrypted = request.query.get('encrypt_type') == 'aes'
        if encrypted and self.crypto is not None:
            body = self.crypto.decrypt_message(
                body,
                request.query.get('msg_signature', ''),
                request.query.get('timestamp', ''),
                request.query.get('nonce', '')
            )
        return parse_message(body)

    async def handle_get(self, request: web.Request) -> web.Response:
        """Answer the URL verification request sent by the WeChat server."""
        is_valid, echo_str = self._verify_signature(request)
        if is_valid:
            debug("URL 验证成功")
            return web.Response(text=echo_str)
        return web.Response(text="failed", status=403)

    async def handle_post(self, request: web.Request) -> web.Response:
        """Handle a user message or event pushed by the WeChat server.

        Returns the rendered reply XML (encrypted when the request was), or
        the plain text ``success`` when no reply can be produced.
        """
        # 1. Verify the signature.
        is_valid, _ = self._verify_signature(request)
        if not is_valid:
            return web.Response(text="failed", status=403)

        # 2. Read, decrypt and parse the message body.
        try:
            body = await request.read()
            msg = self._parse_request_message(request, body)
        except Exception as e:
            exception(f"消息解密或解析失败: {e}")
            # Answer "success" so that WeChat does not retry the delivery.
            return web.Response(text="success")

        debug(f"收到消息类型: {type(msg).__name__}")

        # 3. Route to the business logic and build the reply.
        try:
            rzt_msg = await self.messagehandler(request, msg)
            reply = await self.build_reply(msg, rzt_msg)
        except Exception as e:
            # This is the webhook boundary: log the failure and acknowledge
            # the delivery instead of letting WeChat retry on a 500.
            exception(f"message handling failed: {e}")
            return web.Response(text="success")
        if reply is None:
            return web.Response(text="success")

        # render() converts the reply object into an XML string.
        response_xml = reply.render()

        # An encrypted request must be answered with an encrypted reply.
        if self.crypto is not None and request.query.get('encrypt_type') == 'aes':
            response_xml = self.crypto.encrypt_message(
                response_xml,
                request.query.get('nonce'),
                request.query.get('timestamp')
            )

        return web.Response(text=response_xml, content_type='application/xml')


async def setup_route(woa, app):
    """Cleanup-context generator that registers the ``/woa`` routes on ``app``."""
    app.router.add_get('/woa', woa.handle_get)
    app.router.add_post('/woa', woa.handle_post)
    yield


def load_woa():
    """Create the handler, schedule route registration and expose it in ServerEnv."""
    env = ServerEnv()
    woa = WOAHandler()
    f = partial(setup_route, woa)
    add_cleanupctx(f)
    env.woa_handler = woa