llama-index

Форк
0
427 строк · 15.4 Кб
1
"""Simple reader that reads files of different formats from a directory."""
2

3
import logging
4
import mimetypes
5
import multiprocessing
6
import os
7
import warnings
8
from datetime import datetime
9
from functools import reduce
10
from itertools import repeat
11
from pathlib import Path
12
from typing import Any, Callable, Dict, Generator, List, Optional, Type
13

14
from tqdm import tqdm
15

16
from llama_index.legacy.readers.base import BaseReader
17
from llama_index.legacy.readers.file.docs_reader import DocxReader, HWPReader, PDFReader
18
from llama_index.legacy.readers.file.epub_reader import EpubReader
19
from llama_index.legacy.readers.file.image_reader import ImageReader
20
from llama_index.legacy.readers.file.ipynb_reader import IPYNBReader
21
from llama_index.legacy.readers.file.markdown_reader import MarkdownReader
22
from llama_index.legacy.readers.file.mbox_reader import MboxReader
23
from llama_index.legacy.readers.file.slides_reader import PptxReader
24
from llama_index.legacy.readers.file.tabular_reader import PandasCSVReader
25
from llama_index.legacy.readers.file.video_audio_reader import VideoAudioReader
26
from llama_index.legacy.schema import Document
27

28
# Default mapping from (lower-cased) file suffix to the reader class that
# knows how to parse it. Instantiated lazily in `SimpleDirectoryReader.load_file`.
DEFAULT_FILE_READER_CLS: Dict[str, Type[BaseReader]] = dict(
    [
        (".hwp", HWPReader),
        (".pdf", PDFReader),
        (".docx", DocxReader),
        (".pptx", PptxReader),
        (".ppt", PptxReader),
        (".pptm", PptxReader),
        (".jpg", ImageReader),
        (".png", ImageReader),
        (".jpeg", ImageReader),
        (".mp3", VideoAudioReader),
        (".mp4", VideoAudioReader),
        (".csv", PandasCSVReader),
        (".epub", EpubReader),
        (".md", MarkdownReader),
        (".mbox", MboxReader),
        (".ipynb", IPYNBReader),
    ]
)
46

47

48
def default_file_metadata_func(file_path: str) -> Dict:
    """Get some handy metadata from the filesystem.

    Args:
        file_path: path to the file on disk.

    Returns:
        Dict with the file's path, base name, guessed MIME type, size in
        bytes, and creation / last-modified / last-accessed dates formatted
        as ``YYYY-MM-DD``.
    """

    def _date(timestamp: float) -> str:
        """Format a POSIX timestamp as a local-time date string."""
        return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d")

    # Stat the file once instead of four separate filesystem calls
    # (os.path.getsize + three Path().stat()), which was both slower and
    # racy if the file changed between calls.
    stat_result = Path(file_path).stat()

    return {
        "file_path": file_path,
        "file_name": os.path.basename(file_path),
        # May be None when the suffix is unknown to the mimetypes registry.
        "file_type": mimetypes.guess_type(file_path)[0],
        "file_size": stat_result.st_size,
        # NOTE: st_ctime is metadata-change time on Unix, creation time on
        # Windows — preserved as-is from the original behavior.
        "creation_date": _date(stat_result.st_ctime),
        "last_modified_date": _date(stat_result.st_mtime),
        "last_accessed_date": _date(stat_result.st_atime),
    }
69

70

71
# Module-level logger named after this module so it participates in the
# application's hierarchical logging configuration.
logger = logging.getLogger(__name__)
72

73

