# -*- coding: utf-8 -*- import os import time import asyncio from appPublic.jsonConfig import getConfig from alipay import AliPay from sqlor.dbpools import DBPools async def kboss_alipay(out_trade_no, total_amount, subject): db = DBPools() async with db.sqlorContext('kboss') as sor: config = getConfig() prifile = config.alipay.private pubfile = config.alipay.public appid = config.alipay.appid huidiao = config.alipay.huidiao app_private_key_string = '' alipay_public_key_string = '' alipay_appid = '' alipay_huidiao = '' """ 支付宝支付 传递参数: out_trade_no : 订单 total_amount : 金额 subject : 产品 """ with open(pubfile, 'r') as f: alipay_public_key_string = f.read() with open(prifile, 'r') as f: app_private_key_string = f.read() with open(appid, 'r') as f: alipay_appid = f.read() with open(huidiao, 'r') as f: alipay_huidiao = f.read() log_time = time.strftime('%Y-%m-%d %H:%M:%S') if alipay_public_key_string and app_private_key_string and alipay_appid and alipay_huidiao: with open('ali_pay_log.txt', 'a+') as f: f.write(log_time + ' ' + '支付宝密钥文件读取成功' + '\n') else: with open('ali_pay_log.txt', 'a+') as f: f.write(log_time + ' ' + '支付宝密钥文件读取失败, appid路径: ' + appid + '\n') try: if not alipay_huidiao: alipay_huidiao = 'https://sage.opencomputing.cn/api/callback/alipay' alipay = AliPay( appid=alipay_appid, app_notify_url=alipay_huidiao, # 默认回调url app_private_key_string=app_private_key_string, # 支付宝的公钥,验证支付宝回传消息使用,不是自己的公钥, alipay_public_key_string=alipay_public_key_string, sign_type="RSA2", # RSA 或者 RSA2 debug=True, # 默认False ) # 电脑网站支付,需要跳转到https://openapi.alipay.com/gateway.do? + order_string order_string = alipay.api_alipay_trade_page_pay( # 订单号 out_trade_no=out_trade_no, # 金额 total_amount=total_amount, # 产品 subject=subject, # 回调地址和默认配置一样即可 return_url=alipay_huidiao, notify_url=alipay_huidiao # 可选, 不填则使用默认notify url ) except Exception as e: import traceback with open('ali_pay_log.txt', 'w') as f: f.write(str(e) + traceback.format_exc()) traceback.print_exc() return { 'status': False, 'msg': '支付发生错误, %s' % str(e) } if order_string: with open('ali_pay_log.txt', 'a+') as f: f.write(log_time + ' ' + '支付宝跳转链接生成成功' + '\n') else: with open('ali_pay_log.txt', 'a+') as f: f.write(log_time + ' ' + '支付宝跳转链接生成失败, ' + str(order_string) + '\n') url = f"https://openapi.alipay.com/gateway.do?{order_string}" return url if __name__ == '__main__': p = os.getcwd() config = getConfig(p) DBPools(config.databases) loop = asyncio.get_event_loop() print(loop.run_until_complete(kboss_alipay('qwe22d12312541',1,'大模型充值')))