2026-03-02 13:52:31 +08:00

233 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import os
import time
from functools import partial
from aiohttp import web, ClientSession, FormData, ClientTimeout
from wechatpy.client import WeChatClient
from wechatpy.crypto import WeChatCrypto
from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
from wechatpy.messages import (
BaseMessage, TextMessage, ImageMessage, VoiceMessage, VideoMessage,
LocationMessage, LinkMessage
)
from wechatpy.replies import (
TextReply, ImageReply, VoiceReply, VideoReply,
ArticlesReply, MusicReply
)
from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import debug, exception, error
from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig
class WOAHandler:
media_types = ['image', 'voice', 'video', 'thumb']
def __init__(self):
self.app_id = os.environ.get('WOA_APPID')
self.token = os.environ.get('WOA_TOKEN')
self.aes_key = os.environ.get('WOA_AESKEY')
self.secret = os.environ.get('WOA_SECRET', 'test')
# 初始化加解密组件 (如果未开启加密wechatpy 也能处理,但传入 key 更规范)
self.crypto = WeChatCrypto(self.token, self.aes_key, self.app_id)
self.client = WeChatClient(self.app_id, self.secret)
self.client_token = None
self.client_token_expire_in = 0
async def get_client_access_token(self):
tim = time.time()
if tim > self.client_token_expire_in - 20:
f = awaitify(self.client.fetch_access_token)
data = await f()
self.client_token = data['access_token']
self.client_token_expire_in = time.time() + data['expire_in']
return self.client_token
async def upload_media(self, media_type:str, file_path:str) -> str:
"""
upload media file to wechat and return a media id
"""
media_id = ''
access_token = await self.get_client_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
form = FormData()
# 'media' 是微信接口规定的字段名
with open(file_path, 'rb') as f:
form.add_field('media', f, filename=os.path.basename(file_path))
timeout = ClientTimeout(total=60) # 上传大文件需要更长超时
async with ClientSession(timeout=timeout) as session:
async with session.post(url, data=form) as resp:
result = await resp.json()
if resp.status == 200 and 'media_id' in result:
debug(f"临时素材上传成功: {result['media_id']}")
return result['media_id']
else:
exception(f"{file_path}:上传失败: {result}")
raise Exception(f"WeChat API Error: {result}")
async def media_id2url(self, media_id:str) -> str:
access_token = await self.get_client_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={access_token}&media_id={media_id}"
return url
async def messagehandler(self, request:web.Request, msg:BaseMessage):
"""
TextMessage, ImageMessage, VoiceMessage, VideoMessage,
LocationMessage, LinkMessage, EventMessage
"""
dic = DictObject()
dic.received_at = time.time()
dic.openid = msg.source
dic.msgtype = msg.type
dic.subscribe_id = getID()
if msg.type == 'text':
dic.content = msg.content
if msg.type in ['video', 'voice', 'image']:
dic.media_url = await self.media_id2url(msg.media_id)
if msg.type == 'link':
dic.title = msg.title
dic.url = msg.url
dic.description = msg.description
if msg.type == 'location':
dic.location = {
'latitude': msg.location_x,
'longitude': msg.location_y,
'label': msg.label
}
if msg.type == 'event':
dic.event = msg.event
config = getConfig()
await zmq_publish(config.woa_handler_id, json.dumps(dic, ensure_ascii=False))
result = await zmq_subcribe(dic.subscribe_id)
rzt_dic = DictObject(**json.loads(result))
if rzt_dic.msgtype in ['video', 'image', 'audio']:
dic.media_id = await self.upload_media(rzt_dic.msgtype, rzt_dic.media_filepath)
return rzt_dic
async def build_reply(self, msg:BaseMessage, rzt_msg:DictObject):
"""
ImageReply, VoiceReply, VideoReply,
ArticlesReply, MusicReply
"""
if rzt_msg.msgtype == 'text':
reply = TextReply(message=msg, content=rzt_msg.content)
return reply
if rzt_msg.msgtype in ['image', 'video', 'voice']:
media_id = await self.upload_media(rzt_msg.msgtype,
rzt_msg.media_file)
if rzt_msg.msgtype == 'image':
reply = ImageReply(message=msg, media_id=media_id)
return reply
if rzt_msg.msgtype == 'voice':
reply = VoiceReply(message=msg, media_id=media_id)
return reply
if rzt_msg.msgtype == 'video':
reply = VideoReply(message=msg, title=rzt_msg.title,
media_id=media_id,
description=rzt_msg.description)
return reply
if rzt_msg.msgtype == 'music':
reply = MusicReply(
message=msg,
title=rzt_msg.title,
description=rzt_msg.description,
music_url=rzt_msg.music_url, # 核心:播放链接
hq_music_url=rzt_msg.hq_music_url, # 核心:高质量链接
thumb_media_id=media_id # 核心:封面图 media_id
)
return reply
if rzt_msg.msgtype == 'article':
reply = ArticlesReply(message=msg)
for article in rzt_msg.articles:
reply.add_article({
'title': article.title,
'description': article.description,
'image': article.image_url,
'url': article.url
})
return reply
def _verify_signature(self, request):
"""验证签名并解密消息体(如果开启加密)"""
signature = request.query.get('signature', '')
timestamp = request.query.get('timestamp', '')
nonce = request.query.get('nonce', '')
echo_str = request.query.get('echostr', '')
debug(f'{request.query=}')
# 1. 验证签名
try:
echo_str = self.crypto._check_signature(signature, timestamp, nonce, echo_str)
return True, echo_str
except InvalidSignatureException:
logger.warning("签名验证失败")
return False, None
async def handle_get(self, request: web.Request) -> web.Response:
"""处理微信服务器 URL 验证"""
is_valid, echo_str = self._verify_signature(request)
if is_valid:
debug("URL 验证成功")
return web.Response(text=echo_str)
else:
return web.Response(text="failed", status=403)
async def handle_post(self, request: web.Request) -> web.Response:
"""处理用户消息和事件"""
# 1. 验证签名
query = request.query
signature = query.get('signature', '')
timestamp = query.get('timestamp', '')
nonce = query.get('nonce', '')
is_valid, _ = self._verify_signature(request)
if not is_valid:
return web.Response(text="failed", status=403)
# 2. 读取并解密消息体
body=''
try:
body = await request.read()
# decrypt_message 会自动处理解密和 XML 解析,返回 wechatpy 的消息对象
# 如果未开启加密,它也会正常解析 XML
msg = self.crypto.decrypt_message(body,
signature=signature,
timestamp=timestamp,
nonce=nonce)
except Exception as e:
error(f"消息解密或解析失败: {e}, {body=}")
return web.Response(text="success") # 防止微信重试
debug(f"收到消息类型: {type(msg).__name__}")
# 3. 业务逻辑路由
reply = None
rzt_msg = await messagehandler(request, msg)
await self.build_reply(msg, rzt_msg)
# render() 方法将回复对象转换为 XML 字符串
response_xml = reply.render()
# 如果开启了加密模式,需要再次加密返回给微信
if self.aes_key:
response_xml = self.crypto.encrypt_message(response_xml,
request.query.get('nonce'),
request.query.get('timestamp'))
return web.Response(text=response_xml, content_type='application/xml')
async def setup_route(woa, app):
app.router.add_get('/woa', woa.handle_get)
app.router.add_post('/woa', woa.handle_post)
yield
return
def load_woa():
env = ServerEnv()
woa = WOAHandler()
f = partial(setup_route, woa)
add_cleanupctx(f)
env.woa_handler = woa