rag
This commit is contained in:
parent
3a726ef958
commit
985c5a998a
341
rag/fileprocess.py
Normal file
341
rag/fileprocess.py
Normal file
@ -0,0 +1,341 @@
|
||||
import numpy as np
|
||||
import os
|
||||
import re
|
||||
from pdf2image import convert_from_path
|
||||
from appPublic.log import debug, error, info
|
||||
from pathlib import Path
|
||||
import zipfile
|
||||
import xml.etree.ElementTree as ET
|
||||
from PIL import Image
|
||||
from typing import List
|
||||
|
||||
# ==================== 新增:路径安全化函数 ====================
|
||||
def safe_filename(name: str) -> str:
    """
    Turn an arbitrary string into a name usable as a file or directory name.

    Steps, in order:
    - strip leading/trailing whitespace
    - collapse each run of whitespace into a single space
    - replace each of the characters < > : " / \\ | ? * with an underscore
    - replace every remaining space with an underscore
    """
    # Trim and collapse whitespace runs in one expression.
    cleaned = re.sub(r'\s+', ' ', name.strip())
    # Characters in this class are swapped for underscores.
    cleaned = re.sub(r'[<>:"/\\|?*]', '_', cleaned)
    # Finally drop spaces altogether so the result contains none.
    return cleaned.replace(' ', '_')
|
||||
|
||||
def render_pdf_to_images(pdf_path, base_output_dir, dpi=200, image_format="PNG") -> List[str]:
    """
    Render every page of a PDF file to an image file.

    Images are written to ``<base_output_dir>/<sanitized PDF stem>/`` and are
    named ``page_001.<ext>``, ``page_002.<ext>``, ... where ``<ext>`` is
    ``image_format`` in lower case.

    Args:
        pdf_path (str): Path of the PDF file to render.
        base_output_dir (str): Base output directory; a sub-directory named
            after the PDF is created inside it.
        dpi (int): Resolution passed to ``convert_from_path``. Default 200.
        image_format (str): Format name passed to ``page.save``. Default "PNG".

    Returns:
        List[str]: Paths of the page images that were actually saved, in page
        order. Empty if the PDF does not exist or rendering fails.
    """
    # Validate the input first so a missing PDF does not leave an empty
    # output directory behind.
    if not os.path.exists(pdf_path):
        error(f"PDF文件不存在: {pdf_path}")
        return []

    pdf_filename = safe_filename(Path(pdf_path).stem)
    page_output_dir = os.path.join(base_output_dir, pdf_filename)

    # Create the output directory if it does not exist yet.
    if not os.path.exists(page_output_dir):
        os.makedirs(page_output_dir, exist_ok=True)
        debug(f"创建输出目录: {page_output_dir}")

    try:
        debug(f"开始渲染PDF: {pdf_path}")
        debug(f"输出目录: {page_output_dir}")
        debug(f"分辨率: {dpi} DPI, 格式: {image_format}")

        # Rasterize all pages of the PDF.
        pages = convert_from_path(pdf_path, dpi=dpi)

        debug(f"PDF总页数: {len(pages)}")
        debug("📄 正在渲染 PDF 页面...")

        extension = image_format.lower()
        img_paths = []
        for i, page in enumerate(pages, start=1):
            img_path = os.path.join(page_output_dir, f"page_{i:03d}.{extension}")
            try:
                page.save(img_path, image_format)
            except Exception as e:
                # Best effort: skip the failing page and keep going.
                error(f"保存第 {i} 页失败: {e}")
                continue
            # Record the path only after the file was written, so the result
            # never lists pages that failed to save.
            img_paths.append(img_path)
            debug(f"✅ 已保存 {img_path}")

        # Report the number of pages that were really saved.
        debug(f"渲染完成: 成功保存{len(img_paths)} 页")
        return img_paths

    except Exception as e:
        error(f"渲染PDF失败: {e}")
        return []
