This commit is contained in:
ping 2026-05-22 19:18:37 +08:00
parent dcfb872267
commit 2e34cc4d82
15 changed files with 894 additions and 110 deletions

160
b/cntoai/chat_send.dspy Normal file
View File

@ -0,0 +1,160 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
def _title_from_message(ns):
text = ns.get('message') or ns.get('text') or ''
text = str(text).strip().replace('\n', ' ')
if not text:
return '新对话'
return text[:30] + ('...' if len(text) > 30 else '')
def _build_user_content(ns):
text_parts = []
if ns.get('message'):
text_parts.append(str(ns.get('message')))
if ns.get('text'):
text_parts.append(str(ns.get('text')))
if ns.get('document_text'):
text_parts.append(str(ns.get('document_text')))
parts = []
merged_text = '\n'.join([p for p in text_parts if p]).strip()
if merged_text:
parts.append({'type': 'text', 'text': merged_text})
if ns.get('image_url'):
parts.append({'type': 'image_url', 'image_url': {'url': ns.get('image_url')}})
if ns.get('image_base64'):
mime = ns.get('image_mime') or 'image/jpeg'
b64 = ns.get('image_base64')
if not str(b64).startswith('data:'):
b64 = 'data:%s;base64,%s' % (mime, b64)
parts.append({'type': 'image_url', 'image_url': {'url': b64}})
if ns.get('document_url'):
parts.append({'type': 'file', 'file': {'file_url': ns.get('document_url')}})
if not parts:
return ''
if len(parts) == 1 and parts[0]['type'] == 'text':
return parts[0]['text']
return parts
async def _load_session_messages(sor, session_id):
sql = """
SELECT role, content, content_type
FROM chat_message
WHERE session_id = '%s'
ORDER BY created_at ASC;
""" % _escape(session_id)
rows = await sor.sqlExe(sql, {})
messages = []
for row in rows:
content = row.get('content') or ''
if row.get('content_type') == 'mixed':
import json
try:
content = json.loads(content)
except Exception:
pass
messages.append({'role': row['role'], 'content': content})
return messages
async def chat_send(ns={}):
"""
发送消息并保存多轮对话(需先执行 chat_tables.sql
参数model, message, stream(默认true), session_id,
image_url, image_base64, document_url, document_text
"""
import json
import traceback
model = ns.get('model')
if not model:
return {'status': False, 'msg': 'model is required'}
userid = ns.get('userid') or await get_user()
if not userid:
return {'status': False, 'msg': '未找到用户'}
user_content = _build_user_content(ns)
if not user_content:
return {'status': False, 'msg': '请输入文本,或提供图片/文档参数'}
content_type = 'mixed' if isinstance(user_content, list) else 'text'
store_content = json.dumps(user_content, ensure_ascii=False) if content_type == 'mixed' else str(user_content)
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
session_id = ns.get('session_id')
if not session_id:
session_id = uuid()
await sor.C('chat_session', {
'id': session_id,
'userid': userid,
'model': model,
'title': _title_from_message(ns),
})
else:
sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid})
if not sessions:
return {'status': False, 'msg': '会话不存在'}
await sor.C('chat_message', {
'id': uuid(),
'session_id': session_id,
'role': 'user',
'content': store_content,
'content_type': content_type,
})
history = await _load_session_messages(sor, session_id)
stream_val = ns.get('stream', True)
if isinstance(stream_val, str):
stream_val = stream_val.lower() in ('1', 'true', 'yes', 'on')
chat_result = await path_call('llm_chat_completions.dspy', {
'model': model,
'messages': history,
'stream': stream_val,
'userid': userid,
'api_url': ns.get('api_url'),
'api_key': ns.get('api_key'),
'model_id': ns.get('model_id'),
})
if not chat_result.get('status'):
return chat_result
reply = chat_result['data']['reply']
await sor.C('chat_message', {
'id': uuid(),
'session_id': session_id,
'role': 'assistant',
'content': reply,
'content_type': 'text',
})
await sor.sqlExe(
"UPDATE chat_session SET updated_at = NOW() WHERE id = '%s';"
% _escape(session_id),
{},
)
return {
'status': True,
'msg': 'send success',
'data': {
'session_id': session_id,
'reply': reply,
'model': model,
},
}
except Exception:
return {'status': False, 'msg': 'send failed, %s' % traceback.format_exc()}
ret = await chat_send(params_kw)
return ret

View File

