async def new_cpcworker(params_kw={}): # params_kw: # clusterid # worker_nodeid debug(f'{params_kw=}') dbname = 'kboss' db = DBPools() clusterid = params_kw.clusterid nodeid = params_kw.worker_nodeid # debug(f"=====> {clusterid=} {worker_nodeid=}") async with db.sqlorContext(dbname) as sor: # clusterid -> cpcid cpclusters = await sor.R('cpccluster', {'id': clusterid}) if len(cpclusters) < 1: e = Exception(f'cpclist {clusterid=} not exists') exception(f'{e}') raise e cpcluster = cpclusters[0] join_command = cpcluster.get("clusterjoin") cpcid = cpcluster.get("cpcid") cpcs = await sor.R('cpclist', {'id': cpcid}) if len(cpcs) < 1: e = Exception(f'cpclist {cpcid=} not exists') exception(f'{e}') raise e cpc = cpcs[0] debug(f'集群注册命令:{join_command=}') nodes = await sor.R('cpcnode', {'id': nodeid}) # 这里有问题:没有匹配到节点信息!!! if len(nodes) < 1: e = Exception(f'cpcnode {nodeid=} 节点基础信息不匹配') exception(f'{e}') raise e node = nodes[0] url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/new_worker" debug(f"请求url: {url=}") debug(f"目标IP认证信息: {node.ip=} {node.sshport=} {node.adminuser=} {node.adminpwd=}") # 请求方式待定,取决于获取参数值方式 headers = basic_auth_headers(cpc.api_user, cpc.api_pwd) hc = HttpClient() import requests params = { 'cluster_type': cluster_type, 'host': node.ip, 'port': node.sshport, 'user': node.adminuser, 'password': node.adminpwd, 'role': "worker", 'join_command': join_command } debug(f'{params=}') debug(f"###集群新增系统盘部件库存(目前采用工作节点上提供固定大小系统盘存储,其实是系统盘和数据盘中稍大的那个)") # ===================================== 系统盘库存 ===================================== disk_sys_stock_tuple = (uuid(), cpcid, 'disk', 'SYS', 'Gi', clusterid, node.datadisk_stock) debug(f'{disk_sys_stock_tuple=}') # 修改后的SQL语句,使用ON DUPLICATE KEY UPDATE来处理冲突 cluster_disk_sys_stock_sql = f'''INSERT INTO cpcwidget (id, cpcid, type, model, resource_unit, clusterid, stock) VALUES {disk_sys_stock_tuple} ON DUPLICATE KEY UPDATE stock = VALUES(stock)''' debug(f'{cluster_disk_sys_stock_sql=}') await sor.sqlExe(cluster_disk_sys_stock_sql, {}) # ===================================== 内存库存 ===================================== memory_stock_tuple = (uuid(), cpcid, 'memory', node.memory_model, 'Gi', clusterid, node.memory_stock) debug(f'{memory_stock_tuple=}') cluster_memory_stock_sql = f'''insert into cpcwidget (id, cpcid, type, model, resource_unit, clusterid, stock) values {memory_stock_tuple} ON DUPLICATE KEY UPDATE stock = VALUES(stock)''' debug(f'{cluster_memory_stock_sql=}') await sor.sqlExe(cluster_memory_stock_sql, {}) # ===================================== 显卡库存 ===================================== gpu_stock_tuple = (uuid(), cpcid, 'gpu', node.gpu_model, '卡', clusterid, node.gpu_stock) debug(f'{gpu_stock_tuple=}') cluster_gpu_stock_sql = f'''insert into cpcwidget (id, cpcid, type, model, resource_unit, clusterid, stock) values {gpu_stock_tuple} ON DUPLICATE KEY UPDATE stock = VALUES(stock)''' debug(f'{cluster_gpu_stock_sql=}') await sor.sqlExe(cluster_gpu_stock_sql, {}) # ===================================== CPU库存 ===================================== cpu_stock_tuple = (uuid(), cpcid, 'cpu', node.cpu_model, '核', clusterid, node.cpu_stock) debug(f'{cpu_stock_tuple=}') cluster_cpu_stock_sql = f'''insert into cpcwidget (id, cpcid, type, model, resource_unit, clusterid, stock) values {cpu_stock_tuple} ON DUPLICATE KEY UPDATE stock = VALUES(stock)''' debug(f'{cluster_cpu_stock_sql=}') await sor.sqlExe(cluster_cpu_stock_sql, {}) await_resp = await async_post( url=url, headers=headers, data=params, timeout=500, verify=False ) status_code = await_resp.get('status_code',505) if status_code != 200: raise return {'status': False, 'msg': f'算力中心服务操作失败{status_code}'} resp = await_resp.get("content") debug(f"pcapi返回值: {json.loads(resp)=}") data = json.loads(resp).get('data') if json.loads(resp).get("status") == True: node_ns = {'id': nodeid, 'node_status': '1', 'clusterid': clusterid, 'cpcid': cpcid, 'role': 'worker'} debug(f"更新新增工作节点元数据 {node_ns=}") await sor.U('cpcnode', node_ns) return {'status': True, 'msg': f'新增工作节点成功!', 'data': node_ns} else: raise return {'status': False, 'msg': '算力中心服务操作失败'} return {'status': False, 'msg': '新增失败'} ret = await new_cpcworker(params_kw) return ret