llama-index

Форк
0
470 строк · 15.2 Кб
1
"""
2
Pinecone Vector store index.
3

4
An index that is built on top of an existing vector store.
5

6
"""
7

8
import logging
9
from collections import Counter
10
from functools import partial
11
from typing import Any, Callable, Dict, List, Optional, cast
12

13
from llama_index.legacy.bridge.pydantic import PrivateAttr
14
from llama_index.legacy.schema import BaseNode, MetadataMode, TextNode
15
from llama_index.legacy.vector_stores.pinecone_utils import (
16
    _import_pinecone,
17
    _is_pinecone_v3,
18
)
19
from llama_index.legacy.vector_stores.types import (
20
    BasePydanticVectorStore,
21
    MetadataFilters,
22
    VectorStoreQuery,
23
    VectorStoreQueryMode,
24
    VectorStoreQueryResult,
25
)
26
from llama_index.legacy.vector_stores.utils import (
27
    DEFAULT_TEXT_KEY,
28
    legacy_metadata_dict_to_node,
29
    metadata_dict_to_node,
30
    node_to_metadata_dict,
31
)
32

33
ID_KEY = "id"
34
VECTOR_KEY = "values"
35
SPARSE_VECTOR_KEY = "sparse_values"
36
METADATA_KEY = "metadata"
37

38
DEFAULT_BATCH_SIZE = 100
39

40
_logger = logging.getLogger(__name__)
41

42

43
def _transform_pinecone_filter_condition(condition: str) -> str:
44
    """Translate standard metadata filter op to Pinecone specific spec."""
45
    if condition == "and":
46
        return "$and"
47
    elif condition == "or":
48
        return "$or"
49
    else:
50
        raise ValueError(f"Filter condition {condition} not supported")
51

52

53
def _transform_pinecone_filter_operator(operator: str) -> str:
54
    """Translate standard metadata filter operator to Pinecone specific spec."""
55
    if operator == "!=":
56
        return "$ne"
57
    elif operator == "==":
58
        return "$eq"
59
    elif operator == ">":
60
        return "$gt"
61
    elif operator == "<":
62
        return "$lt"
63
    elif operator == ">=":
64
        return "$gte"
65
    elif operator == "<=":
66
        return "$lte"
67
    elif operator == "in":
68
        return "$in"
69
    elif operator == "nin":
70
        return "$nin"
71
    else:
72
        raise ValueError(f"Filter operator {operator} not supported")
73

74

75
def build_dict(input_batch: List[List[int]]) -> List[Dict[str, Any]]:
76
    """Build a list of sparse dictionaries from a batch of input_ids.
77

78
    NOTE: taken from https://www.pinecone.io/learn/hybrid-search-intro/.
79

80
    """
81
    # store a batch of sparse embeddings
82
    sparse_emb = []
83
    # iterate through input batch
84
    for token_ids in input_batch:
85
        indices = []
86
        values = []
87
        # convert the input_ids list to a dictionary of key to frequency values
88
        d = dict(Counter(token_ids))
89
        for idx in d:
90
            indices.append(idx)
91
            values.append(float(d[idx]))
92
        sparse_emb.append({"indices": indices, "values": values})
93
    # return sparse_emb list
94
    return sparse_emb
95

96

97
def generate_sparse_vectors(
98
    context_batch: List[str], tokenizer: Callable
99
) -> List[Dict[str, Any]]:
100
    """Generate sparse vectors from a batch of contexts.
101

102
    NOTE: taken from https://www.pinecone.io/learn/hybrid-search-intro/.
103

104
    """
105
    # create batch of input_ids
106
    inputs = tokenizer(context_batch)["input_ids"]
107
    # create sparse dictionaries
108
    return build_dict(inputs)
109

110

111
def get_default_tokenizer() -> Callable:
112
    """Get default tokenizer.
113

114
    NOTE: taken from https://www.pinecone.io/learn/hybrid-search-intro/.
115

116
    """
117
    from transformers import BertTokenizerFast
118

119
    orig_tokenizer = BertTokenizerFast.from_pretrained("bert-base-uncased")
120
    # set some default arguments, so input is just a list of strings