@ -0,0 +1,39 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
async def chat_session_delete(ns={}):
"""删除会话及其全部消息"""
session_id = ns.get('session_id')
if not session_id:
return {'status': False, 'msg': 'session_id is required'}
userid = ns.get('userid') or await get_user()
if not userid:
return {'status': False, 'msg': '未找到用户'}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid})
if not sessions:
return {'status': False, 'msg': '会话不存在'}
await sor.sqlExe(
"DELETE FROM chat_message WHERE session_id = '%s';" % _escape(session_id),
{},
)
await sor.sqlExe(
"DELETE FROM chat_session WHERE id = '%s' AND userid = '%s';"
% (_escape(session_id), _escape(userid)),
{},
)
return {'status': True, 'msg': 'delete success'}
except Exception as e:
return {'status': False, 'msg': 'delete failed, %s' % str(e)}
ret = await chat_session_delete(params_kw)
return ret

View File

@ -0,0 +1,50 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
async def chat_session_list(ns={}):
"""当前用户的对话会话列表(左侧栏历史)"""
userid = ns.get('userid') or await get_user()
if not userid:
return {'status': False, 'msg': '未找到用户'}
page_size = int(ns.get('page_size', 50))
current_page = int(ns.get('current_page', 1))
offset = (current_page - 1) * page_size
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
count_sql = """
SELECT COUNT(*) AS total_count FROM chat_session
WHERE userid = '%s';
""" % _escape(userid)
total = (await sor.sqlExe(count_sql, {}))[0]['total_count']
find_sql = """
SELECT id, model, title, created_at, updated_at
FROM chat_session
WHERE userid = '%s'
ORDER BY updated_at DESC
LIMIT %s OFFSET %s;
""" % (_escape(userid), page_size, offset)
sessions = await sor.sqlExe(find_sql, {})
return {
'status': True,
'msg': 'list success',
'data': {
'total_count': total,
'page_size': page_size,
'current_page': current_page,
'sessions': sessions,
},
}
except Exception as e:
return {'status': False, 'msg': 'list failed, %s' % str(e)}
ret = await chat_session_list(params_kw)
return ret

View File

@ -0,0 +1,66 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
async def chat_session_messages(ns={}):
"""获取某次会话的全部消息"""
session_id = ns.get('session_id')
if not session_id:
return {'status': False, 'msg': 'session_id is required'}
userid = ns.get('userid') or await get_user()
if not userid:
return {'status': False, 'msg': '未找到用户'}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid})
if not sessions:
return {'status': False, 'msg': '会话不存在'}
sql = """
SELECT id, role, content, content_type, created_at
FROM chat_message
WHERE session_id = '%s'
ORDER BY created_at ASC;
""" % _escape(session_id)
rows = await sor.sqlExe(sql, {})
messages = []
for row in rows:
content = row.get('content') or ''
if row.get('content_type') == 'mixed':
import json
try:
content = json.loads(content)
except Exception:
pass
if isinstance(content, list):
text_parts = [p.get('text', '') for p in content if p.get('type') == 'text']
display = '\n'.join([t for t in text_parts if t]) or '[多媒体消息]'
else:
display = content
messages.append({
'id': row['id'],
'role': row['role'],
'content': display,
'created_at': row.get('created_at'),
})
return {
'status': True,
'msg': 'get messages success',
'data': {
'session': sessions[0],
'messages': messages,
},
}
except Exception as e:
return {'status': False, 'msg': 'get messages failed, %s' % str(e)}
ret = await chat_session_messages(params_kw)
return ret

23
b/cntoai/chat_tables.sql Normal file
View File

