unipay/unipay/providers/wechat.py
2025-12-18 15:51:48 +08:00

262 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding:utf-8 -*-
import base64
import json
import time
import aiohttp
from appPublic.dictObject import DictObject
from typing import Dict, Any
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from ..core import Gateway
WECHAT_API_BASE = "https://api.mch.weixin.qq.com"
class WechatGateway(Gateway):
"""
微信支付网关(适配统一 Gateway 接口)
"""
def __init__(self,
mchid: str,
appid: str,
cert_serial_no: str,
private_key_pem: str,
api_v3_key: str):
self.mchid = mchid
self.appid = appid
self.cert_serial_no = cert_serial_no
self.api_v3_key = api_v3_key.encode()
# 加载私钥
self._private_key = serialization.load_pem_private_key(
private_key_pem,
password=None,
)
# 缓存微信平台证书
self._platform_certs = {} # serial_no -> public_key
# -----------------------------------------------------
# 工具方法:微信签名 V3
# -----------------------------------------------------
def _sign(self, method: str, url_path: str, body: str) -> str:
timestamp = str(int(time.time()))
nonce_str = base64.urlsafe_b64encode(
json.dumps(time.time()).encode()
).decode()[:16]
message = f"{method}\n{url_path}\n{timestamp}\n{nonce_str}\n{body}\n"
signature = self._private_key.sign(
message.encode(),
padding.PKCS1v15(),
hashes.SHA256()
)
signature_b64 = base64.b64encode(signature).decode()
token = (
f'mchid="{self.mchid}",'
f'nonce_str="{nonce_str}",'
f'signature="{signature_b64}",'
f'timestamp="{timestamp}",'
f'serial_no="{self.cert_serial_no}"'
)
return token
# -----------------------------------------------------
# 工具:下载平台证书
# -----------------------------------------------------
async def _ensure_platform_cert(self, session: aiohttp.ClientSession):
url_path = "/v3/certificates"
url = WECHAT_API_BASE + url_path
auth = self._sign("GET", url_path, "")
async with session.get(url, headers={
"Authorization": f"WECHATPAY2-SHA256-RSA2048 {auth}",
"Accept": "application/json"
}) as resp:
data = await resp.json()
for cert in data.get("data", []):
sn = cert["serial_no"]
encrypt_info = cert["encrypt_certificate"]
# 解密平台证书
aesgcm = AESGCM(self.api_v3_key)
pub_pem = aesgcm.decrypt(
nonce=encrypt_info["nonce"].encode(),
data=base64.b64decode(encrypt_info["ciphertext"]),
associated_data=encrypt_info["associated_data"].encode()
).decode()
pub_key = serialization.load_pem_public_key(pub_pem.encode())
self._platform_certs[sn] = pub_key
# -----------------------------------------------------
# 工具:验签回调
# -----------------------------------------------------
async def _verify_callback(self, headers: Dict[str, str], body: str):
serial = headers.get("Wechatpay-Serial")
timestamp = headers.get("Wechatpay-Timestamp")
nonce = headers.get("Wechatpay-Nonce")
signature = headers.get("Wechatpay-Signature")
if serial not in self._platform_certs:
# 自动拉取平台证书
async with aiohttp.ClientSession() as session:
await self._ensure_platform_cert(session)
pub_key = self._platform_certs.get(serial)
if pub_key is None:
raise ValueError("Platform cert not found")
message = f"{timestamp}\n{nonce}\n{body}\n".encode()
try:
pub_key.verify(
base64.b64decode(signature),
message,
padding.PKCS1v15(),
hashes.SHA256()
)
except Exception:
raise ValueError("Invalid callback signature")
# -----------------------------------------------------
# 工具:解密 resource.ciphertext
# -----------------------------------------------------
def _decrypt_resource(self, resource: Dict[str, Any]) -> Dict[str, Any]:
aesgcm = AESGCM(self.api_v3_key)
plaintext = aesgcm.decrypt(
nonce=resource["nonce"].encode(),
data=base64.b64decode(resource["ciphertext"]),
associated_data=resource["associated_data"].encode()
).decode()
return json.loads(plaintext)
# -----------------------------------------------------
# 实现统一接口
# -----------------------------------------------------
async def create_payment(self, payload: Dict[str, Any]) -> str:
"""
H5 下单(默认 H5可改为 jsapi/native
"""
body = {
"appid": self.appid,
"mchid": self.mchid,
"description": payload["subject"],
"out_trade_no": payload["out_trade_no"],
"notify_url": payload["notify_url"],
"amount": {
"total": int(payload["amount"]),
"currency": payload.get("currency", "CNY")
},
"scene_info": {
"payer_client_ip": payload.get("client_ip", "127.0.0.1"),
"h5_info": {"type": "Wap"}
}
}
body_str = json.dumps(body, ensure_ascii=False)
url_path = "/v3/pay/transactions/h5"
url = WECHAT_API_BASE + url_path
auth = self._sign("POST", url_path, body_str)
async with aiohttp.ClientSession() as session:
async with session.post(url, data=body_str, headers={
"Authorization": f"WECHATPAY2-SHA256-RSA2048 {auth}",
"Content-Type": "application/json"
}) as resp:
ret = await resp.json()
return ret.get("h5_url")
return None
# -----------------------------------------------------
async def refund(self, payload: Dict[str, Any]) -> Dict[str, Any]:
body = {
"out_trade_no": payload["out_trade_no"],
"out_refund_no": payload["out_refund_no"],
"amount": {
"refund": int(payload["amount"]),
"total": int(payload["total_amount"]),
"currency": payload.get("currency", "CNY")
},
"reason": payload.get("reason")
}
body_str = json.dumps(body, ensure_ascii=False)
url_path = "/v3/refund/domestic/refunds"
url = WECHAT_API_BASE + url_path
auth = self._sign("POST", url_path, body_str)
async with aiohttp.ClientSession() as session:
async with session.post(url, data=body_str, headers={
"Authorization": f"WECHATPAY2-SHA256-RSA2048 {auth}",
"Content-Type": "application/json"
}) as resp:
ret = await resp.json()
return {
"provider": "wechat",
"data": ret
}
# -----------------------------------------------------
async def query(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
订单查询或退款查询
支持:
/v3/pay/transactions/out-trade-no/{out_trade_no}
/v3/refund/domestic/refunds/{out_refund_no}
"""
if "out_trade_no" in payload:
url_path = f"/v3/pay/transactions/out-trade-no/{payload['out_trade_no']}?mchid={self.mchid}"
else:
url_path = f"/v3/refund/domestic/refunds/{payload['out_refund_no']}"
url = WECHAT_API_BASE + url_path
auth = self._sign("GET", url_path, "")
async with aiohttp.ClientSession() as session:
async with session.get(url, headers={
"Authorization": f"WECHATPAY2-SHA256-RSA2048 {auth}",
"Accept": "application/json"
}) as resp:
ret = await resp.json()
return {
"provider": "wechat",
"data": ret
}
# -----------------------------------------------------
async def handle_notify(self, request) -> Dict[str, Any]:
"""
解密/验证回调,返回标准 dict
out_trade_no
transaction_id
trade_state
payer info
"""
headers = request.headers
body = await request.read()
body = body.decode('utf-8')
await self._verify_callback(headers, body)
payload = json.loads(body)
resource = payload["resource"]
decrypted = self._decrypt_resource(resource)
# 返回标准结构
ret = {
"provider": "wechat",
"params": decrypted
}
return DictObject(**ret)