128 lines
5.4 KiB
Plaintext
128 lines
5.4 KiB
Plaintext
async def new_cpcworker(params_kw={}):
    """Register a node as a worker of an existing cluster.

    Steps, all inside one ``kboss`` database context:

    1. Load the ``cpccluster`` row for ``params_kw.clusterid``, the
       ``cpclist`` row it points to via ``cpcid``, and the ``cpcnode`` row
       for ``params_kw.worker_nodeid``.
    2. Upsert the node's disk, memory, GPU and CPU stock into ``cpcwidget``.
    3. POST the join request to ``<pcapi_url>/pcapi/api/v1/cluster/common/new_worker``.
    4. On success, update the ``cpcnode`` row (status, cluster, role).

    Args:
        params_kw: object exposing ``clusterid`` and ``worker_nodeid`` as
            attributes. The ``{}`` default is kept only for interface
            compatibility; a plain dict does not provide those attributes.

    Returns:
        ``{'status': True, 'msg': ..., 'data': node_ns}`` when the remote
        service reports success, otherwise
        ``{'status': False, 'msg': '新增失败'}``.

    Raises:
        Exception: a cluster, compute-center or node row is missing.
        RuntimeError: the remote call returned a non-200 status or a
            payload whose ``status`` is not true.

    NOTE(review): the trailing failure return is only reachable if
    ``db.sqlorContext`` suppresses exceptions raised inside it (presumably
    rolling back the stock upserts) - confirm against its implementation.
    """
    debug(f'{params_kw=}')
    dbname = 'kboss'
    db = DBPools()
    clusterid = params_kw.clusterid
    nodeid = params_kw.worker_nodeid

    async with db.sqlorContext(dbname) as sor:
        # clusterid -> cpcid
        cpclusters = await sor.R('cpccluster', {'id': clusterid})
        if len(cpclusters) < 1:
            e = Exception(f'cpclist {clusterid=} not exists')
            exception(f'{e}')
            raise e
        cpcluster = cpclusters[0]
        join_command = cpcluster.get("clusterjoin")
        cpcid = cpcluster.get("cpcid")

        cpcs = await sor.R('cpclist', {'id': cpcid})
        if len(cpcs) < 1:
            e = Exception(f'cpclist {cpcid=} not exists')
            exception(f'{e}')
            raise e
        cpc = cpcs[0]
        debug(f'集群注册命令:{join_command=}')

        nodes = await sor.R('cpcnode', {'id': nodeid})
        # Known issue noted by the original author: the node lookup can come
        # back empty here.
        if len(nodes) < 1:
            e = Exception(f'cpcnode {nodeid=} 节点基础信息不匹配')
            exception(f'{e}')
            raise e
        node = nodes[0]

        url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/new_worker"
        debug(f"请求url: {url=}")
        # The admin password is masked so the credential never reaches the log.
        debug(f"目标IP认证信息: {node.ip=} {node.sshport=} {node.adminuser=} node.adminpwd='***'")

        # Request method still to be decided; depends on how the parameter
        # values end up being obtained.
        headers = basic_auth_headers(cpc.api_user, cpc.api_pwd)

        params = {
            # NOTE(review): cluster_type is not defined anywhere in this
            # function; unless the enclosing scope provides it this raises
            # NameError - confirm where it should come from.
            'cluster_type': cluster_type,
            'host': node.ip,
            'port': node.sshport,
            'user': node.adminuser,
            'password': node.adminpwd,
            'role': "worker",
            'join_command': join_command
        }
        logged_params = {**params, 'password': '***'}
        debug(f'params={logged_params!r}')

        debug("###集群新增系统盘部件库存(目前采用工作节点上提供固定大小系统盘存储,其实是系统盘和数据盘中稍大的那个)")

        # NOTE(review): the four statements below interpolate a Python tuple
        # straight into the SQL text. A value containing a quote, or a None,
        # produces invalid SQL; switch to bound parameters once the
        # placeholder syntax of sor.sqlExe is confirmed.

        # ============================ system disk stock ============================
        # NOTE(review): the SYS disk row is filled from node.datadisk_stock -
        # confirm this is intended.
        disk_sys_stock_tuple = (uuid(), cpcid, 'disk', 'SYS', 'Gi', clusterid, node.datadisk_stock)
        debug(f'{disk_sys_stock_tuple=}')
        # ON DUPLICATE KEY UPDATE resolves conflicts with an existing row.
        cluster_disk_sys_stock_sql = f'''INSERT INTO cpcwidget (id, cpcid, type, model, resource_unit, clusterid, stock) VALUES {disk_sys_stock_tuple} ON DUPLICATE KEY UPDATE stock = VALUES(stock)'''
        debug(f'{cluster_disk_sys_stock_sql=}')
        await sor.sqlExe(cluster_disk_sys_stock_sql, {})

        # ============================== memory stock ===============================
        memory_stock_tuple = (uuid(), cpcid, 'memory', node.memory_model, 'Gi', clusterid, node.memory_stock)
        debug(f'{memory_stock_tuple=}')
        cluster_memory_stock_sql = f'''insert into cpcwidget (id, cpcid, type, model, resource_unit, clusterid, stock) values {memory_stock_tuple} ON DUPLICATE KEY UPDATE stock = VALUES(stock)'''
        debug(f'{cluster_memory_stock_sql=}')
        await sor.sqlExe(cluster_memory_stock_sql, {})

        # ================================ GPU stock ================================
        gpu_stock_tuple = (uuid(), cpcid, 'gpu', node.gpu_model, '卡', clusterid, node.gpu_stock)
        debug(f'{gpu_stock_tuple=}')
        cluster_gpu_stock_sql = f'''insert into cpcwidget (id, cpcid, type, model, resource_unit, clusterid, stock) values {gpu_stock_tuple} ON DUPLICATE KEY UPDATE stock = VALUES(stock)'''
        debug(f'{cluster_gpu_stock_sql=}')
        await sor.sqlExe(cluster_gpu_stock_sql, {})

        # ================================ CPU stock ================================
        cpu_stock_tuple = (uuid(), cpcid, 'cpu', node.cpu_model, '核', clusterid, node.cpu_stock)
        debug(f'{cpu_stock_tuple=}')
        cluster_cpu_stock_sql = f'''insert into cpcwidget (id, cpcid, type, model, resource_unit, clusterid, stock) values {cpu_stock_tuple} ON DUPLICATE KEY UPDATE stock = VALUES(stock)'''
        debug(f'{cluster_cpu_stock_sql=}')
        await sor.sqlExe(cluster_cpu_stock_sql, {})

        await_resp = await async_post(
            url=url,
            headers=headers,
            data=params,
            timeout=500,
            verify=False
        )
        status_code = await_resp.get('status_code', 505)
        if status_code != 200:
            # The original used a bare `raise` here, which with no active
            # exception surfaces as an uninformative RuntimeError. Raise the
            # same type explicitly, carrying the failure message.
            raise RuntimeError(f'算力中心服务操作失败{status_code}')

        resp = await_resp.get("content")
        # Parse the body once and reuse it for the log line and status check.
        payload = json.loads(resp)
        debug(f"pcapi返回值: json.loads(resp)={payload!r}")
        if payload.get("status") == True:
            node_ns = {'id': nodeid,
                       'node_status': '1',
                       'clusterid': clusterid,
                       'cpcid': cpcid,
                       'role': 'worker'}
            debug(f"更新新增工作节点元数据 {node_ns=}")
            await sor.U('cpcnode', node_ns)

            return {'status': True, 'msg': '新增工作节点成功!', 'data': node_ns}

        # Same reasoning as the non-200 branch: explicit RuntimeError instead
        # of a bare `raise`.
        raise RuntimeError('算力中心服务操作失败')

    return {'status': False, 'msg': '新增失败'}
|
||
|
||
ret = await new_cpcworker(params_kw)
|
||
return ret
|