This commit is contained in:
yumoqing 2026-03-01 15:04:07 +08:00
parent 3b906c7081
commit e9ea75b5f7
2 changed files with 187 additions and 116 deletions

View File

@ -1,2 +1,92 @@
# wechat-officeaccount
支持从微信服务号获取用户输入信息正文图片视频语音事件并通过zmq消息发送给订阅了config.json中woa_handler_id的代码处理并将结果通过zmq消息发送会奔模块的等待函数
## 接收消息类型
* TextMessage
* ImageMessage
* VoiceMessage
* VideoMessage
* LocationMessage
* LinkMessage
* EventMessage
发送给消息处理程序的结构:
{
subscribe_id消息处理程序zmq消息返回消息使用的key
received_at:消息接收时间
openid用户的openid号
msgtype消息类型 ['text', 'image', 'video', 'voice', 'location', 'link', 'event' ] 之一
# 以下信息为是msgtype=='text'
content: 正文
#### over
# 以下信息为是msgtype in ['video', 'voice', 'image']
media_url媒体的url通过此url下载相应的图片视频voice等
#### over
# 以下信息是msgtype=='position'
location:{ # LocationMessage位置消息
latitude:纬度
longitude:经度
label地址标签
}
#### over
event
# 以下信息为是msgtype == 'link'
title标题
description描述
url链接
}
## 回微信的消息类型
* TextReply
消息属性
{
msgtype: 'text'
content如果不需要回复微信的消息content='success'
}
* ImageReply
消息属性
{
msgtype:'image'
media_file:图像文件的本地路径
}
* VoiceReply
消息属性
{
msgtype:'voice'
media_file:图像文件的本地路径
}
* VideoReply
消息属性
{
msgtype:'video'
media_file:图像文件的本地路径
}
* ArticlesReply
消息属性
{
msgtype:'video'
articles:[
{
'title':标题
description描述
'image_url':图片url
'url':文章所在url
}
]
}
* MusicReply
消息属性
{
msgtype:'music'
title:标题
description描述
music_url公网可访问的url
hq_music_url高质量链接
}
返回消息有以下属性

View File

