llama-index
223 строки · 8.4 Кб
1"""Slack reader."""
2
3import logging4import os5import time6from datetime import datetime7from ssl import SSLContext8from typing import Any, List, Optional9
10from llama_index.legacy.bridge.pydantic import PrivateAttr11from llama_index.legacy.readers.base import BasePydanticReader12from llama_index.legacy.schema import Document13
# Module-level logger used by the reader's pagination loops and the demo entry point.
logger = logging.getLogger(name=__name__)
16
class SlackReader(BasePydanticReader):
    """Slack reader.

    Reads conversations from channels. If an earliest_date is provided, an
    optional latest_date can also be provided. If no latest_date is provided,
    we assume the latest date is the current timestamp.

    Args:
        slack_token (Optional[str]): Slack token. If not provided, we
            assume the environment variable `SLACK_BOT_TOKEN` is set.
        ssl (Optional[SSLContext]): Custom SSL context. If not provided, it is
            assumed there is already an SSL context available.
        earliest_date (Optional[datetime]): Earliest date from which
            to read conversations. If not provided, we read all messages.
        latest_date (Optional[datetime]): Latest date from which to
            read conversations. If not provided, defaults to current timestamp
            in combination with earliest_date.
        earliest_date_timestamp (Optional[float]): POSIX timestamp used as the
            earliest bound when ``earliest_date`` is not given.
        latest_date_timestamp (Optional[float]): POSIX timestamp used as the
            latest bound when ``latest_date`` is not given; falls back to the
            current time when both are omitted.
    """

    is_remote: bool = True
    slack_token: str
    earliest_date_timestamp: Optional[float]
    latest_date_timestamp: float

    _client: Any = PrivateAttr()

    def __init__(
        self,
        slack_token: Optional[str] = None,
        ssl: Optional[SSLContext] = None,
        earliest_date: Optional[datetime] = None,
        latest_date: Optional[datetime] = None,
        earliest_date_timestamp: Optional[float] = None,
        latest_date_timestamp: Optional[float] = None,
    ) -> None:
        """Create the Slack client, resolve the date window and verify the token.

        Raises:
            ValueError: If no token is available, if ``latest_date`` is given
                without ``earliest_date``, or if Slack's ``api_test`` call
                does not return ``ok``.
        """
        from slack_sdk import WebClient

        if slack_token is None:
            # .get() so a missing variable reaches the ValueError below rather
            # than escaping as a bare KeyError.
            slack_token = os.environ.get("SLACK_BOT_TOKEN")
        if slack_token is None:
            raise ValueError(
                "Must specify `slack_token` or set environment "
                "variable `SLACK_BOT_TOKEN`."
            )
        if ssl is None:
            self._client = WebClient(token=slack_token)
        else:
            self._client = WebClient(token=slack_token, ssl=ssl)
        if latest_date is not None and earliest_date is None:
            raise ValueError(
                "Must specify `earliest_date` if `latest_date` is specified."
            )
        # datetime arguments take precedence over raw timestamps.
        if earliest_date is not None:
            earliest_date_timestamp = earliest_date.timestamp()
        if latest_date is not None:
            latest_date_timestamp = latest_date.timestamp()
        elif latest_date_timestamp is None:
            # Fall back to "now" only when the caller supplied no explicit
            # timestamp; an explicit latest_date_timestamp must be honoured.
            latest_date_timestamp = datetime.now().timestamp()
        res = self._client.api_test()
        if not res["ok"]:
            raise ValueError(f"Error initializing Slack API: {res['error']}")

        super().__init__(
            slack_token=slack_token,
            earliest_date_timestamp=earliest_date_timestamp,
            latest_date_timestamp=latest_date_timestamp,
        )

    @classmethod
    def class_name(cls) -> str:
        return "SlackReader"

    def _handle_slack_error(self, error: Any, context: str) -> bool:
        """Log a ``SlackApiError`` and decide whether to retry the request.

        Args:
            error: The caught ``SlackApiError``.
            context: Short description of the failed call, used in the log.

        Returns:
            True after sleeping for the server-provided ``retry-after`` on a
            rate-limit error (the caller should retry the same page); False for
            any other error (the caller should stop paginating).
        """
        if error.response["error"] == "ratelimited":
            retry_after = error.response.headers["retry-after"]
            logger.error(
                "Rate limit error reached, sleeping for: {} seconds".format(
                    retry_after
                )
            )
            time.sleep(int(retry_after))
            return True
        logger.error(f"Error parsing {context}: {error}")
        return False

    def _read_message(self, channel_id: str, message_ts: str) -> str:
        """Read a message together with all of its thread replies.

        Args:
            channel_id: Channel that contains the message.
            message_ts: Slack ``ts`` of the parent message.

        Returns:
            The text of the message and its replies, joined by blank lines.
            On a non-rate-limit API error, whatever was collected so far.
        """
        from slack_sdk.errors import SlackApiError

        request_kwargs = {"channel": channel_id, "ts": message_ts}
        # Replies are only date-filtered when an earliest date was configured.
        if self.earliest_date_timestamp is not None:
            request_kwargs["latest"] = str(self.latest_date_timestamp)
            request_kwargs["oldest"] = str(self.earliest_date_timestamp)

        messages_text: List[str] = []
        next_cursor = None
        while True:
            try:
                # https://slack.com/api/conversations.replies
                # List all replies to a message, including the message itself.
                result = self._client.conversations_replies(
                    cursor=next_cursor, **request_kwargs  # type: ignore
                )
            except SlackApiError as e:
                if self._handle_slack_error(e, "conversation replies"):
                    continue
                # Non-retryable: stop instead of re-issuing the same failing
                # request forever.
                break
            messages_text.extend(message["text"] for message in result["messages"])
            if not result["has_more"]:
                break
            next_cursor = result["response_metadata"]["next_cursor"]

        return "\n\n".join(messages_text)

    def _read_channel(self, channel_id: str, reverse_chronological: bool) -> str:
        """Read every message in a channel (with its thread) inside the date window.

        Args:
            channel_id: Channel to read.
            reverse_chronological: If True, keep messages in the order Slack
                returns them; otherwise reverse that order.

        Returns:
            All message/thread texts joined by blank lines. On a
            non-rate-limit API error, whatever was collected so far.
        """
        from slack_sdk.errors import SlackApiError

        request_kwargs = {
            "channel": channel_id,
            "latest": str(self.latest_date_timestamp),
        }
        if self.earliest_date_timestamp is not None:
            request_kwargs["oldest"] = str(self.earliest_date_timestamp)

        result_messages: List[str] = []
        next_cursor = None
        while True:
            try:
                # conversations.history is paginated (100 messages by default),
                # see: https://api.slack.com/methods/conversations.history$pagination
                result = self._client.conversations_history(
                    cursor=next_cursor, **request_kwargs  # type: ignore
                )
            except SlackApiError as e:
                if self._handle_slack_error(e, "conversation history"):
                    continue
                # Non-retryable: stop instead of looping on the same request.
                break
            conversation_history = result["messages"]
            logger.info(f"{len(conversation_history)} messages found in {channel_id}")
            result_messages.extend(
                self._read_message(channel_id, message["ts"])
                for message in conversation_history
            )
            if not result["has_more"]:
                break
            next_cursor = result["response_metadata"]["next_cursor"]

        # NOTE(review): assumes Slack returns history newest-first, which is
        # why the default reverse_chronological=True keeps the API order.
        ordered = result_messages if reverse_chronological else result_messages[::-1]
        return "\n\n".join(ordered)

    def load_data(
        self, channel_ids: List[str], reverse_chronological: bool = True
    ) -> List[Document]:
        """Load one Document per Slack channel.

        Args:
            channel_ids (List[str]): List of channel ids to read.
            reverse_chronological (bool): If True (default), messages keep the
                order returned by Slack; if False, that order is reversed.

        Returns:
            List[Document]: One document per channel, using the channel id as
            both ``id_`` and ``metadata["channel"]``.
        """
        return [
            Document(
                id_=channel_id,
                text=self._read_channel(
                    channel_id, reverse_chronological=reverse_chronological
                ),
                metadata={"channel": channel_id},
            )
            for channel_id in channel_ids
        ]
220
if __name__ == "__main__":
    # The root logger defaults to WARNING, so without this the logger.info
    # call below would print nothing.
    logging.basicConfig(level=logging.INFO)
    reader = SlackReader()
    logger.info(reader.load_data(channel_ids=["C04DC2VUY3F"]))