diff --git a/rag/folderinfo.py b/rag/folderinfo.py index ff31f44..8cf3d99 100644 --- a/rag/folderinfo.py +++ b/rag/folderinfo.py @@ -216,358 +216,4 @@ where a.orgid = b.orgid "message": f"处理 {len(recs)} 个文件,成功删除 {sum(1 for r in results if r['status'] == 'success')} 个", "status_code": 200 if all(r["status"] == "success" for r in results) else 207 } - # async def get_doucment_chunks(self, realpath, timings): - # """加载文件并进行文本分片""" - # debug(f"加载文件: {realpath}") - # start_load = time.time() - # supported_formats = File2Text.supported_types() - # debug(f"支持的文件格式:{supported_formats}") - # ext = realpath.rsplit('.', 1)[1].lower() if '.' in realpath else '' - # if ext not in supported_formats: - # raise ValueError(f"不支持的文件格式: {ext}, 支持的格式: {', '.join(supported_formats)}") - # text = fileloader(realpath) - # text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s.;,\n/]', '', text) - # timings["load_file"] = time.time() - start_load - # debug(f"加载文件耗时: {timings['load_file']:.2f} 秒, 文本长度: {len(text)}") - # - # if not text or not text.strip(): - # raise ValueError(f"文件 {realpath} 加载为空") - # - # document = Document(page_content=text) - # text_splitter = RecursiveCharacterTextSplitter( - # chunk_size=500, - # chunk_overlap=100, - # length_function=len - # ) - # debug("开始分片文件内容") - # start_split = time.time() - # chunks = text_splitter.split_documents([document]) - # timings["split_text"] = time.time() - start_split - # debug(f"文本分片耗时: {timings['split_text']:.2f} 秒, 分片数量: {len(chunks)}") - # - # if not chunks: - # raise ValueError(f"文件 {realpath} 未生成任何文档块") - # - # return chunks - # - # async def docs_embedding(self, request, chunks, service_params, userid, timings): - # """调用嵌入服务生成向量""" - # debug("调用嵌入服务生成向量") - # start_embedding = time.time() - # texts = [chunk.page_content for chunk in chunks] - # embeddings = [] - # for i in range(0, len(texts), 10): - # batch_texts = texts[i:i + 10] - # batch_embeddings = await APIService().get_embeddings( - # request=request, - # texts=batch_texts, - # upappid=service_params['embedding'], - # apiname="BAAI/bge-m3", - # user=userid - # ) - # embeddings.extend(batch_embeddings) - # - # if not embeddings or not all(len(vec) == 1024 for vec in embeddings): - # raise ValueError("所有嵌入向量必须是长度为 1024 的浮点数列表") - # - # timings["generate_embeddings"] = time.time() - start_embedding - # debug(f"生成嵌入向量耗时: {timings['generate_embeddings']:.2f} 秒, 嵌入数量: {len(embeddings)}") - # return embeddings - # - # async def embedding_2_vdb(self, request, chunks, embeddings, realpath, orgid, fiid, id, service_params, userid, - # db_type, timings): - # """准备数据并插入 Milvus""" - # debug(f"准备数据并调用插入文件端点: {realpath}") - # filename = os.path.basename(realpath).rsplit('.', 1)[0] - # ext = realpath.rsplit('.', 1)[1].lower() if '.' in realpath else '' - # upload_time = datetime.now().isoformat() - # - # chunks_data = [ - # { - # "userid": orgid, - # "knowledge_base_id": fiid, - # "text": chunk.page_content, - # "vector": embeddings[i], - # "document_id": id, - # "filename": filename + '.' + ext, - # "file_path": realpath, - # "upload_time": upload_time, - # "file_type": ext, - # } - # for i, chunk in enumerate(chunks) - # ] - # - # start_milvus = time.time() - # for i in range(0, len(chunks_data), 10): - # batch_chunks = chunks_data[i:i + 10] - # result = await APIService().milvus_insert_document( - # request=request, - # chunks=batch_chunks, - # db_type=db_type, - # upappid=service_params['vdb'], - # apiname="milvus/insertdocument", - # user=userid - # ) - # if result.get("status") != "success": - # raise ValueError(result.get("message", "Milvus 插入失败")) - # - # timings["insert_milvus"] = time.time() - start_milvus - # debug(f"Milvus 插入耗时: {timings['insert_milvus']:.2f} 秒") - # return chunks_data - # - # async def get_triples(self, request, chunks, service_params, userid, timings): - # """调用三元组抽取服务""" - # debug("调用三元组抽取服务") - # start_triples = time.time() - # chunk_texts = [doc.page_content for doc in chunks] - # triples = [] - # for i, chunk in enumerate(chunk_texts): - # result = await APIService().extract_triples( - # request=request, - # text=chunk, - # upappid=service_params['triples'], - # apiname="Babelscape/mrebel-large", - # user=userid - # ) - # if isinstance(result, list): - # triples.extend(result) - # debug(f"分片 {i + 1} 抽取到 {len(result)} 个三元组") - # else: - # error(f"分片 {i + 1} 处理失败: {str(result)}") - # - # unique_triples = [] - # seen = set() - # for t in triples: - # identifier = (t['head'].lower(), t['tail'].lower(), t['type'].lower()) - # if identifier not in seen: - # seen.add(identifier) - # unique_triples.append(t) - # else: - # for existing in unique_triples: - # if (existing['head'].lower() == t['head'].lower() and - # existing['tail'].lower() == t['tail'].lower() and - # len(t['type']) > len(existing['type'])): - # unique_triples.remove(existing) - # unique_triples.append(t) - # debug(f"替换三元组为更具体类型: {t}") - # break - # - # timings["extract_triples"] = time.time() - start_triples - # debug(f"三元组抽取耗时: {timings['extract_triples']:.2f} 秒, 抽取到 {len(unique_triples)} 个三元组") - # return unique_triples - # - # async def triple2graphdb(self, request, unique_triples, id, fiid, orgid, service_params, userid, timings): - # """调用 Neo4j 插入三元组""" - # debug(f"插入 {len(unique_triples)} 个三元组到 Neo4j") - # start_neo4j = time.time() - # if unique_triples: - # for i in range(0, len(unique_triples), 30): - # batch_triples = unique_triples[i:i + 30] - # neo4j_result = await APIService().neo4j_insert_triples( - # request=request, - # triples=batch_triples, - # document_id=id, - # knowledge_base_id=fiid, - # userid=orgid, - # upappid=service_params['gdb'], - # apiname="neo4j/inserttriples", - # user=userid - # ) - # if neo4j_result.get("status") != "success": - # raise ValueError(f"Neo4j 三元组插入失败: {neo4j_result.get('message', '未知错误')}") - # info(f"文件三元组成功插入 Neo4j: {neo4j_result.get('message')}") - # timings["insert_neo4j"] = time.time() - start_neo4j - # debug(f"Neo4j 插入耗时: {timings['insert_neo4j']:.2f} 秒") - # else: - # debug("未抽取到三元组") - # timings["insert_neo4j"] = 0.0 - # - # async def delete_from_milvus(self, request, orgid, realpath, fiid, id, service_params, userid, db_type): - # """调用 Milvus 删除文档""" - # debug(f"调用删除文件端点: userid={orgid}, file_path={realpath}, knowledge_base_id={fiid}, document_id={id}") - # milvus_result = await APIService().milvus_delete_document( - # request=request, - # userid=orgid, - # file_path=realpath, - # knowledge_base_id=fiid, - # document_id=id, - # db_type=db_type, - # upappid=service_params['vdb'], - # apiname="milvus/deletedocument", - # user=userid - # ) - # if milvus_result.get("status") != "success": - # raise ValueError(milvus_result.get("message", "Milvus 删除失败")) - # - # async def delete_from_neo4j(self, request, id, service_params, userid): - # """调用 Neo4j 删除文档""" - # debug(f"调用 Neo4j 删除文档端点: document_id={id}") - # neo4j_result = await APIService().neo4j_delete_document( - # request=request, - # document_id=id, - # upappid=service_params['gdb'], - # apiname="neo4j/deletedocument", - # user=userid - # ) - # if neo4j_result.get("status") != "success": - # raise ValueError(neo4j_result.get("message", "Neo4j 删除失败")) - # nodes_deleted = neo4j_result.get("nodes_deleted", 0) - # rels_deleted = neo4j_result.get("rels_deleted", 0) - # info(f"成功删除 document_id={id} 的 {nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系") - # return nodes_deleted, rels_deleted - - # async def file_uploaded(self, request, ns, userid): - # """将文档插入 Milvus 并抽取三元组到 Neo4j""" - # debug(f'Received ns: {ns=}') - # env = request._run_ns - # realpath = ns.get('realpath', '') - # fiid = ns.get('fiid', '') - # id = ns.get('id', '') - # orgid = ns.get('ownerid', '') - # db_type = '' - # - # debug(f'Inserting document: file_path={realpath}, userid={orgid}, db_type={db_type}, knowledge_base_id={fiid}, document_id={id}') - # - # timings = {} - # start_total = time.time() - # - # try: - # if not orgid or not fiid or not id: - # raise ValueError("orgid、fiid 和 id 不能为空") - # if len(orgid) > 32 or len(fiid) > 255: - # raise ValueError("orgid 或 fiid 的长度超出限制") - # if not os.path.exists(realpath): - # raise ValueError(f"文件 {realpath} 不存在") - # - # # 获取服务参数 - # service_params = await get_service_params(orgid) - # if not service_params: - # raise ValueError("无法获取服务参数") - # - # chunks = await self.get_doucment_chunks(realpath, timings) - # embeddings = await self.docs_embedding(request, chunks, service_params, userid, timings) - # await self.embedding_2_vdb(request, chunks, embeddings, realpath, orgid, fiid, id, service_params, userid,db_type, timings) - # triples = await self.get_triples(request, chunks, service_params, userid, timings) - # await self.triple2graphdb(request, triples, id, fiid, orgid, service_params, userid, timings) - # - # timings["total"] = time.time() - start_total - # debug(f"总耗时: {timings['total']:.2f} 秒") - # return { - # "status": "success", - # "userid": orgid, - # "document_id": id, - # "collection_name": "ragdb", - # "timings": timings, - # "unique_triples": triples, - # "message": f"文件 {realpath} 成功嵌入并处理三元组", - # "status_code": 200 - # } - # except Exception as e: - # error(f"插入文档失败: {str(e)}, 堆栈: {traceback.format_exc()}") - # timings["total"] = time.time() - start_total - # return { - # "status": "error", - # "document_id": id, - # "collection_name": "ragdb", - # "timings": timings, - # "message": f"插入文档失败: {str(e)}", - # "status_code": 400 - # } - # - # async def file_deleted(self, request, recs, userid): - # """删除用户指定文件数据,包括 Milvus 和 Neo4j 中的记录""" - # if not isinstance(recs, list): - # recs = [recs] - # results = [] - # total_nodes_deleted = 0 - # total_rels_deleted = 0 - # - # for rec in recs: - # id = rec.get('id', '') - # realpath = rec.get('realpath', '') - # fiid = rec.get('fiid', '') - # orgid = rec.get('ownerid', '') - # db_type = '' - # collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" - # - # try: - # required_fields = ['id', 'realpath', 'fiid', 'ownerid'] - # missing_fields = [field for field in required_fields if not rec.get(field, '')] - # if missing_fields: - # raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}") - # - # service_params = await get_service_params(orgid) - # if not service_params: - # raise ValueError("无法获取服务参数") - # - # # 调用 Milvus 删除 - # await self.delete_from_milvus(request, orgid, realpath, fiid, id, service_params, userid, db_type) - # - # # 调用 Neo4j 删除 - # neo4j_deleted_nodes = 0 - # neo4j_deleted_rels = 0 - # try: - # nodes_deleted, rels_deleted = await self.delete_from_neo4j(request, id, service_params, userid) - # neo4j_deleted_nodes += nodes_deleted - # neo4j_deleted_rels += rels_deleted - # total_nodes_deleted += nodes_deleted - # total_rels_deleted += rels_deleted - # except Exception as e: - # error(f"删除 document_id={id} 的 Neo4j 数据失败: {str(e)}") - # - # results.append({ - # "status": "success", - # "collection_name": collection_name, - # "document_id": id, - # "message": f"成功删除文件 {realpath} 的 Milvus 记录,{neo4j_deleted_nodes} 个 Neo4j 节点,{neo4j_deleted_rels} 个 Neo4j 关系", - # "status_code": 200 - # }) - # - # except Exception as e: - # error(f"删除文档 {realpath} 失败: {str(e)}, 堆栈: {traceback.format_exc()}") - # results.append({ - # "status": "error", - # "collection_name": collection_name, - # "document_id": id, - # "message": f"删除文档 {realpath} 失败: {str(e)}", - # "status_code": 400 - # }) - # - # return { - # "status": "success" if all(r["status"] == "success" for r in results) else "partial", - # "results": results, - # "total_nodes_deleted": total_nodes_deleted, - # "total_rels_deleted": total_rels_deleted, - # "message": f"处理 {len(recs)} 个文件,成功删除 {sum(1 for r in results if r['status'] == 'success')} 个", - # "status_code": 200 if all(r["status"] == "success" for r in results) else 207 - # } - - -# async def test_ragfilemgr(): -# """测试 RagFileMgr 类的 get_service_params""" -# print("初始化数据库连接池...") -# dbs = { -# "kyrag": { -# "driver": "aiomysql", -# "async_mode": True, -# "coding": "utf8", -# "maxconn": 100, -# "dbname": "kyrag", -# "kwargs": { -# "user": "test", -# "db": "kyrag", -# "password": "QUZVcXg5V1p1STMybG5Ia6mX9D0v7+g=", -# "host": "db" -# } -# } -# } -# DBPools(dbs) -# -# ragfilemgr = RagFileMgr() -# orgid = "04J6VbxLqB_9RPMcgOv_8" -# result = await get_service_params(orgid) -# print(f"get_service_params 结果: {result}") -# -# -# if __name__ == "__main__": -# asyncio.run(test_ragfilemgr()) +