kboss/b/k8server/create_pod.dspy
2025-07-16 14:27:17 +08:00

282 lines
10 KiB
Plaintext

def _parse_k8s_quantity(value):
    """Convert a Kubernetes resource quantity into a float in base units.

    Accepts plain numbers (``4``, ``"4"``), binary suffixes (``"16Gi"``,
    ``"32780124Ki"``) and decimal suffixes (``"500m"``, ``"2G"``).  The result
    is expressed in cores for CPU quantities and in bytes for memory/storage
    quantities.

    Raises:
        ValueError: if the numeric part of ``value`` cannot be parsed.
    """
    multipliers = {
        "Ki": 1024, "Mi": 1024 ** 2, "Gi": 1024 ** 3,
        "Ti": 1024 ** 4, "Pi": 1024 ** 5, "Ei": 1024 ** 6,
        "n": 1e-9, "u": 1e-6, "m": 1e-3,
        "k": 1e3, "M": 1e6, "G": 1e9, "T": 1e12, "P": 1e15, "E": 1e18,
    }
    text = str(value).strip()
    # Binary suffixes all end in "i" and decimal ones never do, so at most one
    # entry can match and the iteration order is irrelevant.
    for suffix, factor in multipliers.items():
        if text.endswith(suffix):
            return float(text[:-len(suffix)]) * factor
    return float(text)


async def check_resources(cpu, memory, storage):
    """Find a node whose capacity covers the requested CPU, memory and storage.

    Args:
        cpu: requested CPU as a number or Kubernetes quantity (``4``, ``"500m"``).
        memory: requested memory as a Kubernetes quantity, e.g. ``"8Gi"``.
        storage: requested ephemeral storage as a Kubernetes quantity.

    Returns:
        ``(True, node)`` for the first node in the listing that is large
        enough, otherwise ``(False, None)``.

    Raises:
        ValueError: if a requested or reported quantity cannot be parsed.

    NOTE(review): the comparison uses ``node.status.capacity`` (the size of the
    node), not what is currently unallocated, so a positive answer does not
    guarantee that the pod can actually be scheduled.
    """
    # The request does not change from node to node, so parse it only once.
    wanted_cpu = _parse_k8s_quantity(cpu)
    wanted_memory = _parse_k8s_quantity(memory)
    wanted_storage = _parse_k8s_quantity(storage)

    api_instance = client.CoreV1Api()
    nodes = api_instance.list_node()  # every node in the cluster
    for node in nodes.items:
        capacity = node.status.capacity or {}
        # A resource the node does not report is treated as zero capacity so
        # that the node is skipped instead of aborting the whole check.
        node_cpu = _parse_k8s_quantity(capacity.get("cpu", 0))
        node_memory = _parse_k8s_quantity(capacity.get("memory", 0))
        node_storage = _parse_k8s_quantity(capacity.get("ephemeral-storage", 0))
        if (node_cpu >= wanted_cpu
                and node_memory >= wanted_memory
                and node_storage >= wanted_storage):
            return True, node
    return False, None
async def check_nodeport_available(node_port):
    """Probe whether ``node_port`` can be allocated to a NodePort Service.

    A throw-away Service named ``temp-service-<node_port>`` is created in the
    ``default`` namespace with the requested nodePort and deleted again when
    the creation succeeds.

    Args:
        node_port: the port number to probe.

    Returns:
        ``True`` if the probe Service could be created (the port is free),
        ``False`` if the API server refused it because of a conflict.

    Raises:
        client.exceptions.ApiException: for any other API failure, including a
            failure to delete the probe Service.
    """
    api_instance = client.CoreV1Api()
    probe_name = f"temp-service-{node_port}"
    probe = client.V1Service(
        metadata=client.V1ObjectMeta(name=probe_name),
        spec=client.V1ServiceSpec(
            type="NodePort",
            ports=[client.V1ServicePort(
                protocol="TCP",
                port=80,
                target_port=80,
                node_port=node_port
            )]
        )
    )
    try:
        api_instance.create_namespaced_service(namespace="default", body=probe)
    except client.exceptions.ApiException as e:
        # 409 (Conflict): a Service with the probe's name already exists.
        # 422 (Unprocessable Entity): the API server rejected the spec, which
        # is how an already-allocated or out-of-range nodePort is reported.
        if e.status in (409, 422):
            return False
        raise  # anything else is a real failure; keep the original traceback
    # The port was free: remove the probe so the real Service can claim it.
    api_instance.delete_namespaced_service(name=probe_name, namespace="default")
    return True
