增加上传/删除文档功能

This commit is contained in:
wangmeihua 2025-08-07 10:55:03 +08:00
parent 296e5a739a
commit 4ea743a1fe
3 changed files with 154 additions and 80 deletions

View File

@ -48,7 +48,7 @@ class APIService:
try: try:
async with ClientSession() as session: async with ClientSession() as session:
async with session.post( async with session.post(
"http://222.81.167.136:9998/v1/embeddings", "https://embedding.opencomputing.net:10443/v1/embeddings", # 使用外网地址
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
json={"input": texts if isinstance(texts, list) else [texts]} json={"input": texts if isinstance(texts, list) else [texts]}
) as response: ) as response:
@ -74,7 +74,7 @@ class APIService:
raise ValueError("查询文本不能为空") raise ValueError("查询文本不能为空")
async with ClientSession() as session: async with ClientSession() as session:
async with session.post( async with session.post(
"http://222.81.167.136:9990/v1/entities", "https://entities.opencomputing.net:10443/v1/entities", # 使用外网地址
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
json={"query": query} json={"query": query}
) as response: ) as response:
@ -104,7 +104,7 @@ class APIService:
timeout=ClientTimeout(total=None) timeout=ClientTimeout(total=None)
) as session: ) as session:
async with session.post( async with session.post(
"http://222.81.167.136:9991/v1/triples", "https://triples.opencomputing.net:10443/v1/triples", # 使用外网地址
headers={"Content-Type": "application/json; charset=utf-8"}, headers={"Content-Type": "application/json; charset=utf-8"},
json={"text": text} json={"text": text}
) as response: ) as response:
@ -140,7 +140,7 @@ class APIService:
documents = [result.get("text", str(result)) for result in results] documents = [result.get("text", str(result)) for result in results]
async with ClientSession() as session: async with ClientSession() as session:
async with session.post( async with session.post(
"http://222.81.167.136:9997/v1/rerank", "https://reranker.opencomputing.net:10443/v1/rerank", # 使用外网地址
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
json={ json={
"model": "rerank-001", "model": "rerank-001",
@ -170,13 +170,20 @@ class APIService:
return results return results
# Neo4j 服务 # Neo4j 服务
async def neo4j_docs(self) -> Dict[str, Any]: async def neo4j_docs(self) -> str:
"""获取 Neo4j 文档""" """获取 Neo4j 文档(返回文本格式)"""
return await self._make_request("http://222.81.167.136:8885/docs", "docs", {}) async with ClientSession(timeout=ClientTimeout(total=300)) as session:
async with session.get("https://graphdb.opencomputing.net:10443/docs") as response:
if response.status != 200:
error(f"Neo4j 文档请求失败,状态码: {response.status}")
raise RuntimeError(f"Neo4j 文档请求失败: {response.status}")
text = await response.text() # 获取纯文本内容
debug(f"Neo4j 文档内容: {text}")
return text
async def neo4j_initialize(self) -> Dict[str, Any]: async def neo4j_initialize(self) -> Dict[str, Any]:
"""初始化 Neo4j 服务""" """初始化 Neo4j 服务"""
return await self._make_request("http://222.81.167.136:8885/v1/initialize", "initialize", {}) return await self._make_request("https://graphdb.opencomputing.net:10443/v1/initialize", "initialize", {})
async def neo4j_insert_triples(self, triples: list, document_id: str, knowledge_base_id: str, userid: str) -> Dict[str, Any]: async def neo4j_insert_triples(self, triples: list, document_id: str, knowledge_base_id: str, userid: str) -> Dict[str, Any]:
"""插入三元组到 Neo4j""" """插入三元组到 Neo4j"""
@ -186,15 +193,15 @@ class APIService:
"knowledge_base_id": knowledge_base_id, "knowledge_base_id": knowledge_base_id,
"userid": userid "userid": userid
} }
return await self._make_request("http://222.81.167.136:8885/v1/inserttriples", "inserttriples", params) return await self._make_request("https://graphdb.opencomputing.net:10443/v1/inserttriples", "inserttriples", params)
async def neo4j_delete_document(self, document_id: str) -> Dict[str, Any]: async def neo4j_delete_document(self, document_id: str) -> Dict[str, Any]:
"""删除指定文档""" """删除指定文档"""
return await self._make_request("http://222.81.167.136:8885/v1/deletedocument", "deletedocument", {"document_id": document_id}) return await self._make_request("https://graphdb.opencomputing.net:10443/v1/deletedocument", "deletedocument", {"document_id": document_id})
async def neo4j_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]: async def neo4j_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]:
"""删除用户知识库""" """删除用户知识库"""
return await self._make_request("http://222.81.167.136:8885/v1/deleteknowledgebase", "deleteknowledgebase", return await self._make_request("https://graphdb.opencomputing.net:10443/v1/deleteknowledgebase", "deleteknowledgebase",
{"userid": userid, "knowledge_base_id": knowledge_base_id}) {"userid": userid, "knowledge_base_id": knowledge_base_id})
async def neo4j_match_triplets(self, query: str, query_entities: list, userid: str, knowledge_base_id: str) -> Dict[str, Any]: async def neo4j_match_triplets(self, query: str, query_entities: list, userid: str, knowledge_base_id: str) -> Dict[str, Any]:
@ -205,47 +212,101 @@ class APIService:
"userid": userid, "userid": userid,
"knowledge_base_id": knowledge_base_id "knowledge_base_id": knowledge_base_id
} }
return await self._make_request("http://222.81.167.136:8885/v1/matchtriplets", "matchtriplets", params) return await self._make_request("https://graphdb.opencomputing.net:10443/v1/matchtriplets", "matchtriplets", params)
# Milvus 服务
async def milvus_create_collection(self, db_type: str = "") -> Dict[str, Any]:
"""创建 Milvus 集合"""
params = {"db_type": db_type} if db_type else {}
return await self._make_request("https://vectordb.opencomputing.net:10443/v1/createcollection", "createcollection", params)
async def milvus_delete_collection(self, db_type: str = "") -> Dict[str, Any]:
"""删除 Milvus 集合"""
params = {"db_type": db_type} if db_type else {}
return await self._make_request("https://vectordb.opencomputing.net:10443/v1/deletecollection", "deletecollection", params)
async def milvus_insert_document(self, chunks: List[Dict], db_type: str = "") -> Dict[str, Any]:
"""添加 Milvus 记录"""
params = {
"chunks": chunks,
"dbtype": db_type
}
return await self._make_request("https://vectordb.opencomputing.net:10443/v1/insertdocument", "insertdocument", params)
async def milvus_delete_document(self, userid: str, file_path: str, knowledge_base_id: str, document_id:str, db_type: str = "") -> Dict[str, Any]:
"""删除 Milvus 记录"""
params = {
"userid": userid,
"file_path": file_path,
"knowledge_base_id": knowledge_base_id,
"document_id": document_id,
"dbtype": db_type
}
return await self._make_request("https://vectordb.opencomputing.net:10443/v1/deletedocument", "deletedocument", params)
async def milvus_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]:
"""删除 Milvus 知识库"""
return await self._make_request("https://vectordb.opencomputing.net:10443/v1/deleteknowledgebase", "deleteknowledgebase",
{"userid": userid, "knowledge_base_id": knowledge_base_id})
async def milvus_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int) -> Dict[str, Any]:
"""根据用户知识库检索 Milvus"""
params = {
"query": query,
"userid": userid,
"knowledge_base_ids": knowledge_base_ids,
"limit": limit,
"offset": offset
}
return await self._make_request("https://vectordb.opencomputing.net:10443/v1/searchquery", "searchquery", params)
async def milvus_list_user_files(self, userid: str) -> Dict[str, Any]:
"""列出 Milvus 用户知识库列表"""
return await self._make_request("https://vectordb.opencomputing.net:10443/v1/listuserfiles", "listuserfiles", {"userid": userid})
async def milvus_list_all_knowledgebases(self) -> Dict[str, Any]:
"""列出 Milvus 数据库中所有数据"""
return await self._make_request("https://vectordb.opencomputing.net:10443/v1/listallknowledgebases", "listallknowledgebases", {})
# RAG 服务 # RAG 服务
async def rag_create_collection(self, db_type: str = "") -> Dict[str, Any]: async def rag_create_collection(self, db_type: str = "") -> Dict[str, Any]:
"""创建集合""" """创建 RAG 集合"""
params = {"db_type": db_type} if db_type else {} params = {"db_type": db_type} if db_type else {}
return await self._make_request("http://222.81.167.136:8888/v1/createcollection", "createcollection", params) return await self._make_request("https://rag.opencomputing.net:10443/v1/createcollection", "createcollection", params)
async def rag_delete_collection(self, db_type: str = "") -> Dict[str, Any]: async def rag_delete_collection(self, db_type: str = "") -> Dict[str, Any]:
"""删除集合""" """删除 RAG 集合"""
params = {"db_type": db_type} if db_type else {} params = {"db_type": db_type} if db_type else {}
return await self._make_request("http://222.81.167.136:8888/v1/deletecollection", "deletecollection", params) return await self._make_request("https://rag.opencomputing.net:10443/v1/deletecollection", "deletecollection", params)
async def rag_insert_file(self, file_path: str, userid: str, knowledge_base_id: str, document_id: str) -> Dict[str, Any]: async def rag_insert_file(self, file_path: str, userid: str, knowledge_base_id: str, document_id: str) -> Dict[str, Any]:
"""添加记录""" """添加 RAG 记录"""
params = { params = {
"file_path": file_path, "file_path": file_path,
"userid": userid, "userid": userid,
"knowledge_base_id": knowledge_base_id, "knowledge_base_id": knowledge_base_id,
"document_id": document_id "document_id": document_id
} }
return await self._make_request("http://222.81.167.136:8888/v1/insertfile", "insertfile", params) return await self._make_request("https://rag.opencomputing.net:10443/v1/insertfile", "insertfile", params)
async def rag_delete_file(self, userid: str, file_path: str, knowledge_base_id: str, document_id: str) -> Dict[str, Any]: async def rag_delete_file(self, userid: str, file_path: str, knowledge_base_id: str, document_id: str) -> Dict[str, Any]:
"""删除记录""" """删除 RAG 记录"""
params = { params = {
"userid": userid, "userid": userid,
"file_path": file_path, "file_path": file_path,
"knowledge_base_id": knowledge_base_id, "knowledge_base_id": knowledge_base_id,
"document_id": document_id "document_id": document_id
} }
return await self._make_request("http://222.81.167.136:8888/v1/deletefile", "deletefile", params) return await self._make_request("https://rag.opencomputing.net:10443/v1/deletefile", "deletefile", params)
async def rag_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]: async def rag_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]:
"""删除知识库""" """删除 RAG 知识库"""
return await self._make_request("http://222.81.167.136:8888/v1/deleteknowledgebase", "deleteknowledgebase", return await self._make_request("https://rag.opencomputing.net:10443/v1/deleteknowledgebase", "deleteknowledgebase",
{"userid": userid, "knowledge_base_id": knowledge_base_id}) {"userid": userid, "knowledge_base_id": knowledge_base_id})
async def rag_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int, async def rag_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int,
use_rerank: bool) -> Dict[str, Any]: use_rerank: bool) -> Dict[str, Any]:
"""根据用户知识库检索""" """根据用户知识库检索 RAG"""
params = { params = {
"query": query, "query": query,
"userid": userid, "userid": userid,
@ -254,11 +315,11 @@ class APIService:
"offset": offset, "offset": offset,
"use_rerank": use_rerank "use_rerank": use_rerank
} }
return await self._make_request("http://222.81.167.136:8888/v1/searchquery", "searchquery", params) return await self._make_request("https://rag.opencomputing.net:10443/v1/searchquery", "searchquery", params)
async def rag_fused_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int, async def rag_fused_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int,
use_rerank: bool) -> Dict[str, Any]: use_rerank: bool) -> Dict[str, Any]:
"""根据用户知识库+知识图谱检索""" """根据用户知识库+知识图谱检索 RAG"""
params = { params = {
"query": query, "query": query,
"userid": userid, "userid": userid,
@ -267,16 +328,16 @@ class APIService:
"offset": offset, "offset": offset,
"use_rerank": use_rerank "use_rerank": use_rerank
} }
return await self._make_request("http://222.81.167.136:8888/v1/fusedsearchquery", "fusedsearchquery", params) return await self._make_request("https://rag.opencomputing.net:10443/v1/fusedsearchquery", "fusedsearchquery", params)
async def rag_list_user_files(self, userid: str) -> Dict[str, Any]: async def rag_list_user_files(self, userid: str) -> Dict[str, Any]:
"""列出用户知识库列表""" """列出 RAG 用户知识库列表"""
return await self._make_request("http://222.81.167.136:8888/v1/listuserfiles", "listuserfiles", {"userid": userid}) return await self._make_request("https://rag.opencomputing.net:10443/v1/listuserfiles", "listuserfiles", {"userid": userid})
async def rag_list_all_knowledgebases(self) -> Dict[str, Any]: async def rag_list_all_knowledgebases(self) -> Dict[str, Any]:
"""列出数据库中所有数据""" """列出 RAG 数据库中所有数据"""
return await self._make_request("http://222.81.167.136:8888/v1/listallknowledgebases", "listallknowledgebases", {}) return await self._make_request("https://rag.opencomputing.net:10443/v1/listallknowledgebases", "listallknowledgebases", {})
async def rag_docs(self) -> Dict[str, Any]: async def rag_docs(self) -> Dict[str, Any]:
"""获取 RAG 帮助文档""" """获取 RAG 帮助文档"""
return await self._make_request("http://222.81.167.136:8888/v1/docs", "docs", {}) return await self._make_request("https://rag.opencomputing.net:10443/v1/docs", "docs", {})

