kboss/b/baiducloud/create_baiduuser.dspy
2025-07-16 14:27:17 +08:00

55 lines
1.8 KiB
Plaintext

async def create_baiduuser(ns):
"""
ns = {
"name": "用户名",
"email": "邮箱",
"mobilePhone": "手机号",
"accountType": "0"为个人|“1”为企业
}
:param ns:
:return:
"""
db = DBPools()
async with db.sqlorContext('kboss') as sor:
user = await sor.R('users',{'id':ns.get('userid')})
data = {'name':user[0]['username'],'mobilePhone':user[0]['mobile'],'accountType':'0'}
method = "POST"
ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()])
url = 'https://iam.bj.baidubce.com/v1/vs/account?%s' % ns_format
header = {
"Host": "iam.bj.baidubce.com",
"Content-Type": "application/json"
}
header = await get_auth_header(method=method, url=url, header=header)
async with aiohttp_client.request(
method=method,
url=url,
headers=header,
json=data) as res:
data_ = await res.json()
if not data_.get('userId'):
return {
'status': False,
'msg': '创建用户失败',
'data': data_
}
else:
#用户创建成功
userdata = {
'id' : uuid(),
# 'id' : UUID(),
'user_id' : ns.get('userid'),
'baidu_id' : data_.get('userId'),
'baidu_username' : data_.get('name'),
}
await sor.C('baidu_users',userdata)
return {
'status': True,
'msg': '创建用户成功',
'data': data_
}
ret = await create_baiduuser(params_kw)
return ret