# llama-index
# 114 lines · 3.6 KB
"""Markdown parser.

Contains parser for md files.

"""
6
7import re8from pathlib import Path9from typing import Any, Dict, List, Optional, Tuple, cast10
11from llama_index.legacy.readers.base import BaseReader12from llama_index.legacy.schema import Document13
14
class MarkdownReader(BaseReader):
    """Markdown parser.

    Extract text from markdown files.

    The text is split at header lines (one or more ``#`` followed by
    whitespace) into ``(header, text)`` tuples, and ``load_data`` turns each
    tuple into one ``Document``.

    Args:
        *args: Positional arguments forwarded to ``BaseReader.__init__``.
        remove_hyperlinks: If True, ``parse_tups`` replaces ``[text](url)``
            links with ``text`` before splitting.
        remove_images: If True, ``parse_tups`` drops ``![[target]]`` image
            embeds before splitting.
        **kwargs: Keyword arguments forwarded to ``BaseReader.__init__``.

    """

    def __init__(
        self,
        *args: Any,
        remove_hyperlinks: bool = True,
        remove_images: bool = True,
        **kwargs: Any,
    ) -> None:
        """Init params."""
        super().__init__(*args, **kwargs)
        self._remove_hyperlinks = remove_hyperlinks
        self._remove_images = remove_images

    @staticmethod
    def _clean_header(header_line: str) -> str:
        """Return the title of a header line without its ``#`` markup."""
        # Strip only the leading marker and an optional closing run of '#'
        # (which must be preceded by whitespace), so a '#' that belongs to
        # the title itself, e.g. "C#", is preserved.
        title = re.sub(r"^#+\s+", "", header_line)
        title = re.sub(r"\s+#+\s*$", "", title)
        return title.strip()

    def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
        """Split markdown text into ``(header, text)`` tuples.

        A header is a line that starts with one or more ``#`` characters
        followed by whitespace. Lines inside fenced code blocks (delimited
        by three backticks or three tildes) are never treated as headers, so
        ``# comment`` lines in code samples stay in the section text.

        Args:
            markdown_text: Raw markdown content.

        Returns:
            One tuple per section, in document order. ``header`` is the
            header title with its ``#`` markup removed, or ``None`` for text
            that is not under any header (text before the first header, or
            the whole input when it contains no header). ``text`` is the
            section body with ``<...>`` spans removed. A header section with
            an empty body, or a whitespace-only preamble, is skipped; the
            last section is always returned.

        """
        header_pattern = re.compile(r"^#+\s")
        sections: List[Tuple[Optional[str], str]] = []
        current_header: Optional[str] = None
        current_lines: List[str] = []
        open_fence: Optional[str] = None  # marker of the fence we are inside

        for line in markdown_text.split("\n"):
            fence = line.lstrip()[:3]
            if fence in ("```", "~~~"):
                if open_fence is None:
                    open_fence = fence
                elif open_fence == fence:
                    open_fence = None

            if open_fence is None and header_pattern.match(line):
                current_text = "".join(current_lines)
                if current_header is None:
                    # Text before the first header: keep it unless it is
                    # only whitespace.
                    if current_text.strip():
                        sections.append((None, current_text))
                elif current_text:
                    sections.append((current_header, current_text))
                # Always advance to the new header, even when the previous
                # section was empty; otherwise this header would be lost and
                # its text attributed to the previous one.
                current_header = line
                current_lines = []
            else:
                current_lines.append(line + "\n")

        sections.append((current_header, "".join(current_lines)))

        return [
            (
                None if header is None else self._clean_header(header),
                re.sub(r"<.*?>", "", text),
            )
            for header, text in sections
        ]

    def remove_images(self, content: str) -> str:
        """Remove ``![[target]]`` image embeds from markdown content.

        NOTE(review): only the double-bracket embed form is handled; inline
        ``![alt](url)`` images are not matched by this pattern.
        """
        # Non-greedy so that two embeds on one line are removed separately
        # instead of deleting everything between them.
        pattern = r"!\[\[(.*?)\]\]"
        return re.sub(pattern, "", content)

    def remove_hyperlinks(self, content: str) -> str:
        """Replace ``[text](url)`` hyperlinks with their link text."""
        pattern = r"\[(.*?)\]\((.*?)\)"
        return re.sub(pattern, r"\1", content)

    def _init_parser(self) -> Dict:
        """Initialize the parser with the config."""
        return {}

    def parse_tups(
        self, filepath: Path, errors: str = "ignore"
    ) -> List[Tuple[Optional[str], str]]:
        """Parse file into tuples.

        Args:
            filepath: Path of the markdown file to read (decoded as UTF-8).
            errors: Decoding error handler passed to ``open``.

        Returns:
            The ``(header, text)`` tuples produced by ``markdown_to_tups``
            after the optional hyperlink and image removal.

        """
        with open(filepath, encoding="utf-8", errors=errors) as f:
            content = f.read()
        if self._remove_hyperlinks:
            content = self.remove_hyperlinks(content)
        if self._remove_images:
            content = self.remove_images(content)
        return self.markdown_to_tups(content)

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse a markdown file into one ``Document`` per section.

        Args:
            file: Path of the markdown file to read.
            extra_info: Optional metadata attached to every document; an
                empty dict is used when it is missing or empty.

        Returns:
            One ``Document`` per tuple returned by ``parse_tups``. Sections
            with a header have it prepended to the text.

        """
        tups = self.parse_tups(file)
        results = []
        # TODO: don't include headers right now
        for header, value in tups:
            text = value if header is None else f"\n\n{header}\n{value}"
            results.append(Document(text=text, metadata=extra_info or {}))
        return results