调用neo4j服务

This commit is contained in:
wangmeihua 2025-07-29 17:51:01 +08:00
parent e55b8b7052
commit 5eeaa77e22

View File

@ -1,10 +1,8 @@
from appPublic.jsonConfig import getConfig
import os
from appPublic.log import debug, error, info
import yaml
from threading import Lock
from llmengine.base_connection import connection_register
from typing import Dict, List, Any
import numpy as np
import aiohttp
from aiohttp import ClientSession, ClientTimeout
from langchain_core.documents import Document
@ -12,49 +10,52 @@ from langchain_text_splitters import RecursiveCharacterTextSplitter
import uuid
from datetime import datetime
from filetxt.loader import fileloader
from llmengine.kgc import KnowledgeGraph
import numpy as np
from py2neo import Graph
from scipy.spatial.distance import cosine
import time
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import traceback
import asyncio
import re
# Embedding cache.
# NOTE(review): presumably maps an input text to its embedding vector; nothing in
# this section reads or writes it, so confirm against the code that fills it.
EMBED_CACHE = {}
class MilvusConnection:
_instance = None
_lock = Lock()
def __new__(cls):
    """Return the single shared instance stored on the class, creating it on first use.

    Creation happens while holding ``cls._lock`` so that two concurrent callers
    cannot both build an instance. A freshly created instance is tagged with
    ``_initialized = False``.
    """
    with cls._lock:
        # Fast exit: an instance already exists, hand it back unchanged.
        if cls._instance is not None:
            return cls._instance
        cls._instance = super(MilvusConnection, cls).__new__(cls)
        cls._instance._initialized = False
        return cls._instance
def __init__(self):
    """Run one-time initialisation for the shared instance.

    ``__new__`` always returns the same object, so ``__init__`` is invoked on it
    every time the class is called. The ``_initialized`` flag (set to ``False``
    when the instance is created) makes every call after the first a no-op.
    """
    if self._initialized:
        return
    # Record that initialisation has happened; without this the guard above
    # never fires and any setup added here would re-run on every construction.
    self._initialized = True
