Merge branch 'main' of git.opencomputing.cn:yumoqing/rag

This commit is contained in:
yumoqing 2025-08-13 16:36:16 +08:00
commit aa21264fed

View File

@ -2,6 +2,7 @@ from appPublic.log import debug, error
from typing import Dict, Any, List from typing import Dict, Any, List
import uuid import uuid
import json import json
import re
from ahserver.serverenv import ServerEnv from ahserver.serverenv import ServerEnv
from uapi.appapi import UAPI from uapi.appapi import UAPI
from appPublic.dictObject import DictObject from appPublic.dictObject import DictObject
@ -9,96 +10,113 @@ from appPublic.dictObject import DictObject
class APIService: class APIService:
"""处理 API 请求的服务类""" """处理 API 请求的服务类"""
# 嵌入服务 (BAAI/bge-m3) async def handle_uapi_response(self, b: bytes, upappid: str, apiname: str, service_name: str, request_id: str = None) -> dict:
async def get_embeddings(self, request, texts: list, upappid: str, apiname: str, user: str) -> list: """通用处理 uapi.call 响应"""
"""调用嵌入服务获取文本向量""" log_prefix = f"request #{request_id} " if request_id else ""
try:
uapi = UAPI(request, DictObject(**globals()))
debug(
f'{uapi=}, {type(uapi.call)}, upappid={upappid}, apiname={apiname}, user={user}, texts={texts[:2]}') # 仅记录前两个文本以避免日志过长
params_kw = {"input": texts}
b = await uapi.call(upappid, apiname, user, params_kw)
# 检查响应内容
if not b: if not b:
error(f"嵌入服务返回空响应: upappid={upappid}, apiname={apiname}") error(f"{log_prefix}{service_name} 返回空响应: upappid={upappid}, apiname={apiname}")
raise RuntimeError("嵌入服务返回空响应") raise RuntimeError(f"{service_name} 返回空响应")
try: try:
response_text = b.decode('utf-8') response_text = b.decode('utf-8')
except UnicodeDecodeError as decode_err: except UnicodeDecodeError as decode_err:
error(f"响应解码失败: {str(decode_err)}, 原始响应: {b[:100]}") # 记录前100字节 error(f"{log_prefix}{service_name} 响应解码失败: {str(decode_err)}, 原始响应: {b[:100]}")
raise RuntimeError(f"响应解码失败: {str(decode_err)}") raise RuntimeError(f"响应解码失败: {str(decode_err)}")
debug(f"嵌入服务原始响应: {response_text[:500]}") # 记录前500字符以避免日志过长 # 清理响应中的控制字符
response_text = re.sub(r'[\x00-\x1F\x7F]', '', response_text)
debug(f"{log_prefix}{service_name} 原始响应: {response_text[:500]}")
try: try:
d = json.loads(response_text) d = json.loads(response_text)
except json.JSONDecodeError as json_err: except json.JSONDecodeError as json_err:
error(f"JSON 解析失败: {str(json_err)}, 响应内容: {response_text[:500]}") error(f"{log_prefix}{service_name} JSON 解析失败: {str(json_err)}, 响应内容: {response_text[:500]}")
raise RuntimeError(f"JSON 解析失败: {str(json_err)}") raise RuntimeError(f"JSON 解析失败: {str(json_err)}")
if not isinstance(d, dict):
error(f"{log_prefix}{service_name} 响应不是有效字典: {d}")
raise RuntimeError(f"{service_name} 响应格式错误")
return d
# 嵌入服务 (BAAI/bge-m3)
async def get_embeddings(self, request, texts: list, upappid: str, apiname: str, user: str) -> list:
"""调用嵌入服务获取文本向量"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for embeddings, texts={texts[:2]}")
try:
# 清理输入文本
texts = [re.sub(r'[\x00-\x1F\x7F]', '', text) for text in texts]
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"input": texts}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "嵌入服务", request_id)
if d.get("object") != "list" or not d.get("data"): if d.get("object") != "list" or not d.get("data"):
error(f"嵌入服务响应格式错误: {d}") error(f"request #{request_id} 嵌入服务响应格式错误: {d}")
raise RuntimeError("嵌入服务响应格式错误") raise RuntimeError("嵌入服务响应格式错误")
embeddings = [item["embedding"] for item in d["data"]] embeddings = [item["embedding"] for item in d["data"]]
debug(f"成功获取 {len(embeddings)} 个嵌入向量") debug(f"request #{request_id} 成功获取 {len(embeddings)} 个嵌入向量")
return embeddings return embeddings
except Exception as e: except Exception as e:
error(f"嵌入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}") error(f"request #{request_id} 嵌入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"嵌入服务调用失败: {str(e)}") raise RuntimeError(f"嵌入服务调用失败: {str(e)}")
# 实体提取服务 (LTP/small) # 实体提取服务 (LTP/small)
async def extract_entities(self, request, query: str, upappid: str, apiname: str, user: str) -> list: async def extract_entities(self, request, query: str, upappid: str, apiname: str, user: str) -> list:
"""调用实体识别服务""" """调用实体识别服务"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for entity extraction, query={query[:100]}")
try: try:
if not query: if not query:
raise ValueError("查询文本不能为空") raise ValueError("查询文本不能为空")
query = re.sub(r'[\x00-\x1F\x7F]', '', query) # 清理输入
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {"query": query} params_kw = {"query": query}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "实体识别服务", request_id)
if d.get("object") != "list" or not d.get("data"): if d.get("object") != "list" or not d.get("data"):
error(f"实体识别服务响应格式错误: {d}") error(f"request #{request_id} 实体识别服务响应格式错误: {d}")
raise RuntimeError("实体识别服务响应格式错误") raise RuntimeError("实体识别服务响应格式错误")
entities = d["data"] entities = d["data"]
unique_entities = list(dict.fromkeys(entities)) unique_entities = list(dict.fromkeys(entities))
debug(f"成功提取 {len(unique_entities)} 个唯一实体") debug(f"request #{request_id} 成功提取 {len(unique_entities)} 个唯一实体")
return unique_entities return unique_entities
except Exception as e: except Exception as e:
error(f"实体识别服务调用失败: {str(e)}") error(f"request #{request_id} 实体识别服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
return [] return []
# 三元组抽取服务 (Babelscape/mrebel-large) # 三元组抽取服务 (Babelscape/mrebel-large)
async def extract_triples(self, request, text: str, upappid: str, apiname: str, user: str) -> list: async def extract_triples(self, request, text: str, upappid: str, apiname: str, user: str) -> list:
"""调用三元组抽取服务""" """调用三元组抽取服务"""
request_id = str(uuid.uuid4()) request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for triples extraction") debug(f"Request #{request_id} started for triples extraction, text={text[:100]}")
try: try:
text = re.sub(r'[\x00-\x1F\x7F]', '', text) # 清理输入
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {"text": text} params_kw = {"text": text}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "三元组抽取服务", request_id)
if d.get("object") != "list": if d.get("object") != "list":
error(f"request #{request_id} invalid response format: {d}") error(f"request #{request_id} 三元组抽取服务响应格式错误: {d}")
raise RuntimeError("三元组抽取服务响应格式错误") raise RuntimeError("三元组抽取服务响应格式错误")
triples = d["data"] triples = d.get("data", []) # 允许 data 为空
debug(f"request #{request_id} extracted {len(triples)} triples") debug(f"request #{request_id} extracted {len(triples)} triples")
return triples return triples
except Exception as e: except Exception as e:
error(f"request #{request_id} failed to extract triples: {str(e)}") error(f"request #{request_id} failed to extract triples: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"三元组抽取服务调用失败: {str(e)}") return []
# 重排序服务 (BAAI/bge-reranker-v2-m3) # 重排序服务 (BAAI/bge-reranker-v2-m3)
async def rerank_results(self, request, query: str, results: list, top_n: int, upappid: str, apiname: str, user: str) -> list: async def rerank_results(self, request, query: str, results: list, top_n: int, upappid: str, apiname: str, user: str) -> list:
"""调用重排序服务""" """调用重排序服务"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for reranking, query={query[:100]}")
try: try:
if not results: if not results:
debug("无结果需要重排序") debug(f"request #{request_id} 无结果需要重排序")
return results return results
if not isinstance(top_n, int) or top_n < 1: if not isinstance(top_n, int) or top_n < 1:
debug(f"无效的 top_n 参数: {top_n}, 使用 len(results)={len(results)}") debug(f"request #{request_id} 无效的 top_n 参数: {top_n}, 使用 len(results)={len(results)}")
top_n = len(results) top_n = len(results)
else: else:
top_n = min(top_n, len(results)) top_n = min(top_n, len(results))
documents = [re.sub(r'[\x00-\x1F\x7F]', '', result.get("text", str(result))) for result in results]
documents = [result.get("text", str(result)) for result in results] query = re.sub(r'[\x00-\x1F\x7F]', '', query)
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = { params_kw = {
"model": "rerank-001", "model": "rerank-001",
@ -107,9 +125,9 @@ class APIService:
"top_n": top_n "top_n": top_n
} }
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "重排序服务", request_id)
if d.get("object") != "rerank.result" or not d.get("data"): if d.get("object") != "rerank.result" or not d.get("data"):
error(f"重排序服务响应格式错误: {d}") error(f"request #{request_id} 重排序服务响应格式错误: {d}")
raise RuntimeError("重排序服务响应格式错误") raise RuntimeError("重排序服务响应格式错误")
rerank_data = d["data"] rerank_data = d["data"]
reranked_results = [] reranked_results = []
@ -118,40 +136,52 @@ class APIService:
if index < len(results): if index < len(results):
results[index]["rerank_score"] = item["relevance_score"] results[index]["rerank_score"] = item["relevance_score"]
reranked_results.append(results[index]) reranked_results.append(results[index])
debug(f"成功重排序 {len(reranked_results)} 条结果") debug(f"request #{request_id} 成功重排序 {len(reranked_results)} 条结果")
return reranked_results[:top_n] return reranked_results[:top_n]
except Exception as e: except Exception as e:
error(f"重排序服务调用失败: {str(e)}") error(f"request #{request_id} 重排序服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
return results return results
# Neo4j 服务 # Neo4j 服务
async def neo4j_docs(self, request, upappid: str, apiname: str, user: str) -> str: async def neo4j_docs(self, request, upappid: str, apiname: str, user: str) -> str:
"""获取 Neo4j 文档(返回文本格式)""" """获取 Neo4j 文档(返回文本格式)"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j docs")
try: try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {} params_kw = {}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 文档服务", request_id)
if d.get("status") != 200: if d.get("status") != 200 or not d.get("text"):
error(f"Neo4j 文档请求失败,状态码: {d.get('status')}") error(f"request #{request_id} Neo4j 文档请求格式错误: {d}")
raise RuntimeError(f"Neo4j 文档请求失败: {d.get('status')}") raise RuntimeError(f"Neo4j 文档请求失败: {d.get('status')}")
text = d.get("text") text = d["text"]
debug(f"Neo4j 文档内容: {text}") debug(f"request #{request_id} Neo4j 文档内容: {text[:500]}")
return text return text
except Exception as e: except Exception as e:
error(f"Neo4j 文档请求失败: {str(e)}") error(f"request #{request_id} Neo4j 文档请求失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 文档请求失败: {str(e)}") raise RuntimeError(f"Neo4j 文档请求失败: {str(e)}")
async def neo4j_initialize(self, request, upappid: str, apiname: str, user: str) -> Dict[str, Any]: async def neo4j_initialize(self, request, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""初始化 Neo4j 服务""" """初始化 Neo4j 服务"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j initialization")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {} params_kw = {}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 初始化服务", request_id)
debug(f"request #{request_id} Neo4j 初始化成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Neo4j 初始化服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 初始化服务调用失败: {str(e)}")
async def neo4j_insert_triples(self, request, triples: list, document_id: str, knowledge_base_id: str, userid: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]: async def neo4j_insert_triples(self, request, triples: list, document_id: str, knowledge_base_id: str, userid: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""插入三元组到 Neo4j""" """插入三元组到 Neo4j"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j triples insertion")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = { params_kw = {
"triples": triples, "triples": triples,
@ -160,27 +190,49 @@ class APIService:
"userid": userid "userid": userid
} }
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 三元组插入服务", request_id)
debug(f"request #{request_id} Neo4j 三元组插入成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Neo4j 三元组插入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 三元组插入服务调用失败: {str(e)}")
async def neo4j_delete_document(self, request, document_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]: async def neo4j_delete_document(self, request, document_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""删除指定文档""" """删除指定文档"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j document deletion")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {"document_id": document_id} params_kw = {"document_id": document_id}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 文档删除服务", request_id)
debug(f"request #{request_id} Neo4j 文档删除成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Neo4j 文档删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 文档删除服务调用失败: {str(e)}")
async def neo4j_delete_knowledgebase(self, request, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]: async def neo4j_delete_knowledgebase(self, request, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""删除用户知识库""" """删除用户知识库"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j knowledgebase deletion")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id} params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 知识库删除服务", request_id)
debug(f"request #{request_id} Neo4j 知识库删除成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Neo4j 知识库删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 知识库删除服务调用失败: {str(e)}")
async def neo4j_match_triplets(self, request, query: str, query_entities: list, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]: async def neo4j_match_triplets(self, request, query: str, query_entities: list, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""根据实体匹配相关三元组""" """根据实体匹配相关三元组"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j triplets matching, query={query[:100]}")
try:
query = re.sub(r'[\x00-\x1F\x7F]', '', query)
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = { params_kw = {
"query": query, "query": query,
@ -189,39 +241,67 @@ class APIService:
"knowledge_base_id": knowledge_base_id "knowledge_base_id": knowledge_base_id
} }
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 三元组匹配服务", request_id)
debug(f"request #{request_id} Neo4j 三元组匹配成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Neo4j 三元组匹配服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 三元组匹配服务调用失败: {str(e)}")
# Milvus 服务 # Milvus 服务
async def milvus_create_collection(self, request, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]: async def milvus_create_collection(self, request, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
"""创建 Milvus 集合""" """创建 Milvus 集合"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus collection creation")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {"db_type": db_type} params_kw = {"db_type": db_type}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 集合创建服务", request_id)
debug(f"request #{request_id} Milvus 集合创建成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Milvus 集合创建服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 集合创建服务调用失败: {str(e)}")
async def milvus_delete_collection(self, request, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]: async def milvus_delete_collection(self, request, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
"""删除 Milvus 集合""" """删除 Milvus 集合"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus collection deletion")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {"db_type": db_type} params_kw = {"db_type": db_type}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 集合删除服务", request_id)
debug(f"request #{request_id} Milvus 集合删除成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Milvus 集合删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 集合删除服务调用失败: {str(e)}")
async def milvus_insert_document(self, request, chunks: List[Dict], upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]: async def milvus_insert_document(self, request, chunks: List[Dict], upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
"""添加 Milvus 记录""" """添加 Milvus 记录"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus document insertion")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = { params_kw = {
"chunks": chunks, "chunks": chunks,
"dbtype": db_type "dbtype": db_type
} }
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 文档插入服务", request_id)
debug(f"request #{request_id} Milvus 文档插入成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Milvus 文档插入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 文档插入服务调用失败: {str(e)}")
async def milvus_delete_document(self, request, userid: str, file_path: str, knowledge_base_id: str, document_id: str, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]: async def milvus_delete_document(self, request, userid: str, file_path: str, knowledge_base_id: str, document_id: str, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
"""删除 Milvus 记录""" """删除 Milvus 记录"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus document deletion")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = { params_kw = {
"userid": userid, "userid": userid,
@ -231,19 +311,33 @@ class APIService:
"dbtype": db_type "dbtype": db_type
} }
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 文档删除服务", request_id)
debug(f"request #{request_id} Milvus 文档删除成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Milvus 文档删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 文档删除服务调用失败: {str(e)}")
async def milvus_delete_knowledgebase(self, request, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]: async def milvus_delete_knowledgebase(self, request, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""删除 Milvus 知识库""" """删除 Milvus 知识库"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus knowledgebase deletion")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id} params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 知识库删除服务", request_id)
debug(f"request #{request_id} Milvus 知识库删除成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Milvus 知识库删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 知识库删除服务调用失败: {str(e)}")
async def milvus_search_query(self, request, query_vector: List[float], userid: str, knowledge_base_ids: list, limit: int, offset: int, upappid: str, apiname: str, user: str) -> Dict[str, Any]: async def milvus_search_query(self, request, query_vector: List[float], userid: str, knowledge_base_ids: list, limit: int, offset: int, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""根据用户知识库检索 Milvus""" """根据用户知识库检索 Milvus"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus search")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = { params_kw = {
"query_vector": query_vector, "query_vector": query_vector,
@ -253,21 +347,39 @@ class APIService:
"offset": offset "offset": offset
} }
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 检索服务", request_id)
debug(f"request #{request_id} Milvus 检索成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Milvus 检索服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 检索服务调用失败: {str(e)}")
async def milvus_list_user_files(self, request, userid: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]: async def milvus_list_user_files(self, request, userid: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""列出 Milvus 用户知识库列表""" """列出 Milvus 用户知识库列表"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus user files listing")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {"userid": userid} params_kw = {"userid": userid}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 用户文件列表服务", request_id)
debug(f"request #{request_id} Milvus 用户文件列表成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Milvus 用户文件列表服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 用户文件列表服务调用失败: {str(e)}")
async def milvus_list_all_knowledgebases(self, request, upappid: str, apiname: str, user: str) -> Dict[str, Any]: async def milvus_list_all_knowledgebases(self, request, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""列出 Milvus 数据库中所有数据""" """列出 Milvus 数据库中所有数据"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus all knowledgebases listing")
try:
uapi = UAPI(request, DictObject(**globals())) uapi = UAPI(request, DictObject(**globals()))
params_kw = {} params_kw = {}
b = await uapi.call(upappid, apiname, user, params_kw) b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8')) d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 知识库列表服务", request_id)
debug(f"request #{request_id} Milvus 知识库列表成功: {d}")
return d return d
except Exception as e:
error(f"request #{request_id} Milvus 知识库列表服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 知识库列表服务调用失败: {str(e)}")