@ -0,0 +1,23 @@
-- 多轮对话:请先执行本脚本创建表后再使用 chat_send / chat_session_* 接口
CREATE TABLE IF NOT EXISTS `chat_session` (
`id` varchar(64) NOT NULL COMMENT '会话ID',
`userid` varchar(64) NOT NULL COMMENT '用户ID',
`model` varchar(128) NOT NULL COMMENT '模型名称',
`title` varchar(255) DEFAULT NULL COMMENT '会话标题(首条问题摘要)',
`created_at` datetime DEFAULT current_timestamp() COMMENT '创建时间',
`updated_at` datetime DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx_userid_updated` (`userid`, `updated_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='模型对话会话';
CREATE TABLE IF NOT EXISTS `chat_message` (
`id` varchar(64) NOT NULL COMMENT '消息ID',
`session_id` varchar(64) NOT NULL COMMENT '会话ID',
`role` varchar(32) NOT NULL COMMENT '角色: user / assistant / system',
`content` mediumtext COMMENT '消息内容纯文本或JSON',
`content_type` varchar(32) DEFAULT 'text' COMMENT 'text / mixed',
`created_at` datetime DEFAULT current_timestamp() COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_session_id` (`session_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='模型对话消息';

View File

@ -60,6 +60,7 @@ async def create_model_apikey(ns={}):
async with session.post(url, headers=headers, data=json.dumps(payload)) as response:
# 打印响应状态码
debug(f"create_model_apikey状态码: {response.status}")
debug(f"create_model_apikey响应: {await response.text()}")
result_sysnc = await response.json()
if not result_sysnc.get('status') == 'ok':
@ -76,12 +77,13 @@ async def create_model_apikey(ns={}):
remote_table_id = result_sysnc['data'].get('id')
name = result_sysnc['data'].get('name')
secretkey = result_sysnc['data'].get('secretkey')
apikey = result_sysnc['data'].get('apikey')
await sor.C('user_api_keys', {
'userid': ns['userid'],
'remote_table_id': remote_table_id,
'name': name,
'opc_apikey': 1,
'opc_apikey': apikey,
'secretkey': secretkey,
'action': 'user_self_create',
})

View File

@ -0,0 +1,53 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
async def get_model_api_doc(ns={}):
"""
根据 model_id 查询模型 API 文档。
参数:
model_id (str) 模型ID必填
返回 data 字段:
id, model_id, curl_code, python_code, created_at, updated_at
"""
model_id = ns.get('id')
if not model_id:
return {'status': False, 'msg': 'model id is required'}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
# 通过model_id从model_management表中查询model_name
model_name_sql = """
SELECT model_name FROM model_management WHERE id = '%s' LIMIT 1;
""" % _escape(model_id)
model_name = await sor.sqlExe(model_name_sql, {})
if not model_name:
return {'status': False, 'msg': 'model not found'}
model_name = model_name[0]['model_name']
find_sql = """
SELECT id, api_url, model_id, curl_code, python_code, created_at, updated_at
FROM model_api_doc
WHERE model_id = '%s'
LIMIT 1;
""" % _escape(model_id)
result = await sor.sqlExe(find_sql, {})
if not result:
return {'status': False, 'msg': 'api doc not found'}
result[0]['model_name'] = model_name
return {
'status': True,
'msg': 'get model api doc success',
'data': result[0],
}
except Exception as e:
return {'status': False, 'msg': 'get model api doc failed, %s' % str(e)}
ret = await get_model_api_doc(params_kw)
return ret

View File

@ -10,90 +10,99 @@ async def get_model_apikey(ns={}):
'msg': '未找到用户'
}
action = ns.get('action')
if not action:
action = 'user_self_create'
# 通过userid从user_api_keys表中查询opc_apikey
db = DBPools()
async with db.sqlorContext('kboss') as sor:
records = await sor.R('user_api_keys', {'userid': ns['userid'], 'action': 'sync'})
records = await sor.R('user_api_keys', {'userid': ns['userid'], 'action': action})
if not records:
return {
'status': False,
'msg': '未找到用户opc_apikey'
'msg': 'apikey不存在'
}
already_sync_user_key = records[0]['opc_apikey']
already_sync_user_appid = records[0]['appid']
# domain 从数据库params表中获取到pname=cntoai_domain的pvalue值
db = DBPools()
async with db.sqlorContext('kboss') as sor:
domain = await sor.R('params', {'pname': 'cntoai_domain'})
if domain:
domain = domain[0]['pvalue']
else:
debug(f"get_model_apikey未找到域名")
return {
'status': False,
'msg': '未找到域名'
}
# 目标URL
url = f"{domain}/dapi/downapps.dspy"
# 请求头
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer %s" % already_sync_user_key
return {
'status': True,
'msg': '获取模型apikey成功',
'data': records
}
try:
# 创建一个异步会话
result_sysnc = None
async with aiohttp.ClientSession() as session:
# 发送GET请求
async with session.get(url, headers=headers) as response:
# 打印响应状态码
debug(f"get_model_apikey状态码: {response.status}")
result_sysnc = await response.json()
if not result_sysnc.get('status') == 'ok':
debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}")
return {
'status': False,
'msg': f"获取模型apikey失败: {result_sysnc}"
}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
# user_api_keys表格 userid/opc_apikey
# 首先判断apikey是否存在
apikeys = result_sysnc['data']['apikeys']
# 遍历apikeys如果apikey不存在则创建 如果存在则做更新 根据userid和remote_table_id判断
for apikey_item in apikeys:
remote_table_id = apikey_item.get('id')
name = '' if not apikey_item.get('name') else apikey_item.get('name')
apikeyid = apikey_item.get('apikeyid')
exist_record = await sor.R('user_api_keys', {'userid': ns['userid'], 'remote_table_id': remote_table_id})
if exist_record:
update_sql = f"UPDATE user_api_keys SET name = '{name}', opc_apikey = '{apikeyid}' WHERE userid = '{ns['userid']}' AND remote_table_id = '{remote_table_id}'"
await sor.sqlExe(update_sql, {})
else:
await sor.C('user_api_keys', {
'userid': ns['userid'],
'remote_table_id': remote_table_id,
'name': name,
'opc_apikey': apikeyid,
'action': 'user_self_create',
})
result_sysnc['status'] = True
return result_sysnc
# already_sync_user_key = records[0]['opc_apikey']
# already_sync_user_appid = records[0]['appid']
except Exception as e:
debug(f"get_model_apikey获取模型apikey失败: {e}")
return {
'status': False,
'msg': f"get_model_apikey获取模型apikey失败: {e}"
}
# # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值
# db = DBPools()
# async with db.sqlorContext('kboss') as sor:
# domain = await sor.R('params', {'pname': 'cntoai_domain'})
# if domain:
# domain = domain[0]['pvalue']
# else:
# debug(f"get_model_apikey未找到域名")
# return {
# 'status': False,
# 'msg': '未找到域名'
# }
# # 目标URL
# url = f"{domain}/dapi/downapps.dspy"
# # 请求头
# headers = {
# "Content-Type": "application/json",
# "Authorization": "Bearer %s" % already_sync_user_key
# }
# try:
# # 创建一个异步会话
# result_sysnc = None
# async with aiohttp.ClientSession() as session:
# # 发送GET请求
# async with session.get(url, headers=headers) as response:
# # 打印响应状态码
# debug(f"get_model_apikey状态码: {response.status}")
# result_sysnc = await response.json()
# if not result_sysnc.get('status') == 'ok':
# debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}")
# return {
# 'status': False,
# 'msg': f"获取模型apikey失败: {result_sysnc}"
# }
# db = DBPools()
# async with db.sqlorContext('kboss') as sor:
# # user_api_keys表格 userid/opc_apikey
# # 首先判断apikey是否存在
# apikeys = result_sysnc['data']['apikeys']
# # 遍历apikeys如果apikey不存在则创建 如果存在则做更新 根据userid和remote_table_id判断
# for apikey_item in apikeys:
# remote_table_id = apikey_item.get('id')
# name = '' if not apikey_item.get('name') else apikey_item.get('name')
# apikeyid = apikey_item.get('apikeyid')
# exist_record = await sor.R('user_api_keys', {'userid': ns['userid'], 'remote_table_id': remote_table_id})
# if exist_record:
# update_sql = f"UPDATE user_api_keys SET name = '{name}', opc_apikey = '{apikeyid}' WHERE userid = '{ns['userid']}' AND remote_table_id = '{remote_table_id}'"
# await sor.sqlExe(update_sql, {})
# else:
# await sor.C('user_api_keys', {
# 'userid': ns['userid'],
# 'remote_table_id': remote_table_id,
# 'name': name,
# 'opc_apikey': apikeyid,
# 'action': 'user_self_create',
# })
# result_sysnc['status'] = True
# return result_sysnc
# except Exception as e:
# debug(f"get_model_apikey获取模型apikey失败: {e}")
# return {
# 'status': False,
# 'msg': f"get_model_apikey获取模型apikey失败: {e}"
# }
ret = await get_model_apikey(params_kw)