@retry(stop=stop_after_attempt(3))
async def _make_neo4japi_request(self, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """POST ``params`` as JSON to the Neo4j service endpoint ``/v1/{action}``.

    Args:
        action: Endpoint name appended to the service base URL.
        params: JSON-serialisable request body.

    Returns:
        The decoded JSON body for HTTP 200 responses, and also for HTTP 400
        responses (client errors are handed back to the caller to inspect
        rather than raised).

    Raises:
        RuntimeError: For any other HTTP status, or when the request or the
            JSON decoding fails. The ``retry`` decorator re-runs the call up
            to three times; once the attempts are exhausted tenacity raises
            its own ``RetryError`` wrapping the last failure (default
            behaviour, since ``reraise`` is not set).
    """
    debug(f"开始API请求action={action}, params={params}")
    try:
        async with ClientSession(timeout=ClientTimeout(total=300)) as session:
            url = f"http://localhost:8885/v1/{action}"
            debug(f"发起POST请求{url}")
            async with session.post(
                url,
                headers={'Content-Type': 'application/json'},
                json=params
            ) as response:
                debug(f"收到相应: status={response.status}, headers={response.headers}")
                # Read the raw body first: it is needed for the error log below
                # and aiohttp keeps it cached for the later json() call.
                response_text = await response.text()
                debug(f"响应内容: {response_text}")
                # Check the status before decoding JSON so that a non-JSON error
                # page reports the real HTTP status instead of a decode error.
                if response.status not in (200, 400):
                    error(f"API 调用失败,动作: {action}, 状态码: {response.status}, 响应: {response_text}")
                    raise RuntimeError(f"API 调用失败: {response.status}")
                result = await response.json()
                debug(f"API响应内容: {result}")
                if response.status == 400:
                    debug(f"客户端错误,状态码: {response.status},返回响应: {result}")
                    return result
                debug(f"API 调用成功: {action}, 响应: {result}")
                return result
    except Exception as e:
        error(f"API 调用失败: {action}, 错误: {str(e)}, 堆栈: {traceback.format_exc()}")
        # Chain the cause so the original exception stays visible in tracebacks.
        raise RuntimeError(f"API 调用失败: {str(e)}") from e
@retry(stop=stop_after_attempt(3))
async def _make_api_request(self, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
debug(f"开始 API 请求: action={action}, params={params}")
try:
async with ClientSession(timeout=ClientTimeout(total=10)) as session:
async with ClientSession(timeout=ClientTimeout(total=300)) as session:
url = f"http://localhost:8886/v1/{action}"
debug(f"发起 POST 请求: {url}")
async with session.post(
@ -377,15 +378,28 @@ class MilvusConnection:
f"三元组抽取耗时: {timings['extract_triples']:.2f} 秒, 抽取到 {len(unique_triples)} 个三元组: {unique_triples[:5]}")
# Neo4j 插入
debug(f"抽取到 {len(unique_triples)} 个三元组,插入 Neo4j")
debug(f"抽取到 {len(unique_triples)} 个三元组,调用Neo4j服务插入")
start_neo4j = time.time()
if unique_triples:
kg = KnowledgeGraph(triples=unique_triples, document_id=document_id,
knowledge_base_id=knowledge_base_id, userid=userid)
kg.create_graphnodes()
kg.create_graphrels()
kg.export_data()
info(f"文件 {file_path} 三元组成功插入 Neo4j")
neo4j_result = await self._make_neo4japi_request("inserttriples", {
"triples":unique_triples,
"document_id": document_id,
"knowledge_base_id": knowledge_base_id,
"userid": userid
})
debug(f"Neo4j服务响应: {neo4j_result}")
if neo4j_result.get("status") != "success":
timings["insert_neo4j"] = time.time() - start_neo4j
timings["total"] = time.time() - start_total
return{
"status": "error",
"document_id": document_id,
"collection_name": collection_name,
"timings": timings,
"message": f"Neo4j 三元组插入失败: {neo4j_result.get('message', '未知错误')}",
"status_code": 400
}
info(f"文件 {file_path} 三元组成功插入 Neo4j: {neo4j_result.get('message')}")
else:
debug(f"文件 {file_path} 未抽取到三元组")
timings["insert_neo4j"] = time.time() - start_neo4j
@ -519,33 +533,28 @@ class MilvusConnection:
document_ids = milvus_result.get("document_id", "").split(",") if milvus_result.get("document_id") else []
# 调用 Neo4j 删除端点
neo4j_deleted_nodes = 0
neo4j_deleted_rels = 0
# 删除 Neo4j 数据
for doc_id in document_ids:
if not doc_id:
continue
try:
graph = Graph(self.neo4j_uri, auth=(self.neo4j_user, self.neo4j_password))
query = """
MATCH (n {document_id: $document_id})
OPTIONAL MATCH (n)-[r {document_id: $document_id}]->()
WITH collect(r) AS rels, collect(n) AS nodes
FOREACH (r IN rels | DELETE r)
FOREACH (n IN nodes | DELETE n)
RETURN size(nodes) AS node_count, size(rels) AS rel_count, [r IN rels | type(r)] AS rel_types
"""
result = graph.run(query, document_id=doc_id).data()
nodes_deleted = result[0]['node_count'] if result else 0
rels_deleted = result[0]['rel_count'] if result else 0
rel_types = result[0]['rel_types'] if result else []
info(
f"成功删除 document_id={doc_id}{nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系,关系类型: {rel_types}")
debug(f"调用 Neo4j 删除文档端点: document_id={doc_id}")
neo4j_result = await self._make_neo4japi_request("deletedocument", {
"document_id": doc_id
})
if neo4j_result.get("status") != "success":
error(
f"Neo4j 删除文档失败: document_id={doc_id}, 错误: {neo4j_result.get('message', '未知错误')}")
continue
nodes_deleted = neo4j_result.get("nodes_deleted", 0)
rels_deleted = neo4j_result.get("rels_deleted", 0)
neo4j_deleted_nodes += nodes_deleted
neo4j_deleted_rels += rels_deleted
info(f"成功删除 document_id={doc_id}{nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系")
except Exception as e:
error(f"删除 document_id={doc_id} 的 Neo4j 三元组失败: {str(e)}")
error(f"删除 document_id={doc_id} 的 Neo4j 数据失败: {str(e)}")
continue
return {
@ -584,30 +593,30 @@ class MilvusConnection:
deleted_files = milvus_result.get("deleted_files", [])
# 删除 Neo4j 数据
# 新增:调用 Neo4j 删除知识库端点
neo4j_deleted_nodes = 0
neo4j_deleted_rels = 0
try:
debug(f"尝试连接 Neo4j: uri={self.neo4j_uri}, user={self.neo4j_user}")
graph = Graph(self.neo4j_uri, auth=(self.neo4j_user, self.neo4j_password))
debug("Neo4j 连接成功")
query = """
MATCH (n {userid: $userid, knowledge_base_id: $knowledge_base_id})
OPTIONAL MATCH (n)-[r {userid: $userid, knowledge_base_id: $knowledge_base_id}]->()
WITH collect(r) AS rels, collect(n) AS nodes
FOREACH (r IN rels | DELETE r)
FOREACH (n IN nodes | DELETE n)
RETURN size(nodes) AS node_count, size(rels) AS rel_count, [r IN rels | type(r)] AS rel_types
"""
result = graph.run(query, userid=userid, knowledge_base_id=knowledge_base_id).data()
nodes_deleted = result[0]['node_count'] if result else 0
rels_deleted = result[0]['rel_count'] if result else 0
rel_types = result[0]['rel_types'] if result else []
neo4j_deleted_nodes += nodes_deleted
neo4j_deleted_rels += rels_deleted
info(f"成功删除 {nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系,关系类型: {rel_types}")
debug(f"调用 Neo4j 删除知识库端点: userid={userid}, knowledge_base_id={knowledge_base_id}")
neo4j_result = await self._make_neo4japi_request("deleteknowledgebase", {
"userid": userid,
"knowledge_base_id": knowledge_base_id
})
if neo4j_result.get("status") == "success":
neo4j_deleted_nodes = neo4j_result.get("nodes_deleted", 0)
neo4j_deleted_rels = neo4j_result.get("rels_deleted", 0)
info(f"成功删除 {neo4j_deleted_nodes} 个 Neo4j 节点和 {neo4j_deleted_rels} 个关系")
else:
error(f"Neo4j 删除知识库失败: {neo4j_result.get('message', '未知错误')}")
return {
"status": "success",
"collection_name": collection_name,
"deleted_files": deleted_files,
"message": f"成功删除 Milvus 知识库,{neo4j_deleted_nodes} 个 Neo4j 节点,{neo4j_deleted_rels} 个 Neo4j 关系,但 Neo4j 删除失败: {neo4j_result.get('message')}",
"status_code": 200
}
except Exception as e:
error(f"删除 Neo4j 数据失败: {str(e)}")
error(f"Neo4j 删除知识库失败: {str(e)}")
return {
"status": "success",
"collection_name": collection_name,
@ -672,119 +681,6 @@ class MilvusConnection:
error(f"实体识别服务调用失败: {str(e)}")
return []
async def _match_triplets(self, query: str, query_entities: List[str], userid: str, knowledge_base_id: str) -> List[Dict]:
    """Find Neo4j triplets related to the entities extracted from a query.

    The lookup runs in three stages:
      1. For each query entity, collect node names (scoped to ``userid`` and
         ``knowledge_base_id``) that contain the lower-cased entity or whose
         APOC Levenshtein similarity to it exceeds 0.7 (at most 100 per entity).
      2. Load at most 100 triplets whose head or tail is one of those names.
      3. Embed entities, heads and tails via ``self._get_embeddings`` and keep
         every triplet whose head or tail reaches a cosine similarity of 0.8
         with at least one entity.

    Args:
        query: Original query text; the matching itself does not use it.
        query_entities: Entity strings extracted from the query.
        userid: Owner used to scope nodes and relationships.
        knowledge_base_id: Knowledge base used to scope nodes and relationships.

    Returns:
        De-duplicated list of dicts with keys ``head``, ``type``, ``tail``,
        ``head_type`` and ``tail_type`` (the last two are always ``''``).
        An empty list is returned when nothing matches or when any stage fails.
    """
    # NOTE(review): graph.run() is a synchronous call made inside a coroutine.
    start_time = time.time()  # start of the overall timing
    matched_triplets = []
    ENTITY_SIMILARITY_THRESHOLD = 0.8
    try:
        graph = Graph(self.neo4j_uri, auth=(self.neo4j_user, self.neo4j_password))
        debug(f"已连接到 Neo4j: {self.neo4j_uri}")
        neo4j_connect_time = time.time() - start_time
        debug(f"Neo4j 连接耗时: {neo4j_connect_time:.3f}")

        # Stage 1: fuzzy-match each entity against node names.
        matched_names = set()
        entity_match_start = time.time()
        # Named separately so the ``query`` parameter is not overwritten.
        entity_cypher = """
            MATCH (n {userid: $userid, knowledge_base_id: $knowledge_base_id})
            WHERE toLower(n.name) CONTAINS $entity
            OR apoc.text.levenshteinSimilarity(toLower(n.name), $entity) > 0.7
            RETURN n.name, apoc.text.levenshteinSimilarity(toLower(n.name), $entity) AS sim
            ORDER BY sim DESC
            LIMIT 100
            """
        for entity in query_entities:
            normalized_entity = entity.lower().strip()
            try:
                results = graph.run(entity_cypher, userid=userid, knowledge_base_id=knowledge_base_id, entity=normalized_entity).data()
                for record in results:
                    matched_names.add(record['n.name'])
                    debug(f"实体 {entity} 匹配节点: {record['n.name']} (Levenshtein 相似度: {record['sim']:.2f})")
            except Exception as e:
                # Best effort: a failure for one entity must not stop the others.
                debug(f"模糊匹配实体 {entity} 失败: {str(e)}")
                continue
        entity_match_time = time.time() - entity_match_start
        debug(f"实体匹配耗时: {entity_match_time:.3f}")

        # Stage 2: load the triplets that touch any matched node.
        triplets = []
        if matched_names:
            triplet_query_start = time.time()
            triplet_cypher = """
                MATCH (h {userid: $userid, knowledge_base_id: $knowledge_base_id})-[r {userid: $userid, knowledge_base_id: $knowledge_base_id}]->(t {userid: $userid, knowledge_base_id: $knowledge_base_id})
                WHERE h.name IN $matched_names OR t.name IN $matched_names
                RETURN h.name AS head, r.name AS type, t.name AS tail
                LIMIT 100
                """
            try:
                results = graph.run(triplet_cypher, userid=userid, knowledge_base_id=knowledge_base_id, matched_names=list(matched_names)).data()
                seen = set()
                for record in results:
                    head, type_, tail = record['head'], record['type'], record['tail']
                    if not all(isinstance(part, str) for part in (head, type_, tail)):
                        # A missing/non-string name would raise on .lower() and
                        # throw away every triplet; skip just this record.
                        debug(f"跳过字段缺失的三元组记录: {record}")
                        continue
                    triplet_key = (head.lower(), type_.lower(), tail.lower())
                    if triplet_key not in seen:
                        seen.add(triplet_key)
                        triplets.append({
                            'head': head,
                            'type': type_,
                            'tail': tail,
                            'head_type': '',
                            'tail_type': ''
                        })
                debug(f"从 Neo4j 加载三元组: knowledge_base_id={knowledge_base_id}, 数量={len(triplets)}")
            except Exception as e:
                error(f"检索三元组失败: knowledge_base_id={knowledge_base_id}, 错误: {str(e)}")
                return []
            triplet_query_time = time.time() - triplet_query_start
            debug(f"Neo4j 三元组查询耗时: {triplet_query_time:.3f}")

        if not triplets:
            debug(f"知识库 knowledge_base_id={knowledge_base_id} 无匹配三元组")
            return []

        # Stage 3: embed everything in one call; the result is indexed by the
        # position of each text in ``texts_to_embed``.
        embedding_start = time.time()
        texts_to_embed = query_entities + [t['head'] for t in triplets] + [t['tail'] for t in triplets]
        embeddings = await self._get_embeddings(texts_to_embed)
        entity_vectors = {entity: embeddings[i] for i, entity in enumerate(query_entities)}
        head_vectors = {t['head']: embeddings[len(query_entities) + i] for i, t in enumerate(triplets)}
        tail_vectors = {t['tail']: embeddings[len(query_entities) + len(triplets) + i] for i, t in enumerate(triplets)}
        debug(f"成功获取 {len(embeddings)} 个嵌入向量({len(query_entities)} entities + {len(triplets)} heads + {len(triplets)} tails")
        embedding_time = time.time() - embedding_start
        debug(f"嵌入向量生成耗时: {embedding_time:.3f}")

        similarity_start = time.time()
        for entity in query_entities:
            entity_vec = entity_vectors[entity]
            for d_triplet in triplets:
                d_head_vec = head_vectors[d_triplet['head']]
                d_tail_vec = tail_vectors[d_triplet['tail']]
                # scipy's cosine() is a distance, so similarity = 1 - distance.
                head_similarity = 1 - cosine(entity_vec, d_head_vec)
                tail_similarity = 1 - cosine(entity_vec, d_tail_vec)
                if head_similarity >= ENTITY_SIMILARITY_THRESHOLD or tail_similarity >= ENTITY_SIMILARITY_THRESHOLD:
                    matched_triplets.append(d_triplet)
                    debug(f"匹配三元组: {d_triplet['head']} - {d_triplet['type']} - {d_triplet['tail']} "
                          f"(entity={entity}, head_sim={head_similarity:.2f}, tail_sim={tail_similarity:.2f})")
        similarity_time = time.time() - similarity_start
        debug(f"相似度计算耗时: {similarity_time:.3f}")

        # The same triplet can match several entities; keep the first occurrence.
        unique_matched = []
        seen = set()
        for t in matched_triplets:
            identifier = (t['head'].lower(), t['type'].lower(), t['tail'].lower())
            if identifier not in seen:
                seen.add(identifier)
                unique_matched.append(t)

        total_time = time.time() - start_time
        debug(f"_match_triplets 总耗时: {total_time:.3f}")
        info(f"找到 {len(unique_matched)} 个匹配的三元组")
        return unique_matched
    except Exception as e:
        error(f"匹配三元组失败: {str(e)}")
        return []
async def _rerank_results(self, query: str, results: List[Dict], top_n: int) -> List[Dict]:
"""调用重排序服务"""
try:
@ -936,14 +832,28 @@ class MilvusConnection:
timing_stats["entity_extraction"] = time.time() - entity_extract_start
debug(f"提取实体: {query_entities}, 耗时: {timing_stats['entity_extraction']:.3f}")
# 匹配三元组
# 调用 Neo4j 服务进行三元组匹配
all_triplets = []
triplet_match_start = time.time()
for kb_id in knowledge_base_ids:
debug(f"处理知识库: {kb_id}")
matched_triplets = await self._match_triplets(query, query_entities, userid, kb_id)
debug(f"知识库 {kb_id} 匹配三元组: {len(matched_triplets)}")
all_triplets.extend(matched_triplets)
debug(f"调用 Neo4j 三元组匹配: knowledge_base_id={kb_id}")
try:
neo4j_result = await self._make_neo4japi_request("matchtriplets", {
"query": query,
"query_entities": query_entities,
"userid": userid,
"knowledge_base_id": kb_id
})
if neo4j_result.get("status") == "success":
triplets = neo4j_result.get("triplets", [])
all_triplets.extend(triplets)
debug(f"知识库 {kb_id} 匹配到 {len(triplets)} 个三元组: {triplets[:5]}")
else:
error(
f"Neo4j 三元组匹配失败: knowledge_base_id={kb_id}, 错误: {neo4j_result.get('message', '未知错误')}")
except Exception as e:
error(f"Neo4j 三元组匹配失败: knowledge_base_id={kb_id}, 错误: {str(e)}")
continue
timing_stats["triplet_matching"] = time.time() - triplet_match_start
debug(f"三元组匹配总耗时: {timing_stats['triplet_matching']:.3f}")
@ -977,7 +887,7 @@ class MilvusConnection:
# 调用融合搜索端点
search_start = time.time()
result = await self._make_api_request("searchquery", { # 注意:使用 searchquery 端点
result = await self._make_api_request("searchquery", {
"query_vector": query_vector.tolist(),
"userid": userid,
"knowledge_base_ids": knowledge_base_ids,