2026-02-28 22:56:29 +08:00

247 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import os
import time

from aiohttp import web, ClientSession, FormData, ClientTimeout
from wechatpy import WeChatClient, parse_message
from wechatpy.crypto import WeChatCrypto
from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
# NOTE(review): confirm that wechatpy.messages really exports ``Message`` and
# ``EventMessage``; if it does not, this import fails at module load time.
from wechatpy.messages import (
    Message, TextMessage, ImageMessage, VoiceMessage, VideoMessage,
    LocationMessage, LinkMessage, EventMessage
)
from wechatpy.replies import (
    TextReply, ImageReply, VoiceReply, VideoReply,
    ArticlesReply, MusicReply
)
from wechatpy.utils import check_signature

from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.log import debug, exception, error
class WOAHandler:
    """aiohttp handler for a WeChat Official Account (WOA) callback endpoint.

    Credentials are read from the environment variables ``WOA_APPID``,
    ``WOA_TOKEN``, ``WOA_AESKEY`` and ``WOA_SECRET``.  The handler verifies
    callback signatures, decrypts/parses incoming messages, builds demo
    replies, and offers helpers for the temporary-media API.
    """

    # Media categories accepted by upload_media().
    media_types = ['image', 'voice', 'video', 'thumb']

    def __init__(self, msg_handler):
        """Create the handler.

        Args:
            msg_handler: optional async callable ``(request, msg)`` used by
                :meth:`messagehandler`; may be ``None``.
        """
        self.app_id = os.environ.get('WOA_APPID')
        self.token = os.environ.get('WOA_TOKEN')
        self.aes_key = os.environ.get('WOA_AESKEY')
        self.secret = os.environ.get('WOA_SECRET')
        # The crypto helper is only needed (and can only be constructed) when
        # an EncodingAESKey is configured; plaintext mode works without it.
        self.crypto = (
            WeChatCrypto(self.token, self.aes_key, self.app_id)
            if self.aes_key else None
        )
        self.msg_handler = msg_handler
        self.client = WeChatClient(self.app_id, self.secret)
        self.client_token = None
        # Unix timestamp after which client_token must be considered expired.
        self.client_token_expire_in = 0

    async def get_client_access_token(self):
        """Return a cached API access token, refreshing it when needed.

        The token is refreshed 20 seconds before its recorded expiry.  The
        synchronous ``fetch_access_token`` call runs in the default executor
        so the event loop is not blocked.

        Returns:
            The access token string.
        """
        now = time.time()
        if now > self.client_token_expire_in - 20:
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(
                None, self.client.fetch_access_token
            )
            self.client_token = data['access_token']
            # The WeChat token response names the lifetime field "expires_in".
            self.client_token_expire_in = time.time() + data['expires_in']
        return self.client_token

    async def upload_media(self, media_type: str, file_path: str) -> str:
        """Upload a local file as a temporary media item and return its id.

        Args:
            media_type: one of :attr:`media_types`.
            file_path: path of the file to upload.

        Returns:
            The ``media_id`` reported by the WeChat API.

        Raises:
            ValueError: if ``media_type`` is not in :attr:`media_types`.
            Exception: if the API response carries no ``media_id``.
        """
        if media_type not in self.media_types:
            raise ValueError(f"unsupported media type: {media_type!r}")
        access_token = await self.get_client_access_token()
        url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
        # Uploading large files needs a longer timeout than the default.
        timeout = ClientTimeout(total=60)
        # The file must remain open until the POST has finished streaming it,
        # so the request is issued inside the ``with open`` block.
        with open(file_path, 'rb') as media_file:
            form = FormData()
            # 'media' is the form field name required by the WeChat endpoint.
            form.add_field(
                'media', media_file, filename=os.path.basename(file_path)
            )
            async with ClientSession(timeout=timeout) as session:
                async with session.post(url, data=form) as resp:
                    status = resp.status
                    # Do not insist on a JSON Content-Type header; parse the
                    # body as JSON regardless of how the reply is labelled.
                    result = await resp.json(content_type=None)
        if status == 200 and 'media_id' in result:
            debug(f"临时素材上传成功: {result['media_id']}")
            return result['media_id']
        error(f"{file_path}:上传失败: {result}")
        raise Exception(f"WeChat API Error: {result}")

    async def media_id2url(self, media_id: str) -> str:
        """Return the download URL of a temporary media item.

        A valid access token is obtained through
        :meth:`get_client_access_token` and embedded in the URL.
        """
        access_token = await self.get_client_access_token()
        url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={access_token}&media_id={media_id}"
        return url

    async def messagehandler(self, request: "web.Request", msg: "Message"):
        """Delegate a message to ``self.msg_handler``.

        Returns:
            Whatever the configured handler returns, or ``None`` when no
            handler is configured.
        """
        if self.msg_handler:
            return await self.msg_handler(request, msg)
        return None

    def _verify_signature(self, request):
        """Verify the callback signature carried in the query string.

        Returns:
            ``(True, echostr)`` for a valid request that carries ``echostr``
            (URL verification), ``(True, None)`` for any other valid request,
            and ``(False, None)`` when the signature does not match.
        """
        query = request.query
        signature = query.get('signature', '')
        timestamp = query.get('timestamp', '')
        nonce = query.get('nonce', '')
        echo_str = query.get('echostr', '')
        try:
            # Official-account callbacks are signed with the token only; the
            # AES crypto helper is not involved in this step.
            check_signature(self.token, signature, timestamp, nonce)
        except InvalidSignatureException:
            error("签名验证失败")
            return False, None
        return True, (echo_str if echo_str else None)

    async def handle_get(self, request: "web.Request") -> "web.Response":
        """Answer the WeChat server's URL verification (GET) request."""
        is_valid, echo_str = self._verify_signature(request)
        if is_valid:
            debug("URL 验证成功")
            return web.Response(text=echo_str)
        return web.Response(text="failed", status=403)

    async def handle_post(self, request: "web.Request") -> "web.Response":
        """Handle a user message or event pushed by WeChat (POST).

        Returns a 403 response for a bad signature, an XML reply when one is
        produced (encrypted if the request was encrypted), and the plain text
        ``success`` otherwise.
        """
        # 1. Verify the signature.
        is_valid, _ = self._verify_signature(request)
        if not is_valid:
            return web.Response(text="failed", status=403)

        query = request.query
        timestamp = query.get('timestamp', '')
        nonce = query.get('nonce', '')
        # WeChat adds encrypt_type to the query string in encrypted mode.
        encrypted = query.get('encrypt_type', 'raw') != 'raw'

        # 2. Read, decrypt (if needed) and parse the message body.
        try:
            body = await request.read()
            if encrypted:
                if self.crypto is None:
                    raise RuntimeError("WOA_AESKEY is not configured")
                body = self.crypto.decrypt_message(
                    body, query.get('msg_signature', ''), timestamp, nonce
                )
            msg = parse_message(body)
        except Exception as e:
            error(f"消息解密或解析失败: {e}")
            # Still answer "success" so that WeChat does not retry.
            return web.Response(text="success")
        debug(f"收到消息类型: {type(msg).__name__}")

        # 3. Business routing.
        reply = self._build_reply(msg)

        # 4. Render the response XML.
        if reply:
            response_xml = reply.render()
            # An encrypted request must be answered with an encrypted reply.
            if encrypted:
                response_xml = self.crypto.encrypt_message(
                    response_xml, nonce, timestamp
                )
            return web.Response(text=response_xml, content_type='application/xml')
        return web.Response(text="success")

    def _build_reply(self, msg):
        """Return the reply object for ``msg``, or ``None`` for no reply."""
        if isinstance(msg, TextMessage):
            content = msg.content.strip()
            debug(f"用户文本: {content},{msg}")
            # Demo keywords that trigger a multimedia reply.
            builders = {
                "图片": self._get_image_reply,
                "语音": self._get_voice_reply,
                "视频": self._get_video_reply,
                "图文": self._get_articles_reply,
            }
            builder = builders.get(content)
            if builder is not None:
                return builder(msg)
            # Default text reply.
            return TextReply(message=msg, content=f"收到:{content}\n发送'图片''语音''视频''图文'测试多媒体回复。")
        # Events are recognised by their ``type`` attribute so that the check
        # does not depend on one particular event class.
        if getattr(msg, 'type', None) == 'event':
            event = getattr(msg, 'event', None)
            if event == 'subscribe':
                return TextReply(message=msg, content="欢迎关注!发送指令测试功能。")
            if event == 'unsubscribe':
                # No reply: an unsubscribed user could not receive it anyway.
                debug(f"用户 {msg.source} 取消关注")
        return None

    # --- Multimedia reply helpers ---
    def _get_image_reply(self, msg):
        """Return a demo image reply.

        The media_id must come from the WeChat material API (temporary or
        permanent); the placeholder used here has to be replaced with a real
        id before the reply can work.
        """
        demo_media_id = "DEMO_IMAGE_MEDIA_ID"
        debug(f"演示模式:图片 media_id 为 {demo_media_id},请替换为真实 ID")
        return ImageReply(message=msg, media_id=demo_media_id)

    def _get_voice_reply(self, msg):
        """Return a demo voice reply (placeholder media_id)."""
        demo_media_id = "DEMO_VOICE_MEDIA_ID"
        debug(f"演示模式:语音 media_id 为 {demo_media_id}")
        return VoiceReply(message=msg, media_id=demo_media_id)

    def _get_video_reply(self, msg):
        """Return a demo video reply (placeholder media_id)."""
        demo_media_id = "DEMO_VIDEO_MEDIA_ID"
        debug(f"演示模式:视频 media_id 为 {demo_media_id}")
        return VideoReply(
            message=msg,
            media_id=demo_media_id,
            title="演示视频",
            description="这是一个通过 wechatpy 生成的视频回复",
        )

    def _get_articles_reply(self, msg):
        """Return a demo news reply containing two articles."""
        reply = ArticlesReply(message=msg)
        # First article.
        reply.add_article({
            'title': '欢迎使用 WeChatPy',
            'description': '这是由 aiohttp + wechatpy 驱动的自动回复示例。',
            'image': 'https://img.alicdn.com/tfs/TB1.R77XVY7gK0jSZKpXXbT6VXa-1024-1024.png',  # cover image URL
            'url': 'https://wechatpy.readthedocs.io/',
        })
        # Second article (optional).
        reply.add_article({
            'title': 'GitHub 仓库',
            'description': '点击访问源代码。',
            'image': '',  # may be left empty
            'url': 'https://github.com/wechatpy/wechatpy',
        })
        return reply
async def woa_get(request):
    """Placeholder GET handler: ignores ``request`` and returns ``None``.

    NOTE(review): presumably meant to forward to ``WOAHandler.handle_get``;
    the wiring is not implemented yet.
    """
    return None
async def woa_post(request):
    """Placeholder POST handler: ignores ``request`` and returns ``None``.

    NOTE(review): presumably meant to forward to ``WOAHandler.handle_post``;
    the wiring is not implemented yet.
    """
    return None
async def setup_route(app):
    """Register the WeChat callback routes on ``app``, then yield once.

    Written as an async generator: the GET and POST routes for
    ``/woa/wechat`` are added before the single ``yield``; nothing is done
    after it.
    """
    endpoint = '/woa/wechat'
    app.router.add_get(endpoint, woa_get)
    app.router.add_post(endpoint, woa_post)
    yield
def load_woa():
    """Hand ``setup_route`` to ahserver's ``add_cleanupctx``."""
    add_cleanupctx(setup_route)