async def resource_offline(ns={}): # ns = { # "accountId": "139fc7a23b314596ad78b6bb8e7c1503", # "service": "EIP", # "region": "bj" # } method = 'POST' ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/resource/opt/offline?%s' % ns_format header = { "Host": "billing.baidubce.com" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=ns) as res: data_ = await res.json() print('下线返回结果, ', data_) async def resource_online(ns={}): # ns = { # "accountId": "139fc7a23b314596ad78b6bb8e7c1503", # "service": "EIP", # "region": "bj" # } method = 'POST' ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/resource/opt/online?%s' % ns_format header = { "Host": "billing.baidubce.com" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=ns) as res: data_ = await res.json() print('上线返回结果, ', data_) async def get_valid_admin_person(nss={}): db = DBPools() async with db.sqlorContext('kboss') as sor: # 查找客户所在的机构 who_is_org = (await sor.R('organization', {'id': nss['customerid']}))[0]['parentid'] # 查找销售的电话 saleman_phone = 0 saleman_id_li = await sor.R('customer', {'customerid': nss['customerid'], 'del_flg': '0'}) if saleman_id_li and saleman_id_li[0]['salemanid']: saleman_phone_li = await sor.R('users', {'id': saleman_id_li[0]['salemanid']}) if saleman_phone_li and saleman_phone_li[0]['mobile']: saleman_phone = saleman_phone_li[0]['mobile'] return saleman_id_li[0]['salemanid'], saleman_phone if (not saleman_phone) or (not saleman_id_li): # 首先查找运营电话 yunying_mobile = 0 current_yunying_id = await get_role_user_from_org(orgid=who_is_org, role='运营') if current_yunying_id: yunying_mobile = (await sor.R('users', {'id': current_yunying_id, 'del_flg': '0'}))[0]['mobile'] if yunying_mobile: return current_yunying_id, yunying_mobile if (not yunying_mobile) or (not current_yunying_id): # 查找管理员电话 current_admin_id = await get_role_user_from_org(orgid=who_is_org, role='管理员') if current_admin_id: admin_mobile = (await sor.R('users', {'id': current_admin_id, 'del_flg': '0'}))[0]['mobile'] if admin_mobile: return current_admin_id, admin_mobile print('%s 没有找到对应的管理人员进行电话,站内信发送' % nss['customerid']) return None, None async def innder_mail_add_msg(nss={}): """ :param ns: :return: senderid发送人id receiverid收件人id msgtitle消息标头 msgtext消息内容 """ user_orgid = nss['user_orgid'] user_id = nss['userid'] mail_should_send = False send_type = '欠费通知' # 记录到欠费表中 # 首先查询欠费表中时间是否再范围内 db = DBPools() async with db.sqlorContext('kboss') as sor: customer_name = (await sor.R('organization', {'id': user_orgid}))[0]['orgname'] mail_exist_li = await sor.R('message', {'receiverid': user_id, 'send_type': send_type, 'sort': ['sendtime desc']}) saleman_id, saleman_mobile = await get_valid_admin_person({'customerid': user_orgid}) if mail_exist_li: # 如果原来发送成功 时间范围大于24小时 再次发送 sms_exist_time = mail_exist_li[0]['sendtime'] time_diff = await get_time_diff(sms_exist_time, time.strftime('%Y-%m-%d %H:%M:%S')) # 如果发送过的短信在一天范围内 不再发送短信 if time_diff >= 24 * 60: mail_should_send = True else: print('%s 站内信已经发送过, 但是没有超过阈值时间, 不再发送...' % customer_name) if mail_should_send or not mail_exist_li: ns_customer = { 'id': uuid(), 'senderid': 'inner', 'receiverid': user_id, 'msgtitle': '系统新消息通知', 'msgtype': '欠费通知', 'msgtext': '尊敬的用户, 您购买的产品 %s 已欠费, 请及时续费以免影响正常使用。' % nss.get('productname'), } ns_saleman = { 'id': uuid(), 'senderid': 'inner', 'receiverid': saleman_id, 'msgtitle': '系统新消息通知', 'msgtype': '欠费通知', 'msgtext': '您好, 客户 %s 购买的产品 %s 已欠费, 请及时联系通知。' % (customer_name, nss.get('productname')), } await sor.C('message', ns_customer) await sor.C('message', ns_saleman) return {'status': True, 'msg': '添加成功'} async def get_time_diff(time_old=None, time_new=None): # # 定义两个时间字符串 # time_old = '2023-12-01 09:40:00' # time_new = '2023-12-01 23:15:00' # 将时间字符串解析为datetime对象 time_dt1 = datetime.datetime.strptime(time_old, '%Y-%m-%d %H:%M:%S') time_dt2 = datetime.datetime.strptime(time_new, '%Y-%m-%d %H:%M:%S') # 计算时间差 time_difference = time_dt2 - time_dt1 # 提取时间差中的分钟数 minutes_difference = time_difference.total_seconds() / 60 # day = time_difference.days # hour = time_difference.total_seconds() / 3600 return minutes_difference async def diff_sms_send_save(sor=None, productname=None, time_interval=None, send_type=None, user_orgid=None, sms_send_dict=None): sms_should_send = False send_type = send_type # 记录到欠费表中 # 首先查询欠费表中时间是否再范围内 try: print('引用发送短信...') customer_phone = (await sor.R('organization', {'id': user_orgid}))[0]['contactor_phone'] customer_name = (await sor.R('organization', {'id': user_orgid}))[0]['orgname'] sms_exist_li = await sor.R('sms_record', {'mobile': customer_phone, 'send_type': send_type, 'send_status': '1', 'sort': ['send_time desc']}) if sms_exist_li: # 如果原来发送成功 时间范围大于24小时 再次发送 if sms_exist_li[0]['send_status']: sms_exist_time = sms_exist_li[0]['send_time'] time_diff = await get_time_diff(sms_exist_time, time.strftime('%Y-%m-%d %H:%M:%S')) # 如果发送过的短信在一天范围内 不再发送短信 if time_diff >= time_interval: sms_should_send = True else: print('%s %s短信已经发送过, 但是没有超过阈值时间, 不再发送...' % (send_type, customer_phone)) else: print('%s %s短信没有发送过, 发送...' % (send_type, customer_phone)) sms_should_send = True if sms_should_send or not sms_exist_li: # 给个人发送短信 await send_vcode(customer_phone, send_type, sms_send_dict) # 给相关管理人员发送短信 saleman_id, saleman_mobile = await get_valid_admin_person({'customerid': user_orgid}) if saleman_mobile: sms_send_dict_saleman = { 'username': customer_name, 'productname': productname } if send_type == '用户欠费通知': print('sms_send_dict_saleman', sms_send_dict_saleman) await send_vcode(saleman_mobile, '责任人用户欠费通知', sms_send_dict_saleman) except Exception as e: print('发送短信失败: %s' % e) return { 'status': False, 'msg': '发送短信失败', 'data': e } async def baidu_post_pay_charge(bill_exist=None, info_detail=None, baidu_orgid=None, userid=None, baidu_id=None, user_orgid=None, user_parentid=None, sor=None): """ 计算折扣后的价格 获取针对个人的折扣配置 :param ns: :return: """ info_detail_id = info_detail['id'] bill_date = info_detail['starttime'] financeprice = info_detail['catalogprice'] servicetype = info_detail['servicetype'] # utc时间转本地时间 # bill_date_local_time = await utc2local(bill_date) # 查找对应的产品id productid_li = await sor.R('product', {'providerid': baidu_orgid, 'providerpid': 'baidu_' + servicetype, 'del_flg': '0'}) if productid_li: productid_soure = productid_li[0]['id'] product_name = productid_li[0]['name'] else: productid_soure = None product_name = None real_discount = 1.0 person_discount_sql = """select * from saleprotocol where offer_orgid = '%s' and bid_orgid = '%s' and salemode = '0' and del_flg = '0' and CURRENT_DATE <= end_date and CURRENT_DATE >= start_date""" % (user_parentid, user_orgid) tongyi_discount_sql = """select * from saleprotocol where offer_orgid = '%s' and bid_orgid = '*' and salemode = '0' and del_flg = '0' and CURRENT_DATE <= end_date and CURRENT_DATE >= start_date""" % user_parentid # 没有个人配置 获取百度产品统一配置折扣 for sql_detail in [person_discount_sql, tongyi_discount_sql]: person_discount_li = await sor.sqlExe(sql_detail, {}) for person_discount in person_discount_li: protocolid = person_discount['id'] xieyi_zi_search_li = await sor.R('product_salemode', {'protocolid': protocolid, 'del_flg': '0'}) for xieyi_zi in xieyi_zi_search_li: provider_zi_id = xieyi_zi['providerid'] zi_discount = xieyi_zi['discount'] zi_productid = xieyi_zi['productid'] if provider_zi_id == baidu_orgid and zi_productid == productid_soure: # 判断产品是否存在 有可能在产品表已经删除 prd_res_exists_li = await sor.R('product', {'id': xieyi_zi['productid'], 'del_flg': '0'}) if not prd_res_exists_li: continue real_discount = float(zi_discount) break if real_discount != 1.0: break # 计算实际支付费用 amount = round(real_discount * float(financeprice), 6) # 根据params表中bussiness_date进行扣费 bussiness_date = (await sor.R('params', {'pname': 'business_date', 'del_flg': '0'}))[0]['pvalue'] nss_bill = { 'id': uuid(), 'customerid': user_orgid, 'business_op': 'BUY', 'provider_amt': financeprice, 'amount': amount, 'bill_date': bussiness_date, 'bill_timestamp': bussiness_date + ' 00:00:00', 'bill_state': 0, 'productid': productid_soure, 'providerid': baidu_orgid, 'quantity': 1 } # 比对账户余额和账单金额 db_balance = DBPools() async with db_balance.sqlorContext('kboss') as sor_balance: balance_yuan = await getCustomerBalance(sor_balance, user_orgid) print('%s orgid: %s, 余额为: %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), user_orgid, balance_yuan)) print('将要扣除的金额: %s' % nss_bill['amount']) if balance_yuan < amount: sms_send_dict = { 'time': time.strftime('%Y-%m-%d %H:') + '00:00', 'productname': product_name } db_sms = DBPools() async with db_sms.sqlorContext('kboss') as sor_sms: await diff_sms_send_save(sor=sor_sms, productname=product_name, time_interval=24*60, send_type='用户欠费通知', user_orgid=user_orgid, sms_send_dict=sms_send_dict) await innder_mail_add_msg({'user_orgid': user_orgid, 'userid': userid, 'productname': product_name}) # 查找是否已经存在 exist_qianfei = await sor.R('arrears', {'billid': info_detail['billid']}) if exist_qianfei: print('%s 欠费账单已经存在...' % info_detail['billid']) else: nss_bill['starttime'] = time.strftime('%Y-%m-%d %H:%M:%S') nss_bill['source'] = '百度智能云' nss_bill['localid'] = info_detail['id'] nss_bill['billid'] = info_detail['billid'] nss_bill['del_flg'] = 0 await sor.C('arrears', nss_bill) # 直接创建账单 不再进行扣费再创建账单流程 if not bill_exist: info_detail['kaiyuanprice'] = amount await sor.C('baidu_post_bill', info_detail) print('%s 用户%s 余额不足' % (time.strftime('%Y-%m-%d %H:%M:%S'), user_orgid)) # 产品欠费下线 ns_offline = { "accountId": baidu_id, "service": servicetype, "region": info_detail['region'] } print('*************', ns_offline) await resource_offline(ns_offline) return # 如果当前余额不足当前账单24小时内支付 发送预警短信 if amount * 24 > balance_yuan: sms_send_dict = { 'time': time.strftime('%Y-%m-%d %H:') + '00:00', 'balance': int(balance_yuan) } dbs = DBPools() async with dbs.sqlorContext('kboss') as sor_sms: await diff_sms_send_save(sor=sor_sms, productname=product_name, time_interval=24*60*2, send_type='余额不足提醒', user_orgid=user_orgid, sms_send_dict=sms_send_dict) # 记账 # 日期 12-06/12-07 凌晨扣费时出错 由于是事务, 12-06/12-07同时出错 无法扣费 # 所以另开事务 至少有一天可以扣费成功 # 扣费不成功的原因: # business_date和账单日期对不上/acc_balance余额表中所有日期存在大于账单日期 # 余额部分满足时 扣除可用余额后 事务中余额没变化 此处创建新的事务 db_bill = DBPools() async with db_bill.sqlorContext('kboss') as sor_bill: try: ba = BillAccounting(nss_bill) await ba.accounting(sor_bill) # 不存在就在此处创建 if not bill_exist: info_detail['bill_state'] = 1 info_detail['kaiyuanprice'] = amount await sor_bill.C('baidu_post_bill', info_detail) # 修改本地bill状态 0:未支付,1:已支付;2:已取消 # 已经存在就更新 ns_bill_status = { 'id': info_detail_id, 'bill_state': 1, 'kaiyuanprice': amount } await sor_bill.U('baidu_post_bill', ns_bill_status) print('百度账单扣费: %s, 扣费成功' % info_detail['billid']) except Exception as e: print('用户: %s, 账单: %s, 扣费失败: %s' % (user_orgid, info_detail['billid'], e)) if not bill_exist: await sor_bill.C('baidu_post_bill', info_detail) async def save_baidu_post_bill(data_bill, userid=None, baidu_id=None, user_orgid=None, user_parentid=None, sor=None): try: # 查找百度orgid 方便一下查找对应产品的productid baidu_orgid_li = await sor.R('organization', {'orgname': '百度智能云'}) if baidu_orgid_li: baidu_orgid = baidu_orgid_li[0]['id'] else: raise ValueError("无法找到百度机构id, 百度供应商名称是: 百度智能云") # count = 0 for bill_detail in data_bill['bills']: # count += 1 # if count > 9: # return info_detail = { 'id': uuid(), 'billid': bill_detail.get('billId'), 'accountid': data_bill['subAccountId'], 'servicetype': bill_detail.get('serviceType'), 'servicetypename': bill_detail.get('serviceTypeName'), 'producttype': bill_detail.get('productType'), 'region': bill_detail.get('region'), 'instanceid': bill_detail.get('instanceId'), 'shortid': bill_detail.get('shortId'), 'starttime': bill_detail.get('startTime'), 'endtime': bill_detail.get('endTime'), 'configurationch': bill_detail.get('configurationCH'), 'unitprice': bill_detail.get('unitPrice'), 'pricingunit': bill_detail.get('pricingUnit'), 'chargeitem': bill_detail.get('chargeItem'), 'chargeitemdesc': bill_detail.get('chargeItemDesc'), 'amount': bill_detail.get('amount'), 'amountunit': bill_detail.get('amountUnit'), 'discountamount': bill_detail.get('discountAmount'), 'originprice': bill_detail.get('originPrice'), 'catalogprice': bill_detail.get('catalogPrice'), 'financeprice': bill_detail.get('financePrice'), 'couponprice': bill_detail.get('couponPrice'), 'discountcouponprice': bill_detail.get('discountCouponPrice'), 'cashequivalentcouponprice': bill_detail.get('cashEquivalentCouponPrice'), 'discountprice': bill_detail.get('discountPrice'), 'sysgold': bill_detail.get('sysGold'), 'deductprice': bill_detail.get('deductPrice'), 'originconfig': json.dumps(bill_detail), 'bill_state': 0, 'del_flg': '0' } db_sor_bill = DBPools() async with db_sor_bill.sqlorContext('kboss') as sor_find_bill: exist_bill_id = await sor_find_bill.R('baidu_post_bill', {'billid': info_detail['billid']}) bill_exist = 0 if exist_bill_id: if exist_bill_id[0]['bill_state'] == '0': print('%s 百度后付费账单已经存在, 没有扣款, 账单号: %s' % (user_orgid, info_detail['billid'])) # 此时id已经存在 不再生成新的id info_detail['id'] = exist_bill_id[0]['id'] bill_exist = 1 else: # print('%s 百度后付费账单已经存在, 已经扣款, 账单号: %s' % (user_orgid, info_detail['billid'])) continue else: pass # 数据库同时插入和更新死锁 此处不再插入 # await sor.C('baidu_post_bill', info_detail) # 扣费 await baidu_post_pay_charge(bill_exist=bill_exist, info_detail=info_detail, baidu_orgid=baidu_orgid, userid=userid, baidu_id=baidu_id, user_orgid=user_orgid, user_parentid=user_parentid, sor=sor) except Exception as e: print('save_baidu_post_bill: %s' % e) raise e async def get_auth_header(method: str, url: str, header: dict, query={}, get_res=False): import urllib import hmac AK = 'ALTAKPk92fX9cgGDax83yNL8mP' SK = '9b16b8efd4dc463d8bbd5462db1db8a5' # 解析uri url_parse = urllib.parse.urlparse(url) uri = url_parse.path url_query = url_parse.query # 获取query if url_query: query = dict(urllib.parse.parse_qsl(url_query)) # 2.x-bce-date x_bce_date = time.gmtime() x_bce_date = time.strftime('%Y-%m-%dT%H:%M:%SZ', x_bce_date) # 4.认证字符串前缀 authStringPrefix = "bce-auth-v1" + "/" + AK + "/" + x_bce_date + "/" + "1800" # windows下为urllib.parse.quote,Linux下为urllib.quote # 5.生成CanonicalRequest # 5.1生成CanonicalURI CanonicalURI = urllib.parse.quote(uri) # 如果您调用的接口的query比较复杂的话,需要做额外处理 # 5.2生成CanonicalQueryString # CanonicalQueryString = query result = ['%s=%s' % (k, urllib.parse.quote(str(v))) for k, v in query.items() if k.lower != 'authorization'] result.sort() CanonicalQueryString = '&'.join(result) # 5.3生成CanonicalHeaders result = [] signedHeaders_list = [] for key,value in header.items(): tempStr = str(urllib.parse.quote(key.lower(),safe="")) + ":" + str(urllib.parse.quote(value,safe="")) signedHeaders_list.append(str(urllib.parse.quote(key.lower(),safe=""))) result.append(tempStr) result.sort() signedHeaders = ';'.join(sorted(signedHeaders_list)) CanonicalHeaders = "\n".join(result) # 5.4拼接得到CanonicalRequest CanonicalRequest = method + "\n" + CanonicalURI + "\n" + CanonicalQueryString +"\n" + CanonicalHeaders # 6.生成signingKey signingKey = hmac.new(SK.encode('utf-8'), authStringPrefix.encode('utf-8'), hashlib.sha256) # 7.生成Signature Signature = hmac.new((signingKey.hexdigest()).encode('utf-8'), CanonicalRequest.encode('utf-8'), hashlib.sha256) # 8.生成Authorization并放到header里 header['Authorization'] = authStringPrefix + "/" + signedHeaders + "/" + Signature.hexdigest() if get_res: # 异步请求链接 返回结果 async with aiohttp_client.request( method=method, url=url, headers=header, allow_redirects=True, json=query) as res: return res else: return header async def arrears_charge(customerid=None, source=None): """ 优先扣除欠费表中的账单 :param ns: :return: """ # customerid = 'gKG8UaYPjA_29K7puzaTM' # source = '百度智能云' db_arrears = DBPools() async with db_arrears.sqlorContext('kboss') as sor_arrears: bill_date = (await sor_arrears.R('params', {'pname': 'business_date', 'del_flg': '0'}))[0]['pvalue'] try: arrear_li = await sor_arrears.R('arrears', {'source': source, 'customerid': customerid, 'bill_state': '0'}) arrear_product_types = [] for arr in arrear_li: nss_bill = { 'id': uuid(), 'customerid': arr['customerid'], 'business_op': arr['business_op'], 'provider_amt': arr['provider_amt'], 'amount': arr['amount'], 'bill_date': bill_date, 'bill_timestamp': bill_date + ' 00:00:00', 'bill_state': 0, 'productid': arr['productid'], 'providerid': arr['providerid'], 'quantity': arr['quantity'] } ba = BillAccounting(nss_bill) await ba.accounting(sor_arrears) # 修改本地bill状态 0:未支付,1:已支付;2:已取消 # 已经存在就更新 arrear_status = { 'id': arr['id'], 'bill_state': 1, 'endtime': time.strftime('%Y-%m-%d %H:%M:%S') } await sor_arrears.U('arrears', arrear_status) print('%s 百度账单-欠费账单-扣费成功, 状态更新成功: %s' % (arr['customerid'], arr['id'])) baidu_post_bill_status = { 'id': arr['localid'], 'bill_state': 1 } await sor_arrears.U('baidu_post_bill', baidu_post_bill_status) print('%s 百度账单-欠费账单扣费成功---本地账单-更新成功, 状态更新成功: %s' % (arr['customerid'], arr['billid'])) # 获取百度下线类型列表 用于产品上线 products = await sor_arrears.R('baidu_post_bill', {'id': arr['localid']}) ns_type = { "accountId": products[0]['accountid'], "service": products[0]['servicetype'], "region": products[0]['region'] } arrear_product_types.append(ns_type) return { 'status': True, 'data': [dict(t) for t in {frozenset(d.items()) for d in arrear_product_types}] } except Exception as e: print('用户: %s, 欠费账单扣费失败: %s' % (customerid, e)) return { 'status': False } async def get_resourcechargeitem_billlist(userid=None, baidu_id=None, user_orgid=None, user_parentid=None, pageno=1, sor=None): """ 分页获取后付费资源计费项账单详情, 颗粒度:小时 不能查看当月账单 1. 账单过来后余额充足正常扣费 1048条:扣费前余额:18327.51 扣费后余额: 18267.13 实际扣除:60.38 实际账单应扣除总额: 62.4347 前10条: 扣费前余额:18267.13 扣费后余额: 18266.93 实际扣除:0.2 实际账单应扣除总额: 0.2255 2. 账单过来后余额不足短信通知 2.1 账单过来后余额不足短信通知-时间范围内是否重复发送 2.1.1 重复发送 创建一个单独事务 修复 2.1.2 充值-但只足够部分账单扣费 余额查询创建单独事务 修复 2.2 账单过来后余额不足短信通知-超过时间范围是否发送 正常发送 2.3 是否在欠费账单里 2.3.1 后续还在欠费 是否在欠费账单重复 不重复 2.3.2 后续余额充足 是否把欠费补充完成 是 3. 余额充足 3个账单 正常扣费 balance由于是小数点两位 造成误差 余额充足 但是不够24小时费用 正常扣费 客户短信正常发送 销售短信发送失败:productname=None 修复 余额部分满足 正常扣费 短信不能正常发送/post_bill kaiyuanprice为空没有正常录入 创建新事务 修复 欠费表已存在 重复请求是否重复 不重复 欠费表已存在 余额充足 是否正常扣费 欠费表和post_bill表重复扣费 欠费表创建单独的事务 在查询存在账单时创建事务 修复 欠费表单独事务 - 每一次欠费项一个事务 否则造成欠费表同时不成功 但post_bill表反而扣费成功了 造成不同步 (测试发现状态可以正常改变) 存在问题 1. 小数点过小无法扣除 实际扣除0.0 应扣除0.0205 2. import urllib hmac 3. accounting.bill.BillAccounting 线上删除accounting.bill 此处报错 但账单已保存 测试 :param ns: :return: """ # 优先扣除欠费信息 欠费表/取消释放/忽略小数点后几位较小误差 arrear_types = await arrears_charge(customerid=user_orgid, source='百度智能云') # 扣费成功 开启下线的服务 if arrear_types and arrear_types.get('status') and arrear_types.get('data'): for roline in arrear_types['data']: await resource_online(roline) # 获取当前日期和时间 current_time = datetime.datetime.now() # 指定开始日期 # date_string = "2023-11-30" # format = '%Y-%m-%d' # specifify the format of the date_string. # current_time = datetime.datetime.strptime(date_string, format) # 计算七天前的日期 days_ago = current_time - datetime.timedelta(days=7) # 获取账单百度指定必须在同一个月内 if current_time.month != days_ago.month: days_ago = datetime.datetime(current_time.year, current_time.month, 1) # 将日期时间格式化为字符串 current_day = current_time.strftime("%Y-%m-%d") days_ago_time = days_ago.strftime("%Y-%m-%d") ns = { "beginTime": days_ago_time, "endTime": current_day, "queryAccountId" : baidu_id, "pageNo": pageno, "pageSize": 100 } method = 'POST' ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()]) url = 'https://billing.baidubce.com/v1/bill/resource/chargeitem?%s' % ns_format header = { "Host": "billing.baidubce.com" } header = await get_auth_header(method=method, url=url, header=header) async with aiohttp_client.request( method=method, url=url, headers=header, json=ns) as res: data_bill = await res.json() if data_bill.get('bills'): await save_baidu_post_bill(data_bill=data_bill, userid=userid, baidu_id=baidu_id, user_orgid=user_orgid, user_parentid=user_parentid, sor=sor) real_page_no = int(data_bill['totalCount'] / data_bill['pageSize']) + 1 if data_bill['pageNo'] < real_page_no: await get_resourcechargeitem_billlist(userid=userid, baidu_id=baidu_id, user_orgid=user_orgid, user_parentid=user_parentid, pageno=data_bill['pageNo'] + 1, sor=sor) else: print('%s 账号ID: %s 后付费账单同步成功, 同步页面数量: %s' % (current_time.strftime("%Y-%m-%d %H:%M:%S"), data_bill['subAccountId'], data_bill['pageNo'])) else: print('%s 账号ID: %s 或空bills, 后付费账单同步成功, 同步页面数量: %s' % ( current_time.strftime("%Y-%m-%d %H:%M:%S"), data_bill['subAccountId'], data_bill['pageNo'])) async def baidu_users_get_post_pay(ns={}): db = DBPools() async with db.sqlorContext('kboss') as sor: # 获取所有百度用户 baidu_users = await sor.R('baidu_users', {'del_flg': '0'}) for baidu_user in baidu_users: userid = baidu_user['user_id'] baidu_id = baidu_user['baidu_id'] if baidu_id != 'a6f0dbc20f074ea18b4d3ac3ec77d537': continue try: user_orgid = (await sor.R('users', {'id': userid}))[0]['orgid'] user_parentid = (await sor.R('organization', {'id': user_orgid}))[0]['parentid'] await get_resourcechargeitem_billlist(userid=userid, baidu_id=baidu_id, user_orgid=user_orgid, user_parentid=user_parentid, pageno=1, sor=sor) except Exception as e: return { 'status': False, 'msg': '百度后付费账单发生错误', 'data': e } return { 'status': True, 'msg': '百度账户后付费扣费成功...' } ret = await baidu_users_get_post_pay(params_kw) return ret