kboss/b/cntoai/get_model_apikey.dspy
2026-05-22 19:18:37 +08:00

109 lines
3.9 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

async def get_model_apikey(ns=None):
    """Return the stored API-key rows of a user from ``user_api_keys``.

    Args:
        ns: Request parameters. Keys read here:
            ``userid`` - user whose rows are queried. When missing or
            falsy it is filled in from ``await get_user()`` (the value
            is written back into ``ns``).
            ``action`` - value matched against the ``action`` column;
            ``'user_self_create'`` is used when missing or falsy.

    Returns:
        A dict with ``status`` (bool) and ``msg`` (str). On success it
        also carries ``data``: the rows returned by ``sor.R``.
    """
    # Use a fresh dict per call. The previous ``ns={}`` default was a single
    # shared object, so the ``userid`` written below could be reused by a
    # later call that passed no arguments.
    if ns is None:
        ns = {}

    if not ns.get('userid'):
        ns['userid'] = await get_user()
    if not ns.get('userid'):
        return {
            'status': False,
            'msg': '未找到用户'
        }

    action = ns.get('action') or 'user_self_create'

    # Query user_api_keys by userid and action.
    db = DBPools()
    async with db.sqlorContext('kboss') as sor:
        records = await sor.R(
            'user_api_keys',
            {'userid': ns['userid'], 'action': action},
        )
        if not records:
            return {
                'status': False,
                'msg': 'apikey不存在'
            }
        return {
            'status': True,
            'msg': '获取模型apikey成功',
            'data': records
        }

    # NOTE: an earlier, commented-out implementation used to follow here and
    # was unreachable. It read the ``cntoai_domain`` value from the ``params``
    # table, issued a GET to ``{domain}/dapi/downapps.dspy`` with the stored
    # key as a Bearer token, and inserted/updated the returned api keys in
    # ``user_api_keys`` (matched on userid + remote_table_id). It was removed
    # as dead code; recover it from version control if needed. Its UPDATE
    # statement was built by string interpolation and should be parameterized
    # before any reuse.
ret = await get_model_apikey(params_kw)
return ret