llama-index

Форк
0
195 строк · 5.7 Кб
1
"""Docs parser.
2

3
Contains parsers for docx, pdf files.
4

5
"""
6

7
import struct
8
import zlib
9
from pathlib import Path
10
from typing import Any, Dict, List, Optional
11

12
from llama_index.legacy.readers.base import BaseReader
13
from llama_index.legacy.schema import Document
14

15

16
class PDFReader(BaseReader):
    """PDF parser.

    Reads a PDF via ``pypdf`` and returns either one Document per page
    (default) or a single combined Document for the whole file.
    """

    def __init__(self, return_full_document: Optional[bool] = False) -> None:
        """Initialize PDFReader.

        Args:
            return_full_document: If True, concatenate every page into a
                single Document instead of emitting one Document per page.
        """
        self.return_full_document = return_full_document

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse the PDF at ``file``.

        Args:
            file: Path to the PDF file.
            extra_info: Optional metadata merged into every Document's
                metadata dict.

        Returns:
            List[Document]: one Document per page, or a one-element list
            when ``return_full_document`` is set.

        Raises:
            ImportError: if ``pypdf`` is not installed.
        """
        try:
            import pypdf
        except ImportError:
            raise ImportError(
                "pypdf is required to read PDF files: `pip install pypdf`"
            )
        with open(file, "rb") as fp:
            # Create a PDF object
            pdf = pypdf.PdfReader(fp)

            # Get the number of pages in the PDF document
            num_pages = len(pdf.pages)

            docs = []

            # This block returns a whole PDF as a single Document
            if self.return_full_document:
                metadata = {"file_name": fp.name}
                # BUG FIX: extra_info was silently dropped on this branch,
                # inconsistent with the per-page branch below.
                if extra_info is not None:
                    metadata.update(extra_info)

                # Join at C speed rather than quadratic `+=` concatenation;
                # no separator, matching the original concatenation.
                text = "".join(
                    pdf.pages[page].extract_text() for page in range(num_pages)
                )

                docs.append(Document(text=text, metadata=metadata))

            # This block returns each page of a PDF as its own Document
            else:
                for page in range(num_pages):
                    # Extract the text from the page
                    page_text = pdf.pages[page].extract_text()
                    page_label = pdf.page_labels[page]

                    metadata = {"page_label": page_label, "file_name": fp.name}
                    if extra_info is not None:
                        metadata.update(extra_info)

                    docs.append(Document(text=page_text, metadata=metadata))

            return docs
72

73

74
class DocxReader(BaseReader):
    """Docx parser.

    Extracts the plain text of a Microsoft Word document with ``docx2txt``.
    """

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse the Word document at ``file`` into a single Document.

        Args:
            file: Path to the .docx file.
            extra_info: Optional metadata merged into the Document.

        Returns:
            List[Document]: a one-element list holding the full text.

        Raises:
            ImportError: if ``docx2txt`` is not installed.
        """
        try:
            import docx2txt
        except ImportError:
            raise ImportError(
                "docx2txt is required to read Microsoft Word files: "
                "`pip install docx2txt`"
            )

        content = docx2txt.process(file)
        doc_metadata = {"file_name": file.name}
        if extra_info is not None:
            doc_metadata.update(extra_info)

        return [Document(text=content, metadata=doc_metadata or {})]
95

96

