This commit is contained in:
yumoqing 2026-03-02 18:12:58 +08:00
parent 2745963fbc
commit 08e62f8d65
3 changed files with 570 additions and 154 deletions

View File

@ -3,17 +3,6 @@ import os
import time import time
from functools import partial from functools import partial
from aiohttp import web, ClientSession, FormData, ClientTimeout from aiohttp import web, ClientSession, FormData, ClientTimeout
from wechatpy.client import WeChatClient
from wechatpy.crypto import WeChatCrypto
from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
from wechatpy.messages import (
BaseMessage, TextMessage, ImageMessage, VoiceMessage, VideoMessage,
LocationMessage, LinkMessage
)
from wechatpy.replies import (
TextReply, ImageReply, VoiceReply, VideoReply,
ArticlesReply, MusicReply
)
from ahserver.configuredServer import add_cleanupctx from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject from appPublic.dictObject import DictObject
@ -22,118 +11,163 @@ from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig from appPublic.jsonConfig import getConfig
import asyncio
import json
from typing import Optional, Dict, Any, Union
from pathlib import Path
from .media import WeChatMediaManager
from .reply import ReplyBuilder
class WOAHandler: class WOAHandler:
media_types = ['image', 'voice', 'video', 'thumb']
def __init__(self): def __init__(self):
self.app_id = os.environ.get('WOA_APPID') self.app_id = os.environ.get('WOA_APPID')
self.token = os.environ.get('WOA_TOKEN') self.token = os.environ.get('WOA_TOKEN')
self.aes_key = os.environ.get('WOA_AESKEY') encoding_aes_key = os.environ.get('WOA_AESKEY')
self.secret = os.environ.get('WOA_SECRET', 'test') self.secret = os.environ.get('WOA_SECRET', 'test')
# 初始化加解密组件 (如果未开启加密wechatpy 也能处理,但传入 key 更规范) self.aes_key = None
self.crypto = WeChatCrypto(self.token, self.aes_key, self.app_id) if encoding_aes_key:
self.client = WeChatClient(self.app_id, self.secret) # 微信的 AES Key 是 base64 编码的 32 字节字符串
self.client_token = None self.aes_key = base64.b64decode(encoding_aes_key + "=")
self.client_token_expire_in = 0 self.encoding_aes_key = self.aes_key
self.media_manager = WeChatMediaManager(app_id, app_secret)
async def get_client_access_token(self): def _check_signature(self, signature: str, timestamp: str, nonce: str) -> bool:
tim = time.time() """验证签名"""
if tim > self.client_token_expire_in - 20: if not signature:
f = awaitify(self.client.fetch_access_token) return False
data = await f() check_list = sorted([self.token, timestamp, nonce])
self.client_token = data['access_token'] check_str = "".join(check_list)
self.client_token_expire_in = time.time() + data['expire_in'] hash_code = hashlib.sha1(check_str.encode('utf-8')).hexdigest()
return self.client_token return hash_code == signature
async def upload_media(self, media_type:str, file_path:str) -> str: def _decrypt_msg(self, encrypt_msg: str) -> str:
""" """AES 解密消息体 (CBC 模式)"""
upload media file to wechat and return a media id if not self.aes_key:
""" return encrypt_msg
media_id = ''
access_token = await self.get_client_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
form = FormData() # 1. Base64 解码
# 'media' 是微信接口规定的字段名 cipher_text = base64.b64decode(encrypt_msg)
with open(file_path, 'rb') as f:
form.add_field('media', f, filename=os.path.basename(file_path))
timeout = ClientTimeout(total=60) # 上传大文件需要更长超时 # 2. AES 解密
iv = self.aes_key[:16]
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
decrypted = decryptor.update(cipher_text) + decryptor.finalize()
async with ClientSession(timeout=timeout) as session: # 3. 去除填充 (PKCS7)
async with session.post(url, data=form) as resp: pad_len = decrypted[-1]
result = await resp.json() if pad_len < 1 or pad_len > 32:
raise ValueError("Invalid padding")
content = decrypted[:-pad_len]
if resp.status == 200 and 'media_id' in result: # 4. 去除前16字节随机字符串和后4字节长度字符串 (微信协议格式)
debug(f"临时素材上传成功: {result['media_id']}") # 格式: [16字节随机串] + [消息内容] + [4字节网络序长度] + [AppID]
return result['media_id'] # 注意:这里简化处理,通常只需去掉头部随机串和尾部长度+AppID
# 实际微信协议Random(16) + MsgLen(4) + Content + AppID
# 但解密后结构通常是Random(16) + Content + Pad
# 让我们重新校准微信的解密逻辑:
# 解密后数据 = 16字节随机串 + 4字节内容长度(网络字节序) + 消息内容 + AppID
# 修正逻辑:
# 1. 去掉前16字节随机数
content_with_len = content[16:]
# 2. 读取接下来4字节作为长度
msg_len = struct.unpack(">I", content_with_len[:4])[0]
# 3. 截取消息内容
msg_content = content_with_len[4:4+msg_len]
return msg_content.decode('utf-8')
def _encrypt_msg(self, text: str, nonce: str, timestamp: str) -> str:
"""AES 加密消息体"""
if not self.aes_key:
return text
# 1. 生成随机16字节
random_bytes = bytes([random.randint(0, 255) for _ in range(16)])
# 2. 获取 AppID (这里假设你有,或者从配置读,加密必须用到 AppID 填充在末尾)
# 注意:原生实现需要知道 AppID 才能正确加密,否则微信无法解密
# 为了演示,这里假设有一个 app_id 属性,请自行补充
app_id = "YOUR_APP_ID"
# 3. 拼接Random(16) + Len(4) + Content + AppID
content_bytes = text.encode('utf-8')
app_id_bytes = app_id.encode('utf-8')
len_bytes = struct.pack(">I", len(content_bytes))
raw_data = random_bytes + len_bytes + content_bytes + app_id_bytes
# 4. PKCS7 填充
pad_len = 32 - (len(raw_data) % 32)
raw_data += bytes([pad_len] * pad_len)
# 5. AES 加密
iv = self.aes_key[:16]
cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
cipher_text = encryptor.update(raw_data) + encryptor.finalize()
# 6. Base64 编码
return base64.b64encode(cipher_text).decode('utf-8')
async def handle_get(self, request: web.Request) -> web.Response:
"""URL 验证"""
query = request.query
signature = query.get('signature', '')
timestamp = query.get('timestamp', '')
nonce = query.get('nonce', '')
echostr = query.get('echostr', '')
if self._check_signature(signature, timestamp, nonce):
# 如果开启加密echostr 也需要解密
if self.encoding_aes_key:
try:
echostr = self._decrypt_msg(echostr)
except Exception as e:
error(f"Echostr 解密失败: {e}")
return web.Response(text="failed", status=403)
return web.Response(text=echostr)
else: else:
exception(f"{file_path}:上传失败: {result}") return web.Response(text="failed", status=403)
raise Exception(f"WeChat API Error: {result}")
async def media_id2url(self, media_id:str) -> str: async def msghandle(self, msg:DictObject):
access_token = await self.get_client_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={access_token}&media_id={media_id}"
return url
async def messagehandler(self, request:web.Request, msg:BaseMessage):
""" """
TextMessage, ImageMessage, VoiceMessage, VideoMessage,
LocationMessage, LinkMessage, EventMessage
""" """
dic = DictObject()
dic.received_at = time.time()
dic.openid = msg.source
dic.msgtype = msg.type
dic.subscribe_id = getID()
if msg.type == 'text':
dic.content = msg.content
if msg.type in ['video', 'voice', 'image']:
dic.media_url = await self.media_id2url(msg.media_id)
if msg.type == 'link':
dic.title = msg.title
dic.url = msg.url
dic.description = msg.description
if msg.type == 'location':
dic.location = {
'latitude': msg.location_x,
'longitude': msg.location_y,
'label': msg.label
}
if msg.type == 'event':
dic.event = msg.event
config = getConfig() config = getConfig()
await zmq_publish(config.woa_handler_id, json.dumps(dic, ensure_ascii=False)) if msg.MsgType in ['video', 'voice', 'image']:
result = await zmq_subcribe(dic.subscribe_id) msg.media_url = self.media_manager.get_media_url(msg.media_id)
msg.subscribe_id = getID()
await zmq_publish(config.woa_handler_id, json.dumps(msg, ensure_ascii=False))
result = await zmq_subcribe(msg.subscribe_id)
rzt_dic = DictObject(**json.loads(result)) rzt_dic = DictObject(**json.loads(result))
if rzt_dic.msgtype in ['video', 'image', 'audio']: if rzt_dic.msgtype in ['video', 'image', 'audio']:
dic.media_id = await self.upload_media(rzt_dic.msgtype, rzt_dic.media_filepath) rzt_dic.media_id = await self.media_manager.upload_media(rzt_dic.msgtype, rzt_dic.media_filepath)
return rzt_dic return rzt_dic
async def build_reply(self, msg:BaseMessage, rzt_msg:DictObject): def build_reply(self, msg:DictObject, rzt_msg:DictObject):
""" """
ImageReply, VoiceReply, VideoReply, ImageReply, VoiceReply, VideoReply,
ArticlesReply, MusicReply ArticlesReply, MusicReply
""" """
if rzt_msg.msgtype == 'text': if rzt_msg.msgtype == 'text':
reply = TextReply(message=msg, content=rzt_msg.content) reply = ReplyBuilder.text(msg, content=rzt_msg.content)
return reply return reply
if rzt_msg.msgtype in ['image', 'video', 'voice']: if rzt_msg.msgtype in ['image', 'video', 'voice']:
media_id = await self.upload_media(rzt_msg.msgtype, media_id = await self.upload_media(rzt_msg.msgtype,
rzt_msg.media_file) rzt_msg.media_file)
if rzt_msg.msgtype == 'image': if rzt_msg.msgtype == 'image':
reply = ImageReply(message=msg, media_id=media_id) reply = ReplyBuilder.image(msg, media_id=media_id)
return reply return reply
if rzt_msg.msgtype == 'voice': if rzt_msg.msgtype == 'voice':
reply = VoiceReply(message=msg, media_id=media_id) reply = ReplyBuilder.voice(msg, media_id=media_id)
return reply return reply
if rzt_msg.msgtype == 'video': if rzt_msg.msgtype == 'video':
reply = VideoReply(message=msg, title=rzt_msg.title, reply = ReplyBuilder.video(msg, title=rzt_msg.title,
media_id=media_id, media_id=media_id,
description=rzt_msg.description) description=rzt_msg.description)
return reply return reply
if rzt_msg.msgtype == 'music': if rzt_msg.msgtype == 'music':
reply = MusicReply( reply = ReplyBuilder.music(msg,
message=msg,
title=rzt_msg.title, title=rzt_msg.title,
description=rzt_msg.description, description=rzt_msg.description,
music_url=rzt_msg.music_url, # 核心:播放链接 music_url=rzt_msg.music_url, # 核心:播放链接
@ -141,81 +175,73 @@ class WOAHandler:
thumb_media_id=media_id # 核心:封面图 media_id thumb_media_id=media_id # 核心:封面图 media_id
) )
return reply return reply
if rzt_msg.msgtype == 'article': if rzt_msg.msgtype == 'news':
reply = ArticlesReply(message=msg) reply = ReplyBuilder.news(msg, rzt_msg.articles)
for article in rzt_msg.articles:
reply.add_article({
'title': article.title,
'description': article.description,
'image': article.image_url,
'url': article.url
})
return reply return reply
def _verify_signature(self, request):
"""验证签名并解密消息体(如果开启加密)"""
signature = request.query.get('signature', '')
timestamp = request.query.get('timestamp', '')
nonce = request.query.get('nonce', '')
echo_str = request.query.get('echostr', '')
debug(f'{request.query=}')
# 1. 验证签名
try:
echo_str = self.crypto._check_signature(signature, timestamp, nonce, echo_str)
return True, echo_str
except InvalidSignatureException:
logger.warning("签名验证失败")
return False, None
async def handle_get(self, request: web.Request) -> web.Response:
"""处理微信服务器 URL 验证"""
is_valid, echo_str = self._verify_signature(request)
if is_valid:
debug("URL 验证成功")
return web.Response(text=echo_str)
else:
return web.Response(text="failed", status=403)
async def handle_post(self, request: web.Request) -> web.Response: async def handle_post(self, request: web.Request) -> web.Response:
"""处理用户消息和事件""" """接收消息"""
# 1. 验证签名
query = request.query query = request.query
signature = query.get('signature', '') signature = query.get('signature', '')
timestamp = query.get('timestamp', '') timestamp = query.get('timestamp', '')
nonce = query.get('nonce', '') nonce = query.get('nonce', '')
is_valid, _ = self._verify_signature(request)
if not is_valid: # 1. 验签
if not self._check_signature(signature, timestamp, nonce):
error("签名验证失败")
return web.Response(text="failed", status=403) return web.Response(text="failed", status=403)
# 2. 读取并解密消息体 # 2. 读取 Body
body=''
try:
body = await request.read() body = await request.read()
# decrypt_message 会自动处理解密和 XML 解析,返回 wechatpy 的消息对象 if not body:
# 如果未开启加密,它也会正常解析 XML return web.Response(text="success")
msg = self.crypto.decrypt_message(body,
signature=signature, try:
timestamp=timestamp, xml_str = body.decode('utf-8')
nonce=nonce)
# 3. 如果开启加密,先解密
if self.encoding_aes_key:
root = ET.fromstring(xml_str)
encrypt_node = root.find('Encrypt')
if encrypt_node is not None and encrypt_node.text:
xml_str = self._decrypt_msg(encrypt_node.text)
else:
error("加密模式下未找到 Encrypt 节点")
return web.Response(text="success")
# 4. 解析 XML 为字典
msg_dict = self._parse_xml(xml_str)
debug(f"收到消息: {msg_dict.get('MsgType')} from {msg_dict.get('FromUserName')}")
# --- 业务逻辑 ---
reply_dic = await self.msghandle(msg_dict)
reply_xml = self.build_reply(msg_dict, reply_dic)
# 5. 如果需要加密回复
if self.encoding_aes_key and reply_xml:
encrypted_xml_str = self._encrypt_msg(reply_xml, nonce, timestamp)
# 构造加密后的返回 XML 包
final_xml = f"""
<xml>
<Encrypt><![CDATA[{encrypted_xml_str}]]></Encrypt>
<MsgSignature><![CDATA[{hashlib.sha1((self.token + timestamp + nonce + encrypted_xml_str).encode()).hexdigest()}]]></MsgSignature>
<TimeStamp>{timestamp}</TimeStamp>
<Nonce><![CDATA[{nonce}]]></Nonce>
</xml>
"""
return web.Response(text=final_xml, content_type='application/xml')
elif reply_xml:
return web.Response(text=reply_xml, content_type='application/xml')
else:
return web.Response(text="success")
except Exception as e: except Exception as e:
error(f"消息解密或解析失败: {e}, {body=}") exception(f"处理消息异常: {e}")
return web.Response(text="success") # 防止微信重试 return web.Response(text="success")
debug(f"收到消息类型: {type(msg).__name__}") def _parse_xml(self, xml_str: str) -> Dict[str, Any]:
root = ET.fromstring(xml_str)
# 3. 业务逻辑路由 return DictObject(**{child.tag: child.text for child in root})
reply = None
rzt_msg = await messagehandler(request, msg)
await self.build_reply(msg, rzt_msg)
# render() 方法将回复对象转换为 XML 字符串
response_xml = reply.render()
# 如果开启了加密模式,需要再次加密返回给微信
if self.aes_key:
response_xml = self.crypto.encrypt_message(response_xml,
request.query.get('nonce'),
request.query.get('timestamp'))
return web.Response(text=response_xml, content_type='application/xml')
async def setup_route(woa, app): async def setup_route(woa, app):
app.router.add_get('/woa', woa.handle_get) app.router.add_get('/woa', woa.handle_get)

