kboss/b/capital/ecs_family_info.dspy
2025-07-16 14:27:17 +08:00

100 lines
3.9 KiB
Plaintext

async def describe_zone_instance_type(ns={}):
"""
可以获取销售状态和处理器型号
:param ns:
:return:
"""
"""
:param ns:
:return:
"""
import aiohttp
NETWORK_URL = 'http://api.capitalonline.net/ecs/v1'
action = 'DescribeZoneInstanceType'
method = "GET"
param = {
"AvailableZoneCode": ns["AvailableZoneCode"],
"EcsFamilyName": ns.get("EcsFamilyName"),
# "Cpu": ns.get("Cpu"),
# "Ram": ns.get("Ram"),
# "Gpu": ns.get("Gpu"),
}
url = get_signature(action, method, NETWORK_URL, param=param)
async with aiohttp.ClientSession() as session:
async with session.request(method, url) as response:
resp = await response.text()
result = json.loads(resp)
# print(result)
if result['Code'] == 'Success':
result['status'] = True
result.pop('Code')
result['data'] = result.pop('Data')
if result.get('Message'):
result['msg'] = result.pop('Message')
else:
result['msg'] = result.pop('Msg')
else:
result['status'] = False
if result.get('Message'):
result['msg'] = result.pop('Message')
else:
result['msg'] = result.pop('Msg')
return result
async def ecs_family_info(ns={}):
"""
获取计算类型族
"""
import aiohttp
AvailableZoneCode = ns.get('AvailableZoneCode')
ecs_url = 'http://api.capitalonline.net/ecs/v1'
action = "DescribeEcsFamilyInfo"
method = "GET"
param = {
"AvailableZoneCode": AvailableZoneCode,
"BillingMethod": "1"
}
url = get_signature(action, method, ecs_url, param)
async with aiohttp.ClientSession() as session:
async with session.request(method, url) as response:
resp = await response.text()
result = json.loads(resp)
if result['Code'] == 'Success':
result['status'] = True
result.pop('Code')
result['data'] = result.pop('Data')
for ecs_info in result['data']['EcsFamilyInfo']:
ns['EcsFamilyName'] = ecs_info['EcsFamilyName']
res = await describe_zone_instance_type(ns)
# print(res)
if res.get('status'):
list1 = res['data'][0]['SpecList']
# 如果不是庆阳A改为售罄
if ns['AvailableZoneCode'] != 'CN_Qingyang_A':
for j in list1:
j['Status'] = 'SELLOUT'
else:
# 如果是庆阳A但是不是对应配置 改为售罄
for j in list1:
if j['Cpu'] != 16 or j['Ram'] != 64 or res['data'][0]['EcsFamilyName'] != '推理型智算云主机igch.c8.nr4':
j['Status'] = 'SELLOUT'
list2 = ecs_info['SpecList']
merged_list = []
for item1 in list1:
for item2 in list2:
if item1['Gpu'] == item2['Gpu'] and item1['Cpu'] == item2['Cpu'] and item1['Ram'] == \
item2['Ram']:
merged_item = {**item1, **item2}
merged_list.append(merged_item)
break
if merged_list:
ecs_info['SpecList'] = merged_list
result['msg'] = result.pop('Msg')
else:
result['status'] = False
result['msg'] = result.pop('Msg')
return result
ret = await ecs_family_info(params_kw)
return ret