llama-index
470 lines · 16.9 KB
1"""Redis Vector store index.
2
3An index that is built on top of an existing vector store.
4"""
5
6import logging7from typing import TYPE_CHECKING, Any, Dict, List, Optional8
9import fsspec10
11from llama_index.legacy.bridge.pydantic import PrivateAttr12from llama_index.legacy.readers.redis.utils import (13TokenEscaper,14array_to_buffer,15check_redis_modules_exist,16convert_bytes,17get_redis_query,18)
19from llama_index.legacy.schema import (20BaseNode,21MetadataMode,22NodeRelationship,23RelatedNodeInfo,24TextNode,25)
26from llama_index.legacy.vector_stores.types import (27BasePydanticVectorStore,28MetadataFilters,29VectorStoreQuery,30VectorStoreQueryResult,31)
32from llama_index.legacy.vector_stores.utils import (33metadata_dict_to_node,34node_to_metadata_dict,35)
36
37_logger = logging.getLogger(__name__)38
39
40if TYPE_CHECKING:41from redis.client import Redis as RedisType42from redis.commands.search.field import VectorField43
44
class RedisVectorStore(BasePydanticVectorStore):
    """Redis-backed vector store using the RediSearch module.

    Nodes are stored as Redis hashes under a common key prefix and
    queried with RediSearch vector similarity search.
    """

    # Node text is persisted in Redis alongside the embedding.
    stores_text = True
    stores_node = True
    # Metadata is serialized via node_to_metadata_dict; nesting is allowed.
    flat_metadata = False

    _tokenizer: Any = PrivateAttr()  # escapes special chars in TAG queries
    _redis_client: Any = PrivateAttr()  # redis.Redis connection
    _prefix: str = PrivateAttr()  # key prefix: index_prefix + prefix_ending
    _index_name: str = PrivateAttr()  # RediSearch index name
    _index_args: Dict[str, Any] = PrivateAttr()  # extra index creation args
    _metadata_fields: List[str] = PrivateAttr()  # TAG fields to index
    _overwrite: bool = PrivateAttr()  # recreate index if it already exists
    _vector_field: str = PrivateAttr()  # name of the vector field in the schema
    _vector_key: str = PrivateAttr()  # hash key storing the vector bytes
    def __init__(
        self,
        index_name: str,
        index_prefix: str = "llama_index",
        prefix_ending: str = "/vector",
        index_args: Optional[Dict[str, Any]] = None,
        metadata_fields: Optional[List[str]] = None,
        redis_url: str = "redis://localhost:6379",
        overwrite: bool = False,
        **kwargs: Any,
    ) -> None:
        """Initialize RedisVectorStore.

        For index arguments that can be passed to RediSearch, see
        https://redis.io/docs/stack/search/reference/vectors/

        The index arguments will depend on the index type chosen. There
        are two available index types
        - FLAT: a flat index that uses brute force search
        - HNSW: a hierarchical navigable small world graph index

        Args:
            index_name (str): Name of the index.
            index_prefix (str): Prefix for the index. Defaults to "llama_index".
                The actual prefix used by Redis will be
                "{index_prefix}{prefix_ending}".
            prefix_ending (str): Prefix ending for the index. Be careful when
                changing this: https://github.com/jerryjliu/llama_index/pull/6665.
                Defaults to "/vector".
            index_args (Dict[str, Any]): Arguments for the index. Defaults to None.
            metadata_fields (List[str]): List of metadata fields to store in the index
                (only supports TAG fields).
            redis_url (str): URL for the redis instance.
                Defaults to "redis://localhost:6379".
            overwrite (bool): Whether to overwrite the index if it already exists.
                Defaults to False.
            kwargs (Any): Additional arguments to pass to the redis client.

        Raises:
            ValueError: If redis-py is not installed
            ValueError: If RediSearch is not installed

        Examples:
            >>> from llama_index.legacy.vector_stores.redis import RedisVectorStore
            >>> # Create a RedisVectorStore
            >>> vector_store = RedisVectorStore(
            >>>     index_name="my_index",
            >>>     index_prefix="llama_index",
            >>>     index_args={"algorithm": "HNSW", "m": 16, "ef_construction": 200,
            >>>         "distance_metric": "cosine"},
            >>>     redis_url="redis://localhost:6379/",
            >>>     overwrite=True)
        """
        try:
            import redis
        except ImportError:
            # NOTE: ValueError (not ImportError) is part of the documented contract.
            raise ValueError(
                "Could not import redis python package. "
                "Please install it with `pip install redis`."
            )
        try:
            # connect to redis from url
            self._redis_client = redis.from_url(redis_url, **kwargs)
            # check if redis has redisearch module installed
            check_redis_modules_exist(self._redis_client)
        except ValueError as e:
            raise ValueError(f"Redis failed to connect: {e}")

        # index identifiers
        self._prefix = index_prefix + prefix_ending
        self._index_name = index_name
        self._index_args = index_args if index_args is not None else {}
        self._metadata_fields = metadata_fields if metadata_fields is not None else []
        self._overwrite = overwrite
        # field/key names may be overridden through index_args
        self._vector_field = str(self._index_args.get("vector_field", "vector"))
        self._vector_key = str(self._index_args.get("vector_key", "vector"))
        self._tokenizer = TokenEscaper()
        # NOTE(review): private attrs are set before the pydantic base init;
        # keep this ordering — PrivateAttr storage is populated here.
        super().__init__()
