async def describe_zone_instance_type(ns={}): """ 可以获取销售状态和处理器型号 :param ns: :return: """ """ :param ns: :return: """ import aiohttp NETWORK_URL = 'http://api.capitalonline.net/ecs/v1' action = 'DescribeZoneInstanceType' method = "GET" param = { "AvailableZoneCode": ns["AvailableZoneCode"], "EcsFamilyName": ns.get("EcsFamilyName"), # "Cpu": ns.get("Cpu"), # "Ram": ns.get("Ram"), # "Gpu": ns.get("Gpu"), } url = get_signature(action, method, NETWORK_URL, param=param) async with aiohttp.ClientSession() as session: async with session.request(method, url) as response: resp = await response.text() result = json.loads(resp) # print(result) if result['Code'] == 'Success': result['status'] = True result.pop('Code') result['data'] = result.pop('Data') if result.get('Message'): result['msg'] = result.pop('Message') else: result['msg'] = result.pop('Msg') else: result['status'] = False if result.get('Message'): result['msg'] = result.pop('Message') else: result['msg'] = result.pop('Msg') return result async def ecs_family_info(ns={}): """ 获取计算类型族 """ import aiohttp AvailableZoneCode = ns.get('AvailableZoneCode') ecs_url = 'http://api.capitalonline.net/ecs/v1' action = "DescribeEcsFamilyInfo" method = "GET" param = { "AvailableZoneCode": AvailableZoneCode, "BillingMethod": "1" } url = get_signature(action, method, ecs_url, param) async with aiohttp.ClientSession() as session: async with session.request(method, url) as response: resp = await response.text() result = json.loads(resp) if result['Code'] == 'Success': result['status'] = True result.pop('Code') result['data'] = result.pop('Data') for ecs_info in result['data']['EcsFamilyInfo']: ns['EcsFamilyName'] = ecs_info['EcsFamilyName'] res = await describe_zone_instance_type(ns) # print(res) if res.get('status'): list1 = res['data'][0]['SpecList'] # 如果不是庆阳A改为售罄 if ns['AvailableZoneCode'] != 'CN_Qingyang_A': for j in list1: j['Status'] = 'SELLOUT' else: # 如果是庆阳A但是不是对应配置 改为售罄 for j in list1: if j['Cpu'] != 16 or j['Ram'] != 64 or res['data'][0]['EcsFamilyName'] != '推理型智算云主机igch.c8.nr4': j['Status'] = 'SELLOUT' list2 = ecs_info['SpecList'] merged_list = [] for item1 in list1: for item2 in list2: if item1['Gpu'] == item2['Gpu'] and item1['Cpu'] == item2['Cpu'] and item1['Ram'] == \ item2['Ram']: merged_item = {**item1, **item2} merged_list.append(merged_item) break if merged_list: ecs_info['SpecList'] = merged_list result['msg'] = result.pop('Msg') else: result['status'] = False result['msg'] = result.pop('Msg') return result ret = await ecs_family_info(params_kw) return ret