From cd35153bdeafbcbe839a6fdbdb9e59c95ac737ab Mon Sep 17 00:00:00 2001 From: wangmeihua <13383952685@163.com> Date: Mon, 21 Jul 2025 17:07:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- llmengine/base_db.py | 33 ++ llmengine/db_service.py | 532 ++++++++++++++++++++++++ llmengine/milvus_db.py | 823 ++++++++++++++++++++++++++++++++++++++ llmengine/requirments.txt | 0 4 files changed, 1388 insertions(+) create mode 100644 llmengine/base_db.py create mode 100644 llmengine/db_service.py create mode 100644 llmengine/milvus_db.py delete mode 100644 llmengine/requirments.txt diff --git a/llmengine/base_db.py b/llmengine/base_db.py new file mode 100644 index 0000000..68d71dc --- /dev/null +++ b/llmengine/base_db.py @@ -0,0 +1,33 @@ +from typing import Dict +from appPublic.log import debug, error, info + +connection_pathMap = {} + +def connection_register(connection_key, Klass): + """为给定的数据库注册一个数据库类""" + global connection_pathMap + connection_pathMap[connection_key] = Klass + info(f"Registered {connection_key} with class {Klass}") + +def get_connection_class(connection_path): + """根据连接路径查找对应的连接类""" + global connection_pathMap + debug(f"connection_pathMap: {connection_pathMap}") + klass = connection_pathMap.get(connection_path) + if klass is None: + error(f"{connection_path} has not mapping to a connection class") + raise Exception(f"{connection_path} has not mapping to a connection class") + return klass + +class BaseDBConnection: + async def handle_connection(self, action: str, params: Dict = None) -> Dict: + """默认的数据库操作处理方法,子类可重写""" + if params is None: + params = {} + return { + "status": "error", + "message": f"Action {action} not implemented in {self.__class__.__name__}", + "collection_name": "", + "document_id": "", + "status_code": 400 + } \ No newline at end of file diff --git a/llmengine/db_service.py b/llmengine/db_service.py new file mode 100644 index 0000000..6ed65a2 --- /dev/null +++ b/llmengine/db_service.py @@ -0,0 +1,532 @@ +import argparse +from aiohttp import web +from llmengine.base_db import get_connection_class +from llmengine.milvus_db import MilvusDBConnection +from appPublic.registerfunction import RegisterFunction +from appPublic.log import debug, error, info +from ahserver.serverenv import ServerEnv +from ahserver.webapp import webserver +import os +import json + +helptext = """Milvus Database Service API (Port 8886): + +1. Create Collection Endpoint: +path: /v1/createcollection +method: POST +headers: {"Content-Type": "application/json"} +data: { + "db_type": "textdb" // 可选,若不提供则使用默认集合 ragdb +} +response: +- Success: HTTP 200, {"status": "success", "collection_name": "ragdb" or "ragdb_textdb", "message": "集合 ragdb 或 ragdb_textdb 创建成功"} +- Error: HTTP 400, {"status": "error", "collection_name": "ragdb" or "ragdb_textdb", "message": ""} + +2. Delete Collection Endpoint: +path: /v1/deletecollection +method: POST +headers: {"Content-Type": "application/json"} +data: { + "db_type": "textdb" // 可选,若不提供则删除默认集合 ragdb +} +response: +- Success: HTTP 200, {"status": "success", "collection_name": "ragdb" or "ragdb_textdb", "message": "集合 ragdb 或 ragdb_textdb 删除成功"} +- Success (collection does not exist): HTTP 200, {"status": "success", "collection_name": "ragdb" or "ragdb_textdb", "message": "集合 ragdb 或 ragdb_textdb 不存在,无需删除"} +- Error: HTTP 400, {"status": "error", "collection_name": "ragdb" or "ragdb_textdb", "message": ""} + +3. Insert Document Endpoint: +path: /v1/insertdocument +method: POST +headers: {"Content-Type": "application/json"} +data: { + "userid": "user123", + "knowledge_base_id": "kb123", + "document_id": "", // 可选,若不提供则自动生成 + "texts": ["text1", "text2", ...], + "embeddings": [[float, ...], [float, ...], ...], // 长度为 1024 的向量 + "filename": "file.txt", + "file_path": "/path/to/file.txt", + "upload_time": "", // 可选,若不提供则使用当前时间 + "file_type": "txt", + "db_type": "textdb" // 可选,若不提供则使用默认集合 ragdb +} +response: +- Success: HTTP 200, {"status": "success", "document_id": "", "collection_name": "ragdb" or "ragdb_textdb", "message": "成功插入 个文档到 ", "status_code": 200} +- Error: HTTP 400, {"status": "error", "document_id": "", "collection_name": "ragdb" or "ragdb_textdb", "message": "", "status_code": 400} + +4. Delete Document Endpoint: +path: /v1/deletedocument +method: POST +headers: {"Content-Type": "application/json"} +data: { + "userid": "user123", + "filename": "file.txt", + "knowledge_base_id": "kb123", + "db_type": "textdb" // 可选,若不提供则使用默认集合 ragdb +} +response: +- Success: HTTP 200, {"status": "success", "document_id": "", "collection_name": "ragdb" or "ragdb_textdb", "message": "成功删除 条 Milvus 记录,userid=, filename=, knowledge_base_id=", "status_code": 200} +- Success (no records): HTTP 200, {"status": "success", "document_id": "", "collection_name": "ragdb" or "ragdb_textdb", "message": "没有找到 userid=, filename=, knowledge_base_id= 的记录,无需删除", "status_code": 200} +- Success (collection missing): HTTP 200, {"status": "success", "document_id": "", "collection_name": "ragdb" or "ragdb_textdb", "message": "集合 不存在,无需删除", "status_code": 200} +- Error: HTTP 400, {"status": "error", "document_id": "", "collection_name": "ragdb" or "ragdb_textdb", "message": "", "status_code": 400} + +5. Delete Knowledge Base Endpoint: +path: /v1/deleteknowledgebase +method: POST +headers: {"Content-Type": "application/json"} +data: { + "userid": "user123", + "knowledge_base_id": "kb123", + "db_type": "textdb" // 可选,若不提供则使用默认集合 ragdb +} +response: +- Success: HTTP 200, {"status": "success", "collection_name": "ragdb" or "ragdb_textdb", "deleted_files": ["file1.txt", "file2.pdf"], "message": "成功删除 条 Milvus 记录,删除文件: , userid=, knowledge_base_id=", "status_code": 200} +- Success (no records): HTTP 200, {"status": "success", "collection_name": "ragdb" or "ragdb_textdb", "deleted_files": [], "message": "没有找到 userid=, knowledge_base_id= 的记录,无需删除", "status_code": 200} +- Success (collection missing): HTTP 200, {"status": "success", "collection_name": "ragdb" or "ragdb_textdb", "deleted_files": [], "message": "集合 不存在,无需删除", "status_code": 200} +- Error: HTTP 400, {"status": "error", "collection_name": "ragdb" or "ragdb_textdb", "deleted_files": [], "message": "", "status_code": 400} + +6. Search Query Endpoint: +path: /v1/searchquery +method: POST +headers: {"Content-Type": "application/json"} +data: { + "query_vector": [float, ...], // 长度为 1024 的向量 + "userid": "user1", + "knowledge_base_ids": ["kb123"], + "limit": 5, + "offset": 0, + "db_type": "textdb" // 可选,若不提供则使用默认集合 ragdb +} +response: +- Success: HTTP 200, { + "status": "success", + "results": [ + { + "text": "<完整文本内容>", + "distance": 0.95, + "source": "vector_query", + "metadata": { + "userid": "user1", + "document_id": "", + "filename": "file.txt", + "file_path": "/path/to/file.txt", + "upload_time": "", + "file_type": "txt" + } + }, + ... + ], + "timing": { + "collection_load": , + "vector_search": , + "deduplication": , + "total_time": + }, + "collection_name": "ragdb" or "ragdb_textdb" +} +- Error: HTTP 400, { + "status": "error", + "message": "", + "collection_name": "ragdb" or "ragdb_textdb" +} + +7. List User Files Endpoint: +path: /v1/listuserfiles +method: POST +headers: {"Content-Type": "application/json"} +data: { + "userid": "user1", + "db_type": "textdb" // 可选,若不提供则使用默认集合 ragdb +} +response: +- Success: HTTP 200, { + "status": "success", + "files_by_knowledge_base": { + "kb123": [ + { + "document_id": "", + "filename": "file1.txt", + "file_path": "/path/to/file1.txt", + "upload_time": "", + "file_type": "txt", + "knowledge_base_id": "kb123" + }, + ... + ], + "kb456": [...] + }, + "collection_name": "ragdb" or "ragdb_textdb" +} +- Error: HTTP 400, { + "status": "error", + "message": "", + "collection_name": "ragdb" or "ragdb_textdb" +} + +8. List All Knowledge Bases Endpoint: +path: /v1/listallknowledgebases +method: POST +headers: {"Content-Type": "application/json"} +data: { + "db_type": "textdb" // 可选,若不提供则使用默认集合 ragdb +} +response: +- Success: HTTP 200, { + "status": "success", + "users_knowledge_bases": { + "user1": { + "kb123": [ + { + "document_id": "", + "filename": "file1.txt", + "file_path": "/path/to/file1.txt", + "upload_time": "", + "file_type": "txt", + "knowledge_base_id": "kb123" + }, + ... + ], + "kb456": [...] + }, + "user2": {...} + }, + "collection_name": "ragdb" or "ragdb_textdb", + "message": "成功列出 个用户的知识库和文件", + "status_code": 200 +} +- Error: HTTP 400, { + "status": "error", + "users_knowledge_bases": {}, + "collection_name": "ragdb" or "ragdb_textdb", + "message": "", + "status_code": 400 +} + +9. Connection Endpoint (for compatibility): +path: /v1/connection +method: POST +headers: {"Content-Type": "application/json"} +data: { + "action": "", + "params": {...} +} +response: +- Success: HTTP 200, {"status": "success", ...} +- Error: HTTP 400, {"status": "error", "message": ""} + +10. Docs Endpoint: +path: /docs +method: GET +response: This help text +""" + +def init(): + rf = RegisterFunction() + rf.register('createcollection', create_collection) + rf.register('deletecollection', delete_collection) + rf.register('insertdocument', insert_document) + rf.register('deletedocument', delete_document) + rf.register('deleteknowledgebase', delete_knowledge_base) + rf.register('searchquery', search_query) + rf.register('listuserfiles', list_user_files) + rf.register('listallknowledgebases', list_all_knowledge_bases) + rf.register('connection', handle_connection) + rf.register('docs', docs) + +async def docs(request, params_kw, *params, **kw): + return web.Response(text=helptext, content_type='text/plain') + +async def create_collection(request, params_kw, *params, **kw): + debug(f'{params_kw=}') + se = ServerEnv() + engine = se.engine + db_type = params_kw.get('db_type', '') + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + result = await engine.handle_connection("create_collection", {"db_type": db_type}) + debug(f'{result=}') + return web.json_response(result, dumps=lambda obj: json.dumps(obj, ensure_ascii=False)) + except Exception as e: + error(f'创建集合失败: {str(e)}') + return web.json_response({ + "status": "error", + "collection_name": collection_name, + "message": str(e) + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + +async def delete_collection(request, params_kw, *params, **kw): + debug(f'{params_kw=}') + se = ServerEnv() + engine = se.engine + db_type = params_kw.get('db_type', '') + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + result = await engine.handle_connection("delete_collection", {"db_type": db_type}) + debug(f'{result=}') + return web.json_response(result, dumps=lambda obj: json.dumps(obj, ensure_ascii=False)) + except Exception as e: + error(f'删除集合失败: {str(e)}') + return web.json_response({ + "status": "error", + "collection_name": collection_name, + "message": str(e) + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + +async def insert_document(request, params_kw, *params, **kw): + debug(f'Received params: {params_kw=}') + se = ServerEnv() + engine = se.engine + userid = params_kw.get('userid', '') + knowledge_base_id = params_kw.get('knowledge_base_id', '') + document_id = params_kw.get('document_id', '') + texts = params_kw.get('texts', []) + embeddings = params_kw.get('embeddings', []) + filename = params_kw.get('filename', '') + file_path = params_kw.get('file_path', '') + upload_time = params_kw.get('upload_time', '') + file_type = params_kw.get('file_type', '') + db_type = params_kw.get('db_type', '') + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + required_fields = ['userid', 'knowledge_base_id', 'texts', 'embeddings'] + missing_fields = [field for field in required_fields if field not in params_kw or not params_kw[field]] + if missing_fields: + raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") + + result = await engine.handle_connection("insert_document", { + "userid": userid, + "knowledge_base_id": knowledge_base_id, + "document_id": document_id, + "texts": texts, + "embeddings": embeddings, + "filename": filename, + "file_path": file_path, + "upload_time": upload_time, + "file_type": file_type, + "db_type": db_type + }) + debug(f'Insert result: {result=}') + status = 200 if result.get("status") == "success" else 400 + return web.json_response(result, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=status) + except Exception as e: + error(f'插入文档失败: {str(e)}') + return web.json_response({ + "status": "error", + "document_id": "", + "collection_name": collection_name, + "message": str(e), + "status_code": 400 + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + +async def delete_document(request, params_kw, *params, **kw): + debug(f'Received delete_document params: {params_kw=}') + se = ServerEnv() + engine = se.engine + userid = params_kw.get('userid', '') + filename = params_kw.get('filename', '') + knowledge_base_id = params_kw.get('knowledge_base_id', '') + db_type = params_kw.get('db_type', '') + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + required_fields = ['userid', 'filename', 'knowledge_base_id'] + missing_fields = [field for field in required_fields if field not in params_kw or not params_kw[field]] + if missing_fields: + raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") + + result = await engine.handle_connection("delete_document", { + "userid": userid, + "filename": filename, + "knowledge_base_id": knowledge_base_id, + "db_type": db_type + }) + debug(f'Delete result: {result=}') + status = 200 if result.get("status") == "success" else 400 + return web.json_response(result, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=status) + except Exception as e: + error(f'删除文档失败: {str(e)}') + return web.json_response({ + "status": "error", + "collection_name": collection_name, + "document_id": "", + "message": str(e), + "status_code": 400 + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + +async def delete_knowledge_base(request, params_kw, *params, **kw): + debug(f'Received delete_knowledge_base params: {params_kw=}') + se = ServerEnv() + engine = se.engine + userid = params_kw.get('userid', '') + knowledge_base_id = params_kw.get('knowledge_base_id', '') + db_type = params_kw.get('db_type', '') + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + required_fields = ['userid', 'knowledge_base_id'] + missing_fields = [field for field in required_fields if field not in params_kw or not params_kw[field]] + if missing_fields: + raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") + + result = await engine.handle_connection("delete_knowledge_base", { + "userid": userid, + "knowledge_base_id": knowledge_base_id, + "db_type": db_type + }) + debug(f'Delete knowledge base result: {result=}') + status = 200 if result.get("status") == "success" else 400 + return web.json_response(result, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=status) + except Exception as e: + error(f'删除知识库失败: {str(e)}') + return web.json_response({ + "status": "error", + "collection_name": collection_name, + "deleted_files": [], + "message": str(e), + "status_code": 400 + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + +async def search_query(request, params_kw, *params, **kw): + debug(f'{params_kw=}') + se = ServerEnv() + engine = se.engine + query_vector = params_kw.get('query_vector', []) + userid = params_kw.get('userid', '') + knowledge_base_ids = params_kw.get('knowledge_base_ids', []) + limit = params_kw.get('limit', 5) + offset = params_kw.get('offset', 0) + db_type = params_kw.get('db_type', '') + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + if not query_vector or not userid or not knowledge_base_ids: + debug(f'query_vector, userid 或 knowledge_base_ids 未提供') + return web.json_response({ + "status": "error", + "message": "query_vector, userid 或 knowledge_base_ids 未提供", + "collection_name": collection_name + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + result = await engine.handle_connection("search_query", { + "query_vector": query_vector, + "userid": userid, + "knowledge_base_ids": knowledge_base_ids, + "limit": limit, + "offset": offset, + "db_type": db_type + }) + debug(f'{result=}') + response = { + "status": "success", + "results": result.get("results", []), + "timing": result.get("timing", {}), + "collection_name": collection_name + } + return web.json_response(response, dumps=lambda obj: json.dumps(obj, ensure_ascii=False)) + except Exception as e: + error(f'向量搜索失败: {str(e)}') + return web.json_response({ + "status": "error", + "message": str(e), + "collection_name": collection_name + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + +async def list_user_files(request, params_kw, *params, **kw): + debug(f'{params_kw=}') + se = ServerEnv() + engine = se.engine + userid = params_kw.get('userid', '') + db_type = params_kw.get('db_type', '') + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + if not userid: + debug(f'userid 未提供') + return web.json_response({ + "status": "error", + "message": "userid 未提供", + "collection_name": collection_name + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + result = await engine.handle_connection("list_user_files", { + "userid": userid, + "db_type": db_type + }) + debug(f'{result=}') + response = { + "status": "success", + "files_by_knowledge_base": result, + "collection_name": collection_name + } + return web.json_response(response, dumps=lambda obj: json.dumps(obj, ensure_ascii=False)) + except Exception as e: + error(f'列出用户文件失败: {str(e)}') + return web.json_response({ + "status": "error", + "message": str(e), + "collection_name": collection_name + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + +async def list_all_knowledge_bases(request, params_kw, *params, **kw): + debug(f'{params_kw=}') + se = ServerEnv() + engine = se.engine + db_type = params_kw.get('db_type', '') + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + result = await engine.handle_connection("list_all_knowledge_bases", { + "db_type": db_type + }) + debug(f'{result=}') + response = { + "status": result.get("status", "success"), + "users_knowledge_bases": result.get("users_knowledge_bases", {}), + "collection_name": collection_name, + "message": result.get("message", ""), + "status_code": result.get("status_code", 200) + } + return web.json_response(response, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=response["status_code"]) + except Exception as e: + error(f'列出所有用户知识库失败: {str(e)}') + return web.json_response({ + "status": "error", + "users_knowledge_bases": {}, + "collection_name": collection_name, + "message": str(e), + "status_code": 400 + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + +async def handle_connection(request, params_kw, *params, **kw): + debug(f'{params_kw=}') + se = ServerEnv() + engine = se.engine + try: + data = await request.json() + action = data.get('action') + if not action: + debug(f'action 未提供') + return web.json_response({ + "status": "error", + "message": "action 参数未提供" + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + result = await engine.handle_connection(action, data.get('params', {})) + debug(f'{result=}') + return web.json_response(result, dumps=lambda obj: json.dumps(obj, ensure_ascii=False)) + except Exception as e: + error(f'处理连接操作失败: {str(e)}') + return web.json_response({ + "status": "error", + "message": str(e) + }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) + +def main(): + parser = argparse.ArgumentParser(prog="Milvus Database Service") + parser.add_argument('-w', '--workdir') + parser.add_argument('-p', '--port', default='8886') + parser.add_argument('connection_path') + args = parser.parse_args() + debug(f"Arguments: {args}") + Klass = get_connection_class(args.connection_path) + se = ServerEnv() + se.engine = Klass() + workdir = args.workdir or os.getcwd() + port = args.port + debug(f'{args=}') + webserver(init, workdir, port) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/llmengine/milvus_db.py b/llmengine/milvus_db.py new file mode 100644 index 0000000..600d99e --- /dev/null +++ b/llmengine/milvus_db.py @@ -0,0 +1,823 @@ +from appPublic.jsonConfig import getConfig +import os +from appPublic.log import debug, error, info +from pymilvus import connections, utility, Collection, CollectionSchema, FieldSchema, DataType +from threading import Lock +from typing import Dict, List, Any +import uuid +from datetime import datetime +from llmengine.base_db import connection_register, BaseDBConnection + +class MilvusDBConnection(BaseDBConnection): + _instance = None + _lock = Lock() + + def __new__(cls): + with cls._lock: + if cls._instance is None: + cls._instance = super(MilvusDBConnection, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + try: + config = getConfig() + self.db_path = config['milvus_db'] + except KeyError as e: + error(f"配置文件缺少必要字段: {str(e)}") + raise RuntimeError(f"配置文件缺少必要字段: {str(e)}") + self._initialize_connection() + self._initialized = True + info(f"MilvusDBConnection initialized with db_path: {self.db_path}") + + def _initialize_connection(self): + """初始化 Milvus 连接,确保单一连接""" + try: + db_dir = os.path.dirname(self.db_path) + if not os.path.exists(db_dir): + os.makedirs(db_dir, exist_ok=True) + debug(f"创建 Milvus 目录: {db_dir}") + if not os.access(db_dir, os.W_OK): + raise RuntimeError(f"Milvus 目录 {db_dir} 不可写") + if not connections.has_connection("default"): + connections.connect("default", uri=self.db_path) + debug(f"已连接到 Milvus Lite,路径: {self.db_path}") + else: + debug("已存在 Milvus 连接,跳过重复连接") + except Exception as e: + error(f"连接 Milvus 失败: {str(e)}") + raise RuntimeError(f"连接 Milvus 失败: {str(e)}") + + async def handle_connection(self, action: str, params: Dict = None) -> Dict: + """处理数据库操作""" + try: + debug(f"处理操作: action={action}, params={params}") + if not params: + params = {} + db_type = params.get("db_type", "") + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + if db_type and "_" in db_type: + return {"status": "error", "message": "db_type 不能包含下划线", "collection_name": collection_name, + "document_id": "", "status_code": 400} + if db_type and len(db_type) > 100: + return {"status": "error", "message": "db_type 的长度应小于 100", "collection_name": collection_name, + "document_id": "", "status_code": 400} + + if action == "initialize": + return {"status": "success", "message": f"Milvus 连接已初始化,路径: {self.db_path}"} + elif action == "get_params": + return {"status": "success", "params": {"uri": self.db_path}} + elif action == "create_collection": + return await self._create_collection(db_type) + elif action == "delete_collection": + return await self._delete_collection(db_type) + elif action == "insert_document": + userid = params.get("userid", "") + knowledge_base_id = params.get("knowledge_base_id", "") + document_id = params.get("document_id", str(uuid.uuid4())) + texts = params.get("texts", []) + embeddings = params.get("embeddings", []) + filename = params.get("filename", "") + file_path = params.get("file_path", "") + upload_time = params.get("upload_time", datetime.now().isoformat()) + file_type = params.get("file_type", "") + if not userid or not knowledge_base_id or not texts or not embeddings: + return {"status": "error", "message": "userid、knowledge_base_id、texts 和 embeddings 不能为空", + "collection_name": collection_name, "document_id": "", "status_code": 400} + if "_" in userid or "_" in knowledge_base_id: + return {"status": "error", "message": "userid 和 knowledge_base_id 不能包含下划线", + "collection_name": collection_name, "document_id": "", "status_code": 400} + if len(knowledge_base_id) > 100 or len(userid) > 100: + return {"status": "error", "message": "userid 或 knowledge_base_id 的长度应小于 100", + "collection_name": collection_name, "document_id": "", "status_code": 400} + return await self._insert_document(collection_name, userid, knowledge_base_id, document_id, texts, embeddings, + filename, file_path, upload_time, file_type) + elif action == "delete_document": + userid = params.get("userid", "") + filename = params.get("filename", "") + knowledge_base_id = params.get("knowledge_base_id", "") + if not userid or not filename or not knowledge_base_id: + return {"status": "error", "message": "userid、filename 和 knowledge_base_id 不能为空", + "collection_name": collection_name, "document_id": "", "status_code": 400} + if "_" in userid or "_" in knowledge_base_id: + return {"status": "error", "message": "userid 和 knowledge_base_id 不能包含下划线", + "collection_name": collection_name, "document_id": "", "status_code": 400} + if len(userid) > 100 or len(filename) > 255 or len(knowledge_base_id) > 100: + return {"status": "error", "message": "userid、filename 或 knowledge_base_id 的长度超出限制", + "collection_name": collection_name, "document_id": "", "status_code": 400} + return await self._delete_document(db_type, userid, filename, knowledge_base_id) + elif action == "delete_knowledge_base": + userid = params.get("userid", "") + knowledge_base_id = params.get("knowledge_base_id", "") + if not userid or not knowledge_base_id: + return {"status": "error", "message": "userid 和 knowledge_base_id 不能为空", + "collection_name": collection_name, "document_id": "", "status_code": 400} + if "_" in userid or "_" in knowledge_base_id: + return {"status": "error", "message": "userid 和 knowledge_base_id 不能包含下划线", + "collection_name": collection_name, "document_id": "", "status_code": 400} + if len(userid) > 100 or len(knowledge_base_id) > 100: + return {"status": "error", "message": "userid 或 knowledge_base_id 的长度超出限制", + "collection_name": collection_name, "document_id": "", "status_code": 400} + return await self._delete_knowledge_base(db_type, userid, knowledge_base_id) + elif action == "search_query": + query_vector = params.get("query_vector", []) + userid = params.get("userid", "") + knowledge_base_ids = params.get("knowledge_base_ids", []) + limit = params.get("limit", 5) + offset = params.get("offset", 0) + if not query_vector or not userid or not knowledge_base_ids: + return {"status": "error", "message": "query_vector、userid 或 knowledge_base_ids 不能为空", + "collection_name": collection_name, "document_id": "", "status_code": 400} + if limit < 1 or limit > 16384: + return {"status": "error", "message": "limit 必须在 1 到 16384 之间", + "collection_name": collection_name, "document_id": "", "status_code": 400} + return await self._search_query(collection_name, query_vector, userid, knowledge_base_ids, limit, offset) + elif action == "list_user_files": + userid = params.get("userid", "") + if not userid: + return {"status": "error", "message": "userid 不能为空", "collection_name": collection_name, + "document_id": "", "status_code": 400} + return await self._list_user_files(userid, db_type) + elif action == "list_all_knowledge_bases": + return await self._list_all_knowledge_bases(db_type) + else: + return {"status": "error", "message": f"未知的 action: {action}", "collection_name": collection_name, + "document_id": "", "status_code": 400} + except Exception as e: + error(f"处理操作失败: action={action}, 错误: {str(e)}") + return { + "status": "error", + "message": f"服务器错误: {str(e)}", + "collection_name": collection_name, + "document_id": "", + "status_code": 400 + } + + async def _create_collection(self, db_type: str = "") -> Dict: + """创建 Milvus 集合""" + try: + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + if len(collection_name) > 255: + raise ValueError(f"集合名称 {collection_name} 超过 255 个字符") + if db_type and "_" in db_type: + raise ValueError("db_type 不能包含下划线") + if db_type and len(db_type) > 100: + raise ValueError("db_type 的长度应小于 100") + debug(f"集合名称: {collection_name}") + + fields = [ + FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, max_length=36, auto_id=True), + FieldSchema(name="userid", dtype=DataType.VARCHAR, max_length=100), + FieldSchema(name="knowledge_base_id", dtype=DataType.VARCHAR, max_length=100), + FieldSchema(name="document_id", dtype=DataType.VARCHAR, max_length=36), + FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535), + FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1024), + FieldSchema(name="filename", dtype=DataType.VARCHAR, max_length=255), + FieldSchema(name="file_path", dtype=DataType.VARCHAR, max_length=1024), + FieldSchema(name="upload_time", dtype=DataType.VARCHAR, max_length=64), + FieldSchema(name="file_type", dtype=DataType.VARCHAR, max_length=64), + ] + schema = CollectionSchema( + fields=fields, + description="统一数据集合,包含用户ID、知识库ID、document_id 和元数据字段", + auto_id=True, + primary_field="pk", + ) + + if utility.has_collection(collection_name): + try: + collection = Collection(collection_name) + existing_schema = collection.schema + expected_fields = {f.name for f in fields} + actual_fields = {f.name for f in existing_schema.fields} + vector_field = next((f for f in existing_schema.fields if f.name == "vector"), None) + + schema_compatible = False + if expected_fields == actual_fields and vector_field is not None and vector_field.dtype == DataType.FLOAT_VECTOR: + dim = vector_field.params.get('dim', None) if hasattr(vector_field, 'params') and vector_field.params else None + schema_compatible = dim == 1024 + debug(f"检查集合 {collection_name} 的 schema: 字段匹配={expected_fields == actual_fields}, " + f"vector_field存在={vector_field is not None}, dtype={vector_field.dtype if vector_field else '无'}, " + f"dim={dim if dim is not None else '未定义'}") + if not schema_compatible: + debug(f"集合 {collection_name} 的 schema 不兼容,原因: " + f"字段不匹配: {expected_fields.symmetric_difference(actual_fields) or '无'}, " + f"vector_field: {vector_field is not None}, " + f"dtype: {vector_field.dtype if vector_field else '无'}, " + f"dim: {vector_field.params.get('dim', '未定义') if vector_field and hasattr(vector_field, 'params') and vector_field.params else '未定义'}") + utility.drop_collection(collection_name) + else: + collection.load() + debug(f"集合 {collection_name} 已存在并加载成功") + return { + "status": "success", + "collection_name": collection_name, + "message": f"集合 {collection_name} 已存在" + } + except Exception as e: + error(f"加载集合 {collection_name} 失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "message": str(e) + } + + try: + collection = Collection(collection_name, schema) + collection.create_index( + field_name="vector", + index_params={"index_type": "AUTOINDEX", "metric_type": "COSINE"} + ) + for field in ["userid", "knowledge_base_id", "document_id", "filename", "file_path", "upload_time", "file_type"]: + collection.create_index( + field_name=field, + index_params={"index_type": "INVERTED"} + ) + collection.load() + debug(f"成功创建并加载集合: {collection_name}") + return { + "status": "success", + "collection_name": collection_name, + "message": f"集合 {collection_name} 创建成功" + } + except Exception as e: + error(f"创建集合 {collection_name} 失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "message": str(e) + } + except Exception as e: + error(f"创建集合失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "message": str(e) + } + + async def _delete_collection(self, db_type: str = "") -> Dict: + """删除 Milvus 集合""" + try: + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + if len(collection_name) > 255: + raise ValueError(f"集合名称 {collection_name} 超过 255 个字符") + if db_type and "_" in db_type: + raise ValueError("db_type 不能包含下划线") + if db_type and len(db_type) > 100: + raise ValueError("db_type 的长度应小于 100") + debug(f"集合名称: {collection_name}") + + if not utility.has_collection(collection_name): + debug(f"集合 {collection_name} 不存在") + return { + "status": "success", + "collection_name": collection_name, + "message": f"集合 {collection_name} 不存在,无需删除" + } + + try: + utility.drop_collection(collection_name) + debug(f"成功删除集合: {collection_name}") + return { + "status": "success", + "collection_name": collection_name, + "message": f"集合 {collection_name} 删除成功" + } + except Exception as e: + error(f"删除集合 {collection_name} 失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "message": str(e) + } + except Exception as e: + error(f"删除集合失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "message": str(e) + } + + async def _insert_document(self, collection_name: str, userid: str, knowledge_base_id: str, document_id: str, + texts: List[str], embeddings: List[List[float]], filename: str, file_path: str, + upload_time: str, file_type: str) -> Dict[str, Any]: + """插入文档到 Milvus""" + try: + # 检查集合是否存在 + create_result = await self._create_collection(collection_name.split('_')[-1] if '_' in collection_name else "") + if create_result["status"] == "error": + raise RuntimeError(f"集合创建失败: {create_result['message']}") + + # 检查输入数据 + if len(texts) != len(embeddings): + raise ValueError("texts 和 embeddings 的长度必须一致") + if not all(isinstance(emb, list) and len(emb) == 1024 for emb in embeddings): + raise ValueError("embeddings 必须是长度为 1024 的浮点数列表") + + # 插入 Milvus + collection = Collection(collection_name) + collection.load() + data = { + "userid": [userid] * len(texts), + "knowledge_base_id": [knowledge_base_id] * len(texts), + "document_id": [document_id] * len(texts), + "text": texts, + "vector": embeddings, + "filename": [filename] * len(texts), + "file_path": [file_path] * len(texts), + "upload_time": [upload_time] * len(texts), + "file_type": [file_type] * len(texts), + } + collection.insert([data[field.name] for field in collection.schema.fields if field.name != "pk"]) + collection.flush() + debug(f"成功插入 {len(texts)} 个文档到集合 {collection_name}") + return { + "status": "success", + "document_id": document_id, + "collection_name": collection_name, + "message": f"成功插入 {len(texts)} 个文档到 {collection_name}", + "status_code": 200 + } + except Exception as e: + error(f"插入文档失败: {str(e)}") + return { + "status": "error", + "document_id": document_id, + "collection_name": collection_name, + "message": f"插入文档失败: {str(e)}", + "status_code": 400 + } + + async def _delete_document(self, db_type: str, userid: str, filename: str, knowledge_base_id: str) -> Dict[str, Any]: + """删除用户指定文件数据,仅处理 Milvus 记录""" + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + if not utility.has_collection(collection_name): + debug(f"集合 {collection_name} 不存在") + return { + "status": "success", + "collection_name": collection_name, + "document_id": "", + "message": f"集合 {collection_name} 不存在,无需删除", + "status_code": 200 + } + + try: + collection = Collection(collection_name) + collection.load() + debug(f"加载集合: {collection_name}") + except Exception as e: + error(f"加载集合 {collection_name} 失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "document_id": "", + "message": f"加载集合失败: {str(e)}", + "status_code": 400 + } + + expr = f"userid == '{userid}' and filename == '{filename}' and knowledge_base_id == '{knowledge_base_id}'" + debug(f"查询表达式: {expr}") + try: + results = collection.query( + expr=expr, + output_fields=["document_id"], + limit=1000 + ) + if not results: + debug( + f"没有找到 userid={userid}, filename={filename}, knowledge_base_id={knowledge_base_id} 的记录") + return { + "status": "success", + "collection_name": collection_name, + "document_id": "", + "message": f"没有找到 userid={userid}, filename={filename}, knowledge_base_id={knowledge_base_id} 的记录,无需删除", + "status_code": 200 + } + document_ids = list(set(result["document_id"] for result in results if "document_id" in result)) + debug(f"找到 {len(document_ids)} 个 document_id: {document_ids}") + except Exception as e: + error(f"查询 document_id 失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "document_id": "", + "message": f"查询失败: {str(e)}", + "status_code": 400 + } + + total_deleted = 0 + for doc_id in document_ids: + try: + delete_expr = f"document_id == '{doc_id}'" + debug(f"删除表达式: {delete_expr}") + delete_result = collection.delete(delete_expr) + deleted_count = delete_result.delete_count + total_deleted += deleted_count + info(f"成功删除 document_id={doc_id} 的 {deleted_count} 条 Milvus 记录") + except Exception as e: + error(f"删除 document_id={doc_id} 的 Milvus 记录失败: {str(e)}") + continue + + if total_deleted == 0: + debug( + f"没有删除任何 Milvus 记录,userid={userid}, filename={filename}, knowledge_base_id={knowledge_base_id}") + return { + "status": "success", + "collection_name": collection_name, + "document_id": "", + "message": f"没有删除任何记录,userid={userid}, filename={filename}, knowledge_base_id={knowledge_base_id}", + "status_code": 200 + } + + info( + f"总计删除 {total_deleted} 条 Milvus 记录,userid={userid}, filename={filename}, knowledge_base_id={knowledge_base_id}") + return { + "status": "success", + "collection_name": collection_name, + "document_id": ",".join(document_ids), + "message": f"成功删除 {total_deleted} 条 Milvus 记录,userid={userid}, filename={filename}, knowledge_base_id={knowledge_base_id}", + "status_code": 200 + } + + except Exception as e: + error(f"删除文档失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "document_id": "", + "message": f"删除文档失败: {str(e)}", + "status_code": 400 + } + + async def _delete_knowledge_base(self, db_type: str, userid: str, knowledge_base_id: str) -> Dict[str, Any]: + """删除用户的整个知识库,仅处理 Milvus 记录""" + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + if not utility.has_collection(collection_name): + debug(f"集合 {collection_name} 不存在") + return { + "status": "success", + "collection_name": collection_name, + "deleted_files": [], + "message": f"集合 {collection_name} 不存在,无需删除", + "status_code": 200 + } + + try: + collection = Collection(collection_name) + debug(f"加载集合: {collection_name}") + except Exception as e: + error(f"加载集合 {collection_name} 失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "deleted_files": [], + "message": f"加载集合失败: {str(e)}", + "status_code": 400 + } + + deleted_files = [] + try: + expr = f"userid == '{userid}' and knowledge_base_id == '{knowledge_base_id}'" + debug(f"查询表达式: {expr}") + results = collection.query( + expr=expr, + output_fields=["file_path"], + limit=1000 + ) + if results: + deleted_files = list(set(result["file_path"] for result in results if "file_path" in result)) + debug(f"找到 {len(deleted_files)} 个唯一文件: {deleted_files}") + else: + debug(f"没有找到 userid={userid}, knowledge_base_id={knowledge_base_id} 的记录") + except Exception as e: + error(f"查询 file_path 失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "deleted_files": [], + "message": f"查询 file_path 失败: {str(e)}", + "status_code": 400 + } + + total_deleted = 0 + try: + delete_expr = f"userid == '{userid}' and knowledge_base_id == '{knowledge_base_id}'" + debug(f"删除表达式: {delete_expr}") + delete_result = collection.delete(delete_expr) + total_deleted = delete_result.delete_count + info(f"成功删除 {total_deleted} 条 Milvus 记录") + except Exception as e: + error(f"删除 Milvus 记录失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "deleted_files": deleted_files, + "message": f"删除 Milvus 记录失败: {str(e)}", + "status_code": 400 + } + + if total_deleted == 0: + debug(f"没有删除任何记录,userid={userid}, knowledge_base_id={knowledge_base_id}") + return { + "status": "success", + "collection_name": collection_name, + "deleted_files": [], + "message": f"没有找到 userid={userid}, knowledge_base_id={knowledge_base_id} 的记录,无需删除", + "status_code": 200 + } + + info( + f"总计删除 {total_deleted} 条 Milvus 记录,删除文件: {deleted_files}, userid={userid}, knowledge_base_id={knowledge_base_id}") + return { + "status": "success", + "collection_name": collection_name, + "deleted_files": deleted_files, + "message": f"成功删除 {total_deleted} 条 Milvus 记录,删除文件: {deleted_files}, userid={userid}, knowledge_base_id={knowledge_base_id}", + "status_code": 200 + } + + except Exception as e: + error(f"删除知识库失败: {str(e)}") + return { + "status": "error", + "collection_name": collection_name, + "deleted_files": [], + "message": f"删除知识库失败: {str(e)}", + "status_code": 400 + } + + async def _search_query(self, collection_name: str, query_vector: List[float], userid: str, + knowledge_base_ids: List[str], limit: int = 5, offset: int = 0) -> Dict[str, Any]: + """基于向量搜索 Milvus 集合""" + timing_stats = {} + start_time = time.time() + try: + if not query_vector or not isinstance(query_vector, list) or len(query_vector) != 1024: + raise ValueError("query_vector 必须是长度为 1024 的浮点数列表") + if not userid: + raise ValueError("userid 不能为空") + if not knowledge_base_ids: + raise ValueError("knowledge_base_ids 不能为空") + if "_" in userid: + raise ValueError("userid 不能包含下划线") + if len(userid) > 100: + raise ValueError("userid 的长度超出限制") + if limit <= 0 or limit > 16384: + raise ValueError("limit 必须在 1 到 16384 之间") + if offset < 0: + raise ValueError("offset 不能为负数") + if limit + offset > 16384: + raise ValueError("limit + offset 不能超过 16384") + for kb_id in knowledge_base_ids: + if not isinstance(kb_id, str): + raise ValueError(f"knowledge_base_id 必须是字符串: {kb_id}") + if len(kb_id) > 100: + raise ValueError(f"knowledge_base_id 长度超出 100 个字符: {kb_id}") + if "_" in kb_id: + raise ValueError(f"knowledge_base_id 不能包含下划线: {kb_id}") + + if not utility.has_collection(collection_name): + debug(f"集合 {collection_name} 不存在") + return {"results": [], "timing": timing_stats} + + try: + collection = Collection(collection_name) + collection.load() + debug(f"加载集合: {collection_name}") + timing_stats["collection_load"] = time.time() - start_time + debug(f"集合加载耗时: {timing_stats['collection_load']:.3f} 秒") + except Exception as e: + error(f"加载集合 {collection_name} 失败: {str(e)}") + return {"results": [], "timing": timing_stats} + + search_start = time.time() + search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}} + kb_id_expr = " or ".join([f"knowledge_base_id == '{kb_id}'" for kb_id in knowledge_base_ids]) + expr = f"userid == '{userid}' and ({kb_id_expr})" + debug(f"搜索表达式: {expr}") + + try: + results = collection.search( + data=[query_vector], + anns_field="vector", + param=search_params, + limit=100, + expr=expr, + output_fields=["text", "userid", "document_id", "filename", "file_path", "upload_time", + "file_type"], + offset=offset + ) + except Exception as e: + error(f"搜索失败: {str(e)}") + return {"results": [], "timing": timing_stats} + timing_stats["vector_search"] = time.time() - search_start + debug(f"向量搜索耗时: {timing_stats['vector_search']:.3f} 秒") + + search_results = [] + for hits in results: + for hit in hits: + metadata = { + "userid": hit.entity.get("userid"), + "document_id": hit.entity.get("document_id"), + "filename": hit.entity.get("filename"), + "file_path": hit.entity.get("file_path"), + "upload_time": hit.entity.get("upload_time"), + "file_type": hit.entity.get("file_type") + } + result = { + "text": hit.entity.get("text"), + "distance": hit.distance, + "source": "vector_query", + "metadata": metadata + } + search_results.append(result) + debug( + f"命中: text={result['text'][:100]}..., distance={hit.distance}, filename={metadata['filename']}") + + dedup_start = time.time() + unique_results = [] + seen_texts = set() + for result in sorted(search_results, key=lambda x: x['distance'], reverse=True): + if result['text'] not in seen_texts: + unique_results.append(result) + seen_texts.add(result['text']) + timing_stats["deduplication"] = time.time() - dedup_start + debug(f"去重耗时: {timing_stats['deduplication']:.3f} 秒") + info(f"去重后结果数量: {len(unique_results)} (原始数量: {len(search_results)})") + + timing_stats["total_time"] = time.time() - start_time + info(f"向量搜索完成,返回 {len(unique_results)} 条结果,总耗时: {timing_stats['total_time']:.3f} 秒") + return {"results": unique_results[:limit], "timing": timing_stats} + + except Exception as e: + error(f"向量搜索失败: {str(e)}") + return {"results": [], "timing": timing_stats} + + async def _list_user_files(self, userid: str, db_type: str = "") -> Dict[str, List[Dict]]: + """列出用户的所有知识库及其文件,按 knowledge_base_id 分组""" + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + info(f"列出用户文件: userid={userid}, db_type={db_type}") + + if not userid: + raise ValueError("userid 不能为空") + if "_" in userid or (db_type and "_" in db_type): + raise ValueError("userid 和 db_type 不能包含下划线") + if (db_type and len(db_type) > 100) or len(userid) > 100: + raise ValueError("userid 或 db_type 的长度超出限制") + + if not utility.has_collection(collection_name): + debug(f"集合 {collection_name} 不存在") + return {} + + try: + collection = Collection(collection_name) + collection.load() + debug(f"加载集合: {collection_name}") + except Exception as e: + error(f"加载集合 {collection_name} 失败: {str(e)}") + return {} + + expr = f"userid == '{userid}'" + debug(f"查询表达式: {expr}") + + try: + results = collection.query( + expr=expr, + output_fields=["document_id", "filename", "file_path", "upload_time", "file_type", "knowledge_base_id"], + limit=1000 + ) + except Exception as e: + error(f"查询用户文件失败: {str(e)}") + return {} + + files_by_kb = {} + seen_document_ids = set() + for result in results: + document_id = result.get("document_id") + kb_id = result.get("knowledge_base_id") + if document_id not in seen_document_ids: + seen_document_ids.add(document_id) + file_info = { + "document_id": document_id, + "filename": result.get("filename"), + "file_path": result.get("file_path"), + "upload_time": result.get("upload_time"), + "file_type": result.get("file_type"), + "knowledge_base_id": kb_id + } + if kb_id not in files_by_kb: + files_by_kb[kb_id] = [] + files_by_kb[kb_id].append(file_info) + debug(f"找到文件: document_id={document_id}, filename={result.get('filename')}, knowledge_base_id={kb_id}") + + info(f"找到 {len(seen_document_ids)} 个文件,userid={userid}, 知识库数量={len(files_by_kb)}") + return files_by_kb + + except Exception as e: + error(f"列出用户文件失败: {str(e)}") + return {} + + async def _list_all_knowledge_bases(self, db_type: str = "") -> Dict[str, Any]: + """列出数据库中所有用户的知识库及其文件,按用户分组""" + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + try: + info(f"列出所有用户的知识库: db_type={db_type}") + + if db_type and "_" in db_type: + raise ValueError("db_type 不能包含下划线") + if db_type and len(db_type) > 100: + raise ValueError("db_type 的长度应小于 100") + + if not utility.has_collection(collection_name): + debug(f"集合 {collection_name} 不存在") + return { + "status": "success", + "users_knowledge_bases": {}, + "collection_name": collection_name, + "message": f"集合 {collection_name} 不存在", + "status_code": 200 + } + + try: + collection = Collection(collection_name) + collection.load() + debug(f"加载集合: {collection_name}") + except Exception as e: + error(f"加载集合 {collection_name} 失败: {str(e)}") + return { + "status": "error", + "users_knowledge_bases": {}, + "collection_name": collection_name, + "message": f"加载集合失败: {str(e)}", + "status_code": 400 + } + + expr = "userid != ''" + debug(f"查询表达式: {expr}") + try: + results = collection.query( + expr=expr, + output_fields=["userid", "knowledge_base_id", "document_id", "filename", "file_path", "upload_time", + "file_type"], + limit=10000 + ) + except Exception as e: + error(f"查询所有用户文件失败: {str(e)}") + return { + "status": "error", + "users_knowledge_bases": {}, + "collection_name": collection_name, + "message": f"查询失败: {str(e)}", + "status_code": 400 + } + + users_knowledge_bases = {} + seen_document_ids = set() + for result in results: + userid = result.get("userid") + kb_id = result.get("knowledge_base_id") + document_id = result.get("document_id") + if document_id not in seen_document_ids: + seen_document_ids.add(document_id) + file_info = { + "document_id": document_id, + "filename": result.get("filename"), + "file_path": result.get("file_path"), + "upload_time": result.get("upload_time"), + "file_type": result.get("file_type"), + "knowledge_base_id": kb_id + } + if userid not in users_knowledge_bases: + users_knowledge_bases[userid] = {} + if kb_id not in users_knowledge_bases[userid]: + users_knowledge_bases[userid][kb_id] = [] + users_knowledge_bases[userid][kb_id].append(file_info) + debug( + f"找到文件: userid={userid}, knowledge_base_id={kb_id}, document_id={document_id}, filename={result.get('filename')}") + + info(f"找到 {len(seen_document_ids)} 个文件,涉及 {len(users_knowledge_bases)} 个用户") + return { + "status": "success", + "users_knowledge_bases": users_knowledge_bases, + "collection_name": collection_name, + "message": f"成功列出 {len(users_knowledge_bases)} 个用户的知识库和文件", + "status_code": 200 + } + + except Exception as e: + error(f"列出所有用户知识库失败: {str(e)}") + return { + "status": "error", + "users_knowledge_bases": {}, + "collection_name": collection_name, + "message": f"列出所有用户知识库失败: {str(e)}", + "status_code": 400 + } + +connection_register('Milvus', MilvusDBConnection) +info("MilvusDBConnection registered") \ No newline at end of file diff --git a/llmengine/requirments.txt b/llmengine/requirments.txt deleted file mode 100644 index e69de29..0000000