kboss/b/cntoai/process_user_billing.dspy
2026-05-23 14:52:58 +08:00

415 lines
16 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

async def _lookup_product(sor, providername, productname):
    """
    Resolve a product row from a provider name and a product name.

    Lookup order (only rows with del_flg == '0' are considered):
      1. Find the provider by name, then the product by (name, providerid)
         using the provider's orgid.
      2. Fall back to the product name alone. This covers both an unknown
         provider and a known provider that has no product with that name.

    :param sor: sqlor context; only its ``R(table, conditions)`` coroutine is used
    :param providername: value matched against provider.name
    :param productname: value matched against product.name
    :return: the first matching product row, or None when nothing matches
    """
    provider_list = await sor.R('provider', {'name': providername, 'del_flg': '0'})
    if provider_list:
        product_list = await sor.R(
            'product',
            {'name': productname, 'providerid': provider_list[0]['orgid'], 'del_flg': '0'},
        )
        if product_list:
            return product_list[0]

    # Name-only fallback. It must not touch provider_list[0]: the list is
    # empty when the provider name is unknown, and re-running the
    # provider-scoped query could never find anything new.
    product_list = await sor.R('product', {'name': productname, 'del_flg': '0'})
    if product_list:
        return product_list[0]
    return None
async def _charge_order(sor, orderid, order_type='NEW'):
    """
    Confirm payment of an existing order.

    Steps: load the bz_order row -> re-check the customer's balance ->
    order2bill -> run BillAccounting on every bill of the order -> mark the
    bills and the order as paid.

    :param sor: sqlor context used for every read and write
    :param orderid: primary key of the bz_order row to charge
    :param order_type: accepted for interface compatibility; not read by the
        current implementation (it was only used by the customer_goods
        handling, which is disabled)
    :return: dict
        success: {'status': True, 'msg': ...}
        failure: {'status': 'error', 'msg': ...}; an insufficient balance
        additionally carries 'pricedifference' (balance minus order amount,
        i.e. a negative number).
        Note that the failure status is the truthy string 'error', so callers
        must compare against True instead of testing truthiness.
    """
    order_rows = await sor.R('bz_order', {'id': orderid})
    if not order_rows:
        debug("订单不存在")
        return {'status': 'error', 'msg': '订单不存在'}
    order_row = order_rows[0]

    # NOTE(review): the result is discarded and sor=None is passed; presumably
    # this call is kept for a side effect of get_business_date -- confirm.
    await get_business_date(sor=None)

    balance = await getCustomerBalance(sor, order_row['customerid'])
    if balance is None:
        balance = 0
    # Use the same float amount for the comparison and for the reported
    # shortfall. Rounding the amount to 2 decimals first would misreport the
    # difference for sub-cent amounts and fails when the amount is a string.
    amount = float(order_row['amount'])
    if balance - amount < 0:
        pricedifference = balance - amount
        debug(f"账户余额不足,订单金额: {order_row['amount']}, 账户余额: {balance}, 差额: {pricedifference}")
        return {
            'status': 'error',
            'msg': '账户余额不足',
            'pricedifference': round(pricedifference, 10),
        }

    await order2bill(orderid, sor)
    bills = await sor.R('bill', {'orderid': orderid, 'del_flg': '0'})
    try:
        for bill in bills:
            ba = BillAccounting(bill)
            await ba.accounting(sor)
            # Settle this bill by its own primary key. Keying the update by
            # the order id only matches a bill that happens to share that id.
            await sor.U('bill', {'id': bill['id'], 'bill_state': '1'})

        # NOTE(review): the order is marked paid even if order2bill produced
        # no bills -- confirm that case cannot occur for a positive amount.
        paid_at = datetime.datetime.now()
        await sor.U('bz_order', {'id': orderid, 'order_status': '1', 'create_at': paid_at})

        # customer_goods is intentionally not maintained here for now: the
        # REFUND / RENEW / NEW handling taken over from affirmbz_order is
        # disabled in this flow.
        debug("支付成功")
        return {'status': True, 'msg': '支付成功'}
    except Exception as error:
        # Report the failure instead of raising; the caller decides whether to
        # roll the transaction back.
        debug(f"支付失败: {error}")
        return {'status': 'error', 'msg': str(error)}
