async def accrual_price(params_kw={}): global_status = True global_msg = '获取部件组合价格成功' try: debug(f'{params_kw=}') db = DBPools() dbname = 'kboss' if not params_kw.cpcid: global_status = False global_msg = '无算力中心ID' return {'status': global_status,'msg': global_msg} ns = { "cpcid":params_kw.cpcid } async with db.sqlorContext(dbname) as sor: y_sql = f"""select type, model, unit_price, resource_unit, stock, consumed, (stock-consumed) as avail_stock from cpcwidget where cpcid='{params_kw.cpcid}'""" clusterid = params_kw.clusterid if clusterid: y_sql += f""" and clusterid='{clusterid}'""" debug(f'sql:{y_sql}') resulta = await sor.sqlExe(y_sql, {}) debug(f'SQL结果长度:{len(resulta)}') """============================核心汇算部件价格逻辑============================""" # 初始化资源字典 RESOURCE_PRICING = { "cpu": {}, "memory": {}, "disk": {}, "gpu": {} } # 处理查询结果并填充资源字典 for row in resulta: #debug(f'===row:{row}') type, model, unit_price, resource_unit, stock, avail_stock =\ row.get("type"),row.get("model"),\ row.get("unit_price"),row.get("resource_unit"),row.get("stock"),row.get("avail_stock") model = model.strip() #debug(f'===>>> {type}, {model}, {unit_price}, {resource_unit}, {stock}') if type in RESOURCE_PRICING: RESOURCE_PRICING[type][model] = { "unit_price": float(unit_price), "resource_unit": resource_unit, "stock": int(stock), "avail_stock": int(avail_stock) } debug(f'RESOURCE_PRICING:{RESOURCE_PRICING}') # 汇算计价核心逻辑 # 初始化结果 result = { "total_price": 0.0, "currency": "元", "price_details": [], "status": True, "message": "汇算正常" } # 计费周期转换系数(统一转换为小/时) DURATION_FACTOR = { "小时": 1, # 按小时计费 "天": 24, # 按天计费 "周": 168, # 按7天计算(按周计算) "月": 720, # 按30天计算(按月计费) "年": 8760 # 按365天计算(按年计费) } # 获取资源配置 resources = params_kw.get("resources", []) if not resources: result["status"] = False result["message"] = "异常:请求中必须包含资源信息" raise RuntimeError(f'{result["message"]}') # 获取使用时长并转换为小时 duration = int(params_kw.get("duration", 1)) duration_unit = params_kw.get("duration_unit", "小时") #debug(f'时间参数: {duration} {duration_unit} {DURATION_FACTOR.get(duration_unit, 1)} {duration * (DURATION_FACTOR.get(duration_unit, 1))}') duration_hours = duration * DURATION_FACTOR.get(duration_unit, 1) result["duration"] = duration result["duration_unit"] = duration_unit # 遍历计算每种资源的价格 for resource in resources: if isinstance(resource,str): resource = eval(resource) debug(f'用户输入resource:{resource}') resource_type = resource.get("type") amount = resource.get("amount", 0) if not isinstance(amount,int): amount = int(amount) try: if not resource_type: raise ValueError("不支持的部件类型") if amount == 0: debug(f'部件申请量为空: {amount}') continue # 获取资源定价信息 if resource_type in ["cpu", "gpu", "disk", "memory"]: # 需要查询型号的部件类型 model = resource.get("model") if not model: raise ValueError(f"{resource_type} 必须指定部件型号/系列") if not RESOURCE_PRICING.get(resource_type): raise ValueError(f"没有该类型组件") resource_info = RESOURCE_PRICING[resource_type].get(model) if not resource_info: raise ValueError(f"无效的 {resource_type} 部件型号: {model}") resource_name = f"{resource_type} {model}" else: raise ValueError(f"不支持的资源类型: {resource_type}") # 检查库存 if amount < 0: raise ValueError(f'amount:{amount} < 0, Not Allowed!!!') #此处待考虑,暂时选择提前限制cpu,memory,gpu if resource_info["avail_stock"] <= amount: debug(f'❌ {resource_name} 可用量: {resource_info["avail_stock"]}, 请求量:{amount}') raise ValueError(f'{resource_name} 库存可用量不足 (可用量: {resource_info["avail_stock"]}, 请求量: {amount})') #if resource_info["avail_stock"] <= amount and resource_type == "disk": # 其他部件类型只警告,磁盘类型会真实限制!!! # debug(f'❌ {resource_name} 可用量: {resource_info["avail_stock"]}, 请求量:{amount}') # raise ValueError(f'{resource_name} 库存可用量不足 (可用量: {resource_info["avail_stock"]}, 请求量: {amount})') # 计算价格,提取并验证 unit_price unit_price = round(float(resource_info["unit_price"]), 4) debug(f'unit_price:{unit_price},amount:{amount},duration_hours:{duration_hours}') subtotal = unit_price * float(amount) * float(duration_hours) debug(f'subtotal:{subtotal}') # 更新总价和明细 result["total_price"] += subtotal result["price_details"].append({ #"resource_name": resource_name, "model":model, "resource_type": resource_type, "amount": amount, "resource_unit": resource_info["resource_unit"], "unit_price": unit_price, "duration": duration, "duration_unit": duration_unit, "subtotal": round(subtotal, 2) }) except Exception as e: import traceback traceback.print_exc() # 记录错误并继续处理其他资源 result["price_details"].append({ "resource_type": resource_type, "error": str(e) }) if result["status"] == True: result["status"] = False result["message"] = str(e) global_status = False global_msg = str(e) # 结果格式化 result["total_price"] = round(result["total_price"], 2) """======================================================================""" return {'status': global_status,'msg': global_msg,'data': result} raise RuntimeError("中间逻辑报错") except Exception as e: global_status = False global_msg = f'获取部件组合价格失败:{e}' return {'status': global_status,'msg': global_msg} ret = await accrual_price(params_kw) return ret