kboss/b/baiduc/baidu_users_get_post_pay.dspy
2025-08-26 11:07:56 +08:00

666 lines
30 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

async def resource_offline(ns={}):
# ns = {
# "accountId": "139fc7a23b314596ad78b6bb8e7c1503",
# "service": "EIP",
# "region": "bj"
# }
method = 'POST'
ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()])
url = 'https://billing.baidubce.com/v1/resource/opt/offline?%s' % ns_format
header = {
"Host": "billing.baidubce.com"
}
header = await get_auth_header(method=method, url=url, header=header)
async with aiohttp_client.request(
method=method,
url=url,
headers=header,
json=ns) as res:
data_ = await res.json()
print('下线返回结果, ', data_)
async def resource_online(ns={}):
# ns = {
# "accountId": "139fc7a23b314596ad78b6bb8e7c1503",
# "service": "EIP",
# "region": "bj"
# }
method = 'POST'
ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()])
url = 'https://billing.baidubce.com/v1/resource/opt/online?%s' % ns_format
header = {
"Host": "billing.baidubce.com"
}
header = await get_auth_header(method=method, url=url, header=header)
async with aiohttp_client.request(
method=method,
url=url,
headers=header,
json=ns) as res:
data_ = await res.json()
print('上线返回结果, ', data_)
async def get_valid_admin_person(nss={}):
db = DBPools()
async with db.sqlorContext('kboss') as sor:
# 查找客户所在的机构
who_is_org = (await sor.R('organization', {'id': nss['customerid']}))[0]['parentid']
# 查找销售的电话
saleman_phone = 0
saleman_id_li = await sor.R('customer', {'customerid': nss['customerid'], 'del_flg': '0'})
if saleman_id_li and saleman_id_li[0]['salemanid']:
saleman_phone_li = await sor.R('users', {'id': saleman_id_li[0]['salemanid']})
if saleman_phone_li and saleman_phone_li[0]['mobile']:
saleman_phone = saleman_phone_li[0]['mobile']
return saleman_id_li[0]['salemanid'], saleman_phone
if (not saleman_phone) or (not saleman_id_li):
# 首先查找运营电话
yunying_mobile = 0
current_yunying_id = await get_role_user_from_org(orgid=who_is_org, role='运营')
if current_yunying_id:
yunying_mobile = (await sor.R('users', {'id': current_yunying_id, 'del_flg': '0'}))[0]['mobile']
if yunying_mobile:
return current_yunying_id, yunying_mobile
if (not yunying_mobile) or (not current_yunying_id):
# 查找管理员电话
current_admin_id = await get_role_user_from_org(orgid=who_is_org, role='管理员')
if current_admin_id:
admin_mobile = (await sor.R('users', {'id': current_admin_id, 'del_flg': '0'}))[0]['mobile']
if admin_mobile:
return current_admin_id, admin_mobile
print('%s 没有找到对应的管理人员进行电话,站内信发送' % nss['customerid'])
return None, None
async def innder_mail_add_msg(nss={}):
"""
:param ns:
:return:
senderid发送人id
receiverid收件人id
msgtitle消息标头
msgtext消息内容
"""
user_orgid = nss['user_orgid']
user_id = nss['userid']
mail_should_send = False
send_type = '欠费通知'
# 记录到欠费表中
# 首先查询欠费表中时间是否再范围内
db = DBPools()
async with db.sqlorContext('kboss') as sor:
customer_name = (await sor.R('organization', {'id': user_orgid}))[0]['orgname']
mail_exist_li = await sor.R('message', {'receiverid': user_id, 'send_type': send_type, 'sort': ['sendtime desc']})
saleman_id, saleman_mobile = await get_valid_admin_person({'customerid': user_orgid})
if mail_exist_li:
# 如果原来发送成功 时间范围大于24小时 再次发送
sms_exist_time = mail_exist_li[0]['sendtime']
time_diff = await get_time_diff(sms_exist_time, time.strftime('%Y-%m-%d %H:%M:%S'))
# 如果发送过的短信在一天范围内 不再发送短信
if time_diff >= 24 * 60:
mail_should_send = True
else:
print('%s 站内信已经发送过, 但是没有超过阈值时间, 不再发送...' % customer_name)
if mail_should_send or not mail_exist_li:
ns_customer = {
'id': uuid(),
'senderid': 'inner',
'receiverid': user_id,
'msgtitle': '系统新消息通知',
'msgtype': '欠费通知',
'msgtext': '尊敬的用户, 您购买的产品 %s 已欠费, 请及时续费以免影响正常使用。' % nss.get('productname'),
}
ns_saleman = {
'id': uuid(),
'senderid': 'inner',
'receiverid': saleman_id,
'msgtitle': '系统新消息通知',
'msgtype': '欠费通知',
'msgtext': '您好, 客户 %s 购买的产品 %s 已欠费, 请及时联系通知。' % (customer_name, nss.get('productname')),
}
await sor.C('message', ns_customer)
await sor.C('message', ns_saleman)
return {'status': True, 'msg': '添加成功'}
async def get_time_diff(time_old=None, time_new=None):
# # 定义两个时间字符串
# time_old = '2023-12-01 09:40:00'
# time_new = '2023-12-01 23:15:00'
# 将时间字符串解析为datetime对象
time_dt1 = datetime.datetime.strptime(time_old, '%Y-%m-%d %H:%M:%S')
time_dt2 = datetime.datetime.strptime(time_new, '%Y-%m-%d %H:%M:%S')
# 计算时间差
time_difference = time_dt2 - time_dt1
# 提取时间差中的分钟数
minutes_difference = time_difference.total_seconds() / 60
# day = time_difference.days
# hour = time_difference.total_seconds() / 3600
return minutes_difference
async def diff_sms_send_save(sor=None, productname=None, time_interval=None, send_type=None, user_orgid=None, sms_send_dict=None):
sms_should_send = False
send_type = send_type
# 记录到欠费表中
# 首先查询欠费表中时间是否再范围内
try:
print('引用发送短信...')
customer_phone = (await sor.R('organization', {'id': user_orgid}))[0]['contactor_phone']
customer_name = (await sor.R('organization', {'id': user_orgid}))[0]['orgname']
sms_exist_li = await sor.R('sms_record', {'mobile': customer_phone, 'send_type': send_type, 'send_status': '1',
'sort': ['send_time desc']})
if sms_exist_li:
# 如果原来发送成功 时间范围大于24小时 再次发送
if sms_exist_li[0]['send_status']:
sms_exist_time = sms_exist_li[0]['send_time']
time_diff = await get_time_diff(sms_exist_time, time.strftime('%Y-%m-%d %H:%M:%S'))
# 如果发送过的短信在一天范围内 不再发送短信
if time_diff >= time_interval:
sms_should_send = True
else:
print('%s %s短信已经发送过, 但是没有超过阈值时间, 不再发送...' % (send_type, customer_phone))
else:
print('%s %s短信没有发送过, 发送...' % (send_type, customer_phone))
sms_should_send = True
if sms_should_send or not sms_exist_li:
# 给个人发送短信
await send_vcode(customer_phone, send_type, sms_send_dict)
# 给相关管理人员发送短信
saleman_id, saleman_mobile = await get_valid_admin_person({'customerid': user_orgid})
if saleman_mobile:
sms_send_dict_saleman = {
'username': customer_name,
'productname': productname
}
if send_type == '用户欠费通知':
print('sms_send_dict_saleman', sms_send_dict_saleman)
await send_vcode(saleman_mobile, '责任人用户欠费通知', sms_send_dict_saleman)
except Exception as e:
print('发送短信失败: %s' % e)
return {
'status': False,
'msg': '发送短信失败',
'data': e
}
async def baidu_post_pay_charge(bill_exist=None, info_detail=None, baidu_orgid=None, userid=None, baidu_id=None, user_orgid=None, user_parentid=None, sor=None):
"""
计算折扣后的价格
获取针对个人的折扣配置
:param ns:
:return:
"""
info_detail_id = info_detail['id']
bill_date = info_detail['starttime']
financeprice = info_detail['catalogprice']
servicetype = info_detail['servicetype']
# utc时间转本地时间
# bill_date_local_time = await utc2local(bill_date)
# 查找对应的产品id
productid_li = await sor.R('product', {'providerid': baidu_orgid, 'providerpid': 'baidu_' + servicetype, 'del_flg': '0'})
if productid_li:
productid_soure = productid_li[0]['id']
product_name = productid_li[0]['name']
else:
productid_soure = None
product_name = None
real_discount = 1.0
person_discount_sql = """select * from saleprotocol where offer_orgid = '%s' and bid_orgid = '%s' and
salemode = '0' and del_flg = '0' and CURRENT_DATE <= end_date and
CURRENT_DATE >= start_date""" % (user_parentid, user_orgid)
tongyi_discount_sql = """select * from saleprotocol where offer_orgid = '%s' and bid_orgid = '*' and
salemode = '0' and del_flg = '0' and CURRENT_DATE <= end_date and
CURRENT_DATE >= start_date""" % user_parentid
# 没有个人配置 获取百度产品统一配置折扣
for sql_detail in [person_discount_sql, tongyi_discount_sql]:
person_discount_li = await sor.sqlExe(sql_detail, {})
for person_discount in person_discount_li:
protocolid = person_discount['id']
xieyi_zi_search_li = await sor.R('product_salemode', {'protocolid': protocolid, 'del_flg': '0'})
for xieyi_zi in xieyi_zi_search_li:
provider_zi_id = xieyi_zi['providerid']
zi_discount = xieyi_zi['discount']
zi_productid = xieyi_zi['productid']
if provider_zi_id == baidu_orgid and zi_productid == productid_soure:
# 判断产品是否存在 有可能在产品表已经删除
prd_res_exists_li = await sor.R('product', {'id': xieyi_zi['productid'], 'del_flg': '0'})
if not prd_res_exists_li:
continue
real_discount = float(zi_discount)
break
if real_discount != 1.0:
break
# 计算实际支付费用
amount = round(real_discount * float(financeprice), 6)
# 根据params表中bussiness_date进行扣费
bussiness_date = (await sor.R('params', {'pname': 'business_date', 'del_flg': '0'}))[0]['pvalue']
nss_bill = {
'id': uuid(),
'customerid': user_orgid,
'business_op': 'BUY',
'provider_amt': financeprice,
'amount': amount,
'bill_date': bussiness_date,
'bill_timestamp': bussiness_date + ' 00:00:00',
'bill_state': 0,
'productid': productid_soure,
'providerid': baidu_orgid,
'quantity': 1
}
# 比对账户余额和账单金额
db_balance = DBPools()
async with db_balance.sqlorContext('kboss') as sor_balance:
balance_yuan = await getCustomerBalance(sor_balance, user_orgid)
print('%s orgid: %s, 余额为: %s' % (time.strftime('%Y-%m-%d %H:%M:%S'), user_orgid, balance_yuan))
print('将要扣除的金额: %s' % nss_bill['amount'])
if balance_yuan < amount:
sms_send_dict = {
'time': time.strftime('%Y-%m-%d %H:') + '00:00',
'productname': product_name
}
db_sms = DBPools()
async with db_sms.sqlorContext('kboss') as sor_sms:
await diff_sms_send_save(sor=sor_sms, productname=product_name, time_interval=24*60, send_type='用户欠费通知', user_orgid=user_orgid, sms_send_dict=sms_send_dict)
await innder_mail_add_msg({'user_orgid': user_orgid, 'userid': userid, 'productname': product_name})
# 查找是否已经存在
exist_qianfei = await sor.R('arrears', {'billid': info_detail['billid']})
if exist_qianfei:
print('%s 欠费账单已经存在...' % info_detail['billid'])
else:
nss_bill['starttime'] = time.strftime('%Y-%m-%d %H:%M:%S')
nss_bill['source'] = '百度智能云'
nss_bill['localid'] = info_detail['id']
nss_bill['billid'] = info_detail['billid']
nss_bill['del_flg'] = 0
await sor.C('arrears', nss_bill)
# 直接创建账单 不再进行扣费再创建账单流程
if not bill_exist:
info_detail['kaiyuanprice'] = amount
await sor.C('baidu_post_bill', info_detail)
print('%s 用户%s 余额不足' % (time.strftime('%Y-%m-%d %H:%M:%S'), user_orgid))
# 产品欠费下线
ns_offline = {
"accountId": baidu_id,
"service": servicetype,
"region": info_detail['region']
}
print('*************', ns_offline)
await resource_offline(ns_offline)
return
# 如果当前余额不足当前账单24小时内支付 发送预警短信
if amount * 24 > balance_yuan:
sms_send_dict = {
'time': time.strftime('%Y-%m-%d %H:') + '00:00',
'balance': int(balance_yuan)
}
dbs = DBPools()
async with dbs.sqlorContext('kboss') as sor_sms:
await diff_sms_send_save(sor=sor_sms, productname=product_name, time_interval=24*60*2, send_type='余额不足提醒', user_orgid=user_orgid, sms_send_dict=sms_send_dict)
# 记账
# 日期 12-06/12-07 凌晨扣费时出错 由于是事务, 12-06/12-07同时出错 无法扣费
# 所以另开事务 至少有一天可以扣费成功
# 扣费不成功的原因:
# business_date和账单日期对不上/acc_balance余额表中所有日期存在大于账单日期
# 余额部分满足时 扣除可用余额后 事务中余额没变化 此处创建新的事务
db_bill = DBPools()
async with db_bill.sqlorContext('kboss') as sor_bill:
try:
ba = BillAccounting(nss_bill)
await ba.accounting(sor_bill)
# 不存在就在此处创建
if not bill_exist:
info_detail['bill_state'] = 1
info_detail['kaiyuanprice'] = amount
await sor_bill.C('baidu_post_bill', info_detail)
# 修改本地bill状态 0:未支付1:已支付2已取消
# 已经存在就更新
ns_bill_status = {
'id': info_detail_id,
'bill_state': 1,
'kaiyuanprice': amount
}
await sor_bill.U('baidu_post_bill', ns_bill_status)
print('百度账单扣费: %s, 扣费成功' % info_detail['billid'])
except Exception as e:
print('用户: %s, 账单: %s, 扣费失败: %s' % (user_orgid, info_detail['billid'], e))
if not bill_exist:
await sor_bill.C('baidu_post_bill', info_detail)
async def save_baidu_post_bill(data_bill, userid=None, baidu_id=None, user_orgid=None, user_parentid=None, sor=None):
try:
# 查找百度orgid 方便一下查找对应产品的productid
baidu_orgid_li = await sor.R('organization', {'orgname': '百度智能云'})
if baidu_orgid_li:
baidu_orgid = baidu_orgid_li[0]['id']
else:
raise ValueError("无法找到百度机构id, 百度供应商名称是: 百度智能云")
# count = 0
for bill_detail in data_bill['bills']:
# count += 1
# if count > 9:
# return
info_detail = {
'id': uuid(),
'billid': bill_detail.get('billId'),
'accountid': data_bill['subAccountId'],
'servicetype': bill_detail.get('serviceType'),
'servicetypename': bill_detail.get('serviceTypeName'),
'producttype': bill_detail.get('productType'),
'region': bill_detail.get('region'),
'instanceid': bill_detail.get('instanceId'),
'shortid': bill_detail.get('shortId'),
'starttime': bill_detail.get('startTime'),
'endtime': bill_detail.get('endTime'),
'configurationch': bill_detail.get('configurationCH'),
'unitprice': bill_detail.get('unitPrice'),
'pricingunit': bill_detail.get('pricingUnit'),
'chargeitem': bill_detail.get('chargeItem'),
'chargeitemdesc': bill_detail.get('chargeItemDesc'),
'amount': bill_detail.get('amount'),
'amountunit': bill_detail.get('amountUnit'),
'discountamount': bill_detail.get('discountAmount'),
'originprice': bill_detail.get('originPrice'),
'catalogprice': bill_detail.get('catalogPrice'),
'financeprice': bill_detail.get('financePrice'),
'couponprice': bill_detail.get('couponPrice'),
'discountcouponprice': bill_detail.get('discountCouponPrice'),
'cashequivalentcouponprice': bill_detail.get('cashEquivalentCouponPrice'),
'discountprice': bill_detail.get('discountPrice'),
'sysgold': bill_detail.get('sysGold'),
'deductprice': bill_detail.get('deductPrice'),
'originconfig': json.dumps(bill_detail),
'bill_state': 0,
'del_flg': '0'
}
db_sor_bill = DBPools()
async with db_sor_bill.sqlorContext('kboss') as sor_find_bill:
exist_bill_id = await sor_find_bill.R('baidu_post_bill', {'billid': info_detail['billid']})
bill_exist = 0
if exist_bill_id:
if exist_bill_id[0]['bill_state'] == '0':
print('%s 百度后付费账单已经存在, 没有扣款, 账单号: %s' % (user_orgid, info_detail['billid']))
# 此时id已经存在 不再生成新的id
info_detail['id'] = exist_bill_id[0]['id']
bill_exist = 1
else:
# print('%s 百度后付费账单已经存在, 已经扣款, 账单号: %s' % (user_orgid, info_detail['billid']))
continue
else:
pass
# 数据库同时插入和更新死锁 此处不再插入
# await sor.C('baidu_post_bill', info_detail)
# 扣费
await baidu_post_pay_charge(bill_exist=bill_exist, info_detail=info_detail, baidu_orgid=baidu_orgid, userid=userid, baidu_id=baidu_id, user_orgid=user_orgid, user_parentid=user_parentid, sor=sor)
except Exception as e:
print('save_baidu_post_bill: %s' % e)
raise e
async def get_auth_header(method: str, url: str, header: dict, query={}, get_res=False):
import urllib
import hmac
AK = 'ALTAKPk92fX9cgGDax83yNL8mP'
SK = '9b16b8efd4dc463d8bbd5462db1db8a5'
# 解析uri
url_parse = urllib.parse.urlparse(url)
uri = url_parse.path
url_query = url_parse.query
# 获取query
if url_query:
query = dict(urllib.parse.parse_qsl(url_query))
# 2.x-bce-date
x_bce_date = time.gmtime()
x_bce_date = time.strftime('%Y-%m-%dT%H:%M:%SZ', x_bce_date)
# 4.认证字符串前缀
authStringPrefix = "bce-auth-v1" + "/" + AK + "/" + x_bce_date + "/" + "1800"
# windows下为urllib.parse.quoteLinux下为urllib.quote
# 5.生成CanonicalRequest
# 5.1生成CanonicalURI
CanonicalURI = urllib.parse.quote(uri)
# 如果您调用的接口的query比较复杂的话需要做额外处理
# 5.2生成CanonicalQueryString
# CanonicalQueryString = query
result = ['%s=%s' % (k, urllib.parse.quote(str(v))) for k, v in query.items() if k.lower != 'authorization']
result.sort()
CanonicalQueryString = '&'.join(result)
# 5.3生成CanonicalHeaders
result = []
signedHeaders_list = []
for key,value in header.items():
tempStr = str(urllib.parse.quote(key.lower(),safe="")) + ":" + str(urllib.parse.quote(value,safe=""))
signedHeaders_list.append(str(urllib.parse.quote(key.lower(),safe="")))
result.append(tempStr)
result.sort()
signedHeaders = ';'.join(sorted(signedHeaders_list))
CanonicalHeaders = "\n".join(result)
# 5.4拼接得到CanonicalRequest
CanonicalRequest = method + "\n" + CanonicalURI + "\n" + CanonicalQueryString +"\n" + CanonicalHeaders
# 6.生成signingKey
signingKey = hmac.new(SK.encode('utf-8'), authStringPrefix.encode('utf-8'), hashlib.sha256)
# 7.生成Signature
Signature = hmac.new((signingKey.hexdigest()).encode('utf-8'), CanonicalRequest.encode('utf-8'), hashlib.sha256)
# 8.生成Authorization并放到header里
header['Authorization'] = authStringPrefix + "/" + signedHeaders + "/" + Signature.hexdigest()
if get_res:
# 异步请求链接 返回结果
async with aiohttp_client.request(
method=method,
url=url,
headers=header,
allow_redirects=True,
json=query) as res:
return res
else:
return header
async def arrears_charge(customerid=None, source=None):
"""
优先扣除欠费表中的账单
:param ns:
:return:
"""
# customerid = 'gKG8UaYPjA_29K7puzaTM'
# source = '百度智能云'
db_arrears = DBPools()
async with db_arrears.sqlorContext('kboss') as sor_arrears:
bill_date = (await sor_arrears.R('params', {'pname': 'business_date', 'del_flg': '0'}))[0]['pvalue']
try:
arrear_li = await sor_arrears.R('arrears', {'source': source, 'customerid': customerid, 'bill_state': '0'})
arrear_product_types = []
for arr in arrear_li:
nss_bill = {
'id': uuid(),
'customerid': arr['customerid'],
'business_op': arr['business_op'],
'provider_amt': arr['provider_amt'],
'amount': arr['amount'],
'bill_date': bill_date,
'bill_timestamp': bill_date + ' 00:00:00',
'bill_state': 0,
'productid': arr['productid'],
'providerid': arr['providerid'],
'quantity': arr['quantity']
}
ba = BillAccounting(nss_bill)
await ba.accounting(sor_arrears)
# 修改本地bill状态 0:未支付1:已支付2已取消
# 已经存在就更新
arrear_status = {
'id': arr['id'],
'bill_state': 1,
'endtime': time.strftime('%Y-%m-%d %H:%M:%S')
}
await sor_arrears.U('arrears', arrear_status)
print('%s 百度账单-欠费账单-扣费成功, 状态更新成功: %s' % (arr['customerid'], arr['id']))
baidu_post_bill_status = {
'id': arr['localid'],
'bill_state': 1
}
await sor_arrears.U('baidu_post_bill', baidu_post_bill_status)
print('%s 百度账单-欠费账单扣费成功---本地账单-更新成功, 状态更新成功: %s' % (arr['customerid'], arr['billid']))
# 获取百度下线类型列表 用于产品上线
products = await sor_arrears.R('baidu_post_bill', {'id': arr['localid']})
ns_type = {
"accountId": products[0]['accountid'],
"service": products[0]['servicetype'],
"region": products[0]['region']
}
arrear_product_types.append(ns_type)
return {
'status': True,
'data': [dict(t) for t in {frozenset(d.items()) for d in arrear_product_types}]
}
except Exception as e:
print('用户: %s, 欠费账单扣费失败: %s' % (customerid, e))
return {
'status': False
}
async def get_resourcechargeitem_billlist(userid=None, baidu_id=None, user_orgid=None, user_parentid=None, pageno=1, sor=None):
"""
分页获取后付费资源计费项账单详情, 颗粒度:小时 不能查看当月账单
1. 账单过来后余额充足正常扣费
1048条扣费前余额:18327.51 扣费后余额: 18267.13 实际扣除:60.38 实际账单应扣除总额: 62.4347
前10条: 扣费前余额:18267.13 扣费后余额: 18266.93 实际扣除:0.2 实际账单应扣除总额: 0.2255
2. 账单过来后余额不足短信通知
2.1 账单过来后余额不足短信通知-时间范围内是否重复发送
2.1.1 重复发送 创建一个单独事务 修复
2.1.2 充值-但只足够部分账单扣费 余额查询创建单独事务 修复
2.2 账单过来后余额不足短信通知-超过时间范围是否发送 正常发送
2.3 是否在欠费账单里
2.3.1 后续还在欠费 是否在欠费账单重复 不重复
2.3.2 后续余额充足 是否把欠费补充完成 是
3. 余额充足 3个账单 正常扣费 balance由于是小数点两位 造成误差
余额充足 但是不够24小时费用 正常扣费 客户短信正常发送 销售短信发送失败productname=None 修复
余额部分满足 正常扣费 短信不能正常发送/post_bill kaiyuanprice为空没有正常录入 创建新事务 修复
欠费表已存在 重复请求是否重复 不重复
欠费表已存在 余额充足 是否正常扣费 欠费表和post_bill表重复扣费 欠费表创建单独的事务 在查询存在账单时创建事务 修复
欠费表单独事务 - 每一次欠费项一个事务 否则造成欠费表同时不成功 但post_bill表反而扣费成功了 造成不同步 (测试发现状态可以正常改变)
存在问题 1. 小数点过小无法扣除 实际扣除0.0 应扣除0.0205
2. import urllib hmac
3. accounting.bill.BillAccounting 线上删除accounting.bill 此处报错 但账单已保存 测试
:param ns:
:return:
"""
# 优先扣除欠费信息 欠费表/取消释放/忽略小数点后几位较小误差
arrear_types = await arrears_charge(customerid=user_orgid, source='百度智能云')
# 扣费成功 开启下线的服务
if arrear_types and arrear_types.get('status') and arrear_types.get('data'):
for roline in arrear_types['data']:
await resource_online(roline)
# 获取当前日期和时间
current_time = datetime.datetime.now()
# 指定开始日期
# date_string = "2023-11-30"
# format = '%Y-%m-%d' # specifify the format of the date_string.
# current_time = datetime.datetime.strptime(date_string, format)
# 计算七天前的日期
days_ago = current_time - datetime.timedelta(days=7)
# 获取账单百度指定必须在同一个月内
if current_time.month != days_ago.month:
days_ago = datetime.datetime(current_time.year, current_time.month, 1)
# 将日期时间格式化为字符串
current_day = current_time.strftime("%Y-%m-%d")
days_ago_time = days_ago.strftime("%Y-%m-%d")
ns = {
"beginTime": days_ago_time,
"endTime": current_day,
"queryAccountId" : baidu_id,
"pageNo": pageno,
"pageSize": 100
}
method = 'POST'
ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()])
url = 'https://billing.baidubce.com/v1/bill/resource/chargeitem?%s' % ns_format
header = {
"Host": "billing.baidubce.com"
}
header = await get_auth_header(method=method, url=url, header=header)
async with aiohttp_client.request(
method=method,
url=url,
headers=header,
json=ns) as res:
data_bill = await res.json()
if data_bill.get('bills'):
await save_baidu_post_bill(data_bill=data_bill, userid=userid, baidu_id=baidu_id, user_orgid=user_orgid, user_parentid=user_parentid, sor=sor)
real_page_no = int(data_bill['totalCount'] / data_bill['pageSize']) + 1
if data_bill['pageNo'] < real_page_no:
await get_resourcechargeitem_billlist(userid=userid, baidu_id=baidu_id, user_orgid=user_orgid, user_parentid=user_parentid, pageno=data_bill['pageNo'] + 1, sor=sor)
else:
print('%s 账号ID: %s 后付费账单同步成功, 同步页面数量: %s' % (current_time.strftime("%Y-%m-%d %H:%M:%S"), data_bill['subAccountId'], data_bill['pageNo']))
else:
print('%s 账号ID: %s 或空bills, 后付费账单同步成功, 同步页面数量: %s' % (
current_time.strftime("%Y-%m-%d %H:%M:%S"), data_bill['subAccountId'], data_bill['pageNo']))
async def baidu_users_get_post_pay(ns={}):
db = DBPools()
async with db.sqlorContext('kboss') as sor:
# 获取所有百度用户
baidu_users = await sor.R('baidu_users', {'del_flg': '0'})
for baidu_user in baidu_users:
userid = baidu_user['user_id']
baidu_id = baidu_user['baidu_id']
if baidu_id != 'a6f0dbc20f074ea18b4d3ac3ec77d537':
continue
try:
user_orgid = (await sor.R('users', {'id': userid}))[0]['orgid']
user_parentid = (await sor.R('organization', {'id': user_orgid}))[0]['parentid']
await get_resourcechargeitem_billlist(userid=userid, baidu_id=baidu_id, user_orgid=user_orgid, user_parentid=user_parentid, pageno=1, sor=sor)
except Exception as e:
return {
'status': False,
'msg': '百度后付费账单发生错误',
'data': e
}
return {
'status': True,
'msg': '百度账户后付费扣费成功...'
}
ret = await baidu_users_get_post_pay(params_kw)
return ret