llama-index
171 строка · 6.1 Кб
1"""Pathway Retriever."""
2
3import logging4from typing import Any, Callable, List, Optional, Tuple, Union5
6from llama_index.legacy.callbacks.base import CallbackManager7from llama_index.legacy.constants import DEFAULT_SIMILARITY_TOP_K8from llama_index.legacy.core.base_retriever import BaseRetriever9from llama_index.legacy.embeddings import BaseEmbedding10from llama_index.legacy.indices.query.schema import QueryBundle11from llama_index.legacy.ingestion.pipeline import run_transformations12from llama_index.legacy.schema import (13BaseNode,14NodeWithScore,15QueryBundle,16TextNode,17TransformComponent,18)
19
20logger = logging.getLogger(__name__)21
22
def node_transformer(x: str) -> List[BaseNode]:
    """Wrap a raw text string into a one-element list of nodes.

    Args:
        x (str): text to wrap.

    Returns:
        A list holding a single ``TextNode`` whose text is ``x``.
    """
    text_node = TextNode(text=x)
    return [text_node]
26
def node_to_pathway(x: List[BaseNode]) -> List[Tuple[str, dict]]:
    """Convert nodes into ``(text, metadata)`` pairs.

    Args:
        x (List[BaseNode]): nodes to convert. Each node must expose ``text``
            and ``extra_info`` attributes.

    Returns:
        One ``(node.text, node.extra_info)`` tuple per input node, in input
        order. An empty input yields an empty list.
    """
    # The argument is a collection of nodes (it is iterated below), not a
    # single node as the previous annotation suggested.
    return [(node.text, node.extra_info) for node in x]
30
class PathwayVectorServer:
    """
    Build an autoupdating document indexing pipeline
    for approximate nearest neighbor search.

    Args:
        docs (list): Pathway tables, may be pw.io connectors or custom tables.

        transformations (List[TransformComponent]): list of transformation steps, has to
            include embedding as last step, optionally splitter and other
            TransformComponent in the middle. The list passed in is not modified.

        parser (Callable[[bytes], list[tuple[str, dict]]]): optional, callable that
            parses file contents into a list of documents. If None, defaults to `utf-8`
            decoding of the file contents. Defaults to None.

    Raises:
        ImportError: if the ``pathway`` package cannot be imported.
        ValueError: if ``transformations`` is None or empty, or if its last
            step is not a ``BaseEmbedding``.
    """

    def __init__(
        self,
        *docs: Any,
        transformations: List[Union[TransformComponent, Callable[[Any], Any]]],
        parser: Optional[Callable[[bytes], List[Tuple[str, dict]]]] = None,
        **kwargs: Any,
    ) -> None:
        try:
            from pathway.xpacks.llm import vector_store
        except ImportError as err:
            raise ImportError(
                "Could not import pathway python package. "
                "Please install it with `pip install pathway`."
            ) from err

        if not transformations:
            raise ValueError("Transformations list cannot be None or empty.")

        if not isinstance(transformations[-1], BaseEmbedding):
            raise ValueError(
                f"Last step of transformations should be an instance of {BaseEmbedding.__name__}, "
                f"found {type(transformations[-1])}."
            )

        # Work on a private copy so the caller's list is left untouched and
        # can be reused (e.g. to build a second server).
        steps = list(transformations)
        embedder: BaseEmbedding = steps.pop()  # type: ignore

        def embedding_callable(x: str) -> List[float]:
            return embedder.get_text_embedding(x)

        # str -> TextNode, then the user-supplied steps, then
        # TextNode -> (str, dict).
        pipeline = [node_transformer, *steps, node_to_pathway]

        def generic_transformer(x: List[str]) -> List[Tuple[str, dict]]:
            return run_transformations(x, pipeline)  # type: ignore

        self.vector_store_server = vector_store.VectorStoreServer(
            *docs,
            embedder=embedding_callable,
            parser=parser,
            splitter=generic_transformer,
            **kwargs,
        )

    def run_server(
        self,
        host: str,
        port: Union[str, int],
        threaded: bool = False,
        with_cache: bool = True,
        cache_backend: Any = None,
    ) -> Any:
        """
        Run the server and start answering queries.

        Args:
            host (str): host to bind the HTTP listener
            port (str | int): port to bind the HTTP listener
            threaded (bool): if True, run in a thread. Else block computation
            with_cache (bool): if True, embedding requests for the same contents are cached
            cache_backend: the backend to use for caching if it is enabled. The
                default is the disk cache, hosted locally in the folder ``./Cache``. You
                can use ``Backend`` class of the [`persistence API`]
                (/developers/api-docs/persistence-api/#pathway.persistence.Backend)
                to override it.

        Returns:
            If threaded, return the Thread object. Else, does not return.

        Raises:
            ImportError: if the default cache backend is needed and the
                ``pathway`` package cannot be imported.
        """
        if with_cache and cache_backend is None:
            # ``pathway`` is only needed here, to build the default on-disk
            # cache backend, so the import is confined to this branch.
            try:
                import pathway as pw
            except ImportError as err:
                raise ImportError(
                    "Could not import pathway python package. "
                    "Please install it with `pip install pathway`."
                ) from err
            cache_backend = pw.persistence.Backend.filesystem("./Cache")

        return self.vector_store_server.run_server(
            host,
            port,
            threaded=threaded,
            with_cache=with_cache,
            cache_backend=cache_backend,
        )
132
class PathwayRetriever(BaseRetriever):
    """Retriever client for a running ``PathwayVectorServer``.

    Pathway is an open data processing framework. It allows you to easily
    develop data transformation pipelines that work with live data sources
    and changing data.

    This class implements the Retriever API on top of the Pathway vector
    store client.
    """

    def __init__(
        self,
        host: str,
        port: Union[str, int],
        similarity_top_k: int = DEFAULT_SIMILARITY_TOP_K,
        callback_manager: Optional[CallbackManager] = None,
    ) -> None:
        """Create the client used to query the Pathway vector server.

        Args:
            host (str): host of the vector server.
            port (str | int): port of the vector server.
            similarity_top_k (int): number of results requested per query.
            callback_manager (Optional[CallbackManager]): forwarded to the
                base retriever.

        Raises:
            ImportError: if the ``pathway`` package is not installed.
        """
        try:
            from pathway.xpacks.llm.vector_store import VectorStoreClient
        except ImportError:
            raise ImportError(
                "`pathway` package not found, please run `pip install pathway`"
            )

        self.client = VectorStoreClient(host, port)
        self.similarity_top_k = similarity_top_k
        super().__init__(callback_manager)

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Query the server and return the hits ordered by descending score."""
        hits = self.client(query=query_bundle.query_str, k=self.similarity_top_k)

        scored_nodes: List[NodeWithScore] = []
        for hit in hits:
            text_node = TextNode(text=hit["text"], extra_info=hit["metadata"])
            # Turn the cosine distance into a similarity score
            # (higher means more similar).
            similarity = 1 - hit["dist"]
            scored_nodes.append(NodeWithScore(node=text_node, score=similarity))

        # Stable in-place sort, best match first; a missing score counts as 0.0.
        scored_nodes.sort(key=lambda entry: entry.score or 0.0, reverse=True)
        return scored_nodes