# 单位转换函数 def convert_unit(value): if isinstance(value, int): return value if isinstance(value, str): if value.endswith('m'): # CPU millicores return float(value[:-1]) / 1000 elif value.endswith('Gi'): # Memory Gi return float(value[:-2]) elif value.endswith('Mi'): # Memory Mi -> Gi return float(value[:-2]) / 1024 return float(value) async def new_cpcpodyaml(params_kw={}): # params_kw: # clusterid debug(f'Web参数:{params_kw=}') # dbname = get_module_dbname('cpcc') dbname = 'kboss' db = DBPools() clusterid = params_kw.clusterid async with db.sqlorContext(dbname) as sor: # clusterid -> nodeid cpclusters = await sor.R('cpccluster', {'id': clusterid}) if len(cpclusters) < 1: e = Exception(f'cpclist {clusterid=} not exists') exception(f'{e}') raise e cpcluster = cpclusters[0] # 此处的nodeid即控制节点id kubeconfig = cpcluster.get("kubeconfig") nodeid = cpcluster.get("controllerid") cpcid = cpcluster.get("cpcid") cpcs = await sor.R('cpclist', {'id': cpcid}) if len(cpcs) < 1: e = Exception(f'cpclist {cpcid=} not exists') exception(f'{e}') raise e cpc = cpcs[0] nodes = await sor.R('cpcnode', {'id': nodeid}) # 这里有问题:没有匹配到节点信息!!! if len(nodes) < 1: e = Exception(f'cpcnode {nodeid=} 节点基础信息不匹配') exception(f'{e}') raise e node = nodes[0] url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/yaml_apply" debug(f"请求url: {url=}") debug(f"目标IP认证信息: {node.ip=} {node.sshport=} {node.adminuser=} {node.adminpwd=}") # 请求方式待定,取决于获取参数值方式 userid = await get_user() if not userid: userid = params_kw.get("userid") orgid = await get_userorgid() if not orgid: orgid = params_kw.get("orgid") if not userid: userid = params_kw.get("userid") debug(f' >>> 当前用户组织ID: {orgid} 用户ID: {userid}') if not userid: return {'status': False, 'msg': '无组织ID'} if not orgid: return {'status': False, 'msg': '无组织ID'} headers = basic_auth_headers(cpc.api_user, cpc.api_pwd) ns_name = clusterid.replace("_", "-") + "-" + orgid # 集群信息+组织信息合成命名空间 ns_name = ns_name.lower() ns_name = ns_name[:-1] + "kube" if ns_name.endswith('-') else ns_name hc = HttpClient() # type参数:更新和新增都是为apply,删除为delete yamlconfig_id = uuid().replace("_", "-").lower() import requests params = { 'cluster_type': "k8s", 'host': node.ip, 'port': node.sshport, 'user': node.adminuser, 'password': node.adminpwd, 'kubeconfig': kubeconfig, 'action': 'apply', 'namespace_name': ns_name, 'serviceaccount_name': ns_name + "-serviceaccount", 'podcd_name': yamlconfig_id + "-" + params_kw.get("source_podengine").lower(), 'service_name': yamlconfig_id + "-service", 'instance_type': params_kw.get("instance_type") # 目前仅支持RelationalDB和LinuxOS } params.update(params_kw) keys_to_remove = ['_webbricks_', 'width', 'height', '_is_mobile'] for key in keys_to_remove: if key in params: del params[key] # 生成可用的外部映射端口号 search_port_sql = f'''select source_outsideport from yaml_config where cpcid='{cpcid}' and clusterid='{clusterid}' group by source_outsideport''' debug(f'查询已用端口sql: {search_port_sql}') results = await sor.sqlExe(search_port_sql, {}) used_ports = [x.get("source_outsideport") for x in results] min_num, max_num = 30000, 32766 num = int() while True: num = random.randint(min_num, max_num) if num not in used_ports: break debug(f'已用端口列表: {used_ports} 可用端口是======= {num}') params['source_outsideport'] = num #return {'status': False, 'msg': f'此次分配端口:{num}'} debug(f"============================== 正在消耗系统盘库存 ==============================") reduce_disksys_consumed = convert_unit(params_kw.disk_sys_limit) reduce_sql_1 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'disk' and clusterid = '{clusterid}' and model = 'SYS' then greatest(consumed + {reduce_disksys_consumed}, 0) else consumed end''' debug(f'库存减少sql-1: {reduce_sql_1}') await sor.sqlExe(reduce_sql_1, {}) debug(f"============================== 正在消耗数据盘库存 ==============================") reduce_diskdata_consumed = convert_unit(params_kw.source_storagelimits) reduce_sql_2 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'disk' and clusterid = '{clusterid}' and model = 'DATA' then greatest(consumed + {reduce_diskdata_consumed}, 0) else consumed end''' debug(f'库存减少sql-2: {reduce_sql_2}') await sor.sqlExe(reduce_sql_2, {}) debug(f"============================== 正在消耗CPU库存 ==============================") reduce_cpu_consumed = convert_unit(params_kw.source_cpurate) reduce_sql_3 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'cpu' and clusterid = '{clusterid}' and model = '{params_kw.cpu_model}' then greatest(consumed + {reduce_cpu_consumed}, 0) else consumed end''' debug(f'库存减少sql-3: {reduce_sql_3}') await sor.sqlExe(reduce_sql_3, {}) debug(f"============================== 正在消耗内存库存 ==============================") reduce_memory_consumed = convert_unit(params_kw.source_memrate) reduce_sql_4 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'memory' and clusterid = '{clusterid}' and model = 'STANDARD' then greatest(consumed + {reduce_memory_consumed}, 0) else consumed end''' debug(f'库存减少sql-4: {reduce_sql_4}') await sor.sqlExe(reduce_sql_4, {}) debug(f"============================== 正在消耗GPU库存 ==============================") reduce_gpu_consumed = convert_unit(params_kw.source_gpu) reduce_sql_5 = f'''update cpcwidget set consumed = case when cpcid = '{cpcid}' and type = 'gpu' and clusterid = '{clusterid}' and model = '{params_kw.gpu_model}' then greatest(consumed + {reduce_gpu_consumed}, 0) else consumed end''' debug(f'库存减少sql-5: {reduce_sql_5}') await sor.sqlExe(reduce_sql_5, {}) await_resp = await async_post( url=url, headers=headers, data=params, timeout=500, verify=False ) status_code = await_resp.get('status_code', 505) if status_code != 200: raise return {'status': False, 'msg': f'算力中心服务操作失败{status_code}'} resp = await_resp.get("content") debug(f'{type(resp)=}->{resp=}') keys_to_remove = ['cluster_type', 'host', 'port', 'user', 'password', 'kubeconfig'] for key in keys_to_remove: if key in params: del params[key] params['id'] = yamlconfig_id params['cpcid'] = cpcid params['userid'] = userid if json.loads(resp).get("status") == True: params["update_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) debug(f"更新资源yaml配置元数据: {params=}") await sor.C('yaml_config', params) debug('⌛️ 资源实例创建成功,资源准备中...') time.sleep(10) return {'status': True, 'msg': '新增资源实例参数成功,请10秒后查看实时资源实例面板', 'data': params} else: raise res = json.loads(resp).get("info") return {'status': False, 'msg': f'新增资源实例参数失败:{res}'} return {'status': False, 'msg': '创建失败'} ret = await new_cpcpodyaml(params_kw) return ret