121
    return partial(
122
        orig_tokenizer,
123
        padding=True,
124
        truncation=True,
125
        max_length=512,
126
    )
127

128

129
def _to_pinecone_filter(standard_filters: MetadataFilters) -> dict:
130
    """Convert from standard dataclass to pinecone filter dict."""
131
    filters = {}
132
    filters_list = []
133
    condition = standard_filters.condition or "and"
134
    condition = _transform_pinecone_filter_condition(condition)
135
    if standard_filters.filters:
136
        for filter in standard_filters.filters:
137
            if filter.operator:
138
                filters_list.append(
139
                    {
140
                        filter.key: {
141
                            _transform_pinecone_filter_operator(
142
                                filter.operator
143
                            ): filter.value
144
                        }
145
                    }
146
                )
147
            else:
148
                filters_list.append({filter.key: filter.value})
149

150
    if len(filters_list) == 1:
151
        # If there is only one filter, return it directly
152
        return filters_list[0]
153
    elif len(filters_list) > 1:
154
        filters[condition] = filters_list
155
    return filters
156

157

158
import_err_msg = (
159
    "`pinecone` package not found, please run `pip install pinecone-client`"
160
)
161

162

163
class PineconeVectorStore(BasePydanticVectorStore):
164
    """Pinecone Vector Store.
165

166
    In this vector store, embeddings and docs are stored within a
167
    Pinecone index.
168

169
    During query time, the index uses Pinecone to query for the top
170
    k most similar nodes.
171

172
    Args:
173
        pinecone_index (Optional[Union[pinecone.Pinecone.Index, pinecone.Index]]): Pinecone index instance,
174
        pinecone.Pinecone.Index for clients >= 3.0.0; pinecone.Index for older clients.
175
        insert_kwargs (Optional[Dict]): insert kwargs during `upsert` call.
176
        add_sparse_vector (bool): whether to add sparse vector to index.
177
        tokenizer (Optional[Callable]): tokenizer to use to generate sparse
178
        default_empty_query_vector (Optional[List[float]]): default empty query vector.
179
            Defaults to None. If not None, then this vector will be used as the query
180
            vector if the query is empty.
181

182
    """
183

184
    stores_text: bool = True
185
    flat_metadata: bool = False
186

187
    api_key: Optional[str]
188
    index_name: Optional[str]
189
    environment: Optional[str]
190
    namespace: Optional[str]
191
    insert_kwargs: Optional[Dict]
192
    add_sparse_vector: bool
193
    text_key: str
194
    batch_size: int
195
    remove_text_from_metadata: bool
196

197
    _pinecone_index: Any = PrivateAttr()
198
    _tokenizer: Optional[Callable] = PrivateAttr()
199

200
    def __init__(
201
        self,
202
        pinecone_index: Optional[
203
            Any
204
        ] = None,  # Dynamic import prevents specific type hinting here
205
        api_key: Optional[str] = None,
206
        index_name: Optional[str] = None,
207
        environment: Optional[str] = None,
208
        namespace: Optional[str] = None,
209
        insert_kwargs: Optional[Dict] = None,
210
        add_sparse_vector: bool = False,
211
        tokenizer: Optional[Callable] = None,
212
        text_key: str = DEFAULT_TEXT_KEY,
213
        batch_size: int = DEFAULT_BATCH_SIZE,
214
        remove_text_from_metadata: bool = False,
215
        default_empty_query_vector: Optional[List[float]] = None,
216
        **kwargs: Any,
217
    ) -> None:
218
        insert_kwargs = insert_kwargs or {}
219

220
        if tokenizer is None and add_sparse_vector:
221
            tokenizer = get_default_tokenizer()
222
        self._tokenizer = tokenizer
223

224
        super().__init__(
225
            index_name=index_name,
226
            environment=environment,
227
            api_key=api_key,
228
            namespace=namespace,
229
            insert_kwargs=insert_kwargs,
230
            add_sparse_vector=add_sparse_vector,
231
            text_key=text_key,
232
            batch_size=batch_size,
233
            remove_text_from_metadata=remove_text_from_metadata,
234
        )
235

