kboss/b/k8cloud/user_create_pod.dspy
2025-07-16 14:27:17 +08:00

60 lines
2.3 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

async def user_create_pod(ns={}):
# ns = {
# 'storage': 10,
# 'cpu': 10,
# 'memory': 10,
# 'gpu': 10,
# 'gpumem': 10,
# 'image': 'ubuntu',
# 'orgid': 'gKG8UaYPjA_29K7puzaTM',
# }
# 客户方需要提供用户密码用basic_auth_headers函数生成验证用户所需
# 的http请求的headers并在请求是放在headers参数中
headers = basic_auth_headers('kyycloud','Kyy@123456')
hc = HttpClient()
# https://pcapi.opencomputing.cn/api/v1/create_pod.dspy
resp = await hc.request('https://pcapi.opencomputing.cn/api/v1/create_pod.dspy',
method='GET',
params=ns,
headers=headers)
resp = json.loads(resp) if isinstance(resp, str) else resp
if resp['status']:
db = DBPools()
async with db.sqlorContext('kboss') as sor:
k8s_dic = {
"id": uuid(),
"orgid": resp.get("orgid"), # TODO
"podname": resp.get("podname"),
"namespace": resp.get("namespace"),
"pvcname": resp.get("pvcname"),
"containername": resp.get("containername"),
"image": resp.get("image"),
"volumename": resp.get("volumename"),
"storage": resp.get("storage"),
"cpu": resp.get("cpu"),
"memory": resp.get("memory"),
"gpu": resp.get("gpu"),
"gpumem": resp.get("gpumem"),
"status": resp.get("status"),
"productid": resp.get("productid"),
"productname": resp.get("productname"),
"customer_goods_id": resp.get("customer_goods_id"),
"ip_region": resp.get("ip_region"),
"port": resp.get("port"),
"loginname": resp.get("loginname"),
"loginpwd": resp.get("loginpwd"),
}
await sor.C('kyy8s_ins', k8s_dic)
return {
'status': True,
'msg': '创建实例成功, %s' % str(resp)
}
else:
return {
'status': False,
'msg': '创建实例失败, %s' % str(resp)
}
ret = await user_create_pod(params_kw)
return ret