import asyncio
import json
import logging
import os
import time
from functools import partial
from pathlib import Path
from typing import Any, Dict, Optional, Union
from urllib.parse import urlencode

import aiohttp
from aiohttp import web, ClientSession, FormData, ClientTimeout

from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import debug, exception, error
from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig


class WeChatAPIError(Exception):
    """Raised when a WeChat API call fails or returns an unexpected reply.

    Subclasses ``Exception`` so callers that already catch ``Exception``
    keep working unchanged.
    """


class WeChatMediaManager:
    """Async client for WeChat access tokens and media upload/download.

    The access token is cached in memory on the instance and refreshed under
    an ``asyncio.Lock`` so concurrent coroutines do not all hit the token
    endpoint at once.
    """

    _API_BASE = "https://api.weixin.qq.com/cgi-bin"

    # Seconds before the reported expiry at which the cached token is
    # considered stale and is refreshed.
    TOKEN_REFRESH_MARGIN = 20

    def __init__(self, app_id: str, app_secret: str):
        """Store the credentials and initialise an empty token cache."""
        self.app_id = app_id
        self.app_secret = app_secret
        self._access_token: Optional[str] = None
        self._token_expires_at: float = 0
        self._token_lock = asyncio.Lock()

    # ================= Shared helpers =================

    def _token_is_fresh(self) -> bool:
        """Return True when a cached token exists and is not about to expire."""
        deadline = self._token_expires_at - self.TOKEN_REFRESH_MARGIN
        return bool(self._access_token) and time.time() < deadline

    @staticmethod
    async def _read_json(resp: aiohttp.ClientResponse) -> Any:
        """Decode the response body as JSON regardless of its Content-Type.

        ``content_type=None`` disables aiohttp's Content-Type check, which
        otherwise raises ``ContentTypeError`` when the server labels a JSON
        body as something else (e.g. ``text/plain``).

        :raises WeChatAPIError: if the body is not valid JSON
        """
        try:
            return await resp.json(content_type=None)
        except ValueError as exc:  # json.JSONDecodeError is a ValueError
            body = await resp.text(errors="replace")
            raise WeChatAPIError(
                f"WeChat API returned non-JSON response "
                f"(HTTP {resp.status}): {body[:200]!r}"
            ) from exc

    # ================= 1. Token management (basics) =================

    async def get_access_token(self) -> str:
        """Return a valid access token, refreshing the cached one if needed.

        The cache is checked once without the lock (fast path) and once more
        after acquiring it, so only one coroutine performs the refresh while
        the others reuse its result.

        :return: the access token string
        :raises WeChatAPIError: if the token endpoint does not return a token
        """
        if self._token_is_fresh():
            return self._access_token

        async with self._token_lock:
            # Another coroutine may have refreshed the token while we waited.
            if self._token_is_fresh():
                return self._access_token

            params = {
                "grant_type": "client_credential",
                "appid": self.app_id,
                "secret": self.app_secret,
            }
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self._API_BASE}/token", params=params
                ) as resp:
                    data = await self._read_json(resp)

            # An empty body decodes to None, so guard before using `in`.
            if isinstance(data, dict) and "access_token" in data:
                self._access_token = data["access_token"]
                expires_in = data.get("expires_in", 7200)
                self._token_expires_at = time.time() + expires_in
                debug("Access Token 刷新成功")
                return self._access_token

            error(f"获取 Token 失败: {data}")
            raise WeChatAPIError(f"WeChat API Error: {data}")

    # ================= 2. Upload media =================

    async def upload_media(
        self,
        file_path: Union[str, Path],
        media_type: str,
        is_permanent: bool = False,
        title: Optional[str] = None,
        description: Optional[str] = None
    ) -> Dict[str, Any]:
        """Upload a local file to the WeChat server.

        :param file_path: path of the local file to upload
        :param media_type: 'image', 'voice', 'video' or 'thumb'
        :param is_permanent: False = temporary material, True = permanent
        :param title: title for a permanent video; defaults to the file name
        :param description: introduction text for a permanent video
        :return: the decoded JSON reply (contains 'media_id' and/or 'url')
        :raises FileNotFoundError: if ``file_path`` does not exist
        :raises WeChatAPIError: if the upload is rejected
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {file_path}")

        token = await self.get_access_token()

        # Temporary and permanent material use different endpoints.
        endpoint = "material/add_material" if is_permanent else "media/upload"
        url = f"{self._API_BASE}/{endpoint}"
        # Passed via `params` so the values are URL-encoded by aiohttp.
        params = {"access_token": token, "type": media_type}

        filename = os.path.basename(file_path)
        timeout = aiohttp.ClientTimeout(total=120)  # large uploads need longer

        # The file handle is streamed by aiohttp during the POST, so the
        # request must complete before the `with open(...)` block exits.
        with open(file_path, 'rb') as f:
            form = aiohttp.FormData()
            # WeChat requires the file field to be named 'media'.
            form.add_field('media', f, filename=filename)

            # Permanent videos need an extra JSON 'description' field.
            if is_permanent and media_type == 'video':
                desc_json = json.dumps({
                    "title": title or filename,
                    "introduction": description or ""
                }, ensure_ascii=False)
                form.add_field('description', desc_json)

            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(url, params=params, data=form) as resp:
                    result = await self._read_json(resp)
                    status = resp.status

        succeeded = (
            status == 200
            and isinstance(result, dict)
            and ('media_id' in result or 'url' in result)
        )
        if not succeeded:
            error(f"上传失败: {result}")
            raise WeChatAPIError(f"WeChat Upload Error: {result}")

        debug(f"上传成功 ({'永久' if is_permanent else '临时'}): {result.get('media_id')}")
        return result

    # ================= 3. Download media =================

    async def download_media(
        self,
        media_id: str,
        save_path: Union[str, Path],
        is_permanent: bool = False
    ) -> Union[str, Path]:
        """Download the media identified by ``media_id`` to a local file.

        :param media_id: WeChat media ID
        :param save_path: local path to write the file to; missing parent
            directories are created
        :param is_permanent: False = temporary material, True = permanent
        :return: ``save_path`` as passed in
        :raises WeChatAPIError: if the server replies with an error
        """
        token = await self.get_access_token()
        timeout = aiohttp.ClientTimeout(total=120)

        # Make sure the target directory exists.
        os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            if is_permanent:
                # Permanent material is fetched with a POST and a JSON body.
                request = session.post(
                    f"{self._API_BASE}/material/get_material",
                    params={"access_token": token},
                    json={"media_id": media_id},
                )
            else:
                request = session.get(
                    f"{self._API_BASE}/media/get",
                    params={"access_token": token, "media_id": media_id},
                )
            async with request as resp:
                return await self._handle_download_response(
                    resp, save_path, media_id
                )

    async def _handle_download_response(
        self,
        resp: aiohttp.ClientResponse,
        save_path: Union[str, Path],
        media_id: str
    ) -> Union[str, Path]:
        """Stream a download response to ``save_path`` or raise on error.

        A reply is treated as an error when the HTTP status is not 200 or
        when the body is labelled JSON / plain text instead of binary media.
        NOTE(review): the text/plain case assumes WeChat may label JSON error
        bodies that way; media payloads are not expected to use that type.

        :return: ``save_path`` as passed in
        :raises WeChatAPIError: if the reply is an error rather than media
        """
        content_type = resp.headers.get('Content-Type', '')

        looks_like_error = (
            resp.status != 200
            or 'application/json' in content_type
            or 'text/plain' in content_type
        )
        if looks_like_error:
            raw_body = await resp.text(errors="replace")
            try:
                error_data = json.loads(raw_body)
            except ValueError:
                # Not JSON (e.g. an HTML error page): report the raw text.
                error_data = raw_body
            error(f"下载失败 (JSON 错误): {error_data}")
            raise WeChatAPIError(f"WeChat Download Error: {error_data}")

        # Stream to a sibling temp file and rename on success, so a failed or
        # cancelled download never leaves a truncated file at save_path.
        tmp_path = f"{os.fspath(save_path)}.part"
        try:
            with open(tmp_path, 'wb') as f:
                # 8 KB chunks keep memory use flat for large files.
                async for chunk in resp.content.iter_chunked(8192):
                    f.write(chunk)
            os.replace(tmp_path, save_path)
        except BaseException:
            # BaseException also covers asyncio.CancelledError.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

        debug(f"文件下载成功: {save_path}")
        return save_path

    # ================= 4. Temporary media URL (temporary material only) =================

    async def get_media_url(self, media_id: str) -> str:
        """Build the download URL for a temporary media item.

        Only the temporary-material endpoint is used here; ``download_media``
        fetches permanent material with a POST request instead.

        The returned URL embeds the current access token, so treat it as a
        secret and do not expose it to untrusted clients.

        :param media_id: WeChat media ID of a temporary material
        :return: the full ``media/get`` URL including the access token
        """
        token = await self.get_access_token()
        query = urlencode({"access_token": token, "media_id": media_id})
        return f"{self._API_BASE}/media/get?{query}"