View File

@ -13,39 +13,23 @@ import uuid
from datetime import datetime from datetime import datetime
import traceback import traceback
from filetxt.loader import fileloader from filetxt.loader import fileloader
from ahserver.serverenv import get_serverenv
def init():
rf = RegisterFunction()
rf.register('fileuploaded', file_uploaded)
rf.register('filedeleted', file_deleted)
async def get_orgid_by_id(kdb_id): async def get_orgid_by_id(kdb_id):
""" """
根据 kdb id 查询对应的 orgid 根据 kdb id 查询对应的 orgid
""" """
dbs = {
"cfae": {
"driver": "mysql.connector",
"coding": "utf8",
"dbname": "cfae",
"kwargs": {
"user": "test",
"db": "cfae",
"password": "test123",
"host": "localhost"
}
}
}
loop = asyncio.get_event_loop()
pool = DBPools(dbs, loop)
db = DBPools() db = DBPools()
dbname = get_module_dbname('rag') # f = get_serverenv("get_module_dbname")
sql = "SELECT orgid FROM kdb WHERE id = %s" # dbname = f("rag")
dbname = "kyrag"
sql = "SELECT orgid FROM kdb WHERE id = ${id}$"
try: try:
async with db.sqlorContext(dbname) as sor: async with db.sqlorContext(dbname) as sor:
result = await sor.sql(sql, (kdb_id,)) result = await sor.sqlExe(sql,{"id":kdb_id})
if result and result.rows: print(result)
return result.rows[0][0] if result and len(result) > 0:
return result[0].get('orgid')
return None return None
except Exception as e: except Exception as e:
error(f"查询 orgid 失败: {str(e)}, 堆栈: {traceback.format_exc()}") error(f"查询 orgid 失败: {str(e)}, 堆栈: {traceback.format_exc()}")
@ -128,7 +112,7 @@ async def file_uploaded(params_kw):
debug(f"调用插入文件端点: {realpath}") debug(f"调用插入文件端点: {realpath}")
start_milvus = time.time() start_milvus = time.time()
result = await api_service.make_milvus_request("insertdocument", {"chunks": chunks_data, "db_type": db_type}) result = await api_service.milvus_insert_document(chunks_data, db_type)
timings["insert_milvus"] = time.time() - start_milvus timings["insert_milvus"] = time.time() - start_milvus
debug(f"Milvus 插入耗时: {timings['insert_milvus']:.2f}") debug(f"Milvus 插入耗时: {timings['insert_milvus']:.2f}")
@ -175,9 +159,7 @@ async def file_uploaded(params_kw):
debug(f"抽取到 {len(unique_triples)} 个三元组,调用 Neo4j 服务插入") debug(f"抽取到 {len(unique_triples)} 个三元组,调用 Neo4j 服务插入")
start_neo4j = time.time() start_neo4j = time.time()
if unique_triples: if unique_triples:
neo4j_result = await api_service.make_neo4j_request("inserttriples", { neo4j_result = await api_service.neo4j_insert_triples(unique_triples, id, fiid, orgid)
"triples": unique_triples, "document_id": id, "knowledge_base_id": fiid, "userid": orgid
})
debug(f"Neo4j 服务响应: {neo4j_result}") debug(f"Neo4j 服务响应: {neo4j_result}")
if neo4j_result.get("status") != "success": if neo4j_result.get("status") != "success":
timings["insert_neo4j"] = time.time() - start_neo4j timings["insert_neo4j"] = time.time() - start_neo4j
@ -226,14 +208,7 @@ async def file_deleted(params_kw):
raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}")
debug(f"调用删除文件端点: userid={orgid}, file_path={realpath}, knowledge_base_id={fiid}") debug(f"调用删除文件端点: userid={orgid}, file_path={realpath}, knowledge_base_id={fiid}")
document_id = id # 备用,使用 id 作为 document_id milvus_result = await api_service.milvus_delete_document(orgid, realpath, fiid, id, db_type)
milvus_result = await api_service.make_milvus_request("deletedocument", {
"userid": orgid,
"file_path": realpath,
"knowledge_base_id": fiid,
"document_id": document_id,
"db_type": db_type
})
if milvus_result.get("status") != "success": if milvus_result.get("status") != "success":
raise ValueError(milvus_result.get("message", "Milvus 删除失败")) raise ValueError(milvus_result.get("message", "Milvus 删除失败"))
@ -241,22 +216,22 @@ async def file_deleted(params_kw):
neo4j_deleted_nodes = 0 neo4j_deleted_nodes = 0
neo4j_deleted_rels = 0 neo4j_deleted_rels = 0
try: try:
debug(f"调用 Neo4j 删除文档端点: document_id={document_id}") debug(f"调用 Neo4j 删除文档端点: document_id={id}")
neo4j_result = await api_service.make_neo4j_request("deletedocument", {"document_id": document_id}) neo4j_result = await api_service.neo4j_delete_document(id)
if neo4j_result.get("status") != "success": if neo4j_result.get("status") != "success":
raise ValueError(neo4j_result.get("message", "Neo4j 删除失败")) raise ValueError(neo4j_result.get("message", "Neo4j 删除失败"))
nodes_deleted = neo4j_result.get("nodes_deleted", 0) nodes_deleted = neo4j_result.get("nodes_deleted", 0)
rels_deleted = neo4j_result.get("rels_deleted", 0) rels_deleted = neo4j_result.get("rels_deleted", 0)
neo4j_deleted_nodes += nodes_deleted neo4j_deleted_nodes += nodes_deleted
neo4j_deleted_rels += rels_deleted neo4j_deleted_rels += rels_deleted
info(f"成功删除 document_id={document_id}{nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系") info(f"成功删除 document_id={id}{nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系")
except Exception as e: except Exception as e:
error(f"删除 document_id={document_id} 的 Neo4j 数据失败: {str(e)}") error(f"删除 document_id={id} 的 Neo4j 数据失败: {str(e)}")
return { return {
"status": "success", "status": "success",
"collection_name": collection_name, "collection_name": collection_name,
"document_id": document_id, "document_id": id,
"message": f"成功删除 Milvus 记录,{neo4j_deleted_nodes} 个 Neo4j 节点,{neo4j_deleted_rels} 个 Neo4j 关系", "message": f"成功删除 Milvus 记录,{neo4j_deleted_nodes} 个 Neo4j 节点,{neo4j_deleted_rels} 个 Neo4j 关系",
"status_code": 200 "status_code": 200
} }
@ -266,18 +241,51 @@ async def file_deleted(params_kw):
return { return {
"status": "error", "status": "error",
"collection_name": collection_name, "collection_name": collection_name,
"document_id": document_id, "document_id": id,
"message": f"删除文档失败: {str(e)}", "message": f"删除文档失败: {str(e)}",
"status_code": 400 "status_code": 400
} }
async def main(): async def main():
kdb_id = "textdb" dbs = {
orgid = await get_orgid_by_id(kdb_id) "kyrag":{
if orgid: "driver":"aiomysql",
print(f"找到的 orgid: {orgid}") "async_mode":True,
else: "coding":"utf8",
print("未找到对应的 orgid") "maxconn":100,
"dbname":"kyrag",
"kwargs":{
"user":"test",
"db":"kyrag",
"password":"QUZVcXg5V1p1STMybG5Ia6mX9D0v7+g=",
"host":"db"
}
}
}
DBPools(dbs)
# # 测试 file_uploaded
# print("测试 file_uploaded...")
# test_file_path = "/home/wangmeihua/data/kg.txt"
# test_params_upload = {
# "realpath": test_file_path,
# "fiid": "kb1",
# "id": "1"
# }
# upload_result = await file_uploaded(test_params_upload)
# print(f"file_uploaded 结果: {upload_result}")
# 测试 file_deleted
test_file_path = "/home/wangmeihua/data/kg.txt"
print("测试 file_deleted...")
test_params_delete = {
"realpath": test_file_path,
"fiid": "kb1",
"id": "1"
}
delete_result = await file_deleted(test_params_delete)
print(f"file_deleted 结果: {delete_result}")
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())

View File

@ -3,6 +3,8 @@ from ahserver.serverenv import ServerEnv
import aiohttp import aiohttp
from aiohttp import ClientSession, ClientTimeout from aiohttp import ClientSession, ClientTimeout
import json import json
from .file import file_uploaded, file_deleted
from appPublic.registerfunction import RegisterFunction
async def _make_connection_request(action: str, params: dict = None) -> dict: async def _make_connection_request(action: str, params: dict = None) -> dict:
""" """
@ -137,6 +139,9 @@ def load_rag():
""" """
初始化 ServerEnv绑定 MilvusConnection 的所有功能 初始化 ServerEnv绑定 MilvusConnection 的所有功能
""" """
rf = RegisterFunction()
rf.register('fileuploaded', file_uploaded)
rf.register('filedeleted', file_deleted)
env = ServerEnv() env = ServerEnv()
env.create_collection = create_collection env.create_collection = create_collection
env.delete_collection = delete_collection env.delete_collection = delete_collection