rbac/wwwroot/usersync/index.dspy
2026-05-22 16:54:08 +08:00

162 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""用户同步接口 - 检查用户/机构是否存在,不存在则注册并开帐,然后创建/获取apikey"""
from datetime import datetime
from appPublic.dictObject import DictObject
action = params_kw.get('action', 'batch')
dappid = params_kw.get('dappid', '')
if not dappid:
return json.dumps({'status': 'error', 'message': 'dappid参数必填'}, ensure_ascii=False)
dbname = get_module_dbname('dapi')
db = DBPools()
async with db.sqlorContext(dbname) as sor:
dapps = await sor.R('downapp', {'id': dappid})
if not dapps:
return {
"status": "error",
"data":{
"message": f"{dappid=} not found"
}
}
dapp = dapps[0]
secretkey = password_decode(dapp.secretkey)
if action == 'single':
user_data = params_kw.get('user', {})
user_id = user_data.get('id', '')
user_orgid = user_data.get('orgid', '')
if not user_id or not user_orgid:
return json.dumps({'status': 'error', 'message': 'user.id和user.orgid必填'}, ensure_ascii=False)
# 1. 检查用户
users = await sor.R('users', {'id': user_id})
if not users:
debug(f"User {user_id} not found, registering...")
# 1.1 确保机构存在
orgs = await sor.R('organization', {'id': user_orgid})
if not orgs:
org_name = user_data.get('name', user_data.get('username', user_id))
await create_org(sor, DictObject(id=user_orgid, orgname=org_name))
debug(f"Created org: {user_orgid}")
# 1.2 创建用户
pwd = user_data.get('password', '111111')
ns = DictObject(
id=user_id,
orgid=user_orgid,
username=user_data.get('username', user_id),
name=user_data.get('name', ''),
email=user_data.get('email', ''),
password=password_encode(pwd),
created_at=timestampstr(),
sync_from=dappid,
login_fail_count=0
)
await create_user(sor, ns)
debug(f"Created user: {user_id}")
# 1.3 开帐
try:
await openCustomerAccounts(sor, '0', user_orgid)
debug(f"Opened accounts for {user_orgid}")
except Exception as e:
exception(f"Failed to open accounts: {e}")
# 2. 处理 Apikey — 按(dappid, userid)查询
existing = await sor.R('downapikey', {'dappid': dappid, 'userid': user_id})
if existing:
secretkey = password_decode(dapp.secretkey)
apikey = password_decode(existing[0].apikey)
msg = '用户已同步获取现有apikey'
else:
apikey_value = uuid()
ns_key = {
'id': uuid(), 'dappid': dappid, 'userid': user_id,
'apikey': password_encode(apikey_value),
'enabled_date': datetime.now().strftime('%Y-%m-%d'), 'expired_date': '9999-12-31'
}
await sor.C('downapikey', ns_key)
apikey = apikey_value
msg = '用户同步成功apikey已创建'
return json.dumps({
'status': 'success',
'data': [{
'user_id': user_id,
'apikey': apikey,
'appid': dappid,
'secretkey': secretkey,
'status': msg
}]
}, ensure_ascii=False)
elif action == 'batch':
users_list = params_kw.get('users', [])
if not users_list:
return json.dumps({'status': 'error', 'message': 'users必填'}, ensure_ascii=False)
result_data = []
for user_data in users_list:
user_id = user_data.get('id', '')
user_orgid = user_data.get('orgid', '')
if not user_id or not user_orgid:
result_data.append({'user_id': user_id, 'status': 'error', 'message': 'id/orgid必填'})
continue
# 1. 检查/创建用户
users = await sor.R('users', {'id': user_id})
if not users:
orgs = await sor.R('organization', {'id': user_orgid})
if not orgs:
org_name = user_data.get('name', user_data.get('username', user_id))
await create_org(sor, DictObject(id=user_orgid, orgname=org_name))
pwd = user_data.get('password', '111111')
ns = DictObject(
id=user_id, orgid=user_orgid,
username=user_data.get('username', user_id),
name=user_data.get('name', ''), email=user_data.get('email', ''),
password=password_encode(pwd), created_at=timestampstr(), login_fail_count=0
)
await create_user(sor, ns)
# 开帐
try:
await openCustomerAccounts(sor, '0', user_orgid)
except Exception as e:
exception(f"Failed to open accounts: {e}")
# 2. 处理 Apikey — 按(dappid, userid)查询
existing = await sor.R('downapikey', {'dappid': dappid, 'userid': user_id})
if existing:
apikey = password_decode(existing[0].apikey)
status_msg = '用户已同步'
else:
apikey_value = uuid()
ns_key = {
'id': uuid(), 'dappid': dappid, 'userid': user_id,
'apikey': password_encode(apikey_value),
'secretkey': secretkey,
'appid': dappid,
'enabled_date': datetime.now().strftime('%Y-%m-%d'), 'expired_date': '9999-12-31'
}
await sor.C('downapikey', ns_key)
apikey = apikey_value
status_msg = '同步成功'
result_data.append({
'user_id': user_id,
'apikey': apikey,
'status': status_msg
})
return json.dumps({'status': 'success', 'data': result_data}, ensure_ascii=False)