@ -13,18 +13,21 @@ from wechatpy.replies import (
)
from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import debug, exception, error
from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig
class WOAHandler:
media_types = ['image', 'voice', 'video', 'thumb']
def __init__(self, msg_handler):
def __init__(self):
self.app_id = os.environ.get('WOA_APPID')
self.token = os.environ.get('WOA_TOKEN')
self.aes_key = os.environ.get('WOA_AESKEY')
self.secret = os.environ.get('WOA_SECRET')
# 初始化加解密组件 (如果未开启加密wechatpy 也能处理,但传入 key 更规范)
self.crypto = WeChatCrypto(token, aes_key, app_id)
self.msg_handler = msg_handler
self.client = WeChatClient(APP_ID, APP_SECRET)
self.client_token = None
self.client_token_expire_in = 0
@ -65,22 +68,86 @@ class WOAHandler:
raise Exception(f"WeChat API Error: {result}")
async def media_id2url(self, media_id:str) -> str:
if self.client_token is None:
f = awaitify(self.client.fetch_access_token)
self.client_token = await f()
url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={self.client_token}&media_id={media_id}"
access_token = await self.get_client_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={access_token}&media_id={media_id}"
return url
async def messagehandler(self, request:web.Request, msg:Message):
"""
call self.msg_handler handle message
TextMessage, ImageMessage, VoiceMessage, VideoMessage,
LocationMessage, LinkMessage, EventMessage
"""
dic = {}
dic = DictObject()
dic.received_at = time.time()
dic.openid = msg.source
dic.msgtype = msg.type
dic.subscribe_id = getID()
if msg.type == 'text':
dic.content = msg.content
if msg.type in ['video', 'voice', 'image']:
dic.media_url = await self.media_id2url(msg.media_id)
if msg.type == 'link':
dic.title = msg.title
dic.url = msg.url
dic.description = msg.description
if msg.type == location':
dic.location = {
'latitude': msg.location_x,
'longitude': msg.location_y,
'label': msg.label
}
if msg.type == 'event':
dic.event = msg.event
config = getConfig()
await zmq_publish(config.woa_handler_id, json.dumps(dic, ensure_ascii=False))
result = await zmq_subcribe(dic.subscribe_id)
rzt_dic = DictObject(**json.loads(result))
if rzt_dic.msgtype in ['video', 'image', 'audio']:
dic.media_id = await self.upload_media(rzt_dic.msgtype, rzt_dic.media_filepath)
return rzt_dic
if self.msg_handler:
return await self.msg_handler(reqest, msg)
return None
def build_reply(self, msg:Mesage, rzt_msg:DictObject):
"""
ImageReply, VoiceReply, VideoReply,
ArticlesReply, MusicReply
"""
if rzt_msg.msgtype == 'text':
reply = TextReply(message=msg, content=rzt_msg.content)
return reply
if rzt_msg.msgtype in ['image', 'video', 'voice']:
media_id = await self.upload_media(rzt_msg.msgtype,
rzt_msg.media_file)
if rzt_msg.msgtype == 'image':
reply = ImageReply(message=msg, media_id=media_id)
return reply
if rzt_msg.msgtype == 'voice':
reply = VoiceMessage(message=msg, media_id=media_id)
return reply
if rzt_msg.msgtype == 'video':
reply = VideoReply(message=msg, title=rzt_msg.title,
media_id=media_id,
description=rzt_msg.description)
return reply
if rzt_msg.msgtype == 'music':
reply = MusicReply(
message=msg,
title=rzt_msg.title,
description=rzt_msg.description,
music_url=rzt_msg.music_url, # 核心:播放链接
hq_music_url=rzt_msg.hq_music_url, # 核心:高质量链接
thumb_media_id=media_id # 核心:封面图 media_id
)
return reply
if rzt_msg.msgtype == 'article':
reply = ArticlesReply(message=msg)
for article in rzt_msg.articles:
reply.add_article({
'title': article.title,
'description': article.description,
'image': article.image_url,
'url': article.url
})
return reply
def _verify_signature(self, request):
"""验证签名并解密消息体(如果开启加密)"""
@ -133,114 +200,28 @@ class WOAHandler:
# 3. 业务逻辑路由
reply = None
if isinstance(msg, TextMessage):
content = msg.content.strip()
debug(f"用户文本: {content},{msg}")
# --- 模拟多媒体回复逻辑 ---
if content == "图片":
reply = self._get_image_reply(msg)
elif content == "语音":
reply = self._get_voice_reply(msg)
elif content == "视频":
reply = self._get_video_reply(msg)
elif content == "图文":
reply = self._get_articles_reply(msg)
else:
# 默认文本回复
reply = TextReply(message=msg, content=f"收到:{content}\n发送'图片''语音''视频''图文'测试多媒体回复。")
elif isinstance(msg, EventMessage):
if msg.event == 'subscribe':
reply = TextReply(message=msg, content="欢迎关注!发送指令测试功能。")
elif msg.event == 'unsubscribe':
debug(f"用户 {msg.source} 取消关注")
# 取消关注通常不需要回复,或者回复了用户也收不到
return web.Response(text="success")
# 4. 生成响应 XML
if reply:
rzt_msg = await messagehandler(request, msg)
await self.build_reply(msg, rzt_msg)
# render() 方法将回复对象转换为 XML 字符串
response_xml = reply.render()
# 如果开启了加密模式,需要再次加密返回给微信
if self.aes_key:
response_xml = self.crypto.encrypt_message(response_xml, request.query.get('nonce'), request.query.get('timestamp'))
response_xml = self.crypto.encrypt_message(response_xml,
request.query.get('nonce'),
request.query.get('timestamp'))
return web.Response(text=response_xml, content_type='application/xml')
return web.Response(text="success")
# --- 多媒体回复构造 helper ---
def _get_image_reply(self, msg):
"""
返回图片消息
注意media_id 必须是通过微信素材接口上传后获得的永久或临时素材 ID
这里仅做演示实际运行需替换为有效的 media_id
"""
# 在实际生产中,你需要先调用 client.material.add(...) 上传获取 media_id
demo_media_id = "DEMO_IMAGE_MEDIA_ID"
logger.warning(f"演示模式:图片 media_id 为 {demo_media_id},请替换为真实 ID")
return ImageReply(
message=msg,
media_id=demo_media_id
)
def _get_voice_reply(self, msg):
"""返回语音消息"""
demo_media_id = "DEMO_VOICE_MEDIA_ID"
logger.warning(f"演示模式:语音 media_id 为 {demo_media_id}")
return VoiceReply(
message=msg,
media_id=demo_media_id
)
def _get_video_reply(self, msg):
"""返回视频消息"""
demo_media_id = "DEMO_VIDEO_MEDIA_ID"
logger.warning(f"演示模式:视频 media_id 为 {demo_media_id}")
return VideoReply(
message=msg,
media_id=demo_media_id,
title="演示视频",
description="这是一个通过 wechatpy 生成的视频回复"
)
def _get_articles_reply(self, msg):
"""返回图文消息 (多图文)"""
reply = ArticlesReply(message=msg)
# 添加第一篇文章
reply.add_article({
'title': '欢迎使用 WeChatPy',
'description': '这是由 aiohttp + wechatpy 驱动的自动回复示例。',
'image': 'https://img.alicdn.com/tfs/TB1.R77XVY7gK0jSZKpXXbT6VXa-1024-1024.png', # 封面图链接
'url': 'https://wechatpy.readthedocs.io/'
})
# 添加第二篇文章 (可选)
reply.add_article({
'title': 'GitHub 仓库',
'description': '点击访问源代码。',
'image': '', # 可以不填
'url': 'https://github.com/wechatpy/wechatpy'
})
return reply
async def woa_get(request):
pass
async def woa_post(request):
pass
async def setup_route(app):
app.router.add_get('/woa/wechat', woa_get)
app.router.add_post('/woa/wechat', woa_post)
async def setup_route(woa, app):
app.router.add_get('/woa/wechat', woa.handle_get)
app.router.add_post('/woa/wechat', woa.handle_post)
yield
return
def load_woa():
add_cleanupctx(setup_route)
env = ServerEnv()
woa = WOAHandler()
f = partial(setup_route, woa)
add_cleanupctx(f)
env.woa_handler = woa