kboss/b/ucloud/ucloud_create_user.dspy
2025-07-16 14:27:17 +08:00

143 lines
4.9 KiB
Plaintext

# 主账号创建用户
async def create_user(ns={}):
uc_client = ns.get('uc_client')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
kyy_name = ns.get('orgid')
resp = uc_client.invoke("CreateUser", {
"UserName": kyy_name,
"AccessKeyStatus": "Active",
"LoginProfileStatus": "Inactive",
"DisplayName": kyy_name
})
new_user = {
'id': uuid(),
'orgid': ns.get('orgid'),
'accesskey': resp.get('AccessKeyID'),
'accesskeysecret': resp.get('AccessKeySecret'),
'username': kyy_name,
'useremail': resp.get('UserEmail'),
}
await sor.C('ucloud_users', new_user)
return {
'status': True,
'msg': 'create user success',
'username': kyy_name,
'data': resp
}
except exc.RetCodeException as e:
resp = e.json()
return {
'status': False,
'msg': resp
}
# 主账号创建项目
async def create_project(ns={}):
"""
创建项目 项目名称=用户orgid
:param ns:
:return:
"""
uc_client = ns.get('uc_client')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
orgid = ns.get('orgid')
resp = uc_client.invoke("CreateProject", {
"ProjectName": orgid
})
if not resp.get('RetCode'):
projectid = resp.get('ProjectId')
sql_u = """update ucloud_users set projectid = '%s' where orgid = '%s';""" % (projectid, orgid)
await sor.sqlExe(sql_u, {})
return {
'status': True,
'projectid': projectid,
'msg': 'create project success'
}
else:
return {
'status': False,
'msg': resp.get('Message')
}
except exc.RetCodeException as e:
resp = e.json()
return {
'status': False,
'msg': str(resp)
}
# 主账号关联IAM策略到用户和项目
async def attach_policies_to_user(ns={}):
uc_client = ns.get('uc_client')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
resp = uc_client.invoke("AttachPoliciesToUser", {
"UserName": ns.get('username'),
"Scope": "Specified",
"ProjectID": ns.get('projectid'),
"PolicyURNs": ["ucs:iam::ucs:policy/AdministratorAccess"]
})
if not resp.get('RetCode'):
sql_u = """update ucloud_users set attachpoliciesstatus = '1' where orgid = '%s';""" % ns.get('username')
await sor.sqlExe(sql_u, {})
return {
'status': True,
'msg': 'bind user to polices success'
}
except exc.RetCodeException as e:
resp = e.json()
return {
'status': False,
'msg': 'bind user to polices failed, %s' % str(resp)
}
async def ucloud_create_user(ns={}):
uc_client = None
db = DBPools()
async with db.sqlorContext('kboss') as sor:
orgid_exist = await sor.R('ucloud_users', {'orgid': ns.get('orgid'), 'del_flg': '0'})
# 数据库中是否已经存在
if orgid_exist:
if not orgid_exist[0]['projectid'] or orgid_exist[0]['attachpoliciesstatus'] == '0':
print('%s projectid or attach, ucloud create user failed' % ns.get('orgid'))
return {
'status': False,
'msg': 'projectid or attach wrong'
}
return {
'status': False,
'msg': 'UserName Already Exist In Local'
}
# 查找主账号密钥
main_user_li = await sor.R('ucloud_users', {'orgid': 'main_user', 'del_flg': '0'})
public_key = main_user_li[0]['accesskey']
private_key = main_user_li[0]['accesskeysecret']
uc_client = U_Client({
"public_key": public_key,
"private_key": private_key
})
ns_uc = {
'orgid': ns.get('orgid'),
'uc_client': uc_client
}
create_user_res = await create_user(ns_uc)
ns_uc['username'] = create_user_res['username']
create_project_res = await create_project(ns_uc)
ns_uc['projectid'] = create_project_res['projectid']
res = await attach_policies_to_user(ns_uc)
if res.get('status'):
return {
'status': True,
'msg': 'u cloud create user success'
}
else:
return {
'status': False,
'msg': 'u cloud create user failed'
}
ret = await ucloud_create_user(params_kw)
return ret