172 lines
8.3 KiB
Plaintext
172 lines
8.3 KiB
Plaintext
# 单位转换函数
|
||
def convert_unit(value):
    """Convert a Kubernetes-style resource quantity into a plain number.

    Suffix handling (the numeric part is parsed with ``float``):

    * ``m``  -> CPU millicores, divided by 1000 (``'500m'`` -> ``0.5``)
    * ``Ki`` -> memory, converted to Gi
    * ``Mi`` -> memory, converted to Gi (``'512Mi'`` -> ``0.5``)
    * ``Gi`` -> memory, already in Gi
    * ``Ti`` -> memory, converted to Gi
    * no suffix -> parsed as a bare number

    Args:
        value: ``int``/``float`` (returned unchanged), a quantity string,
            or ``None``.

    Returns:
        The numeric quantity. ``None`` and blank strings yield ``0`` so
        that a resource which was not requested consumes nothing.

    Raises:
        ValueError: if the numeric part of a string cannot be parsed.
    """
    if value is None:
        return 0
    # Numbers need no conversion; floats previously fell through unhandled.
    if isinstance(value, (int, float)):
        return value

    text = str(value).strip()
    if not text:
        return 0

    # (suffix, divisor) pairs. Division keeps the exact floating point
    # results of the original '/ 1000' and '/ 1024' arithmetic.
    unit_divisors = (
        ('Ki', 1024 * 1024),
        ('Mi', 1024),
        ('Gi', 1),
        ('Ti', 1 / 1024),
        ('m', 1000),
    )
    for suffix, divisor in unit_divisors:
        if text.endswith(suffix):
            return float(text[:-len(suffix)]) / divisor
    return float(text)
|
||
|
||
def _pick_free_nodeport(used_ports, low=30000, high=32766):
    """Return a random port in ``[low, high]`` that is not in *used_ports*.

    Values in *used_ports* are normalised with ``int`` so that ports stored
    as strings are still recognised; values that cannot be converted are
    ignored. Returns ``None`` when every port in the range is taken.
    """
    taken = set()
    for port in used_ports:
        try:
            taken.add(int(port))
        except (TypeError, ValueError):
            continue
    free = [port for port in range(low, high + 1) if port not in taken]
    return random.choice(free) if free else None


async def _consume_cpcwidget(sor, cpcid, clusterid, widget_type, model, amount):
    """Add *amount* to the ``consumed`` counter of matching ``cpcwidget`` rows.

    Values are passed as bound parameters instead of being formatted into
    the SQL text.
    NOTE(review): relies on sqlor's ``${name}$`` placeholder syntax for
    ``sqlExe`` - confirm against the sqlor version in use.
    """
    sql = (
        "update cpcwidget set consumed = greatest(consumed + ${amount}$, 0) "
        "where cpcid = ${cpcid}$ and type = ${type}$ "
        "and clusterid = ${clusterid}$ and model = ${model}$"
    )
    ns = {
        'amount': amount,
        'cpcid': cpcid,
        'type': widget_type,
        'clusterid': clusterid,
        'model': model,
    }
    debug(f'库存减少sql: {sql} {ns=}')
    await sor.sqlExe(sql, ns)