|
||||
|
||||
|
||||
def extract_images_from_word(doc_path, base_output_dir) -> List[str]:
    """
    Extract all embedded images from a Word document (.docx).

    The .docx file is opened as a zip archive and every entry under
    ``word/media/`` is copied to ``<base_output_dir>/<sanitized doc stem>/``
    as ``image_<NNN>_<original name>``. Entries without a file extension get
    ``.png`` appended.

    Args:
        doc_path (str | os.PathLike): Path of the Word document (.docx only).
        base_output_dir (str): Base output directory; a sub-directory named
            after the document is created inside it.

    Returns:
        List[str]: Paths of the images that were actually written. Empty if
        the file is not a .docx, does not exist, or the archive cannot be read.
    """
    # Accept pathlib.Path as well as str; the suffix check below needs a str.
    doc_path = os.fspath(doc_path)

    # Only the zip-based .docx format is supported.
    if not doc_path.lower().endswith('.docx'):
        error(f"仅支持.docx格式的Word文档: {doc_path}")
        return []

    # Validate the input before creating anything on disk, so a missing
    # document does not leave an empty output directory behind.
    if not os.path.exists(doc_path):
        error(f"Word文档不存在: {doc_path}")
        return []

    # Output goes into a sub-directory named after the document.
    doc_filename = safe_filename(Path(doc_path).stem)
    image_output_dir = os.path.join(base_output_dir, doc_filename)
    if not os.path.exists(image_output_dir):
        os.makedirs(image_output_dir, exist_ok=True)
        debug(f"创建输出目录: {image_output_dir}")

    try:
        debug(f"开始从Word文档提取图像: {doc_path}")
        debug(f"输出目录: {image_output_dir}")

        with zipfile.ZipFile(doc_path, 'r') as docx:
            # Keep real files under word/media/ (directory entries end in '/').
            image_files = [
                f for f in docx.namelist()
                if f.startswith('word/media/') and not f.endswith('/') and os.path.basename(f)
            ]

            debug(f"找到 {len(image_files)} 个图像文件")

            img_paths = []
            for i, image_path in enumerate(image_files, start=1):
                # The filter above guarantees a non-empty base name.
                image_name = os.path.basename(image_path)

                # Default to .png when the entry has no extension.
                if not Path(image_name).suffix:
                    image_name += ".png"

                output_path = os.path.join(image_output_dir, f"image_{i:03d}_{image_name}")

                try:
                    # Read first, then write: a failed read creates no file.
                    with docx.open(image_path) as image_file:
                        image_data = image_file.read()
                    with open(output_path, 'wb') as f:
                        f.write(image_data)
                except Exception as e:
                    # Best effort: skip the failing entry and keep going.
                    error(f"提取图像 {image_path} 失败: {e}")
                    continue

                # Record the path only after the file was written, so the
                # result never lists images that failed to extract.
                img_paths.append(output_path)
                debug(f"✅ 已提取图像: {output_path}")

            # Report the number of images that were really extracted.
            debug(f"Word文档图像提取完成: 成功提取 {len(img_paths)} 个图像")
            return img_paths

    except Exception as e:
        error(f"提取Word文档图像失败: {e}")
        return []
|
||||
|
||||
|
||||
def extract_images_from_ppt(ppt_path, base_output_dir) -> List[str]:
    """
    Extract all embedded images from a PowerPoint presentation (.pptx).

    The .pptx file is opened as a zip archive and every entry under
    ``ppt/media/`` is copied to ``<base_output_dir>/<sanitized PPT stem>/``
    as ``image_<NNN>_<original name>``. Entries without a file extension get
    ``.png`` appended.

    Args:
        ppt_path (str | os.PathLike): Path of the presentation (.pptx only).
        base_output_dir (str): Base output directory; a sub-directory named
            after the presentation is created inside it.

    Returns:
        List[str]: Paths of the images that were actually written. Empty if
        the file is not a .pptx, does not exist, or the archive cannot be read.
    """
    # Accept pathlib.Path as well as str; the suffix check below needs a str.
    ppt_path = os.fspath(ppt_path)

    # Only the zip-based .pptx format is supported.
    if not ppt_path.lower().endswith('.pptx'):
        error(f"仅支持.pptx格式的PowerPoint文档: {ppt_path}")
        return []

    # Validate the input before creating anything on disk, so a missing
    # presentation does not leave an empty output directory behind.
    if not os.path.exists(ppt_path):
        error(f"PowerPoint文档不存在: {ppt_path}")
        return []

    # Output goes into a sub-directory named after the presentation.
    ppt_filename = safe_filename(Path(ppt_path).stem)
    image_output_dir = os.path.join(base_output_dir, ppt_filename)
    if not os.path.exists(image_output_dir):
        os.makedirs(image_output_dir, exist_ok=True)
        debug(f"创建输出目录: {image_output_dir}")

    try:
        debug(f"开始从PowerPoint文档提取图像: {ppt_path}")
        debug(f"输出目录: {image_output_dir}")

        with zipfile.ZipFile(ppt_path, 'r') as pptx:
            # Keep real files under ppt/media/ (directory entries end in '/').
            image_files = [
                f for f in pptx.namelist()
                if f.startswith('ppt/media/') and not f.endswith('/') and os.path.basename(f)
            ]

            debug(f"找到 {len(image_files)} 个图像文件")

            img_paths = []
            for i, image_path in enumerate(image_files, start=1):
                # The filter above guarantees a non-empty base name.
                image_name = os.path.basename(image_path)

                # Default to .png when the entry has no extension.
                if not Path(image_name).suffix:
                    image_name += ".png"

                output_path = os.path.join(image_output_dir, f"image_{i:03d}_{image_name}")

                try:
                    # Read first, then write: a failed read creates no file.
                    with pptx.open(image_path) as image_file:
                        image_data = image_file.read()
                    with open(output_path, 'wb') as f:
                        f.write(image_data)
                except Exception as e:
                    # Best effort: skip the failing entry and keep going.
                    error(f"提取图像 {image_path} 失败: {e}")
                    continue

                # Record the path only after the file was written, so the
                # result never lists images that failed to extract.
                img_paths.append(output_path)
                debug(f"✅ 已提取图像: {output_path}")

            # Report the number of images that were really extracted.
            debug(f"PowerPoint文档图像提取完成: 成功提取{len(img_paths)} 个图像")
            return img_paths

    except Exception as e:
        error(f"提取PowerPoint文档图像失败: {e}")
        return []
