416 lines
16 KiB
Plaintext
416 lines
16 KiB
Plaintext
async def _lookup_product(sor, providername, productname):
|
||
"""
|
||
按厂商名 + 产品名解析 product 记录。
|
||
依次尝试:provider.name + product.name → providerpid → 仅 product.name
|
||
"""
|
||
provider_list = await sor.R('provider', {'name': providername, 'del_flg': '0'})
|
||
if provider_list:
|
||
product_list = await sor.R(
|
||
'product',
|
||
{'name': productname, 'providerid': provider_list[0]['orgid'], 'del_flg': '0'},
|
||
)
|
||
if product_list:
|
||
return product_list[0]
|
||
|
||
product_list = await sor.R('product', {'name': productname, 'providerid': provider_list[0]['orgid'], 'del_flg': '0'})
|
||
if product_list:
|
||
return product_list[0]
|
||
|
||
return None
|
||
|
||
|
||
async def _charge_order(sor, orderid, order_type='NEW'):
    """
    Confirm payment of an order and post it to accounting.

    Steps, in order:

    1. Load the ``bz_order`` row for *orderid*.
    2. Call ``get_business_date`` (its result is not used here).
    3. Compare ``getCustomerBalance`` for the order's customer with the order
       amount; stop with an error when the balance is too low.
    4. Call ``order2bill`` and run ``BillAccounting(bill).accounting(sor)``
       for every non-deleted bill of the order, marking each bill
       ``bill_state = '1'``.
    5. Mark the order ``order_status = '1'``.

    The flow is derived from ``affirmbz_order`` in ``get_baidu_orderlist.dspy``.

    :param sor: sqlor context used for every read and write
    :param orderid: primary key of the ``bz_order`` row to charge
    :param order_type: accepted for interface compatibility; currently unused
        because the ``customer_goods`` step is disabled (see TODO below)
    :return: ``{'status': True, 'msg': ...}`` on success, otherwise
        ``{'status': 'error', 'msg': ...}``; an insufficient balance also
        carries ``pricedifference`` (balance minus order amount, negative)
    """
    order_rows = await sor.R('bz_order', {'id': orderid})
    if not order_rows:
        debug(f"订单不存在")
        return {'status': 'error', 'msg': '订单不存在'}
    order_row = order_rows[0]

    await get_business_date(sor=None)

    count = await getCustomerBalance(sor, order_row['customerid'])
    if count is None:
        count = 0

    # Use one float amount for both the comparison and the reported shortfall.
    # Rounding only the reported value to 2 places could yield a non-negative
    # "shortfall", and rounding a raw Decimal/str amount raises TypeError.
    order_amount = float(order_row['amount'])
    pricedifference = count - order_amount
    if pricedifference < 0:
        debug(f"账户余额不足,订单金额: {order_row['amount']}, 账户余额: {count}, 差额: {pricedifference}")
        return {
            'status': 'error',
            'msg': '账户余额不足',
            'pricedifference': round(pricedifference, 10),
        }

    await order2bill(orderid, sor)
    bills = await sor.R('bill', {'orderid': orderid, 'del_flg': '0'})
    if not bills:
        # Nothing to post to accounting: reporting success here would mark the
        # purchase as paid without charging the customer.
        debug(f"支付失败: 订单未生成账单 {orderid}")
        return {'status': 'error', 'msg': '订单未生成账单'}

    try:
        for bill in bills:
            await BillAccounting(bill).accounting(sor)
            # Bills are selected by their `orderid` column, so the update has
            # to be keyed by the bill's own id, not by the order id.
            # NOTE(review): assumes `sor.U` matches rows on `id` - confirm.
            await sor.U('bill', {'id': bill['id'], 'bill_state': '1'})

        paid_at = datetime.datetime.now()
        await sor.U('bz_order', {'id': orderid, 'order_status': '1', 'create_at': paid_at})

        # TODO: customer_goods handling is temporarily disabled. The removed
        # draft iterated the order's `order_goods` rows and, per order_type:
        #   REFUND - soft-deleted the customer_goods row of the resource;
        #   RENEW  - updated start_date / expire_date of the matching row;
        #   other  - created a customer_goods row (skipping postpay snapshot
        #            items), taking product_url from the product spec_note
        #            `listUrl` entry when present.
        debug(f"支付成功")
        return {'status': True, 'msg': '支付成功'}
    except Exception as error:
        # The caller is responsible for rolling back on an error status.
        debug(f"支付失败: {error}")
        return {'status': 'error', 'msg': str(error)}