async def get_pod_events(pod_name, namespace="default"):
    """Return the events whose involved object is named ``pod_name``.

    Args:
        pod_name: name used in the ``involvedObject.name`` field selector.
        namespace: namespace whose events are listed.

    Returns:
        A list of dicts with the keys ``type``, ``reason``, ``message`` and
        ``timestamp`` (the event's ``last_timestamp``), in listing order.
    """
    core_api = client.CoreV1Api()
    selector = f"involvedObject.name={pod_name}"
    listing = core_api.list_namespaced_event(namespace=namespace, field_selector=selector)
    # Flatten each event object into a plain dict holding only the fields
    # the caller looks at.
    return [
        {
            "type": item.type,
            "reason": item.reason,
            "message": item.message,
            "timestamp": item.last_timestamp,
        }
        for item in listing.items
    ]
def _generate_root_password(length=8):
    """Return a random password built from letters, digits and a few symbols."""
    import secrets
    import string
    alphabet = string.ascii_letters + string.digits + "##&&**@^"
    # The password protects root SSH access, so use the CSPRNG-backed
    # `secrets` module rather than `random`.
    return ''.join(secrets.choice(alphabet) for _ in range(length))


async def _find_free_node_port(attempts=300):
    """Probe random ports in 30000-32767; return a free one or None."""
    import random
    for _ in range(attempts):
        candidate = random.randint(30000, 32767)
        if await check_nodeport_available(candidate):
            return candidate
    return None


def _build_resource_maps(ns):
    """Return the ``(requests, limits)`` dicts for the container described by ns."""
    requests_content = {
        "cpu": str(ns['cpu']),
        "memory": str(ns['memory']) + "Gi",
    }
    limits = dict(requests_content)
    # GPU limits are only added when both the GPU count and GPU memory are given.
    if ns.get('gpu') and ns.get('gpumem'):
        limits["nvidia.com/gpu"] = ns['gpu']
        limits["nvidia.com/gpumem"] = ns['gpumem']
    return requests_content, limits