236
        # TODO: Make following instance check stronger -- check if pinecone_index is not pinecone.Index, else raise
237
        #  ValueError
238
        if isinstance(pinecone_index, str):
239
            raise ValueError(
240
                f"`pinecone_index` cannot be of type `str`; should be an instance of pinecone.Index, "
241
            )
242

243
        self._pinecone_index = pinecone_index or self._initialize_pinecone_client(
244
            api_key, index_name, environment, **kwargs
245
        )
246

247
    @classmethod
248
    def _initialize_pinecone_client(
249
        cls,
250
        api_key: Optional[str],
251
        index_name: Optional[str],
252
        environment: Optional[str],
253
        **kwargs: Any,
254
    ) -> Any:
255
        """
256
        Initialize Pinecone client based on version.
257

258
        If client version <3.0.0, use pods-based initialization; else, use serverless initialization.
259
        """
260
        if not index_name:
261
            raise ValueError(
262
                "`index_name` is required for Pinecone client initialization"
263
            )
264

265
        pinecone = _import_pinecone()
266

267
        if (
268
            not _is_pinecone_v3()
269
        ):  # If old version of Pinecone client (version bifurcation temporary):
270
            if not environment:
271
                raise ValueError("environment is required for Pinecone client < 3.0.0")
272
            pinecone.init(api_key=api_key, environment=environment)
273
            return pinecone.Index(index_name)
274
        else:  # If new version of Pinecone client (serverless):
275
            pinecone_instance = pinecone.Pinecone(api_key=api_key)
276
            return pinecone_instance.Index(index_name)
277

278
    @classmethod
279
    def from_params(
280
        cls,
281
        api_key: Optional[str] = None,
282
        index_name: Optional[str] = None,
283
        environment: Optional[str] = None,
284
        namespace: Optional[str] = None,
285
        insert_kwargs: Optional[Dict] = None,
286
        add_sparse_vector: bool = False,
287
        tokenizer: Optional[Callable] = None,
288
        text_key: str = DEFAULT_TEXT_KEY,
289
        batch_size: int = DEFAULT_BATCH_SIZE,
290
        remove_text_from_metadata: bool = False,
291
        default_empty_query_vector: Optional[List[float]] = None,
292
        **kwargs: Any,
293
    ) -> "PineconeVectorStore":
294
        pinecone_index = cls._initialize_pinecone_client(
295
            api_key, index_name, environment, **kwargs
296
        )
297

298
        return cls(
299
            pinecone_index=pinecone_index,
300
            api_key=api_key,
301
            index_name=index_name,
302
            environment=environment,
303
            namespace=namespace,
304
            insert_kwargs=insert_kwargs,
305
            add_sparse_vector=add_sparse_vector,
306
            tokenizer=tokenizer,
307
            text_key=text_key,
308
            batch_size=batch_size,
309
            remove_text_from_metadata=remove_text_from_metadata,
310
            default_empty_query_vector=default_empty_query_vector,
311
            **kwargs,
312
        )
313

314
    @classmethod
315
    def class_name(cls) -> str:
316
        return "PinconeVectorStore"
317

318
    def add(
319
        self,
320
        nodes: List[BaseNode],
321
        **add_kwargs: Any,
322
    ) -> List[str]:
323
        """Add nodes to index.
324

325
        Args:
326
            nodes: List[BaseNode]: list of nodes with embeddings
327

328
        """
329
        ids = []
330
        entries = []
331
        for node in nodes:
332
            node_id = node.node_id
333

334
            metadata = node_to_metadata_dict(
335
                node,
336
                remove_text=self.remove_text_from_metadata,
337
                flat_metadata=self.flat_metadata,
338
            )
339

340
            entry = {
341
                ID_KEY: node_id,
342
                VECTOR_KEY: node.get_embedding(),
343
                METADATA_KEY: metadata,
344
            }
345
            if self.add_sparse_vector and self._tokenizer is not None:
346
                sparse_vector = generate_sparse_vectors(
347
                    [node.get_content(metadata_mode=MetadataMode.EMBED)],
348
                    self._tokenizer,
349
                )[0]
350
                entry[SPARSE_VECTOR_KEY] = sparse_vector
351

