rbac/wwwroot/usersync/index.dspy

232 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""用户同步接口 - POST方式同步单个或批量用户到dapi模块并返回apikey
POST /rbac/usersync/
请求体示例(单个用户):
{
"action": "single",
"dappid": "myapp",
"user": {
"id": "user123",
"orgid": "org456",
"username": "testuser",
"name": "测试用户",
"email": "test@example.com"
}
}
请求体示例(批量用户):
{
"action": "batch",
"dappid": "myapp",
"users": [
{"id": "user1", "orgid": "org1", "username": "user1"},
{"id": "user2", "orgid": "org1", "username": "user2"}
]
}
"""
# 获取参数
action = params_kw.get('action', 'batch')
dappid = params_kw.get('dappid', '')
# 参数验证
if not dappid:
return json.dumps({
'status': 'error',
'message': 'dappid参数必填'
}, ensure_ascii=False)
# 获取数据库连接
dbname = get_module_dbname('dapi')
db = DBPools()
# 检查dapi模块是否提供了create_user_apikey函数
create_apikey_func = create_user_apikey
async with db.sqlorContext(dbname) as sor:
if action == 'single':
# 单个用户同步
user_data = params_kw.get('user', {})
user_id = user_data.get('id', '')
user_orgid = user_data.get('orgid', '')
if not user_id or not user_orgid:
return json.dumps({
'status': 'error',
'message': 'user.id和user.orgid参数必填'
}, ensure_ascii=False)
# 如果dapi模块提供了create_user_apikey函数直接调用
if create_apikey_func:
result = await create_user_apikey(
sor,
dappid,
user_id,
user_orgid,
**{k: v for k, v in user_data.items() if k not in ['id', 'orgid']}
)
return json.dumps(result, ensure_ascii=False)
# 否则自己创建apikey
# 检查apikey是否已存在
existing = await sor.R('downapikey', {
'dappid': dappid,
'duserid': user_id,
'dorgid': user_orgid
})
if existing:
# 验证用户是否存在
user_check = await sor.R('users', {'id': existing[0].userid})
if not user_check:
# 脏数据downapikey 存在但用户已删除,清理后重新创建
await sor.D('downapikey', {'id': existing[0].id})
existing = None
else:
apikey = password_decode(existing[0].apikey)
return json.dumps({
'status': 'success',
'data': [{
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': apikey,
'status': 'existing'
}]
}, ensure_ascii=False)
# 创建新apikey
apikey_id = getID()
apikey_value = getID()
ns = {
'id': apikey_id,
'dappid': dappid,
'dorgid': user_orgid,
'duserid': user_id,
'orgid': user_orgid,
'userid': user_id,
'apikey': password_encode(apikey_value),
'enabled': '1',
'created_at': datetime.now().strftime('%Y-%m-%d'),
'expired_date': '9999-12-31'
}
await sor.C('downapikey', ns)
return json.dumps({
'status': 'success',
'data': [{
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': apikey_value,
'status': 'created'
}]
}, ensure_ascii=False)
elif action == 'batch':
# 批量用户同步
users_list = params_kw.get('users', [])
if not users_list:
return json.dumps({
'status': 'error',
'message': 'users参数必填用户对象数组'
}, ensure_ascii=False)
result_data = []
for user_data in users_list:
user_id = user_data.get('id', '')
user_orgid = user_data.get('orgid', '')
if not user_id or not user_orgid:
result_data.append({
'user_id': user_id,
'status': 'error',
'message': 'user.id和user.orgid必填'
})
continue
# 如果dapi模块提供了create_user_apikey函数直接调用
if create_apikey_func:
result = await create_user_apikey(
sor,
dappid,
user_id,
user_orgid,
**{k: v for k, v in user_data.items() if k not in ['id', 'orgid']}
)
result_data.append({
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': result.get('apikey', ''),
'status': result.get('message', 'created'),
'result_status': result.get('status')
})
else:
# 检查apikey是否已存在
existing = await sor.R('downapikey', {
'dappid': dappid,
'duserid': user_id,
'dorgid': user_orgid
})
if existing:
# 验证用户是否存在
user_check = await sor.R('users', {'id': existing[0].userid})
if not user_check:
# 脏数据,清理后重新创建
await sor.D('downapikey', {'id': existing[0].id})
existing = None
else:
apikey = password_decode(existing[0].apikey)
result_data.append({
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': apikey,
'status': 'existing'
})
continue
# 创建新apikey
apikey_id = getID()
apikey_value = getID()
ns = {
'id': apikey_id,
'dappid': dappid,
'dorgid': user_orgid,
'duserid': user_id,
'orgid': user_orgid,
'userid': user_id,
'apikey': password_encode(apikey_value),
'enabled': '1',
'created_at': datetime.now().strftime('%Y-%m-%d'),
'expired_date': '9999-12-31'
}
await sor.C('downapikey', ns)
result_data.append({
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': apikey_value,
'status': 'created'
})
return json.dumps({
'status': 'success',
'data': result_data,
'total': len(result_data)
}, ensure_ascii=False)
else:
return json.dumps({
'status': 'error',
'message': 'action参数必须是single或batch'
}, ensure_ascii=False)