first commit

This commit is contained in:
yumoqing 2025-12-04 18:28:17 +08:00
commit dcf7a239d4
15 changed files with 863 additions and 0 deletions

BIN
unipay/.notify.py.swp Normal file

Binary file not shown.

0
unipay/__init__.py Normal file
View File

44
unipay/core.py Normal file
View File

@ -0,0 +1,44 @@
# unipay/core.py
from abc import ABC, abstractmethod
from typing import Any, Dict
class GatewayError(Exception):
pass
class Gateway(ABC):
"""
抽象统一支付网关接口
实现应异步aiohttp
"""
@abstractmethod
async def create_payment(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
创建支付订单
payload 包含amount, currency, subject/description, return_url, notify_url, customer/context
返回标准字典包含至少{"provider":"wechat|alipay|paypal|stripe", "data": {...}}
"""
raise NotImplementedError
@abstractmethod
async def refund(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
发起退款
payload 内常见字段out_trade_no, out_refund_no, amount, total_amount, reason
"""
raise NotImplementedError
@abstractmethod
async def query(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
查询订单或退款状态
"""
raise NotImplementedError
@abstractmethod
async def handle_notify(self, headers: Dict[str,str], body: str) -> Dict[str, Any]:
"""
验签并解密回调返回解密后的标准 dict包括 out_trade_no, status, attach
"""
raise NotImplementedError

109
unipay/init.py Normal file
View File

@ -0,0 +1,109 @@
# init.py
import os
from unipay.notify import get_provider
from ahserver.serverenv import ServerEnv
# 从 env 或配置载入 provider conf这里只示例
CONF = {
"wechat": {
"mchid": os.getenv("WXP_MCHID",""),
"serial_no": os.getenv("WXP_SERIAL",""),
"privkey_pem": open(os.getenv("WXP_PRIVKEY","./merchant_private_key.pem"),"rb").read(),
"platform_pub_pem": open(os.getenv("WXP_PLATFORM_PUB","./platform_pub.pem"),"rb").read(),
"api_v3_key": os.getenv("WXP_API_V3_KEY","").encode()
},
"paypal": {
"client_id": os.getenv("PP_ID",""),
"client_secret": os.getenv("PP_SECRET",""),
"sandbox": True
},
"alipay": {
"app_id": os.getenv("ALIPAY_APPID",""),
"privkey_pem": open(os.getenv("ALIPAY_PRIV","./alipay_priv.pem"),"rb").read(),
"alipay_pub_pem": open(os.getenv("ALIPAY_PUB","./alipay_pub.pem"),"rb").read(),
"sandbox": True
},
"stripe": {
"api_key": os.getenv("STRIPE_KEY","")
}
}
PROVIDERS = {}
# 下单接口(统一)
async def create_payment(request, params_kw=None):
if params_kw is None:
params_kw = request.paams_kw
data = params_kw
provider = data.get("provider")
if provider not in PROVIDERS:
return {"error":"unknown provider"}
try:
res = await PROVIDERS[provider].create_payment(data)
return res
except Exception as e:
return {"error": str(e)}
# 查询
async def query_payment(request, params_kw=None):
if params_kw is None:
params_kw = request.paams_kw
data = params_kw
provider = data.get("provider")
if provider not in PROVIDERS:
return {"error":"unknown provider"}
try:
res = await PROVIDERS[provider].query(data)
return res
except Exception as e:
return {"error": str(e)}
# 退款
async def refund_payment(request, params_kw=None):
if params_kw is None:
params_kw = request.paams_kw
data = params_kw
provider = data.get("provider")
if provider not in PROVIDERS:
return {"error":"unknown provider"}
try:
res = await PROVIDERS[provider].refund(data)
return res
except Exception as e:
return {"error": str(e)}
# 回调入口:你可把厂商回调用各自 endpoint 再转发到这里,或在厂商控制台按各自 URL 配置
async def payment_notify(request, callback, params_kw=None):
if params_kw is None:
params_kw = request.paams_kw
data = params_kw
provider = params_kw.provider
headers = dict(request.headers)
body = await request.text()
try:
data = await PROVIDERS[provider].handle_notify(headers, body)
# 这里 data 应包含标准化字段out_trade_no/status/attach 等
# TODO: 业务幂等处理
# 返回厂商要求的固定成功响应
await callback(request, data)
if provider == "wechat":
return {"code":"SUCCESS", "message":"OK"}
else:
return "OK"
except Exception as e:
return web.Response(status=500, text=str(e))
# callback url= "/unipay/notify/{provider}"
def load_unipay():
PROVIDERS["wechat"] = get_provider("wechat", CONF["wechat"]),
PROVIDERS["paypal"] = get_provider("paypal", CONF["paypal"]),
PROVIDERS["alipay"] = get_provider("alipay", CONF["alipay"]),
PROVIDERS["stripe"] = get_provider("stripe", CONF["stripe"])
env = ServerEnv()
env.payment_notify = payment_notify
env.create_payment = create_payment
env.query_payment = query_payment
env.refund_payment = refund_payment

19
unipay/notify.py Normal file
View File

@ -0,0 +1,19 @@
# unipay/notify.py
from typing import Dict
from .providers.wechat import WechatGateway
from .providers.paypal import PayPalGateway
from .providers.alipay import AlipayGateway
from .providers.stripe import StripeGateway
# 简单工厂:你可以按需扩展配置注入
def get_provider(name: str, conf: Dict):
if name == "wechat":
return WechatGateway(**conf)
if name == "paypal":
return PayPalGateway(**conf)
if name == "alipay":
return AlipayGateway(**conf)
if name == "stripe":
return StripeGateway(**conf)
raise ValueError("unknown provider")

View File

203
unipay/providers/alipay.py Normal file
View File

@ -0,0 +1,203 @@
# providers/alipay.py
import json
import time
import base64
import hashlib
import urllib.parse
from typing import Any, Dict, Optional
import aiohttp
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives import serialization
from ..core import Gateway, GatewayError
from ..utils import safe_json_dumps
ALIPAY_GATEWAY = "https://openapi.alipay.com/gateway.do"
class AlipayProvider(GateWay):
"""
生产级支付宝接口H5 支付 / 退款 / 查询 / 回调验签
"""
def __init__(
self,
app_id: str,
app_private_key_pem: str,
alipay_public_key_pem: str,
notify_url: str,
):
self.app_id = app_id
self.notify_url = notify_url
# -------- load keys -------
self._private_key = serialization.load_pem_private_key(
app_private_key_pem.encode(), password=None
)
self._alipay_public_key = serialization.load_pem_public_key(
alipay_public_key_pem.encode()
)
self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=20))
# ==============================================================================
# 工具函数(签名 / 验签)
# ==============================================================================
def _sign(self, unsigned_str: str) -> str:
"""
RSA2SHA256签名
"""
signature = self._private_key.sign(
unsigned_str.encode(),
padding.PKCS1v15(),
hashes.SHA256(),
)
return base64.b64encode(signature).decode()
def _verify(self, data: str, sign: str) -> bool:
"""
验证支付宝回调签名
"""
try:
self._alipay_public_key.verify(
base64.b64decode(sign),
data.encode(),
padding.PKCS1v15(),
hashes.SHA256()
)
return True
except Exception:
return False
def _build_sign_content(self, params: Dict[str, str]) -> str:
"""
生成 "k=v&k2=v2" 的签名串
"""
sorted_items = sorted((k, v) for k, v in params.items() if v is not None)
return "&".join(f"{k}={v}" for k, v in sorted_items)
# ==============================================================================
# 支付 - H5 页面跳转
# ==============================================================================
async def create_payment(self, *, out_trade_no: str, subject: str, amount: str) -> str:
"""
返回一个可以在 H5 里直接重定向的支付宝支付 URL
"""
biz_content = {
"out_trade_no": out_trade_no,
"total_amount": amount,
"subject": subject,
"product_code": "QUICK_WAP_WAY"
}
params = {
"app_id": self.app_id,
"method": "alipay.trade.wap.pay",
"format": "JSON",
"charset": "utf-8",
"sign_type": "RSA2",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"version": "1.0",
"notify_url": self.notify_url,
"biz_content": json.dumps(biz_content, separators=(",", ":"))
}
# 生成签名
unsigned_str = self._build_sign_content(params)
sign = self._sign(unsigned_str)
params["sign"] = sign
query_str = urllib.parse.urlencode(params)
return f"{ALIPAY_GATEWAY}?{query_str}"
# ==============================================================================
# 查询订单
# ==============================================================================
async def query(self, out_trade_no: str) -> Dict[str, Any]:
biz_content = {
"out_trade_no": out_trade_no,
}
params = {
"app_id": self.app_id,
"method": "alipay.trade.query",
"format": "JSON",
"charset": "utf-8",
"sign_type": "RSA2",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"version": "1.0",
"biz_content": json.dumps(biz_content, separators=(",", ":"))
}
# 签名
unsigned_str = self._build_sign_content(params)
params["sign"] = self._sign(unsigned_str)
async with self.session.post(ALIPAY_GATEWAY, data=params) as resp:
data = await resp.json()
return data
# ==============================================================================
# 退款
# ==============================================================================
async def refund(self, *, out_trade_no: str, refund_amount: str, out_request_no: str) -> Dict[str, Any]:
"""
out_request_no 必须全局唯一一个退款请求一个编号
"""
biz_content = {
"out_trade_no": out_trade_no,
"refund_amount": refund_amount,
"out_request_no": out_request_no
}
params = {
"app_id": self.app_id,
"method": "alipay.trade.refund",
"format": "JSON",
"charset": "utf-8",
"sign_type": "RSA2",
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"version": "1.0",
"biz_content": json.dumps(biz_content, separators=(",", ":"))
}
unsigned_str = self._build_sign_content(params)
params["sign"] = self._sign(unsigned_str)
async with self.session.post(ALIPAY_GATEWAY, data=params) as resp:
return await resp.json()
# ==============================================================================
# 回调 / 异步通知(验签)
# ==============================================================================
async def handle_notify(self, request) -> Dict[str, Any]:
"""
支付宝异步通知验签
"""
form = await request.post()
params = dict(form)
sign = params.pop("sign", None)
sign_type = params.pop("sign_type", None)
unsigned_str = self._build_sign_content(params)
if not sign:
return {"verified": False, "msg": "no sign"}
ok = self._verify(unsigned_str, sign)
return {
"verified": ok,
"provider": "alipay",
"data": params,
}

151
unipay/providers/paypal.py Normal file
View File

@ -0,0 +1,151 @@
# providers/paypal.py
import aiohttp
import asyncio
import typing as t
from abc import ABC, abstractmethod
import os
from ..core import Gateway, GatewayError
from ..utils import safe_json_dumps
class PayPalGateway(Gateway):
def __init__(self, client_id: str, client_secret: str, sandbox: bool = True):
self.client_id = client_id
self.client_secret = client_secret
self.sandbox = sandbox
self.base_url = "https://api-m.sandbox.paypal.com" if sandbox else "https://api-m.paypal.com"
self._token: t.Optional[str] = None
self._token_expiry: t.Optional[float] = None
async def _get_access_token(self) -> str:
if self._token and self._token_expiry and self._token_expiry > asyncio.get_event_loop().time():
return self._token
url = f"{self.base_url}/v1/oauth2/token"
auth = aiohttp.BasicAuth(self.client_id, self.client_secret)
data = {"grant_type": "client_credentials"}
async with aiohttp.ClientSession() as session:
async with session.post(url, data=data, auth=auth) as resp:
resp.raise_for_status()
r = await resp.json()
self._token = r["access_token"]
self._token_expiry = asyncio.get_event_loop().time() + r.get("expires_in", 300) - 10
return self._token
async def create_payment(self, payload: t.Dict[str, t.Any]) -> str:
"""
创建 PayPal 订单
payload 必须包含
- amount: 数值
- currency: 币种
- description / subject
- return_url
- notify_url
- out_trade_no: 商户订单号
"""
access_token = await self._get_access_token()
url = f"{self.base_url}/v2/checkout/orders"
# PayPal 订单 body
body = {
"intent": "CAPTURE",
"purchase_units": [
{
"amount": {
"currency_code": payload["currency"],
"value": f"{payload['amount']:.2f}"
},
"custom_id": payload["out_trade_no"], # 保留商户交易号
"description": payload.get("description") or payload.get("subject", "")
}
],
"application_context": {
"return_url": payload["return_url"],
"cancel_url": payload.get("cancel_url", payload["return_url"])
}
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=body, headers={"Authorization": f"Bearer {access_token}"}) as resp:
resp.raise_for_status()
data = await resp.json()
# 获取 PayPal 端交易号
transaction_id = data["id"]
# 获取跳转 URL
approval_url = next((link["href"] for link in data.get("links", []) if link.get("rel") == "approve"), None)
return approval_url
# return {"provider": "paypal", "data": data, "out_trade_no": payload["out_trade_no"], "transaction_id": transaction_id}
async def refund(self, payload: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
"""
payload:
- transaction_id: PayPal 端交易号
- amount
- out_refund_no
"""
access_token = await self._get_access_token()
url = f"{self.base_url}/v2/payments/captures/{payload['transaction_id']}/refund"
body = {"amount": {"value": f"{payload['amount']:.2f}", "currency_code": payload.get("currency", "USD")}}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=body, headers={"Authorization": f"Bearer {access_token}"}) as resp:
resp.raise_for_status()
r = await resp.json()
return {
"out_trade_no": payload.get("out_trade_no"),
"refund_id": r.get("id"),
"status": r.get("status")
}
async def query(self, payload: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
"""
查询订单状态
payload:
- transaction_id: PayPal 端交易号
"""
access_token = await self._get_access_token()
url = f"{self.base_url}/v2/checkout/orders/{payload['transaction_id']}"
async with aiohttp.ClientSession() as session:
async with session.get(url, headers={"Authorization": f"Bearer {access_token}"}) as resp:
resp.raise_for_status()
r = await resp.json()
out_trade_no = None
if "purchase_units" in r and len(r["purchase_units"]) > 0:
out_trade_no = r["purchase_units"][0].get("custom_id")
return {
"out_trade_no": out_trade_no,
"transaction_id": r.get("id"),
"status": r.get("status"),
"data": r
}
async def handle_notify(self, headers: t.Dict[str,str], body: str) -> t.Dict[str, t.Any]:
"""
PayPal webhook 处理
返回 dict 包含
- out_trade_no: 商户交易号
- transaction_id: PayPal 端交易号
- status: COMPLETED/REFUNDED
- attach: webhook 原始数据
"""
import json
event = json.loads(body)
out_trade_no = None
transaction_id = None
if event.get("resource"):
transaction_id = event["resource"].get("id")
# 先取 custom_id创建订单时传入
pu = event["resource"].get("purchase_units", [])
if pu and len(pu) > 0:
out_trade_no = pu[0].get("custom_id")
# fallback: invoice_id
if not out_trade_no and event["resource"].get("invoice_id"):
out_trade_no = event["resource"]["invoice_id"]
return {
"provider": "paypal",
"out_trade_no": out_trade_no,
"transaction_id": transaction_id,
"status": event.get("event_type"),
"attach": event
}

View File

@ -0,0 +1,39 @@
# unipay/providers/stripe.py
import aiohttp, json
from ..core import Gateway, GatewayError
class StripeGateway(Gateway):
def __init__(self, api_key: str):
self.api_key = api_key
self.base = "https://api.stripe.com/v1"
async def create_payment(self, payload):
# 使用 PaymentIntent -> 前端使用 stripe.js 完成卡片采集
url = self.base + "/payment_intents"
body = {
"amount": str(payload["amount_total"]), # in cents
"currency": payload.get("currency","usd"),
"payment_method_types[]": "card",
"description": payload.get("description",""),
"metadata[out_trade_no]": payload.get("out_trade_no","")
}
async with aiohttp.ClientSession() as s:
async with s.post(url, data=body, auth=aiohttp.BasicAuth(self.api_key, "")) as r:
return {"provider":"stripe","data": await r.json()}
async def refund(self, payload):
url = self.base + "/refunds"
body = {"charge": payload["charge_id"], "amount": str(payload.get("refund_amount"))}
async with aiohttp.ClientSession() as s:
async with s.post(url, data=body, auth=aiohttp.BasicAuth(self.api_key, "")) as r:
return {"provider":"stripe","data": await r.json()}
async def query(self, payload):
# query payment intent or charge
pid = payload.get("payment_intent_id") or payload.get("charge_id")
if not pid:
raise GatewayError("need payment_intent_id or charge_id")
async with aiohttp.ClientSession() as s:
async with s.get(self.base + f"/payment_intents/{pid}", auth=aiohttp.BasicAuth(self.api_key, "")) as r:
return {"provider":"stripe","data": await r.json()}
async def handle_notify(self, headers, body):
# stripe webhook: verify signature header (Stripe-Signature) — production use official lib or implement verification
return {"provider":"stripe","data": json.loads(body)}

266
unipay/providers/wechat.py Normal file
View File

@ -0,0 +1,266 @@
# -*- coding:utf-8 -*-
import base64
import json
import time
import aiohttp
from abc import ABC
from typing import Dict, Any
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
WECHAT_API_BASE = "https://api.mch.weixin.qq.com"
class WeChatGateway(ABC):
"""
微信支付网关适配统一 Gateway 接口
"""
def __init__(self,
mchid: str,
appid: str,
cert_serial_no: str,
private_key_pem: str,
api_v3_key: str):
self.mchid = mchid
self.appid = appid
self.cert_serial_no = cert_serial_no
self.api_v3_key = api_v3_key.encode()
# 加载私钥
self._private_key = serialization.load_pem_private_key(
private_key_pem.encode(),
password=None,
)
# 缓存微信平台证书
self._platform_certs = {} # serial_no -> public_key
# -----------------------------------------------------
# 工具方法:微信签名 V3
# -----------------------------------------------------
def _sign(self, method: str, url_path: str, body: str) -> str:
timestamp = str(int(time.time()))
nonce_str = base64.urlsafe_b64encode(
json.dumps(time.time()).encode()
).decode()[:16]
message = f"{method}\n{url_path}\n{timestamp}\n{nonce_str}\n{body}\n"
signature = self._private_key.sign(
message.encode(),
padding.PKCS1v15(),
hashes.SHA256()
)
signature_b64 = base64.b64encode(signature).decode()
token = (
f'mchid="{self.mchid}",'
f'nonce_str="{nonce_str}",'
f'signature="{signature_b64}",'
f'timestamp="{timestamp}",'
f'serial_no="{self.cert_serial_no}"'
)
return token
# -----------------------------------------------------
# 工具:下载平台证书
# -----------------------------------------------------
async def _ensure_platform_cert(self, session: aiohttp.ClientSession):
url_path = "/v3/certificates"
url = WECHAT_API_BASE + url_path
auth = self._sign("GET", url_path, "")
async with session.get(url, headers={
"Authorization": f"WECHATPAY2-SHA256-RSA2048 {auth}",
"Accept": "application/json"
}) as resp:
data = await resp.json()
for cert in data.get("data", []):
sn = cert["serial_no"]
encrypt_info = cert["encrypt_certificate"]
# 解密平台证书
aesgcm = AESGCM(self.api_v3_key)
pub_pem = aesgcm.decrypt(
nonce=encrypt_info["nonce"].encode(),
data=base64.b64decode(encrypt_info["ciphertext"]),
associated_data=encrypt_info["associated_data"].encode()
).decode()
pub_key = serialization.load_pem_public_key(pub_pem.encode())
self._platform_certs[sn] = pub_key
# -----------------------------------------------------
# 工具:验签回调
# -----------------------------------------------------
async def _verify_callback(self, headers: Dict[str, str], body: str):
serial = headers.get("Wechatpay-Serial")
timestamp = headers.get("Wechatpay-Timestamp")
nonce = headers.get("Wechatpay-Nonce")
signature = headers.get("Wechatpay-Signature")
if serial not in self._platform_certs:
# 自动拉取平台证书
async with aiohttp.ClientSession() as session:
await self._ensure_platform_cert(session)
pub_key = self._platform_certs.get(serial)
if pub_key is None:
raise ValueError("Platform cert not found")
message = f"{timestamp}\n{nonce}\n{body}\n".encode()
try:
pub_key.verify(
base64.b64decode(signature),
message,
padding.PKCS1v15(),
hashes.SHA256()
)
except Exception:
raise ValueError("Invalid callback signature")
# -----------------------------------------------------
# 工具:解密 resource.ciphertext
# -----------------------------------------------------
def _decrypt_resource(self, resource: Dict[str, Any]) -> Dict[str, Any]:
aesgcm = AESGCM(self.api_v3_key)
plaintext = aesgcm.decrypt(
nonce=resource["nonce"].encode(),
data=base64.b64decode(resource["ciphertext"]),
associated_data=resource["associated_data"].encode()
).decode()
return json.loads(plaintext)
# -----------------------------------------------------
# 实现统一接口
# -----------------------------------------------------
async def create_payment(self, payload: Dict[str, Any]) -> str:
"""
H5 下单默认 H5可改为 jsapi/native
"""
body = {
"appid": self.appid,
"mchid": self.mchid,
"description": payload["subject"],
"out_trade_no": payload["out_trade_no"],
"notify_url": payload["notify_url"],
"amount": {
"total": int(payload["amount"]),
"currency": payload.get("currency", "CNY")
},
"scene_info": {
"payer_client_ip": payload.get("client_ip", "127.0.0.1"),
"h5_info": {"type": "Wap"}
}
}
body_str = json.dumps(body, ensure_ascii=False)
url_path = "/v3/pay/transactions/h5"
url = WECHAT_API_BASE + url_path
auth = self._sign("POST", url_path, body_str)
async with aiohttp.ClientSession() as session:
async with session.post(url, data=body_str, headers={
"Authorization": f"WECHATPAY2-SHA256-RSA2048 {auth}",
"Content-Type": "application/json"
}) as resp:
ret = await resp.json()
return ret.get("h5_url")
return {
"provider": "wechat",
"data": {
"out_trade_no": body["out_trade_no"],
"h5_url": ret.get("h5_url"),
"wx_transaction_id": ret.get("transaction_id")
}
}
# -----------------------------------------------------
async def refund(self, payload: Dict[str, Any]) -> Dict[str, Any]:
body = {
"out_trade_no": payload["out_trade_no"],
"out_refund_no": payload["out_refund_no"],
"amount": {
"refund": int(payload["amount"]),
"total": int(payload["total_amount"]),
"currency": payload.get("currency", "CNY")
},
"reason": payload.get("reason")
}
body_str = json.dumps(body, ensure_ascii=False)
url_path = "/v3/refund/domestic/refunds"
url = WECHAT_API_BASE + url_path
auth = self._sign("POST", url_path, body_str)
async with aiohttp.ClientSession() as session:
async with session.post(url, data=body_str, headers={
"Authorization": f"WECHATPAY2-SHA256-RSA2048 {auth}",
"Content-Type": "application/json"
}) as resp:
ret = await resp.json()
return {
"provider": "wechat",
"data": ret
}
# -----------------------------------------------------
async def query(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
订单查询或退款查询
支持
/v3/pay/transactions/out-trade-no/{out_trade_no}
/v3/refund/domestic/refunds/{out_refund_no}
"""
if "out_trade_no" in payload:
url_path = f"/v3/pay/transactions/out-trade-no/{payload['out_trade_no']}?mchid={self.mchid}"
else:
url_path = f"/v3/refund/domestic/refunds/{payload['out_refund_no']}"
url = WECHAT_API_BASE + url_path
auth = self._sign("GET", url_path, "")
async with aiohttp.ClientSession() as session:
async with session.get(url, headers={
"Authorization": f"WECHATPAY2-SHA256-RSA2048 {auth}",
"Accept": "application/json"
}) as resp:
ret = await resp.json()
return {
"provider": "wechat",
"data": ret
}
# -----------------------------------------------------
async def handle_notify(self, headers: Dict[str, str], body: str) -> Dict[str, Any]:
"""
解密/验证回调返回标准 dict
out_trade_no
transaction_id
trade_state
payer info
"""
await self._verify_callback(headers, body)
payload = json.loads(body)
resource = payload["resource"]
decrypted = self._decrypt_resource(resource)
# 返回标准结构
return {
"provider": "wechat",
"data": decrypted,
"out_trade_no": decrypted.get("out_trade_no"),
"trans_no": decrypted.get("transaction_id"),
"status": decrypted.get("trade_state")
}

11
unipay/utils.py Normal file
View File

@ -0,0 +1,11 @@
# unipay/utils.py
import time, secrets, json
def now_ts() -> str:
return str(int(time.time()))
def nonce_str(n=32):
return secrets.token_hex(n//2)
def safe_json_dumps(obj):
return json.dumps(obj, separators=(",", ":"), ensure_ascii=False)

View File

@ -0,0 +1,5 @@
debug(f'/unipay/notify/alipay/index.dspy:{params_kw}')
ns = params_kw.copy()
ns.provider = 'alipay'
return await payment_notify(request, recharge_accounting, params_kw=ns)

View File

@ -0,0 +1,5 @@
debug(f'/unipay/notify/paypal/index.dspy:{params_kw}')
ns = params_kw.copy()
ns.provider = 'paypal'
return await payment_notify(request, recharge_accounting, params_kw=ns)

View File

@ -0,0 +1,5 @@
debug(f'/unipay/notify/stripe/index.dspy:{params_kw}')
ns = params_kw.copy()
ns.provider = 'stripe'
return await payment_notify(request, recharge_accounting, params_kw=ns)

View File

@ -0,0 +1,6 @@
debug(f'/unipay/notify/wechat/index.dspy:{params_kw}')
ns = params_kw.copy()
ns.provider = 'wechat'
return await payment_notify(request, recharge_accounting, params_kw=ns)