diff --git a/woa/init.py b/woa/init.py new file mode 100644 index 0000000..e4163da --- /dev/null +++ b/woa/init.py @@ -0,0 +1,246 @@ +import asyncio +import time +from aiohttp import web, ClientSession, FormData, ClientTimeout +from wechatpy.crypto import WeChatCrypto +from wechatpy.exceptions import InvalidSignatureException, WeChatClientException +from wechatpy.messages import ( + Message, TextMessage, ImageMessage, VoiceMessage, VideoMessage, + LocationMessage, LinkMessage, EventMessage +) +from wechatpy.replies import ( + TextReply, ImageReply, VoiceReply, VideoReply, + ArticlesReply, MusicReply +) +from ahserver.configuredServer import add_cleanupctx +from ahserver.serverenv import ServerEnv +from appPublic.log import debug, exception, error + +class WOAHandler: + media_types = ['image', 'voice', 'video', 'thumb'] + def __init__(self, msg_handler): + self.app_id = os.environ.get('WOA_APPID') + self.token = os.environ.get('WOA_TOKEN') + self.aes_key = os.environ.get('WOA_AESKEY') + self.secret = os.environ.get('WOA_SECRET') + # 初始化加解密组件 (如果未开启加密,wechatpy 也能处理,但传入 key 更规范) + self.crypto = WeChatCrypto(token, aes_key, app_id) + self.msg_handler = msg_handler + self.client = WeChatClient(APP_ID, APP_SECRET) + self.client_token = None + self.client_token_expire_in = 0 + + async def get_client_access_token(self): + tim = time.time() + if tim > self.client_token_expire_in - 20: + f = awaitify(self.client.fetch_access_token) + data = await f() + self.client_token = data['access_token'] + self.client_token_expire_in = time.time() + data['expire_in'] + return self.client_token + + async def upload_media(self, media_type:str, file_path:str) -> str: + """ + upload media file to wechat and return a media id + """ + media_id = '' + access_token = await self.get_client_access_token() + url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}" + + form = FormData() + # 'media' 是微信接口规定的字段名 + with open(file_path, 'rb') as f: + form.add_field('media', f, filename=os.path.basename(file_path)) + + timeout = ClientTimeout(total=60) # 上传大文件需要更长超时 + + async with ClientSession(timeout=timeout) as session: + async with session.post(url, data=form) as resp: + result = await resp.json() + + if resp.status == 200 and 'media_id' in result: + debug(f"临时素材上传成功: {result['media_id']}") + return result['media_id'] + else: + exception(f"{file_path}:上传失败: {result}") + raise Exception(f"WeChat API Error: {result}") + + async def media_id2url(self, media_id:str) -> str: + if self.client_token is None: + f = awaitify(self.client.fetch_access_token) + self.client_token = await f() + url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={self.client_token}&media_id={media_id}" + return url + + async def messagehandler(self, request:web.Request, msg:Message): + """ + + call self.msg_handler handle message + """ + dic = {} + + if self.msg_handler: + return await self.msg_handler(reqest, msg) + return None + + def _verify_signature(self, request): + """验证签名并解密消息体(如果开启加密)""" + signature = request.query.get('signature', '') + timestamp = request.query.get('timestamp', '') + nonce = request.query.get('nonce', '') + echo_str = request.query.get('echostr', '') + + # 1. 验证签名 + try: + if echo_str: + # GET 请求:验证 URL + echo_str = self.crypto.check_signature(signature, timestamp, nonce, echo_str) + return True, echo_str + else: + # POST 请求:验证签名 (不返回 echo_str,只返回 True) + self.crypto.check_signature(signature, timestamp, nonce) + return True, None + except InvalidSignatureException: + logger.warning("签名验证失败") + return False, None + + async def handle_get(self, request: web.Request) -> web.Response: + """处理微信服务器 URL 验证""" + is_valid, echo_str = self._verify_signature(request) + if is_valid: + debug("URL 验证成功") + return web.Response(text=echo_str) + else: + return web.Response(text="failed", status=403) + + async def handle_post(self, request: web.Request) -> web.Response: + """处理用户消息和事件""" + # 1. 验证签名 + is_valid, _ = self._verify_signature(request) + if not is_valid: + return web.Response(text="failed", status=403) + + # 2. 读取并解密消息体 + try: + body = await request.read() + # decrypt_message 会自动处理解密和 XML 解析,返回 wechatpy 的消息对象 + # 如果未开启加密,它也会正常解析 XML + msg = self.crypto.decrypt_message(body) + except Exception as e: + logger.error(f"消息解密或解析失败: {e}") + return web.Response(text="success") # 防止微信重试 + + debug(f"收到消息类型: {type(msg).__name__}") + + # 3. 业务逻辑路由 + reply = None + + if isinstance(msg, TextMessage): + content = msg.content.strip() + debug(f"用户文本: {content},{msg}") + + # --- 模拟多媒体回复逻辑 --- + if content == "图片": + reply = self._get_image_reply(msg) + elif content == "语音": + reply = self._get_voice_reply(msg) + elif content == "视频": + reply = self._get_video_reply(msg) + elif content == "图文": + reply = self._get_articles_reply(msg) + else: + # 默认文本回复 + reply = TextReply(message=msg, content=f"收到:{content}\n发送'图片'、'语音'、'视频'或'图文'测试多媒体回复。") + + elif isinstance(msg, EventMessage): + if msg.event == 'subscribe': + reply = TextReply(message=msg, content="欢迎关注!发送指令测试功能。") + elif msg.event == 'unsubscribe': + debug(f"用户 {msg.source} 取消关注") + # 取消关注通常不需要回复,或者回复了用户也收不到 + return web.Response(text="success") + + # 4. 生成响应 XML + if reply: + # render() 方法将回复对象转换为 XML 字符串 + response_xml = reply.render() + # 如果开启了加密模式,需要再次加密返回给微信 + if self.aes_key: + response_xml = self.crypto.encrypt_message(response_xml, request.query.get('nonce'), request.query.get('timestamp')) + + return web.Response(text=response_xml, content_type='application/xml') + + return web.Response(text="success") + + # --- 多媒体回复构造 helper --- + + def _get_image_reply(self, msg): + """ + 返回图片消息 + 注意:media_id 必须是通过微信素材接口上传后获得的永久或临时素材 ID + 这里仅做演示,实际运行需替换为有效的 media_id + """ + # 在实际生产中,你需要先调用 client.material.add(...) 上传获取 media_id + demo_media_id = "DEMO_IMAGE_MEDIA_ID" + logger.warning(f"演示模式:图片 media_id 为 {demo_media_id},请替换为真实 ID") + + return ImageReply( + message=msg, + media_id=demo_media_id + ) + + def _get_voice_reply(self, msg): + """返回语音消息""" + demo_media_id = "DEMO_VOICE_MEDIA_ID" + logger.warning(f"演示模式:语音 media_id 为 {demo_media_id}") + return VoiceReply( + message=msg, + media_id=demo_media_id + ) + + def _get_video_reply(self, msg): + """返回视频消息""" + demo_media_id = "DEMO_VIDEO_MEDIA_ID" + logger.warning(f"演示模式:视频 media_id 为 {demo_media_id}") + return VideoReply( + message=msg, + media_id=demo_media_id, + title="演示视频", + description="这是一个通过 wechatpy 生成的视频回复" + ) + + def _get_articles_reply(self, msg): + """返回图文消息 (多图文)""" + reply = ArticlesReply(message=msg) + + # 添加第一篇文章 + reply.add_article({ + 'title': '欢迎使用 WeChatPy', + 'description': '这是由 aiohttp + wechatpy 驱动的自动回复示例。', + 'image': 'https://img.alicdn.com/tfs/TB1.R77XVY7gK0jSZKpXXbT6VXa-1024-1024.png', # 封面图链接 + 'url': 'https://wechatpy.readthedocs.io/' + }) + + # 添加第二篇文章 (可选) + reply.add_article({ + 'title': 'GitHub 仓库', + 'description': '点击访问源代码。', + 'image': '', # 可以不填 + 'url': 'https://github.com/wechatpy/wechatpy' + }) + + return reply + +async def woa_get(request): + pass + +async def woa_post(request): + pass +async def setup_route(app): + app.router.add_get('/woa/wechat', woa_get) + app.router.add_post('/woa/wechat', woa_post) + yield + return + +def load_woa(): + add_cleanupctx(setup_route) +