kboss/b/cpcc/cpcwidget/accrual_price.dspy
2025-07-16 14:27:17 +08:00

167 lines
8.2 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

async def accrual_price(params_kw=None):
    """Price a requested combination of components for one compute center.

    Reads the component catalogue (table ``cpcwidget``) for the given compute
    center, then prices every requested resource as
    ``unit_price * amount * duration_in_hours``.

    Args:
        params_kw: Mapping-like request parameters (must support ``.get``).
            Keys read here:
            ``cpcid`` (required), ``clusterid`` (optional extra filter),
            ``resources`` (required, list of dicts or of strings that evaluate
            to dicts; each has ``type``, ``model`` and ``amount``),
            ``duration`` (default 1) and ``duration_unit`` (default "小时").

    Returns:
        ``{'status': bool, 'msg': str}`` on early failure, otherwise
        ``{'status': bool, 'msg': str, 'data': result}`` where ``result``
        carries ``total_price``, ``currency``, ``price_details``, ``status``,
        ``message``, ``duration`` and ``duration_unit``. A failure while
        pricing one resource is recorded in ``price_details`` and flips the
        status flags, but the remaining resources are still priced.

    Raises:
        Nothing: every exception is converted into a ``status: False`` reply.
    """
    import traceback

    if params_kw is None:
        params_kw = {}

    global_status = True
    global_msg = '获取部件组合价格成功'

    # Conversion factors from each billing unit to hours.
    duration_factor = {
        "小时": 1,     # hourly
        "天": 24,      # daily
        "周": 168,     # weekly (7 days)
        "月": 720,     # monthly (30 days)
        "年": 8760,    # yearly (365 days)
    }
    supported_types = ("cpu", "memory", "disk", "gpu")

    try:
        # ---- validate the request before touching the database ----
        cpcid = params_kw.get("cpcid")
        if not cpcid:
            return {'status': False, 'msg': '无算力中心ID'}
        debug(f'{params_kw=}')

        resources = params_kw.get("resources", [])
        if not resources:
            raise RuntimeError("异常:请求中必须包含资源信息")

        duration = int(params_kw.get("duration", 1))
        duration_unit = params_kw.get("duration_unit", "小时")
        # NOTE(review): an unknown duration_unit silently falls back to hourly
        # pricing (factor 1); confirm whether it should be rejected instead.
        duration_hours = duration * duration_factor.get(duration_unit, 1)

        # ---- build a parameterized query (request values are never
        # interpolated into the SQL text; sqlor binds ${name}$ from ns) ----
        sql = (
            "select type, model, unit_price, resource_unit, stock, consumed, "
            "(stock-consumed) as avail_stock from cpcwidget "
            "where cpcid=${cpcid}$"
        )
        ns = {"cpcid": cpcid}
        clusterid = params_kw.get("clusterid")
        if clusterid:
            sql += " and clusterid=${clusterid}$"
            ns["clusterid"] = clusterid

        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            debug(f'sql:{sql}')
            debug(f'ns:{ns}')
            rows = await sor.sqlExe(sql, ns)
            debug(f'SQL结果长度:{len(rows)}')

            # Catalogue: resource type -> model -> pricing/stock record.
            pricing = {kind: {} for kind in supported_types}
            for row in rows:
                row_type = row.get("type")
                if row_type not in pricing:
                    # Rows of unrelated types are ignored before their other
                    # fields are touched, so they cannot break the pricing.
                    continue
                model_name = row.get("model").strip()
                pricing[row_type][model_name] = {
                    "unit_price": float(row.get("unit_price")),
                    "resource_unit": row.get("resource_unit"),
                    "stock": int(row.get("stock")),
                    "avail_stock": int(row.get("avail_stock")),
                }
            debug(f'RESOURCE_PRICING:{pricing}')

            result = {
                "total_price": 0.0,
                "currency": "元",
                "price_details": [],
                "status": True,
                "message": "汇算正常",
                "duration": duration,
                "duration_unit": duration_unit,
            }

            for resource in resources:
                if isinstance(resource, str):
                    # SECURITY: eval() on request-supplied text can execute
                    # arbitrary code. Kept to preserve accepted inputs; replace
                    # with json.loads / ast.literal_eval once callers are
                    # confirmed to send plain literals.
                    resource = eval(resource)
                debug(f'用户输入resource:{resource}')
                resource_type = resource.get("type")
                amount = resource.get("amount", 0)
                if not isinstance(amount, int):
                    amount = int(amount)

                try:
                    if not resource_type:
                        raise ValueError("不支持的部件类型")
                    if amount == 0:
                        # Nothing requested for this component: skip silently.
                        debug(f'部件申请量为空: {amount}')
                        continue
                    if resource_type not in supported_types:
                        raise ValueError(f"不支持的资源类型: {resource_type}")

                    model = resource.get("model")
                    if not model:
                        raise ValueError(f"{resource_type} 必须指定部件型号/系列")
                    models = pricing.get(resource_type)
                    if not models:
                        raise ValueError("没有该类型组件")
                    resource_info = models.get(model)
                    if not resource_info:
                        raise ValueError(f"无效的 {resource_type} 部件型号: {model}")
                    resource_name = f"{resource_type} {model}"

                    if amount < 0:
                        raise ValueError(f'amount:{amount} < 0, Not Allowed!!!')

                    # Stock limit currently applies to every component type
                    # (an earlier variant enforced it for disks only).
                    # NOTE(review): "<=" also rejects a request for exactly
                    # the available amount; confirm that is intended.
                    available = resource_info["avail_stock"]
                    if available <= amount:
                        debug(f'❌ {resource_name} 可用量: {available}, 请求量:{amount}')
                        raise ValueError(
                            f'{resource_name} 库存可用量不足 (可用量: {available}, 请求量: {amount})'
                        )

                    unit_price = round(float(resource_info["unit_price"]), 4)
                    debug(f'unit_price:{unit_price},amount:{amount},duration_hours:{duration_hours}')
                    subtotal = unit_price * float(amount) * float(duration_hours)
                    debug(f'subtotal:{subtotal}')

                    result["total_price"] += subtotal
                    result["price_details"].append({
                        "model": model,
                        "resource_type": resource_type,
                        "amount": amount,
                        "resource_unit": resource_info["resource_unit"],
                        "unit_price": unit_price,
                        "duration": duration,
                        "duration_unit": duration_unit,
                        "subtotal": round(subtotal, 2),
                    })
                except Exception as e:
                    traceback.print_exc()
                    # Record the failure and keep pricing the other resources.
                    result["price_details"].append({
                        "resource_type": resource_type,
                        "error": str(e),
                    })
                    # result["message"] keeps the first error; the top-level
                    # msg reflects the most recent one.
                    if result["status"]:
                        result["status"] = False
                        result["message"] = str(e)
                    global_status = False
                    global_msg = str(e)

            result["total_price"] = round(result["total_price"], 2)
            return {'status': global_status, 'msg': global_msg, 'data': result}

        # Only reachable if the context manager above suppressed an exception
        # raised inside the block (presumably sqlorContext does; confirm).
        raise RuntimeError("中间逻辑报错")
    except Exception as e:
        return {'status': False, 'msg': f'获取部件组合价格失败:{e}'}
ret = await accrual_price(params_kw)
return ret