2026-03-03 16:37:44 +08:00

197 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import os
import time
from functools import partial
from aiohttp import web, ClientSession, FormData, ClientTimeout
from ahserver.configuredServer import add_cleanupctx
from ahserver.serverenv import ServerEnv
from appPublic.dictObject import DictObject
from appPublic.log import debug, exception, error
from appPublic.zmqapi import zmq_subcribe, zmq_publish
from appPublic.uniqueID import getID
from appPublic.jsonConfig import getConfig
import asyncio
import aiohttp
import os
import json
import time
import logging
from typing import Optional, Dict, Any, Union
from pathlib import Path
class WeChatMediaManager:
    """Asynchronous helper around the WeChat ``cgi-bin`` media endpoints.

    Provides access-token retrieval with an in-memory cache, media upload,
    media download and construction of the temporary-media download URL.
    Every HTTP call opens its own short-lived ``aiohttp.ClientSession``.
    """

    # Root of the WeChat API endpoints used by this class.
    API_BASE = "https://api.weixin.qq.com/cgi-bin"
    # A cached token is considered stale this many seconds before it expires,
    # so a token that is about to lapse is never handed to a caller.
    TOKEN_REFRESH_MARGIN = 20
    # Total timeout (seconds) applied to upload and download requests.
    TRANSFER_TIMEOUT = 120
    # Number of bytes read from the response per iteration while downloading.
    DOWNLOAD_CHUNK_SIZE = 8192

    def __init__(self, app_id: str, app_secret: str):
        """Store the credentials and initialise an empty token cache.

        :param app_id: value sent as ``appid`` when requesting a token
        :param app_secret: value sent as ``secret`` when requesting a token
        """
        self.app_id = app_id
        self.app_secret = app_secret
        self._access_token: Optional[str] = None
        # Absolute ``time.time()`` value at which the cached token expires.
        self._token_expires_at: float = 0
        # Serialises token refreshes so concurrent callers trigger one request.
        self._token_lock = asyncio.Lock()

    # ================= 1. Token management =================
    def _has_fresh_token(self) -> bool:
        """Return True when a cached token exists and is not about to expire."""
        return bool(self._access_token) and (
            time.time() < self._token_expires_at - self.TOKEN_REFRESH_MARGIN
        )

    async def get_access_token(self) -> str:
        """Return a valid access token, refreshing it when necessary.

        The cached token is reused until ``TOKEN_REFRESH_MARGIN`` seconds
        before its expiry.  The freshness check is repeated after the lock is
        acquired so that tasks which waited on the lock reuse the token
        fetched by the task that held it.

        :return: the access token string
        :raises Exception: when the response contains no ``access_token``
        """
        if self._has_fresh_token():
            return self._access_token
        async with self._token_lock:
            # Another task may have refreshed the token while we were waiting.
            if self._has_fresh_token():
                return self._access_token
            url = f"{self.API_BASE}/token"
            params = {
                "grant_type": "client_credential",
                "appid": self.app_id,
                "secret": self.app_secret,
            }
            async with aiohttp.ClientSession() as session:
                async with session.get(url, params=params) as resp:
                    data = await resp.json()
            if "access_token" not in data:
                error(f"获取 Token 失败: {data}")
                raise Exception(f"WeChat API Error: {data}")
            self._access_token = data["access_token"]
            # Fall back to 7200 seconds when the response omits ``expires_in``.
            expires_in = data.get("expires_in", 7200)
            self._token_expires_at = time.time() + expires_in
            debug("Access Token 刷新成功")
            return self._access_token

    # ================= 2. Media upload =================
    @staticmethod
    def _parse_upload_result(text: str) -> str:
        """Extract the media id from the body of an upload response.

        An error can arrive with HTTP status 200 and an ``errcode`` payload,
        so the body has to be inspected even when the status looks fine.
        Thumbnail uploads are expected to carry the id under
        ``thumb_media_id`` instead of ``media_id``; both keys are accepted.

        :param text: raw response body
        :return: the media id
        :raises Exception: when the body is not a JSON object, carries a
            non-zero ``errcode`` or contains no media id
        """
        try:
            result = json.loads(text)
        except ValueError as exc:
            raise Exception(f"WeChat Upload Error: {text}") from exc
        if not isinstance(result, dict):
            raise Exception(f"WeChat Upload Error: {text}")
        media_id = result.get("media_id") or result.get("thumb_media_id")
        if result.get("errcode") or not media_id:
            raise Exception(f"WeChat Upload Error: {result}")
        return media_id

    async def upload_media(
        self,
        file_path: Union[str, Path],
        media_type: str,
        is_permanent: bool = False,
        title: Optional[str] = None,
        description: Optional[str] = None
    ) -> str:
        """Upload a local file to the WeChat server and return its media id.

        :param file_path: path of the local file to upload
        :param media_type: 'image', 'voice', 'video' or 'thumb'
        :param is_permanent: False = temporary material (kept 3 days),
            True = permanent material
        :param title: title for a permanent video; defaults to the file name
        :param description: introduction text for a permanent video
        :return: the media id assigned by WeChat
        :raises FileNotFoundError: when ``file_path`` does not exist
        :raises Exception: when the HTTP status is not 200 or the response
            body reports an error
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在: {file_path}")
        token = await self.get_access_token()
        # Temporary and permanent material use different endpoints.
        endpoint = "material/add_material" if is_permanent else "media/upload"
        url = f"{self.API_BASE}/{endpoint}"
        # Passed as ``params`` so that the values get URL-encoded.
        params = {"access_token": token, "type": media_type}
        file_name = os.path.basename(file_path)
        timeout = aiohttp.ClientTimeout(total=self.TRANSFER_TIMEOUT)
        form = aiohttp.FormData()
        # The file must stay open until the request has been sent, because
        # the form reads from it while the body is being transmitted.
        with open(file_path, 'rb') as media_file:
            form.add_field(
                'media',
                media_file,
                filename=file_name,
                content_type='application/octet-stream'
            )
            if is_permanent and media_type == 'video':
                # Permanent videos need an extra JSON "description" field.
                desc_json = json.dumps({
                    "title": title or file_name,
                    "introduction": description or ""
                }, ensure_ascii=False)
                form.add_field('description', desc_json)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(url, params=params, data=form) as resp:
                    status = resp.status
                    text = await resp.text()
        if status != 200:
            raise Exception(f"WeChat Upload Error: {text}")
        return self._parse_upload_result(text)

    # ================= 3. Media download =================
    async def download_media(
        self,
        media_id: str,
        save_path: Union[str, Path],
        is_permanent: bool = False
    ) -> Union[str, Path]:
        """Download the media identified by ``media_id`` to a local file.

        :param media_id: WeChat media id
        :param save_path: local path the file is written to; missing parent
            directories are created
        :param is_permanent: False = temporary material, True = permanent
            material
        :return: ``save_path`` as it was passed in
        :raises Exception: when the response is an error rather than media
        """
        token = await self.get_access_token()
        timeout = aiohttp.ClientTimeout(total=self.TRANSFER_TIMEOUT)
        # Make sure the target directory exists.
        os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            if is_permanent:
                # Permanent material is fetched with a POST and a JSON body.
                request = session.post(
                    f"{self.API_BASE}/material/get_material",
                    params={"access_token": token},
                    json={"media_id": media_id},
                )
            else:
                request = session.get(
                    f"{self.API_BASE}/media/get",
                    params={"access_token": token, "media_id": media_id},
                )
            async with request as resp:
                return await self._handle_download_response(resp, save_path, media_id)

    @staticmethod
    def _extract_api_error(body: bytes) -> Optional[Dict[str, Any]]:
        """Return the decoded error payload when ``body`` is an API error.

        :param body: raw response body
        :return: the JSON object when ``body`` is a JSON object with a
            non-zero ``errcode``, otherwise None
        """
        try:
            payload = json.loads(body)
        except ValueError:
            # Not JSON (UnicodeDecodeError is a ValueError too): media bytes.
            return None
        if isinstance(payload, dict) and payload.get("errcode"):
            return payload
        return None

    async def _handle_download_response(
        self,
        resp: "aiohttp.ClientResponse",
        save_path: Union[str, Path],
        media_id: str
    ) -> Union[str, Path]:
        """Write a download response to ``save_path`` or raise on error.

        :param resp: the open response of the download request
        :param save_path: local path the body is written to
        :param media_id: media id being downloaded (currently unused)
        :return: ``save_path``
        :raises Exception: when the response has a JSON content type, a
            non-200 status, or a ``text/plain`` body holding an error payload
        """
        content_type = resp.headers.get('Content-Type', '')
        # A JSON content type is treated as an error rather than as media.
        if 'application/json' in content_type:
            error_data = await resp.json()
            error(f"下载失败 (JSON 错误): {error_data}")
            raise Exception(f"WeChat Download Error: {error_data}")
        if resp.status != 200:
            # Never store the body of a failed request as if it were media.
            body_text = await resp.text(errors='replace')
            raise Exception(
                f"WeChat Download Error: HTTP {resp.status}: {body_text}"
            )
        if 'text/plain' in content_type:
            # NOTE(review): error payloads are reportedly also served as
            # text/plain; look for an errcode before treating it as media.
            body = await resp.read()
            error_data = self._extract_api_error(body)
            if error_data is not None:
                error(f"下载失败 (JSON 错误): {error_data}")
                raise Exception(f"WeChat Download Error: {error_data}")
            with open(save_path, 'wb') as out_file:
                out_file.write(body)
        else:
            # Stream to disk in chunks so large files are never fully in memory.
            with open(save_path, 'wb') as out_file:
                async for chunk in resp.content.iter_chunked(self.DOWNLOAD_CHUNK_SIZE):
                    out_file.write(chunk)
        debug(f"文件下载成功: {save_path}")
        return save_path

    # ================= 4. Temporary media URL =================
    async def get_media_url(self, media_id: str) -> str:
        """Build the download URL of a temporary media item.

        Only temporary material can be fetched this way; permanent material
        has no direct URL and must be downloaded.  The URL embeds the current
        access token, so it presumably stops working once that token expires
        and should be handled as a secret.

        :param media_id: WeChat media id of a temporary material
        :return: the download URL
        """
        token = await self.get_access_token()
        return f"{self.API_BASE}/media/get?access_token={token}&media_id={media_id}"