async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantity=1):
    """
    Compute the payable amount from saleprotocol / product_salemode discounts.

    Protocol precedence: the protocol between this organization and its parent
    is consulted first; when it is missing or has no product_salemode row for
    the product, the parent's protocol open to everyone (bid_orgid '*',
    salemode '0') is used.

    :param sor: sqlor context; only its ``R(table, conditions)`` coroutine is used
    :param org: organization record, must contain 'id' and 'parentid'
    :param product_id: primary key of the product
    :param supply_price: pre-discount unit price; its absolute value is used
    :param quantity: number of units, converted with int(); default 1
    :return: dict
        success: status=True, amount (line total), price (discounted unit
        price), list_price, discount, protocolid, product_salemode_id
        failure: status='error', msg
        Note that the failure status is the truthy string 'error', so callers
        must compare against True instead of testing truthiness.
    """
    try:
        supply_price = abs(float(supply_price))
        quantity = int(quantity)
    except (TypeError, ValueError, OverflowError):
        debug("calc_price_by_saleprotocol supply_price / quantity 必须为有效数字")
        return {'status': 'error', 'msg': 'supply_price / quantity 必须为有效数字'}
    # NaN compares false against everything and would slip through the range
    # check below; an infinite price can never be charged either.
    if supply_price != supply_price or supply_price == float('inf'):
        debug("calc_price_by_saleprotocol supply_price / quantity 必须为有效数字")
        return {'status': 'error', 'msg': 'supply_price / quantity 必须为有效数字'}
    if supply_price <= 0:
        debug("calc_price_by_saleprotocol supply_price 必须大于 0")
        return {'status': 'error', 'msg': 'supply_price 必须大于 0'}
    if quantity <= 0:
        debug("calc_price_by_saleprotocol quantity 必须大于 0")
        return {'status': 'error', 'msg': 'quantity 必须大于 0'}

    saleprotocol_to_person = await sor.R(
        'saleprotocol',
        {'bid_orgid': org['id'], 'offer_orgid': org['parentid'], 'del_flg': '0'},
    )
    saleprotocol_to_all = await sor.R(
        'saleprotocol',
        {
            'bid_orgid': '*',
            'offer_orgid': org['parentid'],
            'del_flg': '0',
            'salemode': '0',
        },
    )

    # Try the organization-specific protocol first, then the open one; stop at
    # the first protocol that has a sale mode configured for this product.
    product_salemode = None
    for protocols in (saleprotocol_to_person, saleprotocol_to_all):
        if not protocols:
            continue
        product_salemode = await sor.R(
            'product_salemode',
            {
                'protocolid': protocols[0]['id'],
                'productid': product_id,
                'del_flg': '0',
            },
        )
        if product_salemode:
            break
    if not product_salemode:
        debug("calc_price_by_saleprotocol 还未上线这个产品的协议配置")
        return {'status': 'error', 'msg': '还未上线这个产品的协议配置'}

    salemode = product_salemode[0]
    # The stored discount may come back as Decimal or str depending on the
    # column type and driver; multiplying that with a float price raises.
    try:
        discount = float(salemode['discount'])
    except (TypeError, ValueError):
        discount = float('nan')
    if discount != discount:
        debug("calc_price_by_saleprotocol 产品协议折扣配置无效")
        return {'status': 'error', 'msg': '产品协议折扣配置无效'}

    list_price = supply_price
    price = abs(round(list_price * discount, 12))
    amount = abs(round(price * quantity, 12))
    return {
        'status': True,
        'amount': amount,
        'price': price,
        'list_price': list_price,
        'discount': discount,
        'protocolid': salemode['protocolid'],
        'product_salemode_id': salemode.get('id'),
    }
