This commit is contained in:
wangmeihua 2025-10-10 14:51:18 +08:00
parent 210b77dda7
commit 40625583c8

View File

@ -216,358 +216,4 @@ where a.orgid = b.orgid
"message": f"处理 {len(recs)} 个文件,成功删除 {sum(1 for r in results if r['status'] == 'success')}",
"status_code": 200 if all(r["status"] == "success" for r in results) else 207
}
# async def get_doucment_chunks(self, realpath, timings):
# """加载文件并进行文本分片"""
# debug(f"加载文件: {realpath}")
# start_load = time.time()
# supported_formats = File2Text.supported_types()
# debug(f"支持的文件格式:{supported_formats}")
# ext = realpath.rsplit('.', 1)[1].lower() if '.' in realpath else ''
# if ext not in supported_formats:
# raise ValueError(f"不支持的文件格式: {ext}, 支持的格式: {', '.join(supported_formats)}")
# text = fileloader(realpath)
# text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s.;,\n/]', '', text)
# timings["load_file"] = time.time() - start_load
# debug(f"加载文件耗时: {timings['load_file']:.2f} 秒, 文本长度: {len(text)}")
#
# if not text or not text.strip():
# raise ValueError(f"文件 {realpath} 加载为空")
#
# document = Document(page_content=text)
# text_splitter = RecursiveCharacterTextSplitter(
# chunk_size=500,
# chunk_overlap=100,
# length_function=len
# )
# debug("开始分片文件内容")
# start_split = time.time()
# chunks = text_splitter.split_documents([document])
# timings["split_text"] = time.time() - start_split
# debug(f"文本分片耗时: {timings['split_text']:.2f} 秒, 分片数量: {len(chunks)}")
#
# if not chunks:
# raise ValueError(f"文件 {realpath} 未生成任何文档块")
#
# return chunks
#
# async def docs_embedding(self, request, chunks, service_params, userid, timings):
# """调用嵌入服务生成向量"""
# debug("调用嵌入服务生成向量")
# start_embedding = time.time()
# texts = [chunk.page_content for chunk in chunks]
# embeddings = []
# for i in range(0, len(texts), 10):
# batch_texts = texts[i:i + 10]
# batch_embeddings = await APIService().get_embeddings(
# request=request,
# texts=batch_texts,
# upappid=service_params['embedding'],
# apiname="BAAI/bge-m3",
# user=userid
# )
# embeddings.extend(batch_embeddings)
#
# if not embeddings or not all(len(vec) == 1024 for vec in embeddings):
# raise ValueError("所有嵌入向量必须是长度为 1024 的浮点数列表")
#
# timings["generate_embeddings"] = time.time() - start_embedding
# debug(f"生成嵌入向量耗时: {timings['generate_embeddings']:.2f} 秒, 嵌入数量: {len(embeddings)}")
# return embeddings
#
# async def embedding_2_vdb(self, request, chunks, embeddings, realpath, orgid, fiid, id, service_params, userid,
# db_type, timings):
# """准备数据并插入 Milvus"""
# debug(f"准备数据并调用插入文件端点: {realpath}")
# filename = os.path.basename(realpath).rsplit('.', 1)[0]
# ext = realpath.rsplit('.', 1)[1].lower() if '.' in realpath else ''
# upload_time = datetime.now().isoformat()
#
# chunks_data = [
# {
# "userid": orgid,
# "knowledge_base_id": fiid,
# "text": chunk.page_content,
# "vector": embeddings[i],
# "document_id": id,
# "filename": filename + '.' + ext,
# "file_path": realpath,
# "upload_time": upload_time,
# "file_type": ext,
# }
# for i, chunk in enumerate(chunks)
# ]
#
# start_milvus = time.time()
# for i in range(0, len(chunks_data), 10):
# batch_chunks = chunks_data[i:i + 10]
# result = await APIService().milvus_insert_document(
# request=request,
# chunks=batch_chunks,
# db_type=db_type,
# upappid=service_params['vdb'],
# apiname="milvus/insertdocument",
# user=userid
# )
# if result.get("status") != "success":
# raise ValueError(result.get("message", "Milvus 插入失败"))
#
# timings["insert_milvus"] = time.time() - start_milvus
# debug(f"Milvus 插入耗时: {timings['insert_milvus']:.2f} 秒")
# return chunks_data
#
# async def get_triples(self, request, chunks, service_params, userid, timings):
# """调用三元组抽取服务"""
# debug("调用三元组抽取服务")
# start_triples = time.time()
# chunk_texts = [doc.page_content for doc in chunks]
# triples = []
# for i, chunk in enumerate(chunk_texts):
# result = await APIService().extract_triples(
# request=request,
# text=chunk,
# upappid=service_params['triples'],
# apiname="Babelscape/mrebel-large",
# user=userid
# )
# if isinstance(result, list):
# triples.extend(result)
# debug(f"分片 {i + 1} 抽取到 {len(result)} 个三元组")
# else:
# error(f"分片 {i + 1} 处理失败: {str(result)}")
#
# unique_triples = []
# seen = set()
# for t in triples:
# identifier = (t['head'].lower(), t['tail'].lower(), t['type'].lower())
# if identifier not in seen:
# seen.add(identifier)
# unique_triples.append(t)
# else:
# for existing in unique_triples:
# if (existing['head'].lower() == t['head'].lower() and
# existing['tail'].lower() == t['tail'].lower() and
# len(t['type']) > len(existing['type'])):
# unique_triples.remove(existing)
# unique_triples.append(t)
# debug(f"替换三元组为更具体类型: {t}")
# break
#
# timings["extract_triples"] = time.time() - start_triples
# debug(f"三元组抽取耗时: {timings['extract_triples']:.2f} 秒, 抽取到 {len(unique_triples)} 个三元组")
# return unique_triples
#
# async def triple2graphdb(self, request, unique_triples, id, fiid, orgid, service_params, userid, timings):
# """调用 Neo4j 插入三元组"""
# debug(f"插入 {len(unique_triples)} 个三元组到 Neo4j")
# start_neo4j = time.time()
# if unique_triples:
# for i in range(0, len(unique_triples), 30):
# batch_triples = unique_triples[i:i + 30]
# neo4j_result = await APIService().neo4j_insert_triples(
# request=request,
# triples=batch_triples,
# document_id=id,
# knowledge_base_id=fiid,
# userid=orgid,
# upappid=service_params['gdb'],
# apiname="neo4j/inserttriples",
# user=userid
# )
# if neo4j_result.get("status") != "success":
# raise ValueError(f"Neo4j 三元组插入失败: {neo4j_result.get('message', '未知错误')}")
# info(f"文件三元组成功插入 Neo4j: {neo4j_result.get('message')}")
# timings["insert_neo4j"] = time.time() - start_neo4j
# debug(f"Neo4j 插入耗时: {timings['insert_neo4j']:.2f} 秒")
# else:
# debug("未抽取到三元组")
# timings["insert_neo4j"] = 0.0
#
# async def delete_from_milvus(self, request, orgid, realpath, fiid, id, service_params, userid, db_type):
# """调用 Milvus 删除文档"""
# debug(f"调用删除文件端点: userid={orgid}, file_path={realpath}, knowledge_base_id={fiid}, document_id={id}")
# milvus_result = await APIService().milvus_delete_document(
# request=request,
# userid=orgid,
# file_path=realpath,
# knowledge_base_id=fiid,
# document_id=id,
# db_type=db_type,
# upappid=service_params['vdb'],
# apiname="milvus/deletedocument",
# user=userid
# )
# if milvus_result.get("status") != "success":
# raise ValueError(milvus_result.get("message", "Milvus 删除失败"))
#
# async def delete_from_neo4j(self, request, id, service_params, userid):
# """调用 Neo4j 删除文档"""
# debug(f"调用 Neo4j 删除文档端点: document_id={id}")
# neo4j_result = await APIService().neo4j_delete_document(
# request=request,
# document_id=id,
# upappid=service_params['gdb'],
# apiname="neo4j/deletedocument",
# user=userid
# )
# if neo4j_result.get("status") != "success":
# raise ValueError(neo4j_result.get("message", "Neo4j 删除失败"))
# nodes_deleted = neo4j_result.get("nodes_deleted", 0)
# rels_deleted = neo4j_result.get("rels_deleted", 0)
# info(f"成功删除 document_id={id} 的 {nodes_deleted} 个 Neo4j 节点和 {rels_deleted} 个关系")
# return nodes_deleted, rels_deleted
# async def file_uploaded(self, request, ns, userid):
# """将文档插入 Milvus 并抽取三元组到 Neo4j"""
# debug(f'Received ns: {ns=}')
# env = request._run_ns
# realpath = ns.get('realpath', '')
# fiid = ns.get('fiid', '')
# id = ns.get('id', '')
# orgid = ns.get('ownerid', '')
# db_type = ''
#
# debug(f'Inserting document: file_path={realpath}, userid={orgid}, db_type={db_type}, knowledge_base_id={fiid}, document_id={id}')
#
# timings = {}
# start_total = time.time()
#
# try:
# if not orgid or not fiid or not id:
# raise ValueError("orgid、fiid 和 id 不能为空")
# if len(orgid) > 32 or len(fiid) > 255:
# raise ValueError("orgid 或 fiid 的长度超出限制")
# if not os.path.exists(realpath):
# raise ValueError(f"文件 {realpath} 不存在")
#
# # 获取服务参数
# service_params = await get_service_params(orgid)
# if not service_params:
# raise ValueError("无法获取服务参数")
#
# chunks = await self.get_doucment_chunks(realpath, timings)
# embeddings = await self.docs_embedding(request, chunks, service_params, userid, timings)
# await self.embedding_2_vdb(request, chunks, embeddings, realpath, orgid, fiid, id, service_params, userid,db_type, timings)
# triples = await self.get_triples(request, chunks, service_params, userid, timings)
# await self.triple2graphdb(request, triples, id, fiid, orgid, service_params, userid, timings)
#
# timings["total"] = time.time() - start_total
# debug(f"总耗时: {timings['total']:.2f} 秒")
# return {
# "status": "success",
# "userid": orgid,
# "document_id": id,
# "collection_name": "ragdb",
# "timings": timings,
# "unique_triples": triples,
# "message": f"文件 {realpath} 成功嵌入并处理三元组",
# "status_code": 200
# }
# except Exception as e:
# error(f"插入文档失败: {str(e)}, 堆栈: {traceback.format_exc()}")
# timings["total"] = time.time() - start_total
# return {
# "status": "error",
# "document_id": id,
# "collection_name": "ragdb",
# "timings": timings,
# "message": f"插入文档失败: {str(e)}",
# "status_code": 400
# }
#
# async def file_deleted(self, request, recs, userid):
# """删除用户指定文件数据,包括 Milvus 和 Neo4j 中的记录"""
# if not isinstance(recs, list):
# recs = [recs]
# results = []
# total_nodes_deleted = 0
# total_rels_deleted = 0
#
# for rec in recs:
# id = rec.get('id', '')
# realpath = rec.get('realpath', '')
# fiid = rec.get('fiid', '')
# orgid = rec.get('ownerid', '')
# db_type = ''
# collection_name = "ragdb" if not db_type else f"ragdb_{db_type}"
#
# try:
# required_fields = ['id', 'realpath', 'fiid', 'ownerid']
# missing_fields = [field for field in required_fields if not rec.get(field, '')]
# if missing_fields:
# raise ValueError(f"缺少必填字段: {', '.join(missing_fields)}")
#
# service_params = await get_service_params(orgid)
# if not service_params:
# raise ValueError("无法获取服务参数")
#
# # 调用 Milvus 删除
# await self.delete_from_milvus(request, orgid, realpath, fiid, id, service_params, userid, db_type)
#
# # 调用 Neo4j 删除
# neo4j_deleted_nodes = 0
# neo4j_deleted_rels = 0
# try:
# nodes_deleted, rels_deleted = await self.delete_from_neo4j(request, id, service_params, userid)
# neo4j_deleted_nodes += nodes_deleted
# neo4j_deleted_rels += rels_deleted
# total_nodes_deleted += nodes_deleted
# total_rels_deleted += rels_deleted
# except Exception as e:
# error(f"删除 document_id={id} 的 Neo4j 数据失败: {str(e)}")
#
# results.append({
# "status": "success",
# "collection_name": collection_name,
# "document_id": id,
# "message": f"成功删除文件 {realpath} 的 Milvus 记录,{neo4j_deleted_nodes} 个 Neo4j 节点,{neo4j_deleted_rels} 个 Neo4j 关系",
# "status_code": 200
# })
#
# except Exception as e:
# error(f"删除文档 {realpath} 失败: {str(e)}, 堆栈: {traceback.format_exc()}")
# results.append({
# "status": "error",
# "collection_name": collection_name,
# "document_id": id,
# "message": f"删除文档 {realpath} 失败: {str(e)}",
# "status_code": 400
# })
#
# return {
# "status": "success" if all(r["status"] == "success" for r in results) else "partial",
# "results": results,
# "total_nodes_deleted": total_nodes_deleted,
# "total_rels_deleted": total_rels_deleted,
# "message": f"处理 {len(recs)} 个文件,成功删除 {sum(1 for r in results if r['status'] == 'success')} 个",
# "status_code": 200 if all(r["status"] == "success" for r in results) else 207
# }
# async def test_ragfilemgr():
# """测试 RagFileMgr 类的 get_service_params"""
# print("初始化数据库连接池...")
# dbs = {
# "kyrag": {
# "driver": "aiomysql",
# "async_mode": True,
# "coding": "utf8",
# "maxconn": 100,
# "dbname": "kyrag",
# "kwargs": {
# "user": "test",
# "db": "kyrag",
# "password": "QUZVcXg5V1p1STMybG5Ia6mX9D0v7+g=",
# "host": "db"
# }
# }
# }
# DBPools(dbs)
#
# ragfilemgr = RagFileMgr()
# orgid = "04J6VbxLqB_9RPMcgOv_8"
# result = await get_service_params(orgid)
# print(f"get_service_params 结果: {result}")
#
#
# if __name__ == "__main__":
# asyncio.run(test_ragfilemgr())