修改connection服务

This commit is contained in:
wangmeihua 2025-08-05 17:23:59 +08:00
parent 29b326e53b
commit 85d80ab557
2 changed files with 47 additions and 48 deletions

View File

@ -357,20 +357,22 @@ async def insert_file(request, params_kw, *params, **kw):
userid = params_kw.get('userid', '') userid = params_kw.get('userid', '')
db_type = params_kw.get('db_type', '') db_type = params_kw.get('db_type', '')
knowledge_base_id = params_kw.get('knowledge_base_id', '') knowledge_base_id = params_kw.get('knowledge_base_id', '')
document_id = params_kw.get('document_id', '')
collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" collection_name = "ragdb" if not db_type else f"ragdb_{db_type}"
try: try:
required_fields = ['file_path', 'userid', 'knowledge_base_id'] required_fields = ['file_path', 'userid', 'knowledge_base_id', 'document_id']
missing_fields = [field for field in required_fields if field not in params_kw or not params_kw[field]] missing_fields = [field for field in required_fields if field not in params_kw or not params_kw[field]]
if missing_fields: if missing_fields:
raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}")
debug( debug(
f'Calling insert_document with: file_path={file_path}, userid={userid}, db_type={db_type}, knowledge_base_id={knowledge_base_id}') f'Calling insert_document with: file_path={file_path}, userid={userid}, db_type={db_type}, knowledge_base_id={knowledge_base_id}, document_id={document_id}')
result = await engine.handle_connection("insert_document", { result = await engine.handle_connection("insert_document", {
"file_path": file_path, "file_path": file_path,
"userid": userid, "userid": userid,
"db_type": db_type, "db_type": db_type,
"knowledge_base_id": knowledge_base_id "knowledge_base_id": knowledge_base_id,
"document_id": document_id
}) })
debug(f'Insert result: {result=}') debug(f'Insert result: {result=}')
status = 200 if result.get("status") == "success" else 400 status = 200 if result.get("status") == "success" else 400
@ -380,7 +382,7 @@ async def insert_file(request, params_kw, *params, **kw):
return web.json_response({ return web.json_response({
"status": "error", "status": "error",
"collection_name": collection_name, "collection_name": collection_name,
"document_id": "", "document_id": document_id,
"message": str(e) "message": str(e)
}, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400)
@ -389,21 +391,23 @@ async def delete_file(request, params_kw, *params, **kw):
se = ServerEnv() se = ServerEnv()
engine = se.engine engine = se.engine
userid = params_kw.get('userid', '') userid = params_kw.get('userid', '')
filename = params_kw.get('filename', '') file_path = params_kw.get('file_path', '')
db_type = params_kw.get('db_type', '') db_type = params_kw.get('db_type', '')
knowledge_base_id = params_kw.get('knowledge_base_id', '') knowledge_base_id = params_kw.get('knowledge_base_id', '')
document_id = params_kw.get('document_id', '')
collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" collection_name = "ragdb" if not db_type else f"ragdb_{db_type}"
try: try:
required_fields = ['userid', 'filename', 'knowledge_base_id'] required_fields = ['userid', 'file_path', 'knowledge_base_id', 'document_id']
missing_fields = [field for field in required_fields if field not in params_kw or not params_kw[field]] missing_fields = [field for field in required_fields if field not in params_kw or not params_kw[field]]
if missing_fields: if missing_fields:
raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}")
debug(f'Calling delete_document with: userid={userid}, filename={filename}, db_type={db_type}, knowledge_base_id={knowledge_base_id}') debug(f'Calling delete_document with: userid={userid}, file_path={file_path}, db_type={db_type}, knowledge_base_id={knowledge_base_id}, document_id={document_id}')
result = await engine.handle_connection("delete_document", { result = await engine.handle_connection("delete_document", {
"userid": userid, "userid": userid,
"filename": filename, "file_path": file_path,
"knowledge_base_id": knowledge_base_id, "knowledge_base_id": knowledge_base_id,
"document_id": document_id,
"db_type": db_type "db_type": db_type
}) })
debug(f'Delete result: {result=}') debug(f'Delete result: {result=}')
@ -414,7 +418,7 @@ async def delete_file(request, params_kw, *params, **kw):
return web.json_response({ return web.json_response({
"status": "error", "status": "error",
"collection_name": collection_name, "collection_name": collection_name,
"document_id": "", "document_id": document_id,
"message": str(e), "message": str(e),
"status_code": 400 "status_code": 400
}, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400) }, dumps=lambda obj: json.dumps(obj, ensure_ascii=False), status=400)

View File

@ -108,30 +108,32 @@ class MilvusConnection:
file_path = params.get("file_path", "") file_path = params.get("file_path", "")
userid = params.get("userid", "") userid = params.get("userid", "")
knowledge_base_id = params.get("knowledge_base_id", "") knowledge_base_id = params.get("knowledge_base_id", "")
if not file_path or not userid or not knowledge_base_id: document_id = params.get("document_id", "")
return {"status": "error", "message": "file_path、userid 和 knowledge_base_id 不能为空", if not file_path or not userid or not knowledge_base_id or not document_id:
return {"status": "error", "message": "file_path、userid document_id和 knowledge_base_id 不能为空",
"collection_name": collection_name, "document_id": "", "status_code": 400} "collection_name": collection_name, "document_id": "", "status_code": 400}
if "_" in userid or "_" in knowledge_base_id: if "_" in userid or "_" in knowledge_base_id:
return {"status": "error", "message": "userid 和 knowledge_base_id 不能包含下划线", return {"status": "error", "message": "userid 和 knowledge_base_id 不能包含下划线",
"collection_name": collection_name, "document_id": "", "status_code": 400} "collection_name": collection_name, "document_id": document_id, "status_code": 400}
if len(knowledge_base_id) > 100: if len(knowledge_base_id) > 100:
return {"status": "error", "message": "knowledge_base_id 的长度应小于 100", return {"status": "error", "message": "knowledge_base_id 的长度应小于 100",
"collection_name": collection_name, "document_id": "", "status_code": 400} "collection_name": collection_name, "document_id": "", "status_code": 400}
return await self._insert_document(file_path, userid, knowledge_base_id, db_type) return await self._insert_document(file_path, userid, knowledge_base_id, document_id, db_type)
elif action == "delete_document": elif action == "delete_document":
userid = params.get("userid", "") userid = params.get("userid", "")
filename = params.get("filename", "") file_path = params.get("file_path", "")
knowledge_base_id = params.get("knowledge_base_id", "") knowledge_base_id = params.get("knowledge_base_id", "")
if not userid or not filename or not knowledge_base_id: document_id = params.get("document_id", "")
return {"status": "error", "message": "userid、filename 和 knowledge_base_id 不能为空", if not userid or not file_path or not knowledge_base_id or not document_id:
return {"status": "error", "message": "userid、file_path document_id和 knowledge_base_id 不能为空",
"collection_name": collection_name, "document_id": "", "status_code": 400} "collection_name": collection_name, "document_id": "", "status_code": 400}
if "_" in userid or "_" in knowledge_base_id: if "_" in userid or "_" in knowledge_base_id:
return {"status": "error", "message": "userid 和 knowledge_base_id 不能包含下划线", return {"status": "error", "message": "userid 和 knowledge_base_id 不能包含下划线",
"collection_name": collection_name, "document_id": "", "status_code": 400} "collection_name": collection_name, "document_id": "", "status_code": 400}
if len(userid) > 100 or len(filename) > 255 or len(knowledge_base_id) > 100: if len(userid) > 100 or len(file_path) > 255 or len(knowledge_base_id) > 100:
return {"status": "error", "message": "userid、filename 或 knowledge_base_id 的长度超出限制", return {"status": "error", "message": "userid、file_path 或 knowledge_base_id 的长度超出限制",
"collection_name": collection_name, "document_id": "", "status_code": 400} "collection_name": collection_name, "document_id": "", "status_code": 400}
return await self._delete_document(userid, filename, knowledge_base_id, db_type) return await self._delete_document(userid, file_path, knowledge_base_id, document_id, db_type)
elif action == "delete_knowledge_base": elif action == "delete_knowledge_base":
userid = params.get("userid", "") userid = params.get("userid", "")
knowledge_base_id = params.get("knowledge_base_id", "") knowledge_base_id = params.get("knowledge_base_id", "")
@ -233,10 +235,9 @@ class MilvusConnection:
"status_code": 400 "status_code": 400
} }
async def _insert_document(self, file_path: str, userid: str, knowledge_base_id: str, db_type: str = "") -> Dict[ async def _insert_document(self, file_path: str, userid: str, knowledge_base_id: str, document_id:str, db_type: str = "") -> Dict[
str, Any]: str, Any]:
"""将文档插入 Milvus 并抽取三元组到 Neo4j""" """将文档插入 Milvus 并抽取三元组到 Neo4j"""
document_id = str(uuid.uuid4())
collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" collection_name = "ragdb" if not db_type else f"ragdb_{db_type}"
debug( debug(
f'Inserting document: file_path={file_path}, userid={userid}, db_type={db_type}, knowledge_base_id={knowledge_base_id}, document_id={document_id}') f'Inserting document: file_path={file_path}, userid={userid}, db_type={db_type}, knowledge_base_id={knowledge_base_id}, document_id={document_id}')
@ -514,16 +515,17 @@ class MilvusConnection:
debug(f"Request #{request_id} traceback: {traceback.format_exc()}") debug(f"Request #{request_id} traceback: {traceback.format_exc()}")
raise RuntimeError(f"三元组抽取服务调用失败: {str(e)}") raise RuntimeError(f"三元组抽取服务调用失败: {str(e)}")
async def _delete_document(self, userid: str, filename: str, knowledge_base_id: str, db_type: str = "") -> Dict[str, Any]: async def _delete_document(self, userid: str, file_path: str, knowledge_base_id: str, document_id:str, db_type: str = "") -> Dict[str, Any]:
"""删除用户指定文件数据,包括 Milvus 和 Neo4j 中的记录""" """删除用户指定文件数据,包括 Milvus 和 Neo4j 中的记录"""
collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" collection_name = "ragdb" if not db_type else f"ragdb_{db_type}"
try: try:
# 调用 Milvus 删除文件端点 # 调用 Milvus 删除文件端点
debug(f"调用删除文件端点: userid={userid}, filename={filename}, knowledge_base_id={knowledge_base_id}") debug(f"调用删除文件端点: userid={userid}, file_path={file_path}, knowledge_base_id={knowledge_base_id}, document_id={document_id}")
milvus_result = await self._make_api_request("deletedocument", { milvus_result = await self._make_api_request("deletedocument", {
"userid": userid, "userid": userid,
"filename": filename, "file_path": file_path,
"knowledge_base_id": knowledge_base_id, "knowledge_base_id": knowledge_base_id,
"document_id": document_id,
"db_type": db_type "db_type": db_type
}) })
@ -531,36 +533,29 @@ class MilvusConnection:
error(f"Milvus 删除文件失败: {milvus_result.get('message', '未知错误')}") error(f"Milvus 删除文件失败: {milvus_result.get('message', '未知错误')}")
return milvus_result return milvus_result
document_ids = milvus_result.get("document_id", "").split(",") if milvus_result.get("document_id") else []
# 调用 Neo4j 删除端点 # 调用 Neo4j 删除端点
neo4j_deleted_nodes = 0 neo4j_deleted_nodes = 0
neo4j_deleted_rels = 0 neo4j_deleted_rels = 0
for doc_id in document_ids:
if not doc_id:
continue
try: try:
debug(f"调用 Neo4j 删除文档端点: document_id={doc_id}") debug(f"调用 Neo4j 删除文档端点: document_id={document_id}")
neo4j_result = await self._make_neo4japi_request("deletedocument", { neo4j_result = await self._make_neo4japi_request("deletedocument", {
"document_id": doc_id "document_id": document_id
}) })
if neo4j_result.get("status") != "success": if neo4j_result.get("status") != "success":
error( error(
f"Neo4j 删除文档失败: document_id={doc_id}, 错误: {neo4j_result.get('message', '未知错误')}") f"Neo4j 删除文档失败: document_id={document_id}, 错误: {neo4j_result.get('message', '未知错误')}")
continue
nodes_deleted = neo4j_result.get("nodes_deleted", 0) nodes_deleted = neo4j_result.get("nodes_deleted", 0)
rels_deleted = neo4j_result.get("rels_deleted", 0) rels_deleted = neo4j_result.get("rels_deleted", 0)
neo4j_deleted_nodes += nodes_deleted neo4j_deleted_nodes += nodes_deleted
neo4j_deleted_rels += rels_deleted neo4j_deleted_rels += rels_deleted
info(f"成功删除 document_id={doc_id}{nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系") info(f"成功删除 document_id={document_id}{nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系")
except Exception as e: except Exception as e:
error(f"删除 document_id={doc_id} 的 Neo4j 数据失败: {str(e)}") error(f"删除 document_id={document_id} 的 Neo4j 数据失败: {str(e)}")
continue
return { return {
"status": "success", "status": "success",
"collection_name": collection_name, "collection_name": collection_name,
"document_id": ",".join(document_ids), "document_id": document_id,
"message": f"成功删除 Milvus 记录,{neo4j_deleted_nodes} 个 Neo4j 节点,{neo4j_deleted_rels} 个 Neo4j 关系", "message": f"成功删除 Milvus 记录,{neo4j_deleted_nodes} 个 Neo4j 节点,{neo4j_deleted_rels} 个 Neo4j 关系",
"status_code": 200 "status_code": 200
} }
@ -570,7 +565,7 @@ class MilvusConnection:
return { return {
"status": "error", "status": "error",
"collection_name": collection_name, "collection_name": collection_name,
"document_id": "", "document_id": document_id,
"message": f"删除文档失败: {str(e)}", "message": f"删除文档失败: {str(e)}",
"status_code": 400 "status_code": 400
} }