kboss/b/cntoai/get_model_apikey.dspy
2026-05-20 19:10:26 +08:00

85 lines
2.7 KiB
Plaintext

async def get_model_apikey(ns={}):
import aiohttp
if not ns.get('userid'):
ns['userid'] = await get_user()
if not ns.get('userid'):
return {
'status': False,
'msg': '未找到用户'
}
# 通过userid从user_api_keys表中查询opc_apikey
db = DBPools()
async with db.sqlorContext('kboss') as sor:
records = await sor.R('user_api_keys', {'userid': ns['userid'], 'action': 'sync'})
if not records:
return {
'status': False,
'msg': '未找到用户opc_apikey'
}
already_sync_user_key = records[0]['opc_apikey']
already_sync_user_appid = records[0]['appid']
# 目标URL
base_url = 'https://ai.atvoe.com'
url = f"{base_url}/dapi/apply_apikey.dspy"
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer %s" % already_sync_user_key
}
try:
# 创建一个异步会话
result_sysnc = None
async with aiohttp.ClientSession() as session:
# 发送GET请求
async with session.get(url, headers=headers) as response:
# 打印响应状态码
debug(f"get_model_apikey状态码: {response.status}")
result_sysnc = await response.json()
if not result_sysnc.get('status') == 'success':
debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}")
return {
'status': False,
'msg': f"获取模型apikey失败: {result_sysnc}"
}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
# user_api_keys表格 userid/opc_apikey
# 首先判断apikey是否存在
apikey = result_sysnc['data'][0].get('apikey')
appid = result_sysnc['data'][0].get('appid')
secretkey = result_sysnc['data'][0].get('secretkey')
await sor.C('user_api_keys', {
'userid': userid,
'opc_apikey': apikey,
'appid': appid,
'secretkey': secretkey,
'action': 'user_self_create',
'expire_time': None,
})
debug(f"sync_cn_ai_user用户{payload['user']['id']}同步成功")
return {
'status': True,
'msg': '用户同步成功'
}
except Exception as e:
debug(f"sync_cn_ai_user{userid}同步用户失败: {e}")
return {
'status': False,
'msg': f"sync_cn_ai_user{userid}同步用户失败: {e}"
}
ret = await get_model_apikey(params_kw)
return ret