async def user_create_pod(ns={}): # ns = { # 'storage': 10, # 'cpu': 10, # 'memory': 10, # 'gpu': 10, # 'gpumem': 10, # 'image': 'ubuntu', # 'orgid': 'gKG8UaYPjA_29K7puzaTM', # } # 客户方需要提供用户密码,用basic_auth_headers函数生成验证用户所需 # 的http请求的headers,并在请求是放在headers参数中 headers = basic_auth_headers('kyycloud','Kyy@123456') hc = HttpClient() # https://pcapi.opencomputing.cn/api/v1/create_pod.dspy resp = await hc.request('https://pcapi.opencomputing.cn/api/v1/create_pod.dspy', method='GET', params=ns, headers=headers) resp = json.loads(resp) if isinstance(resp, str) else resp if resp['status']: db = DBPools() async with db.sqlorContext('kboss') as sor: k8s_dic = { "id": uuid(), "orgid": resp.get("orgid"), # TODO "podname": resp.get("podname"), "namespace": resp.get("namespace"), "pvcname": resp.get("pvcname"), "containername": resp.get("containername"), "image": resp.get("image"), "volumename": resp.get("volumename"), "storage": resp.get("storage"), "cpu": resp.get("cpu"), "memory": resp.get("memory"), "gpu": resp.get("gpu"), "gpumem": resp.get("gpumem"), "status": resp.get("status"), "productid": resp.get("productid"), "productname": resp.get("productname"), "customer_goods_id": resp.get("customer_goods_id"), "ip_region": resp.get("ip_region"), "port": resp.get("port"), "loginname": resp.get("loginname"), "loginpwd": resp.get("loginpwd"), } await sor.C('kyy8s_ins', k8s_dic) return { 'status': True, 'msg': '创建实例成功, %s' % str(resp) } else: return { 'status': False, 'msg': '创建实例失败, %s' % str(resp) } ret = await user_create_pod(params_kw) return ret