llama-index
195 lines · 5.7 KB
"""Docs parser.

Contains parsers for docx, pdf and hwp files.

"""
6
7import struct8import zlib9from pathlib import Path10from typing import Any, Dict, List, Optional11
12from llama_index.legacy.readers.base import BaseReader13from llama_index.legacy.schema import Document14
15
class PDFReader(BaseReader):
    """PDF parser.

    Reads a PDF with pypdf and returns either one Document per page
    (the default) or the whole file as a single Document.
    """

    def __init__(self, return_full_document: Optional[bool] = False) -> None:
        """
        Initialize PDFReader.

        Args:
            return_full_document: If True, concatenate all pages into one
                Document instead of emitting one Document per page.
        """
        self.return_full_document = return_full_document

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse file.

        Args:
            file (Path): Path to the PDF file.
            extra_info: Optional metadata merged into each Document's metadata.

        Returns:
            List[Document]: one Document per page, or a single Document when
            ``return_full_document`` is True.

        Raises:
            ImportError: if pypdf is not installed.
        """
        try:
            import pypdf
        except ImportError:
            raise ImportError(
                "pypdf is required to read PDF files: `pip install pypdf`"
            )
        with open(file, "rb") as fp:
            # Create a PDF object
            pdf = pypdf.PdfReader(fp)

            # Get the number of pages in the PDF document
            num_pages = len(pdf.pages)

            docs = []

            # This block returns a whole PDF as a single Document
            if self.return_full_document:
                metadata = {"file_name": fp.name}
                # BUGFIX: extra_info used to be merged only in the per-page
                # branch; merge it here too so both modes behave consistently.
                if extra_info is not None:
                    metadata.update(extra_info)

                # join() instead of repeated += avoids quadratic string builds.
                text = "".join(
                    pdf.pages[page].extract_text() for page in range(num_pages)
                )

                docs.append(Document(text=text, metadata=metadata))

            # This block returns each page of a PDF as its own Document
            else:
                for page in range(num_pages):
                    # Extract the text from the page
                    page_text = pdf.pages[page].extract_text()
                    page_label = pdf.page_labels[page]

                    metadata = {"page_label": page_label, "file_name": fp.name}
                    if extra_info is not None:
                        metadata.update(extra_info)

                    docs.append(Document(text=page_text, metadata=metadata))

            return docs
73
class DocxReader(BaseReader):
    """Docx parser."""

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse a Word document into a single Document.

        Args:
            file (Path): Path to the .docx file.
            extra_info: Optional metadata merged into the Document's metadata.

        Returns:
            List[Document]: a one-element list holding the full document text.

        Raises:
            ImportError: if docx2txt is not installed.
        """
        try:
            import docx2txt
        except ImportError:
            raise ImportError(
                "docx2txt is required to read Microsoft Word files: "
                "`pip install docx2txt`"
            )

        metadata = {"file_name": file.name}
        if extra_info is not None:
            metadata.update(extra_info)

        extracted_text = docx2txt.process(file)
        return [Document(text=extracted_text, metadata=metadata or {})]
96
class HWPReader(BaseReader):
    """Hwp Parser.

    Extracts body text from HWP (Hangul Word Processor) files, which are
    stored as OLE compound documents.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.FILE_HEADER_SECTION = "FileHeader"
        self.HWP_SUMMARY_SECTION = "\x05HwpSummaryInformation"
        self.SECTION_NAME_LENGTH = len("Section")
        self.BODYTEXT_SECTION = "BodyText"
        # Record tag ids treated as text records (67 — presumably the
        # paragraph-text tag of the HWP record format; TODO confirm).
        self.HWP_TEXT_TAGS = [67]
        self.text = ""

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Load data and extract table from Hwp file.

        Args:
            file (Path): Path for the Hwp file.
            extra_info: Optional metadata attached to the resulting Document.

        Returns:
            List[Document]

        Raises:
            ImportError: if olefile is not installed.
            Exception: if the file is not a valid HWP file.
        """
        # CONSISTENCY: guard the optional dependency like the other readers
        # in this module do.
        try:
            import olefile
        except ImportError:
            raise ImportError(
                "olefile is required to read Hwp files: `pip install olefile`"
            )

        load_file = olefile.OleFileIO(file)
        try:
            file_dir = load_file.listdir()
            if self.is_valid(file_dir) is False:
                raise Exception("Not Valid HwpFile")

            result_text = self._get_text(load_file, file_dir)
        finally:
            # BUGFIX: the OLE handle was previously never closed (leak).
            load_file.close()

        result = self._text_to_document(text=result_text, extra_info=extra_info)
        return [result]

    def is_valid(self, dirs: List[str]) -> bool:
        """Return True when the OLE listing has the HWP header and summary."""
        if [self.FILE_HEADER_SECTION] not in dirs:
            return False

        return [self.HWP_SUMMARY_SECTION] in dirs

    def get_body_sections(self, dirs: List[str]) -> List[str]:
        """Return BodyText stream names, ordered by their section index."""
        m = []
        for d in dirs:
            if d[0] == self.BODYTEXT_SECTION:
                m.append(int(d[1][self.SECTION_NAME_LENGTH :]))

        return ["BodyText/Section" + str(x) for x in sorted(m)]

    def _text_to_document(
        self, text: str, extra_info: Optional[Dict] = None
    ) -> Document:
        """Wrap extracted text into a Document."""
        return Document(text=text, extra_info=extra_info or {})

    def get_text(self) -> str:
        """Return the text extracted by the most recent load_data call."""
        return self.text

    # Extract the full text of the document.
    def _get_text(self, load_file: Any, file_dirs: List[str]) -> str:
        sections = self.get_body_sections(file_dirs)
        text = ""
        for section in sections:
            text += self.get_text_from_section(load_file, section)
            text += "\n"

        self.text = text
        return self.text

    def is_compressed(self, load_file: Any) -> bool:
        """Return True when the compression bit of the FileHeader is set."""
        header = load_file.openstream("FileHeader")
        header_data = header.read()
        # Bit 0 of byte 36 in the FileHeader stream flags compression.
        return (header_data[36] & 1) == 1

    def get_text_from_section(self, load_file: Any, section: str) -> str:
        """Decode the text records of one BodyText section stream."""
        bodytext = load_file.openstream(section)
        data = bodytext.read()

        # Compressed sections are raw-deflate streams (wbits=-15).
        unpacked_data = (
            zlib.decompress(data, -15) if self.is_compressed(load_file) else data
        )
        size = len(unpacked_data)

        i = 0

        text = ""
        while i < size:
            # Each record starts with a 32-bit little-endian header:
            # tag id (low 10 bits) | level (next 10 bits, unused here)
            # | record size (top 12 bits).
            header = struct.unpack_from("<I", unpacked_data, i)[0]
            rec_type = header & 0x3FF
            rec_len = (header >> 20) & 0xFFF

            if rec_type in self.HWP_TEXT_TAGS:
                rec_data = unpacked_data[i + 4 : i + 4 + rec_len]
                text += rec_data.decode("utf-16")
                text += "\n"

            i += 4 + rec_len

        return text