191 lines
7.5 KiB
Python
191 lines
7.5 KiB
Python
import asyncio
|
||
import os
|
||
import time
|
||
from functools import partial
|
||
from aiohttp import web, ClientSession, FormData, ClientTimeout
|
||
from ahserver.configuredServer import add_cleanupctx
|
||
from ahserver.serverenv import ServerEnv
|
||
from appPublic.dictObject import DictObject
|
||
from appPublic.log import debug, exception, error
|
||
from appPublic.zmqapi import zmq_subcribe, zmq_publish
|
||
from appPublic.uniqueID import getID
|
||
from appPublic.jsonConfig import getConfig
|
||
|
||
import asyncio
|
||
import aiohttp
|
||
import os
|
||
import json
|
||
import time
|
||
import logging
|
||
from typing import Optional, Dict, Any, Union
|
||
from pathlib import Path
|
||
|
||
class WeChatMediaManager:
|
||
def __init__(self, app_id: str, app_secret: str):
|
||
self.app_id = app_id
|
||
self.app_secret = app_secret
|
||
self._access_token: Optional[str] = None
|
||
self._token_expires_at: float = 0
|
||
self._token_lock = asyncio.Lock()
|
||
|
||
# ================= 1. Token 管理 (基础) =================
|
||
async def get_access_token(self) -> str:
|
||
"""
|
||
获取 access_token,带内存缓存和锁机制,防止高并发下重复刷新
|
||
"""
|
||
# 如果 token 有效时间大于 200 秒,直接返回缓存
|
||
if self._access_token and time.time() < self._token_expires_at - 20:
|
||
return self._access_token
|
||
|
||
async with self._token_lock:
|
||
# 双重检查锁
|
||
if self._access_token and time.time() < self._token_expires_at - 20:
|
||
return self._access_token
|
||
|
||
url = "https://api.weixin.qq.com/cgi-bin/token"
|
||
params = {
|
||
"grant_type": "client_credential",
|
||
"appid": self.app_id,
|
||
"secret": self.app_secret
|
||
}
|
||
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(url, params=params) as resp:
|
||
data = await resp.json()
|
||
|
||
if "access_token" in data:
|
||
self._access_token = data["access_token"]
|
||
expires_in = data.get("expires_in", 7200)
|
||
self._token_expires_at = time.time() + expires_in
|
||
debug("Access Token 刷新成功")
|
||
return self._access_token
|
||
else:
|
||
error(f"获取 Token 失败: {data}")
|
||
raise Exception(f"WeChat API Error: {data}")
|
||
|
||
# ================= 2. 上传媒体文件 =================
|
||
async def upload_media(
|
||
self,
|
||
file_path: Union[str, Path],
|
||
media_type: str,
|
||
is_permanent: bool = False,
|
||
title: Optional[str] = None,
|
||
description: Optional[str] = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
上传媒体文件到微信服务器
|
||
|
||
:param file_path: 本地文件路径
|
||
:param media_type: 'image', 'voice', 'video', 'thumb'
|
||
:param is_permanent: False=临时素材(3天), True=永久素材
|
||
:param title/description: 仅当上传永久视频时需要
|
||
"""
|
||
if not os.path.exists(file_path):
|
||
raise FileNotFoundError(f"文件不存在: {file_path}")
|
||
|
||
token = await self.get_access_token()
|
||
|
||
# 区分临时和永久素材接口
|
||
if is_permanent:
|
||
url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={token}&type={media_type}"
|
||
else:
|
||
url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={token}&type={media_type}"
|
||
|
||
# 构建 multipart/form-data
|
||
form = aiohttp.FormData()
|
||
with open(file_path, 'rb') as f:
|
||
# 微信要求的字段名必须是 'media'
|
||
form.add_field('media', f, filename=os.path.basename(file_path))
|
||
|
||
# 永久视频需要额外的 description 字段 (JSON 格式)
|
||
if is_permanent and media_type == 'video':
|
||
if not title:
|
||
title = os.path.basename(file_path)
|
||
desc_json = json.dumps({
|
||
"title": title,
|
||
"introduction": description or ""
|
||
}, ensure_ascii=False)
|
||
form.add_field('description', desc_json)
|
||
|
||
timeout = aiohttp.ClientTimeout(total=120) # 大文件上传需要更长超时
|
||
|
||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||
async with session.post(url, data=form) as resp:
|
||
result = await resp.json()
|
||
|
||
if resp.status == 200 and ('media_id' in result or 'url' in result):
|
||
debug(f"上传成功 ({'永久' if is_permanent else '临时'}): {result.get('media_id')}")
|
||
return result
|
||
else:
|
||
error(f"上传失败: {result}")
|
||
raise Exception(f"WeChat Upload Error: {result}")
|
||
|
||
# ================= 3. 下载媒体文件 =================
|
||
async def download_media(
|
||
self,
|
||
media_id: str,
|
||
save_path: Union[str, Path],
|
||
is_permanent: bool = False
|
||
) -> str:
|
||
"""
|
||
根据 media_id 下载媒体文件到本地
|
||
|
||
:param media_id: 微信媒体 ID
|
||
:param save_path: 保存到的本地路径
|
||
:param is_permanent: False=临时素材下载, True=永久素材下载
|
||
:return: 保存的文件路径
|
||
"""
|
||
token = await self.get_access_token()
|
||
|
||
# 区分临时和永久素材下载接口
|
||
if is_permanent:
|
||
url = f"https://api.weixin.qq.com/cgi-bin/material/get_material?access_token={token}"
|
||
# 永久素材下载需要 POST JSON body
|
||
post_data = {"media_id": media_id}
|
||
method = "POST"
|
||
else:
|
||
url = f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"
|
||
post_data = None
|
||
method = "GET"
|
||
|
||
timeout = aiohttp.ClientTimeout(total=120)
|
||
|
||
# 确保目录存在
|
||
os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
|
||
|
||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||
if method == "GET":
|
||
async with session.get(url) as resp:
|
||
return await self._handle_download_response(resp, save_path, media_id)
|
||
else:
|
||
async with session.post(url, json=post_data) as resp:
|
||
return await self._handle_download_response(resp, save_path, media_id)
|
||
|
||
async def _handle_download_response(self, resp: aiohttp.ClientResponse, save_path: str, media_id: str) -> str:
|
||
"""处理下载响应流"""
|
||
content_type = resp.headers.get('Content-Type', '')
|
||
|
||
# 微信出错时通常返回 JSON
|
||
if 'application/json' in content_type:
|
||
error_data = await resp.json()
|
||
error(f"下载失败 (JSON 错误): {error_data}")
|
||
raise Exception(f"WeChat Download Error: {error_data}")
|
||
|
||
# 流式写入文件,避免大文件撑爆内存
|
||
with open(save_path, 'wb') as f:
|
||
async for chunk in resp.content.iter_chunked(8192): # 每次 8KB
|
||
f.write(chunk)
|
||
|
||
debug(f"文件下载成功: {save_path}")
|
||
return save_path
|
||
|
||
# ================= 4. 获取临时素材 URL (仅临时素材有效) =================
|
||
async def get_media_url(self, media_id: str) -> str:
|
||
"""
|
||
获取临时素材的下载 URL (有效期3天)
|
||
注意:永久素材没有直接的 URL,必须下载
|
||
"""
|
||
token = await self.get_access_token()
|
||
return f"https://api.weixin.qq.com/cgi-bin/media/get?access_token={token}&media_id={media_id}"
|
||
|