kboss/b/cntoai/sync_cn_ai_user.dspy
2026-05-22 19:18:37 +08:00

131 lines
4.4 KiB
Plaintext

async def sync_cn_ai_user(ns=None):
    """Sync one local user to the remote service and store the returned API key.

    Steps, in order:
      1. Look up the user identified by ``ns['userid']`` in the ``users`` table.
      2. Read ``cntoai_domain``, ``cntoai_already_sync_user_key`` and
         ``cntoai_already_sync_user_dappid`` from the ``params`` table.
      3. POST the user to ``{domain}/rbac/usersync`` with a bearer token.
      4. On a ``status == 'success'`` reply, insert the returned
         apikey/appid/secretkey into ``user_api_keys`` unless that apikey is
         already stored.

    Args:
        ns: Mapping of call parameters; only ``userid`` is read. ``None``
            (the default) is treated as an empty mapping.

    Returns:
        dict: ``{'status': True, 'msg': ...}`` on success, otherwise
        ``{'status': False, 'msg': ...}`` describing what went wrong.

    Raises:
        Exceptions raised by the ``users``/``params`` lookups are not caught
        here; only the HTTP request and the ``user_api_keys`` write are
        wrapped in the error handler.
    """
    # A literal {} default would be one shared dict reused across calls.
    ns = {} if ns is None else ns

    userid = ns.get('userid')
    if not userid:
        return {
            'status': False,
            'msg': '未找到匹配的用户'
        }

    # (params.pname, text used for both the log line and the returned msg)
    required_params = (
        ('cntoai_domain', '未找到域名'),
        ('cntoai_already_sync_user_key', '未找到已同步用户key'),
        ('cntoai_already_sync_user_dappid', '未找到已同步用户dappid'),
    )

    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        user_rows = await sor.R('users', {'id': userid})
        if not user_rows:
            return {
                'status': False,
                'msg': '未找到匹配的用户'
            }
        user = user_rows[0]
        userid = user['id']
        orgid = user['orgid']
        username = user['username']
        name = user['name']
        email = user['email']
        debug(f"sync_cn_ai_user同步用户: {userid}, {orgid}, {username}, {name}, {email}")

        # Every setting is mandatory: stop at the first one that is missing.
        settings = {}
        for pname, missing_msg in required_params:
            rows = await sor.R('params', {'pname': pname})
            if not rows:
                debug(f"sync_cn_ai_user{missing_msg}")
                return {
                    'status': False,
                    'msg': missing_msg
                }
            settings[pname] = rows[0]['pvalue']

    # Imported only once a request is actually going to be made, so the
    # early-exit paths above do not depend on aiohttp.
    import aiohttp

    url = f"{settings['cntoai_domain']}/rbac/usersync"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {settings['cntoai_already_sync_user_key']}",
    }
    payload = {
        "action": "single",
        "dappid": settings['cntoai_already_sync_user_dappid'],
        "user": {
            "id": userid,
            "orgid": orgid,
            "username": username,
            "name": name,
            "email": email,
        },
    }

    try:
        async with aiohttp.ClientSession() as session:
            # json= lets aiohttp serialise the payload itself.
            async with session.post(url, headers=headers, json=payload) as response:
                debug(f"sync_cn_ai_user状态码: {response.status}")
                result = await response.json()

        if result.get('status') != 'success':
            debug(f"sync_cn_ai_user同步用户失败: {result}")
            return {
                'status': False,
                'msg': '用户同步失败'
            }

        # Only the first entry of 'data' is used. Reject a reply without a
        # usable apikey instead of storing a row with an empty key.
        entries = result.get('data') or []
        entry = entries[0] if entries else {}
        apikey = entry.get('apikey')
        if not apikey:
            debug(f"sync_cn_ai_user用户{userid}同步响应中缺少apikey")
            return {
                'status': False,
                'msg': '同步响应中缺少apikey'
            }

        db = DBPools()
        async with db.sqlorContext('kboss') as sor:
            # Refuse to store the same apikey twice.
            if await sor.R('user_api_keys', {'opc_apikey': apikey}):
                debug(f"sync_cn_ai_user用户{userid}已存在")
                return {
                    'status': False,
                    'msg': '用户opc_apikey已存在'
                }
            await sor.C('user_api_keys', {
                'userid': userid,
                'opc_apikey': apikey,
                'appid': entry.get('appid'),
                'secretkey': entry.get('secretkey'),
                'action': 'sync',
                'expire_time': None,
            })

        debug(f"sync_cn_ai_user用户{userid}同步成功")
        return {
            'status': True,
            'msg': '用户同步成功'
        }
    except Exception as e:
        # Boundary handler: callers receive a status dict, never a raised
        # network/parse/database error from the steps inside the try.
        debug(f"sync_cn_ai_user{userid}同步用户失败: {e}")
        return {
            'status': False,
            'msg': f"sync_cn_ai_user{userid}同步用户失败: {e}"
        }
ret = await sync_cn_ai_user(params_kw)
return ret