352
            ids.append(node_id)
353
            entries.append(entry)
354
        self._pinecone_index.upsert(
355
            entries,
356
            namespace=self.namespace,
357
            batch_size=self.batch_size,
358
            **self.insert_kwargs,
359
        )
360
        return ids
361

362
    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:
363
        """
364
        Delete nodes using with ref_doc_id.
365

366
        Args:
367
            ref_doc_id (str): The doc_id of the document to delete.
368

369
        """
370
        # delete by filtering on the doc_id metadata
371
        self._pinecone_index.delete(
372
            filter={"doc_id": {"$eq": ref_doc_id}},
373
            namespace=self.namespace,
374
            **delete_kwargs,
375
        )
376

377
    @property
378
    def client(self) -> Any:
379
        """Return Pinecone client."""
380
        return self._pinecone_index
381

382
    def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:
383
        """Query index for top k most similar nodes.
384

385
        Args:
386
            query_embedding (List[float]): query embedding
387
            similarity_top_k (int): top k most similar nodes
388

389
        """
390
        sparse_vector = None
391
        if (
392
            query.mode in (VectorStoreQueryMode.SPARSE, VectorStoreQueryMode.HYBRID)
393
            and self._tokenizer is not None
394
        ):
395
            if query.query_str is None:
396
                raise ValueError(
397
                    "query_str must be specified if mode is SPARSE or HYBRID."
398
                )
399
            sparse_vector = generate_sparse_vectors([query.query_str], self._tokenizer)[
400
                0
401
            ]
402
            if query.alpha is not None:
403
                sparse_vector = {
404
                    "indices": sparse_vector["indices"],
405
                    "values": [v * (1 - query.alpha) for v in sparse_vector["values"]],
406
                }
407

408
        query_embedding = None
409
        if query.mode in (VectorStoreQueryMode.DEFAULT, VectorStoreQueryMode.HYBRID):
410
            query_embedding = cast(List[float], query.query_embedding)
411
            if query.alpha is not None:
412
                query_embedding = [v * query.alpha for v in query_embedding]
413

414
        if query.filters is not None:
415
            if "filter" in kwargs or "pinecone_query_filters" in kwargs:
416
                raise ValueError(
417
                    "Cannot specify filter via both query and kwargs. "
418
                    "Use kwargs only for pinecone specific items that are "
419
                    "not supported via the generic query interface."
420
                )
421
            filter = _to_pinecone_filter(query.filters)
422
        elif "pinecone_query_filters" in kwargs:
423
            filter = kwargs.pop("pinecone_query_filters")
424
        else:
425
            filter = kwargs.pop("filter", {})
426

427
        response = self._pinecone_index.query(
428
            vector=query_embedding,
429
            sparse_vector=sparse_vector,
430
            top_k=query.similarity_top_k,
431
            include_values=True,
432
            include_metadata=True,
433
            namespace=self.namespace,
434
            filter=filter,
435
            **kwargs,
436
        )
437

438
        top_k_nodes = []
439
        top_k_ids = []
440
        top_k_scores = []
441
        for match in response.matches:
442
            try:
443
                node = metadata_dict_to_node(match.metadata)
444
                node.embedding = match.values
445
            except Exception:
446
                # NOTE: deprecated legacy logic for backward compatibility
447
                _logger.debug(
448
                    "Failed to parse Node metadata, fallback to legacy logic."
449
                )
450
                metadata, node_info, relationships = legacy_metadata_dict_to_node(
451
                    match.metadata, text_key=self.text_key
452
                )
453

454
                text = match.metadata[self.text_key]
455
                id = match.id
456
                node = TextNode(
457
                    text=text,
458
                    id_=id,
459
                    metadata=metadata,
460
                    start_char_idx=node_info.get("start", None),
461
                    end_char_idx=node_info.get("end", None),
462
                    relationships=relationships,
463
                )
464
            top_k_ids.append(match.id)
465
            top_k_nodes.append(node)
466
            top_k_scores.append(match.score)
467

468
        return VectorStoreQueryResult(
469
            nodes=top_k_nodes, similarities=top_k_scores, ids=top_k_ids
470
        )
471

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.