import asyncio
import json
import os
import time
from functools import partial

from aiohttp import web, ClientSession, FormData, ClientTimeout
from wechatpy import WeChatClient, parse_message
from wechatpy.crypto import WeChatCrypto
from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
from wechatpy.messages import (
    Message, TextMessage, ImageMessage, VoiceMessage, VideoMessage,
    LocationMessage, LinkMessage, EventMessage
)
from wechatpy.replies import (
    TextReply, ImageReply, VoiceReply, VideoReply, ArticlesReply, MusicReply
)
from wechatpy.utils import check_signature

from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import debug, exception, error
from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig


class WOAHandler:
    """HTTP callback handler for a WeChat Official Account (WOA).

    Incoming messages are verified, optionally decrypted, flattened into a
    dict, published over zmq under ``config.woa_handler_id`` and answered
    with whatever the subscriber sends back on the per-message
    ``subscribe_id`` channel.

    Configuration is read from the environment variables ``WOA_APPID``,
    ``WOA_TOKEN``, ``WOA_AESKEY`` and ``WOA_SECRET``.
    """

    # Media types accepted by the temporary-material upload endpoint.
    media_types = ['image', 'voice', 'video', 'thumb']

    # Message types whose reply carries an uploaded media file.
    _media_reply_types = ('image', 'voice', 'video')

    def __init__(self):
        self.app_id = os.environ.get('WOA_APPID')
        self.token = os.environ.get('WOA_TOKEN')
        self.aes_key = os.environ.get('WOA_AESKEY')
        self.secret = os.environ.get('WOA_SECRET')
        # The crypto helper needs a real AES key, so it is only built when
        # one is configured; plaintext mode works without it.
        self.crypto = None
        if self.aes_key:
            self.crypto = WeChatCrypto(self.token, self.aes_key, self.app_id)
        self.client = WeChatClient(self.app_id, self.secret)
        self.client_token = None
        # Absolute time.time() value after which client_token is stale.
        self.client_token_expire_in = 0

    async def get_client_access_token(self):
        """Return a cached API access token, refreshing it when needed.

        The token is refreshed 20 seconds before its recorded expiry so a
        request never goes out with a token that is about to lapse.
        """
        if time.time() > self.client_token_expire_in - 20:
            # fetch_access_token is a blocking call; keep it off the loop.
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(
                None, self.client.fetch_access_token
            )
            self.client_token = data['access_token']
            # The WeChat token payload names the lifetime "expires_in".
            self.client_token_expire_in = time.time() + data['expires_in']
        return self.client_token

    async def upload_media(self, media_type: str, file_path: str) -> str:
        """Upload a file as temporary material and return its media id.

        Args:
            media_type: one of ``media_types``.
            file_path: path of the local file to upload.

        Raises:
            ValueError: if ``media_type`` is not in ``media_types``.
            Exception: if the API answer carries no ``media_id``.
        """
        if media_type not in self.media_types:
            raise ValueError(f"unsupported media type: {media_type}")
        access_token = await self.get_client_access_token()
        url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
        # Uploading large files needs a longer timeout.
        timeout = ClientTimeout(total=60)
        # The file must stay open until the request has been sent, because
        # the form only holds a reference to the file object.
        with open(file_path, 'rb') as f:
            form = FormData()
            # 'media' is the field name required by the WeChat API.
            form.add_field('media', f, filename=os.path.basename(file_path))
            async with ClientSession(timeout=timeout) as session:
                async with session.post(url, data=form) as resp:
                    # content_type=None: parse the body as JSON whatever
                    # Content-Type header the server sent.
                    result = await resp.json(content_type=None)
                    status = resp.status
        if status == 200 and 'media_id' in result:
            debug(f"临时素材上传成功: {result['media_id']}")
            return result['media_id']
        error(f"{file_path}:上传失败: {result}")
        raise Exception(f"WeChat API Error: {result}")

    async def media_id2url(self, media_id: str) -> str:
        """Return the download URL for a temporary material.

        The URL embeds the current access token.
        """
        access_token = await self.get_client_access_token()
        url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={access_token}&media_id={media_id}"
        return url

    async def _resolve_media_id(self, rzt_msg, media_type: str):
        """Return the media id for a reply dict, uploading its file if needed.

        An existing ``media_id`` is used as is. Otherwise the file named by
        ``media_file`` (or ``media_filepath``) is uploaded and the resulting
        id is stored back on ``rzt_msg`` so a second call does not upload
        again. Returns ``None`` when neither an id nor a file is given.
        """
        media_id = rzt_msg.get('media_id')
        if media_id:
            return media_id
        file_path = rzt_msg.get('media_file') or rzt_msg.get('media_filepath')
        if not file_path:
            return None
        media_id = await self.upload_media(media_type, file_path)
        rzt_msg.media_id = media_id
        return media_id

    async def messagehandler(self, request: web.Request, msg: Message):
        """Forward a received message over zmq and return the answer dict.

        Handles TextMessage, ImageMessage, VoiceMessage, VideoMessage,
        LocationMessage, LinkMessage and EventMessage.

        Returns:
            DictObject built from the subscriber's JSON answer. For
            image/voice/video answers ``media_id`` is filled in.
        """
        dic = DictObject()
        dic.received_at = time.time()
        dic.openid = msg.source
        dic.msgtype = msg.type
        dic.subscribe_id = getID()
        if msg.type == 'text':
            dic.content = msg.content
        elif msg.type in ('video', 'voice', 'image'):
            dic.media_url = await self.media_id2url(msg.media_id)
        elif msg.type == 'link':
            dic.title = msg.title
            dic.url = msg.url
            dic.description = msg.description
        elif msg.type == 'location':
            dic.location = {
                'latitude': msg.location_x,
                'longitude': msg.location_y,
                'label': msg.label
            }
        elif msg.type == 'event':
            dic.event = msg.event

        config = getConfig()
        # NOTE(review): the request is published before the reply channel is
        # subscribed; confirm zmq_subcribe cannot miss a very fast answer.
        await zmq_publish(config.woa_handler_id,
                          json.dumps(dic, ensure_ascii=False))
        result = await zmq_subcribe(dic.subscribe_id)
        rzt_dic = DictObject(**json.loads(result))
        if rzt_dic.msgtype in self._media_reply_types:
            await self._resolve_media_id(rzt_dic, rzt_dic.msgtype)
        return rzt_dic

    async def build_reply(self, msg: Message, rzt_msg: DictObject):
        """Turn an answer dict into a wechatpy reply object.

        Supports the message types text, image, voice, video, music and
        article.

        Returns:
            The reply object, or ``None`` when the message type is not
            supported or a media reply has no media to send.
        """
        kind = rzt_msg.msgtype
        if kind == 'text':
            return TextReply(message=msg, content=rzt_msg.content)

        if kind in self._media_reply_types:
            media_id = await self._resolve_media_id(rzt_msg, kind)
            if not media_id:
                error(f"reply of type {kind} carries no media")
                return None
            if kind == 'image':
                return ImageReply(message=msg, media_id=media_id)
            if kind == 'voice':
                return VoiceReply(message=msg, media_id=media_id)
            return VideoReply(message=msg,
                              title=rzt_msg.title,
                              media_id=media_id,
                              description=rzt_msg.description)

        if kind == 'music':
            fields = {
                'message': msg,
                'title': rzt_msg.title,
                'description': rzt_msg.description,
                'music_url': rzt_msg.music_url,        # playback link
                'hq_music_url': rzt_msg.hq_music_url,  # high quality link
            }
            # The cover image is uploaded as a 'thumb' material.
            thumb_id = await self._resolve_media_id(rzt_msg, 'thumb')
            if thumb_id:
                fields['thumb_media_id'] = thumb_id
            return MusicReply(**fields)

        if kind == 'article':
            reply = ArticlesReply(message=msg)
            for article in rzt_msg.articles or []:
                reply.add_article({
                    'title': article.get('title'),
                    'description': article.get('description'),
                    'image': article.get('image_url'),
                    'url': article.get('url')
                })
            return reply

        return None

    def _verify_signature(self, request):
        """Check the request signature against the configured token.

        Returns:
            ``(True, echostr)`` for a valid request carrying ``echostr``,
            ``(True, None)`` for a valid request without it, and
            ``(False, None)`` when the signature does not match.
        """
        query = request.query
        echo_str = query.get('echostr', '')
        try:
            check_signature(self.token,
                            query.get('signature', ''),
                            query.get('timestamp', ''),
                            query.get('nonce', ''))
        except InvalidSignatureException:
            error("签名验证失败")
            return False, None
        return True, (echo_str or None)

    async def handle_get(self, request: web.Request) -> web.Response:
        """Answer the WeChat server's URL verification request."""
        is_valid, echo_str = self._verify_signature(request)
        if not is_valid:
            return web.Response(text="failed", status=403)
        debug("URL 验证成功")
        return web.Response(text=echo_str or '')

    async def handle_post(self, request: web.Request) -> web.Response:
        """Handle a user message or event pushed by the WeChat server.

        Any failure after signature verification is answered with the plain
        text ``success`` so that WeChat does not retry the delivery.
        """
        # 1. Verify the signature.
        is_valid, _ = self._verify_signature(request)
        if not is_valid:
            return web.Response(text="failed", status=403)

        query = request.query
        timestamp = query.get('timestamp', '')
        nonce = query.get('nonce', '')
        # An absent or 'raw' encrypt_type means the body is plaintext XML.
        encrypted = query.get('encrypt_type', '') not in ('', 'raw')

        # 2. Read, decrypt if needed, and parse the message body.
        try:
            body = await request.read()
            if encrypted:
                if self.crypto is None:
                    raise RuntimeError("WOA_AESKEY is not configured")
                body = self.crypto.decrypt_message(
                    body, query.get('msg_signature', ''), timestamp, nonce
                )
            msg = parse_message(body)
        except Exception as e:
            error(f"消息解密或解析失败: {e}")
            return web.Response(text="success")  # keep WeChat from retrying
        if msg is None:
            return web.Response(text="success")
        debug(f"收到消息类型: {type(msg).__name__}")

        # 3. Route to the business logic and build the reply.
        try:
            rzt_msg = await self.messagehandler(request, msg)
            reply = await self.build_reply(msg, rzt_msg)
        except Exception as e:
            exception(f"message handling failed: {e}")
            return web.Response(text="success")
        if reply is None:
            return web.Response(text="success")

        # render() converts the reply object into an XML string.
        response_xml = reply.render()
        # In encrypted mode the answer has to be encrypted as well.
        if encrypted:
            response_xml = self.crypto.encrypt_message(
                response_xml, nonce, timestamp
            )
        return web.Response(text=response_xml, content_type='application/xml')


async def setup_route(woa, app):
    """Cleanup-context generator that registers the WOA routes on ``app``."""
    app.router.add_get('/woa/wechat', woa.handle_get)
    app.router.add_post('/woa/wechat', woa.handle_post)
    yield


def load_woa():
    """Create the handler, schedule its routes and expose it on ServerEnv."""
    env = ServerEnv()
    woa = WOAHandler()
    f = partial(setup_route, woa)
    add_cleanupctx(f)
    env.woa_handler = woa