kboss/b/capital/describe_vpc.dspy
2025-07-16 14:27:17 +08:00

89 lines
3.4 KiB
Plaintext

async def create_vpc(nss={}):
ns = {
'orgid': nss['orgid'],
"RegionCode": nss['RegionCode'],
"VPCName": nss['Keyword'],
"VPCSegment": "10.0.0.0/16",
"VPCSegmentType": "manual",
"BandwidthType": "Bandwidth_China_Telecom",
"AvailableZoneCode": nss['AvailableZoneCode'],
"SubnetName": "DefaultNet",
"SubnetSegment": "10.0.0.0/24"
}
import aiohttp
NETWORK_URL = 'http://cdsapi.capitalonline.net/vpc'
action = 'CreateVPC'
method = "POST"
param = {}
url = get_signature(action, method, NETWORK_URL, param=param)
body = {
"RegionCode": ns.get('RegionCode'),
"VPCName": ns.get('VPCName'),
"VPCSegment": ns.get('VPCSegment'),
"VPCSegmentType": "manual",
"BandwidthType": ns.get('BandwidthType'),
"SubnetList": [{
"AvailableZoneCode": ns.get('AvailableZoneCode'),
"SubnetName": ns.get('SubnetName'),
"SubnetSegment": ns.get('SubnetSegment')
}
]
}
async with aiohttp.ClientSession() as session:
async with session.request(method, url, json=body) as response:
resp = await response.text()
print(resp)
result = json.loads(resp)
if result['Code'] == 'Success':
result['status'] = True
result.pop('Code')
result['data'] = result.pop('Data')
result['msg'] = result.pop('Message')
else:
result['status'] = False
result['msg'] = result.pop('Message')
return result
async def describe_vpc(ns={}):
import aiohttp
NETWORK_URL = 'http://cdsapi.capitalonline.net/vpc'
action = 'DescribeVPC'
method = "POST"
param = {}
url = get_signature(action, method, NETWORK_URL, param=param)
body = {
"RegionCode": ns.get('RegionCode'),
'Keyword': ns.get('orgid') + ns.get('RegionCode') + '0_____0Default'
}
async with aiohttp.ClientSession() as session:
async with session.request(method, url, json=body) as response:
resp = await response.text()
result = json.loads(resp)
if result['Code'] == 'Success':
result['status'] = True
result.pop('Code')
result['data'] = result.pop('Data')
if result['data']['Total'] > 0:
result['data']['VPCList'][0]['VPCNmae'] = result['data']['VPCList'][0]['VPCNmae'][-7:]
# 如果进入到了VPC详情页 此时还没有创建Default
if result['data']['Total'] == 0 and ns.get('flag') == 'inner':
ns['Keyword'] = body['Keyword']
create_vpc_res = await create_vpc(ns)
if create_vpc_res.get('status'):
ns.pop('flag')
return await describe_vpc(ns)
else:
return {
'status': False,
'msg': '初始化VPC延迟, 请稍后重试!%s' % create_vpc_res.get('msg')
}
result['msg'] = result.pop('Message')
else:
result['status'] = False
result['msg'] = result.pop('Message')
return result
ret = await describe_vpc(params_kw)
return ret