kboss/b/ucloud/describe_firewall.dspy
2025-07-16 14:27:17 +08:00

47 lines
1.9 KiB
Plaintext

async def describe_firewall(ns={}):
db = DBPools()
async with db.sqlorContext('kboss') as sor:
ucloud_user_li = await sor.R('ucloud_users', {'orgid': 'main_user', 'del_flg': '0'})
if ucloud_user_li:
uc_client = U_Client({
"project_id": ucloud_user_li[0]['projectid'],
"public_key": ucloud_user_li[0]['accesskey'],
"private_key": ucloud_user_li[0]['accesskeysecret'],
})
else:
return {
'status': False,
'msg': 'can not find u cloud user'
}
try:
resp = uc_client.unet().describe_firewall({
'Region': ns.get('region'),
'ResourceType': ns.get('ResourceType'),
'ResourceId': ns.get('ResourceId'),
'Limit': int(ns.get('limit')) if ns.get('limit') else 50,
'Offset': int(ns.get('offset')) if ns.get('offset') else 0
})
if resp['TotalCount'] > 0:
new_firewall_li = []
for firewall in resp['DataSet']:
if firewall['GroupId'] in ['98696', '98697']:
new_firewall_li.append(firewall)
if ns.get('orgid') in firewall['Name']:
firewall['Name'] = firewall['Name'].split('0_____0')[-1]
new_firewall_li.append(firewall)
resp['DataSet'] = new_firewall_li
resp['TotalCount'] = len(new_firewall_li)
return {
'status': True,
'msg': 'get firewall success',
'data': resp.get('DataSet')
}
except exc.UCloudException as e:
return {
'status': False,
'msg': 'get firewall failed, %s' % str(e)
}
ret = await describe_firewall(params_kw)
return ret