kboss/b/ucloud/create_firewall.dspy
2025-07-16 14:27:17 +08:00

38 lines
1.4 KiB
Plaintext

async def create_firewall(ns=None):
    """Create a firewall through the UCloud account stored for 'main_user'.

    The firewall is named ``<orgid>0_____0<name>``; when the caller gives no
    ``name`` a ``YYYYmmddHHMMSS`` timestamp is used in its place.

    Args:
        ns: Request parameters. Keys read:
            ``orgid``  - required, prefix of the firewall name.
            ``rule``   - required, either a JSON string or an already
                         decoded value; passed to the SDK as ``Rule``.
            ``name``   - optional base name (timestamp when empty).
            ``region`` - passed as ``Region``.
            ``tag``    - passed as ``Tag``; falls back to ``'Default'``.
            ``remark`` - passed as ``Remark``.
            On a valid request ``ns['name']`` is overwritten with the
            prefixed firewall name.

    Returns:
        dict: ``{'status': bool, 'msg': str}``; on success it also carries
        ``'data'`` with the SDK response. Missing/invalid parameters, a
        missing UCloud account row and ``exc.UCloudException`` are all
        reported as ``status: False`` rather than raised.
    """
    # A shared ``{}`` default would be the same object across calls.
    ns = {} if ns is None else ns

    # Validate the required inputs before touching the database or the SDK,
    # so bad requests get the same structured error as every other failure.
    org_id = ns.get('orgid')
    if org_id is None:
        return {
            'status': False,
            'msg': 'orgid is required'
        }

    rule = ns.get('rule')
    if rule is None:
        return {
            'status': False,
            'msg': 'rule is required'
        }
    if isinstance(rule, str):
        try:
            rule = json.loads(rule)
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            return {
                'status': False,
                'msg': 'rule is not valid json, %s' % str(e)
            }

    firewall_name = ns.get('name') or time.strftime('%Y%m%d%H%M%S')
    # Written back into ns as the original code did, in case the caller
    # inspects the prefixed name afterwards.
    ns['name'] = org_id + '0_____0' + firewall_name

    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        ucloud_user_li = await sor.R('ucloud_users', {'orgid': 'main_user', 'del_flg': '0'})
        if not ucloud_user_li:
            return {
                'status': False,
                'msg': 'can not find u cloud user'
            }
        account = ucloud_user_li[0]
        uc_client = U_Client({
            "project_id": account['projectid'],
            "public_key": account['accesskey'],
            "private_key": account['accesskeysecret'],
        })

    # NOTE(review): this SDK call is not awaited; if it performs blocking
    # network I/O it will stall the event loop while it runs - confirm.
    try:
        resp = uc_client.unet().create_firewall({
            'Region': ns.get('region'),
            "Name": ns['name'],
            "Tag": ns.get('tag') or 'Default',
            "Remark": ns.get('remark'),
            "Rule": rule
        })
    except exc.UCloudException as e:
        return {
            'status': False,
            'msg': 'create firewall failed, %s' % str(e)
        }
    return {
        'status': True,
        'msg': 'create firewall success',
        'data': resp
    }
ret = await create_firewall(params_kw)
return ret