97
class HWPReader(BaseReader):
    """Hwp Parser.

    Reads Hangul Word Processor (.hwp) files through ``olefile`` and
    extracts the paragraph text of every BodyText section.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Names of the OLE storage entries that make up an HWP file.
        self.FILE_HEADER_SECTION = "FileHeader"
        self.HWP_SUMMARY_SECTION = "\x05HwpSummaryInformation"
        self.SECTION_NAME_LENGTH = len("Section")
        self.BODYTEXT_SECTION = "BodyText"
        # Record tag ids treated as text; 67 is presumably HWPTAG_PARA_TEXT
        # from the HWP 5.0 record spec — TODO confirm against the spec.
        self.HWP_TEXT_TAGS = [67]
        self.text = ""

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Load data and extract text from an Hwp file.

        Args:
            file (Path): Path for the Hwp file.
            extra_info (Optional[Dict]): Metadata attached to the Document.

        Returns:
            List[Document]: a one-element list with the extracted text.

        Raises:
            ImportError: if ``olefile`` is not installed.
            Exception: if the file is not a valid HWP file.
        """
        # Consistency fix: give the same actionable ImportError as the
        # PDF/Docx readers instead of a bare ModuleNotFoundError.
        try:
            import olefile
        except ImportError:
            raise ImportError(
                "olefile is required to read Hwp files: `pip install olefile`"
            )

        load_file = olefile.OleFileIO(file)
        try:
            file_dir = load_file.listdir()
            if not self.is_valid(file_dir):
                raise Exception("Not Valid HwpFile")

            result_text = self._get_text(load_file, file_dir)
        finally:
            # Leak fix: OleFileIO keeps the file handle open; close it
            # even when validation or extraction raises.
            load_file.close()

        result = self._text_to_document(text=result_text, extra_info=extra_info)
        return [result]

    def is_valid(self, dirs: List[str]) -> bool:
        """Return True if the OLE listing contains the HWP header and summary streams."""
        if [self.FILE_HEADER_SECTION] not in dirs:
            return False

        return [self.HWP_SUMMARY_SECTION] in dirs

    def get_body_sections(self, dirs: List[str]) -> List[str]:
        """Return the BodyText section stream paths in ascending numeric order."""
        section_numbers = []
        for d in dirs:
            if d[0] == self.BODYTEXT_SECTION:
                # Entry looks like ["BodyText", "Section<N>"]; keep N so the
                # sections sort numerically rather than lexically.
                section_numbers.append(int(d[1][self.SECTION_NAME_LENGTH :]))

        return ["BodyText/Section" + str(x) for x in sorted(section_numbers)]

    def _text_to_document(
        self, text: str, extra_info: Optional[Dict] = None
    ) -> Document:
        """Wrap the extracted text in a Document."""
        return Document(text=text, extra_info=extra_info or {})

    def get_text(self) -> str:
        """Return the text cached by the most recent ``_get_text`` call."""
        return self.text

    def _get_text(self, load_file: Any, file_dirs: List[str]) -> str:
        """Extract and concatenate the text of every BodyText section."""
        sections = self.get_body_sections(file_dirs)
        text = ""
        for section in sections:
            text += self.get_text_from_section(load_file, section)
            text += "\n"

        self.text = text
        return self.text

    def is_compressed(self, load_file: Any) -> bool:
        """Return True if the body streams are deflate-compressed."""
        header = load_file.openstream("FileHeader")
        header_data = header.read()
        # Byte 36 of FileHeader carries property flags; bit 0 marks
        # compression — presumably per the HWP 5.0 header layout.
        return (header_data[36] & 1) == 1

    def get_text_from_section(self, load_file: Any, section: str) -> str:
        """Decode the text-bearing records of one BodyText section stream."""
        bodytext = load_file.openstream(section)
        data = bodytext.read()

        # Raw-deflate (wbits=-15) when the header says the body is compressed.
        unpacked_data = (
            zlib.decompress(data, -15) if self.is_compressed(load_file) else data
        )
        size = len(unpacked_data)

        i = 0

        text = ""
        while i < size:
            # Each record starts with a 4-byte little-endian header:
            # bits 0-9 tag id, bits 10-19 level (unused here), bits
            # 20-31 payload length. (The original computed the level
            # and discarded it; that dead expression is removed.)
            header = struct.unpack_from("<I", unpacked_data, i)[0]
            rec_type = header & 0x3FF
            rec_len = (header >> 20) & 0xFFF

            if rec_type in self.HWP_TEXT_TAGS:
                rec_data = unpacked_data[i + 4 : i + 4 + rec_len]
                text += rec_data.decode("utf-16")
                text += "\n"

            i += 4 + rec_len

        return text
196

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.