kboss/b/cpcc/cpcworker/add_sharedstorage2.dspy
2025-07-16 14:27:17 +08:00

94 lines
3.1 KiB
Plaintext

async def new_cpcworker(params_kw={}):
# params_kw:
# clusterid
# worker_nodeid
debug(f'{params_kw=}')
# dbname = get_module_dbname('cpcc')
dbname = 'kboss'
db = DBPools()
clusterid = params_kw.clusterid
nodeid = params_kw.worker_nodeid
#debug(f"=====> {clusterid=} {worker_nodeid=}")
async with db.sqlorContext(dbname) as sor:
# clusterid -> cpcid
cpclusters = await sor.R('cpccluster', {'id':clusterid})
if len(cpclusters) < 1:
e = Exception(f'cpclist {clusterid=} not exists')
exception(f'{e}')
raise e
cpcluster = cpclusters[0]
join_command = cpcluster.get("clusterjoin")
cpcid = cpcluster.get("cpcid")
cpcs = await sor.R('cpclist', {'id':cpcid})
if len(cpcs) < 1:
e = Exception(f'cpclist {cpcid=} not exists')
exception(f'{e}')
raise e
cpc = cpcs[0]
debug(f'集群注册命令:{join_command=}')
nodes = await sor.R('cpcnode', {'id':nodeid})
# 这里有问题:没有匹配到节点信息!!!
if len(nodes) < 1:
e = Exception(f'cpcnode {nodeid=} 节点基础信息不匹配')
exception(f'{e}')
raise e
node = nodes[0]
url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/new_worker"
debug(f"请求url: {url=}")
debug(f"目标IP认证信息: {node.ip=} {node.sshport=} {node.adminuser=} {node.adminpwd=}")
# 请求方式待定,取决于获取参数值方式
headers = basic_auth_headers(cpc.api_user, cpc.api_pwd)
hc = HttpClient()
import requests
params = {
'cluster_type': cluster_type,
'host':node.ip,
'port':node.sshport,
'user':node.adminuser,
'password':node.adminpwd,
'role':"worker",
'join_command':join_command
}
debug(f'{params=}')
#resp = await hc.request(url, method='POST',
# headers = headers,
# data=params
#)
# 框架不支持超时时间
resp = requests.post(url,
headers = headers,
data=params,
timeout=500,
verify=False
)
resp = json.dumps(resp.json()) #这里模拟hc.request返回的结果写后续逻辑
debug(f'{type(resp)=}->{resp=}')
debug(f"pcapi返回值: {json.loads(resp)=}")
data = json.loads(resp).get('data')
if json.loads(resp).get("status") == True:
node_ns = {
'id':nodeid,
'node_status':'1',
'clusterid':clusterid,
'cpcid':cpcid,
'role':'工作节点'
}
debug(f"更新新增工作节点元数据 {node_ns=}")
await sor.U('cpcnode', node_ns)
debug(f"###集群新增系统盘部件库存(目前采用工作节点上提供固定大小系统盘存储)")
# 1.判断是否可加入集群,依据是一定要和已有的工作节点部件型号一致
# 2.选定工作节点disk中SYS和DATA子类型中较小的值作为集群系统盘的库存量,目前有矛盾,可能是较大的值
# 3.更新各部件库存数量
return {'status': True, 'msg': f'新增工作节点成功!', 'data': node_ns}
else:
return {'status': False, 'msg': '算力中心服务操作失败'}
return {'status': False, 'msg': '其他错误'}
ret = await new_cpcworker(params_kw)
return ret