|
||
|
||
async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantity=1):
    """
    Compute the payable amount for a product from the sale-protocol discount.

    The discount is read from ``product_salemode``. The protocol between
    ``org`` and its parent is tried first; the parent's public protocol
    (``bid_orgid = '*'``, ``salemode = '0'``) is used when the first one has
    no entry for the product. Only the first protocol row of each kind is
    considered.

    :param sor: sqlor context (kboss); only ``R(table, filters)`` is used
    :param org: organization record; must contain ``id`` and ``parentid``
    :param product_id: primary key of the ``product`` row
    :param supply_price: supply / catalogue unit price before discount; its
        absolute value is used
    :param quantity: number of units, default 1
    :return: dict. On success: ``status=True``, ``amount`` (line total),
        ``price`` (discounted unit price), ``list_price``, ``discount``,
        ``protocolid`` and ``product_salemode_id``.
        On failure: ``status='error'`` and ``msg``.
    """
    import math

    try:
        supply_price = abs(float(supply_price))
        quantity = int(quantity)
    except (TypeError, ValueError, OverflowError):
        debug(f"calc_price_by_saleprotocol supply_price / quantity 必须为有效数字")
        return {'status': 'error', 'msg': 'supply_price / quantity 必须为有效数字'}

    # NaN and infinity parse as floats and slip through a plain `<= 0` test,
    # which would produce a NaN/inf amount further down.
    if not math.isfinite(supply_price):
        debug(f"calc_price_by_saleprotocol supply_price / quantity 必须为有效数字")
        return {'status': 'error', 'msg': 'supply_price / quantity 必须为有效数字'}
    if supply_price <= 0:
        debug(f"calc_price_by_saleprotocol supply_price 必须大于 0")
        return {'status': 'error', 'msg': 'supply_price 必须大于 0'}
    if quantity <= 0:
        debug(f"calc_price_by_saleprotocol quantity 必须大于 0")
        return {'status': 'error', 'msg': 'quantity 必须大于 0'}

    saleprotocol_to_person = await sor.R(
        'saleprotocol',
        {'bid_orgid': org['id'], 'offer_orgid': org['parentid'], 'del_flg': '0'},
    )
    saleprotocol_to_all = await sor.R(
        'saleprotocol',
        {
            'bid_orgid': '*',
            'offer_orgid': org['parentid'],
            'del_flg': '0',
            'salemode': '0',
        },
    )

    # The customer-specific protocol wins; the public one is only a fallback.
    product_salemode = None
    for protocols in (saleprotocol_to_person, saleprotocol_to_all):
        if not protocols:
            continue
        product_salemode = await sor.R(
            'product_salemode',
            {
                'protocolid': protocols[0]['id'],
                'productid': product_id,
                'del_flg': '0',
            },
        )
        if product_salemode:
            break

    if not product_salemode:
        debug(f"calc_price_by_saleprotocol 还未上线这个产品的协议配置")
        return {'status': 'error', 'msg': '还未上线这个产品的协议配置'}

    salemode = product_salemode[0]
    # The stored discount may not be a float (e.g. Decimal or None);
    # multiplying a float by a Decimal raises TypeError, so normalise first.
    try:
        discount = float(salemode['discount'])
    except (TypeError, ValueError):
        discount = float('nan')
    if not math.isfinite(discount):
        debug(f"calc_price_by_saleprotocol 产品协议折扣配置无效")
        return {'status': 'error', 'msg': '产品协议折扣配置无效'}

    list_price = supply_price
    price = abs(round(list_price * discount, 12))
    amount = abs(round(price * quantity, 12))

    return {
        'status': True,
        'amount': amount,
        'price': price,
        'list_price': list_price,
        'discount': discount,
        'protocolid': salemode['protocolid'],
        'product_salemode_id': salemode.get('id'),
    }
