221 lines
8.1 KiB
Plaintext
221 lines
8.1 KiB
Plaintext
def static_avail_determinat(produce_pm,result):
|
||
# 预处理库存数据
|
||
|
||
#此处待考虑,暂时选择提前限制cpu,memory,gpu
|
||
#for res in result:
|
||
#if res['type'] != 'disk':
|
||
# 1. cpu,内存,gpu等按照默认型号计量;
|
||
# 2. 每个集群的硬件资源是一致的,因此计量上来说型号因素不影响
|
||
# res['model'] = 'STANDARD'
|
||
|
||
usable_map = {(item['type'], item['model']): (item['stock']-item['consumed']) for item in result}
|
||
debug(f"部件剩余库存数:{usable_map}")
|
||
|
||
# 单位转换函数
|
||
def convert_unit(value):
|
||
if isinstance(value, int):
|
||
return value
|
||
if isinstance(value, str):
|
||
if value.endswith('m'): # CPU millicores
|
||
return float(value[:-1]) / 1000
|
||
elif value.endswith('Gi'): # Memory Gi
|
||
return float(value[:-2])
|
||
elif value.endswith('Mi'): # Memory Mi -> Gi
|
||
return float(value[:-2]) / 1000
|
||
return float(value)
|
||
|
||
# 检查超出的资源
|
||
exceeded_resources = []
|
||
for req in produce_pm:
|
||
|
||
#此处待考虑,暂时选择提前限制cpu,memory,gpu
|
||
#if req['type'] != 'disk':
|
||
# debug(f"{req['type']}跳过,后续动态判断库存~")
|
||
# continue
|
||
|
||
key = (req['type'], req['model'])
|
||
usable = usable_map.get(key, 0)
|
||
amount = convert_unit(req['amount'])
|
||
# 这里规定,不能完全占满,只能略小于,毕竟节点上可能还有其他服务会动态占用资源
|
||
if amount >= usable:
|
||
if req['type'] == "gpu" and usable == 0:
|
||
debug(f'【gpu库存为0】&【可用量为0】&【申请量等于0】,此次允许部署CPU实例(其他情况一律不允许部署!)')
|
||
continue
|
||
exceeded_resources.append({
|
||
'type': req['type'],
|
||
'model': req['model'],
|
||
'requested': amount,
|
||
'usable(stock-consumed)': usable
|
||
})
|
||
|
||
#debug(f"超出库存的部件请求:exceeded_resources")
|
||
return exceeded_resources
|
||
|
||
def replace_labels(produce_pm, kyylabels):
|
||
# 解析 kyylabels 字符串为字典
|
||
labels_dict = {}
|
||
for label in kyylabels.split(','):
|
||
if '=' in label:
|
||
key, value = label.split('=', 1)
|
||
labels_dict[key] = value
|
||
|
||
# 处理 produce_pm 列表(假设长度为1)
|
||
if not produce_pm:
|
||
return []
|
||
|
||
#items = produce_pm[0].copy() # 创建副本,避免修改原始数据
|
||
items = []
|
||
|
||
# 替换 cpu-model 和 gpu-model
|
||
kyy_cpu_key = 'kyy-cpu-model'
|
||
kyy_gpu_key = 'kyy-gpu-model'
|
||
|
||
for item in produce_pm:
|
||
if kyy_cpu_key in labels_dict and item['type'] == 'cpu':
|
||
item['model'] = labels_dict[kyy_cpu_key]
|
||
if kyy_gpu_key in labels_dict and item['type'] == 'gpu':
|
||
item['model'] = labels_dict[kyy_gpu_key]
|
||
items.append(item)
|
||
|
||
return items
|
||
|
||
async def determine_accommodat(params_kw={}):
|
||
ns = params_kw.copy()
|
||
|
||
#debug(f'判断站点可部署哪些部件组合:{params_kw}')
|
||
'''
|
||
{'cpcid': 'AROU9udKtPNyh0AZtO_WY',
|
||
'clusterid': '4hBm8atruISOU2bs24t_N',
|
||
'resources': [
|
||
'68994ca4-0e66-4839-93b8-f76fb46432cb':
|
||
{
|
||
"type": "cpu",
|
||
"model": "AMD EPYC 7542 32-Core Processor",
|
||
"amount": "1000m"
|
||
},
|
||
{
|
||
"type": "memory",
|
||
"model": "STANDARD",
|
||
"amount": "512Mi"
|
||
},
|
||
{
|
||
"type": "gpu",
|
||
"model": "RTX4090-24G",
|
||
"amount": 0
|
||
},g
|
||
{
|
||
"type": "disk",
|
||
"model": "SYS",
|
||
"amount": 10
|
||
},
|
||
{
|
||
"type": "disk",
|
||
"model": "DATA",
|
||
"amount": "20Gi"
|
||
}
|
||
]
|
||
}
|
||
'''
|
||
#return {'status': False,'msg': '判断站点可部署哪些部件组合错误'}
|
||
|
||
cpcid = params_kw.cpcid
|
||
clusterid = params_kw.clusterid
|
||
resources = params_kw.resources
|
||
kyylabels = params_kw.kyylabels
|
||
|
||
dbname = "kboss"
|
||
db = DBPools()
|
||
async with db.sqlorContext(dbname) as sor:
|
||
# ----------------------运营角色下先判断数据库中各组件的库存----------------------
|
||
if len(resources) <= 1 and isinstance(kyylabels,str):
|
||
produce_id = next(iter(resources))
|
||
produce_pm = resources[produce_id]
|
||
#debug(f'旧produce_pm:{produce_pm},kyylabels:{kyylabels}')
|
||
debug(f'统一produce_pm:{produce_pm},kyylabels:{kyylabels}')
|
||
#produce_pm = replace_labels(produce_pm, kyylabels)
|
||
#debug(f'新produce_pm:{produce_pm}')
|
||
|
||
debug(f"【检查数据库步骤】检查各部件库存: {produce_id}")
|
||
y_sql = f'''select type,model,stock,consumed from cpcwidget where cpcid="{cpcid}" and clusterid="{clusterid}"'''
|
||
result = await sor.sqlExe(y_sql, {})
|
||
ret = static_avail_determinat(produce_pm, result)
|
||
#debug(f"cpcwidget:{result} -> {ret}")
|
||
if len(ret) >= 1:
|
||
debug(f"申请库存量超限: {ret}")
|
||
return {'status': True,'msg': '获取不可部署产品ID成功','data': [produce_id], 'reason':'static'}
|
||
debug(f"========数据库库存检查通过========")
|
||
# ----------------------下面是远程API判断该集群工作节点中库存----------------------
|
||
debug(f"【远程API判断步骤】判断该集群工作节点中库存")
|
||
# 通过算力中心ID,获取算力中心部署所在设备的http(s)协议信息
|
||
cpcs = await sor.R('cpclist', {'id':cpcid})
|
||
if len(cpcs) < 1:
|
||
e = Exception(f'cpclist {cpcid=} not exists')
|
||
exception(f'{e}')
|
||
raise e
|
||
cpc = cpcs[0]
|
||
# 通过集群ID,获取算力集群控制节点所在设备的ssh协议参数
|
||
cpclusters = await sor.R('cpccluster', {'id':clusterid})
|
||
if len(cpclusters) < 1:
|
||
e = Exception(f'cpclist {clusterid=} not exists')
|
||
exception(f'{e}')
|
||
raise e
|
||
cpcluster = cpclusters[0]
|
||
kubeconfig = cpcluster.get("kubeconfig")
|
||
|
||
url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/determine_accommodat"
|
||
debug(f"请求url: {url=}")
|
||
|
||
# 请求方式待定,取决于获取参数值方式
|
||
headers = basic_auth_headers(cpc.api_user, cpc.api_pwd)
|
||
|
||
hc = HttpClient()
|
||
|
||
import requests
|
||
params = {
|
||
'kubeconfig':kubeconfig,
|
||
'resources':json.dumps(resources)
|
||
}
|
||
#debug(f'请求参数{params=}')
|
||
|
||
#resp = await hc.request(url, method='GET',
|
||
# headers = headers,
|
||
# data=params
|
||
#)
|
||
# 框架不支持超时时间
|
||
#resp = requests.post(url,
|
||
# headers = headers,
|
||
# data=params,
|
||
# timeout=500,
|
||
# verify=False
|
||
#)
|
||
#debug(f'{type(resp)=}->{resp.status_code=}')
|
||
#if resp.status_code != 200:
|
||
# raise ValueError(f"http status_code:{resp.status_code}")
|
||
|
||
await_resp = await async_post(
|
||
url=url,
|
||
headers=headers,
|
||
data=params,
|
||
timeout=500,
|
||
verify=False
|
||
)
|
||
status_code = await_resp.get('status_code',505)
|
||
if status_code != 200:
|
||
raise
|
||
return {'status': False, 'msg': f'算力中心服务操作失败{status_code}'}
|
||
resp = await_resp.get("content")
|
||
debug(f'{type(resp)=}->{resp=}')
|
||
|
||
debug(f"pcapi返回值: {json.loads(resp)=}")
|
||
data = json.loads(resp).get('data')
|
||
if json.loads(resp).get("status") == True:
|
||
return {'status': True,'msg': '获取不可部署产品ID成功','data': data, 'reason':'dynamic'}
|
||
else:
|
||
return {'status': False,'msg': '获取不可部署产品ID失败', 'reason':'dynamic'}
|
||
|
||
return {'status': False,'msg': '其它异常!'}
|
||
|
||
|
||
|
||
ret = await determine_accommodat(params_kw)
|
||
return ret |