diff --git a/rag/api_service.py b/rag/api_service.py index ce78f10..532d739 100644 --- a/rag/api_service.py +++ b/rag/api_service.py @@ -113,7 +113,7 @@ class APIService: error(f"Request #{request_id} failed, status: {response.status}, response: {error_text}") raise RuntimeError(f"三元组抽取服务调用失败: {response.status}, {error_text}") result = await response.json() - if result.get("object") != "list" or not result.get("data"): + if result.get("object") != "list": error(f"Request #{request_id} invalid response format: {result}") raise RuntimeError("三元组抽取服务响应格式错误") triples = result["data"] @@ -231,6 +231,13 @@ class APIService: "chunks": chunks, "dbtype": db_type } + + # 计算请求体大小 + payload = json.dumps(params) # 转换为 JSON 字符串 + payload_bytes = payload.encode() # 编码为字节 + payload_size = len(payload_bytes) # 获取字节数 + debug(f"Request payload size for insertdocument: {payload_size} bytes") + return await self._make_request("https://vectordb.opencomputing.net:10443/v1/insertdocument", "insertdocument", params) async def milvus_delete_document(self, userid: str, file_path: str, knowledge_base_id: str, document_id:str, db_type: str = "") -> Dict[str, Any]: diff --git a/rag/file.py b/rag/file.py index 7a156d3..d865fbf 100644 --- a/rag/file.py +++ b/rag/file.py @@ -1,4 +1,4 @@ -from api_service import APIService +from rag.api_service import APIService from appPublic.registerfunction import RegisterFunction from appPublic.log import debug, error, info from sqlor.dbpools import DBPools @@ -14,6 +14,9 @@ from datetime import datetime import traceback from filetxt.loader import fileloader from ahserver.serverenv import get_serverenv +from typing import List, Dict, Any + +api_service = APIService() async def get_orgid_by_id(kdb_id): """ @@ -38,7 +41,6 @@ async def get_orgid_by_id(kdb_id): async def file_uploaded(params_kw): """将文档插入 Milvus 并抽取三元组到 Neo4j""" debug(f'Received params: {params_kw=}') - api_service = APIService() realpath = params_kw.get('realpath', '') fiid = params_kw.get('fiid', '') id = params_kw.get('id', '') @@ -52,6 +54,7 @@ async def file_uploaded(params_kw): try: if not orgid or not fiid or not id: raise ValueError("orgid、fiid 和 id 不能为空") + debug(f'orgid、fiid 和 id 不能为空') if len(orgid) > 32 or len(fiid) > 255: raise ValueError("orgid 或 fiid 的长度超出限制") if not os.path.exists(realpath): @@ -90,7 +93,11 @@ async def file_uploaded(params_kw): debug("调用嵌入服务生成向量") start_embedding = time.time() texts = [chunk.page_content for chunk in chunks] - embeddings = await api_service.get_embeddings(texts) + embeddings = [] + for i in range(0, len(texts), 10): # 每次处理 10 个文本块 + batch_texts = texts[i:i + 10] + batch_embeddings = await api_service.get_embeddings(batch_texts) + embeddings.extend(batch_embeddings) if not embeddings or not all(len(vec) == 1024 for vec in embeddings): raise ValueError("所有嵌入向量必须是长度为 1024 的浮点数列表") timings["generate_embeddings"] = time.time() - start_embedding @@ -112,7 +119,11 @@ async def file_uploaded(params_kw): debug(f"调用插入文件端点: {realpath}") start_milvus = time.time() - result = await api_service.milvus_insert_document(chunks_data, db_type) + for i in range(0, len(chunks_data), 10): # 每次处理 10 条数据 + batch_chunks = chunks_data[i:i + 10] + result = await api_service.milvus_insert_document(batch_chunks, db_type) + if result.get("status") != "success": + raise ValueError(result.get("message", "Milvus 插入失败")) timings["insert_milvus"] = time.time() - start_milvus debug(f"Milvus 插入耗时: {timings['insert_milvus']:.2f} 秒") @@ -158,8 +169,9 @@ async def file_uploaded(params_kw): debug(f"抽取到 {len(unique_triples)} 个三元组,调用 Neo4j 服务插入") start_neo4j = time.time() - if unique_triples: - neo4j_result = await api_service.neo4j_insert_triples(unique_triples, id, fiid, orgid) + for i in range(0, len(unique_triples), 30): # 每次插入 30 个三元组 + batch_triples = unique_triples[i:i + 30] + neo4j_result = await api_service.neo4j_insert_triples(batch_triples, id, fiid, orgid) debug(f"Neo4j 服务响应: {neo4j_result}") if neo4j_result.get("status") != "success": timings["insert_neo4j"] = time.time() - start_neo4j @@ -194,7 +206,6 @@ async def file_uploaded(params_kw): async def file_deleted(params_kw): """删除用户指定文件数据,包括 Milvus 和 Neo4j 中的记录""" - api_service = APIService() id = params_kw.get('id', '') realpath = params_kw.get('realpath', '') fiid = params_kw.get('fiid', '') @@ -246,6 +257,72 @@ async def file_deleted(params_kw): "status_code": 400 } +async def _search_query(query: str, userid: str, knowledge_base_ids: List[str], limit: int = 5, + offset: int = 0, use_rerank: bool = True, db_type: str = "") -> Dict[str, Any]: + """纯向量搜索,调用服务化端点""" + start_time = time.time() + collection_name = "ragdb" if not db_type else f"ragdb_{db_type}" + timing_stats = {} + try: + info( + f"开始纯向量搜索: query={query}, userid={userid}, db_type={db_type}, knowledge_base_ids={knowledge_base_ids}, limit={limit}, offset={offset}, use_rerank={use_rerank}") + + if not query: + raise ValueError("查询文本不能为空") + if not userid: + raise ValueError("userid 不能为空") + if limit <= 0 or limit > 16384: + raise ValueError("limit 必须在 1 到 16384 之间") + if offset < 0: + raise ValueError("offset 不能为负数") + if limit + offset > 16384: + raise ValueError("limit + offset 不能超过 16384") + if not knowledge_base_ids: + raise ValueError("knowledge_base_ids 不能为空") + for kb_id in knowledge_base_ids: + if not isinstance(kb_id, str): + raise ValueError(f"knowledge_base_id 必须是字符串: {kb_id}") + if len(kb_id) > 100: + raise ValueError(f"knowledge_base_id 长度超出 100 个字符: {kb_id}") + + # 将查询文本转换为向量 + vector_start = time.time() + query_vector = await api_service.get_embeddings([query]) + if not query_vector or not all(len(vec) == 1024 for vec in query_vector): + raise ValueError("查询向量必须是长度为 1024 的浮点数列表") + query_vector = query_vector[0] # 取第一个向量 + timing_stats["vector_generation"] = time.time() - vector_start + debug(f"生成查询向量耗时: {timing_stats['vector_generation']:.3f} 秒") + + # 调用纯向量搜索端点 + search_start = time.time() + result = await api_service.milvus_search_query(query_vector, userid, knowledge_base_ids, limit, offset) + timing_stats["vector_search"] = time.time() - search_start + debug(f"向量搜索耗时: {timing_stats['vector_search']:.3f} 秒") + + if result.get("status") != "success": + error(f"纯向量搜索失败: {result.get('message', '未知错误')}") + return {"results": [], "timing": timing_stats} + + unique_results = result.get("results", []) + if use_rerank and unique_results: + rerank_start = time.time() + debug("开始重排序") + unique_results = await api_service.rerank_results(query, unique_results, limit) + unique_results = sorted(unique_results, key=lambda x: x.get('rerank_score', 0), reverse=True) + timing_stats["reranking"] = time.time() - rerank_start + debug(f"重排序耗时: {timing_stats['reranking']:.3f} 秒") + debug(f"重排序分数分布: {[round(r.get('rerank_score', 0), 3) for r in unique_results]}") + else: + unique_results = [{k: v for k, v in r.items() if k != 'rerank_score'} for r in unique_results] + + timing_stats["total_time"] = time.time() - start_time + info(f"纯向量搜索完成,返回 {len(unique_results)} 条结果,总耗时: {timing_stats['total_time']:.3f} 秒") + return {"results": unique_results[:limit], "timing": timing_stats} + + except Exception as e: + error(f"纯向量搜索失败: {str(e)}, 堆栈: {traceback.format_exc()}") + return {"results": [], "timing": timing_stats} async def main(): dbs = { @@ -286,6 +363,18 @@ async def main(): # delete_result = await file_deleted(test_params_delete) # print(f"file_deleted 结果: {delete_result}") + # # 测试 _search_query + # print("测试 _search_query...") + # test_params_query = { + # "query": "什么是关系抽取", + # "userid": "04J6VbxLqB_9RPMcgOv_8", + # "knowledge_base_ids": ["1"], + # "limit": 5, + # "offset": 0, + # "use_rerank": True + # } + # query_result = await _search_query(query="什么是知识融合?", userid="testuser1", knowledge_base_ids=["kb1", "kb2"], limit=5, offset=0, use_rerank=True, db_type="") + # print(f"file_uploaded 结果: {query_result}") if __name__ == "__main__": asyncio.run(main()) diff --git a/rag/folderinfo.py b/rag/folderinfo.py index d075e99..07d4fa9 100644 --- a/rag/folderinfo.py +++ b/rag/folderinfo.py @@ -153,10 +153,10 @@ where a.orgid = b.orgid db = DBPools() dbname = "kyrag" sql_check_hash = """ - SELECT hashvalue - FROM file - WHERE hashvalue = ${hashvalue}$ - """ + SELECT hashvalue + FROM file + WHERE hashvalue = ${hashvalue}$ + """ async with db.sqlorContext(dbname) as sor: hash_result = await sor.sqlExe(sql_check_hash, {"hashvalue": hashvalue}) if hash_result: @@ -184,6 +184,7 @@ where a.orgid = b.orgid debug(f"加载文件: {realpath}") start_load = time.time() text = fileloader(realpath) + # debug(f"处理后的文件内容是:{text=}") text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s.;,\n]', '', text) timings["load_file"] = time.time() - start_load debug(f"加载文件耗时: {timings['load_file']:.2f} 秒, 文本长度: {len(text)}") @@ -249,7 +250,7 @@ where a.orgid = b.orgid chunks=batch_chunks, db_type=db_type, upappid=service_params['vdb'], - apiname="milvus/insertdocument", # 固定 apiname + apiname="milvus/insertdocument", user=userid ) if result.get("status") != "success": @@ -264,6 +265,7 @@ where a.orgid = b.orgid debug("调用三元组抽取服务") start_triples = time.time() + unique_triples = [] try: chunk_texts = [doc.page_content for doc in chunks] debug(f"处理 {len(chunk_texts)} 个分片进行三元组抽取") @@ -272,7 +274,7 @@ where a.orgid = b.orgid request=request, text=chunk, upappid=service_params['triples'], - apiname="Babelscape/mrebel-large", # 固定 apiname + apiname="Babelscape/mrebel-large", user=userid ) for chunk in chunk_texts ] @@ -285,7 +287,6 @@ where a.orgid = b.orgid else: error(f"分片 {i + 1} 处理失败: {str(result)}") - unique_triples = [] seen = set() for t in triples: identifier = (t['head'].lower(), t['tail'].lower(), t['type'].lower()) @@ -306,58 +307,81 @@ where a.orgid = b.orgid debug( f"三元组抽取耗时: {timings['extract_triples']:.2f} 秒, 抽取到 {len(unique_triples)} 个三元组: {unique_triples[:5]}") - debug(f"抽取到 {len(unique_triples)} 个三元组,调用 Neo4j 服务插入") - start_neo4j = time.time() - for i in range(0, len(unique_triples), 30): # 每次插入 30 个三元组 - batch_triples = unique_triples[i:i + 30] - neo4j_result = await api_service.neo4j_insert_triples( - request=request, - triples=batch_triples, - document_id=id, - knowledge_base_id=fiid, - userid=orgid, - upappid=service_params['gdb'], - apiname="neo4j/inserttriples", # 固定 apiname - user=userid - ) - debug(f"Neo4j 服务响应: {neo4j_result}") - if neo4j_result.get("status") != "success": - timings["insert_neo4j"] = time.time() - start_neo4j - timings["total"] = time.time() - start_total - return {"status": "error", "document_id": id, "collection_name": "ragdb", + if unique_triples: + debug(f"抽取到 {len(unique_triples)} 个三元组,调用 Neo4j 服务插入") + start_neo4j = time.time() + for i in range(0, len(unique_triples), 30): # 每次插入 30 个三元组 + batch_triples = unique_triples[i:i + 30] + neo4j_result = await api_service.neo4j_insert_triples( + request=request, + triples=batch_triples, + document_id=id, + knowledge_base_id=fiid, + userid=orgid, + upappid=service_params['gdb'], + apiname="neo4j/inserttriples", + user=userid + ) + debug(f"Neo4j 服务响应: {neo4j_result}") + if neo4j_result.get("status") != "success": + timings["insert_neo4j"] = time.time() - start_neo4j + timings["total"] = time.time() - start_total + return { + "status": "error", + "document_id": id, + "collection_name": "ragdb", "timings": timings, "message": f"Neo4j 三元组插入失败: {neo4j_result.get('message', '未知错误')}", - "status_code": 400} - info(f"文件 {realpath} 三元组成功插入 Neo4j: {neo4j_result.get('message')}") + "status_code": 400 + } + info(f"文件 {realpath} 三元组成功插入 Neo4j: {neo4j_result.get('message')}") + timings["insert_neo4j"] = time.time() - start_neo4j + debug(f"Neo4j 插入耗时: {timings['insert_neo4j']:.2f} 秒") else: debug(f"文件 {realpath} 未抽取到三元组") - timings["insert_neo4j"] = time.time() - start_neo4j - debug(f"Neo4j 插入耗时: {timings['insert_neo4j']:.2f} 秒") + timings["insert_neo4j"] = 0.0 except Exception as e: timings["extract_triples"] = time.time() - start_triples if "extract_triples" not in timings else \ - timings[ - "extract_triples"] - timings["insert_neo4j"] = time.time() - start_neo4j + timings["extract_triples"] + timings["insert_neo4j"] = time.time() - start_neo4j if "insert_neo4j" not in timings else timings[ + "insert_neo4j"] debug(f"处理三元组或 Neo4j 插入失败: {str(e)}, 堆栈: {traceback.format_exc()}") timings["total"] = time.time() - start_total - return {"status": "success", "document_id": id, "collection_name": "ragdb", "timings": timings, - "unique_triples": unique_triples, - "message": f"文件 {realpath} 成功嵌入,但三元组处理或 Neo4j 插入失败: {str(e)}", - "status_code": 200} + return { + "status": "success", + "document_id": id, + "collection_name": "ragdb", + "timings": timings, + "unique_triples": unique_triples, + "message": f"文件 {realpath} 成功嵌入,但三元组处理或 Neo4j 插入失败: {str(e)}", + "status_code": 200 + } timings["total"] = time.time() - start_total debug(f"总耗时: {timings['total']:.2f} 秒") - return {"status": "success", "userid": orgid, "document_id": id, "collection_name": "ragdb", - "timings": timings, - "unique_triples": unique_triples, "message": f"文件 {realpath} 成功嵌入并处理三元组", - "status_code": 200} + return { + "status": "success", + "userid": orgid, + "document_id": id, + "collection_name": "ragdb", + "timings": timings, + "unique_triples": unique_triples, + "message": f"文件 {realpath} 成功嵌入并处理三元组", + "status_code": 200 + } except Exception as e: error(f"插入文档失败: {str(e)}, 堆栈: {traceback.format_exc()}") timings["total"] = time.time() - start_total - return {"status": "error", "document_id": id, "collection_name": "ragdb", "timings": timings, - "message": f"插入文档失败: {str(e)}", "status_code": 400} + return { + "status": "error", + "document_id": id, + "collection_name": "ragdb", + "timings": timings, + "message": f"插入文档失败: {str(e)}", + "status_code": 400 + } async def file_deleted(self, request, recs, userid): """删除用户指定文件数据,包括 Milvus 和 Neo4j 中的记录""" diff --git a/wwwroot/programs.ui b/wwwroot/programs.ui index ed73b8c..88a30d3 100644 --- a/wwwroot/programs.ui +++ b/wwwroot/programs.ui @@ -25,7 +25,7 @@ } }, { - "widgettype":"text", + "widgettype":"Text", "options":{ "otext":"{{p.description}}", "i18n":true, @@ -37,18 +37,18 @@ { "widgettype":"HBox", "options":{ - "height":1.5 + "cheight":1.5 }, "subwidgets":[ { - "widgettype":"text", + "widgettype":"Text", "options":{ "otext":"可用磁盘容量", "i18n":true } }, { - "widgettype":"text", + "widgettype":"Text", "options":{ "text":"{{p.quota / 1000000}}M" } @@ -58,18 +58,18 @@ { "widgettype":"HBox", "options":{ - "height":1.5 + "cheight":1.5 }, "subwidgets":[ { - "widgettype":"text", + "widgettype":"Text", "options":{ "otext":"时长(月)", "i18n":true } }, { - "widgettype":"text", + "widgettype":"Text", "options":{ "text":"{{p.term}}" } @@ -79,18 +79,18 @@ { "widgettype":"HBox", "options":{ - "height":1.5 + "cheight":1.5 }, "subwidgets":[ { - "widgettype":"text", + "widgettype":"Text", "options":{ "otext":"价格", "i18n":true } }, { - "widgettype":"text", + "widgettype":"Text", "options":{ "color":"red", "text":"{{p.price}}圆" @@ -112,7 +112,7 @@ }, "options":{ "params":{ - "selected_program":"p.id" +"selected_program":"{{p.id}}" }, "url":"{{entire_url('./program_selected.dspy')}}" }