# Source: rag/rag/api_service.py (last modified 2025-08-07 16:23:38 +08:00; 343 lines, 18 KiB, Python)

from appPublic.log import debug, error
from typing import Dict, Any, List
import aiohttp
from aiohttp import ClientSession, ClientTimeout
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import traceback
import uuid
import json
class APIService:
    """Async HTTP client for the embedding, entity, triple, rerank, Neo4j, Milvus and RAG services.

    Every public method issues one HTTP request with ``aiohttp`` and opens a
    fresh ``ClientSession`` for it. The endpoint addresses are class attributes
    so a subclass (or a test) can point the client at another deployment
    without touching the method bodies.
    """

    # Model-service endpoints (public addresses).
    EMBEDDING_URL = "https://embedding.opencomputing.net:10443/v1/embeddings"
    ENTITIES_URL = "https://entities.opencomputing.net:10443/v1/entities"
    TRIPLES_URL = "https://triples.opencomputing.net:10443/v1/triples"
    RERANK_URL = "https://reranker.opencomputing.net:10443/v1/rerank"

    # Base addresses of the action-style services; actions live under
    # "<base>/v1/<action>" (see _call_service).
    NEO4J_BASE_URL = "https://graphdb.opencomputing.net:10443"
    MILVUS_BASE_URL = "https://vectordb.opencomputing.net:10443"
    RAG_BASE_URL = "https://rag.opencomputing.net:10443"

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((aiohttp.ClientError, RuntimeError)),
        before_sleep=lambda retry_state: debug(f"重试 API 请求,第 {retry_state.attempt_number}")
    )
    async def _make_request(self, url: str, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``params`` as JSON to ``url`` and return the decoded JSON body.

        Args:
            url: Full endpoint address.
            action: Short action name, used only in log messages.
            params: JSON-serialisable request payload.

        Returns:
            The decoded JSON body for HTTP 200, and also for HTTP 400 (the
            body is handed back so the caller can inspect the service's own
            error description).

        Raises:
            RuntimeError: For any other status code, and for transport or
                decoding failures. Each attempt is retried up to three times
                with exponential back-off. NOTE(review): the decorator does
                not set ``reraise=True``, so once attempts are exhausted
                tenacity raises ``tenacity.RetryError`` wrapping the last
                ``RuntimeError`` -- confirm callers expect that.
        """
        debug(f"开始 API 请求: action={action}, params={params}, url={url}")
        try:
            async with ClientSession(timeout=ClientTimeout(total=300)) as session:
                async with session.post(
                    url,
                    headers={"Content-Type": "application/json"},
                    json=params
                ) as response:
                    debug(f"收到响应: status={response.status}, headers={response.headers}")
                    response_text = await response.text()
                    debug(f"响应内容: {response_text}")
                    # Check the status BEFORE decoding JSON: an error reply with a
                    # non-JSON body (e.g. a proxy error page) must surface its HTTP
                    # status, not a content-type/decode error.
                    if response.status not in (200, 400):
                        error(f"API 调用失败,动作: {action}, 状态码: {response.status}, 响应: {response_text}")
                        raise RuntimeError(f"API 调用失败: {response.status}")
                    result = await response.json()
                    debug(f"API 响应内容: {result}")
                    if response.status == 400:
                        debug(f"客户端错误,状态码: {response.status}, 返回响应: {result}")
                    return result
        except RuntimeError:
            # Already logged and formatted above; wrapping it again would only
            # duplicate the log entry and the message prefix.
            raise
        except Exception as e:
            error(f"API 调用失败: {action}, 错误: {str(e)}, 堆栈: {traceback.format_exc()}")
            raise RuntimeError(f"API 调用失败: {str(e)}") from e

    async def _call_service(self, base_url: str, action: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``params`` to ``<base_url>/v1/<action>`` through ``_make_request``."""
        return await self._make_request(f"{base_url}/v1/{action}", action, params)

    # Embedding service (BAAI/bge-m3)
    async def get_embeddings(self, texts: list) -> list:
        """Return one embedding vector per input text.

        Args:
            texts: List of texts; a single non-list value is wrapped in a list.

        Returns:
            The ``embedding`` field of every item in the response ``data``,
            in response order. An empty input list yields ``[]`` without
            contacting the service.

        Raises:
            RuntimeError: On a non-200 status, an unexpected response shape,
                or any transport/decoding failure.
        """
        inputs = texts if isinstance(texts, list) else [texts]
        if not inputs:
            # Nothing to embed: skip the network round trip.
            return []
        try:
            async with ClientSession() as session:
                async with session.post(
                    self.EMBEDDING_URL,
                    headers={"Content-Type": "application/json"},
                    json={"input": inputs}
                ) as response:
                    if response.status != 200:
                        error(f"嵌入服务调用失败,状态码: {response.status}")
                        raise RuntimeError(f"嵌入服务调用失败: {response.status}")
                    result = await response.json()
                    if result.get("object") != "list" or not result.get("data"):
                        error(f"嵌入服务响应格式错误: {result}")
                        raise RuntimeError("嵌入服务响应格式错误")
                    embeddings = [item["embedding"] for item in result["data"]]
                    debug(f"成功获取 {len(embeddings)} 个嵌入向量")
                    return embeddings
        except RuntimeError:
            # Raised (and logged) above; do not wrap or log it a second time.
            raise
        except Exception as e:
            error(f"嵌入服务调用失败: {str(e)}")
            raise RuntimeError(f"嵌入服务调用失败: {str(e)}") from e

    # Entity extraction service (LTP/small)
    async def extract_entities(self, query: str) -> list:
        """Return the unique entities found in ``query``, in first-seen order.

        This call is best-effort: any failure (empty query, non-200 status,
        unexpected response shape, transport error) is logged and an empty
        list is returned instead of raising.
        """
        try:
            if not query:
                raise ValueError("查询文本不能为空")
            async with ClientSession() as session:
                async with session.post(
                    self.ENTITIES_URL,
                    headers={"Content-Type": "application/json"},
                    json={"query": query}
                ) as response:
                    if response.status != 200:
                        error(f"实体识别服务调用失败,状态码: {response.status}")
                        return []
                    result = await response.json()
            # An empty "data" list is a valid "no entities" answer, not a
            # malformed response; only a missing/non-list payload is an error.
            if result.get("object") != "list" or not isinstance(result.get("data"), list):
                error(f"实体识别服务响应格式错误: {result}")
                return []
            # dict.fromkeys de-duplicates while keeping order; it requires
            # hashable items (presumably strings -- unhashable items end up in
            # the except branch below).
            unique_entities = list(dict.fromkeys(result["data"]))
            debug(f"成功提取 {len(unique_entities)} 个唯一实体")
            return unique_entities
        except Exception as e:
            error(f"实体识别服务调用失败: {str(e)}")
            return []

    # Triple extraction service (Babelscape/mrebel-large)
    async def extract_triples(self, text: str) -> list:
        """Return the triples the extraction service produces for ``text``.

        Each call is tagged with a random request id that appears in every
        log line it emits.

        Raises:
            RuntimeError: On a non-200 status, an unexpected response shape,
                or any transport/decoding failure.
        """
        request_id = str(uuid.uuid4())
        debug(f"Request #{request_id} started for triples extraction")
        try:
            # total=None disables the overall timeout for this call.
            # NOTE(review): presumably because extraction can be slow, but it
            # also means a stalled server blocks the caller indefinitely.
            async with ClientSession(
                connector=aiohttp.TCPConnector(limit=30),
                timeout=ClientTimeout(total=None)
            ) as session:
                async with session.post(
                    self.TRIPLES_URL,
                    headers={"Content-Type": "application/json; charset=utf-8"},
                    json={"text": text}
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        error(f"Request #{request_id} failed, status: {response.status}, response: {error_text}")
                        raise RuntimeError(f"三元组抽取服务调用失败: {response.status}, {error_text}")
                    result = await response.json()
                    # NOTE(review): an empty "data" list is rejected here, so a
                    # text that yields no triples raises -- confirm intended.
                    if result.get("object") != "list" or not result.get("data"):
                        error(f"Request #{request_id} invalid response format: {result}")
                        raise RuntimeError("三元组抽取服务响应格式错误")
                    triples = result["data"]
                    debug(f"Request #{request_id} extracted {len(triples)} triples")
                    return triples
        except RuntimeError:
            # Raised (and logged with the request id) above; re-raise untouched.
            raise
        except Exception as e:
            error(f"Request #{request_id} failed to extract triples: {str(e)}")
            raise RuntimeError(f"三元组抽取服务调用失败: {str(e)}") from e

    # Rerank service (BAAI/bge-reranker-v2-m3)
    async def rerank_results(self, query: str, results: list, top_n: int) -> list:
        """Reorder ``results`` by relevance to ``query`` using the rerank service.

        Args:
            query: The search query.
            results: Candidate hits; each item's ``"text"`` value is sent as the
                document (``str(item)`` when the key is absent).
            top_n: Maximum number of hits to return. An invalid value (non-int
                or < 1) falls back to ``len(results)``; larger values are
                clamped to ``len(results)``.

        Returns:
            At most ``top_n`` items of ``results`` in the order given by the
            service, each annotated in place with a ``"rerank_score"`` key.
            This call is best-effort: on any failure the original ``results``
            list is returned unchanged.
        """
        try:
            if not results:
                debug("无结果需要重排序")
                return results
            if not isinstance(top_n, int) or top_n < 1:
                debug(f"无效的 top_n 参数: {top_n}, 使用 len(results)={len(results)}")
                top_n = len(results)
            else:
                top_n = min(top_n, len(results))
            documents = [entry.get("text", str(entry)) for entry in results]
            async with ClientSession() as session:
                async with session.post(
                    self.RERANK_URL,
                    headers={"Content-Type": "application/json"},
                    json={
                        "model": "rerank-001",
                        "query": query,
                        "documents": documents,
                        "top_n": top_n
                    }
                ) as response:
                    if response.status != 200:
                        error(f"重排序服务调用失败,状态码: {response.status}")
                        return results
                    result = await response.json()
            if result.get("object") != "rerank.result" or not result.get("data"):
                error(f"重排序服务响应格式错误: {result}")
                return results
            # Resolve every (index, score) pair BEFORE touching `results`, so a
            # malformed item cannot leave the caller's dicts half-annotated when
            # we fall back to returning them.
            ranked = [(item["index"], item["relevance_score"]) for item in result["data"]]
            reranked_results = []
            for index, score in ranked:
                # Reject negative indices too: they would silently address
                # items from the end of the list.
                if 0 <= index < len(results):
                    results[index]["rerank_score"] = score
                    reranked_results.append(results[index])
            debug(f"成功重排序 {len(reranked_results)} 条结果")
            return reranked_results[:top_n]
        except Exception as e:
            error(f"重排序服务调用失败: {str(e)}")
            return results

    # Neo4j service
    async def neo4j_docs(self) -> str:
        """Fetch the Neo4j service documentation page and return it as text.

        Raises:
            RuntimeError: When the service answers with a non-200 status.
                Transport errors from aiohttp propagate unchanged.
        """
        async with ClientSession(timeout=ClientTimeout(total=300)) as session:
            async with session.get(f"{self.NEO4J_BASE_URL}/docs") as response:
                if response.status != 200:
                    error(f"Neo4j 文档请求失败,状态码: {response.status}")
                    raise RuntimeError(f"Neo4j 文档请求失败: {response.status}")
                text = await response.text()  # plain-text body
                debug(f"Neo4j 文档内容: {text}")
                return text

    async def neo4j_initialize(self) -> Dict[str, Any]:
        """Call the Neo4j service's ``initialize`` action."""
        return await self._call_service(self.NEO4J_BASE_URL, "initialize", {})

    async def neo4j_insert_triples(self, triples: list, document_id: str, knowledge_base_id: str, userid: str) -> Dict[str, Any]:
        """Insert ``triples`` for a document into Neo4j."""
        params = {
            "triples": triples,
            "document_id": document_id,
            "knowledge_base_id": knowledge_base_id,
            "userid": userid
        }
        return await self._call_service(self.NEO4J_BASE_URL, "inserttriples", params)

    async def neo4j_delete_document(self, document_id: str) -> Dict[str, Any]:
        """Delete the given document from Neo4j."""
        return await self._call_service(self.NEO4J_BASE_URL, "deletedocument", {"document_id": document_id})

    async def neo4j_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]:
        """Delete a user's knowledge base from Neo4j."""
        return await self._call_service(
            self.NEO4J_BASE_URL,
            "deleteknowledgebase",
            {"userid": userid, "knowledge_base_id": knowledge_base_id}
        )

    async def neo4j_match_triplets(self, query: str, query_entities: list, userid: str, knowledge_base_id: str) -> Dict[str, Any]:
        """Look up triples related to the query entities in Neo4j."""
        params = {
            "query": query,
            "query_entities": query_entities,
            "userid": userid,
            "knowledge_base_id": knowledge_base_id
        }
        return await self._call_service(self.NEO4J_BASE_URL, "matchtriplets", params)

    # Milvus service
    async def milvus_create_collection(self, db_type: str = "") -> Dict[str, Any]:
        """Create a Milvus collection; ``db_type`` is sent only when non-empty."""
        params = {"db_type": db_type} if db_type else {}
        return await self._call_service(self.MILVUS_BASE_URL, "createcollection", params)

    async def milvus_delete_collection(self, db_type: str = "") -> Dict[str, Any]:
        """Delete a Milvus collection; ``db_type`` is sent only when non-empty."""
        params = {"db_type": db_type} if db_type else {}
        return await self._call_service(self.MILVUS_BASE_URL, "deletecollection", params)

    async def milvus_insert_document(self, chunks: List[Dict], db_type: str = "") -> Dict[str, Any]:
        """Insert document chunks into Milvus."""
        # NOTE(review): this payload uses the key "dbtype" while the collection
        # calls above use "db_type" -- confirm which spelling the service expects.
        params = {
            "chunks": chunks,
            "dbtype": db_type
        }
        return await self._call_service(self.MILVUS_BASE_URL, "insertdocument", params)

    async def milvus_delete_document(self, userid: str, file_path: str, knowledge_base_id: str, document_id: str, db_type: str = "") -> Dict[str, Any]:
        """Delete a document's records from Milvus."""
        # NOTE(review): "dbtype" vs "db_type" -- same key question as in
        # milvus_insert_document.
        params = {
            "userid": userid,
            "file_path": file_path,
            "knowledge_base_id": knowledge_base_id,
            "document_id": document_id,
            "dbtype": db_type
        }
        return await self._call_service(self.MILVUS_BASE_URL, "deletedocument", params)

    async def milvus_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]:
        """Delete a user's knowledge base from Milvus."""
        return await self._call_service(
            self.MILVUS_BASE_URL,
            "deleteknowledgebase",
            {"userid": userid, "knowledge_base_id": knowledge_base_id}
        )

    async def milvus_search_query(self, query_vector: List[float], userid: str, knowledge_base_ids: list, limit: int, offset: int) -> Dict[str, Any]:
        """Search a user's knowledge bases in Milvus by query vector."""
        params = {
            "query_vector": query_vector,
            "userid": userid,
            "knowledge_base_ids": knowledge_base_ids,
            "limit": limit,
            "offset": offset
        }
        return await self._call_service(self.MILVUS_BASE_URL, "searchquery", params)

    async def milvus_list_user_files(self, userid: str) -> Dict[str, Any]:
        """List a user's files/knowledge bases stored in Milvus."""
        return await self._call_service(self.MILVUS_BASE_URL, "listuserfiles", {"userid": userid})

    async def milvus_list_all_knowledgebases(self) -> Dict[str, Any]:
        """List all knowledge-base data stored in Milvus."""
        return await self._call_service(self.MILVUS_BASE_URL, "listallknowledgebases", {})

    # RAG service
    async def rag_create_collection(self, db_type: str = "") -> Dict[str, Any]:
        """Create a RAG collection; ``db_type`` is sent only when non-empty."""
        params = {"db_type": db_type} if db_type else {}
        return await self._call_service(self.RAG_BASE_URL, "createcollection", params)

    async def rag_delete_collection(self, db_type: str = "") -> Dict[str, Any]:
        """Delete a RAG collection; ``db_type`` is sent only when non-empty."""
        params = {"db_type": db_type} if db_type else {}
        return await self._call_service(self.RAG_BASE_URL, "deletecollection", params)

    async def rag_insert_file(self, file_path: str, userid: str, knowledge_base_id: str, document_id: str) -> Dict[str, Any]:
        """Add a file to a user's RAG knowledge base."""
        params = {
            "file_path": file_path,
            "userid": userid,
            "knowledge_base_id": knowledge_base_id,
            "document_id": document_id
        }
        return await self._call_service(self.RAG_BASE_URL, "insertfile", params)

    async def rag_delete_file(self, userid: str, file_path: str, knowledge_base_id: str, document_id: str) -> Dict[str, Any]:
        """Remove a file from a user's RAG knowledge base."""
        params = {
            "userid": userid,
            "file_path": file_path,
            "knowledge_base_id": knowledge_base_id,
            "document_id": document_id
        }
        return await self._call_service(self.RAG_BASE_URL, "deletefile", params)

    async def rag_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]:
        """Delete a user's RAG knowledge base."""
        return await self._call_service(
            self.RAG_BASE_URL,
            "deleteknowledgebase",
            {"userid": userid, "knowledge_base_id": knowledge_base_id}
        )

    async def rag_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int,
                               use_rerank: bool) -> Dict[str, Any]:
        """Search a user's knowledge bases through the RAG service."""
        params = {
            "query": query,
            "userid": userid,
            "knowledge_base_ids": knowledge_base_ids,
            "limit": limit,
            "offset": offset,
            "use_rerank": use_rerank
        }
        return await self._call_service(self.RAG_BASE_URL, "searchquery", params)

    async def rag_fused_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int,
                                     use_rerank: bool) -> Dict[str, Any]:
        """Search a user's knowledge bases plus the knowledge graph through the RAG service."""
        params = {
            "query": query,
            "userid": userid,
            "knowledge_base_ids": knowledge_base_ids,
            "limit": limit,
            "offset": offset,
            "use_rerank": use_rerank
        }
        return await self._call_service(self.RAG_BASE_URL, "fusedsearchquery", params)

    async def rag_list_user_files(self, userid: str) -> Dict[str, Any]:
        """List a user's files/knowledge bases known to the RAG service."""
        return await self._call_service(self.RAG_BASE_URL, "listuserfiles", {"userid": userid})

    async def rag_list_all_knowledgebases(self) -> Dict[str, Any]:
        """List all knowledge-base data known to the RAG service."""
        return await self._call_service(self.RAG_BASE_URL, "listallknowledgebases", {})

    async def rag_docs(self) -> Dict[str, Any]:
        """Fetch the RAG service help document (sent as a POST to ``/v1/docs``)."""
        return await self._call_service(self.RAG_BASE_URL, "docs", {})