salescrm/b/cpcc/cpcpodyaml/new_cpcpodyaml.dspy
2025-10-27 15:50:44 +08:00

172 lines
8.3 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 单位转换函数
def convert_unit(value):
if isinstance(value, int):
return value
if isinstance(value, str):
if value.endswith('m'): # CPU millicores
return float(value[:-1]) / 1000
elif value.endswith('Gi'): # Memory Gi
return float(value[:-2])
elif value.endswith('Mi'): # Memory Mi -> Gi
return float(value[:-2]) / 1024
return float(value)
async def new_cpcpodyaml(params_kw={}):
# params_kw:
# clusterid
debug(f'Web参数{params_kw=}')
# dbname = get_module_dbname('cpcc')
dbname = 'kboss'
db = DBPools()
clusterid = params_kw.clusterid
async with db.sqlorContext(dbname) as sor:
# clusterid -> nodeid
cpclusters = await sor.R('cpccluster', {'id': clusterid})
if len(cpclusters) < 1:
e = Exception(f'cpclist {clusterid=} not exists')
exception(f'{e}')
raise e
cpcluster = cpclusters[0]
# 此处的nodeid即控制节点id
kubeconfig = cpcluster.get("kubeconfig")
nodeid = cpcluster.get("controllerid")
cpcid = cpcluster.get("cpcid")
cpcs = await sor.R('cpclist', {'id': cpcid})
if len(cpcs) < 1:
e = Exception(f'cpclist {cpcid=} not exists')
exception(f'{e}')
raise e
cpc = cpcs[0]
nodes = await sor.R('cpcnode', {'id': nodeid})
# 这里有问题:没有匹配到节点信息!!!
if len(nodes) < 1:
e = Exception(f'cpcnode {nodeid=} 节点基础信息不匹配')
exception(f'{e}')
raise e
node = nodes[0]
url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/yaml_apply"
debug(f"请求url: {url=}")
debug(f"目标IP认证信息: {node.ip=} {node.sshport=} {node.adminuser=} {node.adminpwd=}")
# 请求方式待定,取决于获取参数值方式
userid = await get_user()
if not userid:
userid = params_kw.get("userid")
orgid = await get_userorgid()
if not orgid:
orgid = params_kw.get("orgid")
if not userid:
userid = params_kw.get("userid")
debug(f' >>> 当前用户组织ID: {orgid} 用户ID: {userid}')
if not userid:
return {'status': False, 'msg': '无组织ID'}
if not orgid:
return {'status': False, 'msg': '无组织ID'}
headers = basic_auth_headers(cpc.api_user, cpc.api_pwd)
ns_name = clusterid.replace("_", "-") + "-" + orgid # 集群信息+组织信息合成命名空间
ns_name = ns_name.lower()
ns_name = ns_name[:-1] + "kube" if ns_name.endswith('-') else ns_name
hc = HttpClient()
# type参数更新和新增都是为apply删除为delete
yamlconfig_id = uuid().replace("_", "-").lower()
import requests
params = {
'cluster_type': "k8s",
'host': node.ip,
'port': node.sshport,
'user': node.adminuser,
'password': node.adminpwd,
'kubeconfig': kubeconfig,
'action': 'apply',
'namespace_name': ns_name,
'serviceaccount_name': ns_name + "-serviceaccount",
'podcd_name': yamlconfig_id + "-" + params_kw.get("source_podengine").lower(),
'service_name': yamlconfig_id + "-service",
'instance_type': params_kw.get("instance_type") # 目前仅支持RelationalDB和LinuxOS
}
params.update(params_kw)
keys_to_remove = ['_webbricks_', 'width', 'height', '_is_mobile']
for key in keys_to_remove:
if key in params:
del params[key]
# 生成可用的外部映射端口号
search_port_sql = f'''select source_outsideport from yaml_config where cpcid='{cpcid}' and clusterid='{clusterid}' group by source_outsideport'''
debug(f'查询已用端口sql: {search_port_sql}')
results = await sor.sqlExe(search_port_sql, {})
used_ports = [x.get("source_outsideport") for x in results]
min_num, max_num = 30000, 32766
num = int()
while True:
num = random.randint(min_num, max_num)
if num not in used_ports:
break
debug(f'已用端口列表: {used_ports} 可用端口是======= {num}')
params['source_outsideport'] = num
#return {'status': False, 'msg': f'此次分配端口:{num}'}
debug(f"============================== 正在消耗系统盘库存 ==============================")
reduce_disksys_consumed = convert_unit(params_kw.disk_sys_limit)
reduce_sql_1 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'disk' and clusterid = '{clusterid}' and model = 'SYS' then greatest(consumed + {reduce_disksys_consumed}, 0) else consumed end'''
debug(f'库存减少sql-1: {reduce_sql_1}')
await sor.sqlExe(reduce_sql_1, {})
debug(f"============================== 正在消耗数据盘库存 ==============================")
reduce_diskdata_consumed = convert_unit(params_kw.source_storagelimits)
reduce_sql_2 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'disk' and clusterid = '{clusterid}' and model = 'DATA' then greatest(consumed + {reduce_diskdata_consumed}, 0) else consumed end'''
debug(f'库存减少sql-2: {reduce_sql_2}')
await sor.sqlExe(reduce_sql_2, {})
debug(f"============================== 正在消耗CPU库存 ==============================")
reduce_cpu_consumed = convert_unit(params_kw.source_cpurate)
reduce_sql_3 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'cpu' and clusterid = '{clusterid}' and model = '{params_kw.cpu_model}' then greatest(consumed + {reduce_cpu_consumed}, 0) else consumed end'''
debug(f'库存减少sql-3: {reduce_sql_3}')
await sor.sqlExe(reduce_sql_3, {})
debug(f"============================== 正在消耗内存库存 ==============================")
reduce_memory_consumed = convert_unit(params_kw.source_memrate)
reduce_sql_4 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'memory' and clusterid = '{clusterid}' and model = 'STANDARD' then greatest(consumed + {reduce_memory_consumed}, 0) else consumed end'''
debug(f'库存减少sql-4: {reduce_sql_4}')
await sor.sqlExe(reduce_sql_4, {})
debug(f"============================== 正在消耗GPU库存 ==============================")
reduce_gpu_consumed = convert_unit(params_kw.source_gpu)
reduce_sql_5 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'gpu' and clusterid = '{clusterid}' and model = '{params_kw.gpu_model}' then greatest(consumed + {reduce_gpu_consumed}, 0) else consumed end'''
debug(f'库存减少sql-5: {reduce_sql_5}')
await sor.sqlExe(reduce_sql_5, {})
await_resp = await async_post(
url=url,
headers=headers,
data=params,
timeout=500,
verify=False
)
status_code = await_resp.get('status_code', 505)
if status_code != 200:
raise
return {'status': False, 'msg': f'算力中心服务操作失败{status_code}'}
resp = await_resp.get("content")
debug(f'{type(resp)=}->{resp=}')
keys_to_remove = ['cluster_type', 'host', 'port', 'user', 'password', 'kubeconfig']
for key in keys_to_remove:
if key in params:
del params[key]
params['id'] = yamlconfig_id
params['cpcid'] = cpcid
params['userid'] = userid
if json.loads(resp).get("status") == True:
params["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
debug(f"更新资源yaml配置元数据: {params=}")
await sor.C('yaml_config', params)
debug('⌛️ 资源实例创建成功,资源准备中...')
time.sleep(10)
return {'status': True, 'msg': '新增资源实例参数成功,请10秒后查看实时资源实例面板', 'data': params}
else:
raise
res = json.loads(resp).get("info")
return {'status': False, 'msg': f'新增资源实例参数失败:{res}'}
return {'status': False, 'msg': '创建失败'}
ret = await new_cpcpodyaml(params_kw)
return ret