From 85d80ab5573933b5bad909bc64e8f4249613be9e Mon Sep 17 00:00:00 2001 From: wangmeihua <13383952685@163.com> Date: Tue, 5 Aug 2025 17:23:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9connection=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rag/connection.py | 22 +++++++----- rag/milvus_connection.py | 73 +++++++++++++++++++--------------------- 2 files changed, 47 insertions(+), 48 deletions(-) diff --git a/rag/connection.py b/rag/connection.py index 06f37ec..ef62065 100644 --- a/rag/connection.py +++ b/rag/connection.py @@ -357,20 +357,22 @@ async def insert_file(request, params_kw, *params, **kw): userid = params_kw.get('userid', '') db_type = params_kw.get('db_type', '') knowledge_base_id = params_kw.get('knowledge_base_id', '') + document_id = params_kw.get('document_id', '') collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" try: - required_fields = ['file_path', 'userid', 'knowledge_base_id'] + required_fields = ['file_path', 'userid', 'knowledge_base_id', 'document_id'] missing_fields = [field for field in required_fields if field not in params_kw or not params_kw[field]] if missing_fields: raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") debug( - f'Calling insert_document with: file_path={file_path}, userid={userid}, db_type={db_type}, knowledge_base_id={knowledge_base_id}') + f'Calling insert_document with: file_path={file_path}, userid={userid}, db_type={db_type}, knowledge_base_id={knowledge_base_id}, document_id={document_id}') result = await engine.handle_connection("insert_document", { "file_path": file_path, "userid": userid, "db_type": db_type, - "knowledge_base_id": knowledge_base_id + "knowledge_base_id": knowledge_base_id, + "document_id": document_id }) debug(f'Insert result: {result=}') status = 200 if result.get("status") == "success" else 400 @@ -380,7 +382,7 @@ async def insert_file(request, params_kw, *params, **kw): return web.json_response({ "status": "error", "collection_name": collection_name, - "document_id": "", + "document_id": document_id, "message": str(e) }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) @@ -389,21 +391,23 @@ async def delete_file(request, params_kw, *params, **kw): se = ServerEnv() engine = se.engine userid = params_kw.get('userid', '') - filename = params_kw.get('filename', '') + file_path = params_kw.get('file_path', '') db_type = params_kw.get('db_type', '') knowledge_base_id = params_kw.get('knowledge_base_id', '') + document_id = params_kw.get('document_id', '') collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" try: - required_fields = ['userid', 'filename', 'knowledge_base_id'] + required_fields = ['userid', 'file_path', 'knowledge_base_id', 'document_id'] missing_fields = [field for field in required_fields if field not in params_kw or not params_kw[field]] if missing_fields: raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") - debug(f'Calling delete_document with: userid={userid}, filename={filename}, db_type={db_type}, knowledge_base_id={knowledge_base_id}') + debug(f'Calling delete_document with: userid={userid}, file_path={file_path}, db_type={db_type}, knowledge_base_id={knowledge_base_id}, document_id={document_id}') result = await engine.handle_connection("delete_document", { "userid": userid, - "filename": filename, + "file_path": file_path, "knowledge_base_id": knowledge_base_id, + "document_id": document_id, "db_type": db_type }) debug(f'Delete result: {result=}') @@ -414,7 +418,7 @@ async def delete_file(request, params_kw, *params, **kw): return web.json_response({ "status": "error", "collection_name": collection_name, - "document_id": "", + "document_id": document_id, "message": str(e), "status_code": 400 }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) diff --git a/rag/milvus_connection.py b/rag/milvus_connection.py index 8f552b5..d5c7c43 100644 --- a/rag/milvus_connection.py +++ b/rag/milvus_connection.py @@ -108,30 +108,32 @@ class MilvusConnection: file_path = params.get("file_path", "") userid = params.get("userid", "") knowledge_base_id = params.get("knowledge_base_id", "") - if not file_path or not userid or not knowledge_base_id: - return {"status": "error", "message": "file_path、userid 和 knowledge_base_id 不能为空", + document_id = params.get("document_id", "") + if not file_path or not userid or not knowledge_base_id or not document_id: + return {"status": "error", "message": "file_path、userid document_id和 knowledge_base_id 不能为空", "collection_name": collection_name, "document_id": "", "status_code": 400} if "_" in userid or "_" in knowledge_base_id: return {"status": "error", "message": "userid 和 knowledge_base_id 不能包含下划线", - "collection_name": collection_name, "document_id": "", "status_code": 400} + "collection_name": collection_name, "document_id": document_id, "status_code": 400} if len(knowledge_base_id) > 100: return {"status": "error", "message": "knowledge_base_id 的长度应小于 100", "collection_name": collection_name, "document_id": "", "status_code": 400} - return await self._insert_document(file_path, userid, knowledge_base_id, db_type) + return await self._insert_document(file_path, userid, knowledge_base_id, document_id, db_type) elif action == "delete_document": userid = params.get("userid", "") - filename = params.get("filename", "") + file_path = params.get("file_path", "") knowledge_base_id = params.get("knowledge_base_id", "") - if not userid or not filename or not knowledge_base_id: - return {"status": "error", "message": "userid、filename 和 knowledge_base_id 不能为空", + document_id = params.get("document_id", "") + if not userid or not file_path or not knowledge_base_id or not document_id: + return {"status": "error", "message": "userid、file_path document_id和 knowledge_base_id 不能为空", "collection_name": collection_name, "document_id": "", "status_code": 400} if "_" in userid or "_" in knowledge_base_id: return {"status": "error", "message": "userid 和 knowledge_base_id 不能包含下划线", "collection_name": collection_name, "document_id": "", "status_code": 400} - if len(userid) > 100 or len(filename) > 255 or len(knowledge_base_id) > 100: - return {"status": "error", "message": "userid、filename 或 knowledge_base_id 的长度超出限制", + if len(userid) > 100 or len(file_path) > 255 or len(knowledge_base_id) > 100: + return {"status": "error", "message": "userid、file_path 或 knowledge_base_id 的长度超出限制", "collection_name": collection_name, "document_id": "", "status_code": 400} - return await self._delete_document(userid, filename, knowledge_base_id, db_type) + return await self._delete_document(userid, file_path, knowledge_base_id, document_id, db_type) elif action == "delete_knowledge_base": userid = params.get("userid", "") knowledge_base_id = params.get("knowledge_base_id", "") @@ -233,10 +235,9 @@ class MilvusConnection: "status_code": 400 } - async def _insert_document(self, file_path: str, userid: str, knowledge_base_id: str, db_type: str = "") -> Dict[ + async def _insert_document(self, file_path: str, userid: str, knowledge_base_id: str, document_id:str, db_type: str = "") -> Dict[ str, Any]: """将文档插入 Milvus 并抽取三元组到 Neo4j""" - document_id = str(uuid.uuid4()) collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" debug( f'Inserting document: file_path={file_path}, userid={userid}, db_type={db_type}, knowledge_base_id={knowledge_base_id}, document_id={document_id}') @@ -514,16 +515,17 @@ class MilvusConnection: debug(f"Request #{request_id} traceback: {traceback.format_exc()}") raise RuntimeError(f"三元组抽取服务调用失败: {str(e)}") - async def _delete_document(self, userid: str, filename: str, knowledge_base_id: str, db_type: str = "") -> Dict[str, Any]: + async def _delete_document(self, userid: str, file_path: str, knowledge_base_id: str, document_id:str, db_type: str = "") -> Dict[str, Any]: """删除用户指定文件数据,包括 Milvus 和 Neo4j 中的记录""" collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" try: # 调用 Milvus 删除文件端点 - debug(f"调用删除文件端点: userid={userid}, filename={filename}, knowledge_base_id={knowledge_base_id}") + debug(f"调用删除文件端点: userid={userid}, file_path={file_path}, knowledge_base_id={knowledge_base_id}, document_id={document_id}") milvus_result = await self._make_api_request("deletedocument", { "userid": userid, - "filename": filename, + "file_path": file_path, "knowledge_base_id": knowledge_base_id, + "document_id": document_id, "db_type": db_type }) @@ -531,36 +533,29 @@ class MilvusConnection: error(f"Milvus 删除文件失败: {milvus_result.get('message', '未知错误')}") return milvus_result - document_ids = milvus_result.get("document_id", "").split(",") if milvus_result.get("document_id") else [] - # 调用 Neo4j 删除端点 neo4j_deleted_nodes = 0 neo4j_deleted_rels = 0 - for doc_id in document_ids: - if not doc_id: - continue - try: - debug(f"调用 Neo4j 删除文档端点: document_id={doc_id}") - neo4j_result = await self._make_neo4japi_request("deletedocument", { - "document_id": doc_id - }) - if neo4j_result.get("status") != "success": - error( - f"Neo4j 删除文档失败: document_id={doc_id}, 错误: {neo4j_result.get('message', '未知错误')}") - continue - nodes_deleted = neo4j_result.get("nodes_deleted", 0) - rels_deleted = neo4j_result.get("rels_deleted", 0) - neo4j_deleted_nodes += nodes_deleted - neo4j_deleted_rels += rels_deleted - info(f"成功删除 document_id={doc_id} 的 {nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系") - except Exception as e: - error(f"删除 document_id={doc_id} 的 Neo4j 数据失败: {str(e)}") - continue + try: + debug(f"调用 Neo4j 删除文档端点: document_id={document_id}") + neo4j_result = await self._make_neo4japi_request("deletedocument", { + "document_id": document_id + }) + if neo4j_result.get("status") != "success": + error( + f"Neo4j 删除文档失败: document_id={document_id}, 错误: {neo4j_result.get('message', '未知错误')}") + nodes_deleted = neo4j_result.get("nodes_deleted", 0) + rels_deleted = neo4j_result.get("rels_deleted", 0) + neo4j_deleted_nodes += nodes_deleted + neo4j_deleted_rels += rels_deleted + info(f"成功删除 document_id={document_id} 的 {nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系") + except Exception as e: + error(f"删除 document_id={document_id} 的 Neo4j 数据失败: {str(e)}") return { "status": "success", "collection_name": collection_name, - "document_id": ",".join(document_ids), + "document_id": document_id, "message": f"成功删除 Milvus 记录,{neo4j_deleted_nodes} 个 Neo4j 节点,{neo4j_deleted_rels} 个 Neo4j 关系", "status_code": 200 } @@ -570,7 +565,7 @@ class MilvusConnection: return { "status": "error", "collection_name": collection_name, - "document_id": "", + "document_id": document_id, "message": f"删除文档失败: {str(e)}", "status_code": 400 }