|
||||
|
||||
|
||||
def extract_images_from_file(file_path, base_output_dir="/home/wangmeihua/kyrag/data/extracted_images", file_type=None):
    """
    Dispatch to the extractor that matches the file type.

    Args:
        file_path (str): Path of the input file.
        base_output_dir (str): Base output directory handed to the extractor.
        file_type (str): One of 'pdf', 'word' or 'ppt'. When None, the type is
            derived from the file extension (.pdf, .docx, .pptx).

    Returns:
        Whatever the selected extractor returns (a list of image paths), or an
        empty list when the type is not supported.
    """
    # No explicit type given: derive it from the lower-cased extension.
    if file_type is None:
        suffix = Path(file_path).suffix.lower()
        type_by_suffix = {'.pdf': 'pdf', '.docx': 'word', '.pptx': 'ppt'}
        if suffix not in type_by_suffix:
            error(f"不支持的文件类型: {suffix}")
            return []
        file_type = type_by_suffix[suffix]

    # Select the extractor for the requested type.
    if file_type == 'pdf':
        extractor = render_pdf_to_images
    elif file_type == 'word':
        extractor = extract_images_from_word
    elif file_type == 'ppt':
        extractor = extract_images_from_ppt
    else:
        error(f"不支持的文件类型: {file_type}")
        return []

    return extractor(file_path, base_output_dir)
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
    # Manual smoke run against three sample files with hard-coded paths.
    base_output_dir = "/home/wangmeihua/kyrag/data/extracted_images"

    # PDF: called without an existence pre-check.
    pdf_path = "/home/wangmeihua/kyrag/22-zh-review.pdf"
    pdf_imgs = extract_images_from_file(pdf_path, base_output_dir, 'pdf')
    debug(f"pdf_imgs: {pdf_imgs}")
    if pdf_imgs:
        debug(f"成功处理PDF: {len(pdf_imgs)} 页")
    else:
        error("PDF处理失败")

    # Word document: only processed when the sample file is present.
    doc_path = "/home/wangmeihua/kyrag/test.docx"
    if not os.path.exists(doc_path):
        debug(f"Word文档不存在: {doc_path}")
    else:
        doc_imgs = extract_images_from_file(doc_path, base_output_dir, 'word')
        debug(f"doc_imgs: {doc_imgs}")
        if doc_imgs:
            debug(f"成功处理Word文档: {len(doc_imgs)} 个图像")
        else:
            error("Word文档处理失败")

    # PowerPoint: only processed when the sample file is present.
    ppt_path = "/home/wangmeihua/kyrag/提示学习-王美华.pptx"
    if not os.path.exists(ppt_path):
        debug(f"PowerPoint文档不存在: {ppt_path}")
    else:
        ppt_imgs = extract_images_from_file(ppt_path, base_output_dir, 'ppt')
        if ppt_imgs:
            debug(f"成功处理PowerPoint: {len(ppt_imgs)} 个图像")
        else:
            error("PowerPoint处理失败")
|
||||
@ -20,7 +20,6 @@ class OperationType(Enum):
|
||||
VECTOR_SEARCH = "vector_search"
|
||||
RERANK = "rerank"
|
||||
|
||||
|
||||
@dataclass
|
||||
class RollbackOperation:
|
||||
"""回滚操作记录"""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user