74
class SimpleDirectoryReader(BaseReader):
    """Simple directory reader.

    Load files from file directory.
    Automatically select the best file reader given file extensions.

    Args:
        input_dir (str): Path to the directory.
        input_files (List): List of file paths to read
            (Optional; overrides input_dir, exclude)
        exclude (List): glob of python file paths to exclude (Optional)
        exclude_hidden (bool): Whether to exclude hidden files (dotfiles).
        encoding (str): Encoding of the files.
            Default is utf-8.
        errors (str): how encoding and decoding errors are to be handled,
              see https://docs.python.org/3/library/functions.html#open
        recursive (bool): Whether to recursively search in subdirectories.
            False by default.
        filename_as_id (bool): Whether to use the filename as the document id.
            False by default.
        required_exts (Optional[List[str]]): List of required extensions.
            Default is None.
        file_extractor (Optional[Dict[str, BaseReader]]): A mapping of file
            extension to a BaseReader class that specifies how to convert that file
            to text. If not specified, use default from DEFAULT_FILE_READER_CLS.
        num_files_limit (Optional[int]): Maximum number of files to read.
            Default is None.
        file_metadata (Optional[Callable[[str], Dict]]): A function that takes
            in a filename and returns a Dict of metadata for the Document.
            Default is None.
    """

    # Suffixes covered out of the box by the default reader classes.
    supported_suffix = list(DEFAULT_FILE_READER_CLS.keys())

    def __init__(
        self,
        input_dir: Optional[str] = None,
        input_files: Optional[List] = None,
        exclude: Optional[List] = None,
        exclude_hidden: bool = True,
        errors: str = "ignore",
        recursive: bool = False,
        encoding: str = "utf-8",
        filename_as_id: bool = False,
        required_exts: Optional[List[str]] = None,
        file_extractor: Optional[Dict[str, BaseReader]] = None,
        num_files_limit: Optional[int] = None,
        file_metadata: Optional[Callable[[str], Dict]] = None,
    ) -> None:
        """Initialize with parameters."""
        super().__init__()

        if not input_dir and not input_files:
            raise ValueError("Must provide either `input_dir` or `input_files`.")

        self.errors = errors
        self.encoding = encoding

        self.exclude = exclude
        self.recursive = recursive
        self.exclude_hidden = exclude_hidden
        self.required_exts = required_exts
        self.num_files_limit = num_files_limit

        if input_files:
            # An explicit file list takes precedence over input_dir/exclude.
            self.input_files = []
            for path in input_files:
                if not os.path.isfile(path):
                    raise ValueError(f"File {path} does not exist.")
                self.input_files.append(Path(path))
        elif input_dir:
            if not os.path.isdir(input_dir):
                raise ValueError(f"Directory {input_dir} does not exist.")
            self.input_dir = Path(input_dir)
            # (fix) `self.exclude` was previously assigned a second,
            # redundant time here; it is already set above.
            self.input_files = self._add_files(self.input_dir)

        # Per-instance cache of reader instances; load_file() lazily fills
        # it with instantiated default readers keyed by suffix.
        self.file_extractor = file_extractor if file_extractor is not None else {}

        self.file_metadata = file_metadata or default_file_metadata_func
        self.filename_as_id = filename_as_id

    def is_hidden(self, path: Path) -> bool:
        """Return True if any component of `path` is a dotfile/dot-directory."""
        return any(
            part.startswith(".") and part not in [".", ".."] for part in path.parts
        )

    def _add_files(self, input_dir: Path) -> List[Path]:
        """Collect the files under `input_dir` honoring the instance filters.

        Applies, in order: exclude globs, hidden-file filter, required
        extensions, and the num_files_limit cap. Raises ValueError if
        nothing survives filtering.
        """
        all_files = set()
        rejected_files = set()

        # Expand exclusion globs first so matches can be skipped below.
        if self.exclude is not None:
            for excluded_pattern in self.exclude:
                if self.recursive:
                    # Recursive glob
                    for file in input_dir.rglob(excluded_pattern):
                        rejected_files.add(Path(file))
                else:
                    # Non-recursive glob
                    for file in input_dir.glob(excluded_pattern):
                        rejected_files.add(Path(file))

        file_refs: Generator[Path, None, None]
        if self.recursive:
            file_refs = Path(input_dir).rglob("*")
        else:
            file_refs = Path(input_dir).glob("*")

        for ref in file_refs:
            # Manually check if file is hidden or directory instead of
            # in glob for backwards compatibility.
            is_dir = ref.is_dir()
            skip_because_hidden = self.exclude_hidden and self.is_hidden(ref)
            skip_because_bad_ext = (
                self.required_exts is not None and ref.suffix not in self.required_exts
            )
            skip_because_excluded = ref in rejected_files

            if (
                is_dir
                or skip_because_hidden
                or skip_because_bad_ext
                or skip_because_excluded
            ):
                continue
            else:
                all_files.add(ref)

        # Sort for a deterministic processing order across runs.
        new_input_files = sorted(all_files)

        if len(new_input_files) == 0:
            raise ValueError(f"No files found in {input_dir}.")

        if self.num_files_limit is not None and self.num_files_limit > 0:
            new_input_files = new_input_files[0 : self.num_files_limit]

        # print total number of files added
        logger.debug(
            f"> [SimpleDirectoryReader] Total files added: {len(new_input_files)}"
        )

        return new_input_files

    def _exclude_metadata(self, documents: List[Document]) -> List[Document]:
        """Exclude metadata from documents.

        Args:
            documents (List[Document]): List of documents.
        """
        # Keep only metadata['file_path'] in both embedding and llm content
        # str, which contains extremely important context about the chunks.
        # Dates are provided for convenience of postprocessors such as
        # TimeWeightedPostprocessor, but excluded for embedding and LLM prompts.
        # (fix) single shared list instead of the same six keys spelled
        # out twice.
        excluded_keys = [
            "file_name",
            "file_type",
            "file_size",
            "creation_date",
            "last_modified_date",
            "last_accessed_date",
        ]
        for doc in documents:
            doc.excluded_embed_metadata_keys.extend(excluded_keys)
            doc.excluded_llm_metadata_keys.extend(excluded_keys)

        return documents

    @staticmethod
    def load_file(
        input_file: Path,
        file_metadata: Callable[[str], Dict],
        file_extractor: Dict[str, BaseReader],
        filename_as_id: bool = False,
        encoding: str = "utf-8",
        errors: str = "ignore",
    ) -> List[Document]:
        """Static method for loading a single file.

        NOTE: necessarily a static method for parallel processing
        (picklable entry point for multiprocessing workers).

        Args:
            input_file (Path): File path to read.
            file_metadata (Callable[[str], Dict]): A function that takes
                in a filename and returns a Dict of metadata for the Document.
            file_extractor (Dict[str, BaseReader]): A mapping of file
                extension to a BaseReader instance that specifies how to
                convert that file to text. Mutated in place: default readers
                are instantiated and cached here on first use.
            filename_as_id (bool, optional): Whether to use the filename as
                the document id. Defaults to False.
            encoding (str, optional): Encoding for plain-text reads.
                Defaults to "utf-8".
            errors (str, optional): How encoding/decoding errors are handled,
                see https://docs.python.org/3/library/functions.html#open.
                Defaults to "ignore".

        Returns:
            List[Document]: loaded documents (empty on a non-ImportError
            reader failure, which is reported and skipped).
        """
        metadata: Optional[dict] = None
        documents: List[Document] = []

        if file_metadata is not None:
            metadata = file_metadata(str(input_file))

        file_suffix = input_file.suffix.lower()
        if (
            file_suffix in SimpleDirectoryReader.supported_suffix
            or file_suffix in file_extractor
        ):
            # use file readers
            if file_suffix not in file_extractor:
                # instantiate and cache the default reader if not already
                reader_cls = DEFAULT_FILE_READER_CLS[file_suffix]
                file_extractor[file_suffix] = reader_cls()
            reader = file_extractor[file_suffix]

            # load data -- catch all errors except for ImportError
            try:
                docs = reader.load_data(input_file, extra_info=metadata)
            except ImportError as e:
                # ensure that ImportError is raised so user knows
                # about missing dependencies
                raise ImportError(str(e))
            except Exception as e:
                # otherwise, just skip the file and report the error
                print(
                    f"Failed to load file {input_file} with error: {e}. Skipping...",
                    flush=True,
                )
                return []

            # iterate over docs if needed
            if filename_as_id:
                for i, doc in enumerate(docs):
                    doc.id_ = f"{input_file!s}_part_{i}"

            documents.extend(docs)
        else:
            # do standard plain-text read for unknown suffixes
            with open(input_file, errors=errors, encoding=encoding) as f:
                data = f.read()

            doc = Document(text=data, metadata=metadata or {})
            if filename_as_id:
                doc.id_ = str(input_file)

            documents.append(doc)

        return documents

    def load_data(
        self, show_progress: bool = False, num_workers: Optional[int] = None
    ) -> List[Document]:
        """Load data from the input directory.

        Args:
            show_progress (bool): Whether to show tqdm progress bars.
                Defaults to False.
            num_workers (Optional[int]): Number of worker processes; values
                greater than 1 enable parallel loading, capped at the CPU
                count.

        Returns:
            List[Document]: A list of documents.
        """
        documents = []

        files_to_process = self.input_files

        if num_workers and num_workers > 1:
            if num_workers > multiprocessing.cpu_count():
                warnings.warn(
                    "Specified num_workers exceed number of CPUs in the system. "
                    "Setting `num_workers` down to the maximum CPU count."
                )
                # (fix) previously only the warning was emitted and the
                # oversized value was still passed to the pool; now the
                # worker count is actually clamped as the warning promises.
                num_workers = multiprocessing.cpu_count()
            # NOTE(review): "spawn" is requested explicitly — presumably for
            # consistent behavior across platforms; confirm before changing.
            with multiprocessing.get_context("spawn").Pool(num_workers) as p:
                results = p.starmap(
                    SimpleDirectoryReader.load_file,
                    zip(
                        files_to_process,
                        repeat(self.file_metadata),
                        repeat(self.file_extractor),
                        repeat(self.filename_as_id),
                        repeat(self.encoding),
                        repeat(self.errors),
                    ),
                )
                # Flatten the per-file lists into one list of documents.
                documents = reduce(lambda x, y: x + y, results)

        else:
            if show_progress:
                files_to_process = tqdm(
                    self.input_files, desc="Loading files", unit="file"
                )
            for input_file in files_to_process:
                documents.extend(
                    SimpleDirectoryReader.load_file(
                        input_file=input_file,
                        file_metadata=self.file_metadata,
                        file_extractor=self.file_extractor,
                        filename_as_id=self.filename_as_id,
                        encoding=self.encoding,
                        errors=self.errors,
                    )
                )

        return self._exclude_metadata(documents)

    def iter_data(
        self, show_progress: bool = False
    ) -> Generator[List[Document], Any, Any]:
        """Load data iteratively from the input directory.

        Yields the documents of one input file at a time, with metadata
        exclusions already applied; files that produce no documents are
        skipped.

        Args:
            show_progress (bool): Whether to show tqdm progress bars.
                Defaults to False.

        Returns:
            Generator[List[Document]]: lists of documents, one per file.
        """
        files_to_process = self.input_files

        if show_progress:
            files_to_process = tqdm(self.input_files, desc="Loading files", unit="file")

        for input_file in files_to_process:
            documents = SimpleDirectoryReader.load_file(
                input_file=input_file,
                file_metadata=self.file_metadata,
                file_extractor=self.file_extractor,
                filename_as_id=self.filename_as_id,
                encoding=self.encoding,
                errors=self.errors,
            )

            documents = self._exclude_metadata(documents)

            if len(documents) > 0:
                yield documents
428

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.