kboss/b/baiducloud/get_baiduQualifyInfo.dspy
2025-11-17 14:51:20 +08:00

94 lines
3.5 KiB
Plaintext

async def create_baiduuser(ns):
"""
ns = {
"name": "用户名",
"email": "邮箱",
"mobilePhone": "手机号",
"accountType": "0"为个人|“1”为企业
}
:param ns:
:return:
"""
db = DBPools()
async with db.sqlorContext('kboss') as sor:
user = await sor.R('users',{'id':ns.get('userid')})
data = {'name':user[0]['username'],'mobilePhone':user[0]['mobile'],'accountType':'0'}
method = "POST"
ns_format = '&'.join(['%s=%s' % (k, v) for k, v in ns.items()])
url = 'https://iam.bj.baidubce.com/v1/vs/account?%s' % ns_format
header = {
"Host": "iam.bj.baidubce.com",
"Content-Type": "application/json"
}
header = await get_auth_header(method=method, url=url, header=header)
async with aiohttp_client.request(
method=method,
url=url,
headers=header,
json=data) as res:
data_ = await res.json()
if not data_.get('userId'):
return {
'status': False,
'msg': '创建用户失败',
'data': data_
}
else:
#用户创建成功
userdata = {
'id' : uuid(),
# 'id' : UUID(),
'user_id' : ns.get('userid'),
'baidu_id' : data_.get('userId'),
'baidu_username' : data_.get('name'),
}
await sor.C('baidu_users',userdata)
return {
'status': True,
'msg': '创建用户成功',
'data': data_
}
async def get_baiduQualifyInfo(ns):
"""
获取百度实名认证接口
"""
db_baidu = DBPools()
async with db_baidu.sqlorContext('kboss') as sor:
user = await sor.R('baidu_users', {'user_id': ns.get('user_id'), 'del_flg': '0'})
if not user:
res = await create_baiduuser({'userid': ns.get('user_id')})
if not res.get('status'):
return res
db = DBPools()
async with db.sqlorContext('kboss') as sor:
user = await sor.R('baidu_users',{'user_id':ns.get('user_id')})
if len(user) >= 1:
nss = {}
nss['accountId'] = user[0]['baidu_id']
ns_format = '&'.join(['%s=%s' % (k, v) for k, v in nss.items()])
url = 'https://qualify.baidubce.com/v1/getQualifyInfo?%s' % ns_format
method = 'GET'
header = {
"Host": "qualify.baidubce.com",
"ContentType": "application/json;charset=UTF-8"
}
header = await get_auth_header(method=method, url=url, header=header)
async with aiohttp_client.request(
method=method,
url=url,
headers=header,
json=ns) as res:
data_ = await res.json()
if data_['qualifyType'] != None and data_['status'] == 'PASS':
return {'status': True, 'msg': '实名认证通过', 'data': data_}
else:
return {'status': False, 'msg': '实名认证未通过', 'data': data_}
else:
return {'status': False, 'msg': 'user not exist, please create baidu user first'}
ret = await get_baiduQualifyInfo(params_kw)
return ret