async def process_user_billing(ns=None):
    """
    Record a usage entry, create a local order for it and charge the customer.

    Flow: insert a model_usage row holding the raw input -> validate the
    input -> resolve product, provider, user and customer organization ->
    optionally price through the sale protocol -> check the balance -> create
    bz_order and order_goods -> _charge_order -> link the usage row to the
    order.

    The model_usage row is written in its own database context before any
    validation, so it is kept even when the request is rejected afterwards.

    Keys read from ``ns``:
        userid: id of the user being charged (looked up in ``users``).
        llmid: required; matched against product.providerpid to find the product.
        amount: positive finite number. With use_saleprotocol truthy it is the
            pre-discount unit price; otherwise it is the final amount charged.
        use_saleprotocol: whether to price through
            calc_price_by_saleprotocol; defaults to True.
        quantity: integer, default 1; only takes effect when use_saleprotocol
            is truthy (order_goods.quantity is 1 otherwise).
        apikey, usage: only stored on the model_usage row.
        providername: only used in the "provider not found" message; the name
            written to the order comes from the organization table.
        productname: ignored; the product's own name is used.

    :param ns: dict of request parameters (see above)
    :return: dict
        success: status='ok', msg, orderid, amount, productid, plus discount,
        list_price and price when use_saleprotocol is truthy
        failure: status='error', msg; an insufficient balance additionally
        carries 'pricedifference' (balance minus amount)
    """
    if ns is None:
        ns = {}

    # Persist the raw input first so every call leaves a usage record.
    usage = ns.get('usage')
    usage_ns = {
        'id': uuid(),
        'userid': ns.get('userid'),
        'apikey': ns.get('apikey'),
        'llmid': ns.get('llmid'),
        'original_price': ns.get('amount'),
        'usage_content': json.dumps(usage) if isinstance(usage, dict) else usage,
    }
    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        await sor.C('model_usage', usage_ns)

    # userid is taken from the request as-is; resolving it from apikey through
    # user_api_keys is currently disabled.
    userid = ns.get('userid')
    providername = ns.get('providername')
    amount = ns.get('amount')
    use_saleprotocol = ns.get('use_saleprotocol', True)
    llmid = ns.get('llmid')

    try:
        quantity = int(ns.get('quantity', 1))
    except (TypeError, ValueError, OverflowError):
        debug(f"{userid} process_user_billing quantity 必须为有效数字")
        return {'status': 'error', 'msg': 'quantity 必须为有效数字'}
    if not llmid:
        debug(f"{userid} process_user_billing llmid必传")
        return {
            'status': 'error',
            'msg': 'llmid必传'
        }
    try:
        amount = round(float(amount), 12)
    except (TypeError, ValueError, OverflowError):
        debug(f"{userid} process_user_billing amount 必须为有效数字")
        return {'status': 'error', 'msg': 'amount 必须为有效数字'}
    # NaN compares false against everything, so it would pass both the range
    # check below and the balance check; infinity can never be charged.
    if amount != amount or amount in (float('inf'), float('-inf')):
        debug(f"{userid} process_user_billing amount 必须为有效数字")
        return {'status': 'error', 'msg': 'amount 必须为有效数字'}
    if amount <= 0:
        debug(f"{userid} process_user_billing amount 必须大于 0")
        return {'status': 'error', 'msg': 'amount 必须大于 0'}

    async with db.sqlorContext('kboss') as sor:
        try:
            product_li = await sor.R('product', {'providerpid': llmid, 'del_flg': '0'})
            if not product_li:
                debug(f"{userid} process_user_billing 未找到对应产品,请确认")
                return {
                    'status': 'error',
                    'msg': '未找到对应产品,请确认'
                }
            product = product_li[0]
            productname = product['name']
            providerid = product['providerid']

            providername_list = await sor.R('organization', {'id': providerid})
            if not providername_list:
                debug(f"{userid} process_user_billing 厂商不存在 %s" % providername)
                return {
                    'status': 'error',
                    'msg': '厂商不存在 %s' % providername
                }
            providername = providername_list[0]['orgname']

            user_list = await sor.R('users', {'id': userid})
            if not user_list:
                debug(f"{userid} process_user_billing 用户不存在 %s" % userid)
                return {'status': 'error', 'msg': '用户不存在 %s' % userid}
            org_list = await sor.R('organization', {'id': user_list[0]['orgid']})
            if not org_list:
                debug(f"{userid} process_user_billing 用户所属机构不存在")
                return {'status': 'error', 'msg': '用户所属机构不存在'}
            customerid = org_list[0]['id']

            # Without protocol pricing the input amount is charged unchanged.
            list_price = amount
            unit_price = amount
            discount = 1
            originalprice = amount
            if use_saleprotocol:
                price_res = await calc_price_by_saleprotocol(
                    sor, org_list[0], product['id'], amount, quantity=quantity,
                )
                # The helper reports failure with the truthy string 'error',
                # so success has to be checked explicitly against True.
                if price_res.get('status') is not True:
                    return price_res
                debug(price_res)
                debug('list_price %s' % list_price)
                amount = price_res['amount']
                list_price = price_res['list_price']
                unit_price = price_res['price']
                discount = price_res['discount']
                originalprice = list_price * quantity

            balance = await getCustomerBalance(sor, customerid)
            if balance is None:
                balance = 0
            if amount > balance:
                return {
                    'status': 'error',
                    'msg': '账户余额不足',
                    'pricedifference': round(balance - amount, 12),
                }

            order_id = uuid()
            now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            bz_ns = {
                'id': order_id,
                'order_status': '0',
                'business_op': 'BUY',
                'userid': userid,
                'customerid': customerid,
                'order_date': now_str,
                'source': providername,
                'amount': amount,
                'originalprice': round(originalprice, 12),
                'ordertype': 'prepay',
                'servicename': productname,
                'is_big_model': 1
            }
            await sor.C('bz_order', bz_ns)

            goods_ns = {
                'id': uuid(),
                'orderid': order_id,
                'productid': product['id'],
                'providerid': product['providerid'],
                'list_price': list_price,
                'discount': discount,
                'quantity': quantity if use_saleprotocol else 1,
                'price': unit_price,
                'amount': amount,
                'chargemode': 'prepay',
                'servicename': productname,
                'resourceids': '',
                'resourcestarttime': now_str,
                'resourceendtime': None,
                'is_big_model': 1
            }
            await sor.C('order_goods', goods_ns)

            charge_res = await _charge_order(sor, order_id, order_type='NEW')
            # Same convention as above: 'error' is truthy, compare with True.
            if charge_res.get('status') is not True:
                # Undo the order / order_goods rows created above.
                await sor.rollback()
                return charge_res

            await sor.U('model_usage', {'id': usage_ns['id'], 'orderid': order_id, 'bill_status': 1})
            result = {
                'status': 'ok',
                'msg': '扣费成功',
                'orderid': order_id,
                'amount': amount,
                'productid': product['id'],
            }
            if use_saleprotocol:
                result['discount'] = discount
                result['list_price'] = list_price
                result['price'] = unit_price
            return result
        except Exception as e:
            debug(f"{userid} process_user_billing 异常: {e}")
            # rollback is awaited in the charge-failure path above, so it is a
            # coroutine; without await it never runs, and because the
            # exception is swallowed here the partial writes would remain.
            await sor.rollback()
            return {
                'status': 'error',
                'msg': str(e)
            }
ret = await process_user_billing(params_kw)
return ret