kboss/b/cpcc/cpcbuy/determine_accommodat.dspy
2025-07-16 14:27:17 +08:00

221 lines
8.1 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

def static_avail_determinat(produce_pm,result):
    """Check requested components against the stock recorded in the database.

    Args:
        produce_pm: list of requested components. Each is a mapping with
            'type', 'model' and 'amount' keys. 'amount' is a number or a
            quantity string such as '1000m' (CPU millicores), '512Mi',
            '20Gi' or '1Ti' (memory/disk, normalized to Gi).
        result: stock rows. Each is a mapping with 'type', 'model', 'stock'
            and 'consumed' keys (the caller selects these from cpcwidget).

    Returns:
        A list with one dict per component whose request cannot be met,
        with keys 'type', 'model', 'requested' and
        'usable(stock-consumed)'. The list is empty when everything fits.
    """
    # NOTE: CPU / memory / GPU are currently checked here up front together
    # with disk. This may later be left to the dynamic (per-node) check.
    usable_map = {
        (item['type'], item['model']): item['stock'] - item['consumed']
        for item in result
    }
    debug(f"部件剩余库存数:{usable_map}")

    def convert_unit(value):
        """Normalize an amount: millicores -> cores, Ti/Gi/Mi -> Gi."""
        if isinstance(value, int):
            return value
        if isinstance(value, str):
            if value.endswith('m'):  # CPU millicores -> cores
                return float(value[:-1]) / 1000
            if value.endswith('Ti'):  # binary units: 1 Ti = 1024 Gi
                return float(value[:-2]) * 1024
            if value.endswith('Gi'):
                return float(value[:-2])
            if value.endswith('Mi'):  # binary units: 1 Gi = 1024 Mi
                return float(value[:-2]) / 1024
        # Plain numeric strings and non-int numbers (e.g. floats).
        return float(value)

    exceeded_resources = []
    for req in produce_pm:
        key = (req['type'], req['model'])
        # A component with no stock row counts as having nothing available.
        usable = usable_map.get(key, 0)
        amount = convert_unit(req['amount'])
        # A request must stay strictly below the remaining stock. Other
        # services on the node may take resources dynamically, so a request
        # that would use up everything is rejected as well.
        if amount < usable:
            continue
        if req['type'] == "gpu" and amount == 0:
            # No GPU requested (a CPU-only instance): allowed even when the
            # GPU stock is exhausted. Any non-zero GPU request still fails.
            debug(f'【gpu库存为0】&【可用量为0】&【申请量等于0】,此次允许部署CPU实例(其他情况一律不允许部署!)')
            continue
        exceeded_resources.append({
            'type': req['type'],
            'model': req['model'],
            'requested': amount,
            'usable(stock-consumed)': usable
        })
    return exceeded_resources
def replace_labels(produce_pm, kyylabels):
    """Return copies of the components with CPU/GPU models overridden by labels.

    Args:
        produce_pm: list of component mappings with 'type' and 'model' keys.
            Any number of components is handled.
        kyylabels: comma-separated ``key=value`` pairs, e.g.
            ``"kyy-cpu-model=...,kyy-gpu-model=..."``. Entries without '='
            are ignored. ``None`` or '' means there are no overrides.

    Returns:
        A new list of shallow copies. 'cpu' items take the 'kyy-cpu-model'
        label and 'gpu' items take the 'kyy-gpu-model' label, when present.
        The input mappings are not modified.
    """
    import copy

    # Parse "k1=v1,k2=v2" into a dict, splitting on the first '=' only.
    labels_dict = {}
    for label in (kyylabels or '').split(','):
        key, sep, value = label.partition('=')
        if sep:
            labels_dict[key.strip()] = value.strip()

    if not produce_pm:
        return []

    overrides = {
        'cpu': labels_dict.get('kyy-cpu-model'),
        'gpu': labels_dict.get('kyy-gpu-model'),
    }
    items = []
    for item in produce_pm:
        # Copy so the caller's component dicts are left untouched.
        new_item = copy.copy(item)
        model = overrides.get(new_item.get('type'))
        if model is not None:
            new_item['model'] = model
        items.append(new_item)
    return items