190
woa/media.py Normal file
View File

@ -0,0 +1,190 @@
import asyncio
import os
import time
from functools import partial
from aiohttp import web, ClientSession, FormData, ClientTimeout
from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import debug, exception, error
from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig
import asyncio
import aiohttp
import os
import json
import time
import logging
from typing import Optional, Dict, Any, Union
from pathlib import Path
class WeChatMediaManager:
def __init__(self, app_id: str, app_secret: str):
self.app_id = app_id
self.app_secret = app_secret
self._access_token: Optional[str] = None
self._token_expires_at: float = 0
self._token_lock = asyncio.Lock()
# ================= 1. Token 管理 (基础) =================
async def get_access_token(self) -> str:
"""
获取 access_token带内存缓存和锁机制防止高并发下重复刷新
"""
# 如果 token 有效时间大于 200 秒,直接返回缓存
if self._access_token and time.time() < self._token_expires_at - 20:
return self._access_token
async with self._token_lock:
# 双重检查锁
if self._access_token and time.time() < self._token_expires_at - 20:
return self._access_token
url = "https://api.weixin.qq.com/cgi-bin/token"
params = {
"grant_type": "client_credential",
"appid": self.app_id,
"secret": self.app_secret
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as resp:
data = await resp.json()
if "access_token" in data:
self._access_token = data["access_token"]
expires_in = data.get("expires_in", 7200)
self._token_expires_at = time.time() + expires_in
debug("Access Token 刷新成功")
return self._access_token
else:
error(f"获取 Token 失败: {data}")
raise Exception(f"WeChat API Error: {data}")
# ================= 2. 上传媒体文件 =================
async def upload_media(
self,
file_path: Union[str, Path],
media_type: str,
is_permanent: bool = False,
title: Optional[str] = None,
description: Optional[str] = None
) -> Dict[str, Any]:
"""
上传媒体文件到微信服务器
:param file_path: 本地文件路径
:param media_type: 'image', 'voice', 'video', 'thumb'
:param is_permanent: False=临时素材(3), True=永久素材
:param title/description: 仅当上传永久视频时需要
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
token = await self.get_access_token()
# 区分临时和永久素材接口
if is_permanent:
url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={token}&type={media_type}"
else:
url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={token}&type={media_type}"
# 构建 multipart/form-data
form = aiohttp.FormData()
with open(file_path, 'rb') as f:
# 微信要求的字段名必须是 'media'
form.add_field('media', f, filename=os.path.basename(file_path))
# 永久视频需要额外的 description 字段 (JSON 格式)
if is_permanent and media_type == 'video':
if not title:
title = os.path.basename(file_path)
desc_json = json.dumps({
"title": title,
"introduction": description or ""
}, ensure_ascii=False)
form.add_field('description', desc_json)
timeout = aiohttp.ClientTimeout(total=120) # 大文件上传需要更长超时
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(url, data=form) as resp:
result = await resp.json()
if resp.status == 200 and ('media_id' in result or 'url' in result):
debug(f"上传成功 ({'永久' if is_permanent else '临时'}): {result.get('media_id')}")
return result
else:
error(f"上传失败: {result}")
raise Exception(f"WeChat Upload Error: {result}")
# ================= 3. 下载媒体文件 =================
async def download_media(
self,
media_id: str,
save_path: Union[str, Path],
is_permanent: bool = False
) -> str:
"""
根据 media_id 下载媒体文件到本地
:param media_id: 微信媒体 ID
:param save_path: 保存到的本地路径
:param is_permanent: False=临时素材下载, True=永久素材下载
:return: 保存的文件路径
"""
token = await self.get_access_token()
# 区分临时和永久素材下载接口
if is_permanent:
url = f"https://api.weixin.qq.com/cgi-bin/material/get_material?access_token={token}"
# 永久素材下载需要 POST JSON body
post_data = {"media_id": media_id}
method = "POST"
else:
url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"
post_data = None
method = "GET"
timeout = aiohttp.ClientTimeout(total=120)
# 确保目录存在
os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
async with aiohttp.ClientSession(timeout=timeout) as session:
if method == "GET":
async with session.get(url) as resp:
return await self._handle_download_response(resp, save_path, media_id)
else:
async with session.post(url, json=post_data) as resp:
return await self._handle_download_response(resp, save_path, media_id)
async def _handle_download_response(self, resp: aiohttp.ClientResponse, save_path: str, media_id: str) -> str:
"""处理下载响应流"""
content_type = resp.headers.get('Content-Type', '')
# 微信出错时通常返回 JSON
if 'application/json' in content_type:
error_data = await resp.json()
error(f"下载失败 (JSON 错误): {error_data}")
raise Exception(f"WeChat Download Error: {error_data}")
# 流式写入文件,避免大文件撑爆内存
with open(save_path, 'wb') as f:
async for chunk in resp.content.iter_chunked(8192): # 每次 8KB
f.write(chunk)
debug(f"文件下载成功: {save_path}")
return save_path
# ================= 4. 获取临时素材 URL (仅临时素材有效) =================
async def get_media_url(self, media_id: str) -> str:
"""
获取临时素材的下载 URL (有效期3天)
注意永久素材没有直接的 URL必须下载
"""
token = await self.get_access_token()
return f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"

200
woa/reply.py Normal file
View File

@ -0,0 +1,200 @@
import time
from typing import Optional, List, Dict, Any
import time
from typing import List, Dict, Any, Optional
class ReplyBuilder:
"""
原生微信消息回复构建器 (Native WeChat Reply Builder)
功能
1. 生成所有标准类型的被动回复 XML (明文)
2. 自动处理 ToUserName/FromUserName 的反转
3. 支持文本图片语音视频音乐图文客服转发
注意
- 返回的是明文字符串如果公众号开启了加密模式需在主程序中对返回结果进行 AES 加密
- 所有 media_id 必须是通过微信接口上传后获得的有效 ID
"""
@staticmethod
def _get_base_xml(to_user: str, from_user: str, create_time: int, msg_type: str) -> str:
"""生成公共的 XML 头部"""
return f"""<xml>
<ToUserName><![CDATA[{to_user}]]></ToUserName>
<FromUserName><![CDATA[{from_user}]]></FromUserName>
<CreateTime>{create_time}</CreateTime>
<MsgType><![CDATA[{msg_type}]]></MsgType>
"""
@staticmethod
def text(msg: Dict[str, Any], content: str) -> str:
"""
构造文本回复
:param msg: 原始接收消息字典
:param content: 回复的文本内容
"""
to_user = msg.get('FromUserName')
from_user = msg.get('ToUserName')
create_time = int(time.time())
xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'text')
xml += f"<Content><![CDATA[{content}]]></Content>\n</xml>"
return xml
@staticmethod
def image(msg: Dict[str, Any], media_id: str) -> str:
"""
构造图片回复
:param msg: 原始接收消息字典
:param media_id: 微信服务器返回的图片媒体 ID
"""
to_user = msg.get('FromUserName')
from_user = msg.get('ToUserName')
create_time = int(time.time())
xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'image')
xml += f"""<Image>
<MediaId><![CDATA[{media_id}]]></MediaId>
</Image>
</xml>"""
return xml
@staticmethod
def voice(msg: Dict[str, Any], media_id: str) -> str:
"""
构造语音回复
:param msg: 原始接收消息字典
:param media_id: 微信服务器返回的语音媒体 ID
"""
to_user = msg.get('FromUserName')
from_user = msg.get('ToUserName')
create_time = int(time.time())
xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'voice')
xml += f"""<Voice>
<MediaId><![CDATA[{media_id}]]></MediaId>
</Voice>
</xml>"""
return xml
@staticmethod
def video(msg: Dict[str, Any], media_id: str, title: str = "", description: str = "") -> str:
"""
构造视频回复
:param msg: 原始接收消息字典
:param media_id: 微信服务器返回的视频媒体 ID
:param title: 视频标题 (可选建议填写)
:param description: 视频描述 (可选)
"""
to_user = msg.get('FromUserName')
from_user = msg.get('ToUserName')
create_time = int(time.time())
xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'video')
xml += f"""<Video>
<MediaId><![CDATA[{media_id}]]></MediaId>
<Title><![CDATA[{title}]]></Title>
<Description><![CDATA[{description}]]></Description>
</Video>
</xml>"""
return xml
@staticmethod
def music(msg: Dict[str, Any], title: str, description: str, music_url: str, hq_music_url: str, thumb_media_id: str) -> str:
"""
构造音乐回复
:param msg: 原始接收消息字典
:param title: 音乐标题
:param description: 音乐描述
:param music_url: 音乐播放链接 (普通质量http/https)
:param hq_music_url: 音乐播放链接 (高质量http/https)
:param thumb_media_id: 封面图的媒体 ID (必须已上传到微信)
"""
to_user = msg.get('FromUserName')
from_user = msg.get('ToUserName')
create_time = int(time.time())
xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'music')
xml += f"""<Music>
<Title><![CDATA[{title}]]></Title>
<Description><![CDATA[{description}]]></Description>
<MusicUrl><![CDATA[{music_url}]]></MusicUrl>
<HQMusicUrl><![CDATA[{hq_music_url}]]></HQMusicUrl>
<ThumbMediaId><![CDATA[{thumb_media_id}]]></ThumbMediaId>
</Music>
</xml>"""
return xml
@staticmethod
def news(msg: Dict[str, Any], articles: List[Dict[str, str]]) -> str:
"""
构造图文消息回复 (支持 1-8 )
:param msg: 原始接收消息字典
:param articles: 文章列表每项包含 title, description, image, url
"""
to_user = msg.get('FromUserName')
from_user = msg.get('ToUserName')
create_time = int(time.time())
if not articles:
# 如果没有文章,返回空字符串或默认文本,避免生成非法 XML
return ""
count = len(articles)
if count > 8:
count = 8
articles = articles[:8]
xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'news')
xml += f"<ArticleCount>{count}</ArticleCount>\n<Articles>\n"
for item in articles:
title = item.get('title', '无标题')
desc = item.get('description', '')
img_url = item.get('image', '')
link_url = item.get('url', '#')
xml += f"""<item>
<Title><![CDATA[{title}]]></Title>
<Description><![CDATA[{desc}]]></Description>
<PicUrl><![CDATA[{img_url}]]></PicUrl>
<Url><![CDATA[{link_url}]]></Url>
</item>\n"""
xml += "</Articles>\n</xml>"
return xml
@staticmethod
def single_article(msg: Dict[str, Any], title: str, description: str, image: str, url: str) -> str:
"""
快捷方法构造单篇图文消息
"""
article = {
"title": title,
"description": description,
"image": image,
"url": url
}
return ReplyBuilder.news(msg, [article])
@staticmethod
def transfer_customer_service(msg: Dict[str, Any]) -> str:
"""
构造转发客服消息指令
用途当机器人无法回答时将此消息返回给微信用户消息会进入客服队列
由人工客服或多客服系统接管
:param msg: 原始接收消息字典
"""
to_user = msg.get('FromUserName')
from_user = msg.get('ToUserName')
create_time = int(time.time())
# 这种类型的回复没有 Content 或其他节点,只有 MsgType
xml = f"""<xml>
<ToUserName><![CDATA[{to_user}]]></ToUserName>
<FromUserName><![CDATA[{from_user}]]></FromUserName>
<CreateTime>{create_time}</CreateTime>
<MsgType><![CDATA[transfer_customer_service]]></MsgType>
</xml>"""
return xml