async def create_pod(ns=None):
    """Provision an SSH-enabled pod with its PVC and NodePort Service.

    Args:
        ns: dict describing the instance.  Reads ``cpu``, ``memory``,
            ``storage`` and ``image`` (required) and ``namespace``, ``gpu``,
            ``gpumem`` (optional).  ``memory`` and ``storage`` are numbers to
            which ``"Gi"`` is appended.  The dict is updated in place.

    Returns:
        On success, ``ns`` extended with ``pvcname``, ``podname``,
        ``containername``, ``volumename``, ``namespace``, ``ip_region``,
        ``port``, ``servicename``, ``loginname``, ``loginpwd``,
        ``status`` (True) and ``msg``.
        On failure, a new dict ``{'status': False, 'msg': <reason>}``.

    Raises:
        KeyError: if a required key is missing from ``ns``.
        client.exceptions.ApiException: if the resource or port checks, which
            run before any resource is created, fail at the API level.

    NOTE(review): resources created before a later step fails (e.g. the PVC
    when the Pod is rejected) are left in the cluster, and the volume is
    declared on the pod but never mounted into the container.
    """
    import asyncio
    import hashlib
    import random
    import string
    import time

    # A shared `{}` default would leak one call's names and credentials into
    # the next call, because the dict is mutated below.
    if ns is None:
        ns = {}

    # One generated name is reused for the PVC, pod, container and volume.
    unique_name = ('container-' + random.choice(string.ascii_lowercase)
                   + hashlib.md5(str(time.time()).encode()).hexdigest()[:10])
    ns['pvcname'] = unique_name
    ns['podname'] = unique_name
    ns['containername'] = unique_name
    ns['volumename'] = unique_name
    ns['namespace'] = ns.get('namespace') or 'default'
    namespace = ns['namespace']

    # Refuse early when no node is large enough for the request.
    resources_available, _ = await check_resources(
        ns['cpu'], str(ns['memory']) + 'Gi', str(ns['storage']) + 'Gi')
    if not resources_available:
        print("资源不足,无法创建实例")
        return {
            'status': False,
            'msg': "资源不足,无法创建实例"
        }

    root_password = _generate_root_password()

    # `None` means every probed port was taken; the previous code kept the
    # last random candidate and therefore never reported this failure.
    node_port = await _find_free_node_port()
    if node_port is None:
        return {
            'status': False,
            'msg': 'can not find available port'
        }

    requests_content, limits = _build_resource_maps(ns)

    # The container installs and starts sshd on boot, then idles forever.
    container = client.V1Container(
        name=ns['containername'],
        image=ns['image'],
        command=["/bin/bash", "-c",
                 f"apt-get update && apt-get install -y vim && apt-get install -y openssh-server sudo && "
                 f"echo 'root:{root_password}' | chpasswd && "
                 "sed -i 's/^#*PermitRootLogin.*/PermitRootLogin yes/' /etc/ssh/sshd_config && "
                 "service ssh start && tail -f /dev/null"],
        ports=[client.V1ContainerPort(container_port=22)],
        resources=client.V1ResourceRequirements(
            requests=requests_content,
            limits=limits
        ),
        security_context=client.V1SecurityContext(privileged=False)
    )

    pvc = client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name=ns['pvcname']),
        spec=client.V1PersistentVolumeClaimSpec(
            access_modes=["ReadWriteOnce"],
            resources=client.V1ResourceRequirements(
                requests={"storage": str(ns['storage']) + "Gi"}
            )
        )
    )

    pod_spec = client.V1PodSpec(
        containers=[container],
        volumes=[
            client.V1Volume(
                name=ns['volumename'],
                persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
                    claim_name=ns['pvcname']
                ),
            )
        ],
    )

    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name=ns['podname'], labels={'app': ns['podname']}),
        spec=pod_spec
    )

    # NodePort Service exposing the pod's SSH port outside the cluster.
    service = client.V1Service(
        metadata=client.V1ObjectMeta(name=ns['podname'], labels={'app': ns['podname']}),
        spec=client.V1ServiceSpec(
            type="NodePort",
            selector={"app": ns['podname']},
            ports=[client.V1ServicePort(
                protocol="TCP",
                port=22,
                target_port=22,
                node_port=node_port
            )]
        )
    )

    try:
        api_instance = client.CoreV1Api()

        try:
            api_instance.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc)
        except client.exceptions.ApiException as e:
            print(f"Error creating Pvc: {e}")
            return {
                'status': False,
                'msg': '错误代码004, 无法创建实例'
            }

        try:
            api_instance.create_namespaced_pod(namespace=namespace, body=pod)
        except client.exceptions.ApiException as e:
            print(f"Error creating Pod: {e}")
            return {
                'status': False,
                'msg': '错误代码005, 无法创建实例'
            }

        try:
            api_instance.create_namespaced_service(namespace=namespace, body=service)
        except client.exceptions.ApiException as e:
            print(f"Error creating Service: {e}")
            return {
                'status': False,
                'msg': '错误代码006, 无法创建实例'
            }

        # Poll until the pod reports Running or the time budget is used up.
        # NOTE(review): the budget is 10 *seconds* (two polls); an earlier
        # comment said 10 minutes. Running out of time is not treated as a
        # failure: the function still reports success below.
        timeout = 10
        poll_interval = 5
        elapsed_time = 0
        while elapsed_time < timeout:
            try:
                pod_status = api_instance.read_namespaced_pod_status(ns['podname'], namespace)
            except client.exceptions.ApiException as e:
                print(f"Error fetching Pod status: {e}")
                return {
                    'status': False,
                    'msg': '错误代码007, 无法创建实例'
                }
            if pod_status.status.phase == "Running":
                break
            # Yield to the event loop instead of blocking it with time.sleep().
            await asyncio.sleep(poll_interval)
            elapsed_time += poll_interval

        # Look the events up in the pod's own namespace, not always "default".
        events = await get_pod_events(ns['podname'], namespace)
        if events:
            print("Pod Events (Logs):")
            for event in events:
                msg = f"Type: {event['type']}, Reason: {event['reason']}, Message: {event['message']}, Timestamp: {event['timestamp']}"
                # `reason` may be unset on an event; treat that as no match.
                if 'FailedScheduling' in (event['reason'] or ''):
                    return {
                        'status': False,
                        'msg': msg
                    }

        # Publish the connection details on the caller's dict.
        ns['ip_region'] = 'pd4e.com'
        ns['port'] = service.spec.ports[0].node_port
        ns['servicename'] = service.metadata.name
        ns['loginname'] = 'root'
        ns['loginpwd'] = root_password
        ns['status'] = True
        ns['msg'] = '创建实例成功'
        return ns
    except Exception as e:
        # Last-resort boundary: report any unexpected failure as code 008.
        print(f"Error creating resources: {e}")
        return {
            'status': False,
            'msg': '错误代码008, 无法创建实例'
        }
ret = await create_pod(params_kw)
return ret