This commit is contained in:
yumoqing 2026-03-03 16:32:18 +08:00
parent 54b41c7160
commit ba211de526

View File

@ -21,75 +21,75 @@ from typing import Optional, Dict, Any, Union
from pathlib import Path
class WeChatMediaManager:
def __init__(self, app_id: str, app_secret: str):
self.app_id = app_id
self.app_secret = app_secret
self._access_token: Optional[str] = None
self._token_expires_at: float = 0
self._token_lock = asyncio.Lock()
def __init__(self, app_id: str, app_secret: str):
self.app_id = app_id
self.app_secret = app_secret
self._access_token: Optional[str] = None
self._token_expires_at: float = 0
self._token_lock = asyncio.Lock()
# ================= 1. Token 管理 (基础) =================
async def get_access_token(self) -> str:
"""
获取 access_token带内存缓存和锁机制防止高并发下重复刷新
"""
# 如果 token 有效时间大于 200 秒,直接返回缓存
if self._access_token and time.time() < self._token_expires_at - 20:
return self._access_token
# ================= 1. Token 管理 (基础) =================
async def get_access_token(self) -> str:
"""
获取 access_token带内存缓存和锁机制防止高并发下重复刷新
"""
# 如果 token 有效时间大于 200 秒,直接返回缓存
if self._access_token and time.time() < self._token_expires_at - 20:
return self._access_token
async with self._token_lock:
# 双重检查锁
if self._access_token and time.time() < self._token_expires_at - 20:
return self._access_token
async with self._token_lock:
# 双重检查锁
if self._access_token and time.time() < self._token_expires_at - 20:
return self._access_token
url = "https://api.weixin.qq.com/cgi-bin/token"
params = {
"grant_type": "client_credential",
"appid": self.app_id,
"secret": self.app_secret
}
url = "https://api.weixin.qq.com/cgi-bin/token"
params = {
"grant_type": "client_credential",
"appid": self.app_id,
"secret": self.app_secret
}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as resp:
data = await resp.json()
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as resp:
data = await resp.json()
if "access_token" in data:
self._access_token = data["access_token"]
expires_in = data.get("expires_in", 7200)
self._token_expires_at = time.time() + expires_in
debug("Access Token 刷新成功")
return self._access_token
else:
error(f"获取 Token 失败: {data}")
raise Exception(f"WeChat API Error: {data}")
if "access_token" in data:
self._access_token = data["access_token"]
expires_in = data.get("expires_in", 7200)
self._token_expires_at = time.time() + expires_in
debug("Access Token 刷新成功")
return self._access_token
else:
error(f"获取 Token 失败: {data}")
raise Exception(f"WeChat API Error: {data}")
# ================= 2. 上传媒体文件 =================
async def upload_media(
self,
file_path: Union[str, Path],
media_type: str,
is_permanent: bool = False,
title: Optional[str] = None,
description: Optional[str] = None
) -> Dict[str, Any]:
"""
上传媒体文件到微信服务器
# ================= 2. 上传媒体文件 =================
async def upload_media(
self,
file_path: Union[str, Path],
media_type: str,
is_permanent: bool = False,
title: Optional[str] = None,
description: Optional[str] = None
) -> Dict[str, Any]:
"""
上传媒体文件到微信服务器
:param file_path: 本地文件路径
:param media_type: 'image', 'voice', 'video', 'thumb'
:param is_permanent: False=临时素材(3), True=永久素材
:param title/description: 仅当上传永久视频时需要
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
:param file_path: 本地文件路径
:param media_type: 'image', 'voice', 'video', 'thumb'
:param is_permanent: False=临时素材(3), True=永久素材
:param title/description: 仅当上传永久视频时需要
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
token = await self.get_access_token()
token = await self.get_access_token()
# 区分临时和永久素材接口
if is_permanent:
url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={token}&type={media_type}"
else:
url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={token}&type={media_type}"
# 区分临时和永久素材接口
if is_permanent:
url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={token}&type={media_type}"
else:
url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={token}&type={media_type}"
form = aiohttp.FormData()
f = open(file_path, 'rb') # 不用 with
@ -126,71 +126,71 @@ class WeChatMediaManager:
finally:
f.close() # 手动关闭
# ================= 3. 下载媒体文件 =================
async def download_media(
self,
media_id: str,
save_path: Union[str, Path],
is_permanent: bool = False
) -> str:
"""
根据 media_id 下载媒体文件到本地
# ================= 3. 下载媒体文件 =================
async def download_media(
self,
media_id: str,
save_path: Union[str, Path],
is_permanent: bool = False
) -> str:
"""
根据 media_id 下载媒体文件到本地
:param media_id: 微信媒体 ID
:param save_path: 保存到的本地路径
:param is_permanent: False=临时素材下载, True=永久素材下载
:return: 保存的文件路径
"""
token = await self.get_access_token()
:param media_id: 微信媒体 ID
:param save_path: 保存到的本地路径
:param is_permanent: False=临时素材下载, True=永久素材下载
:return: 保存的文件路径
"""
token = await self.get_access_token()
# 区分临时和永久素材下载接口
if is_permanent:
url = f"https://api.weixin.qq.com/cgi-bin/material/get_material?access_token={token}"
# 永久素材下载需要 POST JSON body
post_data = {"media_id": media_id}
method = "POST"
else:
url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"
post_data = None
method = "GET"
# 区分临时和永久素材下载接口
if is_permanent:
url = f"https://api.weixin.qq.com/cgi-bin/material/get_material?access_token={token}"
# 永久素材下载需要 POST JSON body
post_data = {"media_id": media_id}
method = "POST"
else:
url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"
post_data = None
method = "GET"
timeout = aiohttp.ClientTimeout(total=120)
timeout = aiohttp.ClientTimeout(total=120)
# 确保目录存在
os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
# 确保目录存在
os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
async with aiohttp.ClientSession(timeout=timeout) as session:
if method == "GET":
async with session.get(url) as resp:
return await self._handle_download_response(resp, save_path, media_id)
else:
async with session.post(url, json=post_data) as resp:
return await self._handle_download_response(resp, save_path, media_id)
async with aiohttp.ClientSession(timeout=timeout) as session:
if method == "GET":
async with session.get(url) as resp:
return await self._handle_download_response(resp, save_path, media_id)
else:
async with session.post(url, json=post_data) as resp:
return await self._handle_download_response(resp, save_path, media_id)
async def _handle_download_response(self, resp: aiohttp.ClientResponse, save_path: str, media_id: str) -> str:
"""处理下载响应流"""
content_type = resp.headers.get('Content-Type', '')
async def _handle_download_response(self, resp: aiohttp.ClientResponse, save_path: str, media_id: str) -> str:
"""处理下载响应流"""
content_type = resp.headers.get('Content-Type', '')
# 微信出错时通常返回 JSON
if 'application/json' in content_type:
error_data = await resp.json()
error(f"下载失败 (JSON 错误): {error_data}")
raise Exception(f"WeChat Download Error: {error_data}")
# 微信出错时通常返回 JSON
if 'application/json' in content_type:
error_data = await resp.json()
error(f"下载失败 (JSON 错误): {error_data}")
raise Exception(f"WeChat Download Error: {error_data}")
# 流式写入文件,避免大文件撑爆内存
with open(save_path, 'wb') as f:
async for chunk in resp.content.iter_chunked(8192): # 每次 8KB
f.write(chunk)
# 流式写入文件,避免大文件撑爆内存
with open(save_path, 'wb') as f:
async for chunk in resp.content.iter_chunked(8192): # 每次 8KB
f.write(chunk)
debug(f"文件下载成功: {save_path}")
return save_path
debug(f"文件下载成功: {save_path}")
return save_path
# ================= 4. 获取临时素材 URL (仅临时素材有效) =================
async def get_media_url(self, media_id: str) -> str:
"""
获取临时素材的下载 URL (有效期3天)
注意永久素材没有直接的 URL必须下载
"""
token = await self.get_access_token()
return f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"
# ================= 4. 获取临时素材 URL (仅临时素材有效) =================
async def get_media_url(self, media_id: str) -> str:
"""
获取临时素材的下载 URL (有效期3天)
注意永久素材没有直接的 URL必须下载
"""
token = await self.get_access_token()
return f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"