kboss/b/cpcc/cpccluster/new_cluster.dspy
2025-07-16 14:27:17 +08:00

127 lines
4.9 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

async def new_cluster(params_kw={}):
    """Install a new cluster through the computing-center PCAPI and record it.

    The function looks up the computing center (``cpclist``) and the control
    node (``cpcnode``), registers the node's data-disk stock in ``cpcwidget``,
    asks the PCAPI ``new_cluster`` endpoint to install the cluster on the
    control node, and finally stores the new ``cpccluster`` row and marks the
    node as the cluster master.

    Args:
        params_kw: object read via attribute access (a plain ``dict``, such
            as the default, fails at the first attribute read). Attributes
            used: ``cpcid``, ``cluster_type``, ``ctl_nodeid``,
            ``cluster_name``, ``enable_date``, ``export_date``. It is only
            read, never mutated.

    Returns:
        dict: ``{'status': True, 'msg': ..., 'data': <cpcnode update record>}``
        on success; ``{'status': False, 'msg': ...}`` when the PCAPI answers
        with an empty ``data`` field, or when control reaches the end of the
        function (see the note at the final ``return``).

    Raises:
        Exception: the ``cpclist`` or ``cpcnode`` row does not exist.
        RuntimeError: the PCAPI call did not answer with status code 200, the
            returned payload is not a plausible ``join###kubeconfig`` pair, or
            the PCAPI reported ``status`` other than true.
    """
    debug(f'{params_kw=}')
    dbname = 'kboss'
    db = DBPools()
    cpcid = params_kw.cpcid
    nodeid = params_kw.ctl_nodeid
    cluster_type = params_kw.cluster_type
    cluster_name = params_kw.cluster_name
    enable_date = params_kw.enable_date
    export_date = params_kw.export_date
    async with db.sqlorContext(dbname) as sor:
        cpcs = await sor.R('cpclist', {'id': cpcid})
        if not cpcs:
            e = Exception(f'cpclist {cpcid=} not exists')
            exception(f'{e}')
            raise e
        cpc = cpcs[0]

        nodes = await sor.R('cpcnode', {'id': nodeid})
        # Known trouble spot (original author's note): no matching node
        # information was found here.
        if not nodes:
            e = Exception(f'cpcnode {nodeid=} 节点基础信息不匹配')
            exception(f'{e}')
            raise e
        node = nodes[0]

        url = cpc.pcapi_url + "/pcapi/api/v1/cluster/common/new_cluster"
        debug(f"请求url: {url=}")
        # Passwords are deliberately kept out of the log output.
        debug(f"目标IP认证信息: {node.ip=} {node.sshport=} {node.adminuser=}")

        # Original author's note: the request method is still to be decided;
        # it depends on how the parameter values are obtained.
        pcapi_user = cpc.api_user
        debug(f'pacpi auth: {pcapi_user} + ******')
        headers = basic_auth_headers(pcapi_user, cpc.api_pwd)

        params = {
            'cluster_type': cluster_type,
            'host': node.ip,
            'port': node.sshport,
            'user': node.adminuser,
            'password': node.adminpwd,
            'role': "master",
        }
        masked_params = {**params, 'password': '******'}
        debug(f'params={masked_params}')

        clusterid = uuid()
        debug("###集群新增数据盘部件库存(目前采用控制节点上提供NFS共享存储服务即容器数据盘)")
        # The "on duplicate key" clause below relies on the unique key
        # unique_cpc_entry (cpcid, type, clusterid, model) on cpcwidget; per
        # the original author's note that key was added by hand with ALTER
        # TABLE, so no code performs it here.
        disk_data_stock_tuple = (
            uuid(), cpcid, 'disk', 'DATA', 'Gi', clusterid, node.datadisk_stock,
        )
        debug(f'{disk_data_stock_tuple=}')
        # NOTE(review): the statement is built by interpolating a Python tuple
        # repr into the SQL text. The values come from the database and from
        # uuid(), but a None or non-literal stock value would yield invalid
        # SQL - consider the driver's bound-parameter form.
        cluster_disk_sys_stock_sql = (
            f'''insert into cpcwidget (id, cpcid, type, model, resource_unit, clusterid, stock)
            values {disk_data_stock_tuple} on duplicate key update stock = greatest(stock + {node.datadisk_stock}, 0)'''
        )
        debug(f'{cluster_disk_sys_stock_sql=}')
        await sor.sqlExe(cluster_disk_sys_stock_sql, {})

        await_resp = await async_post(
            url=url,
            headers=headers,
            data=params,
            timeout=600,
            verify=False,
        )
        status_code = await_resp.get('status_code', 505)
        if status_code != 200:
            e = RuntimeError(f'算力中心服务操作失败{status_code}')
            exception(f'{e}')
            raise e

        # Parse the response body once and reuse it for both fields.
        result = json.loads(await_resp.get("content"))
        todatas = result.get('data')
        # NOTE(review): this payload (and the cluster record logged further
        # down) carries the join credential and kubeconfig - confirm that
        # writing them to the debug log is acceptable.
        debug(f"todatas: {todatas=}")
        if not todatas:
            # NOTE(review): this path returns normally after the stock row
            # above was written; confirm that keeping that row is intended.
            return {'status': False, 'msg': f'安装失败,算力中心返回{todatas}'}

        # Expected payload layout: "<join credential>###<kubeconfig>".
        parts = todatas.split("###")
        # For now the length of the kubeconfig text is the only health check
        # for the installation; a missing separator is treated the same way.
        if len(parts) < 2 or len(parts[1]) < 2000:
            e = RuntimeError("集群上下文异常")
            exception(f'{e}')
            raise e
        clusterjoin = parts[0]
        kubeconfig_context = parts[1]

        # Equality (not identity) is kept so a JSON 1 is accepted like true.
        if not (result.get("status") == True):
            e = RuntimeError('算力中心服务操作失败')
            exception(f'{e}')
            raise e

        # Persist the new cluster record.
        ns = {
            'id': clusterid,
            'clustertype': cluster_type,
            'cpcid': cpcid,
            'controllerid': nodeid,
            'name': cluster_name,
            'enable_date': enable_date,
            'export_date': export_date,
            'clusterjoin': clusterjoin,
            'kubeconfig': kubeconfig_context,
            'ready': 1,
        }
        debug(f"新创建的集群元数据: {ns=}")
        await sor.C('cpccluster', ns)

        # Mark the control node (identified by ctl_nodeid) as the master of
        # the new cluster and set its node_status to '1'.
        node_ns = {
            'id': nodeid,
            'node_status': '1',
            'clusterid': clusterid,
            'cpcid': cpcid,
            'role': 'master',
        }
        debug(f"更新控制节点元数据 {node_ns=}")
        await sor.U('cpcnode', node_ns)
        return {
            'status': True,
            'msg': f'操作成功! 加入集群凭证: {clusterjoin}',
            'data': node_ns,
        }
    # NOTE(review): only reachable if the sqlorContext context manager
    # suppresses an exception raised inside the block above - confirm against
    # the database layer. In that case the caller receives this generic
    # failure result instead of the exception.
    return {'status': False, 'msg': '创建失败'}
ret = await new_cluster(params_kw)
return ret