llama-index
107 строк · 3.4 Кб
"""Mbox parser.

Contains a simple parser for mbox files.

"""
6
7import logging8from pathlib import Path9from typing import Any, Dict, List, Optional10
11from llama_index.legacy.readers.base import BaseReader12from llama_index.legacy.schema import Document13
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

16
class MboxReader(BaseReader):
    """Mbox parser.

    Extract messages from mailbox files.
    Returns string including date, subject, sender, receiver and
    content for each message.

    Args:
        max_count: Maximum number of messages to read from the mailbox;
            ``0`` (the default) reads every message. Messages that fail to
            parse still count towards this limit.
        message_format: ``str.format`` template used to render each message.
            It may reference ``{_date}``, ``{_from}``, ``{_to}``,
            ``{_subject}`` and ``{_content}``.

    """

    DEFAULT_MESSAGE_FORMAT: str = (
        "Date: {_date}\n"
        "From: {_from}\n"
        "To: {_to}\n"
        "Subject: {_subject}\n"
        "Content: {_content}"
    )

    def __init__(
        self,
        *args: Any,
        max_count: int = 0,
        message_format: str = DEFAULT_MESSAGE_FORMAT,
        **kwargs: Any,
    ) -> None:
        """Init params.

        Raises:
            ImportError: If ``beautifulsoup4`` is not installed.
        """
        # Fail fast at construction time instead of on the first load_data().
        try:
            from bs4 import BeautifulSoup  # noqa
        except ImportError as err:
            raise ImportError(
                "`beautifulsoup4` package not found: `pip install beautifulsoup4`"
            ) from err

        super().__init__(*args, **kwargs)
        self.max_count = max_count
        self.message_format = message_format

    @staticmethod
    def _extract_payload(msg: Any) -> Optional[bytes]:
        """Return the decoded body of ``msg``, or ``None`` if none is found.

        For a multipart message this is the first ``text/plain`` part that is
        not marked as an attachment; otherwise it is the message's own payload.
        """
        if not msg.is_multipart():
            return msg.get_payload(decode=True)

        for part in msg.walk():
            disposition = str(part.get("Content-Disposition"))
            if (
                part.get_content_type() == "text/plain"
                and "attachment" not in disposition
            ):
                return part.get_payload(decode=True)
        return None

    def load_data(
        self, file: Path, extra_info: Optional[Dict] = None
    ) -> List[Document]:
        """Parse an mbox file into one ``Document`` per readable message.

        Args:
            file: Path of the mbox file to read.
            extra_info: Optional metadata attached to every returned document.

        Returns:
            One ``Document`` per successfully parsed message, whose text is
            ``message_format`` filled with the message headers and its
            whitespace-normalised body. Messages that cannot be parsed are
            logged as warnings and skipped.
        """
        # Import required libraries
        import mailbox
        from email.parser import BytesParser
        from email.policy import default

        from bs4 import BeautifulSoup

        results: List[str] = []
        # Load file using mailbox
        # NOTE(review): mailbox.mbox defaults to create=True, so a missing
        # path produces an empty mailbox file rather than an error. Left
        # unchanged to keep the existing contract for callers.
        bytes_parser = BytesParser(policy=default).parse
        mbox = mailbox.mbox(file, factory=bytes_parser)  # type: ignore

        try:
            # `count` covers every message visited, including failures, so
            # max_count bounds the number of messages examined.
            for count, raw_msg in enumerate(mbox, start=1):
                try:
                    content = self._extract_payload(raw_msg)
                    if content is None:
                        # Without this check the body of the previous message
                        # would be silently reused for this one.
                        raise ValueError("no text/plain content found in message")

                    # Parse message HTML content and remove unneeded
                    # whitespace. The parser is named explicitly so the
                    # result does not depend on which parsers are installed.
                    soup = BeautifulSoup(content, "html.parser")
                    stripped_content = " ".join(soup.get_text().split())
                    # Format message to include date, sender, receiver and subject
                    results.append(
                        self.message_format.format(
                            _date=raw_msg["date"],
                            _from=raw_msg["from"],
                            _to=raw_msg["to"],
                            _subject=raw_msg["subject"],
                            _content=stripped_content,
                        )
                    )
                except Exception as e:
                    # Best effort: one bad message must not abort the whole load.
                    logger.warning(
                        "Failed to parse message:\n%s\n with exception %s",
                        raw_msg,
                        e,
                    )

                # Stop once max count is met
                if self.max_count > 0 and count >= self.max_count:
                    break
        finally:
            # Release the file handle held by the mailbox.
            mbox.close()

        return [Document(text=result, metadata=extra_info or {}) for result in results]