async def new_cpcworker(params_kw={}): # params_kw: # clusterid # worker_nodeid debug(f'{params_kw=}') # dbname = get_module_dbname('cpcc') dbname = 'kboss' db = DBPools() clusterid = params_kw.clusterid nodeid = params_kw.worker_nodeid #debug(f"=====> {clusterid=} {worker_nodeid=}") async with db.sqlorContext(dbname) as sor: # clusterid -> cpcid cpclusters = await sor.R('cpccluster', {'id':clusterid}) if len(cpclusters) < 1: e = Exception(f'cpclist {clusterid=} not exists') exception(f'{e}') raise e cpcluster = cpclusters[0] join_command = cpcluster.get("clusterjoin") cpcid = cpcluster.get("cpcid") cpcs = await sor.R('cpclist', {'id':cpcid}) if len(cpcs) < 1: e = Exception(f'cpclist {cpcid=} not exists') exception(f'{e}') raise e cpc = cpcs[0] debug(f'集群注册命令:{join_command=}') nodes = await sor.R('cpcnode', {'id':nodeid}) # 这里有问题:没有匹配到节点信息!!! if len(nodes) < 1: e = Exception(f'cpcnode {nodeid=} 节点基础信息不匹配') exception(f'{e}') raise e node = nodes[0] url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/new_worker" debug(f"请求url: {url=}") debug(f"目标IP认证信息: {node.ip=} {node.sshport=} {node.adminuser=} {node.adminpwd=}") # 请求方式待定,取决于获取参数值方式 headers = basic_auth_headers(cpc.api_user, cpc.api_pwd) hc = HttpClient() import requests params = { 'cluster_type': cluster_type, 'host':node.ip, 'port':node.sshport, 'user':node.adminuser, 'password':node.adminpwd, 'role':"worker", 'join_command':join_command } debug(f'{params=}') #resp = await hc.request(url, method='POST', # headers = headers, # data=params #) # 框架不支持超时时间 resp = requests.post(url, headers = headers, data=params, timeout=500, verify=False ) resp = json.dumps(resp.json()) #这里模拟hc.request返回的结果写后续逻辑 debug(f'{type(resp)=}->{resp=}') debug(f"pcapi返回值: {json.loads(resp)=}") data = json.loads(resp).get('data') if json.loads(resp).get("status") == True: node_ns = { 'id':nodeid, 'node_status':'1', 'clusterid':clusterid, 'cpcid':cpcid, 'role':'工作节点' } debug(f"更新新增工作节点元数据 {node_ns=}") await sor.U('cpcnode', node_ns) debug(f"###集群新增系统盘部件库存(目前采用工作节点上提供固定大小系统盘存储)") # 1.判断是否可加入集群,依据是一定要和已有的工作节点部件型号一致 # 2.选定工作节点disk中SYS和DATA子类型中较小的值作为集群系统盘的库存量,目前有矛盾,可能是较大的值 # 3.更新各部件库存数量 return {'status': True, 'msg': f'新增工作节点成功!', 'data': node_ns} else: return {'status': False, 'msg': '算力中心服务操作失败'} return {'status': False, 'msg': '其他错误'} ret = await new_cpcworker(params_kw) return ret