230 lines
7.7 KiB
Python
230 lines
7.7 KiB
Python
import asyncio
import json
import os
import time
from functools import partial

from aiohttp import web, ClientSession, FormData, ClientTimeout
from wechatpy import parse_message
from wechatpy.client import WeChatClient
from wechatpy.crypto import WeChatCrypto
from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
from wechatpy.messages import (
    BaseMessage, TextMessage, ImageMessage, VoiceMessage, VideoMessage,
    LocationMessage, LinkMessage
)
from wechatpy.replies import (
    TextReply, ImageReply, VoiceReply, VideoReply,
    ArticlesReply, MusicReply
)
from wechatpy.utils import check_signature

from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import debug, exception, error
from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig
|
||
|
||
class WOAHandler:
    """HTTP callback handler for a WeChat Official Account (WOA).

    GET requests answer the WeChat URL verification handshake; POST requests
    carry user messages and events.  Each inbound message is flattened into a
    dict, published over ZMQ, and the payload received on the message's
    ``subscribe_id`` is turned into a wechatpy reply.

    Configuration is read from the environment: ``WOA_APPID``, ``WOA_TOKEN``,
    ``WOA_AESKEY`` and ``WOA_SECRET``.
    """

    # Media categories used with the temporary-media upload endpoint.
    media_types = ['image', 'voice', 'video', 'thumb']

    def __init__(self):
        self.app_id = os.environ.get('WOA_APPID')
        self.token = os.environ.get('WOA_TOKEN')
        self.aes_key = os.environ.get('WOA_AESKEY')
        self.secret = os.environ.get('WOA_SECRET')
        # The crypto helper is only usable with an EncodingAESKey, so it is
        # created only when one is configured; plaintext mode needs no crypto.
        self.crypto = None
        if self.aes_key:
            self.crypto = WeChatCrypto(self.token, self.aes_key, self.app_id)
        self.client = WeChatClient(self.app_id, self.secret)
        # Cached API access token and the epoch second at which it expires.
        self.client_token = None
        self.client_token_expire_in = 0

    async def get_client_access_token(self):
        """Return a valid API access token, refreshing the cache if needed.

        The token is refreshed 20 seconds before its recorded expiry.  The
        synchronous ``fetch_access_token`` call runs in the default executor
        so it does not block the event loop.
        """
        now = time.time()
        if self.client_token is None or now > self.client_token_expire_in - 20:
            loop = asyncio.get_running_loop()
            data = await loop.run_in_executor(
                None, self.client.fetch_access_token)
            self.client_token = data['access_token']
            # The WeChat token response names the lifetime "expires_in".
            self.client_token_expire_in = time.time() + data['expires_in']
        return self.client_token

    async def upload_media(self, media_type: str, file_path: str) -> str:
        """Upload a local file as temporary media and return its media id.

        Args:
            media_type: media category passed as the ``type`` query parameter.
            file_path: path of the local file to upload.

        Raises:
            Exception: when the HTTP status is not 200 or the response carries
                no ``media_id``.
        """
        access_token = await self.get_client_access_token()
        url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
        # Uploading large files needs a longer timeout.
        timeout = ClientTimeout(total=60)

        # The file must stay open until the request has been sent, because
        # aiohttp streams the form body from the file object.
        with open(file_path, 'rb') as f:
            form = FormData()
            # 'media' is the field name required by the WeChat API.
            form.add_field('media', f, filename=os.path.basename(file_path))
            async with ClientSession(timeout=timeout) as session:
                async with session.post(url, data=form) as resp:
                    status = resp.status
                    # Do not depend on the response Content-Type header.
                    result = await resp.json(content_type=None)

        if status == 200 and 'media_id' in result:
            debug(f"临时素材上传成功: {result['media_id']}")
            return result['media_id']
        error(f"{file_path}:上传失败: {result}")
        raise Exception(f"WeChat API Error: {result}")

    async def media_id2url(self, media_id: str) -> str:
        """Return the download URL of a temporary media item.

        The URL embeds the current access token.
        """
        access_token = await self.get_client_access_token()
        return f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={access_token}&media_id={media_id}"

    async def messagehandler(self, request: "web.Request", msg: "BaseMessage"):
        """Forward an inbound message over ZMQ and return the backend answer.

        Handles text, image, voice, video, location, link and event messages.
        The message is flattened into a dict (``openid``, ``msgtype``,
        ``subscribe_id`` plus type-specific fields), published on the channel
        named by ``config.woa_handler_id``, and the JSON payload received on
        ``subscribe_id`` is returned as a ``DictObject``.
        """
        dic = DictObject()
        dic.received_at = time.time()
        dic.openid = msg.source
        dic.msgtype = msg.type
        dic.subscribe_id = getID()
        if msg.type == 'text':
            dic.content = msg.content
        elif msg.type in ('video', 'voice', 'image'):
            dic.media_url = await self.media_id2url(msg.media_id)
        elif msg.type == 'link':
            dic.title = msg.title
            dic.url = msg.url
            dic.description = msg.description
        elif msg.type == 'location':
            dic.location = {
                'latitude': msg.location_x,
                'longitude': msg.location_y,
                'label': msg.label
            }
        elif msg.type == 'event':
            dic.event = msg.event

        config = getConfig()
        await zmq_publish(config.woa_handler_id,
                          json.dumps(dic, ensure_ascii=False))
        result = await zmq_subcribe(dic.subscribe_id)
        # Media upload for the reply is done once, in build_reply().
        return DictObject(**json.loads(result))

    async def build_reply(self, msg: "BaseMessage", rzt_msg: "DictObject"):
        """Build the wechatpy reply object described by ``rzt_msg``.

        Supported ``rzt_msg.msgtype`` values: ``text``, ``image``, ``voice``,
        ``video``, ``music`` and ``article``.  Returns ``None`` for any other
        type so the caller can answer without a reply body.

        Raises:
            ValueError: when a media reply carries no local file path.
        """
        msgtype = rzt_msg.msgtype
        if msgtype == 'text':
            return TextReply(message=msg, content=rzt_msg.content)

        # The local file path may arrive under either attribute name.
        media_path = (getattr(rzt_msg, 'media_file', None)
                      or getattr(rzt_msg, 'media_filepath', None))

        if msgtype in ('image', 'video', 'voice'):
            if not media_path:
                raise ValueError(f"{msgtype} reply without a media file")
            media_id = await self.upload_media(msgtype, media_path)
            if msgtype == 'image':
                return ImageReply(message=msg, media_id=media_id)
            if msgtype == 'voice':
                return VoiceReply(message=msg, media_id=media_id)
            return VideoReply(message=msg, title=rzt_msg.title,
                              media_id=media_id,
                              description=rzt_msg.description)

        if msgtype == 'music':
            # Use a ready-made thumb id when given, otherwise upload the
            # supplied file as the cover thumbnail.
            thumb_media_id = getattr(rzt_msg, 'thumb_media_id', None)
            if not thumb_media_id and media_path:
                thumb_media_id = await self.upload_media('thumb', media_path)
            return MusicReply(
                message=msg,
                title=rzt_msg.title,
                description=rzt_msg.description,
                music_url=rzt_msg.music_url,        # playback link
                hq_music_url=rzt_msg.hq_music_url,  # high-quality link
                thumb_media_id=thumb_media_id       # cover image media id
            )

        if msgtype == 'article':
            reply = ArticlesReply(message=msg)
            for article in rzt_msg.articles:
                reply.add_article({
                    'title': article.title,
                    'description': article.description,
                    'image': article.image_url,
                    'url': article.url
                })
            return reply

        return None

    def _verify_signature(self, request):
        """Check the ``signature`` query parameter against the token.

        Returns:
            ``(True, echostr)`` when the signature is valid (``echostr`` is
            ``None`` when the request carries none, i.e. on POST), otherwise
            ``(False, None)``.
        """
        query = request.query
        signature = query.get('signature', '')
        timestamp = query.get('timestamp', '')
        nonce = query.get('nonce', '')
        echo_str = query.get('echostr', '')

        try:
            check_signature(self.token, signature, timestamp, nonce)
        except InvalidSignatureException:
            error("签名验证失败")
            return False, None
        return True, (echo_str or None)

    async def handle_get(self, request: "web.Request") -> "web.Response":
        """Answer the WeChat server URL verification request."""
        is_valid, echo_str = self._verify_signature(request)
        if not is_valid:
            return web.Response(text="failed", status=403)
        debug("URL 验证成功")
        return web.Response(text=echo_str)

    async def handle_post(self, request: "web.Request") -> "web.Response":
        """Handle a user message or event pushed by WeChat.

        Invalid signatures get a 403.  Any later failure is logged and
        answered with ``success`` so that WeChat does not retry the delivery.
        """
        # 1. Verify the signature.
        is_valid, _ = self._verify_signature(request)
        if not is_valid:
            return web.Response(text="failed", status=403)

        query = request.query
        nonce = query.get('nonce', '')
        timestamp = query.get('timestamp', '')
        # WeChat marks encrypted deliveries with encrypt_type=aes.
        encrypted = query.get('encrypt_type') == 'aes'

        # 2. Read, decrypt (if needed) and parse the message body.
        try:
            body = await request.read()
            if encrypted:
                if self.crypto is None:
                    raise RuntimeError("WOA_AESKEY is not configured")
                body = self.crypto.decrypt_message(
                    body, query.get('msg_signature', ''), timestamp, nonce)
            msg = parse_message(body)
        except Exception as e:
            error(f"消息解密或解析失败: {e}")
            return web.Response(text="success")  # prevent WeChat retries
        if msg is None:
            return web.Response(text="success")

        debug(f"收到消息类型: {type(msg).__name__}")

        # 3. Route to the business logic and build the reply.
        try:
            rzt_msg = await self.messagehandler(request, msg)
            reply = await self.build_reply(msg, rzt_msg)
        except Exception as e:
            exception(f"消息处理失败: {e}")
            return web.Response(text="success")  # prevent WeChat retries
        if reply is None:
            return web.Response(text="success")

        # render() converts the reply object into an XML string.
        response_xml = reply.render()
        # An encrypted request must be answered with an encrypted reply.
        if encrypted:
            response_xml = self.crypto.encrypt_message(response_xml,
                                                       nonce, timestamp)

        return web.Response(text=response_xml, content_type='application/xml')
|
||
|
||
async def setup_route(woa, app):
    """Cleanup-context generator that mounts the WeChat callback routes.

    Registers ``woa.handle_get`` and ``woa.handle_post`` on ``/woa/wechat``
    before yielding; nothing is done after the yield.
    """
    endpoint = '/woa/wechat'
    app.router.add_get(endpoint, woa.handle_get)
    app.router.add_post(endpoint, woa.handle_post)
    yield
|
||
|
||
def load_woa():
    """Create the WOA handler and hook it into the server.

    Builds a ``WOAHandler``, registers ``setup_route`` (bound to that handler
    with ``functools.partial``) through ``add_cleanupctx``, and exposes the
    handler on the server environment as ``woa_handler``.
    """
    env = ServerEnv()
    woa = WOAHandler()
    # Bind the handler now; the remaining ``app`` argument is supplied by
    # whoever invokes the registered context function.
    route_ctx = partial(setup_route, woa)
    add_cleanupctx(route_ctx)
    env.woa_handler = woa
|
||
|