kboss/b/ws/cpcc_xterm.xterm
2025-07-16 14:27:17 +08:00

44 lines
1.6 KiB
Plaintext

async def cpcc_xterm(ns={}):
"""
wss://www.kaiyunacloud.cn/wss/ws/cpcc_xterm.xterm
:param ns:
:return:
"""
domain_url = ns.get('domain_url')
if not domain_url:
return {
'status': False,
'msg': '没有传递domain url'
}
if ('https://www.kaiyuancloud.cn/dev' in domain_url) or ('localhost' in domain_url):
base_url = 'https://www.kaiyuancloud.cn/dev'
elif 'https://www.kaiyuancloud.cn' in domain_url:
base_url = 'https://www.kaiyuancloud.cn'
else:
base_url = domain_url.split('://')[0] + '://' + domain_url.split('://')[1].split('/')[0]
xterm_url = base_url + '/wss/ws/cpcc_xterm.xterm'
db = DBPools()
async with db.sqlorContext('kboss') as sor:
resource_data_li = await sor.R('specificdata', {'sourceid': ns.get('resourceid')})
if not resource_data_li:
return {
'status': False,
'msg': '没有匹配的资源'
}
resource_data = json.loads(resource_data_li[0]['spec_data'])
cpcid = resource_data['cpcid']
clusterid = resource_data['clusterid']
external_ip_li = await sor.R('cpcnode', {'cpcid': cpcid,
'clusterid': clusterid, 'role': 'master'})
return {
"host": external_ip_li[0]['external_ip'],
"port": resource_data['source_outsideport'],
"username": resource_data['source_authuser'],
"password": resource_data['source_authpasswd'],
'xterm_url': xterm_url
}
ret = await cpcc_xterm(params_kw)
return ret