async def user_action_record(ns={}): ns_dic_id = uuid() if not ns.get('id') else ns.get('id') ns_dic = { 'id': ns_dic_id, 'source': '百度智能云', 'orderid': ns.get('orderid'), 'ordertype': ns.get('ordertype'), 'userid': ns.get('userid'), 'reason': ns.get('reason') } db = DBPools() async with db.sqlorContext('kboss') as sor: await sor.C('user_action', ns_dic) async def time_convert(resoucetime=None): if not resoucetime: return utc_time = datetime.datetime.strptime(resoucetime, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=datetime.timezone.utc) beijing_time = utc_time.astimezone(datetime.timezone(datetime.timedelta(hours=8))) return beijing_time.strftime("%Y-%m-%d %H:%M:%S") async def cal_expire_time(history_time=None, chargeduration=None, unit=None): chargeduration = int(chargeduration) # 当前时间 # now = datetime.datetime.now() now = datetime.datetime.strptime(history_time, '%Y-%m-%d %H:%M:%S') if unit == 'MONTH': expire_time = now + dateutil.relativedelta.relativedelta(months=chargeduration) elif unit == 'YEAR': expire_time = now + dateutil.relativedelta.relativedelta(years=chargeduration) else: expire_time = None if expire_time: return str(expire_time) else: return None async def affirmbz_order(ns={}): """确认支付""" order_type = ns.get('order_type') sor = ns['sor'] orgid = await sor.R('bz_order', {'id': ns['orderid']}) servicename = orgid[0]['servicename'] product_url = None # if ('BCC' in servicename) or ('GPU' in servicename): # product_url = 'https://console.vcp.baidu.com/bcc/#/bcc/instance/list' date = await get_business_date(sor=None) # await sor.U('bz_order',{'id':ns['orderid'],'order_date': date}) count = await getCustomerBalance(sor, orgid[0]['customerid']) if count == None: count = 0 if count - float(orgid[0]['amount']) < 0: pricedifference = count - round(orgid[0]['amount'],2) return {'status': False, 'msg': '账户余额不足','pricedifference': round(pricedifference,2)} await order2bill(ns['orderid'], sor) bills = await sor.R('bill', {'orderid': ns['orderid'], 'del_flg': '0'}) try: # 需要加事务 for i in bills: ba = BillAccounting(i) r = await ba.accounting(sor) dates = datetime.datetime.now() await sor.U('bz_order', {'id': ns['orderid'], 'order_status': '1','create_at':dates}) await sor.U('bill', {'id': ns['orderid'], 'bill_state': '1'}) order_goods = await sor.R('order_goods', {'orderid': ns['orderid']}) for j in order_goods: # 处理退订逻辑 if order_type == 'REFUND': # 找到资源并更新时间 resource_find_sql = """select id, resourceid, expire_resourceid from customer_goods where FIND_IN_SET('%s', resourceid) and del_flg = '0';""" % j['resourceids'] resource_find_li = await sor.sqlExe(resource_find_sql, {}) resource_find_id = resource_find_li[0]['id'] expire_resourceid = resource_find_li[0]['expire_resourceid'] if expire_resourceid: new_expire_resourceid = expire_resourceid + ',' + j['resourceids'] else: new_expire_resourceid = j['resourceids'] items_refund = resource_find_li[0]['resourceid'].split(',') if resource_find_li[0]['resourceid'] else [] filtered_items = [item for item in items_refund if item != j['resourceids']] result = ','.join(filtered_items) if not result: await sor.U('customer_goods', {'id': resource_find_id, 'del_flg': '1'}) else: await sor.U('customer_goods', {'id': resource_find_id, 'resourceid': result, 'expire_resourceid': new_expire_resourceid}) # 处理续费逻辑 elif order_type == 'RENEW': # 找到资源并更新时间 resource_find_sql = """select id from customer_goods where resourceid = '%s' and del_flg = '0';""" % j['resourceids'] resource_find_li = await sor.sqlExe(resource_find_sql, {}) resource_find_id = resource_find_li[0]['id'] await sor.U('customer_goods', {'id': resource_find_id, 'start_date': j['resourcestarttime'], 'expire_date': j['resourceendtime']}) # 处理购买逻辑 else: product = await sor.R('product', {'id': j['productid']}) nss = {} nss['id'] = uuid() # nss['id'] = UUID() nss['providerrid'] = product[0]['providerid'] nss['productname'] = product[0]['name'] nss['productdesc'] = product[0]['description'] nss['customerid'] = orgid[0]['customerid'] nss['productid'] = product[0]['id'] nss['specdataid'] = j['spec_id'] nss['orderid'] = orgid[0]['id'] nss['start_date'] = j['resourcestarttime'] nss['expire_date'] = j['resourceendtime'] nss['resourceid'] = j['resourceids'] if product_url: nss['product_url'] = product_url else: spec = json.loads(product[0]['spec_note']) if isinstance(product[0]['spec_note'], str) else product[0]['spec_note'] spec_list_url = [item['value'] for item in spec if item['configName'] == 'listUrl'] nss['product_url'] = spec_list_url[0] if spec_list_url else 'https://console.vcp.baidu.com/bcc/#/bcc/instance/list' await sor.C('customer_goods', nss) return {'status': True, 'msg': '支付成功'} except Exception as error: await sor.rollback() import traceback with open('baiducloud_err.txt', 'w') as f: f.write(str(error) + str(traceback.format_exc())) traceback.print_exc() return {'status': False, 'msg': str(error) + str(traceback.format_exc())} async def baidu_new_update_resouce(ns={}): # 增加延迟 import asyncio await asyncio.sleep(12) db = DBPools() async with db.sqlorContext('kboss') as sor: baidu_users = await sor.R('baidu_users', {'user_id': ns.get('userid'),'del_flg':'0'}) user = await sor.R('users', {'id': ns.get('userid')}) orgid = await sor.R('organization', {'id': user[0]['orgid']}) nss = {'uuids': [ns.get('order_id')], 'queryAccountId': baidu_users[0]['baidu_id']} ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/order/getByUuid?%s' % ns_format method = 'POST' header = { "Host": "billing.baidubce.com", "ContentType": "application/json;charset=UTF-8" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=nss) as res: data_ = await res.json() with open('baidu_new_order_after_confirm.txt', 'a+') as f: f.write(json.dumps(data_) + '\n') orders = data_['orders'] for item in orders: order_items = item['orderItems'] for order_info in order_items: resourceids = ','.join(order_info['shortIds']) if order_info.get('shortIds') else '' if not resourceids: continue resourcestarttime = await time_convert(order_info.get('resourceStartTime')) if order_info.get( 'resourceStartTime') else None resourceendtime = await time_convert(order_info.get('resourceEndTime')) if order_info.get( 'resourceEndTime') else None order_key = order_info['key'] update_order_goods_sql = """ UPDATE order_goods og JOIN bz_order o ON og.orderid = o.id SET og.resourceids = '%s', og.resourcestarttime = '%s', og.resourceendtime = '%s' WHERE og.orderkey = '%s' AND o.provider_orderid = '%s'; """ \ % (resourceids, resourcestarttime, resourceendtime, order_key, ns.get('order_id')) await sor.sqlExe(update_order_goods_sql, {}) update_customer_goods_sql = """ UPDATE customer_goods og JOIN bz_order o ON og.orderid = o.id SET og.resourceid = '%s', og.start_date = '%s', og.expire_date = '%s' WHERE og.orderkey = '%s' AND o.provider_orderid = '%s'; """ \ % (resourceids, resourcestarttime, resourceendtime, order_key, ns.get('order_id')) await sor.sqlExe(update_customer_goods_sql, {}) async def baidu_order_cancel(ns={}): baidu_id = ns['baidu_id'] order_id = ns['order_id'] paydata = {'queryAccountId': baidu_id, 'orderIds': [order_id]} ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/order/cancel?%s' % ns_format method = 'POST' header = { "Host": "billing.baidubce.com" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=paydata) as res: await res.json() return { 'status': True, 'msg': 'order cancel success' } async def get_baidu_orderlist(ns={}): """ 百度支付 1、获取订单 2、算出购买的产品折扣 3、比对账号余额 """ # 增加延迟 import asyncio await asyncio.sleep(1) db = DBPools() async with db.sqlorContext('kboss') as sor: baidu_users = await sor.R('baidu_users', {'user_id': ns.get('userid'),'del_flg':'0'}) user = await sor.R('users', {'id': ns.get('userid')}) orgid = await sor.R('organization', {'id': user[0]['orgid']}) nss = {'uuids': [ns.get('order_id')], 'queryAccountId': baidu_users[0]['baidu_id']} ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/order/getByUuid?%s' % ns_format method = 'POST' header = { "Host": "billing.baidubce.com", "ContentType": "application/json;charset=UTF-8" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=nss) as res: data_ = await res.json() with open('baidu_new_order.txt', 'a+') as f: f.write(json.dumps(data_) + '\n') orders = data_['orders'] serviceType = orders[0]['orderItems'] # 可能获取得到的是延迟订单 # if orders[0]['type'] == 'REFUND' and orders[0]['status'] != 'CREATED': # await update_baidu_order_list({'userid': ns.get('userid')}) # res_refund = await baidu_confirm_refund_order({'order_id': ns.get('order_id'), 'baidu_id': baidu_users[0]['baidu_id'], 'user_id': ns.get('userid')}) # return res_refund # 避免重复退订 if orders[0]['type'] == 'REFUND' and orders[0]['status'] == 'CREATED': order_history = await sor.R('bz_order', {'provider_orderid': ns.get('order_id'), 'business_op': 'BUY_REVERSE', 'order_status': '1'}) if order_history: print('此订单之前已经退费成功') return { 'status': True, 'msg': '此订单之前已经退费成功' } # 如果是退款 更新数据库状态 if orders[0]['type'] == 'REFUND': updatetime = await time_convert(orders[0]['updateTime']) if orders[0].get('updateTime') else None update_refund_sql = """UPDATE baidu_orders SET price = '%s', status = '%s', updatetime = '%s' WHERE orderid = '%s';""" % \ (float(orders[0]['price']), orders[0]['status'], updatetime, ns.get('order_id')) await sor.sqlExe(update_refund_sql, {}) productType = 'prepay' # 判断订单item中productType是否有后付费的产品 for item in orders: order_items = item['orderItems'] for order_info in order_items: postpay_price = order_info['itemFee']['price'] if order_info.get('itemFee') else order_info['catalogPrice'] if not postpay_price: # cpt1Price: 固定配置,按分钟计费 postpay_price = order_info['pricingDetail'].get('cpt1Price') if order_info.get('pricingDetail') else 0 # 确定是否是后付费订单 if order_info['productType'] == 'postpay' and postpay_price != 0: productType = 'postpay' # 获取余额 user_balance = await getCustomerBalance(sor, orgid[0]['id']) # 判断余额是否大于50 if user_balance < 50: await sor.rollback() paydata = {'queryAccountId': baidu_users[0]['baidu_id'], 'orderIds': [ns.get('order_id')]} ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/order/cancel?%s' % ns_format method = 'POST' header = { "Host": "billing.baidubce.com" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=paydata) as res: await res.json() ns_record = { 'orderid': ns.get('order_id'), 'ordertype': orders[0]['type'], 'userid': ns.get('userid'), 'reason': '后付费 该账号余额不足50,无法完成购买' } await user_action_record(ns_record) return { 'status': False, 'msg': '您的余额小于该产品的起购金额50元, 目前无法购买立即充值' } # 实付价格 total_price = 0 # productType = '' # 买/续/退 字段映射 order_type = orders[0]['type'] if order_type == 'NEW': business_op = 'BUY' elif order_type == 'RENEW': business_op = 'RENEW' elif order_type == 'REFUND': business_op = 'BUY_REVERSE' else: ns_record = { 'orderid': ns.get('order_id'), 'ordertype': orders[0]['type'], 'userid': ns.get('userid'), 'reason': '支付形式目前仅包含购买,续费,退订' } await user_action_record(ns_record) return { 'status': False, 'msg': '线上暂不支持, 请联系售后' } try: # 生成本地订单 bz_ns = {} bz_ns['id'] = uuid() bz_ns['order_status'] = '0' bz_ns['business_op'] = business_op bz_ns['userid'] = ns.get('userid') bz_ns['customerid'] = orgid[0]['id'] bz_ns['order_date'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") bz_ns['thirdparty_order'] = ns.get('order_id') bz_ns['source'] = '百度智能云' # bz_ns['originalprice'] = orders[0]['price'] bz_ns['originalprice'] = sum(i['catalogPrice'] for i in serviceType) bz_ns['provider_orderid'] = ns.get('order_id') bz_ns['ordertype'] = orders[0]['productType'] bz_ns['servicename'] = orders[0]['serviceType'] bz_ns['autoreneworder'] = '1' if orders[0]['autoRenewOrder'] else '0' if ns.get('specdataid'): bz_ns['specdataid'] = ns['specdataid'] await sor.C('bz_order', bz_ns) for i in serviceType: # if i['productType'] == 'prepay': # # 预付费 # productType = 'prepay' # financePrice = 0 # 获取产品id product = await sor.R('product', {'providerpid': 'baidu_' + i['serviceType'], 'del_flg': '0'}) if not product: return { 'status': False, 'msg': '未配置该产品, 请联系售后处理' } # 获取协议 saleprotocol_to_person = await sor.R('saleprotocol', {'bid_orgid': orgid[0]['id'], 'offer_orgid': orgid[0]['parentid'], 'del_flg': '0'}) # 等于空就代表这个客户没有特殊折扣,就要找到买方为*的协议 saleprotocol_to_all = await sor.R('saleprotocol', {'bid_orgid': '*', 'offer_orgid': orgid[0]['parentid'], 'del_flg': '0', 'salemode': '0'}) if saleprotocol_to_person: product_salemode = await sor.R('product_salemode', {'protocolid': saleprotocol_to_person[0]['id'], 'productid': product[0]['id'], 'del_flg': '0'}) if not product_salemode: product_salemode = await sor.R('product_salemode', {'protocolid': saleprotocol_to_all[0]['id'], 'productid': product[0]['id'], 'del_flg': '0'}) else: product_salemode = await sor.R('product_salemode', {'protocolid': saleprotocol_to_all[0]['id'], 'productid': product[0]['id'], 'del_flg': '0'}) if not product_salemode: return { 'status': False, 'msg': '还未上线这个产品的协议配置' } supply_price = i['itemFee']['price'] if i.get('itemFee') else i['catalogPrice'] financePrice = abs(supply_price * product_salemode[0]['discount']) total_price += financePrice # 添加订单子表 nss = {} nss['id'] = uuid() nss['orderid'] = bz_ns['id'] nss['productid'] = product[0]['id'] nss['providerid'] = product[0]['providerid'] if i['count'] > 1: nss['list_price'] = abs(i['realCatalogPrice'] / i['count']) else: nss['list_price'] = abs(i['realCatalogPrice']) nss['discount'] = product_salemode[0]['discount'] nss['quantity'] = i['count'] nss['price'] = abs(round(nss['list_price'] * product_salemode[0]['discount'], 2)) # 计算总价 # nss['amount'] = round(total_price, 2) nss['amount'] = abs(round(nss['price'] * nss['quantity'], 2)) nss['chargemode'] = i.get('productType') nss['servicename'] = i.get('serviceType') nss['chargeduration'] = i.get('time') nss['unit'] = i.get('timeUnit') nss['resourceids'] = ','.join(i['shortIds']) if i.get('shortIds') else '' nss['orderkey'] = i.get('key') try: # 保存配置configurations 存入specdata表中 if i.get('configurations'): specdata = json.dumps(i['configurations'], ensure_ascii=False) ns_specificdata = { 'id': uuid(), 'productid': product[0]['id'], 'spec_data': specdata, } nss['spec_id'] = ns_specificdata['id'] await sor.C('specificdata', ns_specificdata) except Exception as e: print('保存配置configurations失败', str(e)) with open('baidu_error.log', 'a') as f: f.write('保存配置configurations失败' + str(e) + '\n') # 如果是续费订单 由于没有返回日期, 重新计算日期 if order_type == 'RENEW': history_time_sql = "select resourcestarttime, resourceendtime from order_goods where FIND_IN_SET('%s', resourceids) order by resourceendtime desc;" % \ nss['resourceids'] history_time = await sor.sqlExe(history_time_sql, {}) new_end_time = await cal_expire_time(history_time=history_time[0]['resourceendtime'], chargeduration=nss['chargeduration'], unit=nss['unit']) # 开始日期不变 更新到期日期 nss['resourcestarttime'] = history_time[0]['resourcestarttime'] nss['resourceendtime'] = new_end_time else: if i.get('resourceStartTime'): nss['resourcestarttime'] = await time_convert(i.get('resourceStartTime')) else: nss['resourcestarttime'] = await time_convert(orders[0]['updateTime']) if i.get('resourceEndTime'): nss['resourceendtime'] = await time_convert(i.get('resourceEndTime')) # 后付费没有资源结束时间 if i.get('productType') == 'prepay': end_time = await time_convert(orders[0]['updateTime']) nss['resourceendtime'] = await cal_expire_time(history_time=end_time, chargeduration=nss['chargeduration'], unit=nss['unit']) else: nss['resourceendtime'] = None await sor.C('order_goods', nss) # 循环后更新订单中总价 await sor.U('bz_order', {'id': bz_ns['id'], 'amount': round(total_price, 2)}) except Exception as e: await baidu_order_cancel({'baidu_id': baidu_users[0]['baidu_id'], 'order_id': ns.get('order_id')}) import traceback ns_record = { 'orderid': ns.get('order_id'), 'ordertype': orders[0]['type'], 'userid': ns.get('userid'), 'reason': '发生错误, %s' % str(traceback.format_exc()) } await user_action_record(ns_record) with open('baiducloud_err.txt', 'w') as f: f.write(str(e) + str(traceback.format_exc())) traceback.print_exc() await sor.rollback() return { 'status': False, 'msg': '产品错误, 请联系售后' } # 判断用户账户余额是否足够支付 try: count = await getCustomerBalance(sor, orgid[0]['id']) if order_type == 'REFUND': count = total_price + 0.1 if round(total_price,2) <= count: #判断预付费或者后付费 if productType == 'prepay' or productType == 'postpay': # 调用扣费接口 affirmbz_order_ns = { 'sor': sor, 'orderid': bz_ns['id'], 'order_type': order_type } affirmbz_order_res = await affirmbz_order(affirmbz_order_ns) if not affirmbz_order_res['status']: # if True: await sor.rollback() ns_record = { 'orderid': ns.get('order_id'), 'ordertype': orders[0]['type'], 'userid': ns.get('userid'), 'reason': '支付错误, 请联系售后, %s' % affirmbz_order_res.get('msg') } await user_action_record(ns_record) return { 'status': False, 'msg': '支付错误, 请联系售后,', 'data': affirmbz_order_res } # 预配置local_refund用于本地操作 if order_type == 'REFUND' and not ns.get('local_refund'): return { 'status': True, 'msg': '本地退费成功' } else: # 调用支付订单接口 paydata = {'queryAccountId':baidu_users[0]['baidu_id'],'orderId':ns.get('order_id')} ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/order/pay?%s' % ns_format method = 'POST' header = { "Host": "billing.baidubce.com" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=paydata) as res: data_ = await res.json() if data_ == {'success': True}: ns_record = { 'orderid': ns.get('order_id'), 'ordertype': orders[0]['type'], 'userid': ns.get('userid'), 'reason': '购买成功' } await user_action_record(ns_record) ns_cron_job = { 'id': uuid(), 'source': 'baidu', 'orderid': ns.get('order_id'), 'ordertype': orders[0]['type'], 'userid': ns.get('userid'), 'reason': 'buy success' } await sor.C('baidu_cron_job', ns_cron_job) # return { # 'status': True, # 'orderid': bz_ns['id'] # } else: await sor.rollback() await baidu_order_cancel( {'baidu_id': baidu_users[0]['baidu_id'], 'order_id': ns.get('order_id')}) ns_record = { 'orderid': ns.get('order_id'), 'ordertype': orders[0]['type'], 'userid': ns.get('userid'), 'reason': '支付成功后, order_pay接口错误, 回滚, %s' % str(data_)[:400] } await user_action_record(ns_record) return { 'status': False, 'msg': '产品分配排队中, 请联系售后详询' } else: # 取消订单 await sor.rollback() paydata = {'queryAccountId': baidu_users[0]['baidu_id'], 'orderIds': [ns.get('order_id')]} ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/order/cancel?%s' % ns_format method = 'POST' header = { "Host": "billing.baidubce.com" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=paydata) as res: await res.json() ns_record = { 'orderid': ns.get('order_id'), 'ordertype': orders[0]['type'], 'userid': ns.get('userid'), 'reason': '无法购买后付费产品' } await user_action_record(ns_record) return {'status': False, 'msg': '无法购买后付费产品'} else: #取消订单 await sor.rollback() paydata = {'queryAccountId':baidu_users[0]['baidu_id'],'orderIds':[ns.get('order_id')]} ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/order/cancel?%s' % ns_format method = 'POST' header = { "Host": "billing.baidubce.com" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=paydata) as res: await res.json() ns_record = { 'orderid': ns.get('order_id'), 'ordertype': orders[0]['type'], 'userid': ns.get('userid'), 'reason': '该账号余额不足,无法完成购买' } await user_action_record(ns_record) return {'status': False,'msg': '该账号余额不足,无法完成购买'} except Exception as e: await baidu_order_cancel({'baidu_id': baidu_users[0]['baidu_id'], 'order_id': ns.get('order_id')}) import traceback with open('baiducloud_err.txt', 'w') as f: f.write(str(e)+ traceback.format_exc()) traceback.print_exc() await sor.rollback() return { 'status': False, 'msg': '产品错误, 请联系售后' } # 更新资源时间 资源id # if order_type == 'NEW': # await baidu_new_update_resouce(ns) return { 'status': True, 'orderid': bz_ns.get('id'), 'originalprice': bz_ns.get('originalprice'), 'servicename': bz_ns.get('servicename'), 'amount': total_price } async def get_order_list_base_page(baidu_id, pageNo=1, pageSize=500): ns = {'queryAccountId': baidu_id, 'pageNo': pageNo, 'pageSize': pageSize} method = 'POST' ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/order/list?%s' % ns_format header = { "Host": "billing.baidubce.com", "ContentType": "application/json;charset=UTF-8" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=ns) as res: data_orders = await res.json() return data_orders async def update_baidu_order_list(ns={}): """ ns = {'queryAccountId': '139fc7a23b314596ad78b6bb8e7c1503', 'orderType': 'REFUND'} :return: """ db = DBPools() async with db.sqlorContext('kboss') as sor: username = None # 更新机构下全部用户订单信息 if ns.get('orgid'): users_find_sql = """SELECT DISTINCT b.baidu_id FROM organization o INNER JOIN users u ON o.id = u.orgid INNER JOIN baidu_users b ON u.id = b.user_id WHERE o.parentid = '%s' AND b.del_flg = '0';""" % ns.get('orgid') users = await sor.sqlExe(users_find_sql, {}) # 更新个人订单信息 elif ns.get('userid'): users = await sor.R('baidu_users', {'user_id': ns['userid']}) username_li = await sor.R('users', {'id': ns['userid']}) username = username_li[0]['username'] if username_li else None else: users = [] update_count = 0 add_count = 0 for baidu_id in users: data_orders = await get_order_list_base_page(baidu_id['baidu_id'], pageNo=1, pageSize=1000) page_num_count = int(data_orders['totalCount'] / data_orders['pageSize']) + 1 for page_num in range(1, page_num_count + 1): data_orders = await get_order_list_base_page(baidu_id['baidu_id'], pageNo=page_num, pageSize=1000) orders = data_orders['orders'] for item in orders: updatetime = await time_convert(item.get('updateTime')) if item.get('updateTime') else None ns_dic = { "id": uuid(), "orderid": item.get("uuid"), "ordertype": item.get("type"), "accountid": item.get("accountId"), "servicetype": item.get("serviceType"), "producttype": item.get("productType"), "shortids": ','.join(item['shortIds']) if item.get('shortIds') else '', "price": item.get("price"), "status": item.get("status"), "autoreneworder": '1' if item.get("autoRenewOrder") else '0', "createtime": await time_convert(item.get('createTime')) if item.get( 'createTime') else None, "updatetime": updatetime } ns_exist_order = { 'orderid': item.get("uuid") } exist_order = await sor.R('baidu_orders', ns_exist_order) if exist_order and exist_order[0]['updatetime'] != updatetime: update_refund_sql = """UPDATE baidu_orders SET price = '%s', status = '%s', updatetime = '%s' WHERE orderid = '%s';""" % \ (item.get("price"), item.get("status"), updatetime, item.get("uuid")) await sor.sqlExe(update_refund_sql, {}) update_count += 1 if not exist_order: await sor.C('baidu_orders', ns_dic) add_count += 1 return { 'status': True, 'msg': '同步数据成功, 新增 %s 条, 更新 %s 条' % (add_count, update_count), 'data': { 'username': username } } async def baidu_confirm_refund_order(ns={}): # ns = { # 'order_id': ["2996f0baf34c4a0a98e1da0b4e290a35"], # 'userid': 'y_xQK0G62dtZT5EneMQFT' # } import asyncio # 把 NEED_CONFIRM的订单同步到本地库,用于后续状态更新 await update_baidu_order_list({'userid': ns.get('userid')}) db = DBPools() async with db.sqlorContext('kboss') as sor: users = await sor.R('baidu_users', {'user_id': ns['userid']}) ns['baidu_id'] = users[0]['baidu_id'] if users else None if not ns['baidu_id']: return { 'status': False, 'msg': '用户 %s 未绑定百度智能云账号' % ns.get('userid') } orders = json.loads(ns.get('order_id')) if isinstance(ns.get('order_id'), str) else ns.get('order_id') for order_id in orders: db = DBPools() async with db.sqlorContext('kboss') as sor: refund_status_li = await sor.R('baidu_orders', {'orderid': order_id}) refundstatus = refund_status_li[0]['refundstatus'] refund_id = refund_status_li[0]['id'] ns_record_li = await sor.R('user_action', {'orderid': order_id}) ns_record_id = ns_record_li[0]['id'] if ns_record_li else None ns_record = {'id': ns_record_id} if ns_record_id else None if not refundstatus: # data_ = {} # 调用支付订单接口 paydata = {'queryAccountId': ns.get('baidu_id'), 'orderId': order_id} ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/order/pay?%s' % ns_format method = 'POST' header = { "Host": "billing.baidubce.com" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=paydata) as res: data_ = await res.json() if data_ == {'success': True}: # if True: ns_record = { 'id': uuid(), 'orderid': order_id, 'ordertype': 'REFUND', 'userid': ns.get('userid'), 'reason': '远程退款成功' } await user_action_record(ns_record) db = DBPools() async with db.sqlorContext('kboss') as sor: await sor.U('baidu_orders', {'id': refund_id, 'refundstatus': '1'}) # 增加延迟 await asyncio.sleep(4) # 延迟2-3秒还是获取到 ready状态的订单,那就重复请求一次,目的是尽快刷新状态 async with aiohttp_client.request( method=method, url=url, headers=header, json=paydata) as res: await res.json() # 把 NEED_CONFIRM的本地库改为CREATED await update_baidu_order_list({'userid': ns.get('userid')}) else: ns_record = { 'orderid': order_id, 'ordertype': 'REFUND', 'userid': ns.get('userid'), 'reason': '产品退费失败, %s' % str(data_)[:400] } await user_action_record(ns_record) return { 'status': False, 'msg': '产品退款出错!%s' % str(data_)[:400] } if refundstatus == '2': """ 退款状态为2, 说明退款成功, 无需继续处理 """ continue # 获取created状态后再去退款 local_refund_status = await get_baidu_orderlist({'order_id': order_id, 'userid': ns.get('userid')}) print('local_refund_status', local_refund_status) if local_refund_status.get('status'): db = DBPools() async with db.sqlorContext('kboss') as sor: baidu_orders_status_update = """update baidu_orders set refundstatus='2' where id='%s';""" % refund_id await sor.sqlExe(baidu_orders_status_update, {}) await sor.U('user_action', {'id': ns_record.get('id'), 'ordertype': 'REFUND', 'reason': '远程退款成功, 本地客户退款成功'}) continue # return { # 'status': True, # 'msg': '百度云给平台退款成功,平台给客户退款成功' # } else: if local_refund_status.get('msg') == 'delay_order': return { 'status': False, 'msg': '百度远程订单还未生成, 请十秒后重试, %s' % str(local_refund_status) } return { 'status': False, 'msg': '百度云退款成功,本机构给客户退款出错!, %s' % str(local_refund_status) } return { 'status': True, 'msg': '退款处理完成' } ret = await baidu_confirm_refund_order(params_kw) return ret