async def new_cpcpodyaml(params_kw=None):
    """Create a pod resource instance on a cluster and record its yaml config.

    Steps, in order:

    1. Look up the cluster (``cpccluster``), its computing centre
       (``cpclist``) and its controller node (``cpcnode``).
    2. Resolve the current user / organisation, falling back to the
       ``userid`` / ``orgid`` request parameters.
    3. Build the request payload, pick an unused external port and add the
       requested disk / CPU / memory / GPU amounts to the ``cpcwidget``
       ``consumed`` counters.
    4. POST the payload to the centre's ``yaml_apply`` endpoint and, on
       success, insert the payload into ``yaml_config``.

    Args:
        params_kw: mapping of web parameters. Keys read here: ``clusterid``,
            ``source_podengine``, ``instance_type``, ``userid``, ``orgid``,
            ``disk_sys_limit``, ``source_storagelimits``, ``source_cpurate``,
            ``cpu_model``, ``source_memrate``, ``gpu_model``, ``source_gpu``.
            All other keys are forwarded to the remote API unchanged.

    Returns:
        ``{'status': True, 'msg': ..., 'data': params}`` on success,
        otherwise ``{'status': False, 'msg': ...}``.

    Raises:
        Exception: when the cluster, computing centre or node record is
            missing.
        RuntimeError: when the remote API call fails.
        NOTE(review): the trailing ``return`` after the ``async with`` block
        is only reachable if ``sqlorContext`` suppresses exceptions
        (presumably after rolling back) - confirm. In that case the failure
        dict is returned instead of the exception propagating.
    """
    # Local import keeps this block self-contained; needed for the
    # non-blocking sleep below.
    import asyncio

    if params_kw is None:
        params_kw = {}
    debug(f'Web参数:{params_kw=}')

    clusterid = params_kw.get('clusterid')
    if not clusterid:
        return {'status': False, 'msg': '缺少参数clusterid'}
    podengine = params_kw.get('source_podengine')
    if not podengine:
        # Previously this crashed later with AttributeError on None.lower().
        return {'status': False, 'msg': '缺少参数source_podengine'}

    # NOTE: the database name is hard-coded; get_module_dbname('cpcc') was
    # the earlier (commented-out) lookup.
    dbname = 'kboss'
    db = DBPools()
    # Message handed back by the final return when an exception raised
    # inside the DB context does not propagate.
    failure_msg = '创建失败'

    async with db.sqlorContext(dbname) as sor:
        # clusterid -> cluster record
        cpclusters = await sor.R('cpccluster', {'id': clusterid})
        if len(cpclusters) < 1:
            e = Exception(f'cpccluster {clusterid=} not exists')
            exception(f'{e}')
            raise e
        cpcluster = cpclusters[0]
        kubeconfig = cpcluster.get("kubeconfig")
        # The controller id is the id of the control node.
        nodeid = cpcluster.get("controllerid")
        cpcid = cpcluster.get("cpcid")

        cpcs = await sor.R('cpclist', {'id': cpcid})
        if len(cpcs) < 1:
            e = Exception(f'cpclist {cpcid=} not exists')
            exception(f'{e}')
            raise e
        cpc = cpcs[0]

        nodes = await sor.R('cpcnode', {'id': nodeid})
        if len(nodes) < 1:
            e = Exception(f'cpcnode {nodeid=} 节点基础信息不匹配')
            exception(f'{e}')
            raise e
        node = nodes[0]

        url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/yaml_apply"
        debug(f"请求url: {url=}")
        # The admin password is deliberately not logged.
        debug(f"目标IP认证信息: {node.ip=} {node.sshport=} {node.adminuser=}")

        # Prefer the session user / organisation, fall back to the request.
        userid = (await get_user()) or params_kw.get("userid")
        orgid = (await get_userorgid()) or params_kw.get("orgid")
        debug(f' >>> 当前用户组织ID: {orgid} 用户ID: {userid}')
        if not userid:
            return {'status': False, 'msg': '无用户ID'}
        if not orgid:
            return {'status': False, 'msg': '无组织ID'}

        headers = basic_auth_headers(cpc.api_user, cpc.api_pwd)

        # Namespace = cluster id + organisation id, lower-cased. A trailing
        # '-' is replaced with 'kube'.
        ns_name = (clusterid.replace("_", "-") + "-" + orgid).lower()
        if ns_name.endswith('-'):
            ns_name = ns_name[:-1] + "kube"

        yamlconfig_id = uuid().replace("_", "-").lower()

        # 'action' is 'apply' for both create and update, 'delete' to remove.
        params = {
            'cluster_type': "k8s",
            'host': node.ip,
            'port': node.sshport,
            'user': node.adminuser,
            'password': node.adminpwd,
            'kubeconfig': kubeconfig,
            'action': 'apply',
            'namespace_name': ns_name,
            'serviceaccount_name': ns_name + "-serviceaccount",
            'podcd_name': yamlconfig_id + "-" + podengine.lower(),
            'service_name': yamlconfig_id + "-service",
            # Only RelationalDB and LinuxOS are currently supported.
            'instance_type': params_kw.get("instance_type"),
        }
        # NOTE(review): request parameters override the values computed
        # above (including host / password / namespace_name) - confirm
        # this precedence is intended.
        params.update(params_kw)
        for key in ('_webbricks_', 'width', 'height', '_is_mobile'):
            params.pop(key, None)

        # Pick an external port that no yaml_config row of this cluster uses.
        search_port_sql = (
            "select source_outsideport from yaml_config "
            "where cpcid = ${cpcid}$ and clusterid = ${clusterid}$ "
            "group by source_outsideport"
        )
        port_ns = {'cpcid': cpcid, 'clusterid': clusterid}
        debug(f'查询已用端口sql: {search_port_sql} {port_ns=}')
        results = await sor.sqlExe(search_port_sql, port_ns)
        used_ports = [x.get("source_outsideport") for x in results]
        num = _pick_free_nodeport(used_ports)
        if num is None:
            # The original loop would spin forever in this situation.
            return {'status': False, 'msg': '无可用的外部映射端口'}
        debug(f'已用端口列表: {used_ports} 可用端口是======= {num}')
        params['source_outsideport'] = num

        # (log label, widget type, widget model, raw requested quantity)
        requested = (
            ('系统盘', 'disk', 'SYS', params_kw.get('disk_sys_limit')),
            ('数据盘', 'disk', 'DATA', params_kw.get('source_storagelimits')),
            ('CPU', 'cpu', params_kw.get('cpu_model'), params_kw.get('source_cpurate')),
            ('内存', 'memory', 'STANDARD', params_kw.get('source_memrate')),
            ('GPU', 'gpu', params_kw.get('gpu_model'), params_kw.get('source_gpu')),
        )
        # Convert every quantity before touching the database so a malformed
        # value fails before any counter is modified. 'or 0' keeps a None
        # result out of the SQL arithmetic.
        consumption = [
            (label, widget_type, model, convert_unit(raw) or 0)
            for label, widget_type, model, raw in requested
        ]
        for label, widget_type, model, amount in consumption:
            debug(f"============================== 正在消耗{label}库存 ==============================")
            await _consume_cpcwidget(sor, cpcid, clusterid, widget_type, model, amount)

        await_resp = await async_post(
            url=url,
            headers=headers,
            data=params,
            timeout=500,
            verify=False
        )
        status_code = await_resp.get('status_code', 505)
        if status_code != 200:
            # Raise (as the original bare 'raise' effectively did) so the DB
            # context sees the failure, but keep the message.
            failure_msg = f'算力中心服务操作失败{status_code}'
            e = RuntimeError(failure_msg)
            exception(f'{e}')
            raise e

        resp = await_resp.get("content")
        debug(f'{type(resp)=}->{resp=}')

        # Connection credentials must not be stored with the yaml config.
        for key in ('cluster_type', 'host', 'port', 'user', 'password', 'kubeconfig'):
            params.pop(key, None)
        params['id'] = yamlconfig_id
        params['cpcid'] = cpcid
        params['userid'] = userid

        result = json.loads(resp)
        if result.get("status") == True:  # noqa: E712 - keep original equality semantics
            params["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            debug(f"更新资源yaml配置元数据: {params=}")
            await sor.C('yaml_config', params)
            debug('⌛️ 资源实例创建成功,资源准备中...')
            # Non-blocking wait; time.sleep() would stall the event loop.
            await asyncio.sleep(10)
            return {'status': True, 'msg': '新增资源实例参数成功,请10秒后查看实时资源实例面板', 'data': params}

        failure_msg = f'新增资源实例参数失败:{result.get("info")}'
        e = RuntimeError(failure_msg)
        exception(f'{e}')
        raise e

    return {'status': False, 'msg': failure_msg}
|
||
|
||
ret = await new_cpcpodyaml(params_kw)
|
||
return ret
|