This commit is contained in:
wangmeihua 2025-08-13 16:22:30 +08:00
parent c25a9c05ca
commit 4b733827af

View File

@ -2,6 +2,7 @@ from appPublic.log import debug, error
from typing import Dict, Any, List
import uuid
import json
import re
from ahserver.serverenv import ServerEnv
from uapi.appapi import UAPI
from appPublic.dictObject import DictObject
@ -9,59 +10,74 @@ from appPublic.dictObject import DictObject
class APIService:
"""处理 API 请求的服务类"""
async def handle_uapi_response(self, b: bytes, upappid: str, apiname: str, service_name: str, request_id: str = None) -> dict:
"""通用处理 uapi.call 响应"""
log_prefix = f"request #{request_id} " if request_id else ""
if not b:
error(f"{log_prefix}{service_name} 返回空响应: upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"{service_name} 返回空响应")
try:
response_text = b.decode('utf-8')
except UnicodeDecodeError as decode_err:
error(f"{log_prefix}{service_name} 响应解码失败: {str(decode_err)}, 原始响应: {b[:100]}")
raise RuntimeError(f"响应解码失败: {str(decode_err)}")
# 清理响应中的控制字符
response_text = re.sub(r'[\x00-\x1F\x7F]', '', response_text)
debug(f"{log_prefix}{service_name} 原始响应: {response_text[:500]}")
try:
d = json.loads(response_text)
except json.JSONDecodeError as json_err:
error(f"{log_prefix}{service_name} JSON 解析失败: {str(json_err)}, 响应内容: {response_text[:500]}")
raise RuntimeError(f"JSON 解析失败: {str(json_err)}")
if not isinstance(d, dict):
error(f"{log_prefix}{service_name} 响应不是有效字典: {d}")
raise RuntimeError(f"{service_name} 响应格式错误")
return d
# 嵌入服务 (BAAI/bge-m3)
async def get_embeddings(self, request, texts: list, upappid: str, apiname: str, user: str) -> list:
"""调用嵌入服务获取文本向量"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for embeddings, texts={texts[:2]}")
try:
# 清理输入文本
texts = [re.sub(r'[\x00-\x1F\x7F]', '', text) for text in texts]
uapi = UAPI(request, DictObject(**globals()))
debug(
f'{uapi=}, {type(uapi.call)}, upappid={upappid}, apiname={apiname}, user={user}, texts={texts[:2]}') # 仅记录前两个文本以避免日志过长
params_kw = {"input": texts}
b = await uapi.call(upappid, apiname, user, params_kw)
# 检查响应内容
if not b:
error(f"嵌入服务返回空响应: upappid={upappid}, apiname={apiname}")
raise RuntimeError("嵌入服务返回空响应")
try:
response_text = b.decode('utf-8')
except UnicodeDecodeError as decode_err:
error(f"响应解码失败: {str(decode_err)}, 原始响应: {b[:100]}") # 记录前100字节
raise RuntimeError(f"响应解码失败: {str(decode_err)}")
debug(f"嵌入服务原始响应: {response_text[:500]}") # 记录前500字符以避免日志过长
try:
d = json.loads(response_text)
except json.JSONDecodeError as json_err:
error(f"JSON 解析失败: {str(json_err)}, 响应内容: {response_text[:500]}")
raise RuntimeError(f"JSON 解析失败: {str(json_err)}")
d = await self.handle_uapi_response(b, upappid, apiname, "嵌入服务", request_id)
if d.get("object") != "list" or not d.get("data"):
error(f"嵌入服务响应格式错误: {d}")
error(f"request #{request_id} 嵌入服务响应格式错误: {d}")
raise RuntimeError("嵌入服务响应格式错误")
embeddings = [item["embedding"] for item in d["data"]]
debug(f"成功获取 {len(embeddings)} 个嵌入向量")
debug(f"request #{request_id} 成功获取 {len(embeddings)} 个嵌入向量")
return embeddings
except Exception as e:
error(f"嵌入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
error(f"request #{request_id} 嵌入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"嵌入服务调用失败: {str(e)}")
# 实体提取服务 (LTP/small)
async def extract_entities(self, request, query: str, upappid: str, apiname: str, user: str) -> list:
"""调用实体识别服务"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for entity extraction, query={query[:100]}")
try:
if not query:
raise ValueError("查询文本不能为空")
query = re.sub(r'[\x00-\x1F\x7F]', '', query) # 清理输入
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"query": query}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
d = await self.handle_uapi_response(b, upappid, apiname, "实体识别服务", request_id)
if d.get("object") != "list" or not d.get("data"):
error(f"实体识别服务响应格式错误: {d}")
error(f"request #{request_id} 实体识别服务响应格式错误: {d}")
raise RuntimeError("实体识别服务响应格式错误")
entities = d["data"]
unique_entities = list(dict.fromkeys(entities))
debug(f"成功提取 {len(unique_entities)} 个唯一实体")
debug(f"request #{request_id} 成功提取 {len(unique_entities)} 个唯一实体")
return unique_entities
except Exception as e:
error(f"实体识别服务调用失败: {str(e)}")
error(f"request #{request_id} 实体识别服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
return []
# 三元组抽取服务 (Babelscape/mrebel-large)
@ -70,53 +86,37 @@ class APIService:
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for triples extraction, text={text[:100]}")
try:
# 清理输入文本,移除控制字符
import re
text = re.sub(r'[\x00-\x1F\x7F]', '', text) # 移除 ASCII 控制字符
text = re.sub(r'[\x00-\x1F\x7F]', '', text) # 清理输入
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"text": text}
b = await uapi.call(upappid, apiname, user, params_kw)
if not b:
error(f"request #{request_id} 三元组抽取服务返回空响应: upappid={upappid}, apiname={apiname}")
raise RuntimeError("三元组抽取服务返回空响应")
try:
response_text = b.decode('utf-8')
except UnicodeDecodeError as decode_err:
error(f"request #{request_id} 响应解码失败: {str(decode_err)}, 原始响应: {b[:100]}")
raise RuntimeError(f"响应解码失败: {str(decode_err)}")
debug(f"request #{request_id} 三元组抽取服务原始响应: {response_text[:500]}")
# 清理响应字符串,移除控制字符
response_text = re.sub(r'[\x00-\x1F\x7F]', '', response_text)
try:
d = json.loads(response_text)
except json.JSONDecodeError as json_err:
error(f"request #{request_id} JSON 解析失败: {str(json_err)}, 响应内容: {response_text[:500]}")
raise RuntimeError(f"JSON 解析失败: {str(json_err)}")
if d.get("object") != "list" or not d.get("data"):
d = await self.handle_uapi_response(b, upappid, apiname, "三元组抽取服务", request_id)
if d.get("object") != "list":
error(f"request #{request_id} 三元组抽取服务响应格式错误: {d}")
raise RuntimeError("三元组抽取服务响应格式错误")
triples = d["data"]
triples = d.get("data", []) # 允许 data 为空
debug(f"request #{request_id} extracted {len(triples)} triples")
return triples
except Exception as e:
error(f"request #{request_id} failed to extract triples: {str(e)}, upappid={upappid}, apiname={apiname}")
return [] # 根据业务需求返回空列表
return []
# 重排序服务 (BAAI/bge-reranker-v2-m3)
async def rerank_results(self, request, query: str, results: list, top_n: int, upappid: str, apiname: str, user: str) -> list:
"""调用重排序服务"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for reranking, query={query[:100]}")
try:
if not results:
debug("无结果需要重排序")
debug(f"request #{request_id} 无结果需要重排序")
return results
if not isinstance(top_n, int) or top_n < 1:
debug(f"无效的 top_n 参数: {top_n}, 使用 len(results)={len(results)}")
debug(f"request #{request_id} 无效的 top_n 参数: {top_n}, 使用 len(results)={len(results)}")
top_n = len(results)
else:
top_n = min(top_n, len(results))
documents = [result.get("text", str(result)) for result in results]
documents = [re.sub(r'[\x00-\x1F\x7F]', '', result.get("text", str(result))) for result in results]
query = re.sub(r'[\x00-\x1F\x7F]', '', query)
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"model": "rerank-001",
@ -125,9 +125,9 @@ class APIService:
"top_n": top_n
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
d = await self.handle_uapi_response(b, upappid, apiname, "重排序服务", request_id)
if d.get("object") != "rerank.result" or not d.get("data"):
error(f"重排序服务响应格式错误: {d}")
error(f"request #{request_id} 重排序服务响应格式错误: {d}")
raise RuntimeError("重排序服务响应格式错误")
rerank_data = d["data"]
reranked_results = []
@ -136,156 +136,250 @@ class APIService:
if index < len(results):
results[index]["rerank_score"] = item["relevance_score"]
reranked_results.append(results[index])
debug(f"成功重排序 {len(reranked_results)} 条结果")
debug(f"request #{request_id} 成功重排序 {len(reranked_results)} 条结果")
return reranked_results[:top_n]
except Exception as e:
error(f"重排序服务调用失败: {str(e)}")
error(f"request #{request_id} 重排序服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
return results
# Neo4j 服务
async def neo4j_docs(self, request, upappid: str, apiname: str, user: str) -> str:
"""获取 Neo4j 文档(返回文本格式)"""
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j docs")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
if d.get("status") != 200:
error(f"Neo4j 文档请求失败,状态码: {d.get('status')}")
d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 文档服务", request_id)
if d.get("status") != 200 or not d.get("text"):
error(f"request #{request_id} Neo4j 文档请求格式错误: {d}")
raise RuntimeError(f"Neo4j 文档请求失败: {d.get('status')}")
text = d.get("text")
debug(f"Neo4j 文档内容: {text}")
text = d["text"]
debug(f"request #{request_id} Neo4j 文档内容: {text[:500]}")
return text
except Exception as e:
error(f"Neo4j 文档请求失败: {str(e)}")
error(f"request #{request_id} Neo4j 文档请求失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 文档请求失败: {str(e)}")
async def neo4j_initialize(self, request, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""初始化 Neo4j 服务"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j initialization")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 初始化服务", request_id)
debug(f"request #{request_id} Neo4j 初始化成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Neo4j 初始化服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 初始化服务调用失败: {str(e)}")
async def neo4j_insert_triples(self, request, triples: list, document_id: str, knowledge_base_id: str, userid: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""插入三元组到 Neo4j"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"triples": triples,
"document_id": document_id,
"knowledge_base_id": knowledge_base_id,
"userid": userid
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j triples insertion")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"triples": triples,
"document_id": document_id,
"knowledge_base_id": knowledge_base_id,
"userid": userid
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 三元组插入服务", request_id)
debug(f"request #{request_id} Neo4j 三元组插入成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Neo4j 三元组插入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 三元组插入服务调用失败: {str(e)}")
async def neo4j_delete_document(self, request, document_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""删除指定文档"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"document_id": document_id}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j document deletion")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"document_id": document_id}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 文档删除服务", request_id)
debug(f"request #{request_id} Neo4j 文档删除成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Neo4j 文档删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 文档删除服务调用失败: {str(e)}")
async def neo4j_delete_knowledgebase(self, request, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""删除用户知识库"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j knowledgebase deletion")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 知识库删除服务", request_id)
debug(f"request #{request_id} Neo4j 知识库删除成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Neo4j 知识库删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 知识库删除服务调用失败: {str(e)}")
async def neo4j_match_triplets(self, request, query: str, query_entities: list, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""根据实体匹配相关三元组"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"query": query,
"query_entities": query_entities,
"userid": userid,
"knowledge_base_id": knowledge_base_id
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Neo4j triplets matching, query={query[:100]}")
try:
query = re.sub(r'[\x00-\x1F\x7F]', '', query)
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"query": query,
"query_entities": query_entities,
"userid": userid,
"knowledge_base_id": knowledge_base_id
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Neo4j 三元组匹配服务", request_id)
debug(f"request #{request_id} Neo4j 三元组匹配成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Neo4j 三元组匹配服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Neo4j 三元组匹配服务调用失败: {str(e)}")
# Milvus 服务
async def milvus_create_collection(self, request, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
"""创建 Milvus 集合"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"db_type": db_type}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus collection creation")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"db_type": db_type}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 集合创建服务", request_id)
debug(f"request #{request_id} Milvus 集合创建成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Milvus 集合创建服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 集合创建服务调用失败: {str(e)}")
async def milvus_delete_collection(self, request, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
"""删除 Milvus 集合"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"db_type": db_type}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus collection deletion")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"db_type": db_type}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 集合删除服务", request_id)
debug(f"request #{request_id} Milvus 集合删除成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Milvus 集合删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 集合删除服务调用失败: {str(e)}")
async def milvus_insert_document(self, request, chunks: List[Dict], upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
"""添加 Milvus 记录"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"chunks": chunks,
"dbtype": db_type
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus document insertion")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"chunks": chunks,
"dbtype": db_type
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 文档插入服务", request_id)
debug(f"request #{request_id} Milvus 文档插入成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Milvus 文档插入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 文档插入服务调用失败: {str(e)}")
async def milvus_delete_document(self, request, userid: str, file_path: str, knowledge_base_id: str, document_id: str, upappid: str, apiname: str, user: str, db_type: str = "") -> Dict[str, Any]:
"""删除 Milvus 记录"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"userid": userid,
"file_path": file_path,
"knowledge_base_id": knowledge_base_id,
"document_id": document_id,
"dbtype": db_type
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus document deletion")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"userid": userid,
"file_path": file_path,
"knowledge_base_id": knowledge_base_id,
"document_id": document_id,
"dbtype": db_type
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 文档删除服务", request_id)
debug(f"request #{request_id} Milvus 文档删除成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Milvus 文档删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 文档删除服务调用失败: {str(e)}")
async def milvus_delete_knowledgebase(self, request, userid: str, knowledge_base_id: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""删除 Milvus 知识库"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus knowledgebase deletion")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"userid": userid, "knowledge_base_id": knowledge_base_id}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 知识库删除服务", request_id)
debug(f"request #{request_id} Milvus 知识库删除成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Milvus 知识库删除服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 知识库删除服务调用失败: {str(e)}")
async def milvus_search_query(self, request, query_vector: List[float], userid: str, knowledge_base_ids: list, limit: int, offset: int, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""根据用户知识库检索 Milvus"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"query_vector": query_vector,
"userid": userid,
"knowledge_base_ids": knowledge_base_ids,
"limit": limit,
"offset": offset
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus search")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {
"query_vector": query_vector,
"userid": userid,
"knowledge_base_ids": knowledge_base_ids,
"limit": limit,
"offset": offset
}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 检索服务", request_id)
debug(f"request #{request_id} Milvus 检索成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Milvus 检索服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 检索服务调用失败: {str(e)}")
async def milvus_list_user_files(self, request, userid: str, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""列出 Milvus 用户知识库列表"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"userid": userid}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus user files listing")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {"userid": userid}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 用户文件列表服务", request_id)
debug(f"request #{request_id} Milvus 用户文件列表成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Milvus 用户文件列表服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 用户文件列表服务调用失败: {str(e)}")
async def milvus_list_all_knowledgebases(self, request, upappid: str, apiname: str, user: str) -> Dict[str, Any]:
"""列出 Milvus 数据库中所有数据"""
uapi = UAPI(request, DictObject(**globals()))
params_kw = {}
b = await uapi.call(upappid, apiname, user, params_kw)
d = json.loads(b.decode('utf-8'))
return d
request_id = str(uuid.uuid4())
debug(f"Request #{request_id} started for Milvus all knowledgebases listing")
try:
uapi = UAPI(request, DictObject(**globals()))
params_kw = {}
b = await uapi.call(upappid, apiname, user, params_kw)
d = await self.handle_uapi_response(b, upappid, apiname, "Milvus 知识库列表服务", request_id)
debug(f"request #{request_id} Milvus 知识库列表成功: {d}")
return d
except Exception as e:
error(f"request #{request_id} Milvus 知识库列表服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}")
raise RuntimeError(f"Milvus 知识库列表服务调用失败: {str(e)}")