async def determine_accommodat(params_kw={}):
    """Decide which requested products cannot be deployed on a cluster.

    The check runs in two stages:
      1. Static: when exactly one product is requested and ``kyylabels`` is
         a string, its components are compared with the stock recorded in
         the ``cpcwidget`` table (see ``static_avail_determinat``).
      2. Dynamic: the compute center's pcapi service is asked whether the
         cluster's worker nodes can host the requested resources.

    Args:
        params_kw: request parameters, read by attribute (``.cpcid``,
            ``.clusterid``, ``.resources``, ``.kyylabels``). It is never
            mutated, so the shared ``{}`` default is harmless. ``resources``
            maps a product ID to its component list, for example::

                {'68994ca4-0e66-4839-93b8-f76fb46432cb': [
                    {'type': 'cpu', 'model': 'AMD EPYC 7542 32-Core Processor', 'amount': '1000m'},
                    {'type': 'memory', 'model': 'STANDARD', 'amount': '512Mi'},
                    {'type': 'gpu', 'model': 'RTX4090-24G', 'amount': 0},
                    {'type': 'disk', 'model': 'SYS', 'amount': 10},
                    {'type': 'disk', 'model': 'DATA', 'amount': '20Gi'},
                ]}

    Returns:
        ``{'status': True, 'msg': ..., 'data': [undeployable product IDs],
        'reason': 'static' | 'dynamic'}`` on success. Otherwise a dict with
        ``'status': False`` (e.g. the pcapi call returned a non-200 status
        or reported failure).

    Raises:
        Exception: when the cpclist or cpccluster record does not exist.
    """
    cpcid = params_kw.cpcid
    clusterid = params_kw.clusterid
    resources = params_kw.resources
    kyylabels = params_kw.kyylabels
    dbname = "kboss"
    db = DBPools()
    async with db.sqlorContext(dbname) as sor:
        # ---- Stage 1: static check against stock stored in the database ----
        # Requires exactly one product. Calling next(iter(...)) on an empty
        # mapping would raise StopIteration, which a coroutine turns into
        # RuntimeError.
        if len(resources) == 1 and isinstance(kyylabels, str):
            produce_id = next(iter(resources))
            produce_pm = resources[produce_id]
            debug(f'统一produce_pm:{produce_pm},kyylabels:{kyylabels}')
            debug(f"【检查数据库步骤】检查各部件库存: {produce_id}")
            # Bind the request values with sqlor ${name}$ placeholders instead
            # of formatting them into the SQL text (injection-safe).
            y_sql = ("select type,model,stock,consumed from cpcwidget "
                     "where cpcid=${cpcid}$ and clusterid=${clusterid}$")
            result = await sor.sqlExe(y_sql, {'cpcid': cpcid, 'clusterid': clusterid})
            ret = static_avail_determinat(produce_pm, result)
            if ret:
                debug(f"申请库存量超限: {ret}")
                return {'status': True,'msg': '获取不可部署产品ID成功','data': [produce_id], 'reason':'static'}
            debug(f"========数据库库存检查通过========")

        # ---- Stage 2: dynamic check through the compute-center API ----
        debug(f"【远程API判断步骤】判断该集群工作节点中库存")
        # Compute-center record: pcapi base URL and API credentials.
        cpcs = await sor.R('cpclist', {'id': cpcid})
        if not cpcs:
            e = Exception(f'cpclist {cpcid=} not exists')
            exception(f'{e}')
            raise e
        cpc = cpcs[0]
        # Cluster record: provides the kubeconfig sent to pcapi.
        cpclusters = await sor.R('cpccluster', {'id': clusterid})
        if not cpclusters:
            e = Exception(f'cpccluster {clusterid=} not exists')
            exception(f'{e}')
            raise e
        cpcluster = cpclusters[0]
        kubeconfig = cpcluster.get("kubeconfig")
        url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/determine_accommodat"
        debug(f"请求url: {url=}")
        headers = basic_auth_headers(cpc.api_user, cpc.api_pwd)
        params = {
            'kubeconfig': kubeconfig,
            'resources': json.dumps(resources),
        }
        # async_post is used because it accepts a timeout, unlike HttpClient.
        await_resp = await async_post(
            url=url,
            headers=headers,
            data=params,
            timeout=500,
            verify=False
        )
        status_code = await_resp.get('status_code', 505)
        if status_code != 200:
            debug(f'pcapi request failed: {url=} {status_code=}')
            return {'status': False, 'msg': f'算力中心服务操作失败{status_code}'}
        resp = await_resp.get("content")
        debug(f'{type(resp)=}->{resp=}')
        body = json.loads(resp)  # parse once; reused below
        debug(f"pcapi返回值: json.loads(resp)={body!r}")
        data = body.get('data')
        if body.get("status") == True:
            return {'status': True,'msg': '获取不可部署产品ID成功','data': data, 'reason':'dynamic'}
        return {'status': False,'msg': '获取不可部署产品ID失败', 'reason':'dynamic'}
    # Only reachable if the sqlor context manager suppresses an exception
    # raised above (whether it does is not visible here).
    return {'status': False,'msg': '其它异常!'}
ret = await determine_accommodat(params_kw)
return ret