#!/usr/bin/env python3 # -*- coding: utf-8 -*- """用户同步接口 - 检查用户/机构是否存在,不存在则注册并开帐,然后创建/获取apikey""" from datetime import datetime from appPublic.dictObject import DictObject action = params_kw.get('action', 'batch') dappid = params_kw.get('dappid', '') if not dappid: return json.dumps({'status': 'error', 'message': 'dappid参数必填'}, ensure_ascii=False) dbname = get_module_dbname('dapi') db = DBPools() async with db.sqlorContext(dbname) as sor: dapps = await sor.R('downapp', {'id': dappid}) if not dapps: return { "status": "error", "data":{ "message": f"{dappid=} not found" } } dapp = dapps[0] secretkey = password_decode(dapp.secretkey) if action == 'single': user_data = params_kw.get('user', {}) user_id = user_data.get('id', '') user_orgid = user_data.get('orgid', '') if not user_id or not user_orgid: return json.dumps({'status': 'error', 'message': 'user.id和user.orgid必填'}, ensure_ascii=False) # 1. 检查用户 users = await sor.R('users', {'id': user_id}) if not users: debug(f"User {user_id} not found, registering...") # 1.1 确保机构存在 orgs = await sor.R('organization', {'id': user_orgid}) if not orgs: org_name = user_data.get('name', user_data.get('username', user_id)) await create_org(sor, DictObject(id=user_orgid, orgname=org_name)) debug(f"Created org: {user_orgid}") # 1.2 创建用户 pwd = user_data.get('password', '111111') ns = DictObject( id=user_id, orgid=user_orgid, username=user_data.get('username', user_id), name=user_data.get('name', ''), email=user_data.get('email', ''), password=password_encode(pwd), created_at=timestampstr(), sync_from=dappid, login_fail_count=0 ) await create_user(sor, ns) debug(f"Created user: {user_id}") # 1.3 开帐 try: await openCustomerAccounts(sor, '0', user_orgid) debug(f"Opened accounts for {user_orgid}") except Exception as e: exception(f"Failed to open accounts: {e}") # 2. 处理 Apikey — 按(dappid, userid)查询 existing = await sor.R('downapikey', {'dappid': dappid, 'userid': user_id}) if existing: secretkey = password_decode(dapp.secretkey) apikey = password_decode(existing[0].apikey) msg = '用户已同步,获取现有apikey' else: apikey_value = uuid() ns_key = { 'id': uuid(), 'dappid': dappid, 'userid': user_id, 'apikey': password_encode(apikey_value), 'enabled_date': datetime.now().strftime('%Y-%m-%d'), 'expired_date': '9999-12-31' } await sor.C('downapikey', ns_key) apikey = apikey_value msg = '用户同步成功,apikey已创建' return json.dumps({ 'status': 'success', 'data': [{ 'user_id': user_id, 'apikey': apikey, 'appid': dappid, 'secretkey': secretkey, 'status': msg }] }, ensure_ascii=False) elif action == 'batch': users_list = params_kw.get('users', []) if not users_list: return json.dumps({'status': 'error', 'message': 'users必填'}, ensure_ascii=False) result_data = [] for user_data in users_list: user_id = user_data.get('id', '') user_orgid = user_data.get('orgid', '') if not user_id or not user_orgid: result_data.append({'user_id': user_id, 'status': 'error', 'message': 'id/orgid必填'}) continue # 1. 检查/创建用户 users = await sor.R('users', {'id': user_id}) if not users: orgs = await sor.R('organization', {'id': user_orgid}) if not orgs: org_name = user_data.get('name', user_data.get('username', user_id)) await create_org(sor, DictObject(id=user_orgid, orgname=org_name)) pwd = user_data.get('password', '111111') ns = DictObject( id=user_id, orgid=user_orgid, username=user_data.get('username', user_id), name=user_data.get('name', ''), email=user_data.get('email', ''), password=password_encode(pwd), created_at=timestampstr(), login_fail_count=0 ) await create_user(sor, ns) # 开帐 try: await openCustomerAccounts(sor, '0', user_orgid) except Exception as e: exception(f"Failed to open accounts: {e}") # 2. 处理 Apikey — 按(dappid, userid)查询 existing = await sor.R('downapikey', {'dappid': dappid, 'userid': user_id}) if existing: apikey = password_decode(existing[0].apikey) status_msg = '用户已同步' else: apikey_value = uuid() ns_key = { 'id': uuid(), 'dappid': dappid, 'userid': user_id, 'apikey': password_encode(apikey_value), 'secretkey': secretkey, 'appid': dappid, 'enabled_date': datetime.now().strftime('%Y-%m-%d'), 'expired_date': '9999-12-31' } await sor.C('downapikey', ns_key) apikey = apikey_value status_msg = '同步成功' result_data.append({ 'user_id': user_id, 'apikey': apikey, 'status': status_msg }) return json.dumps({'status': 'success', 'data': result_data}, ensure_ascii=False)