View File

@ -6,22 +6,27 @@ async def get_user_balance(ns={}):
:return: 账户余额(与 getCustomerBalance 返回值一致)
"""
debug(ns)
apikey = ns.get('apikey')
# apikey = ns.get('apikey')
userid = ns.get('userid')
db = DBPools()
async with db.sqlorContext('kboss') as sor:
if not apikey:
return {
'status': 'error',
'msg': 'apikey is required'
}
userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey})
if not userid_li:
return {
'status': 'error',
'msg': 'apikey无效请联系管理员'
}
# if not apikey:
# return {
# 'status': 'error',
# 'msg': 'apikey is required'
# }
# userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey})
# if not userid_li:
# return {
# 'status': 'error',
# 'msg': 'apikey无效请联系管理员'
# }
# userid = userid_li[0]['userid']
if not userid:
return {
'status': 'error',
'msg': 'userid is required'
}
user = await sor.R('users', {'id': userid})
if not user:
return {

View File

@ -0,0 +1,239 @@
def _escape(value):
if value is None:
return None
return str(value).replace("'", "''")
def _parse_bool(value, default=True):
if value is None or value == '':
return default
if isinstance(value, bool):
return value
return str(value).lower() in ('1', 'true', 'yes', 'on')
def _parse_messages(ns):
"""解析历史消息:支持 list 或 JSON 字符串"""
raw = ns.get('messages')
if not raw:
return []
if isinstance(raw, list):
return raw
if isinstance(raw, str):
import json
try:
return json.loads(raw)
except Exception:
return []
return []
def build_user_content(ns):
"""
构建单条 user 消息的 content支持文本 / 图片 / 文档链接。
参数(可组合):
message / text 文本
image_url 图片 URL
image_base64 图片 base64不含 data: 前缀)
document_url 文档 URL以 file 类型传给兼容接口)
document_text 文档纯文本(拼入 text
"""
text_parts = []
if ns.get('message'):
text_parts.append(str(ns.get('message')))
if ns.get('text'):
text_parts.append(str(ns.get('text')))
if ns.get('document_text'):
text_parts.append(str(ns.get('document_text')))
parts = []
merged_text = '\n'.join([p for p in text_parts if p]).strip()
if merged_text:
parts.append({'type': 'text', 'text': merged_text})
if ns.get('image_url'):
parts.append({
'type': 'image_url',
'image_url': {'url': ns.get('image_url')},
})
if ns.get('image_base64'):
mime = ns.get('image_mime') or 'image/jpeg'
b64 = ns.get('image_base64')
if not str(b64).startswith('data:'):
b64 = 'data:%s;base64,%s' % (mime, b64)
parts.append({
'type': 'image_url',
'image_url': {'url': b64},
})
if ns.get('document_url'):
parts.append({
'type': 'file',
'file': {'file_url': ns.get('document_url')},
})
if not parts:
return ''
if len(parts) == 1 and parts[0]['type'] == 'text':
return parts[0]['text']
return parts
async def _resolve_chat_config(ns, sor):
"""解析 API 地址与 Bearer Token"""
api_url = ns.get('api_url')
api_key = ns.get('api_key')
if not api_url and ns.get('model_id'):
doc_rows = await sor.sqlExe(
"SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;"
% _escape(ns.get('model_id')),
{},
)
if doc_rows and doc_rows[0].get('api_url'):
api_url = doc_rows[0]['api_url']
if not str(api_url).endswith('/chat/completions'):
api_url = str(api_url).rstrip('/') + '/chat/completions'
if not api_url:
param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'})
if param_rows:
api_url = param_rows[0]['pvalue']
else:
domain_rows = await sor.R('params', {'pname': 'cntoai_domain'})
if domain_rows:
api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions'
else:
api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions'
if not api_key:
userid = ns.get('userid') or await get_user()
if userid:
action = ns.get('apikey_action') or 'user_self_create'
keys = await sor.R('user_api_keys', {'userid': userid, 'action': action})
if not keys:
keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'})
if keys:
api_key = keys[0].get('opc_apikey')
if not api_key:
key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'})
if key_rows:
api_key = key_rows[0]['pvalue']
return api_url, api_key
async def _read_stream_response(response):
"""解析 SSE 流式响应,汇总 assistant 文本"""
import json
chunks = []
buffer = ''
async for raw in response.content:
buffer += raw.decode('utf-8', errors='ignore')
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
line = line.strip()
if not line.startswith('data:'):
continue
data = line[5:].strip()
if data == '[DONE]':
return ''.join(chunks)
try:
payload = json.loads(data)
choice = (payload.get('choices') or [{}])[0]
delta = choice.get('delta') or {}
piece = delta.get('content') or ''
if piece:
chunks.append(piece)
except Exception:
continue
return ''.join(chunks)
async def llm_chat_completions(ns={}):
"""
OpenAI 兼容 chat/completionsaiohttp
参数:
model (str) 模型名,必填
message / text 当前用户文本
messages 历史消息 JSON 数组或 list多轮对话
stream (bool) 是否流式,默认 True
image_url / image_base64 图片
document_url / document_text 文档
api_url / api_key 可覆盖默认配置
model_id 从 model_api_doc 读取 api_url
userid 用于查 user_api_keys
"""
import aiohttp
import json
import traceback
model = ns.get('model')
if not model:
return {'status': False, 'msg': 'model is required'}
stream = _parse_bool(ns.get('stream'), True)
history = _parse_messages(ns)
user_content = build_user_content(ns)
if not user_content and not history:
return {'status': False, 'msg': 'message is required'}
messages = list(history)
if user_content:
messages.append({'role': 'user', 'content': user_content})
payload = {
'model': model,
'stream': stream,
'messages': messages,
}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
api_url, api_key = await _resolve_chat_config(ns, sor)
if not api_key:
return {'status': False, 'msg': '未找到 API Key请先创建或配置 cntoai_llm_api_key'}
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer %s' % api_key,
}
async with aiohttp.ClientSession() as session:
async with session.post(api_url, headers=headers, json=payload) as response:
if response.status != 200:
err_text = await response.text()
return {
'status': False,
'msg': '模型请求失败 HTTP %s: %s' % (response.status, err_text[:500]),
}
if stream:
reply = await _read_stream_response(response)
usage = {}
else:
body = await response.json()
choice = (body.get('choices') or [{}])[0]
msg = choice.get('message') or {}
reply = msg.get('content') or ''
usage = body.get('usage') or {}
return {
'status': True,
'msg': 'chat success',
'data': {
'model': model,
'reply': reply,
'messages': messages + [{'role': 'assistant', 'content': reply}],
'usage': usage,
'stream': stream,
},
}
except Exception:
return {'status': False, 'msg': 'chat failed, %s' % traceback.format_exc()}
ret = await llm_chat_completions(params_kw)
return ret

View File

@ -26,6 +26,7 @@ async def _charge_order(sor, orderid, order_type='NEW'):
"""
order_rows = await sor.R('bz_order', {'id': orderid})
if not order_rows:
debug(f"订单不存在")
return {'status': 'error', 'msg': '订单不存在'}
order_row = order_rows[0]
@ -38,10 +39,11 @@ async def _charge_order(sor, orderid, order_type='NEW'):
count = 0
if count - float(order_row['amount']) < 0:
pricedifference = count - round(order_row['amount'], 2)
debug(f"账户余额不足,订单金额: {order_row['amount']}, 账户余额: {count}, 差额: {pricedifference}")
return {
'status': 'error',
'msg': '账户余额不足',
'pricedifference': round(pricedifference, 2),
'pricedifference': round(pricedifference, 10),
}
await order2bill(orderid, sor)
@ -121,8 +123,10 @@ async def _charge_order(sor, orderid, order_type='NEW'):
# )
# await sor.C('customer_goods', nss)
debug(f"支付成功")
return {'status': True, 'msg': '支付成功'}
except Exception as error:
debug(f"支付失败: {error}")
return {'status': 'error', 'msg': str(error)}
async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantity=1):
@ -142,11 +146,14 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit
supply_price = abs(float(supply_price))
quantity = int(quantity)
except (TypeError, ValueError):
debug(f"calc_price_by_saleprotocol supply_price / quantity 必须为有效数字")
return {'status': 'error', 'msg': 'supply_price / quantity 必须为有效数字'}
if supply_price <= 0:
debug(f"calc_price_by_saleprotocol supply_price 必须大于 0")
return {'status': 'error', 'msg': 'supply_price 必须大于 0'}
if quantity <= 0:
debug(f"calc_price_by_saleprotocol quantity 必须大于 0")
return {'status': 'error', 'msg': 'quantity 必须大于 0'}
saleprotocol_to_person = await sor.R(
@ -193,12 +200,13 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit
)
if not product_salemode:
debug(f"calc_price_by_saleprotocol 还未上线这个产品的协议配置")
return {'status': 'error', 'msg': '还未上线这个产品的协议配置'}
discount = product_salemode[0]['discount']
list_price = supply_price
price = abs(round(list_price * discount, 2))
amount = abs(round(price * quantity, 2))
price = abs(round(list_price * discount, 12))
amount = abs(round(price * quantity, 12))
return {
'status': True,
@ -246,17 +254,20 @@ async def process_user_billing(ns={}):
llmid = ns.get('llmid')
if not llmid:
debug(f"{userid} process_user_billing llmid必传")
return {
'status': 'error',
'msg': 'llmid必传'
}
try:
amount = round(float(amount), 2)
amount = round(float(amount), 12)
except (TypeError, ValueError):
debug(f"{userid} process_user_billing amount 必须为有效数字")
return {'status': 'error', 'msg': 'amount 必须为有效数字'}
if amount <= 0:
debug(f"{userid} process_user_billing amount 必须大于 0")
return {'status': 'error', 'msg': 'amount 必须大于 0'}
db = DBPools()
@ -264,6 +275,7 @@ async def process_user_billing(ns={}):
try:
product_li = await sor.R('product', {'providerpid': llmid, 'del_flg': '0'})
if not product_li:
debug(f"{userid} process_user_billing 未找到对应产品,请确认")
return {
'status': 'error',
'msg': '未找到对应产品,请确认'
@ -273,26 +285,30 @@ async def process_user_billing(ns={}):
providerid = product['providerid']
providername_list = await sor.R('organization', {'id': providerid})
if not providername_list:
debug(f"{userid} process_user_billing 厂商不存在 %s" % providername)
return {
'status': 'error',
'msg': '厂商不存在 %s' % providername
}
providername = providername_list[0]['orgname']
userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey})
if not userid_li:
return {
'status': 'error',
'msg': 'apikey无效请联系管理员'
}
# userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey})
# if not userid_li:
# debug(f"{userid} process_user_billing apikey无效请联系管理员")
# return {
# 'status': 'error',
# 'msg': 'apikey无效请联系管理员'
# }
# userid = userid_li[0]['userid']
user_list = await sor.R('users', {'id': userid})
if not user_list:
debug(f"{userid} process_user_billing 用户不存在 %s" % userid)
return {'status': 'error', 'msg': '用户不存在 %s' % userid}
org_list = await sor.R('organization', {'id': user_list[0]['orgid']})
if not org_list:
debug(f"{userid} process_user_billing 用户所属机构不存在")
return {'status': 'error', 'msg': '用户所属机构不存在'}
customerid = org_list[0]['id']
@ -329,7 +345,7 @@ async def process_user_billing(ns={}):
return {
'status': 'error',
'msg': '账户余额不足',
'pricedifference': round(balance - amount, 2),
'pricedifference': round(balance - amount, 12),
}
order_id = uuid()
@ -343,7 +359,7 @@ async def process_user_billing(ns={}):
'order_date': now_str,
'source': providername,
'amount': amount,
'originalprice': round(originalprice, 2),
'originalprice': round(originalprice, 12),
'ordertype': 'prepay',
'servicename': productname,
}

