This commit is contained in:
yumoqing 2025-12-17 11:16:17 +08:00
parent 1b1dfb7f16
commit cb3eaa2c8c
2 changed files with 183 additions and 183 deletions

View File

@ -19,184 +19,184 @@ ALIPAY_GATEWAY = "https://openapi.alipay.com/gateway.do"
class AlipayGateway(Gateway): class AlipayGateway(Gateway):
""" """
生产级支付宝接口H5 支付 / 退款 / 查询 / 回调验签 生产级支付宝接口H5 支付 / 退款 / 查询 / 回调验签
""" """
def __init__( def __init__(
self, self,
app_id: str, app_id: str,
app_private_key_pem: str, app_private_key_pem: str,
alipay_public_key_pem: str, alipay_public_key_pem: str,
): ):
self.app_id = app_id self.app_id = app_id
# -------- load keys ------- # -------- load keys -------
self._private_key = serialization.load_pem_private_key( self._private_key = serialization.load_pem_private_key(
app_private_key_pem, password=None app_private_key_pem, password=None
) )
self._alipay_public_key = serialization.load_pem_public_key( self._alipay_public_key = serialization.load_pem_public_key(
alipay_public_key_pem alipay_public_key_pem
) )
self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20)) self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20))
# ============================================================================== # ==============================================================================
# 工具函数(签名 / 验签) # 工具函数(签名 / 验签)
# ============================================================================== # ==============================================================================
def _sign(self, unsigned_str: str) -> str: def _sign(self, unsigned_str: str) -> str:
""" """
RSA2SHA256签名 RSA2SHA256签名
""" """
signature = self._private_key.sign( signature = self._private_key.sign(
unsigned_str.encode(), unsigned_str.encode(),
padding.PKCS1v15(), padding.PKCS1v15(),
hashes.SHA256(), hashes.SHA256(),
) )
return base64.b64encode(signature).decode() return base64.b64encode(signature).decode()
def _verify(self, data: str, sign: str) -> bool: def _verify(self, data: str, sign: str) -> bool:
""" """
验证支付宝回调签名 验证支付宝回调签名
""" """
try: try:
self._alipay_public_key.verify( self._alipay_public_key.verify(
base64.b64decode(sign), base64.b64decode(sign),
data.encode(), data.encode(),
padding.PKCS1v15(), padding.PKCS1v15(),
hashes.SHA256() hashes.SHA256()
) )
return True return True
except Exception: except Exception:
return False return False
def _build_sign_content(self, params: Dict[str, str]) -> str: def _build_sign_content(self, params: Dict[str, str]) -> str:
""" """
生成 "k=v&k2=v2" 的签名串 生成 "k=v&k2=v2" 的签名串
""" """
sorted_items = sorted((k, v) for k, v in params.items() if v is not None) sorted_items = sorted((k, v) for k, v in params.items() if v is not None)
return "&".join(f"{k}={v}" for k, v in sorted_items) return "&".join(f"{k}={v}" for k, v in sorted_items)
# ============================================================================== # ==============================================================================
# 支付 - H5 页面跳转 # 支付 - H5 页面跳转
# ============================================================================== # ==============================================================================
async def create_payment(self, payload: Dict[str, Any]) -> str: async def create_payment(self, payload: Dict[str, Any]) -> str:
""" """
返回一个可以在 H5 里直接重定向的支付宝支付 URL 返回一个可以在 H5 里直接重定向的支付宝支付 URL
""" """
debug(f'{payload=}') debug(f'{payload=}')
biz_content = { biz_content = {
"out_trade_no": payload["out_trade_no"], "out_trade_no": payload["out_trade_no"],
"total_amount": payload["amount"], "total_amount": payload["amount"],
"subject": payload["payment_name"], "subject": payload["payment_name"],
"product_code": "QUICK_WAP_WAY" "product_code": "QUICK_WAP_WAY"
} }
params = { params = {
"app_id": self.app_id, "app_id": self.app_id,
"method": "alipay.trade.wap.pay", "method": "alipay.trade.wap.pay",
"format": "JSON", "format": "JSON",
"charset": "utf-8", "charset": "utf-8",
"sign_type": "RSA2", "sign_type": "RSA2",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"version": "1.0", "version": "1.0",
"notify_url": payload["notify_url"], "notify_url": payload["notify_url"],
"biz_content": json.dumps(biz_content, separators=(",", ":")) "biz_content": json.dumps(biz_content, separators=(",", ":"))
} }
# 生成签名 # 生成签名
unsigned_str = self._build_sign_content(params) unsigned_str = self._build_sign_content(params)
sign = self._sign(unsigned_str) sign = self._sign(unsigned_str)
params["sign"] = sign params["sign"] = sign
debug(f'{params=}') debug(f'{params=}')
query_str = urllib.parse.urlencode(params) query_str = urllib.parse.urlencode(params)
return f"{ALIPAY_GATEWAY}?{query_str}" return f"{ALIPAY_GATEWAY}?{query_str}"
# ============================================================================== # ==============================================================================
# 查询订单 # 查询订单
# ============================================================================== # ==============================================================================
async def query(self, out_trade_no: str) -> Dict[str, Any]: async def query(self, out_trade_no: str) -> Dict[str, Any]:
biz_content = { biz_content = {
"out_trade_no": out_trade_no, "out_trade_no": out_trade_no,
} }
params = { params = {
"app_id": self.app_id, "app_id": self.app_id,
"method": "alipay.trade.query", "method": "alipay.trade.query",
"format": "JSON", "format": "JSON",
"charset": "utf-8", "charset": "utf-8",
"sign_type": "RSA2", "sign_type": "RSA2",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"version": "1.0", "version": "1.0",
"biz_content": json.dumps(biz_content, separators=(",", ":")) "biz_content": json.dumps(biz_content, separators=(",", ":"))
} }
# 签名 # 签名
unsigned_str = self._build_sign_content(params) unsigned_str = self._build_sign_content(params)
params["sign"] = self._sign(unsigned_str) params["sign"] = self._sign(unsigned_str)
async with self.session.post(ALIPAY_GATEWAY, data=params) as resp: async with self.session.post(ALIPAY_GATEWAY, data=params) as resp:
data = await resp.json() data = await resp.json()
return data return data
# ============================================================================== # ==============================================================================
# 退款 # 退款
# ============================================================================== # ==============================================================================
async def refund(self, *, out_trade_no: str, refund_amount: str, out_request_no: str) -> Dict[str, Any]: async def refund(self, *, out_trade_no: str, refund_amount: str, out_request_no: str) -> Dict[str, Any]:
""" """
out_request_no 必须全局唯一一个退款请求一个编号 out_request_no 必须全局唯一一个退款请求一个编号
""" """
biz_content = { biz_content = {
"out_trade_no": out_trade_no, "out_trade_no": out_trade_no,
"refund_amount": refund_amount, "refund_amount": refund_amount,
"out_request_no": out_request_no "out_request_no": out_request_no
} }
params = { params = {
"app_id": self.app_id, "app_id": self.app_id,
"method": "alipay.trade.refund", "method": "alipay.trade.refund",
"format": "JSON", "format": "JSON",
"charset": "utf-8", "charset": "utf-8",
"sign_type": "RSA2", "sign_type": "RSA2",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"version": "1.0", "version": "1.0",
"biz_content": json.dumps(biz_content, separators=(",", ":")) "biz_content": json.dumps(biz_content, separators=(",", ":"))
} }
unsigned_str = self._build_sign_content(params) unsigned_str = self._build_sign_content(params)
params["sign"] = self._sign(unsigned_str) params["sign"] = self._sign(unsigned_str)
async with self.session.post(ALIPAY_GATEWAY, data=params) as resp: async with self.session.post(ALIPAY_GATEWAY, data=params) as resp:
return await resp.json() return await resp.json()
# ============================================================================== # ==============================================================================
# 回调 / 异步通知(验签) # 回调 / 异步通知(验签)
# ============================================================================== # ==============================================================================
async def handle_notify(self, request) -> Dict[str, Any]: async def handle_notify(self, request) -> Dict[str, Any]:
""" """
支付宝异步通知验签 支付宝异步通知验签
""" """
form = await request.post() form = await request.post()
params = dict(form) params = dict(form)
sign = params.pop("sign", None) sign = params.pop("sign", None)
sign_type = params.pop("sign_type", None) sign_type = params.pop("sign_type", None)
unsigned_str = self._build_sign_content(params) unsigned_str = self._build_sign_content(params)
if not sign: if not sign:
return {"verified": False, "msg": "no sign"} return {"verified": False, "msg": "no sign"}
ok = self._verify(unsigned_str, sign) ok = self._verify(unsigned_str, sign)
return { return {
"verified": ok, "verified": ok,
"provider": "alipay", "provider": "alipay",
"data": params, "data": params,
} }

