rbac/wwwroot/usersync/index.dspy
yumoqing ceb26adf53 feat: 添加用户同步接口 /rbac/usersync/
- 新增POST接口支持单个和批量用户同步到dapi模块
- 返回每个用户的dapi apikey
- 优先调用dapi模块的create_user_apikey函数
- 添加API说明书文档
2026-05-11 15:10:56 +08:00

220 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""用户同步接口 - POST方式同步单个或批量用户到dapi模块并返回apikey
POST /rbac/usersync/
请求体示例(单个用户):
{
"action": "single",
"dappid": "myapp",
"user": {
"id": "user123",
"orgid": "org456",
"username": "testuser",
"name": "测试用户",
"email": "test@example.com"
}
}
请求体示例(批量用户):
{
"action": "batch",
"dappid": "myapp",
"users": [
{"id": "user1", "orgid": "org1", "username": "user1"},
{"id": "user2", "orgid": "org1", "username": "user2"}
]
}
"""
# 获取参数
action = params_kw.get('action', 'batch')
dappid = params_kw.get('dappid', '')
# 参数验证
if not dappid:
return json.dumps({
'status': 'error',
'message': 'dappid参数必填'
}, ensure_ascii=False)
# 获取数据库连接
env = ServerEnv()
dbname = env.get_module_dbname('dapi')
config = getConfig()
db = DBPools()
db.databases = config.databases
# 检查dapi模块是否提供了create_user_apikey函数
create_apikey_func = getattr(env, 'create_user_apikey', None)
async with db.sqlorContext(dbname) as sor:
if action == 'single':
# 单个用户同步
user_data = params_kw.get('user', {})
user_id = user_data.get('id', '')
user_orgid = user_data.get('orgid', '')
if not user_id or not user_orgid:
return json.dumps({
'status': 'error',
'message': 'user.id和user.orgid参数必填'
}, ensure_ascii=False)
# 如果dapi模块提供了create_user_apikey函数直接调用
if create_apikey_func:
result = await create_user_apikey(
sor,
dappid,
user_id,
user_orgid,
**{k: v for k, v in user_data.items() if k not in ['id', 'orgid']}
)
return json.dumps(result, ensure_ascii=False)
# 否则自己创建apikey
# 检查apikey是否已存在
existing = await sor.R('downapikey', {
'dappid': dappid,
'duserid': user_id,
'dorgid': user_orgid
})
if existing:
apikey = password_decode(existing[0].apikey)
return json.dumps({
'status': 'success',
'data': [{
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': apikey,
'status': 'existing'
}]
}, ensure_ascii=False)
# 创建新apikey
apikey_id = getID()
apikey_value = getID()
ns = {
'id': apikey_id,
'dappid': dappid,
'dorgid': user_orgid,
'duserid': user_id,
'orgid': user_orgid,
'userid': user_id,
'apikey': password_encode(apikey_value),
'enabled': '1',
'created_at': datetime.now().strftime('%Y-%m-%d'),
'expires_at': '9999-12-31'
}
await sor.C('downapikey', ns)
return json.dumps({
'status': 'success',
'data': [{
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': apikey_value,
'status': 'created'
}]
}, ensure_ascii=False)
elif action == 'batch':
# 批量用户同步
users_list = params_kw.get('users', [])
if not users_list:
return json.dumps({
'status': 'error',
'message': 'users参数必填用户对象数组'
}, ensure_ascii=False)
result_data = []
for user_data in users_list:
user_id = user_data.get('id', '')
user_orgid = user_data.get('orgid', '')
if not user_id or not user_orgid:
result_data.append({
'user_id': user_id,
'status': 'error',
'message': 'user.id和user.orgid必填'
})
continue
# 如果dapi模块提供了create_user_apikey函数直接调用
if create_apikey_func:
result = await create_user_apikey(
sor,
dappid,
user_id,
user_orgid,
**{k: v for k, v in user_data.items() if k not in ['id', 'orgid']}
)
result_data.append({
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': result.get('apikey', ''),
'status': result.get('message', 'created'),
'result_status': result.get('status')
})
else:
# 检查apikey是否已存在
existing = await sor.R('downapikey', {
'dappid': dappid,
'duserid': user_id,
'dorgid': user_orgid
})
if existing:
apikey = password_decode(existing[0].apikey)
result_data.append({
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': apikey,
'status': 'existing'
})
else:
# 创建新apikey
apikey_id = getID()
apikey_value = getID()
ns = {
'id': apikey_id,
'dappid': dappid,
'dorgid': user_orgid,
'duserid': user_id,
'orgid': user_orgid,
'userid': user_id,
'apikey': password_encode(apikey_value),
'enabled': '1',
'created_at': datetime.now().strftime('%Y-%m-%d'),
'expires_at': '9999-12-31'
}
await sor.C('downapikey', ns)
result_data.append({
'user_id': user_id,
'username': user_data.get('username', ''),
'apikey': apikey_value,
'status': 'created'
})
return json.dumps({
'status': 'success',
'data': result_data,
'total': len(result_data)
}, ensure_ascii=False)
else:
return json.dumps({
'status': 'error',
'message': 'action参数必须是single或batch'
}, ensure_ascii=False)