diff --git a/b/cntoai/chat_send.dspy b/b/cntoai/chat_send.dspy new file mode 100644 index 0000000..0611dfb --- /dev/null +++ b/b/cntoai/chat_send.dspy @@ -0,0 +1,160 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +def _title_from_message(ns): + text = ns.get('message') or ns.get('text') or '' + text = str(text).strip().replace('\n', ' ') + if not text: + return '新对话' + return text[:30] + ('...' if len(text) > 30 else '') + + +def _build_user_content(ns): + text_parts = [] + if ns.get('message'): + text_parts.append(str(ns.get('message'))) + if ns.get('text'): + text_parts.append(str(ns.get('text'))) + if ns.get('document_text'): + text_parts.append(str(ns.get('document_text'))) + + parts = [] + merged_text = '\n'.join([p for p in text_parts if p]).strip() + if merged_text: + parts.append({'type': 'text', 'text': merged_text}) + if ns.get('image_url'): + parts.append({'type': 'image_url', 'image_url': {'url': ns.get('image_url')}}) + if ns.get('image_base64'): + mime = ns.get('image_mime') or 'image/jpeg' + b64 = ns.get('image_base64') + if not str(b64).startswith('data:'): + b64 = 'data:%s;base64,%s' % (mime, b64) + parts.append({'type': 'image_url', 'image_url': {'url': b64}}) + if ns.get('document_url'): + parts.append({'type': 'file', 'file': {'file_url': ns.get('document_url')}}) + if not parts: + return '' + if len(parts) == 1 and parts[0]['type'] == 'text': + return parts[0]['text'] + return parts + + +async def _load_session_messages(sor, session_id): + sql = """ + SELECT role, content, content_type + FROM chat_message + WHERE session_id = '%s' + ORDER BY created_at ASC; + """ % _escape(session_id) + rows = await sor.sqlExe(sql, {}) + messages = [] + for row in rows: + content = row.get('content') or '' + if row.get('content_type') == 'mixed': + import json + try: + content = json.loads(content) + except Exception: + pass + messages.append({'role': row['role'], 'content': content}) + return messages + + +async def chat_send(ns={}): + """ + 发送消息并保存多轮对话(需先执行 chat_tables.sql)。 + + 参数:model, message, stream(默认true), session_id, + image_url, image_base64, document_url, document_text + """ + import json + import traceback + + model = ns.get('model') + if not model: + return {'status': False, 'msg': 'model is required'} + + userid = ns.get('userid') or await get_user() + if not userid: + return {'status': False, 'msg': '未找到用户'} + + user_content = _build_user_content(ns) + if not user_content: + return {'status': False, 'msg': '请输入文本,或提供图片/文档参数'} + + content_type = 'mixed' if isinstance(user_content, list) else 'text' + store_content = json.dumps(user_content, ensure_ascii=False) if content_type == 'mixed' else str(user_content) + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + session_id = ns.get('session_id') + if not session_id: + session_id = uuid() + await sor.C('chat_session', { + 'id': session_id, + 'userid': userid, + 'model': model, + 'title': _title_from_message(ns), + }) + else: + sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid}) + if not sessions: + return {'status': False, 'msg': '会话不存在'} + + await sor.C('chat_message', { + 'id': uuid(), + 'session_id': session_id, + 'role': 'user', + 'content': store_content, + 'content_type': content_type, + }) + + history = await _load_session_messages(sor, session_id) + stream_val = ns.get('stream', True) + if isinstance(stream_val, str): + stream_val = stream_val.lower() in ('1', 'true', 'yes', 'on') + chat_result = await path_call('llm_chat_completions.dspy', { + 'model': model, + 'messages': history, + 'stream': stream_val, + 'userid': userid, + 'api_url': ns.get('api_url'), + 'api_key': ns.get('api_key'), + 'model_id': ns.get('model_id'), + }) + if not chat_result.get('status'): + return chat_result + + reply = chat_result['data']['reply'] + await sor.C('chat_message', { + 'id': uuid(), + 'session_id': session_id, + 'role': 'assistant', + 'content': reply, + 'content_type': 'text', + }) + await sor.sqlExe( + "UPDATE chat_session SET updated_at = NOW() WHERE id = '%s';" + % _escape(session_id), + {}, + ) + + return { + 'status': True, + 'msg': 'send success', + 'data': { + 'session_id': session_id, + 'reply': reply, + 'model': model, + }, + } + except Exception: + return {'status': False, 'msg': 'send failed, %s' % traceback.format_exc()} + + +ret = await chat_send(params_kw) +return ret diff --git a/b/cntoai/chat_session_delete.dspy b/b/cntoai/chat_session_delete.dspy new file mode 100644 index 0000000..bb12134 --- /dev/null +++ b/b/cntoai/chat_session_delete.dspy @@ -0,0 +1,39 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +async def chat_session_delete(ns={}): + """删除会话及其全部消息""" + session_id = ns.get('session_id') + if not session_id: + return {'status': False, 'msg': 'session_id is required'} + + userid = ns.get('userid') or await get_user() + if not userid: + return {'status': False, 'msg': '未找到用户'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid}) + if not sessions: + return {'status': False, 'msg': '会话不存在'} + + await sor.sqlExe( + "DELETE FROM chat_message WHERE session_id = '%s';" % _escape(session_id), + {}, + ) + await sor.sqlExe( + "DELETE FROM chat_session WHERE id = '%s' AND userid = '%s';" + % (_escape(session_id), _escape(userid)), + {}, + ) + return {'status': True, 'msg': 'delete success'} + except Exception as e: + return {'status': False, 'msg': 'delete failed, %s' % str(e)} + + +ret = await chat_session_delete(params_kw) +return ret diff --git a/b/cntoai/chat_session_list.dspy b/b/cntoai/chat_session_list.dspy new file mode 100644 index 0000000..c814ef6 --- /dev/null +++ b/b/cntoai/chat_session_list.dspy @@ -0,0 +1,50 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +async def chat_session_list(ns={}): + """当前用户的对话会话列表(左侧栏历史)""" + userid = ns.get('userid') or await get_user() + if not userid: + return {'status': False, 'msg': '未找到用户'} + + page_size = int(ns.get('page_size', 50)) + current_page = int(ns.get('current_page', 1)) + offset = (current_page - 1) * page_size + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + count_sql = """ + SELECT COUNT(*) AS total_count FROM chat_session + WHERE userid = '%s'; + """ % _escape(userid) + total = (await sor.sqlExe(count_sql, {}))[0]['total_count'] + + find_sql = """ + SELECT id, model, title, created_at, updated_at + FROM chat_session + WHERE userid = '%s' + ORDER BY updated_at DESC + LIMIT %s OFFSET %s; + """ % (_escape(userid), page_size, offset) + sessions = await sor.sqlExe(find_sql, {}) + + return { + 'status': True, + 'msg': 'list success', + 'data': { + 'total_count': total, + 'page_size': page_size, + 'current_page': current_page, + 'sessions': sessions, + }, + } + except Exception as e: + return {'status': False, 'msg': 'list failed, %s' % str(e)} + + +ret = await chat_session_list(params_kw) +return ret diff --git a/b/cntoai/chat_session_messages.dspy b/b/cntoai/chat_session_messages.dspy new file mode 100644 index 0000000..d702f8d --- /dev/null +++ b/b/cntoai/chat_session_messages.dspy @@ -0,0 +1,66 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +async def chat_session_messages(ns={}): + """获取某次会话的全部消息""" + session_id = ns.get('session_id') + if not session_id: + return {'status': False, 'msg': 'session_id is required'} + + userid = ns.get('userid') or await get_user() + if not userid: + return {'status': False, 'msg': '未找到用户'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + sessions = await sor.R('chat_session', {'id': session_id, 'userid': userid}) + if not sessions: + return {'status': False, 'msg': '会话不存在'} + + sql = """ + SELECT id, role, content, content_type, created_at + FROM chat_message + WHERE session_id = '%s' + ORDER BY created_at ASC; + """ % _escape(session_id) + rows = await sor.sqlExe(sql, {}) + + messages = [] + for row in rows: + content = row.get('content') or '' + if row.get('content_type') == 'mixed': + import json + try: + content = json.loads(content) + except Exception: + pass + if isinstance(content, list): + text_parts = [p.get('text', '') for p in content if p.get('type') == 'text'] + display = '\n'.join([t for t in text_parts if t]) or '[多媒体消息]' + else: + display = content + messages.append({ + 'id': row['id'], + 'role': row['role'], + 'content': display, + 'created_at': row.get('created_at'), + }) + + return { + 'status': True, + 'msg': 'get messages success', + 'data': { + 'session': sessions[0], + 'messages': messages, + }, + } + except Exception as e: + return {'status': False, 'msg': 'get messages failed, %s' % str(e)} + + +ret = await chat_session_messages(params_kw) +return ret diff --git a/b/cntoai/chat_tables.sql b/b/cntoai/chat_tables.sql new file mode 100644 index 0000000..f84777e --- /dev/null +++ b/b/cntoai/chat_tables.sql @@ -0,0 +1,23 @@ +-- 多轮对话:请先执行本脚本创建表后再使用 chat_send / chat_session_* 接口 + +CREATE TABLE IF NOT EXISTS `chat_session` ( + `id` varchar(64) NOT NULL COMMENT '会话ID', + `userid` varchar(64) NOT NULL COMMENT '用户ID', + `model` varchar(128) NOT NULL COMMENT '模型名称', + `title` varchar(255) DEFAULT NULL COMMENT '会话标题(首条问题摘要)', + `created_at` datetime DEFAULT current_timestamp() COMMENT '创建时间', + `updated_at` datetime DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '更新时间', + PRIMARY KEY (`id`), + KEY `idx_userid_updated` (`userid`, `updated_at`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='模型对话会话'; + +CREATE TABLE IF NOT EXISTS `chat_message` ( + `id` varchar(64) NOT NULL COMMENT '消息ID', + `session_id` varchar(64) NOT NULL COMMENT '会话ID', + `role` varchar(32) NOT NULL COMMENT '角色: user / assistant / system', + `content` mediumtext COMMENT '消息内容(纯文本或JSON)', + `content_type` varchar(32) DEFAULT 'text' COMMENT 'text / mixed', + `created_at` datetime DEFAULT current_timestamp() COMMENT '创建时间', + PRIMARY KEY (`id`), + KEY `idx_session_id` (`session_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='模型对话消息'; diff --git a/b/cntoai/create_model_apikey.dspy b/b/cntoai/create_model_apikey.dspy index 5068403..9969df8 100644 --- a/b/cntoai/create_model_apikey.dspy +++ b/b/cntoai/create_model_apikey.dspy @@ -60,6 +60,7 @@ async def create_model_apikey(ns={}): async with session.post(url, headers=headers, data=json.dumps(payload)) as response: # 打印响应状态码 debug(f"create_model_apikey状态码: {response.status}") + debug(f"create_model_apikey响应: {await response.text()}") result_sysnc = await response.json() if not result_sysnc.get('status') == 'ok': @@ -76,12 +77,13 @@ async def create_model_apikey(ns={}): remote_table_id = result_sysnc['data'].get('id') name = result_sysnc['data'].get('name') secretkey = result_sysnc['data'].get('secretkey') + apikey = result_sysnc['data'].get('apikey') await sor.C('user_api_keys', { 'userid': ns['userid'], 'remote_table_id': remote_table_id, 'name': name, - 'opc_apikey': 1, + 'opc_apikey': apikey, 'secretkey': secretkey, 'action': 'user_self_create', }) diff --git a/b/cntoai/get_model_api_doc.dspy b/b/cntoai/get_model_api_doc.dspy new file mode 100644 index 0000000..b0a1ebd --- /dev/null +++ b/b/cntoai/get_model_api_doc.dspy @@ -0,0 +1,53 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +async def get_model_api_doc(ns={}): + """ + 根据 model_id 查询模型 API 文档。 + + 参数: + model_id (str) 模型ID,必填 + + 返回 data 字段: + id, model_id, curl_code, python_code, created_at, updated_at + """ + model_id = ns.get('id') + if not model_id: + return {'status': False, 'msg': 'model id is required'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + # 通过model_id从model_management表中查询model_name + model_name_sql = """ + SELECT model_name FROM model_management WHERE id = '%s' LIMIT 1; + """ % _escape(model_id) + model_name = await sor.sqlExe(model_name_sql, {}) + if not model_name: + return {'status': False, 'msg': 'model not found'} + model_name = model_name[0]['model_name'] + + find_sql = """ + SELECT id, api_url, model_id, curl_code, python_code, created_at, updated_at + FROM model_api_doc + WHERE model_id = '%s' + LIMIT 1; + """ % _escape(model_id) + result = await sor.sqlExe(find_sql, {}) + if not result: + return {'status': False, 'msg': 'api doc not found'} + result[0]['model_name'] = model_name + return { + 'status': True, + 'msg': 'get model api doc success', + 'data': result[0], + } + except Exception as e: + return {'status': False, 'msg': 'get model api doc failed, %s' % str(e)} + + +ret = await get_model_api_doc(params_kw) +return ret diff --git a/b/cntoai/get_model_apikey.dspy b/b/cntoai/get_model_apikey.dspy index 30ba719..9dda40e 100644 --- a/b/cntoai/get_model_apikey.dspy +++ b/b/cntoai/get_model_apikey.dspy @@ -10,90 +10,99 @@ async def get_model_apikey(ns={}): 'msg': '未找到用户' } + action = ns.get('action') + if not action: + action = 'user_self_create' + # 通过userid从user_api_keys表中查询opc_apikey db = DBPools() async with db.sqlorContext('kboss') as sor: - records = await sor.R('user_api_keys', {'userid': ns['userid'], 'action': 'sync'}) + records = await sor.R('user_api_keys', {'userid': ns['userid'], 'action': action}) if not records: return { 'status': False, - 'msg': '未找到用户opc_apikey' + 'msg': 'apikey不存在' } - - already_sync_user_key = records[0]['opc_apikey'] - already_sync_user_appid = records[0]['appid'] - - # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 - db = DBPools() - async with db.sqlorContext('kboss') as sor: - domain = await sor.R('params', {'pname': 'cntoai_domain'}) - if domain: - domain = domain[0]['pvalue'] - else: - debug(f"get_model_apikey未找到域名") - return { - 'status': False, - 'msg': '未找到域名' - } - - # 目标URL - url = f"{domain}/dapi/downapps.dspy" - # 请求头 - headers = { - "Content-Type": "application/json", - "Authorization": "Bearer %s" % already_sync_user_key + return { + 'status': True, + 'msg': '获取模型apikey成功', + 'data': records } - - try: - # 创建一个异步会话 - result_sysnc = None - async with aiohttp.ClientSession() as session: - # 发送GET请求 - async with session.get(url, headers=headers) as response: - # 打印响应状态码 - debug(f"get_model_apikey状态码: {response.status}") - result_sysnc = await response.json() - - if not result_sysnc.get('status') == 'ok': - debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}") - return { - 'status': False, - 'msg': f"获取模型apikey失败: {result_sysnc}" - } - - db = DBPools() - async with db.sqlorContext('kboss') as sor: - # user_api_keys表格 userid/opc_apikey - # 首先判断apikey是否存在 - apikeys = result_sysnc['data']['apikeys'] - # 遍历apikeys,如果apikey不存在,则创建, 如果存在则做更新 根据userid和remote_table_id判断 - for apikey_item in apikeys: - remote_table_id = apikey_item.get('id') - name = '' if not apikey_item.get('name') else apikey_item.get('name') - apikeyid = apikey_item.get('apikeyid') - exist_record = await sor.R('user_api_keys', {'userid': ns['userid'], 'remote_table_id': remote_table_id}) - if exist_record: - update_sql = f"UPDATE user_api_keys SET name = '{name}', opc_apikey = '{apikeyid}' WHERE userid = '{ns['userid']}' AND remote_table_id = '{remote_table_id}'" - await sor.sqlExe(update_sql, {}) - else: - await sor.C('user_api_keys', { - 'userid': ns['userid'], - 'remote_table_id': remote_table_id, - 'name': name, - 'opc_apikey': apikeyid, - 'action': 'user_self_create', - }) - - result_sysnc['status'] = True - return result_sysnc + # already_sync_user_key = records[0]['opc_apikey'] + # already_sync_user_appid = records[0]['appid'] - except Exception as e: - debug(f"get_model_apikey获取模型apikey失败: {e}") - return { - 'status': False, - 'msg': f"get_model_apikey获取模型apikey失败: {e}" - } + # # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 + # db = DBPools() + # async with db.sqlorContext('kboss') as sor: + # domain = await sor.R('params', {'pname': 'cntoai_domain'}) + # if domain: + # domain = domain[0]['pvalue'] + # else: + # debug(f"get_model_apikey未找到域名") + # return { + # 'status': False, + # 'msg': '未找到域名' + # } + + # # 目标URL + # url = f"{domain}/dapi/downapps.dspy" + + # # 请求头 + # headers = { + # "Content-Type": "application/json", + # "Authorization": "Bearer %s" % already_sync_user_key + # } + + # try: + # # 创建一个异步会话 + # result_sysnc = None + # async with aiohttp.ClientSession() as session: + # # 发送GET请求 + # async with session.get(url, headers=headers) as response: + # # 打印响应状态码 + # debug(f"get_model_apikey状态码: {response.status}") + # result_sysnc = await response.json() + + # if not result_sysnc.get('status') == 'ok': + # debug(f"get_model_apikey获取模型apikey失败: {result_sysnc}") + # return { + # 'status': False, + # 'msg': f"获取模型apikey失败: {result_sysnc}" + # } + + # db = DBPools() + # async with db.sqlorContext('kboss') as sor: + # # user_api_keys表格 userid/opc_apikey + # # 首先判断apikey是否存在 + # apikeys = result_sysnc['data']['apikeys'] + # # 遍历apikeys,如果apikey不存在,则创建, 如果存在则做更新 根据userid和remote_table_id判断 + # for apikey_item in apikeys: + # remote_table_id = apikey_item.get('id') + # name = '' if not apikey_item.get('name') else apikey_item.get('name') + # apikeyid = apikey_item.get('apikeyid') + # exist_record = await sor.R('user_api_keys', {'userid': ns['userid'], 'remote_table_id': remote_table_id}) + # if exist_record: + # update_sql = f"UPDATE user_api_keys SET name = '{name}', opc_apikey = '{apikeyid}' WHERE userid = '{ns['userid']}' AND remote_table_id = '{remote_table_id}'" + # await sor.sqlExe(update_sql, {}) + # else: + # await sor.C('user_api_keys', { + # 'userid': ns['userid'], + # 'remote_table_id': remote_table_id, + # 'name': name, + # 'opc_apikey': apikeyid, + # 'action': 'user_self_create', + # }) + + # result_sysnc['status'] = True + # return result_sysnc + + # except Exception as e: + # debug(f"get_model_apikey获取模型apikey失败: {e}") + # return { + # 'status': False, + # 'msg': f"get_model_apikey获取模型apikey失败: {e}" + # } ret = await get_model_apikey(params_kw) diff --git a/b/cntoai/get_user_balance.dspy b/b/cntoai/get_user_balance.dspy index 16ed070..53c6ce8 100644 --- a/b/cntoai/get_user_balance.dspy +++ b/b/cntoai/get_user_balance.dspy @@ -6,22 +6,27 @@ async def get_user_balance(ns={}): :return: 账户余额(与 getCustomerBalance 返回值一致) """ debug(ns) - apikey = ns.get('apikey') + # apikey = ns.get('apikey') userid = ns.get('userid') db = DBPools() async with db.sqlorContext('kboss') as sor: - if not apikey: - return { - 'status': 'error', - 'msg': 'apikey is required' - } - userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) - if not userid_li: - return { - 'status': 'error', - 'msg': 'apikey无效,请联系管理员' - } + # if not apikey: + # return { + # 'status': 'error', + # 'msg': 'apikey is required' + # } + # userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) + # if not userid_li: + # return { + # 'status': 'error', + # 'msg': 'apikey无效,请联系管理员' + # } # userid = userid_li[0]['userid'] + if not userid: + return { + 'status': 'error', + 'msg': 'userid is required' + } user = await sor.R('users', {'id': userid}) if not user: return { diff --git a/b/cntoai/llm_chat_completions.dspy b/b/cntoai/llm_chat_completions.dspy new file mode 100644 index 0000000..d910986 --- /dev/null +++ b/b/cntoai/llm_chat_completions.dspy @@ -0,0 +1,239 @@ +def _escape(value): + if value is None: + return None + return str(value).replace("'", "''") + + +def _parse_bool(value, default=True): + if value is None or value == '': + return default + if isinstance(value, bool): + return value + return str(value).lower() in ('1', 'true', 'yes', 'on') + + +def _parse_messages(ns): + """解析历史消息:支持 list 或 JSON 字符串""" + raw = ns.get('messages') + if not raw: + return [] + if isinstance(raw, list): + return raw + if isinstance(raw, str): + import json + try: + return json.loads(raw) + except Exception: + return [] + return [] + + +def build_user_content(ns): + """ + 构建单条 user 消息的 content,支持文本 / 图片 / 文档链接。 + + 参数(可组合): + message / text 文本 + image_url 图片 URL + image_base64 图片 base64(不含 data: 前缀) + document_url 文档 URL(以 file 类型传给兼容接口) + document_text 文档纯文本(拼入 text) + """ + text_parts = [] + if ns.get('message'): + text_parts.append(str(ns.get('message'))) + if ns.get('text'): + text_parts.append(str(ns.get('text'))) + if ns.get('document_text'): + text_parts.append(str(ns.get('document_text'))) + + parts = [] + merged_text = '\n'.join([p for p in text_parts if p]).strip() + if merged_text: + parts.append({'type': 'text', 'text': merged_text}) + + if ns.get('image_url'): + parts.append({ + 'type': 'image_url', + 'image_url': {'url': ns.get('image_url')}, + }) + if ns.get('image_base64'): + mime = ns.get('image_mime') or 'image/jpeg' + b64 = ns.get('image_base64') + if not str(b64).startswith('data:'): + b64 = 'data:%s;base64,%s' % (mime, b64) + parts.append({ + 'type': 'image_url', + 'image_url': {'url': b64}, + }) + if ns.get('document_url'): + parts.append({ + 'type': 'file', + 'file': {'file_url': ns.get('document_url')}, + }) + + if not parts: + return '' + if len(parts) == 1 and parts[0]['type'] == 'text': + return parts[0]['text'] + return parts + + +async def _resolve_chat_config(ns, sor): + """解析 API 地址与 Bearer Token""" + api_url = ns.get('api_url') + api_key = ns.get('api_key') + + if not api_url and ns.get('model_id'): + doc_rows = await sor.sqlExe( + "SELECT api_url FROM model_api_doc WHERE model_id = '%s' LIMIT 1;" + % _escape(ns.get('model_id')), + {}, + ) + if doc_rows and doc_rows[0].get('api_url'): + api_url = doc_rows[0]['api_url'] + if not str(api_url).endswith('/chat/completions'): + api_url = str(api_url).rstrip('/') + '/chat/completions' + + if not api_url: + param_rows = await sor.R('params', {'pname': 'cntoai_llm_chat_url'}) + if param_rows: + api_url = param_rows[0]['pvalue'] + else: + domain_rows = await sor.R('params', {'pname': 'cntoai_domain'}) + if domain_rows: + api_url = domain_rows[0]['pvalue'].rstrip('/') + '/llmage/v1/chat/completions' + else: + api_url = 'https://ai.atvoe.com/llmage/v1/chat/completions' + + if not api_key: + userid = ns.get('userid') or await get_user() + if userid: + action = ns.get('apikey_action') or 'user_self_create' + keys = await sor.R('user_api_keys', {'userid': userid, 'action': action}) + if not keys: + keys = await sor.R('user_api_keys', {'userid': userid, 'action': 'sync'}) + if keys: + api_key = keys[0].get('opc_apikey') + if not api_key: + key_rows = await sor.R('params', {'pname': 'cntoai_llm_api_key'}) + if key_rows: + api_key = key_rows[0]['pvalue'] + + return api_url, api_key + + +async def _read_stream_response(response): + """解析 SSE 流式响应,汇总 assistant 文本""" + import json + chunks = [] + buffer = '' + async for raw in response.content: + buffer += raw.decode('utf-8', errors='ignore') + while '\n' in buffer: + line, buffer = buffer.split('\n', 1) + line = line.strip() + if not line.startswith('data:'): + continue + data = line[5:].strip() + if data == '[DONE]': + return ''.join(chunks) + try: + payload = json.loads(data) + choice = (payload.get('choices') or [{}])[0] + delta = choice.get('delta') or {} + piece = delta.get('content') or '' + if piece: + chunks.append(piece) + except Exception: + continue + return ''.join(chunks) + + +async def llm_chat_completions(ns={}): + """ + OpenAI 兼容 chat/completions(aiohttp)。 + + 参数: + model (str) 模型名,必填 + message / text 当前用户文本 + messages 历史消息 JSON 数组或 list,多轮对话 + stream (bool) 是否流式,默认 True + image_url / image_base64 图片 + document_url / document_text 文档 + api_url / api_key 可覆盖默认配置 + model_id 从 model_api_doc 读取 api_url + userid 用于查 user_api_keys + """ + import aiohttp + import json + import traceback + + model = ns.get('model') + if not model: + return {'status': False, 'msg': 'model is required'} + + stream = _parse_bool(ns.get('stream'), True) + history = _parse_messages(ns) + user_content = build_user_content(ns) + if not user_content and not history: + return {'status': False, 'msg': 'message is required'} + + messages = list(history) + if user_content: + messages.append({'role': 'user', 'content': user_content}) + + payload = { + 'model': model, + 'stream': stream, + 'messages': messages, + } + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + api_url, api_key = await _resolve_chat_config(ns, sor) + if not api_key: + return {'status': False, 'msg': '未找到 API Key,请先创建或配置 cntoai_llm_api_key'} + + headers = { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer %s' % api_key, + } + + async with aiohttp.ClientSession() as session: + async with session.post(api_url, headers=headers, json=payload) as response: + if response.status != 200: + err_text = await response.text() + return { + 'status': False, + 'msg': '模型请求失败 HTTP %s: %s' % (response.status, err_text[:500]), + } + + if stream: + reply = await _read_stream_response(response) + usage = {} + else: + body = await response.json() + choice = (body.get('choices') or [{}])[0] + msg = choice.get('message') or {} + reply = msg.get('content') or '' + usage = body.get('usage') or {} + + return { + 'status': True, + 'msg': 'chat success', + 'data': { + 'model': model, + 'reply': reply, + 'messages': messages + [{'role': 'assistant', 'content': reply}], + 'usage': usage, + 'stream': stream, + }, + } + except Exception: + return {'status': False, 'msg': 'chat failed, %s' % traceback.format_exc()} + + +ret = await llm_chat_completions(params_kw) +return ret diff --git a/b/cntoai/process_user_billing.dspy b/b/cntoai/process_user_billing.dspy index 03964fd..9204e7c 100644 --- a/b/cntoai/process_user_billing.dspy +++ b/b/cntoai/process_user_billing.dspy @@ -26,6 +26,7 @@ async def _charge_order(sor, orderid, order_type='NEW'): """ order_rows = await sor.R('bz_order', {'id': orderid}) if not order_rows: + debug(f"订单不存在") return {'status': 'error', 'msg': '订单不存在'} order_row = order_rows[0] @@ -38,10 +39,11 @@ async def _charge_order(sor, orderid, order_type='NEW'): count = 0 if count - float(order_row['amount']) < 0: pricedifference = count - round(order_row['amount'], 2) + debug(f"账户余额不足,订单金额: {order_row['amount']}, 账户余额: {count}, 差额: {pricedifference}") return { 'status': 'error', 'msg': '账户余额不足', - 'pricedifference': round(pricedifference, 2), + 'pricedifference': round(pricedifference, 10), } await order2bill(orderid, sor) @@ -121,8 +123,10 @@ async def _charge_order(sor, orderid, order_type='NEW'): # ) # await sor.C('customer_goods', nss) + debug(f"支付成功") return {'status': True, 'msg': '支付成功'} except Exception as error: + debug(f"支付失败: {error}") return {'status': 'error', 'msg': str(error)} async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantity=1): @@ -142,11 +146,14 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit supply_price = abs(float(supply_price)) quantity = int(quantity) except (TypeError, ValueError): + debug(f"calc_price_by_saleprotocol supply_price / quantity 必须为有效数字") return {'status': 'error', 'msg': 'supply_price / quantity 必须为有效数字'} if supply_price <= 0: + debug(f"calc_price_by_saleprotocol supply_price 必须大于 0") return {'status': 'error', 'msg': 'supply_price 必须大于 0'} if quantity <= 0: + debug(f"calc_price_by_saleprotocol quantity 必须大于 0") return {'status': 'error', 'msg': 'quantity 必须大于 0'} saleprotocol_to_person = await sor.R( @@ -193,12 +200,13 @@ async def calc_price_by_saleprotocol(sor, org, product_id, supply_price, quantit ) if not product_salemode: + debug(f"calc_price_by_saleprotocol 还未上线这个产品的协议配置") return {'status': 'error', 'msg': '还未上线这个产品的协议配置'} discount = product_salemode[0]['discount'] list_price = supply_price - price = abs(round(list_price * discount, 2)) - amount = abs(round(price * quantity, 2)) + price = abs(round(list_price * discount, 12)) + amount = abs(round(price * quantity, 12)) return { 'status': True, @@ -246,17 +254,20 @@ async def process_user_billing(ns={}): llmid = ns.get('llmid') if not llmid: + debug(f"{userid} process_user_billing llmid必传") return { 'status': 'error', 'msg': 'llmid必传' } try: - amount = round(float(amount), 2) + amount = round(float(amount), 12) except (TypeError, ValueError): + debug(f"{userid} process_user_billing amount 必须为有效数字") return {'status': 'error', 'msg': 'amount 必须为有效数字'} if amount <= 0: + debug(f"{userid} process_user_billing amount 必须大于 0") return {'status': 'error', 'msg': 'amount 必须大于 0'} db = DBPools() @@ -264,6 +275,7 @@ async def process_user_billing(ns={}): try: product_li = await sor.R('product', {'providerpid': llmid, 'del_flg': '0'}) if not product_li: + debug(f"{userid} process_user_billing 未找到对应产品,请确认") return { 'status': 'error', 'msg': '未找到对应产品,请确认' @@ -273,26 +285,30 @@ async def process_user_billing(ns={}): providerid = product['providerid'] providername_list = await sor.R('organization', {'id': providerid}) if not providername_list: + debug(f"{userid} process_user_billing 厂商不存在 %s" % providername) return { 'status': 'error', 'msg': '厂商不存在 %s' % providername } providername = providername_list[0]['orgname'] - userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) - if not userid_li: - return { - 'status': 'error', - 'msg': 'apikey无效,请联系管理员' - } + # userid_li = await sor.R('user_api_keys', {'opc_apikey': apikey}) + # if not userid_li: + # debug(f"{userid} process_user_billing apikey无效,请联系管理员") + # return { + # 'status': 'error', + # 'msg': 'apikey无效,请联系管理员' + # } # userid = userid_li[0]['userid'] user_list = await sor.R('users', {'id': userid}) if not user_list: + debug(f"{userid} process_user_billing 用户不存在 %s" % userid) return {'status': 'error', 'msg': '用户不存在 %s' % userid} org_list = await sor.R('organization', {'id': user_list[0]['orgid']}) if not org_list: + debug(f"{userid} process_user_billing 用户所属机构不存在") return {'status': 'error', 'msg': '用户所属机构不存在'} customerid = org_list[0]['id'] @@ -329,7 +345,7 @@ async def process_user_billing(ns={}): return { 'status': 'error', 'msg': '账户余额不足', - 'pricedifference': round(balance - amount, 2), + 'pricedifference': round(balance - amount, 12), } order_id = uuid() @@ -343,7 +359,7 @@ async def process_user_billing(ns={}): 'order_date': now_str, 'source': providername, 'amount': amount, - 'originalprice': round(originalprice, 2), + 'originalprice': round(originalprice, 12), 'ordertype': 'prepay', 'servicename': productname, } diff --git a/b/cntoai/sync_cn_ai_user.dspy b/b/cntoai/sync_cn_ai_user.dspy index 96e9181..f5ab279 100644 --- a/b/cntoai/sync_cn_ai_user.dspy +++ b/b/cntoai/sync_cn_ai_user.dspy @@ -19,9 +19,6 @@ async def sync_cn_ai_user(ns={}): email = user_info[0]['email'] debug(f"sync_cn_ai_user同步用户: {userid}, {orgid}, {username}, {name}, {email}") - already_sync_user_key = '2i68AZ81di_q5f8AySDrJ' - already_sync_user_dappid = 'cndemo' - # 目标URL # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 domain = None @@ -36,6 +33,24 @@ async def sync_cn_ai_user(ns={}): 'status': False, 'msg': '未找到域名' } + already_sync_user_key = await sor.R('params', {'pname': 'cntoai_already_sync_user_key'}) + if already_sync_user_key: + already_sync_user_key = already_sync_user_key[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到已同步用户key") + return { + 'status': False, + 'msg': '未找到已同步用户key' + } + already_sync_user_dappid = await sor.R('params', {'pname': 'cntoai_already_sync_user_dappid'}) + if already_sync_user_dappid: + already_sync_user_dappid = already_sync_user_dappid[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到已同步用户dappid") + return { + 'status': False, + 'msg': '未找到已同步用户dappid' + } url = f"{domain}/rbac/usersync" diff --git a/b/customer/forgotPassword.dspy b/b/customer/forgotPassword.dspy new file mode 100644 index 0000000..b12e8b5 --- /dev/null +++ b/b/customer/forgotPassword.dspy @@ -0,0 +1,70 @@ +async def forgotPassword(ns): + """ + 忘记密码:校验短信验证码后重置密码。 + + 参数: + id (str) 用户ID(找回验证码接口返回的 userid) + password (str) 新密码 + codeid (str) 验证码ID + vcode (str) 验证码 + + 也可传 mobile 或 username 定位用户(未传 id 时)。 + """ + import re + import traceback + + if not ns.get('password'): + return {'status': False, 'msg': '新密码不能为空'} + if len(ns.get('password')) < 8 or not re.search(r'[a-zA-Z]', ns.get('password')) or not re.search(r'[0-9]', ns.get('password')): + return {'status': False, 'msg': '密码至少8位,包含大小写字母、特殊字符、数字'} + if not ns.get('codeid'): + return {'status': False, 'msg': '验证码ID不能为空'} + if not ns.get('vcode'): + return {'status': False, 'msg': '验证码不能为空'} + + db = DBPools() + async with db.sqlorContext('kboss') as sor: + try: + code = await sor.R('validatecode', {'id': ns.get('codeid'), 'vcode': ns.get('vcode')}) + if code: + create_at = code[0]['create_at'] + now = datetime.datetime.now() + create_at_dt = datetime.datetime.strptime(create_at, "%Y-%m-%d %H:%M:%S") + if (now - create_at_dt).seconds > 500: + return {'status': False, 'msg': '验证码过期'} + else: + return {'status': False, 'msg': '验证码不正确'} + + user = None + if ns.get('id'): + users = await sor.R('users', {'id': ns.get('id'), 'del_flg': '0'}) + if users: + user = users[0] + elif ns.get('mobile'): + users = await sor.R('users', {'mobile': ns.get('mobile'), 'del_flg': '0'}) + if users: + user = users[0] + elif ns.get('username'): + users = await sor.R('users', {'username': ns.get('username'), 'del_flg': '0'}) + if not users: + users = await sor.R('users', {'mobile': ns.get('username'), 'del_flg': '0'}) + if users: + user = users[0] + else: + return {'status': False, 'msg': '用户标识不能为空'} + + if not user: + return {'status': False, 'msg': '用户不存在'} + + new_password = password_encode(ns['password']) + update_sql = """UPDATE users SET password = '%s' WHERE id = '%s';""" % (new_password, user['id']) + await sor.sqlExe(update_sql, {}) + return {'status': True, 'msg': '密码重置成功'} + except Exception as error: + debug(f"forgotPassword 错误: {error}") + debug(f"forgotPassword 错误堆栈: {traceback.format_exc()}") + return {'status': False, 'msg': '密码重置失败, %s' % str(error)} + + +ret = await forgotPassword(params_kw) +return ret diff --git a/b/customer/registerUser.dspy b/b/customer/registerUser.dspy index 0118a5a..7fe3897 100644 --- a/b/customer/registerUser.dspy +++ b/b/customer/registerUser.dspy @@ -1,14 +1,40 @@ async def sync_cn_ai_user(userid=None, orgid=None, username=None, name=None, email=None): import aiohttp - debug(f"sync_cn_ai_user同步用户: {userid}, {orgid}, {username}, {name}, {email}") - already_sync_user_key = '2i68AZ81di_q5f8AySDrJ' - already_sync_user_dappid = 'cndemo' - - # 目标URL - url = "https://ai.atvoe.com/rbac/usersync" - # url = 'https://ai.atvoe.com/tmp/env.dspy' + # domain 从数据库params表中获取到pname=cntoai_domain的pvalue值 + domain = None + db = DBPools() + async with db.sqlorContext('kboss') as sor: + domain = await sor.R('params', {'pname': 'cntoai_domain'}) + if domain: + domain = domain[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到域名") + return { + 'status': False, + 'msg': '未找到域名' + } + already_sync_user_key = await sor.R('params', {'pname': 'cntoai_already_sync_user_key'}) + if already_sync_user_key: + already_sync_user_key = already_sync_user_key[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到已同步用户key") + return { + 'status': False, + 'msg': '未找到已同步用户key' + } + already_sync_user_dappid = await sor.R('params', {'pname': 'cntoai_already_sync_user_dappid'}) + if already_sync_user_dappid: + already_sync_user_dappid = already_sync_user_dappid[0]['pvalue'] + else: + debug(f"sync_cn_ai_user未找到已同步用户dappid") + return { + 'status': False, + 'msg': '未找到已同步用户dappid' + } + + url = f"{domain}/rbac/usersync" # 请求头 headers = { @@ -82,7 +108,6 @@ async def sync_cn_ai_user(userid=None, orgid=None, username=None, name=None, ema 'msg': f"sync_cn_ai_user{userid}同步用户失败: {e}" } - async def registerUser(ns): """ 用户注册 diff --git a/b/user/mobilecode.dspy b/b/user/mobilecode.dspy index 5c23f9a..3bb2a31 100644 --- a/b/user/mobilecode.dspy +++ b/b/user/mobilecode.dspy @@ -112,6 +112,18 @@ async def mobilecode(ns): return {'status': False, 'msg': '发送失败'} else: return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'} + + # 忘记密码逻辑:检查手机号是否存在 + elif action_type == 'forgotpassword': + if len(userreacs) >= 1: + code = await generate_vcode() + nss = await send_vcode(userreacs[0]['mobile'], '用户注册登录验证', {'SMSvCode': code.get('vcode')}) + if nss['status']: + return {'status': True, 'msg': '验证码发送成功', 'codeid': code.get('id')} + else: + return {'status': False, 'msg': '发送失败'} + else: + return {'status': False, 'action': 'redirect', 'msg': '用户未注册, 请到注册页面注册'} # 原有逻辑:如果没有指定action_type,保持原有逻辑 else: