def static_avail_determinat(produce_pm,result): # 预处理库存数据 #此处待考虑,暂时选择提前限制cpu,memory,gpu #for res in result: #if res['type'] != 'disk': # 1. cpu,内存,gpu等按照默认型号计量; # 2. 每个集群的硬件资源是一致的,因此计量上来说型号因素不影响 # res['model'] = 'STANDARD' usable_map = {(item['type'], item['model']): (item['stock']-item['consumed']) for item in result} debug(f"部件剩余库存数:{usable_map}") # 单位转换函数 def convert_unit(value): if isinstance(value, int): return value if isinstance(value, str): if value.endswith('m'): # CPU millicores return float(value[:-1]) / 1000 elif value.endswith('Gi'): # Memory Gi return float(value[:-2]) elif value.endswith('Mi'): # Memory Mi -> Gi return float(value[:-2]) / 1000 return float(value) # 检查超出的资源 exceeded_resources = [] for req in produce_pm: #此处待考虑,暂时选择提前限制cpu,memory,gpu #if req['type'] != 'disk': # debug(f"{req['type']}跳过,后续动态判断库存~") # continue key = (req['type'], req['model']) usable = usable_map.get(key, 0) amount = convert_unit(req['amount']) # 这里规定,不能完全占满,只能略小于,毕竟节点上可能还有其他服务会动态占用资源 if amount >= usable: if req['type'] == "gpu" and usable == 0: debug(f'【gpu库存为0】&【可用量为0】&【申请量等于0】,此次允许部署CPU实例(其他情况一律不允许部署!)') continue exceeded_resources.append({ 'type': req['type'], 'model': req['model'], 'requested': amount, 'usable(stock-consumed)': usable }) #debug(f"超出库存的部件请求:exceeded_resources") return exceeded_resources def replace_labels(produce_pm, kyylabels): # 解析 kyylabels 字符串为字典 labels_dict = {} for label in kyylabels.split(','): if '=' in label: key, value = label.split('=', 1) labels_dict[key] = value # 处理 produce_pm 列表(假设长度为1) if not produce_pm: return [] #items = produce_pm[0].copy() # 创建副本,避免修改原始数据 items = [] # 替换 cpu-model 和 gpu-model kyy_cpu_key = 'kyy-cpu-model' kyy_gpu_key = 'kyy-gpu-model' for item in produce_pm: if kyy_cpu_key in labels_dict and item['type'] == 'cpu': item['model'] = labels_dict[kyy_cpu_key] if kyy_gpu_key in labels_dict and item['type'] == 'gpu': item['model'] = labels_dict[kyy_gpu_key] items.append(item) return items async def determine_accommodat(params_kw={}): ns = params_kw.copy() #debug(f'判断站点可部署哪些部件组合:{params_kw}') ''' {'cpcid': 'AROU9udKtPNyh0AZtO_WY', 'clusterid': '4hBm8atruISOU2bs24t_N', 'resources': [ '68994ca4-0e66-4839-93b8-f76fb46432cb': { "type": "cpu", "model": "AMD EPYC 7542 32-Core Processor", "amount": "1000m" }, { "type": "memory", "model": "STANDARD", "amount": "512Mi" }, { "type": "gpu", "model": "RTX4090-24G", "amount": 0 },g { "type": "disk", "model": "SYS", "amount": 10 }, { "type": "disk", "model": "DATA", "amount": "20Gi" } ] } ''' #return {'status': False,'msg': '判断站点可部署哪些部件组合错误'} cpcid = params_kw.cpcid clusterid = params_kw.clusterid resources = params_kw.resources kyylabels = params_kw.kyylabels dbname = "kboss" db = DBPools() async with db.sqlorContext(dbname) as sor: # ----------------------运营角色下先判断数据库中各组件的库存---------------------- if len(resources) <= 1 and isinstance(kyylabels,str): produce_id = next(iter(resources)) produce_pm = resources[produce_id] #debug(f'旧produce_pm:{produce_pm},kyylabels:{kyylabels}') debug(f'统一produce_pm:{produce_pm},kyylabels:{kyylabels}') #produce_pm = replace_labels(produce_pm, kyylabels) #debug(f'新produce_pm:{produce_pm}') debug(f"【检查数据库步骤】检查各部件库存: {produce_id}") y_sql = f'''select type,model,stock,consumed from cpcwidget where cpcid="{cpcid}" and clusterid="{clusterid}"''' result = await sor.sqlExe(y_sql, {}) ret = static_avail_determinat(produce_pm, result) #debug(f"cpcwidget:{result} -> {ret}") if len(ret) >= 1: debug(f"申请库存量超限: {ret}") return {'status': True,'msg': '获取不可部署产品ID成功','data': [produce_id], 'reason':'static'} debug(f"========数据库库存检查通过========") # ----------------------下面是远程API判断该集群工作节点中库存---------------------- debug(f"【远程API判断步骤】判断该集群工作节点中库存") # 通过算力中心ID,获取算力中心部署所在设备的http(s)协议信息 cpcs = await sor.R('cpclist', {'id':cpcid}) if len(cpcs) < 1: e = Exception(f'cpclist {cpcid=} not exists') exception(f'{e}') raise e cpc = cpcs[0] # 通过集群ID,获取算力集群控制节点所在设备的ssh协议参数 cpclusters = await sor.R('cpccluster', {'id':clusterid}) if len(cpclusters) < 1: e = Exception(f'cpclist {clusterid=} not exists') exception(f'{e}') raise e cpcluster = cpclusters[0] kubeconfig = cpcluster.get("kubeconfig") url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/determine_accommodat" debug(f"请求url: {url=}") # 请求方式待定,取决于获取参数值方式 headers = basic_auth_headers(cpc.api_user, cpc.api_pwd) hc = HttpClient() import requests params = { 'kubeconfig':kubeconfig, 'resources':json.dumps(resources) } #debug(f'请求参数{params=}') #resp = await hc.request(url, method='GET', # headers = headers, # data=params #) # 框架不支持超时时间 #resp = requests.post(url, # headers = headers, # data=params, # timeout=500, # verify=False #) #debug(f'{type(resp)=}->{resp.status_code=}') #if resp.status_code != 200: # raise ValueError(f"http status_code:{resp.status_code}") await_resp = await async_post( url=url, headers=headers, data=params, timeout=500, verify=False ) status_code = await_resp.get('status_code',505) if status_code != 200: raise return {'status': False, 'msg': f'算力中心服务操作失败{status_code}'} resp = await_resp.get("content") debug(f'{type(resp)=}->{resp=}') debug(f"pcapi返回值: {json.loads(resp)=}") data = json.loads(resp).get('data') if json.loads(resp).get("status") == True: return {'status': True,'msg': '获取不可部署产品ID成功','data': data, 'reason':'dynamic'} else: return {'status': False,'msg': '获取不可部署产品ID失败', 'reason':'dynamic'} return {'status': False,'msg': '其它异常!'} ret = await determine_accommodat(params_kw) return ret