llama-index

Форк
0
1
"""
2
Github readers utils.
3

4
This module contains utility functions for the Github readers.
5
"""
6

7
import asyncio
8
import os
9
import time
10
from abc import ABC, abstractmethod
11
from typing import List, Tuple
12

13
from llama_index.legacy.readers.github_readers.github_api_client import (
14
    GitBlobResponseModel,
15
    GithubClient,
16
    GitTreeResponseModel,
17
)
18

19

20
def print_if_verbose(verbose: bool, message: str) -> None:
21
    """Log message if verbose is True."""
22
    if verbose:
23
        print(message)
24

25

26
def get_file_extension(filename: str) -> str:
27
    """Get file extension."""
28
    return f".{os.path.splitext(filename)[1][1:].lower()}"
29

30

31
class BufferedAsyncIterator(ABC):
32
    """
33
    Base class for buffered async iterators.
34

35
    This class is to be used as a base class for async iterators
36
    that need to buffer the results of an async operation.
37
    The async operation is defined in the _fill_buffer method.
38
    The _fill_buffer method is called when the buffer is empty.
39
    """
40

41
    def __init__(self, buffer_size: int):
42
        """
43
        Initialize params.
44

45
        Args:
46
            - `buffer_size (int)`: Size of the buffer.
47
                It is also the number of items that will
48
                be retrieved from the async operation at once.
49
                see _fill_buffer. Defaults to 2. Setting it to 1
50
                will result in the same behavior as a synchronous iterator.
51
        """
52
        self._buffer_size = buffer_size
53
        self._buffer: List[Tuple[GitBlobResponseModel, str]] = []
54
        self._index = 0
55

56
    @abstractmethod
57
    async def _fill_buffer(self) -> None:
58
        raise NotImplementedError
59

60
    def __aiter__(self) -> "BufferedAsyncIterator":
61
        """Return the iterator object."""
62
        return self
63

64
    async def __anext__(self) -> Tuple[GitBlobResponseModel, str]:
65
        """
66
        Get next item.
67

68
        Returns:
69
            - `item (Tuple[GitBlobResponseModel, str])`: Next item.
70

71
        Raises:
72
            - `StopAsyncIteration`: If there are no more items.
73
        """
74
        if not self._buffer:
75
            await self._fill_buffer()
76

77
        if not self._buffer:
78
            raise StopAsyncIteration
79

80
        item = self._buffer.pop(0)
81
        self._index += 1
82
        return item
83

84

85
class BufferedGitBlobDataIterator(BufferedAsyncIterator):
86
    """
87
    Buffered async iterator for Git blobs.
88

89
    This class is an async iterator that buffers the results of the get_blob operation.
90
    It is used to retrieve the contents of the files in a Github repository.
91
    getBlob endpoint supports up to 100 megabytes of content for blobs.
92
    This concrete implementation of BufferedAsyncIterator allows you to lazily retrieve
93
    the contents of the files in a Github repository.
94
    Otherwise you would have to retrieve all the contents of
95
    the files in the repository at once, which would
96
    be problematic if the repository is large.
97
    """
98

99
    def __init__(
100
        self,
101
        blobs_and_paths: List[Tuple[GitTreeResponseModel.GitTreeObject, str]],
102
        github_client: GithubClient,
103
        owner: str,
104
        repo: str,
105
        loop: asyncio.AbstractEventLoop,
106
        buffer_size: int,
107
        verbose: bool = False,
108
    ):
109
        """
110
        Initialize params.
111

112
        Args:
113
            - blobs_and_paths (List[Tuple[GitTreeResponseModel.GitTreeObject, str]]):
114
                List of tuples containing the blob and the path of the file.
115
            - github_client (GithubClient): Github client.
116
            - owner (str): Owner of the repository.
117
            - repo (str): Name of the repository.
118
            - loop (asyncio.AbstractEventLoop): Event loop.
119
            - buffer_size (int): Size of the buffer.
120
        """
121
        super().__init__(buffer_size)
122
        self._blobs_and_paths = blobs_and_paths
123
        self._github_client = github_client
124
        self._owner = owner
125
        self._repo = repo
126
        self._verbose = verbose
127
        if loop is None:
128
            loop = asyncio.get_event_loop()
129
            if loop is None:
130
                raise ValueError("No event loop found")
131

132
    async def _fill_buffer(self) -> None:
133
        """
134
        Fill the buffer with the results of the get_blob operation.
135

136
        The get_blob operation is called for each blob in the blobs_and_paths list.
137
        The blobs are retrieved in batches of size buffer_size.
138
        """
139
        del self._buffer[:]
140
        self._buffer = []
141
        start = self._index
142
        end = min(start + self._buffer_size, len(self._blobs_and_paths))
143

144
        if start >= end:
145
            return
146

147
        if self._verbose:
148
            start_t = time.time()
149
        results: List[GitBlobResponseModel] = await asyncio.gather(
150
            *[
151
                self._github_client.get_blob(self._owner, self._repo, blob.sha)
152
                for blob, _ in self._blobs_and_paths[
153
                    start:end
154
                ]  # TODO: use batch_size instead of buffer_size for concurrent requests
155
            ]
156
        )
157
        if self._verbose:
158
            end_t = time.time()
159
            blob_names_and_sizes = [
160
                (blob.path, blob.size) for blob, _ in self._blobs_and_paths[start:end]
161
            ]
162
            print(
163
                "Time to get blobs ("
164
                + f"{blob_names_and_sizes}"
165
                + f"): {end_t - start_t:.2f} seconds"
166
            )
167

168
        self._buffer = [
169
            (result, path)
170
            for result, (_, path) in zip(results, self._blobs_and_paths[start:end])
171
        ]
172

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.