unipay/unipay/providers/alipay.py
2025-12-17 18:23:20 +08:00

218 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# providers/alipay.py
import json
import time
import base64
import hashlib
import urllib.parse
from typing import Any, Dict, Optional
from appPublic.log import debug, exception
import aiohttp
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives import serialization
from ..core import Gateway, GatewayError
from ..utils import safe_json_dumps
# Alipay OpenAPI gateway endpoint: payment redirect URLs are built on top of
# it and the query / refund requests are POSTed to it.
ALIPAY_GATEWAY = "https://openapi.alipay.com/gateway.do"
class AlipayGateway(Gateway):
    """
    Alipay gateway: H5 / page payment, refund, trade query and verification
    of asynchronous-notification signatures.

    Outgoing requests are signed with the application's private key and
    incoming notifications are verified with the Alipay public key, both
    using RSA + SHA-256 with PKCS#1 v1.5 padding ("RSA2").
    """

    def __init__(
        self,
        app_id: str,
        app_private_key_pem: str,
        alipay_public_key_pem: str,
    ):
        """
        Load the key pair used for signing and verification.

        Args:
            app_id: Application id sent as ``app_id`` with every request.
            app_private_key_pem: PEM-encoded, unencrypted private key of the
                application (``str`` or ``bytes``).
            alipay_public_key_pem: PEM-encoded Alipay public key
                (``str`` or ``bytes``).

        Raises:
            ValueError / TypeError: propagated from ``cryptography`` when a
                key cannot be parsed.
        """
        self.app_id = app_id
        # -------- load keys -------
        self._private_key = serialization.load_pem_private_key(
            self._to_pem_bytes(app_private_key_pem), password=None
        )
        self._alipay_public_key = serialization.load_pem_public_key(
            self._to_pem_bytes(alipay_public_key_pem)
        )
        # Created lazily by setup_session().
        self.session = None

    @staticmethod
    def _to_pem_bytes(pem):
        """Return *pem* as bytes; the cryptography PEM loaders reject ``str``."""
        if isinstance(pem, str):
            return pem.encode("utf-8")
        return pem

    def setup_session(self):
        """
        Create the shared ``aiohttp.ClientSession`` (20 s total timeout) if
        there is none yet or the previous one has been closed.

        NOTE(review): nothing in this class closes the session; the owner of
        the gateway is presumably expected to do so - confirm.
        """
        if self.session is not None and not self.session.closed:
            return
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=20)
        )

    # ==========================================================================
    # Helpers (signing / verification)
    # ==========================================================================
    def _sign(self, unsigned_str: str) -> str:
        """
        Sign *unsigned_str* with the application private key (RSA2 / SHA256)
        and return the signature base64-encoded.
        """
        signature = self._private_key.sign(
            unsigned_str.encode(),
            padding.PKCS1v15(),
            hashes.SHA256(),
        )
        return base64.b64encode(signature).decode()

    def _verify(self, data: str, sign: str) -> bool:
        """
        Check the base64 signature *sign* of *data* against the Alipay
        public key.

        Returns:
            True when the signature is valid; False on any failure
            (malformed base64, signature mismatch, ...).
        """
        try:
            self._alipay_public_key.verify(
                base64.b64decode(sign),
                data.encode(),
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
        except Exception:
            # Deliberately broad: any failure means "not verified".
            return False
        return True

    def _build_sign_content(self, params: Dict[str, str]) -> str:
        """
        Build the ``"k=v&k2=v2"`` string that is signed / verified.

        Entries whose value is None are skipped; the remaining pairs are
        sorted (keys are unique, so effectively by key) and joined with
        ``&`` without any URL-encoding.
        """
        sorted_items = sorted((k, v) for k, v in params.items() if v is not None)
        return "&".join(f"{k}={v}" for k, v in sorted_items)

    @staticmethod
    def _is_mobile(payload) -> bool:
        """
        Tell whether *payload* asks for the mobile (WAP) flow.

        Attribute access (``payload._is_mobile``) is tried first so
        attribute-style payload objects behave as before; plain dicts, which
        have no such attribute, fall back to the ``"_is_mobile"`` key.
        """
        flag = getattr(payload, "_is_mobile", None)
        if flag is None and hasattr(payload, "get"):
            flag = payload.get("_is_mobile")
        return bool(flag)

    def _common_params(
        self,
        method: str,
        biz_content: Dict[str, Any],
        extra: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Assemble the public request parameters shared by every API call.

        Args:
            method: Alipay API method name, e.g. ``alipay.trade.query``.
            biz_content: Business parameters; serialised as compact JSON.
            extra: Additional top-level parameters (e.g. ``notify_url``).
        """
        params: Dict[str, Any] = {
            "app_id": self.app_id,
            "method": method,
            "format": "JSON",
            "charset": "utf-8",
            "sign_type": "RSA2",
            # NOTE(review): server-local time is used here - confirm the
            # host clock/timezone matches what the gateway expects.
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "version": "1.0",
        }
        if extra:
            params.update(extra)
        params["biz_content"] = json.dumps(
            biz_content, ensure_ascii=False, separators=(",", ":")
        )
        return params

    def _signed(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Return a copy of *params* without None values and with ``sign`` added.

        None values are excluded from the signed string, so they must not be
        transmitted either - otherwise the request would carry a literal
        "None" that the signature does not cover.
        """
        present = {k: v for k, v in params.items() if v is not None}
        present["sign"] = self._sign(self._build_sign_content(present))
        return present

    async def _request(self, method: str, biz_content: Dict[str, Any]) -> Dict[str, Any]:
        """
        POST a signed API call to the gateway and return the decoded JSON body.
        """
        self.setup_session()
        params = self._signed(self._common_params(method, biz_content))
        async with self.session.post(ALIPAY_GATEWAY, data=params) as resp:
            # content_type=None disables aiohttp's mimetype check, which
            # would raise ContentTypeError if the gateway does not label
            # its JSON body as application/json.
            return await resp.json(content_type=None)

    # ==========================================================================
    # Payment - H5 page redirect
    # ==========================================================================
    async def create_payment(self, payload: Dict[str, Any]) -> str:
        """
        Build a signed Alipay payment URL the H5 page can redirect to.

        Args:
            payload: Must provide ``out_trade_no``, ``amount``,
                ``payment_name`` and ``notify_url``. A truthy ``_is_mobile``
                (attribute or key) selects ``alipay.trade.wap.pay``;
                otherwise ``alipay.trade.page.pay`` is used.

        Returns:
            The gateway URL with all signed parameters in the query string.

        Raises:
            KeyError: if a required payload entry is missing.
        """
        self.setup_session()
        debug(f'{payload=}')
        if self._is_mobile(payload):
            method = 'alipay.trade.wap.pay'
            product_code = 'QUICK_WAP_WAY'
        else:
            method = 'alipay.trade.page.pay'
            product_code = 'FAST_INSTANT_TRADE_PAY'
        biz_content = {
            "out_trade_no": payload["out_trade_no"],
            "total_amount": payload["amount"],
            "subject": payload["payment_name"],
            "product_code": product_code,
        }
        params = self._signed(
            self._common_params(
                method, biz_content, extra={"notify_url": payload["notify_url"]}
            )
        )
        debug(f'{params=}')
        query_str = urllib.parse.urlencode(params)
        return f"{ALIPAY_GATEWAY}?{query_str}"

    # ==========================================================================
    # Trade query
    # ==========================================================================
    async def query(self, out_trade_no: str) -> Dict[str, Any]:
        """
        Call ``alipay.trade.query`` for *out_trade_no* and return the
        gateway's JSON response as a dict (its signature is not checked here).
        """
        return await self._request(
            "alipay.trade.query",
            {"out_trade_no": out_trade_no},
        )

    # ==========================================================================
    # Refund
    # ==========================================================================
    async def refund(self, *, out_trade_no: str, refund_amount: str, out_request_no: str) -> Dict[str, Any]:
        """
        Call ``alipay.trade.refund`` and return the gateway's JSON response.

        Args:
            out_trade_no: Merchant order number of the trade to refund.
            refund_amount: Amount to refund.
            out_request_no: Must be globally unique (one number per refund
                request).
        """
        return await self._request(
            "alipay.trade.refund",
            {
                "out_trade_no": out_trade_no,
                "refund_amount": refund_amount,
                "out_request_no": out_request_no,
            },
        )

    # ==========================================================================
    # Callback / asynchronous notification (signature verification)
    # ==========================================================================
    async def handle_notify(self, request) -> Dict[str, Any]:
        """
        Verify the signature of an Alipay asynchronous notification.

        Args:
            request: Object whose awaitable ``post()`` yields the submitted
                form fields (presumably an ``aiohttp.web`` request - confirm).

        Returns:
            ``{"verified": False, "msg": "no sign"}`` when the form carries no
            signature; otherwise ``{"verified": bool, "provider": "alipay",
            "data": params}`` where ``params`` is the form without ``sign``
            and ``sign_type``.
        """
        self.setup_session()
        form = await request.post()
        params = dict(form)
        sign = params.pop("sign", None)
        # sign_type is not part of the signed content; drop it.
        params.pop("sign_type", None)
        if not sign:
            return {"verified": False, "msg": "no sign"}
        ok = self._verify(self._build_sign_content(params), sign)
        return {
            "verified": ok,
            "provider": "alipay",
            "data": params,
        }