View File

@ -19,9 +19,6 @@ async def sync_cn_ai_user(ns={}):
email = user_info[0]['email']
debug(f"sync_cn_ai_user同步用户: {userid}, {orgid}, {username}, {name}, {email}")
already_sync_user_key = '2i68AZ81di_q5f8AySDrJ'
already_sync_user_dappid = 'cndemo'
# 目标URL
# domain 从数据库params表中获取到pname=cntoai_domain的pvalue值
domain = None
@ -36,6 +33,24 @@ async def sync_cn_ai_user(ns={}):
'status': False,
'msg': '未找到域名'
}
already_sync_user_key = await sor.R('params', {'pname': 'cntoai_already_sync_user_key'})
if already_sync_user_key:
already_sync_user_key = already_sync_user_key[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到已同步用户key")
return {
'status': False,
'msg': '未找到已同步用户key'
}
already_sync_user_dappid = await sor.R('params', {'pname': 'cntoai_already_sync_user_dappid'})
if already_sync_user_dappid:
already_sync_user_dappid = already_sync_user_dappid[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到已同步用户dappid")
return {
'status': False,
'msg': '未找到已同步用户dappid'
}
url = f"{domain}/rbac/usersync"

View File

@ -0,0 +1,70 @@
async def forgotPassword(ns):
"""
忘记密码:校验短信验证码后重置密码。
参数:
id (str) 用户ID找回验证码接口返回的 userid
password (str) 新密码
codeid (str) 验证码ID
vcode (str) 验证码
也可传 mobile 或 username 定位用户(未传 id 时)。
"""
import re
import traceback
if not ns.get('password'):
return {'status': False, 'msg': '新密码不能为空'}
if len(ns.get('password')) < 8 or not re.search(r'[a-zA-Z]', ns.get('password')) or not re.search(r'[0-9]', ns.get('password')):
return {'status': False, 'msg': '密码至少8位包含大小写字母、特殊字符、数字'}
if not ns.get('codeid'):
return {'status': False, 'msg': '验证码ID不能为空'}
if not ns.get('vcode'):
return {'status': False, 'msg': '验证码不能为空'}
db = DBPools()
async with db.sqlorContext('kboss') as sor:
try:
code = await sor.R('validatecode', {'id': ns.get('codeid'), 'vcode': ns.get('vcode')})
if code:
create_at = code[0]['create_at']
now = datetime.datetime.now()
create_at_dt = datetime.datetime.strptime(create_at, "%Y-%m-%d %H:%M:%S")
if (now - create_at_dt).seconds > 500:
return {'status': False, 'msg': '验证码过期'}
else:
return {'status': False, 'msg': '验证码不正确'}
user = None
if ns.get('id'):
users = await sor.R('users', {'id': ns.get('id'), 'del_flg': '0'})
if users:
user = users[0]
elif ns.get('mobile'):
users = await sor.R('users', {'mobile': ns.get('mobile'), 'del_flg': '0'})
if users:
user = users[0]
elif ns.get('username'):
users = await sor.R('users', {'username': ns.get('username'), 'del_flg': '0'})
if not users:
users = await sor.R('users', {'mobile': ns.get('username'), 'del_flg': '0'})
if users:
user = users[0]
else:
return {'status': False, 'msg': '用户标识不能为空'}
if not user:
return {'status': False, 'msg': '用户不存在'}
new_password = password_encode(ns['password'])
update_sql = """UPDATE users SET password = '%s' WHERE id = '%s';""" % (new_password, user['id'])
await sor.sqlExe(update_sql, {})
return {'status': True, 'msg': '密码重置成功'}
except Exception as error:
debug(f"forgotPassword 错误: {error}")
debug(f"forgotPassword 错误堆栈: {traceback.format_exc()}")
return {'status': False, 'msg': '密码重置失败, %s' % str(error)}
ret = await forgotPassword(params_kw)
return ret

View File

@ -1,14 +1,40 @@
async def sync_cn_ai_user(userid=None, orgid=None, username=None, name=None, email=None):
import aiohttp
debug(f"sync_cn_ai_user同步用户: {userid}, {orgid}, {username}, {name}, {email}")
already_sync_user_key = '2i68AZ81di_q5f8AySDrJ'
already_sync_user_dappid = 'cndemo'
# 目标URL
url = "https://ai.atvoe.com/rbac/usersync"
# url = 'https://ai.atvoe.com/tmp/env.dspy'
# domain 从数据库params表中获取到pname=cntoai_domain的pvalue值
domain = None
db = DBPools()
async with db.sqlorContext('kboss') as sor:
domain = await sor.R('params', {'pname': 'cntoai_domain'})
if domain:
domain = domain[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到域名")
return {
'status': False,
'msg': '未找到域名'
}
already_sync_user_key = await sor.R('params', {'pname': 'cntoai_already_sync_user_key'})
if already_sync_user_key:
already_sync_user_key = already_sync_user_key[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到已同步用户key")
return {
'status': False,
'msg': '未找到已同步用户key'
}
already_sync_user_dappid = await sor.R('params', {'pname': 'cntoai_already_sync_user_dappid'})
if already_sync_user_dappid:
already_sync_user_dappid = already_sync_user_dappid[0]['pvalue']
else:
debug(f"sync_cn_ai_user未找到已同步用户dappid")
return {
'status': False,
'msg': '未找到已同步用户dappid'
}
url = f"{domain}/rbac/usersync"
# 请求头
headers = {
@ -82,7 +108,6 @@ async def sync_cn_ai_user(userid=None, orgid=None, username=None, name=None, ema
'msg': f"sync_cn_ai_user{userid}同步用户失败: {e}"
}
async def registerUser(ns):
"""
用户注册

View File

@ -112,6 +112,18 @@ async def mobilecode(ns):
return {'status': False, 'msg': '发送失败'}
else:
return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'}
# 忘记密码逻辑:检查手机号是否存在
elif action_type == 'forgotpassword':
if len(userreacs) >= 1:
code = await generate_vcode()
nss = await send_vcode(userreacs[0]['mobile'], '用户注册登录验证', {'SMSvCode': code.get('vcode')})
if nss['status']:
return {'status': True, 'msg': '验证码发送成功', 'codeid': code.get('id')}
else:
return {'status': False, 'msg': '发送失败'}
else:
return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'}
# 原有逻辑如果没有指定action_type保持原有逻辑
else: