rag/rag/uapi_service.py
2025-10-10 11:49:20 +08:00

374 lines
21 KiB
Python

from traceback import format_exc
from appPublic.log import debug, error
from typing import Dict, Any, List
import uuid
import json
import re
from ahserver.serverenv import ServerEnv
from uapi.appapi import UAPI
from appPublic.dictObject import DictObject
class APIService:
    """Service class that forwards API requests to upstream services via ``UAPI``.

    Each public method generates a fresh UUID request id used to correlate its
    log lines, calls ``uapi.call(upappid, apiname, user, params)`` and decodes
    the JSON reply with :meth:`handle_uapi_response`.
    """

    async def handle_uapi_response(self, b: bytes, upappid: str, apiname: str, service_name: str, request_id: str = None) -> dict:
        """Decode the raw reply of ``uapi.call`` into a Python object.

        Args:
            b: Raw UTF-8 encoded JSON bytes returned by ``uapi.call``.
            upappid: Upstream application id (used for logging only).
            apiname: Upstream API name (used for logging only).
            service_name: Human-readable service name used in messages.
            request_id: Optional id prepended to log lines.

        Returns:
            The object produced by ``json.loads`` on the decoded payload.

        Raises:
            RuntimeError: If the reply is empty or cannot be decoded/parsed.
        """
        log_prefix = f"request #{request_id} " if request_id else ""
        if not b:
            error(f"{log_prefix}{service_name} 返回空响应: upappid={upappid}, apiname={apiname}")
            # No exception is active here, so there is no traceback worth attaching.
            raise RuntimeError(f"{service_name} 返回空响应")
        try:
            return json.loads(b.decode('utf-8'))
        except Exception as e:
            error(
                f"{log_prefix}JSON 解析失败: {str(e)}, upappid={upappid}, apiname={apiname}\n{format_exc()}")
            # Surface the failure to the caller; falling through would hit an
            # unbound local and mask the real cause.
            raise RuntimeError(f"{service_name} JSON 解析失败: {str(e)}") from e

    async def _call_service(self, request, upappid: str, apiname: str, user: str, params_kw: dict,
                            service_name: str, request_id: str, log_raw: bool = False) -> dict:
        """Invoke one upstream API through ``UAPI`` and return the decoded reply.

        When ``log_raw`` is true the raw reply is written to the debug log
        before it is decoded.
        """
        # NOTE(review): the module globals are handed to UAPI as its environment,
        # as the original code did -- confirm this is what UAPI expects.
        uapi = UAPI(request, DictObject(**globals()))
        b = await uapi.call(upappid, apiname, user, params_kw)
        if log_raw:
            debug(f"{b=}")
        return await self.handle_uapi_response(b, upappid, apiname, service_name, request_id)

    async def _forward(self, request, upappid: str, apiname: str, user: str, params_kw: dict,
                       service_name: str, success_label: str, request_id: str,
                       log_raw: bool = False) -> Dict[str, Any]:
        """Call a service and return its decoded reply without inspecting it.

        Logs ``success_label`` together with the reply on success; on any
        failure logs the error and raises ``RuntimeError`` chained to the cause.
        """
        try:
            d = await self._call_service(request, upappid, apiname, user, params_kw,
                                         service_name, request_id, log_raw=log_raw)
            debug(f"request #{request_id} {success_label}: {d}")
            return d
        except Exception as e:
            error(f"request #{request_id} {service_name}调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
            raise RuntimeError(f"{service_name}调用失败: {str(e)}") from e

    # Embedding service (BAAI/bge-m3)
    async def get_embeddings(self, request, texts: list, upappid: str, apiname: str, user: str) -> list:
        """Call the embedding service and return one vector per ``data`` item.

        Sends ``{"input": texts}`` and expects a reply whose ``object`` is
        ``"list"`` with a non-empty ``data`` list of items carrying an
        ``embedding`` key.

        Raises:
            RuntimeError: On any call, decoding or format failure.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for embeddings, texts={texts[:2]}")
        try:
            debug(f"get_embeddings request: {request}")
            d = await self._call_service(request, upappid, apiname, user, {"input": texts}, "嵌入服务", request_id)
            if d.get("object") != "list" or not d.get("data"):
                error(f"request #{request_id} 嵌入服务响应格式错误: {d}")
                raise RuntimeError("嵌入服务响应格式错误")
            embeddings = [item["embedding"] for item in d["data"]]
            debug(f"request #{request_id} 成功获取 {len(embeddings)} 个嵌入向量")
            return embeddings
        except Exception as e:
            error(f"request #{request_id} 嵌入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
            raise RuntimeError(f"嵌入服务调用失败: {str(e)}") from e

    # Entity extraction service (LTP/small)
    async def extract_entities(self, request, query: str, upappid: str, apiname: str, user: str) -> list:
        """Call the entity recognition service and return de-duplicated entities.

        Best effort: any failure (including an empty ``query``) is logged and
        an empty list is returned instead of raising.
        """
        request_id = str(uuid.uuid4())
        # Guard the slice so a None query reaches the validation below instead
        # of raising TypeError outside the try block.
        debug(f"Request #{request_id} started for entity extraction, query={(query or '')[:100]}")
        try:
            if not query:
                raise ValueError("查询文本不能为空")
            d = await self._call_service(request, upappid, apiname, user, {"query": query}, "实体识别服务", request_id)
            if d.get("object") != "list" or not d.get("data"):
                error(f"request #{request_id} 实体识别服务响应格式错误: {d}")
                raise RuntimeError("实体识别服务响应格式错误")
            # dict.fromkeys removes duplicates while keeping first-seen order.
            unique_entities = list(dict.fromkeys(d["data"]))
            debug(f"request #{request_id} 成功提取 {len(unique_entities)} 个唯一实体")
            return unique_entities
        except Exception as e:
            error(f"request #{request_id} 实体识别服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
            return []

    # Triple extraction service (Babelscape/mrebel-large)
    async def extract_triples(self, request, text: str, upappid: str, apiname: str, user: str) -> list:
        """Call the triple extraction service and return the ``data`` list.

        Best effort: any failure is logged and an empty list is returned.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for triples extraction, text={text[:100]}")
        try:
            d = await self._call_service(request, upappid, apiname, user, {"text": text}, "三元组抽取服务", request_id)
            if d.get("object") != "list":
                error(f"request #{request_id} 三元组抽取服务响应格式错误: {d}")
                raise RuntimeError("三元组抽取服务响应格式错误")
            # An empty, missing or null data field is acceptable here.
            triples = d.get("data") or []
            debug(f"request #{request_id} extracted {len(triples)} triples")
            return triples
        except Exception as e:
            error(f"request #{request_id} failed to extract triples: {str(e)}, upappid={upappid}, apiname={apiname}")
            return []

    # Rerank service (BAAI/bge-reranker-v2-m3)
    async def rerank_results(self, request, query: str, results: list, top_n: int, upappid: str, apiname: str, user: str) -> list:
        """Call the rerank service and return ``results`` in the order it reports.

        Each selected entry of ``results`` is updated in place with a
        ``rerank_score`` key. An invalid ``top_n`` falls back to
        ``len(results)``. Best effort: on any failure the original ``results``
        list is returned unchanged in order.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for reranking, query={query[:100]}")
        try:
            if not results:
                debug(f"request #{request_id} 无结果需要重排序")
                return results
            if not isinstance(top_n, int) or top_n < 1:
                debug(f"request #{request_id} 无效的 top_n 参数: {top_n}, 使用 len(results)={len(results)}")
                top_n = len(results)
            else:
                top_n = min(top_n, len(results))
            debug(f"top_n={top_n}")
            documents = [result.get("text", str(result)) for result in results]
            debug(f"文档块长度:{len(documents)},\ndocuments: {documents}")
            params_kw = {
                "model": "rerank-001",
                "query": query,
                "documents": documents,
                "top_n": top_n
            }
            d = await self._call_service(request, upappid, apiname, user, params_kw, "重排序服务", request_id)
            if d.get("object") != "rerank.result" or not d.get("data"):
                error(f"request #{request_id} 重排序服务响应格式错误: {d}")
                raise RuntimeError("重排序服务响应格式错误")
            reranked_results = []
            for item in d["data"]:
                index = item["index"]
                # Reject negative indices too: they would silently pick
                # entries from the end of the list.
                if 0 <= index < len(results):
                    results[index]["rerank_score"] = item["relevance_score"]
                    reranked_results.append(results[index])
            debug(f"request #{request_id} 成功重排序 {len(reranked_results)} 条结果")
            return reranked_results[:top_n]
        except Exception as e:
            error(f"request #{request_id} 重排序服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
            return results

    # Neo4j services
    async def neo4j_docs(self, request, upappid: str, apiname: str, user: str) -> str:
        """Fetch the Neo4j documentation and return it as text.

        Expects a reply with ``status == 200`` and a non-empty ``text`` field.

        Raises:
            RuntimeError: On any call, decoding or format failure.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Neo4j docs")
        try:
            d = await self._call_service(request, upappid, apiname, user, {}, "Neo4j 文档服务", request_id)
            if d.get("status") != 200 or not d.get("text"):
                error(f"request #{request_id} Neo4j 文档请求格式错误: {d}")
                raise RuntimeError(f"Neo4j 文档请求失败: {d.get('status')}")
            text = d["text"]
            debug(f"request #{request_id} Neo4j 文档内容: {text[:500]}")
            return text
        except Exception as e:
            error(f"request #{request_id} Neo4j 文档请求失败: {str(e)}, upappid={upappid}, apiname={apiname}")
            raise RuntimeError(f"Neo4j 文档请求失败: {str(e)}") from e

    async def neo4j_initialize(self, request, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
        """Initialize the Neo4j service and return its decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Neo4j initialization")
        return await self._forward(request, upappid, apiname, user, {},
                                   "Neo4j 初始化服务", "Neo4j 初始化成功", request_id)

    async def neo4j_insert_triples(self, request, triples: list, document_id: str, knowledge_base_id: str, userid: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
        """Insert triples into Neo4j and return the decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Neo4j triples insertion")
        params_kw = {
            "triples": triples,
            "document_id": document_id,
            "knowledge_base_id": knowledge_base_id,
            "userid": userid
        }
        return await self._forward(request, upappid, apiname, user, params_kw,
                                   "Neo4j 三元组插入服务", "Neo4j 三元组插入成功", request_id)

    async def neo4j_delete_document(self, request, document_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
        """Delete the given document from Neo4j and return the decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Neo4j document deletion")
        return await self._forward(request, upappid, apiname, user, {"document_id": document_id},
                                   "Neo4j 文档删除服务", "Neo4j 文档删除成功", request_id)

    async def neo4j_delete_knowledgebase(self, request, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
        """Delete a user's knowledge base from Neo4j and return the decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Neo4j knowledgebase deletion")
        params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id}
        return await self._forward(request, upappid, apiname, user, params_kw,
                                   "Neo4j 知识库删除服务", "Neo4j 知识库删除成功", request_id)

    async def neo4j_match_triplets(self, request, query: str, query_entities: list, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
        """Match triples related to the given entities and return the decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Neo4j triplets matching, query={query[:100]}")
        params_kw = {
            "query": query,
            "query_entities": query_entities,
            "userid": userid,
            "knowledge_base_id": knowledge_base_id
        }
        return await self._forward(request, upappid, apiname, user, params_kw,
                                   "Neo4j 三元组匹配服务", "Neo4j 三元组匹配成功", request_id)

    # Milvus services
    # NOTE(review): the collection methods send the key "db_type" while the
    # document methods send "dbtype"; kept as-is -- confirm against the server.
    async def milvus_create_collection(self, request, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
        """Create a Milvus collection and return the decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Milvus collection creation")
        return await self._forward(request, upappid, apiname, user, {"db_type": db_type},
                                   "Milvus 集合创建服务", "Milvus 集合创建成功", request_id)

    async def milvus_delete_collection(self, request, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
        """Delete a Milvus collection and return the decoded reply.

        The raw reply is also written to the debug log before decoding.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Milvus collection deletion")
        return await self._forward(request, upappid, apiname, user, {"db_type": db_type},
                                   "Milvus 集合删除服务", "Milvus 集合删除成功", request_id,
                                   log_raw=True)

    async def milvus_insert_document(self, request, chunks: List[Dict], upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
        """Insert records (chunks) into Milvus and return the decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Milvus document insertion")
        params_kw = {
            "chunks": chunks,
            "dbtype": db_type
        }
        return await self._forward(request, upappid, apiname, user, params_kw,
                                   "Milvus 文档插入服务", "Milvus 文档插入成功", request_id)

    async def milvus_delete_document(self, request, userid: str, file_path: str, knowledge_base_id: str, document_id: str, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
        """Delete a document's records from Milvus and return the decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Milvus document deletion")
        params_kw = {
            "userid": userid,
            "file_path": file_path,
            "knowledge_base_id": knowledge_base_id,
            "document_id": document_id,
            "dbtype": db_type
        }
        return await self._forward(request, upappid, apiname, user, params_kw,
                                   "Milvus 文档删除服务", "Milvus 文档删除成功", request_id)

    async def milvus_delete_knowledgebase(self, request, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
        """Delete a knowledge base from Milvus and return the decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Milvus knowledgebase deletion")
        params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id}
        return await self._forward(request, upappid, apiname, user, params_kw,
                                   "Milvus 知识库删除服务", "Milvus 知识库删除成功", request_id)

    async def milvus_search_query(self, request, query_vector: List[float], userid: str, knowledge_base_ids: list, limit: int, offset: int, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
        """Search Milvus within the user's knowledge bases and return the decoded reply.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"userid:{userid}")
        debug(f"Request #{request_id} started for Milvus search")
        params_kw = {
            "query_vector": query_vector,
            "userid": userid,
            "knowledge_base_ids": knowledge_base_ids,
            "limit": limit,
            "offset": offset
        }
        return await self._forward(request, upappid, apiname, user, params_kw,
                                   "Milvus 检索服务", "Milvus 检索成功", request_id)

    async def milvus_list_user_files(self, request, userid: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
        """List the user's knowledge bases/files stored in Milvus.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Milvus user files listing")
        return await self._forward(request, upappid, apiname, user, {"userid": userid},
                                   "Milvus 用户文件列表服务", "Milvus 用户文件列表成功", request_id)

    async def milvus_list_all_knowledgebases(self, request, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
        """List all data stored in the Milvus database.

        Raises:
            RuntimeError: If the call or decoding fails.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for Milvus all knowledgebases listing")
        return await self._forward(request, upappid, apiname, user, {},
                                   "Milvus 知识库列表服务", "Milvus 知识库列表成功", request_id)