|
||
|
||
async def process_user_billing(ns=None):
    """
    Generic usage billing: record usage, create a local order, charge it.

    Flow:

    1. Store the raw request in ``model_usage`` (committed in its own
       database context, so it survives a later rollback).
    2. Validate ``quantity``, ``llmid`` and ``amount``.
    3. Resolve the product by ``product.providerpid == llmid``, then its
       provider organization, the user and the user's organization (the
       customer).
    4. Optionally price the request through ``calc_price_by_saleprotocol``.
    5. Check the customer balance, create ``bz_order`` / ``order_goods`` and
       charge the order through ``_charge_order``.
    6. Link the ``model_usage`` row to the order (``bill_status = 1``).

    Keys read from *ns*:

    :param userid: user id; must exist in ``users``
    :param apikey: stored in ``model_usage`` only
    :param llmid: required; matched against ``product.providerpid``
    :param amount: positive finite number. With ``use_saleprotocol`` falsy it
        is the final charge; otherwise it is the supply / catalogue unit price
        before discount
    :param usage: stored in ``model_usage.usage_content`` (dicts are
        JSON-encoded)
    :param providername: only echoed in the "provider not found" message; the
        name written to ``bz_order.source`` comes from the product's provider
        organization
    :param use_saleprotocol: price through the sale protocol; defaults to True
    :param quantity: units, default 1; only applied when ``use_saleprotocol``
        is truthy
    :return: dict with ``status`` and ``msg``. On success ``status`` is
        ``'ok'`` and the dict also contains ``orderid``, ``amount``,
        ``productid`` and, when the protocol was used, ``discount``,
        ``list_price`` and ``price``. On failure ``status`` is ``'error'``.
    """
    import math

    # A shared mutable default would leak state between calls.
    ns = {} if ns is None else ns
    debug(f"process_user_billing 传递参数: {ns}")

    # Persist the incoming values in the usage table before any validation.
    db = DBPools()
    usage_content = ns.get('usage')
    if isinstance(usage_content, dict):
        usage_content = json.dumps(usage_content)
    usage_ns = {
        'id': uuid(),
        'userid': ns.get('userid'),
        'apikey': ns.get('apikey'),
        'llmid': ns.get('llmid'),
        'original_price': ns.get('amount'),
        'usage_content': usage_content,
    }
    async with db.sqlorContext('kboss') as sor_init:
        await sor_init.C('model_usage', usage_ns)

    userid = ns.get('userid')
    providername = ns.get('providername')
    use_saleprotocol = ns.get('use_saleprotocol', True)

    # A malformed quantity must produce an error response, not an exception.
    try:
        quantity = int(ns.get('quantity', 1))
    except (TypeError, ValueError, OverflowError):
        debug(f"{userid} process_user_billing quantity 必须为有效数字")
        return {'status': 'error', 'msg': 'quantity 必须为有效数字'}

    llmid = ns.get('llmid')
    if not llmid:
        debug(f"{userid} process_user_billing llmid必传")
        return {
            'status': 'error',
            'msg': 'llmid必传'
        }

    try:
        amount = round(float(ns.get('amount')), 12)
    except (TypeError, ValueError, OverflowError):
        amount = float('nan')
    # NaN compares False against everything, so it would pass both the
    # `<= 0` check and the balance check; reject non-finite values up front.
    if not math.isfinite(amount):
        debug(f"{userid} process_user_billing amount 必须为有效数字")
        return {'status': 'error', 'msg': 'amount 必须为有效数字'}
    if amount <= 0:
        debug(f"{userid} process_user_billing amount 必须大于 0")
        return {'status': 'error', 'msg': 'amount 必须大于 0'}

    async with db.sqlorContext('kboss') as sor:
        try:
            product_li = await sor.R('product', {'providerpid': llmid, 'del_flg': '0'})
            if not product_li:
                debug(f"{userid} process_user_billing 未找到对应产品,请确认")
                return {
                    'status': 'error',
                    'msg': '未找到对应产品,请确认'
                }
            product = product_li[0]
            productname = product['name']
            providerid = product['providerid']

            providername_list = await sor.R('organization', {'id': providerid})
            if not providername_list:
                debug(f"{userid} process_user_billing 厂商不存在 %s" % providername)
                return {
                    'status': 'error',
                    'msg': '厂商不存在 %s' % providername
                }
            providername = providername_list[0]['orgname']

            # NOTE: the user is taken from ns['userid'] as supplied; the
            # apikey is not validated against `user_api_keys` here.
            user_list = await sor.R('users', {'id': userid})
            if not user_list:
                debug(f"{userid} process_user_billing 用户不存在 %s" % userid)
                return {'status': 'error', 'msg': '用户不存在 %s' % userid}

            org_list = await sor.R('organization', {'id': user_list[0]['orgid']})
            if not org_list:
                debug(f"{userid} process_user_billing 用户所属机构不存在")
                return {'status': 'error', 'msg': '用户所属机构不存在'}
            customerid = org_list[0]['id']

            # Defaults for direct charging: `amount` is the final charge.
            list_price = amount
            unit_price = amount
            discount = 1
            originalprice = amount

            if use_saleprotocol:
                price_res = await calc_price_by_saleprotocol(
                    sor, org_list[0], product['id'], amount, quantity=quantity,
                )
                if price_res['status'] == 'error':
                    return price_res
                debug(price_res)
                debug('list_price %s' % list_price)
                amount = price_res['amount']
                list_price = price_res['list_price']
                unit_price = price_res['price']
                discount = price_res['discount']
                originalprice = list_price * quantity

            balance = await getCustomerBalance(sor, customerid)
            if balance is None:
                balance = 0
            if amount > balance:
                return {
                    'status': 'error',
                    'msg': '账户余额不足',
                    'pricedifference': round(balance - amount, 12),
                }

            order_id = uuid()
            now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            bz_ns = {
                'id': order_id,
                'order_status': '0',
                'business_op': 'BUY',
                'userid': userid,
                'customerid': customerid,
                'order_date': now_str,
                'source': providername,
                'amount': amount,
                'originalprice': round(originalprice, 12),
                'ordertype': 'prepay',
                'servicename': productname,
                'is_big_model': 1
            }
            await sor.C('bz_order', bz_ns)

            goods_ns = {
                'id': uuid(),
                'orderid': order_id,
                'productid': product['id'],
                'providerid': product['providerid'],
                'list_price': list_price,
                'discount': discount,
                'quantity': quantity if use_saleprotocol else 1,
                'price': unit_price,
                'amount': amount,
                'chargemode': 'prepay',
                'servicename': productname,
                'resourceids': '',
                'resourcestarttime': now_str,
                'resourceendtime': None,
                'is_big_model': 1
            }
            await sor.C('order_goods', goods_ns)

            charge_res = await _charge_order(sor, order_id, order_type='NEW')
            if charge_res['status'] == 'error':
                # Undo the order / order_goods rows created above.
                await sor.rollback()
                return charge_res

            await sor.U('model_usage', {'id': usage_ns['id'], 'orderid': order_id, 'bill_status': 1})

            result = {
                'status': 'ok',
                'msg': '扣费成功',
                'orderid': order_id,
                'amount': amount,
                'productid': product['id'],
            }
            if use_saleprotocol:
                result['discount'] = discount
                result['list_price'] = list_price
                result['price'] = unit_price
            return result
        except Exception as e:
            debug(f"{userid} process_user_billing 扣费异常: {e}")
            # rollback() is awaited at the other call site in this function;
            # calling it without `await` would never run the rollback.
            await sor.rollback()
            return {
                'status': 'error',
                'msg': str(e)
            }
|
||
|
||
ret = await process_user_billing(params_kw)
|
||
return ret |