async def check_resources(cpu, memory, storage): """Check if the resources are available in the Kubernetes cluster.""" api_instance = client.CoreV1Api() nodes = api_instance.list_node() # Get all nodes in the cluster for node in nodes.items: # Get node capacity node_cpu = int(node.status.capacity["cpu"]) node_memory = int(node.status.capacity["memory"].replace('Ki', '')) / (1024 ** 2) # Convert Ki to Mi node_storage = int(node.status.capacity["ephemeral-storage"].replace('Ki', '')) / ( 1024 ** 2) # Convert Ki to Mi # Compare with requested resources requested_cpu = int(cpu) requested_memory = int(memory[:-2]) # Remove 'Gi' and convert to int requested_storage = int(storage[:-2]) # Remove 'Gi' and convert to int if node_cpu >= requested_cpu and node_memory >= requested_memory and node_storage >= requested_storage: return True, node # Enough resources on this node, return the node return False, None # Resources not enough on any node async def check_nodeport_available(node_port): """Check if the specified NodePort is available by attempting to create a temporary service.""" try: api_instance = client.CoreV1Api() # Create a temporary service with the specified nodePort service = client.V1Service( metadata=client.V1ObjectMeta(name=f"temp-service-{node_port}"), spec=client.V1ServiceSpec( type="NodePort", ports=[client.V1ServicePort( protocol="TCP", port=80, target_port=80, node_port=node_port )] ) ) # Try to create the service with the specified nodePort api_instance.create_namespaced_service(namespace="default", body=service) # If successful, delete the service and return True (port available) api_instance.delete_namespaced_service(name=f"temp-service-{node_port}", namespace="default") return True except client.exceptions.ApiException as e: # If the error code is 409, it means the port is already in use if e.status == 409: return False raise e # Rethrow other exceptions async def get_pod_events(pod_name, namespace="default"): """Fetch the events related to the Pod creation.""" api_instance = client.CoreV1Api() events = api_instance.list_namespaced_event(namespace=namespace, field_selector=f"involvedObject.name={pod_name}") event_logs = [] for event in events.items: # Collect event type and message event_logs.append({ "type": event.type, "reason": event.reason, "message": event.message, "timestamp": event.last_timestamp }) return event_logs async def create_pod(ns={}): import time import random import string import hashlib all_letters = "abcdefghijklmnopqrtuvwxyz" ns['pvcname'] = 'container-' + random.choice(all_letters) + hashlib.md5(str(time.time()).encode()).hexdigest()[:10] ns['podname'] = ns['pvcname'] ns['containername'] = ns['pvcname'] ns['volumename'] = ns['pvcname'] ns['namespace'] = ns['namespace'] if ns.get('namespace') else 'default' namespace = ns['namespace'] # 使用的命名空间 # Load kubeconfig # Check if there are enough resources available resources_available, node = await check_resources(ns['cpu'], str(ns['memory']) + 'Gi', str(ns['storage']) + 'Gi') if not resources_available: print("资源不足,无法创建实例") return { 'status': False, 'msg': "资源不足,无法创建实例" } # Define pod name and generate random password # pod_name = f"ssh-pod-{generate_random_string()}" characters = string.ascii_letters + string.digits + "##&&**@^" root_password = ''.join(random.choices(characters, k=8)) # Find an available NodePort node_port = 0 for i in range(300): node_port = random.randint(30000, 32767) port_res = await check_nodeport_available(node_port) if port_res: break if not node_port: return { 'status': False, 'msg': 'can not find available port' } # Create a container with SSH service if ns.get('gpu') and ns.get('gpumem'): requests_content = { "cpu": str(ns['cpu']), "memory": str(ns['memory']) + "Gi", } limits = { "cpu": str(ns['cpu']), "memory": str(ns['memory']) + "Gi", "nvidia.com/gpu": ns['gpu'], "nvidia.com/gpumem": ns['gpumem'] } else: requests_content = { "cpu": str(ns['cpu']), "memory": str(ns['memory']) + "Gi", } limits = { "cpu": str(ns['cpu']), # 限制最多使用 4 个 CPU "memory": str(ns['memory']) + "Gi" # 限制最多使用 8 GB 内存 } container = client.V1Container( name=ns['containername'], image=ns['image'], command=["/bin/bash", "-c", f"apt-get update && apt-get install -y vim && apt-get install -y openssh-server sudo && " f"echo 'root:{root_password}' | chpasswd && " "sed -i 's/^#*PermitRootLogin.*/PermitRootLogin yes/' /etc/ssh/sshd_config && " #"echo 'root ALL=(ALL) NOPASSWD:ALL' > /etc/sudoers.d/root && " "service ssh start && tail -f /dev/null"], ports=[client.V1ContainerPort(container_port=22)], resources=client.V1ResourceRequirements( requests=requests_content, limits=limits ), security_context=client.V1SecurityContext(privileged=False) ) # 创建 PVC #create_persistent_volume_claim(api_instance, namespace) pvc = client.V1PersistentVolumeClaim( metadata=client.V1ObjectMeta(name=ns['pvcname']), spec=client.V1PersistentVolumeClaimSpec( access_modes=["ReadWriteOnce"], resources=client.V1ResourceRequirements( requests={"storage": str(ns['storage']) + "Gi"} ) ) ) # Create the pod specification pod_spec = client.V1PodSpec( containers=[container], volumes=[ client.V1Volume( name=ns['volumename'], persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( claim_name=ns['pvcname'] ), ) ], ) # Create the pod pod = client.V1Pod( metadata=client.V1ObjectMeta(name=ns['podname'], labels={'app': ns['podname']}), spec=pod_spec ) # Create Service (NodePort) to expose pod service = client.V1Service( metadata=client.V1ObjectMeta(name=ns['podname'], labels={'app': ns['podname']}), spec=client.V1ServiceSpec( type="NodePort", selector={"app": ns['podname']}, ports=[client.V1ServicePort( protocol="TCP", port=22, target_port=22, node_port=node_port )] ) ) try: api_instance = client.CoreV1Api() # Attempt to create Pvc try: api_instance.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc) except client.exceptions.ApiException as e: print(f"Error creating Pvc: {e}") return { 'status': False, 'msg': '错误代码004, 无法创建实例' } # Attempt to create Pod try: api_instance.create_namespaced_pod(namespace=namespace, body=pod) except client.exceptions.ApiException as e: print(f"Error creating Pod: {e}") return { 'status': False, 'msg': '错误代码005, 无法创建实例' } # Attempt to create Service try: api_instance.create_namespaced_service(namespace=namespace, body=service) except client.exceptions.ApiException as e: print(f"Error creating Service: {e}") return { 'status': False, 'msg': '错误代码006, 无法创建实例' } # Wait for the Pod to be in Running state timeout = 10 # 10 minutes timeout for waiting elapsed_time = 0 pod_status = None while elapsed_time < timeout: try: pod_status = api_instance.read_namespaced_pod_status(ns['podname'], namespace) if pod_status.status.phase == "Running": break except client.exceptions.ApiException as e: print(f"Error fetching Pod status: {e}") return { 'status': False, 'msg': '错误代码007, 无法创建实例' } time.sleep(5) elapsed_time += 5 # Fetch Pod events (logs) events = await get_pod_events(ns['podname']) if events: print("Pod Events (Logs):") for event in events: msg = f"Type: {event['type']}, Reason: {event['reason']}, Message: {event['message']}, Timestamp: {event['timestamp']}" if 'FailedScheduling' in event['reason']: return { 'status': False, 'msg': msg } # Return the results if the pod is running ns['ip_region'] = 'pd4e.com' ns['port'] = service.spec.ports[0].node_port ns['servicename'] = service.metadata.name ns['loginname'] = 'root' ns['loginpwd'] = root_password ns['status'] = True ns['msg'] = '创建实例成功' return ns except Exception as e: print(f"Error creating resources: {e}") return { 'status': False, 'msg': '错误代码008, 无法创建实例' } ret = await create_pod(params_kw) return ret