139@property140def client(self) -> "RedisType":141"""Return the redis client instance."""142return self._redis_client143
144def add(self, nodes: List[BaseNode], **add_kwargs: Any) -> List[str]:145"""Add nodes to the index.146
147Args:
148nodes (List[BaseNode]): List of nodes with embeddings
149
150Returns:
151List[str]: List of ids of the documents added to the index.
152
153Raises:
154ValueError: If the index already exists and overwrite is False.
155"""
156# check to see if empty document list was passed157if len(nodes) == 0:158return []159
160# set vector dim for creation if index doesn't exist161self._index_args["dims"] = len(nodes[0].get_embedding())162
163if self._index_exists():164if self._overwrite:165self.delete_index()166self._create_index()167else:168logging.info(f"Adding document to existing index {self._index_name}")169else:170self._create_index()171
172ids = []173for node in nodes:174mapping = {175"id": node.node_id,176"doc_id": node.ref_doc_id,177"text": node.get_content(metadata_mode=MetadataMode.NONE),178self._vector_key: array_to_buffer(node.get_embedding()),179}180additional_metadata = node_to_metadata_dict(181node, remove_text=True, flat_metadata=self.flat_metadata182)183mapping.update(additional_metadata)184
185ids.append(node.node_id)186key = "_".join([self._prefix, str(node.node_id)])187self._redis_client.hset(key, mapping=mapping) # type: ignore188
189_logger.info(f"Added {len(ids)} documents to index {self._index_name}")190return ids191
192def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None:193"""194Delete nodes using with ref_doc_id.
195
196Args:
197ref_doc_id (str): The doc_id of the document to delete.
198
199"""
200# use tokenizer to escape dashes in query201query_str = "@doc_id:{%s}" % self._tokenizer.escape(ref_doc_id)202# find all documents that match a doc_id203results = self._redis_client.ft(self._index_name).search(query_str)204if len(results.docs) == 0:205# don't raise an error but warn the user that document wasn't found206# could be a result of eviction policy207_logger.warning(208f"Document with doc_id {ref_doc_id} not found "209f"in index {self._index_name}"210)211return212
213for doc in results.docs:214self._redis_client.delete(doc.id)215_logger.info(216f"Deleted {len(results.docs)} documents from index {self._index_name}"217)218
219def delete_index(self) -> None:220"""Delete the index and all documents."""221_logger.info(f"Deleting index {self._index_name}")222self._redis_client.ft(self._index_name).dropindex(delete_documents=True)223
224def query(self, query: VectorStoreQuery, **kwargs: Any) -> VectorStoreQueryResult:225"""Query the index.226
227Args:
228query (VectorStoreQuery): query object
229
230Returns:
231VectorStoreQueryResult: query result
232
233Raises:
234ValueError: If query.query_embedding is None.
235redis.exceptions.RedisError: If there is an error querying the index.
236redis.exceptions.TimeoutError: If there is a timeout querying the index.
237ValueError: If no documents are found when querying the index.
238"""
239from redis.exceptions import RedisError240from redis.exceptions import TimeoutError as RedisTimeoutError241
242return_fields = [243"id",244"doc_id",245"text",246self._vector_key,247"vector_score",248"_node_content",249]250
251filters = _to_redis_filters(query.filters) if query.filters is not None else "*"252
253_logger.info(f"Using filters: {filters}")254
255redis_query = get_redis_query(256return_fields=return_fields,257top_k=query.similarity_top_k,258vector_field=self._vector_field,259filters=filters,260)261
262if not query.query_embedding:263raise ValueError("Query embedding is required for querying.")264
265query_params = {266"vector": array_to_buffer(query.query_embedding),267}268_logger.info(f"Querying index {self._index_name}")269
270try:271results = self._redis_client.ft(self._index_name).search(272redis_query, query_params=query_params # type: ignore273)274except RedisTimeoutError as e:275_logger.error(f"Query timed out on {self._index_name}: {e}")276raise277except RedisError as e:278_logger.error(f"Error querying {self._index_name}: {e}")279raise280
281if len(results.docs) == 0:282raise ValueError(283f"No docs found on index '{self._index_name}' with "284f"prefix '{self._prefix}' and filters '{filters}'. "285"* Did you originally create the index with a different prefix? "286"* Did you index your metadata fields when you created the index?"287)288
289ids = []290nodes = []291scores = []292for doc in results.docs:293try:294node = metadata_dict_to_node({"_node_content": doc._node_content})295node.text = doc.text296except Exception:297# TODO: Legacy support for old metadata format298node = TextNode(299text=doc.text,300id_=doc.id,301embedding=None,302relationships={303NodeRelationship.SOURCE: RelatedNodeInfo(node_id=doc.doc_id)304},305)306ids.append(doc.id.replace(self._prefix + "_", ""))307nodes.append(node)308scores.append(1 - float(doc.vector_score))309_logger.info(f"Found {len(nodes)} results for query with id {ids}")310
311return VectorStoreQueryResult(nodes=nodes, ids=ids, similarities=scores)312
313def persist(314self,315persist_path: str,316fs: Optional[fsspec.AbstractFileSystem] = None,317in_background: bool = True,318) -> None:319"""Persist the vector store to disk.320
321Args:
322persist_path (str): Path to persist the vector store to. (doesn't apply)
323in_background (bool, optional): Persist in background. Defaults to True.
324fs (fsspec.AbstractFileSystem, optional): Filesystem to persist to.
325(doesn't apply)
326
327Raises:
328redis.exceptions.RedisError: If there is an error
329persisting the index to disk.
330"""
331from redis.exceptions import RedisError332
333try:334if in_background:335_logger.info("Saving index to disk in background")336self._redis_client.bgsave()337else:338_logger.info("Saving index to disk")339self._redis_client.save()340
341except RedisError as e:342_logger.error(f"Error saving index to disk: {e}")343raise344
345def _create_index(self) -> None:346# should never be called outside class and hence should not raise importerror347from redis.commands.search.field import TagField, TextField348from redis.commands.search.indexDefinition import IndexDefinition, IndexType349
350# Create Index351default_fields = [352TextField("text", weight=1.0),353TagField("doc_id", sortable=False),354TagField("id", sortable=False),355]356# add vector field to list of index fields. Create lazily to allow user357# to specify index and search attributes in creation.358
359fields = [360*default_fields,361self._create_vector_field(self._vector_field, **self._index_args),362]363
364# add metadata fields to list of index fields or we won't be able to search them365for metadata_field in self._metadata_fields:366# TODO: allow addition of text fields as metadata367# TODO: make sure we're preventing overwriting other keys (e.g. text,368# doc_id, id, and other vector fields)369fields.append(TagField(metadata_field, sortable=False))370
371_logger.info(f"Creating index {self._index_name}")372self._redis_client.ft(self._index_name).create_index(373fields=fields,374definition=IndexDefinition(375prefix=[self._prefix], index_type=IndexType.HASH376), # TODO support JSON377)378
379def _index_exists(self) -> bool:380# use FT._LIST to check if index exists381indices = convert_bytes(self._redis_client.execute_command("FT._LIST"))382return self._index_name in indices383
384def _create_vector_field(385self,386name: str,387dims: int = 1536,388algorithm: str = "FLAT",389datatype: str = "FLOAT32",390distance_metric: str = "COSINE",391initial_cap: int = 20000,392block_size: int = 1000,393m: int = 16,394ef_construction: int = 200,395ef_runtime: int = 10,396epsilon: float = 0.8,397**kwargs: Any,398) -> "VectorField":399"""Create a RediSearch VectorField.400
401Args:
402name (str): The name of the field.
403algorithm (str): The algorithm used to index the vector.
404dims (int): The dimensionality of the vector.
405datatype (str): The type of the vector. default: FLOAT32
406distance_metric (str): The distance metric used to compare vectors.
407initial_cap (int): The initial capacity of the index.
408block_size (int): The block size of the index.
409m (int): The number of outgoing edges in the HNSW graph.
410ef_construction (int): Number of maximum allowed potential outgoing edges
411candidates for each node in the graph,
412during the graph building.
413ef_runtime (int): The umber of maximum top candidates to hold during the
414KNN search
415
416Returns:
417A RediSearch VectorField.
418"""
419from redis import DataError420from redis.commands.search.field import VectorField421
422try:423if algorithm.upper() == "HNSW":424return VectorField(425name,426"HNSW",427{428"TYPE": datatype.upper(),429"DIM": dims,430"DISTANCE_METRIC": distance_metric.upper(),431"INITIAL_CAP": initial_cap,432"M": m,433"EF_CONSTRUCTION": ef_construction,434"EF_RUNTIME": ef_runtime,435"EPSILON": epsilon,436},437)438else:439return VectorField(440name,441"FLAT",442{443"TYPE": datatype.upper(),444"DIM": dims,445"DISTANCE_METRIC": distance_metric.upper(),446"INITIAL_CAP": initial_cap,447"BLOCK_SIZE": block_size,448},449)450except DataError as e:451raise ValueError(452f"Failed to create Redis index vector field with error: {e}"453)454
455
def _to_redis_filters(metadata_filters: MetadataFilters) -> str:
    """Translate metadata filters into a RediSearch TAG filter string.

    Currently only supports exact tag match — ``{}`` denotes a tag. The index
    must have been created with the corresponding metadata field (TAG) before
    filtering on it, or the query will return no results.
    """
    escaper = TokenEscaper()

    # Each filter becomes an exact-match TAG clause: @key:{escaped_value}.
    # Escaping ensures the raw value is treated as a single exact token.
    clauses = [
        f"@{flt.key}:{{{escaper.escape(str(flt.value))}}}"
        for flt in metadata_filters.legacy_filters()
    ]
    return f"({' & '.join(clauses)})"