llama-index

Форк
0
223 строки · 8.4 Кб
1
"""Slack reader."""
2

3
import logging
4
import os
5
import time
6
from datetime import datetime
7
from ssl import SSLContext
8
from typing import Any, List, Optional
9

10
from llama_index.legacy.bridge.pydantic import PrivateAttr
11
from llama_index.legacy.readers.base import BasePydanticReader
12
from llama_index.legacy.schema import Document
13

14
# Module-level logger named after this module; used by SlackReader to report
# message counts and Slack API errors.
logger = logging.getLogger(__name__)
15

16

17
class SlackReader(BasePydanticReader):
    """Slack reader.

    Reads conversations from channels. If an earliest_date is provided, an
    optional latest_date can also be provided. If no latest_date is provided,
    we assume the latest date is the current timestamp.

    Args:
        slack_token (Optional[str]): Slack token. If not provided, the
            environment variable `SLACK_BOT_TOKEN` is used.
        ssl (Optional[SSLContext]): Custom SSL context. If not provided, it is
            assumed there is already an SSL context available.
        earliest_date (Optional[datetime]): Earliest date from which
            to read conversations. If not provided, we read all messages.
        latest_date (Optional[datetime]): Latest date from which to
            read conversations. Requires `earliest_date`. If not provided,
            defaults to the current timestamp.
        earliest_date_timestamp (Optional[float]): Same bound as
            `earliest_date`, given as a POSIX timestamp. Used only when
            `earliest_date` is not provided.
        latest_date_timestamp (Optional[float]): Same bound as `latest_date`,
            given as a POSIX timestamp. Used only when `latest_date` is not
            provided.

    Raises:
        ValueError: If no token can be found, if `latest_date` is given
            without `earliest_date`, or if the Slack API test call fails.
    """

    is_remote: bool = True
    slack_token: str
    earliest_date_timestamp: Optional[float]
    latest_date_timestamp: float

    _client: Any = PrivateAttr()

    def __init__(
        self,
        slack_token: Optional[str] = None,
        ssl: Optional[SSLContext] = None,
        earliest_date: Optional[datetime] = None,
        latest_date: Optional[datetime] = None,
        earliest_date_timestamp: Optional[float] = None,
        latest_date_timestamp: Optional[float] = None,
    ) -> None:
        """Initialize with parameters."""
        from slack_sdk import WebClient

        if slack_token is None:
            # Use .get() so that a missing variable reaches the ValueError
            # below instead of surfacing as a bare KeyError.
            slack_token = os.environ.get("SLACK_BOT_TOKEN")
        if slack_token is None:
            raise ValueError(
                "Must specify `slack_token` or set environment "
                "variable `SLACK_BOT_TOKEN`."
            )
        if ssl is None:
            self._client = WebClient(token=slack_token)
        else:
            self._client = WebClient(token=slack_token, ssl=ssl)
        if latest_date is not None and earliest_date is None:
            raise ValueError(
                "Must specify `earliest_date` if `latest_date` is specified."
            )
        # An explicit datetime takes precedence over the raw timestamp.
        if earliest_date is not None:
            earliest_date_timestamp = earliest_date.timestamp()
        if latest_date is not None:
            latest_date_timestamp = latest_date.timestamp()
        elif latest_date_timestamp is None:
            # Only fall back to "now" when the caller supplied no upper bound.
            latest_date_timestamp = datetime.now().timestamp()
        res = self._client.api_test()
        if not res["ok"]:
            raise ValueError(f"Error initializing Slack API: {res['error']}")

        super().__init__(
            slack_token=slack_token,
            earliest_date_timestamp=earliest_date_timestamp,
            latest_date_timestamp=latest_date_timestamp,
        )

    @classmethod
    def class_name(cls) -> str:
        """Return the name identifying this reader class."""
        return "SlackReader"

    def _read_message(self, channel_id: str, message_ts: str) -> str:
        """Read a message and its replies.

        Pages through `conversations.replies` for the given message and
        collects the `text` of every returned message. Rate-limit errors are
        retried after the delay Slack requests; any other Slack API error is
        logged and ends the read, returning what was collected so far.

        Args:
            channel_id (str): Id of the channel containing the message.
            message_ts (str): Timestamp (`ts`) of the message.

        Returns:
            str: Collected message texts joined by blank lines.
        """
        from slack_sdk.errors import SlackApiError

        messages_text: List[str] = []
        next_cursor = None
        while True:
            try:
                # https://slack.com/api/conversations.replies
                # List all replies to a message, including the message itself.
                conversations_replies_kwargs = {
                    "channel": channel_id,
                    "ts": message_ts,
                    "cursor": next_cursor,
                }
                # The time window is only applied when a lower bound was set.
                if self.earliest_date_timestamp is not None:
                    conversations_replies_kwargs["latest"] = str(
                        self.latest_date_timestamp
                    )
                    conversations_replies_kwargs["oldest"] = str(
                        self.earliest_date_timestamp
                    )
                result = self._client.conversations_replies(
                    **conversations_replies_kwargs  # type: ignore
                )
                messages = result["messages"]
                messages_text.extend(message["text"] for message in messages)
                if not result["has_more"]:
                    break

                next_cursor = result["response_metadata"]["next_cursor"]
            except SlackApiError as e:
                if e.response["error"] == "ratelimited":
                    logger.error(
                        "Rate limit error reached, sleeping for: {} seconds".format(
                            e.response.headers["retry-after"]
                        )
                    )
                    time.sleep(int(e.response.headers["retry-after"]))
                else:
                    logger.error(f"Error parsing conversation replies: {e}")
                    # Retrying the same request would fail the same way and
                    # spin forever, so stop and keep what we have.
                    break

        return "\n\n".join(messages_text)

    def _read_channel(self, channel_id: str, reverse_chronological: bool) -> str:
        """Read a channel.

        Pages through `conversations.history` for the channel and reads each
        returned message (with its replies) via `_read_message`. Rate-limit
        errors are retried after the delay Slack requests; any other Slack API
        error is logged and ends the read, returning what was collected so far.

        Args:
            channel_id (str): Id of the channel to read.
            reverse_chronological (bool): If True, keep the order in which the
                API returned the messages; otherwise reverse it.

        Returns:
            str: Collected message texts joined by blank lines.
        """
        from slack_sdk.errors import SlackApiError

        result_messages: List[str] = []
        next_cursor = None
        while True:
            try:
                # Call the conversations.history method using the WebClient
                # conversations.history returns the first 100 messages by default
                # These results are paginated,
                # see: https://api.slack.com/methods/conversations.history#pagination
                conversations_history_kwargs = {
                    "channel": channel_id,
                    "cursor": next_cursor,
                    "latest": str(self.latest_date_timestamp),
                }
                if self.earliest_date_timestamp is not None:
                    conversations_history_kwargs["oldest"] = str(
                        self.earliest_date_timestamp
                    )
                result = self._client.conversations_history(
                    **conversations_history_kwargs  # type: ignore
                )
                conversation_history = result["messages"]
                logger.info(
                    f"{len(conversation_history)} messages found in {channel_id}"
                )
                result_messages.extend(
                    self._read_message(channel_id, message["ts"])
                    for message in conversation_history
                )
                if not result["has_more"]:
                    break
                next_cursor = result["response_metadata"]["next_cursor"]

            except SlackApiError as e:
                if e.response["error"] == "ratelimited":
                    logger.error(
                        "Rate limit error reached, sleeping for: {} seconds".format(
                            e.response.headers["retry-after"]
                        )
                    )
                    time.sleep(int(e.response.headers["retry-after"]))
                else:
                    logger.error(f"Error reading conversation history: {e}")
                    # Retrying the same request would fail the same way and
                    # spin forever, so stop and keep what we have.
                    break

        if not reverse_chronological:
            result_messages.reverse()
        return "\n\n".join(result_messages)

    def load_data(
        self, channel_ids: List[str], reverse_chronological: bool = True
    ) -> List[Document]:
        """Load data from the given Slack channels.

        Args:
            channel_ids (List[str]): List of channel ids to read.
            reverse_chronological (bool): If True (default), keep messages in
                the order returned by the Slack API; otherwise reverse them.

        Returns:
            List[Document]: One document per channel, with the channel id as
                document id and in the `channel` metadata field.
        """
        results = []
        for channel_id in channel_ids:
            channel_content = self._read_channel(
                channel_id, reverse_chronological=reverse_chronological
            )
            results.append(
                Document(
                    id_=channel_id,
                    text=channel_content,
                    metadata={"channel": channel_id},
                )
            )
        return results
219

220

221
if __name__ == "__main__":
    # Manual smoke run: build a reader from the environment token, read one
    # hard-coded channel, and log the resulting documents.
    documents = SlackReader().load_data(channel_ids=["C04DC2VUY3F"])
    logger.info(documents)
224

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.