View File

@ -3,37 +3,37 @@ import aiohttp, json
from ..core import Gateway, GatewayError from ..core import Gateway, GatewayError
class StripeGateway(Gateway): class StripeGateway(Gateway):
def __init__(self, api_key: str): def __init__(self, api_key: str):
self.api_key = api_key self.api_key = api_key
self.base = "https://api.stripe.com/v1" self.base = "https://api.stripe.com/v1"
async def create_payment(self, payload): async def create_payment(self, payload):
# 使用 PaymentIntent -> 前端使用 stripe.js 完成卡片采集 # 使用 PaymentIntent -> 前端使用 stripe.js 完成卡片采集
url = self.base + "/payment_intents" url = self.base + "/payment_intents"
body = { body = {
"amount": str(payload["amount_total"]), # in cents "amount": str(payload["amount_total"]), # in cents
"currency": payload.get("currency","usd"), "currency": payload.get("currency","usd"),
"payment_method_types[]": "card", "payment_method_types[]": "card",
"description": payload.get("description",""), "description": payload.get("description",""),
"metadata[out_trade_no]": payload.get("out_trade_no","") "metadata[out_trade_no]": payload.get("out_trade_no","")
} }
async with aiohttp.ClientSession() as s: async with aiohttp.ClientSession() as s:
async with s.post(url, data=body, auth=aiohttp.BasicAuth(self.api_key, "")) as r: async with s.post(url, data=body, auth=aiohttp.BasicAuth(self.api_key, "")) as r:
return {"provider":"stripe","data": await r.json()} return {"provider":"stripe","data": await r.json()}
async def refund(self, payload): async def refund(self, payload):
url = self.base + "/refunds" url = self.base + "/refunds"
body = {"charge": payload["charge_id"], "amount": str(payload.get("refund_amount"))} body = {"charge": payload["charge_id"], "amount": str(payload.get("refund_amount"))}
async with aiohttp.ClientSession() as s: async with aiohttp.ClientSession() as s:
async with s.post(url, data=body, auth=aiohttp.BasicAuth(self.api_key, "")) as r: async with s.post(url, data=body, auth=aiohttp.BasicAuth(self.api_key, "")) as r:
return {"provider":"stripe","data": await r.json()} return {"provider":"stripe","data": await r.json()}
async def query(self, payload): async def query(self, payload):
# query payment intent or charge # query payment intent or charge
pid = payload.get("payment_intent_id") or payload.get("charge_id") pid = payload.get("payment_intent_id") or payload.get("charge_id")
if not pid: if not pid:
raise GatewayError("need payment_intent_id or charge_id") raise GatewayError("need payment_intent_id or charge_id")
async with aiohttp.ClientSession() as s: async with aiohttp.ClientSession() as s:
async with s.get(self.base + f"/payment_intents/{pid}", auth=aiohttp.BasicAuth(self.api_key, "")) as r: async with s.get(self.base + f"/payment_intents/{pid}", auth=aiohttp.BasicAuth(self.api_key, "")) as r:
return {"provider":"stripe","data": await r.json()} return {"provider":"stripe","data": await r.json()}
async def handle_notify(self, headers, body): async def handle_notify(self, headers, body):
# stripe webhook: verify signature header (Stripe-Signature) — production use official lib or implement verification # stripe webhook: verify signature header (Stripe-Signature) — production use official lib or implement verification
return {"provider":"stripe","data": json.loads(body)} return {"provider":"stripe","data": json.loads(body)}