diff --git a/rag/folderinfo.py b/rag/folderinfo.py index 89215eb..afd1b9e 100644 --- a/rag/folderinfo.py +++ b/rag/folderinfo.py @@ -17,13 +17,15 @@ import traceback from filetxt.loader import fileloader,File2Text from ahserver.serverenv import get_serverenv from typing import List, Dict, Any -from rag.service_opts import get_service_params, sor_get_service_params +from rag.service_opts import get_service_params, sor_get_service_params, sor_get_embedding_mode, get_embedding_mode +from rag.fileprocess import extract_images_from_file from rag.rag_operations import RagOperations import json from rag.transaction_manager import TransactionContext from dataclasses import dataclass from enum import Enum - +import base64 +from pathlib import Path class RagFileMgr(FileMgr): def __init__(self, fiid): @@ -53,6 +55,10 @@ where a.orgid = b.orgid return r.quota, r.expired_date return None, None + async def file_to_base64(self,path: str) -> str: + with open(path, "rb") as f: + return base64.b64encode(f.read()).decode("utf-8") + async def file_uploaded(self, request, ns, userid): """将文档插入 Milvus 并抽取三元组到 Neo4j""" debug(f'Received ns: {ns=}') @@ -104,21 +110,107 @@ where a.orgid = b.orgid raise ValueError("无法获取服务参数") rollback_context["service_params"] = service_params + #获取嵌入模式 + embedding_mode = await get_embedding_mode(orgid) + debug(f"检测到 embedding_mode = {embedding_mode}(0=文本, 1=多模态)") + # 加载和分片文档 chunks = await self.rag_ops.load_and_chunk_document( realpath, timings, transaction_mgr=transaction_mgr ) - # 生成嵌入向量 - embeddings = await self.rag_ops.generate_embeddings( - request, chunks, service_params, userid, timings, transaction_mgr=transaction_mgr - ) + text_embeddings = None + multi_results = None + image_paths = [] - # 插入 Milvus - chunks_data = await self.rag_ops.insert_to_vector_db( - request, chunks, embeddings, realpath, orgid, fiid, id, - service_params, userid, db_type, timings, transaction_mgr=transaction_mgr + if embedding_mode == 1: + inputs = [] + # 文本 + for chunk in chunks: + inputs.append({"type": "text", "content": chunk.page_content}) + + debug("开始多模态图像抽取与嵌入") + image_paths = extract_images_from_file(realpath) + debug(f"从文档中抽取 {len(image_paths)} 张图像") + + if image_paths: + for img_path in image_paths: + try: + # 1. 自动识别真实格式 + ext = Path(img_path).suffix.lower() + if ext not in {".png", ".jpg", ".jpeg", ".webp", ".bmp"}: + ext = ".jpg" + + mime_map = { + ".png": "image/png", + ".jpg": "image/jpeg", + ".jpeg": "image/jpeg", + ".webp": "image/webp", + ".bmp": "image/bmp" + } + mime_type = mime_map.get(ext, "image/jpeg") + + # # 2. 智能压缩(>1MB 才压缩,节省 70% 流量) + # img = Image.open(img_path).convert("RGB") + # if os.path.getsize(img_path) > 1024 * 1024: # >1MB + # buffer = BytesIO() + # img.save(buffer, format="JPEG", quality=85, optimize=True) + # b64 = base64.b64encode(buffer.getvalue()).decode() + # data_uri = f"data:image/jpeg;base64,{b64}" + # else: + b64 = await self.file_to_base64(img_path) + data_uri = f"data:{mime_type};base64,{b64}" + + inputs.append({ + "type": "image", + "data": data_uri + }) + debug(f"已添加图像({mime_type}, {len(b64) / 1024:.1f}KB): {Path(img_path).name}") + + except Exception as e: + debug(f"图像处理失败,跳过: {img_path} → {e}") + # 即使失败也加个占位,防止顺序错乱 + inputs.append({ + "type": "image", + "data": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNkYAAAAAYAAjCB0C8AAAAASUVORK5CYII=" + }) + + debug(f"混排输入总数: {len(inputs)}(文本 {len(chunks)} + 图像 {len(image_paths)})") + + multi_results = await self.rag_ops.generate_multi_embeddings( + request=request, + inputs=inputs, + service_params=service_params, + userid=userid, + timings=timings, + transaction_mgr=transaction_mgr + ) + debug(f"多模态嵌入成功,返回 {len(multi_results)} 条结果") + else: + # 生成嵌入向量 + debug("【纯文本模式】使用 BGE 嵌入") + text_embeddings = await self.rag_ops.generate_embeddings( + request, chunks, service_params, userid, timings, transaction_mgr=transaction_mgr + ) + debug(f"BGE 嵌入完成: {len(text_embeddings)} 条") + + inserted = await self.rag_ops.insert_all_vectors( + request=request, + text_chunks=chunks, + realpath=realpath, + orgid=orgid, + fiid=fiid, + document_id=id, + service_params=service_params, + userid=userid, + db_type=db_type, + timings=timings, + img_paths=image_paths, + text_embeddings=text_embeddings, + multi_results=multi_results, + transaction_mgr=transaction_mgr ) + debug(f"统一插入: 文本 {inserted['text']}, 图像 {inserted['image']}, 人脸 {inserted['face']}") # 抽取三元组 triples = await self.rag_ops.extract_triples( diff --git a/rag/rag_operations.py b/rag/rag_operations.py index 0280397..06e20fd 100644 --- a/rag/rag_operations.py +++ b/rag/rag_operations.py @@ -103,84 +103,477 @@ class RagOperations: return embeddings - async def insert_to_vector_db(self, request, chunks: List[Document], embeddings: List[List[float]], - realpath: str, orgid: str, fiid: str, id: str, service_params: Dict, - userid: str, db_type: str, timings: Dict, - transaction_mgr: TransactionManager = None): - """插入向量数据库""" - debug(f"准备数据并调用插入文件端点: {realpath}") - filename = os.path.basename(realpath).rsplit('.', 1)[0] - ext = realpath.rsplit('.', 1)[1].lower() if '.' in realpath else '' + async def generate_multi_embeddings(self, request, inputs: List[Dict], service_params: Dict, + userid: str, timings: Dict, + transaction_mgr: TransactionManager = None) -> Dict[str, Dict]: + """调用多模态嵌入服务(CLIP)""" + debug("调用多模态嵌入服务") + start = time.time() + + result = await self.api_service.get_multi_embeddings( + request=request, + inputs=inputs, + upappid=service_params['embedding'], + apiname="black/clip", + user=userid + ) + debug(f"多模态返回结果是{result}") + timings["multi_embedding"] = time.time() - start + debug(f"多模态嵌入耗时: {timings['multi_embedding']:.2f}秒,处理 {len(result)} 条") + + # ==================== 新增:错误检查 + 过滤 ==================== + valid_results = {} + error_count = 0 + error_examples = [] + + for key, info in result.items(): + if info.get("type") == "error": + error_count += 1 + if len(error_examples) < 3: # 只记录前3个 + error_examples.append(f"{key} → {info['error']}") + # 直接丢弃错误条目 + continue + valid_results[key] = info + + if error_count > 0: + error(f"多模态嵌入失败 {error_count} 条!示例:{'; '.join(error_examples)}") + raise RuntimeError(f"多模态嵌入有{error_count} 条失败") + else: + debug("多模态嵌入全部成功!") + + if transaction_mgr: + transaction_mgr.add_operation( + OperationType.EMBEDDING, + {'count': len(result)} + ) + + return result + + # 统一插入向量库 + async def insert_all_vectors( + self, + request, + text_chunks: List[Document], + realpath: str, + orgid: str, + fiid: str, + document_id: str, + service_params: Dict, + userid: str, + db_type: str, + timings: Dict, + img_paths: List[str] = None, + text_embeddings: List[List[float]] = None, + multi_results: Dict = None, + transaction_mgr: TransactionManager = None + ) -> Dict[str, int]: + """ + 统一插入函数:支持两种模式 + 1. 纯文本模式:text_embeddings 有值 + 2. 多模态模式:multi_results 有值(来自 generate_multi_embeddings) + """ + img_paths = img_paths or [] + all_chunks = [] + start = time.time() + filename = os.path.basename(realpath) upload_time = datetime.now().isoformat() - chunks_data = [ - { - "userid": orgid, - "knowledge_base_id": fiid, - "text": chunk.page_content, - "vector": embeddings[i], - "document_id": id, - "filename": filename + '.' + ext, - "file_path": realpath, - "upload_time": upload_time, - "file_type": ext, - } - for i, chunk in enumerate(chunks) - ] + # ==================== 1. 纯文本模式(BGE) ==================== + if text_embeddings is not None: + debug(f"【纯文本模式】插入 {len(text_embeddings)} 条文本向量") + for i, chunk in enumerate(text_chunks): + all_chunks.append({ + "userid": orgid, + "knowledge_base_id": fiid, + "text": chunk.page_content, + "vector": text_embeddings[i], + "document_id": document_id, + "filename": filename, + "file_path": realpath, + "upload_time": upload_time, + "file_type": "text", + }) - start_milvus = time.time() - for i in range(0, len(chunks_data), 10): - batch_chunks = chunks_data[i:i + 10] - debug(f"传入的数据是:{batch_chunks}") + # ==================== 2. 多模态模式(CLIP 混排) ==================== + if multi_results is not None: + debug(f"【多模态模式】解析 {len(multi_results)} 条 CLIP 结果") + # 遍历 multi_results + for raw_key, info in multi_results.items(): + typ = info["type"] + + # --- 文本 --- + if typ == "text": + # raw_key 就是原文 + all_chunks.append({ + "userid": orgid, + "knowledge_base_id": fiid, + "text": raw_key, + "vector": info["vector"], + "document_id": document_id, + "filename": filename, + "file_path": realpath, + "upload_time": upload_time, + "file_type": "text", + }) + continue + + # --- 图像 --- + if typ == "image": + img_path = info.get("path") or raw_key + img_name = os.path.basename(img_path) + + # 整图向量 + if "vector" in info: + all_chunks.append({ + "userid": orgid, + "knowledge_base_id": fiid, + "text": f"[Image: {img_path}]图片来源于文件{realpath}", + "vector": info["vector"], + "document_id": document_id, + "filename": img_name, + "file_path": realpath, + "upload_time": upload_time, + "file_type": "image", + }) + + # 人脸向量 + face_vecs = info.get("face_vecs", []) + face_count = len(face_vecs) + # if face_count > 0: + # for f_idx, fvec in enumerate(face_vecs): + # debug(f"人脸向量维度是:{len(fvec)}") + # all_chunks.append({ + # "userid": orgid, + # "knowledge_base_id": fiid, + # "text": f"[Face {f_idx + 1}/{face_count} in {img_name}]人脸来源于{realpath}的{img_path}图片", + # "vector": fvec, + # "document_id": document_id, + # "filename": img_name, + # "file_path": realpath, + # "upload_time": upload_time, + # "file_type": "face", + # }) + # continue + + # --- 视频 --- + if typ == "video": + video_path = info.get("path") or raw_key + video_name = os.path.basename(video_path) + + if "vector" in info: + all_chunks.append({ + "userid": orgid, + "knowledge_base_id": fiid, + "text": f"[Video: {video_name}]", + "vector": info["vector"], + "document_id": document_id, + "filename": video_path, + "file_path": realpath, + "upload_time": upload_time, + "file_type": "video", + }) + + # 视频人脸 + face_vecs = info.get("face_vecs", []) + face_count = len(face_vecs) + # if face_count > 0 : + # for f_idx, fvec in enumerate(face_vecs): + # all_chunks.append({ + # "userid": orgid, + # "knowledge_base_id": fiid, + # "text": f"[Face {f_idx + 1}/{face_count} in video {video_name}]来源于{video_path}", + # "vector": fvec, + # "document_id": document_id, + # "filename": video_path, + # "file_path": realpath, + # "upload_time": upload_time, + # "file_type": "face", + # }) + # continue + + # --- 音频 --- + if typ == "audio": + audio_path = info.get("path") or raw_key + audio_name = os.path.basename(audio_path) + + if "vector" in info: + all_chunks.append({ + "userid": orgid, + "knowledge_base_id": fiid, + "text": f"[Audio: {audio_name}]", + "vector": info["vector"], + "document_id": document_id, + "filename": audio_path, + "file_path": realpath, + "upload_time": upload_time, + "file_type": "audio", + }) + continue + + # --- 未知类型 --- + debug(f"未知类型跳过: {typ} → {raw_key}") + + # ==================== 3. 批量插入 Milvus ==================== + if not all_chunks: + debug("无向量需要插入") + return {"text": 0, "image": 0, "face": 0} + + for i in range(0, len(all_chunks), 10): + batch = all_chunks[i:i + 10] result = await self.api_service.milvus_insert_document( request=request, - chunks=batch_chunks, - db_type=db_type, + chunks=batch, upappid=service_params['vdb'], apiname="milvus/insertdocument", - user=userid + user=userid, + db_type=db_type ) if result.get("status") != "success": - raise ValueError(result.get("message", "Milvus 插入失败")) + raise ValueError(f"Milvus 插入失败: {result.get('message')}") - timings["insert_milvus"] = time.time() - start_milvus - debug(f"Milvus 插入耗时: {timings['insert_milvus']:.2f} 秒") - - # 记录事务操作,包含回滚函数 - if transaction_mgr: - async def rollback_vdb_insert(data, context): + # ==================== 4. 统一回滚(只登记一次) ==================== + if transaction_mgr and all_chunks: + async def rollback_all(data, context): try: - # 防御性检查 - required_context = ['request', 'service_params', 'userid'] - missing_context = [k for k in required_context if k not in context or context[k] is None] - if missing_context: - raise ValueError(f"回滚上下文缺少字段: {', '.join(missing_context)}") - - required_data = ['orgid', 'realpath', 'fiid', 'id', 'db_type'] - missing_data = [k for k in required_data if k not in data or data[k] is None] - if missing_data: - raise ValueError(f"VDB_INSERT 数据缺少字段: {', '.join(missing_data)}") - await self.delete_from_vector_db( - context['request'], data['orgid'], data['realpath'], - data['fiid'], data['id'], context['service_params'], - context['userid'], data['db_type'] + request=context['request'], + orgid=data['orgid'], + realpath=data['realpath'], + fiid=data['fiid'], + id=data['document_id'], + service_params=context['service_params'], + userid=context['userid'], + db_type=data['db_type'] ) - return f"已回滚向量数据库插入: {data['id']}" + return f"已回滚 document_id={data['document_id']} 的所有向量" except Exception as e: - error(f"回滚向量数据库失败: document_id={data.get('id', '未知')}, 错误: {str(e)}") + error(f"统一回滚失败: {e}") raise transaction_mgr.add_operation( OperationType.VDB_INSERT, { - 'orgid': orgid, 'realpath': realpath, 'fiid': fiid, - 'id': id, 'db_type': db_type + 'orgid': orgid, + 'realpath': realpath, + 'fiid': fiid, + 'id': document_id, + 'db_type': db_type }, - rollback_func=rollback_vdb_insert + rollback_func=rollback_all ) - return chunks_data + # ==================== 5. 统计返回 ==================== + stats = { + "text": len([c for c in all_chunks if c["file_type"] == "text"]), + "image": len([c for c in all_chunks if c["file_type"] == "image"]), + "face": len([c for c in all_chunks if c["file_type"] == "face"]) + } + + timings["insert_all"] = time.time() - start + debug( + f"统一插入完成: 文本 {stats['text']}, 图像 {stats['image']}, 人脸 {stats['face']}, 耗时 {timings['insert_all']:.2f}s") + return stats + # async def insert_to_vector_db(self, request, chunks: List[Document], embeddings: List[List[float]], + # realpath: str, orgid: str, fiid: str, id: str, service_params: Dict, + # userid: str, db_type: str, timings: Dict, + # transaction_mgr: TransactionManager = None): + # """插入向量数据库""" + # debug(f"准备数据并调用插入文件端点: {realpath}") + # filename = os.path.basename(realpath).rsplit('.', 1)[0] + # ext = realpath.rsplit('.', 1)[1].lower() if '.' in realpath else '' + # upload_time = datetime.now().isoformat() + # + # chunks_data = [ + # { + # "userid": orgid, + # "knowledge_base_id": fiid, + # "text": chunk.page_content, + # "vector": embeddings[i], + # "document_id": id, + # "filename": filename + '.' + ext, + # "file_path": realpath, + # "upload_time": upload_time, + # "file_type": ext, + # } + # for i, chunk in enumerate(chunks) + # ] + # + # start_milvus = time.time() + # for i in range(0, len(chunks_data), 10): + # batch_chunks = chunks_data[i:i + 10] + # debug(f"传入的数据是:{batch_chunks}") + # result = await self.api_service.milvus_insert_document( + # request=request, + # chunks=batch_chunks, + # db_type=db_type, + # upappid=service_params['vdb'], + # apiname="milvus/insertdocument", + # user=userid + # ) + # if result.get("status") != "success": + # raise ValueError(result.get("message", "Milvus 插入失败")) + # + # timings["insert_milvus"] = time.time() - start_milvus + # debug(f"Milvus 插入耗时: {timings['insert_milvus']:.2f} 秒") + # + # # 记录事务操作,包含回滚函数 + # if transaction_mgr: + # async def rollback_vdb_insert(data, context): + # try: + # # 防御性检查 + # required_context = ['request', 'service_params', 'userid'] + # missing_context = [k for k in required_context if k not in context or context[k] is None] + # if missing_context: + # raise ValueError(f"回滚上下文缺少字段: {', '.join(missing_context)}") + # + # required_data = ['orgid', 'realpath', 'fiid', 'id', 'db_type'] + # missing_data = [k for k in required_data if k not in data or data[k] is None] + # if missing_data: + # raise ValueError(f"VDB_INSERT 数据缺少字段: {', '.join(missing_data)}") + # + # await self.delete_from_vector_db( + # context['request'], data['orgid'], data['realpath'], + # data['fiid'], data['id'], context['service_params'], + # context['userid'], data['db_type'] + # ) + # return f"已回滚向量数据库插入: {data['id']}" + # except Exception as e: + # error(f"回滚向量数据库失败: document_id={data.get('id', '未知')}, 错误: {str(e)}") + # raise + # + # transaction_mgr.add_operation( + # OperationType.VDB_INSERT, + # { + # 'orgid': orgid, 'realpath': realpath, 'fiid': fiid, + # 'id': id, 'db_type': db_type + # }, + # rollback_func=rollback_vdb_insert + # ) + # + # return chunks_data + # + # async def insert_image_vectors( + # self, + # request, + # multi_results: Dict[str, Dict], + # realpath: str, + # orgid: str, + # fiid: str, + # document_id: str, + # service_params: Dict, + # userid: str, + # db_type: str, + # timings: Dict, + # transaction_mgr: TransactionManager = None + # ) -> tuple[int, int]: + # + # start = time.time() + # image_chunks = [] + # face_chunks = [] + # + # for img_path, info in multi_results.items(): + # # img_name = os.path.basename(img_path) + # + # # 1. 插入整张图 + # if info.get("type") in ["image", "video"] and "vector" in info: + # image_chunks.append({ + # "userid": orgid, + # "knowledge_base_id": fiid, + # "text": f"[Image: {img_path}]", + # "vector": info["vector"], + # "document_id": document_id, + # "filename": os.path.basename(realpath), + # "file_path": realpath, + # "upload_time": datetime.now().isoformat(), + # "file_type": "image" + # }) + # + # # 2. 插入每张人脸 + # face_vecs = info.get("face_vecs") + # face_count = info.get("face_count", 0) + # + # if face_count > 0 and face_vecs and len(face_vecs) == face_count: + # for idx, face_vec in enumerate(face_vecs): + # face_chunks.append({ + # "userid": orgid, + # "knowledge_base_id": fiid, + # "text": f"[Face {idx + 1}/{face_count} in {img_path}]", + # "vector": face_vec, + # "document_id": document_id, + # "filename": os.path.basename(realpath), + # "file_path": realpath, + # "upload_time": datetime.now().isoformat(), + # "file_type": "face", + # }) + # + # if image_chunks: + # for i in range(0, len(image_chunks), 10): + # await self.api_service.milvus_insert_document( + # request=request, + # chunks=image_chunks[i:i + 10], + # upappid=service_params['vdb'], + # apiname="milvus/insertdocument", + # user=userid, + # db_type=db_type + # ) + # + # if face_chunks: + # for i in range(0, len(face_chunks), 10): + # await self.api_service.milvus_insert_document( + # request=request, + # chunks=face_chunks[i:i + 10], + # upappid=service_params['vdb'], + # apiname="milvus/insertdocument", + # user=userid, + # db_type=db_type + # ) + # timings["insert_images"] = time.time() - start + # image_count = len(image_chunks) + # face_count = len(face_chunks) + # + # debug(f"多模态插入完成: 图像 {image_count} 条, 人脸 {face_count} 条") + # + # if transaction_mgr and (image_count + face_count > 0): + # transaction_mgr.add_operation( + # OperationType.IMAGE_VECTORS_INSERT, + # {"images": image_count, "faces": face_count, "document_id": document_id} + # ) + # + # # 记录事务操作,包含回滚函数 + # if transaction_mgr: + # async def rollback_multimodal(data, context): + # try: + # # 防御性检查 + # required_context = ['request', 'service_params', 'userid'] + # missing_context = [k for k in required_context if k not in context or context[k] is None] + # if missing_context: + # raise ValueError(f"回滚上下文缺少字段: {', '.join(missing_context)}") + # + # required_data = ['orgid', 'realpath', 'fiid', 'id', 'db_type'] + # missing_data = [k for k in required_data if k not in data or data[k] is None] + # if missing_data: + # raise ValueError(f"多模态回滚数据缺少字段: {', '.join(missing_data)}") + # + # await self.delete_from_vector_db( + # context['request'], data['orgid'], data['realpath'], + # data['fiid'], data['id'], context['service_params'], + # context['userid'], data['db_type'] + # ) + # return f"已回滚多模态向量: {data['id']}" + # except Exception as e: + # error(f"多模态回滚向量数据库失败: document_id={data.get('id', '未知')}, 错误: {str(e)}") + # raise + # + # transaction_mgr.add_operation( + # OperationType.VDB_INSERT, + # { + # 'orgid': orgid, 'realpath': realpath, 'fiid': fiid, + # 'id': id, 'db_type': db_type + # }, + # rollback_func=rollback_multimodal + # ) + # + # return image_count, face_count async def insert_to_vector_text(self, request, db_type: str, fields: Dict, service_params: Dict, userid: str, timings: Dict) -> List[Dict]: diff --git a/rag/service_opts.py b/rag/service_opts.py index 37f3b77..9cb7248 100644 --- a/rag/service_opts.py +++ b/rag/service_opts.py @@ -57,11 +57,12 @@ async def sor_get_service_params(sor, orgid): service_params['reranker'] = service['upappid'] elif name == 'mrebel三元组抽取': service_params['triples'] = service['upappid'] - elif name == 'neo4j删除知识库': + elif name == 'neo4j图知识库': service_params['gdb'] = service['upappid'] elif name == 'small实体抽取': service_params['entities'] = service['upappid'] - + elif name == 'clip多模态嵌入服务': + service_params['embedding'] = service['upappid'] # 检查是否所有服务参数都已填充 missing_services = [k for k, v in service_params.items() if v is None] if missing_services: @@ -76,3 +77,25 @@ async def get_service_params(orgid): async with db.sqlorContext(dbname) as sor: return await sor_get_service_params(sor, orgid) return None + +async def sor_get_embedding_mode(sor, orgid) -> int: + """根据 orgid 获取嵌入模式:0=纯文本,1=多模态""" + sql = """ + SELECT em.mode + FROM service_opts so + JOIN embedding_mode em ON so.embedding_id = em.embeddingid + WHERE so.orgid = ${orgid}$ + """ + rows = await sor.sqlExe(sql, {"orgid": orgid}) + if not rows: + debug(f"orgid={orgid} 未配置 embedding_mode,默认为 0(纯文本)") + return 0 + return int(rows[0].mode) + +async def get_embedding_mode(orgid): + db = DBPools() + debug(f"传入的orgid是:{orgid}") + dbname = get_serverenv('get_module_dbname')('rag') + async with db.sqlorContext(dbname) as sor: + return await sor_get_embedding_mode(sor, orgid) + return None \ No newline at end of file diff --git a/rag/uapi_service.py b/rag/uapi_service.py index c429dc5..1d2ed6c 100644 --- a/rag/uapi_service.py +++ b/rag/uapi_service.py @@ -45,6 +45,43 @@ class APIService: error(f"request #{request_id} 嵌入服务调用失败: {str(e)}, upappid={upappid}, apiname={apiname}") raise RuntimeError(f"嵌入服务调用失败: {str(e)}") + #多模态嵌入服务 + async def get_multi_embeddings( + self, + request, + inputs: List[Dict], + upappid: str, + apiname: str, + user: str + ) -> Dict[str, Dict]: + """ + 多模态统一嵌入(支持文本、图片、音频、视频) + 返回原始输入字符串为 key 的完整结果,含 type / vector / 人脸信息 + """ + request_id = str(uuid.uuid4()) + debug(f"Request #{request_id} 多模态嵌入开始,共{len(inputs)}项") + + if not inputs or not isinstance(inputs, list): + raise ValueError("inputs 必须为非空列表") + + try: + uapi = UAPI(request, DictObject(**globals())) + params_kw = {"inputs": inputs} + b = await uapi.call(upappid, apiname, user, params_kw) + d = await self.handle_uapi_response(b, upappid, apiname, "多模态嵌入服务", request_id) + + if d.get("object") != "embedding.result" or "data" not in d: + error(f"request #{request_id} 返回格式错误: {d}") + raise RuntimeError("多模态嵌入返回格式错误") + + result = d["data"] # 直接返回 {input_str: {type, vector, ...}} + debug(f"request #{request_id} 成功获取 {len(result)} 条多模态向量") + return result + + except Exception as e: + error(f"request #{request_id} 多模态嵌入失败: {str(e)}") + raise RuntimeError(f"多模态嵌入失败: {str(e)}") + # 实体提取服务 (LTP/small) async def extract_entities(self, request, query: str, upappid: str, apiname: str, user: str) -> list: """调用实体识别服务"""