diff --git a/rag/fileprocess.py b/rag/fileprocess.py new file mode 100644 index 0000000..87e4863 --- /dev/null +++ b/rag/fileprocess.py @@ -0,0 +1,341 @@ +import numpy as np +import os +import re +from pdf2image import convert_from_path +from appPublic.log import debug, error, info +from pathlib import Path +import zipfile +import xml.etree.ElementTree as ET +from PIL import Image +from typing import List + +# ==================== 新增:路径安全化函数 ==================== +def safe_filename(name: str) -> str: + """ + 安全化文件名/目录名 + - 去除首尾空格 + - 多个空格 → 单空格 + - 非法字符 → 下划线 + - 空格 → 下划线(推荐!永不炸) + """ + name = name.strip() + name = re.sub(r'\s+', ' ', name) # 多个空格合并 + name = re.sub(r'[<>:"/\\|?*]', '_', name) # 非法字符 + name = name.replace(' ', '_') # 空格 → 下划线(关键!) + return name + +def render_pdf_to_images(pdf_path, base_output_dir, dpi=200, image_format="PNG")-> List[str]: + """ + 将PDF文件的每一页渲染为图片 + + 参数: + pdf_path (str): PDF文件路径 + page_output_dir (str): 输出图片的目录 + dpi (int): 图片分辨率,默认200 + image_format (str): 图片格式,默认PNG + + 返回: + int: 成功渲染的页面数量 + """ + pdf_filename = safe_filename(Path(pdf_path).stem) + page_output_dir = os.path.join(base_output_dir, pdf_filename) + + # 创建输出目录(如果不存在) + if not os.path.exists(page_output_dir): + os.makedirs(page_output_dir, exist_ok=True) + debug(f"创建输出目录: {page_output_dir}") + + try: + # 检查PDF文件是否存在 + if not os.path.exists(pdf_path): + error(f"PDF文件不存在: {pdf_path}") + return [] + + debug(f"开始渲染PDF: {pdf_path}") + debug(f"输出目录: {page_output_dir}") + debug(f"分辨率: {dpi} DPI, 格式: {image_format}") + + # 渲染PDF页面为图片 + pages = convert_from_path(pdf_path, dpi=dpi) + + debug(f"PDF总页数: {len(pages)}") + debug("📄 正在渲染 PDF 页面...") + + img_paths = [] + for i, page in enumerate(pages, start=1): + try: + # 生成图片文件路径 + img_path = os.path.join(page_output_dir, f"page_{i:03d}.{image_format.lower()}") + img_paths.append(img_path) + # 保存图片 + page.save(img_path, image_format) + debug(f"✅ 已保存 {img_path}") + + except Exception as e: + error(f"保存第 {i} 页失败: {e}") + continue + + debug(f"渲染完成: 成功保存{len(pages)} 页") + return img_paths + + except Exception as e: + error(f"渲染PDF失败: {e}") + return [] + + +def extract_images_from_word(doc_path, base_output_dir) -> List[str]: + """ + 从Word文档中提取所有图像 + + 参数: + doc_path (str): Word文档路径(.docx格式) + base_output_dir (str): 基础输出目录,会在此目录下创建以文档名命名的子文件夹 + + 返回: + int: 成功提取的图像数量 + """ + # 检查文件是否为.docx格式 + if not doc_path.lower().endswith('.docx'): + error(f"仅支持.docx格式的Word文档: {doc_path}") + return [] + + # 从文档路径提取文件名(不含扩展名) + doc_filename = safe_filename(Path(doc_path).stem) + + # 创建以文档名命名的子文件夹 + image_output_dir = os.path.join(base_output_dir, doc_filename) + + # 创建输出目录(如果不存在) + if not os.path.exists(image_output_dir): + os.makedirs(image_output_dir, exist_ok=True) + debug(f"创建输出目录: {image_output_dir}") + + try: + # 检查文档是否存在 + if not os.path.exists(doc_path): + error(f"Word文档不存在: {doc_path}") + return [] + + debug(f"开始从Word文档提取图像: {doc_path}") + debug(f"输出目录: {image_output_dir}") + + # 将.docx文件视为zip文件处理 + with zipfile.ZipFile(doc_path, 'r') as docx: + # 获取所有文件列表 + file_list = docx.namelist() + + # 筛选出图像文件(通常位于word/media/目录下) + image_files = [f for f in file_list if f.startswith('word/media/') and not f.endswith('/') and os.path.basename(f)] + + debug(f"找到 {len(image_files)} 个图像文件") + + img_paths = [] + for i, image_path in enumerate(image_files): + try: + # 提取图像文件名 + image_name = os.path.basename(image_path) + + # 确保文件名有效 + if not image_name or image_name == "media": + # 从路径中提取有意义的文件名 + parts = image_path.split('/') + for part in reversed(parts): + if part and part != "media": + image_name = part + break + else: + image_name = f"image_{i + 1}.png" + + # 添加文件扩展名如果缺失 + if not Path(image_name).suffix: + # 尝试从文件内容检测格式,否则使用默认png + image_name += ".png" + + # 生成输出文件路径 + output_path = os.path.join(image_output_dir, f"image_{i + 1:03d}_{image_name}") + img_paths.append(output_path) + # 提取并保存图像 + with docx.open(image_path) as image_file: + image_data = image_file.read() + + # 保存图像数据 + with open(output_path, 'wb') as f: + f.write(image_data) + + debug(f"✅ 已提取图像: {output_path}") + + except Exception as e: + error(f"提取图像 {image_path} 失败: {e}") + continue + + debug(f"Word文档图像提取完成: 成功提取 {len(image_files)} 个图像") + return img_paths + + except Exception as e: + error(f"提取Word文档图像失败: {e}") + return [] + + +def extract_images_from_ppt(ppt_path, base_output_dir) -> List[str]: + """ + 从PowerPoint演示文稿中提取所有图像 + + 参数: + ppt_path (str): PowerPoint文件路径(.pptx格式) + base_output_dir (str): 基础输出目录,会在此目录下创建以PPT名命名的子文件夹 + + 返回: + int: 成功提取的图像数量 + """ + # 检查文件是否为.pptx格式 + if not ppt_path.lower().endswith('.pptx'): + error(f"仅支持.pptx格式的PowerPoint文档: {ppt_path}") + return [] + + # 从PPT路径提取文件名(不含扩展名) + ppt_filename = safe_filename(Path(ppt_path).stem) + + # 创建以PPT名命名的子文件夹 + image_output_dir = os.path.join(base_output_dir, ppt_filename) + + # 创建输出目录(如果不存在) + if not os.path.exists(image_output_dir): + os.makedirs(image_output_dir, exist_ok=True) + debug(f"创建输出目录: {image_output_dir}") + + try: + # 检查PPT文件是否存在 + if not os.path.exists(ppt_path): + error(f"PowerPoint文档不存在: {ppt_path}") + return [] + + debug(f"开始从PowerPoint文档提取图像: {ppt_path}") + debug(f"输出目录: {image_output_dir}") + + # 将.pptx文件视为zip文件处理 + with zipfile.ZipFile(ppt_path, 'r') as pptx: + # 获取所有文件列表 + file_list = pptx.namelist() + + # 筛选出图像文件(通常位于ppt/media/目录下) + image_files = [f for f in file_list if f.startswith('ppt/media/') and not f.endswith('/') and os.path.basename(f)] + + debug(f"找到 {len(image_files)} 个图像文件") + + img_paths = [] + for i, image_path in enumerate(image_files): + try: + # 提取图像文件名 + image_name = Path(image_path).name + + # 验证文件名有效性 + if not image_name or image_name == "media": + parts = image_path.split('/') + for part in reversed(parts): + if part and part != "media": + image_name = part + break + else: + image_name = f"image_{i + 1}.png" + + # 确保有文件扩展名 + if not Path(image_name).suffix: + image_name += ".png" + + # 生成输出文件路径 + output_path = os.path.join(image_output_dir, f"image_{i + 1:03d}_{image_name}") + img_paths.append(output_path) + # 提取并保存图像 + with pptx.open(image_path) as image_file: + image_data = image_file.read() + + # 保存图像数据 + with open(output_path, 'wb') as f: + f.write(image_data) + + debug(f"✅ 已提取图像: {output_path}") + + except Exception as e: + error(f"提取图像 {image_path} 失败: {e}") + continue + + debug(f"PowerPoint文档图像提取完成: 成功提取{len(image_files)} 个图像") + return img_paths + + except Exception as e: + error(f"提取PowerPoint文档图像失败: {e}") + return [] + + +def extract_images_from_file(file_path, base_output_dir="/home/wangmeihua/kyrag/data/extracted_images", file_type=None): + """ + 通用函数:根据文件类型自动选择提取方法 + + 参数: + file_path (str): 文件路径 + base_output_dir (str): 基础输出目录 + file_type (str): 文件类型(可选,自动检测) + + 返回: + int: 成功提取的图像/页面数量 + """ + # 如果没有指定文件类型,根据扩展名自动检测 + if file_type is None: + ext = Path(file_path).suffix.lower() + if ext == '.pdf': + file_type = 'pdf' + elif ext == '.docx': + file_type = 'word' + elif ext == '.pptx': + file_type = 'ppt' + else: + error(f"不支持的文件类型: {ext}") + return [] + + # 根据文件类型调用相应的函数 + if file_type == 'pdf': + return render_pdf_to_images(file_path, base_output_dir) + elif file_type == 'word': + return extract_images_from_word(file_path, base_output_dir) + elif file_type == 'ppt': + return extract_images_from_ppt(file_path, base_output_dir) + else: + error(f"不支持的文件类型: {file_type}") + return [] + + +# 使用示例 +if __name__ == "__main__": + base_output_dir = "/home/wangmeihua/kyrag/data/extracted_images" + + # PDF文件处理 + pdf_path = "/home/wangmeihua/kyrag/22-zh-review.pdf" + pdf_imgs = extract_images_from_file(pdf_path, base_output_dir, 'pdf') + debug(f"pdf_imgs: {pdf_imgs}") + if len(pdf_imgs) > 0: + debug(f"成功处理PDF: {len(pdf_imgs)} 页") + else: + error("PDF处理失败") + + # Word文档处理 + doc_path = "/home/wangmeihua/kyrag/test.docx" + if os.path.exists(doc_path): + doc_imgs = extract_images_from_file(doc_path, base_output_dir, 'word') + debug(f"doc_imgs: {doc_imgs}") + if len(doc_imgs) > 0: + debug(f"成功处理Word文档: {len(doc_imgs)} 个图像") + else: + error("Word文档处理失败") + else: + debug(f"Word文档不存在: {doc_path}") + + # PowerPoint处理 + ppt_path = "/home/wangmeihua/kyrag/提示学习-王美华.pptx" + if os.path.exists(ppt_path): + ppt_imgs = extract_images_from_file(ppt_path, base_output_dir, 'ppt') + if len(ppt_imgs) > 0: + debug(f"成功处理PowerPoint: {len(ppt_imgs)} 个图像") + else: + error("PowerPoint处理失败") + else: + debug(f"PowerPoint文档不存在: {ppt_path}") \ No newline at end of file diff --git a/rag/transaction_manager.py b/rag/transaction_manager.py index 6303549..92d1294 100644 --- a/rag/transaction_manager.py +++ b/rag/transaction_manager.py @@ -20,7 +20,6 @@ class OperationType(Enum): VECTOR_SEARCH = "vector_search" RERANK = "rerank" - @dataclass class RollbackOperation: """回滚操作记录"""