diff --git a/woa/media.py b/woa/media.py index 3953ab5..209a21a 100644 --- a/woa/media.py +++ b/woa/media.py @@ -21,75 +21,75 @@ from typing import Optional, Dict, Any, Union from pathlib import Path class WeChatMediaManager: - def __init__(self, app_id: str, app_secret: str): - self.app_id = app_id - self.app_secret = app_secret - self._access_token: Optional[str] = None - self._token_expires_at: float = 0 - self._token_lock = asyncio.Lock() + def __init__(self, app_id: str, app_secret: str): + self.app_id = app_id + self.app_secret = app_secret + self._access_token: Optional[str] = None + self._token_expires_at: float = 0 + self._token_lock = asyncio.Lock() - # ================= 1. Token 管理 (基础) ================= - async def get_access_token(self) -> str: - """ - 获取 access_token,带内存缓存和锁机制,防止高并发下重复刷新 - """ - # 如果 token 有效时间大于 200 秒,直接返回缓存 - if self._access_token and time.time() < self._token_expires_at - 20: - return self._access_token + # ================= 1. Token 管理 (基础) ================= + async def get_access_token(self) -> str: + """ + 获取 access_token,带内存缓存和锁机制,防止高并发下重复刷新 + """ + # 如果 token 有效时间大于 200 秒,直接返回缓存 + if self._access_token and time.time() < self._token_expires_at - 20: + return self._access_token - async with self._token_lock: - # 双重检查锁 - if self._access_token and time.time() < self._token_expires_at - 20: - return self._access_token + async with self._token_lock: + # 双重检查锁 + if self._access_token and time.time() < self._token_expires_at - 20: + return self._access_token - url = "https://api.weixin.qq.com/cgi-bin/token" - params = { - "grant_type": "client_credential", - "appid": self.app_id, - "secret": self.app_secret - } + url = "https://api.weixin.qq.com/cgi-bin/token" + params = { + "grant_type": "client_credential", + "appid": self.app_id, + "secret": self.app_secret + } - async with aiohttp.ClientSession() as session: - async with session.get(url, params=params) as resp: - data = await resp.json() + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as resp: + data = await resp.json() - if "access_token" in data: - self._access_token = data["access_token"] - expires_in = data.get("expires_in", 7200) - self._token_expires_at = time.time() + expires_in - debug("Access Token 刷新成功") - return self._access_token - else: - error(f"获取 Token 失败: {data}") - raise Exception(f"WeChat API Error: {data}") + if "access_token" in data: + self._access_token = data["access_token"] + expires_in = data.get("expires_in", 7200) + self._token_expires_at = time.time() + expires_in + debug("Access Token 刷新成功") + return self._access_token + else: + error(f"获取 Token 失败: {data}") + raise Exception(f"WeChat API Error: {data}") - # ================= 2. 上传媒体文件 ================= - async def upload_media( - self, - file_path: Union[str, Path], - media_type: str, - is_permanent: bool = False, - title: Optional[str] = None, - description: Optional[str] = None - ) -> Dict[str, Any]: - """ - 上传媒体文件到微信服务器 + # ================= 2. 上传媒体文件 ================= + async def upload_media( + self, + file_path: Union[str, Path], + media_type: str, + is_permanent: bool = False, + title: Optional[str] = None, + description: Optional[str] = None + ) -> Dict[str, Any]: + """ + 上传媒体文件到微信服务器 - :param file_path: 本地文件路径 - :param media_type: 'image', 'voice', 'video', 'thumb' - :param is_permanent: False=临时素材(3天), True=永久素材 - :param title/description: 仅当上传永久视频时需要 - """ - if not os.path.exists(file_path): - raise FileNotFoundError(f"文件不存在: {file_path}") + :param file_path: 本地文件路径 + :param media_type: 'image', 'voice', 'video', 'thumb' + :param is_permanent: False=临时素材(3天), True=永久素材 + :param title/description: 仅当上传永久视频时需要 + """ + if not os.path.exists(file_path): + raise FileNotFoundError(f"文件不存在: {file_path}") - token = await self.get_access_token() + token = await self.get_access_token() - # 区分临时和永久素材接口 - if is_permanent: - url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={token}&type={media_type}" - else: - url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={token}&type={media_type}" + # 区分临时和永久素材接口 + if is_permanent: + url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={token}&type={media_type}" + else: + url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={token}&type={media_type}" form = aiohttp.FormData() f = open(file_path, 'rb') # 不用 with @@ -126,71 +126,71 @@ class WeChatMediaManager: finally: f.close() # 手动关闭 - # ================= 3. 下载媒体文件 ================= - async def download_media( - self, - media_id: str, - save_path: Union[str, Path], - is_permanent: bool = False - ) -> str: - """ - 根据 media_id 下载媒体文件到本地 + # ================= 3. 下载媒体文件 ================= + async def download_media( + self, + media_id: str, + save_path: Union[str, Path], + is_permanent: bool = False + ) -> str: + """ + 根据 media_id 下载媒体文件到本地 - :param media_id: 微信媒体 ID - :param save_path: 保存到的本地路径 - :param is_permanent: False=临时素材下载, True=永久素材下载 - :return: 保存的文件路径 - """ - token = await self.get_access_token() + :param media_id: 微信媒体 ID + :param save_path: 保存到的本地路径 + :param is_permanent: False=临时素材下载, True=永久素材下载 + :return: 保存的文件路径 + """ + token = await self.get_access_token() - # 区分临时和永久素材下载接口 - if is_permanent: - url = f"https://api.weixin.qq.com/cgi-bin/material/get_material?access_token={token}" - # 永久素材下载需要 POST JSON body - post_data = {"media_id": media_id} - method = "POST" - else: - url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}" - post_data = None - method = "GET" + # 区分临时和永久素材下载接口 + if is_permanent: + url = f"https://api.weixin.qq.com/cgi-bin/material/get_material?access_token={token}" + # 永久素材下载需要 POST JSON body + post_data = {"media_id": media_id} + method = "POST" + else: + url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}" + post_data = None + method = "GET" - timeout = aiohttp.ClientTimeout(total=120) + timeout = aiohttp.ClientTimeout(total=120) - # 确保目录存在 - os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True) + # 确保目录存在 + os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True) - async with aiohttp.ClientSession(timeout=timeout) as session: - if method == "GET": - async with session.get(url) as resp: - return await self._handle_download_response(resp, save_path, media_id) - else: - async with session.post(url, json=post_data) as resp: - return await self._handle_download_response(resp, save_path, media_id) + async with aiohttp.ClientSession(timeout=timeout) as session: + if method == "GET": + async with session.get(url) as resp: + return await self._handle_download_response(resp, save_path, media_id) + else: + async with session.post(url, json=post_data) as resp: + return await self._handle_download_response(resp, save_path, media_id) - async def _handle_download_response(self, resp: aiohttp.ClientResponse, save_path: str, media_id: str) -> str: - """处理下载响应流""" - content_type = resp.headers.get('Content-Type', '') + async def _handle_download_response(self, resp: aiohttp.ClientResponse, save_path: str, media_id: str) -> str: + """处理下载响应流""" + content_type = resp.headers.get('Content-Type', '') - # 微信出错时通常返回 JSON - if 'application/json' in content_type: - error_data = await resp.json() - error(f"下载失败 (JSON 错误): {error_data}") - raise Exception(f"WeChat Download Error: {error_data}") + # 微信出错时通常返回 JSON + if 'application/json' in content_type: + error_data = await resp.json() + error(f"下载失败 (JSON 错误): {error_data}") + raise Exception(f"WeChat Download Error: {error_data}") - # 流式写入文件,避免大文件撑爆内存 - with open(save_path, 'wb') as f: - async for chunk in resp.content.iter_chunked(8192): # 每次 8KB - f.write(chunk) + # 流式写入文件,避免大文件撑爆内存 + with open(save_path, 'wb') as f: + async for chunk in resp.content.iter_chunked(8192): # 每次 8KB + f.write(chunk) - debug(f"文件下载成功: {save_path}") - return save_path + debug(f"文件下载成功: {save_path}") + return save_path - # ================= 4. 获取临时素材 URL (仅临时素材有效) ================= - async def get_media_url(self, media_id: str) -> str: - """ - 获取临时素材的下载 URL (有效期3天) - 注意:永久素材没有直接的 URL,必须下载 - """ - token = await self.get_access_token() - return f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}" + # ================= 4. 获取临时素材 URL (仅临时素材有效) ================= + async def get_media_url(self, media_id: str) -> str: + """ + 获取临时素材的下载 URL (有效期3天) + 注意:永久素材没有直接的 URL,必须下载 + """ + token = await self.get_access_token() + return f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"