diff --git a/rag/api_service.py b/rag/api_service.py index a50593f..f11f3b5 100644 --- a/rag/api_service.py +++ b/rag/api_service.py @@ -48,7 +48,7 @@ class APIService: try: async with ClientSession() as session: async with session.post( - "http://222.81.167.136:9998/v1/embeddings", + "https://embedding.opencomputing.net:10443/v1/embeddings", # 使用外网地址 headers={"Content-Type": "application/json"}, json={"input": texts if isinstance(texts, list) else [texts]} ) as response: @@ -74,7 +74,7 @@ class APIService: raise ValueError("查询文本不能为空") async with ClientSession() as session: async with session.post( - "http://222.81.167.136:9990/v1/entities", + "https://entities.opencomputing.net:10443/v1/entities", # 使用外网地址 headers={"Content-Type": "application/json"}, json={"query": query} ) as response: @@ -104,7 +104,7 @@ class APIService: timeout=ClientTimeout(total=None) ) as session: async with session.post( - "http://222.81.167.136:9991/v1/triples", + "https://triples.opencomputing.net:10443/v1/triples", # 使用外网地址 headers={"Content-Type": "application/json; charset=utf-8"}, json={"text": text} ) as response: @@ -140,7 +140,7 @@ class APIService: documents = [result.get("text", str(result)) for result in results] async with ClientSession() as session: async with session.post( - "http://222.81.167.136:9997/v1/rerank", + "https://reranker.opencomputing.net:10443/v1/rerank", # 使用外网地址 headers={"Content-Type": "application/json"}, json={ "model": "rerank-001", @@ -170,13 +170,20 @@ class APIService: return results # Neo4j 服务 - async def neo4j_docs(self) -> Dict[str, Any]: - """获取 Neo4j 文档""" - return await self._make_request("http://222.81.167.136:8885/docs", "docs", {}) + async def neo4j_docs(self) -> str: + """获取 Neo4j 文档(返回文本格式)""" + async with ClientSession(timeout=ClientTimeout(total=300)) as session: + async with session.get("https://graphdb.opencomputing.net:10443/docs") as response: + if response.status != 200: + error(f"Neo4j 文档请求失败,状态码: {response.status}") + raise RuntimeError(f"Neo4j 文档请求失败: {response.status}") + text = await response.text() # 获取纯文本内容 + debug(f"Neo4j 文档内容: {text}") + return text async def neo4j_initialize(self) -> Dict[str, Any]: """初始化 Neo4j 服务""" - return await self._make_request("http://222.81.167.136:8885/v1/initialize", "initialize", {}) + return await self._make_request("https://graphdb.opencomputing.net:10443/v1/initialize", "initialize", {}) async def neo4j_insert_triples(self, triples: list, document_id: str, knowledge_base_id: str, userid: str) -> Dict[str, Any]: """插入三元组到 Neo4j""" @@ -186,15 +193,15 @@ class APIService: "knowledge_base_id": knowledge_base_id, "userid": userid } - return await self._make_request("http://222.81.167.136:8885/v1/inserttriples", "inserttriples", params) + return await self._make_request("https://graphdb.opencomputing.net:10443/v1/inserttriples", "inserttriples", params) async def neo4j_delete_document(self, document_id: str) -> Dict[str, Any]: """删除指定文档""" - return await self._make_request("http://222.81.167.136:8885/v1/deletedocument", "deletedocument", {"document_id": document_id}) + return await self._make_request("https://graphdb.opencomputing.net:10443/v1/deletedocument", "deletedocument", {"document_id": document_id}) async def neo4j_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]: """删除用户知识库""" - return await self._make_request("http://222.81.167.136:8885/v1/deleteknowledgebase", "deleteknowledgebase", + return await self._make_request("https://graphdb.opencomputing.net:10443/v1/deleteknowledgebase", "deleteknowledgebase", {"userid": userid, "knowledge_base_id": knowledge_base_id}) async def neo4j_match_triplets(self, query: str, query_entities: list, userid: str, knowledge_base_id: str) -> Dict[str, Any]: @@ -205,47 +212,101 @@ class APIService: "userid": userid, "knowledge_base_id": knowledge_base_id } - return await self._make_request("http://222.81.167.136:8885/v1/matchtriplets", "matchtriplets", params) + return await self._make_request("https://graphdb.opencomputing.net:10443/v1/matchtriplets", "matchtriplets", params) + + # Milvus 服务 + async def milvus_create_collection(self, db_type: str = "") -> Dict[str, Any]: + """创建 Milvus 集合""" + params = {"db_type": db_type} if db_type else {} + return await self._make_request("https://vectordb.opencomputing.net:10443/v1/createcollection", "createcollection", params) + + async def milvus_delete_collection(self, db_type: str = "") -> Dict[str, Any]: + """删除 Milvus 集合""" + params = {"db_type": db_type} if db_type else {} + return await self._make_request("https://vectordb.opencomputing.net:10443/v1/deletecollection", "deletecollection", params) + + async def milvus_insert_document(self, chunks: List[Dict], db_type: str = "") -> Dict[str, Any]: + """添加 Milvus 记录""" + params = { + "chunks": chunks, + "dbtype": db_type + } + return await self._make_request("https://vectordb.opencomputing.net:10443/v1/insertdocument", "insertdocument", params) + + async def milvus_delete_document(self, userid: str, file_path: str, knowledge_base_id: str, document_id:str, db_type: str = "") -> Dict[str, Any]: + """删除 Milvus 记录""" + params = { + "userid": userid, + "file_path": file_path, + "knowledge_base_id": knowledge_base_id, + "document_id": document_id, + "dbtype": db_type + } + return await self._make_request("https://vectordb.opencomputing.net:10443/v1/deletedocument", "deletedocument", params) + + async def milvus_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]: + """删除 Milvus 知识库""" + return await self._make_request("https://vectordb.opencomputing.net:10443/v1/deleteknowledgebase", "deleteknowledgebase", + {"userid": userid, "knowledge_base_id": knowledge_base_id}) + + async def milvus_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int) -> Dict[str, Any]: + """根据用户知识库检索 Milvus""" + params = { + "query": query, + "userid": userid, + "knowledge_base_ids": knowledge_base_ids, + "limit": limit, + "offset": offset + } + return await self._make_request("https://vectordb.opencomputing.net:10443/v1/searchquery", "searchquery", params) + + async def milvus_list_user_files(self, userid: str) -> Dict[str, Any]: + """列出 Milvus 用户知识库列表""" + return await self._make_request("https://vectordb.opencomputing.net:10443/v1/listuserfiles", "listuserfiles", {"userid": userid}) + + async def milvus_list_all_knowledgebases(self) -> Dict[str, Any]: + """列出 Milvus 数据库中所有数据""" + return await self._make_request("https://vectordb.opencomputing.net:10443/v1/listallknowledgebases", "listallknowledgebases", {}) # RAG 服务 async def rag_create_collection(self, db_type: str = "") -> Dict[str, Any]: - """创建集合""" + """创建 RAG 集合""" params = {"db_type": db_type} if db_type else {} - return await self._make_request("http://222.81.167.136:8888/v1/createcollection", "createcollection", params) + return await self._make_request("https://rag.opencomputing.net:10443/v1/createcollection", "createcollection", params) async def rag_delete_collection(self, db_type: str = "") -> Dict[str, Any]: - """删除集合""" + """删除 RAG 集合""" params = {"db_type": db_type} if db_type else {} - return await self._make_request("http://222.81.167.136:8888/v1/deletecollection", "deletecollection", params) + return await self._make_request("https://rag.opencomputing.net:10443/v1/deletecollection", "deletecollection", params) async def rag_insert_file(self, file_path: str, userid: str, knowledge_base_id: str, document_id: str) -> Dict[str, Any]: - """添加记录""" + """添加 RAG 记录""" params = { "file_path": file_path, "userid": userid, "knowledge_base_id": knowledge_base_id, "document_id": document_id } - return await self._make_request("http://222.81.167.136:8888/v1/insertfile", "insertfile", params) + return await self._make_request("https://rag.opencomputing.net:10443/v1/insertfile", "insertfile", params) async def rag_delete_file(self, userid: str, file_path: str, knowledge_base_id: str, document_id: str) -> Dict[str, Any]: - """删除记录""" + """删除 RAG 记录""" params = { "userid": userid, "file_path": file_path, "knowledge_base_id": knowledge_base_id, "document_id": document_id } - return await self._make_request("http://222.81.167.136:8888/v1/deletefile", "deletefile", params) + return await self._make_request("https://rag.opencomputing.net:10443/v1/deletefile", "deletefile", params) async def rag_delete_knowledgebase(self, userid: str, knowledge_base_id: str) -> Dict[str, Any]: - """删除知识库""" - return await self._make_request("http://222.81.167.136:8888/v1/deleteknowledgebase", "deleteknowledgebase", + """删除 RAG 知识库""" + return await self._make_request("https://rag.opencomputing.net:10443/v1/deleteknowledgebase", "deleteknowledgebase", {"userid": userid, "knowledge_base_id": knowledge_base_id}) async def rag_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int, use_rerank: bool) -> Dict[str, Any]: - """根据用户知识库检索""" + """根据用户知识库检索 RAG""" params = { "query": query, "userid": userid, @@ -254,11 +315,11 @@ class APIService: "offset": offset, "use_rerank": use_rerank } - return await self._make_request("http://222.81.167.136:8888/v1/searchquery", "searchquery", params) + return await self._make_request("https://rag.opencomputing.net:10443/v1/searchquery", "searchquery", params) async def rag_fused_search_query(self, query: str, userid: str, knowledge_base_ids: list, limit: int, offset: int, use_rerank: bool) -> Dict[str, Any]: - """根据用户知识库+知识图谱检索""" + """根据用户知识库+知识图谱检索 RAG""" params = { "query": query, "userid": userid, @@ -267,16 +328,16 @@ class APIService: "offset": offset, "use_rerank": use_rerank } - return await self._make_request("http://222.81.167.136:8888/v1/fusedsearchquery", "fusedsearchquery", params) + return await self._make_request("https://rag.opencomputing.net:10443/v1/fusedsearchquery", "fusedsearchquery", params) async def rag_list_user_files(self, userid: str) -> Dict[str, Any]: - """列出用户知识库列表""" - return await self._make_request("http://222.81.167.136:8888/v1/listuserfiles", "listuserfiles", {"userid": userid}) + """列出 RAG 用户知识库列表""" + return await self._make_request("https://rag.opencomputing.net:10443/v1/listuserfiles", "listuserfiles", {"userid": userid}) async def rag_list_all_knowledgebases(self) -> Dict[str, Any]: - """列出数据库中所有数据""" - return await self._make_request("http://222.81.167.136:8888/v1/listallknowledgebases", "listallknowledgebases", {}) + """列出 RAG 数据库中所有数据""" + return await self._make_request("https://rag.opencomputing.net:10443/v1/listallknowledgebases", "listallknowledgebases", {}) async def rag_docs(self) -> Dict[str, Any]: """获取 RAG 帮助文档""" - return await self._make_request("http://222.81.167.136:8888/v1/docs", "docs", {}) \ No newline at end of file + return await self._make_request("https://rag.opencomputing.net:10443/v1/docs", "docs", {}) \ No newline at end of file diff --git a/rag/file.py b/rag/file.py index bd92832..4911738 100644 --- a/rag/file.py +++ b/rag/file.py @@ -13,39 +13,23 @@ import uuid from datetime import datetime import traceback from filetxt.loader import fileloader - -def init(): - rf = RegisterFunction() - rf.register('fileuploaded', file_uploaded) - rf.register('filedeleted', file_deleted) +from ahserver.serverenv import get_serverenv async def get_orgid_by_id(kdb_id): """ 根据 kdb 的 id 查询对应的 orgid。 """ - dbs = { - "cfae": { - "driver": "mysql.connector", - "coding": "utf8", - "dbname": "cfae", - "kwargs": { - "user": "test", - "db": "cfae", - "password": "test123", - "host": "localhost" - } - } - } - loop = asyncio.get_event_loop() - pool = DBPools(dbs, loop) db = DBPools() - dbname = get_module_dbname('rag') - sql = "SELECT orgid FROM kdb WHERE id = %s" + # f = get_serverenv("get_module_dbname") + # dbname = f("rag") + dbname = "kyrag" + sql = "SELECT orgid FROM kdb WHERE id = ${id}$" try: async with db.sqlorContext(dbname) as sor: - result = await sor.sql(sql, (kdb_id,)) - if result and result.rows: - return result.rows[0][0] + result = await sor.sqlExe(sql,{"id":kdb_id}) + print(result) + if result and len(result) > 0: + return result[0].get('orgid') return None except Exception as e: error(f"查询 orgid 失败: {str(e)}, 堆栈: {traceback.format_exc()}") @@ -128,7 +112,7 @@ async def file_uploaded(params_kw): debug(f"调用插入文件端点: {realpath}") start_milvus = time.time() - result = await api_service.make_milvus_request("insertdocument", {"chunks": chunks_data, "db_type": db_type}) + result = await api_service.milvus_insert_document(chunks_data, db_type) timings["insert_milvus"] = time.time() - start_milvus debug(f"Milvus 插入耗时: {timings['insert_milvus']:.2f} 秒") @@ -175,9 +159,7 @@ async def file_uploaded(params_kw): debug(f"抽取到 {len(unique_triples)} 个三元组,调用 Neo4j 服务插入") start_neo4j = time.time() if unique_triples: - neo4j_result = await api_service.make_neo4j_request("inserttriples", { - "triples": unique_triples, "document_id": id, "knowledge_base_id": fiid, "userid": orgid - }) + neo4j_result = await api_service.neo4j_insert_triples(unique_triples, id, fiid, orgid) debug(f"Neo4j 服务响应: {neo4j_result}") if neo4j_result.get("status") != "success": timings["insert_neo4j"] = time.time() - start_neo4j @@ -226,14 +208,7 @@ async def file_deleted(params_kw): raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") debug(f"调用删除文件端点: userid={orgid}, file_path={realpath}, knowledge_base_id={fiid}") - document_id = id # 备用,使用 id 作为 document_id - milvus_result = await api_service.make_milvus_request("deletedocument", { - "userid": orgid, - "file_path": realpath, - "knowledge_base_id": fiid, - "document_id": document_id, - "db_type": db_type - }) + milvus_result = await api_service.milvus_delete_document(orgid, realpath, fiid, id, db_type) if milvus_result.get("status") != "success": raise ValueError(milvus_result.get("message", "Milvus 删除失败")) @@ -241,22 +216,22 @@ async def file_deleted(params_kw): neo4j_deleted_nodes = 0 neo4j_deleted_rels = 0 try: - debug(f"调用 Neo4j 删除文档端点: document_id={document_id}") - neo4j_result = await api_service.make_neo4j_request("deletedocument", {"document_id": document_id}) + debug(f"调用 Neo4j 删除文档端点: document_id={id}") + neo4j_result = await api_service.neo4j_delete_document(id) if neo4j_result.get("status") != "success": raise ValueError(neo4j_result.get("message", "Neo4j 删除失败")) nodes_deleted = neo4j_result.get("nodes_deleted", 0) rels_deleted = neo4j_result.get("rels_deleted", 0) neo4j_deleted_nodes += nodes_deleted neo4j_deleted_rels += rels_deleted - info(f"成功删除 document_id={document_id} 的 {nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系") + info(f"成功删除 document_id={id} 的 {nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系") except Exception as e: - error(f"删除 document_id={document_id} 的 Neo4j 数据失败: {str(e)}") + error(f"删除 document_id={id} 的 Neo4j 数据失败: {str(e)}") return { "status": "success", "collection_name": collection_name, - "document_id": document_id, + "document_id": id, "message": f"成功删除 Milvus 记录,{neo4j_deleted_nodes} 个 Neo4j 节点,{neo4j_deleted_rels} 个 Neo4j 关系", "status_code": 200 } @@ -266,18 +241,51 @@ async def file_deleted(params_kw): return { "status": "error", "collection_name": collection_name, - "document_id": document_id, + "document_id": id, "message": f"删除文档失败: {str(e)}", "status_code": 400 } + async def main(): - kdb_id = "textdb" - orgid = await get_orgid_by_id(kdb_id) - if orgid: - print(f"找到的 orgid: {orgid}") - else: - print("未找到对应的 orgid") + dbs = { + "kyrag":{ + "driver":"aiomysql", + "async_mode":True, + "coding":"utf8", + "maxconn":100, + "dbname":"kyrag", + "kwargs":{ + "user":"test", + "db":"kyrag", + "password":"QUZVcXg5V1p1STMybG5Ia6mX9D0v7+g=", + "host":"db" + } + } + } + DBPools(dbs) + # # 测试 file_uploaded + # print("测试 file_uploaded...") + # test_file_path = "/home/wangmeihua/data/kg.txt" + # test_params_upload = { + # "realpath": test_file_path, + # "fiid": "kb1", + # "id": "1" + # } + # upload_result = await file_uploaded(test_params_upload) + # print(f"file_uploaded 结果: {upload_result}") + + # 测试 file_deleted + test_file_path = "/home/wangmeihua/data/kg.txt" + print("测试 file_deleted...") + test_params_delete = { + "realpath": test_file_path, + "fiid": "kb1", + "id": "1" + } + delete_result = await file_deleted(test_params_delete) + print(f"file_deleted 结果: {delete_result}") + if __name__ == "__main__": asyncio.run(main()) diff --git a/rag/init.py b/rag/init.py index caa444e..cd2ad78 100644 --- a/rag/init.py +++ b/rag/init.py @@ -3,6 +3,8 @@ from ahserver.serverenv import ServerEnv import aiohttp from aiohttp import ClientSession, ClientTimeout import json +from .file import file_uploaded, file_deleted +from appPublic.registerfunction import RegisterFunction async def _make_connection_request(action: str, params: dict = None) -> dict: """ @@ -137,6 +139,9 @@ def load_rag(): """ 初始化 ServerEnv,绑定 MilvusConnection 的所有功能。 """ + rf = RegisterFunction() + rf.register('fileuploaded', file_uploaded) + rf.register('filedeleted', file_deleted) env = ServerEnv() env.create_collection = create_collection env.delete_collection = delete_collection