rbac/wwwroot/usersync/index.dspy

140 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""用户同步接口 - POST方式同步单个或批量用户到dapi模块并返回apikey"""
from datetime import datetime
# 获取参数
action = params_kw.get('action', 'batch')
dappid = params_kw.get('dappid', '')
# 参数验证
if not dappid:
return json.dumps({
'status': 'error',
'message': 'dappid参数必填'
}, ensure_ascii=False)
# 获取数据库连接
dbname = get_module_dbname('dapi')
db = DBPools()
# 检查dapi模块是否提供了create_user_apikey函数
create_apikey_func = create_user_apikey
async with db.sqlorContext(dbname) as sor:
if action == 'single':
user_data = params_kw.get('user', {})
user_id = user_data.get('id', '')
user_orgid = user_data.get('orgid', '')
if not user_id or not user_orgid:
return json.dumps({
'status': 'error',
'message': 'user.id和user.orgid参数必填'
}, ensure_ascii=False)
# 确保机构存在
org = await sor.R('organization', {'id': user_orgid})
if not org:
await sor.C('organization', {'id': user_orgid, 'orgname': user_data.get('name', user_data.get('username', user_id))})
await sor.C('orgtypes', {'id': getID(), 'orgid': user_orgid, 'orgtypeid': 'customer'})
# 确保用户存在
usr = await sor.R('users', {'id': user_id})
if not usr:
await sor.C('users', {
'id': user_id,
'orgid': user_orgid,
'username': user_data.get('username', user_id),
'name': user_data.get('name', ''),
'email': user_data.get('email', ''),
'password': password_encode(user_data.get('password', '111111'))
})
# 创建apikey
if create_apikey_func:
result = await create_apikey_func(
sor, dappid, user_id, user_orgid,
**{k: v for k, v in user_data.items() if k not in ['id', 'orgid']}
)
return json.dumps(result, ensure_ascii=False)
# 手动创建apikey
existing = await sor.R('downapikey', {'dappid': dappid, 'duserid': user_id, 'dorgid': user_orgid})
if existing:
f = get_serverenv('password_decode')
apikey = f(existing[0].apikey)
return json.dumps({'status': 'success', 'data': [{'user_id': user_id, 'apikey': apikey, 'status': 'existing'}]}, ensure_ascii=False)
apikey_id = getID()
apikey_value = getID()
ns = {
'id': apikey_id, 'dappid': dappid, 'dorgid': user_orgid, 'duserid': user_id,
'orgid': user_orgid, 'userid': user_id, 'apikey': password_encode(apikey_value),
'enabled': '1', 'created_at': datetime.now().strftime('%Y-%m-%d'), 'expired_date': '9999-12-31'
}
await sor.C('downapikey', ns)
return json.dumps({'status': 'success', 'data': [{'user_id': user_id, 'apikey': apikey_value, 'status': 'created'}]}, ensure_ascii=False)
elif action == 'batch':
users_list = params_kw.get('users', [])
if not users_list:
return json.dumps({'status': 'error', 'message': 'users参数必填用户对象数组'}, ensure_ascii=False)
result_data = []
for user_data in users_list:
user_id = user_data.get('id', '')
user_orgid = user_data.get('orgid', '')
if not user_id or not user_orgid:
result_data.append({'user_id': user_id, 'status': 'error', 'message': 'user.id和user.orgid必填'})
continue
# 确保机构存在
org = await sor.R('organization', {'id': user_orgid})
if not org:
await sor.C('organization', {'id': user_orgid, 'orgname': user_data.get('name', user_data.get('username', user_id))})
await sor.C('orgtypes', {'id': getID(), 'orgid': user_orgid, 'orgtypeid': 'customer'})
# 确保用户存在
usr = await sor.R('users', {'id': user_id})
if not usr:
await sor.C('users', {
'id': user_id, 'orgid': user_orgid,
'username': user_data.get('username', user_id), 'name': user_data.get('name', ''),
'email': user_data.get('email', ''), 'password': password_encode(user_data.get('password', '111111'))
})
# 创建apikey
if create_apikey_func:
result = await create_apikey_func(
sor, dappid, user_id, user_orgid,
**{k: v for k, v in user_data.items() if k not in ['id', 'orgid']}
)
result_data.append({
'user_id': user_id, 'username': user_data.get('username', ''),
'apikey': result.get('apikey', ''), 'status': result.get('message', 'created'),
'result_status': result.get('status')
})
else:
existing = await sor.R('downapikey', {'dappid': dappid, 'duserid': user_id, 'dorgid': user_orgid})
if existing:
f = get_serverenv('password_decode')
apikey = f(existing[0].apikey)
result_data.append({'user_id': user_id, 'apikey': apikey, 'status': 'existing'})
else:
apikey_id = getID()
apikey_value = getID()
ns = {
'id': apikey_id, 'dappid': dappid, 'dorgid': user_orgid, 'duserid': user_id,
'orgid': user_orgid, 'userid': user_id, 'apikey': password_encode(apikey_value),
'enabled': '1', 'created_at': datetime.now().strftime('%Y-%m-%d'), 'expired_date': '9999-12-31'
}
await sor.C('downapikey', ns)
result_data.append({'user_id': user_id, 'apikey': apikey_value, 'status': 'created'})
return json.dumps({'status': 'success', 'data': result_data, 'total': len(result_data)}, ensure_ascii=False)
else:
return json.dumps({'status': 'error', 'message': 'action参数必须是single或batch'}, ensure_ascii=False)