llama-index

Форк
0
185 строк · 5.9 Кб
1
"""Simple graph store index."""
2

3
import logging
4
from typing import Any, Dict, List, Optional
5

6
from llama_index.legacy.graph_stores.types import GraphStore
7

8
logger = logging.getLogger(__name__)
9

10

11
class FalkorDBGraphStore(GraphStore):
12
    """FalkorDB Graph Store.
13

14
    In this graph store, triplets are stored within FalkorDB.
15

16
    Args:
17
        simple_graph_store_data_dict (Optional[dict]): data dict
18
            containing the triplets. See FalkorDBGraphStoreData
19
            for more details.
20
    """
21

22
    def __init__(
23
        self,
24
        url: str,
25
        database: str = "falkor",
26
        node_label: str = "Entity",
27
        **kwargs: Any,
28
    ) -> None:
29
        try:
30
            import redis
31
        except ImportError:
32
            raise ImportError("Please install redis client: pip install redis")
33

34
        """Initialize params."""
35
        self._node_label = node_label
36

37
        self._driver = redis.Redis.from_url(url).graph(database)
38
        self._driver.query(f"CREATE INDEX FOR (n:`{self._node_label}`) ON (n.id)")
39

40
        self._database = database
41

42
        self.schema = ""
43
        self.get_query = f"""
44
            MATCH (n1:`{self._node_label}`)-[r]->(n2:`{self._node_label}`)
45
            WHERE n1.id = $subj RETURN type(r), n2.id
46
        """
47

48
    @property
49
    def client(self) -> None:
50
        return self._driver
51

52
    def get(self, subj: str) -> List[List[str]]:
53
        """Get triplets."""
54
        result = self._driver.query(
55
            self.get_query, params={"subj": subj}, read_only=True
56
        )
57
        return result.result_set
58

59
    def get_rel_map(
60
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
61
    ) -> Dict[str, List[List[str]]]:
62
        """Get flat rel map."""
63
        # The flat means for multi-hop relation path, we could get
64
        # knowledge like: subj -> rel -> obj -> rel -> obj -> rel -> obj.
65
        # This type of knowledge is useful for some tasks.
66
        # +-------------+------------------------------------+
67
        # | subj        | flattened_rels                     |
68
        # +-------------+------------------------------------+
69
        # | "player101" | [95, "player125", 2002, "team204"] |
70
        # | "player100" | [1997, "team204"]                  |
71
        # ...
72
        # +-------------+------------------------------------+
73

74
        rel_map: Dict[Any, List[Any]] = {}
75
        if subjs is None or len(subjs) == 0:
76
            # unlike simple graph_store, we don't do get_all here
77
            return rel_map
78

79
        query = f"""
80
            MATCH (n1:{self._node_label})
81
            WHERE n1.id IN $subjs
82
            WITH n1
83
            MATCH p=(n1)-[e*1..{depth}]->(z)
84
            RETURN p LIMIT {limit}
85
        """
86

87
        data = self.query(query, params={"subjs": subjs})
88
        if not data:
89
            return rel_map
90

91
        for record in data:
92
            nodes = record[0].nodes()
93
            edges = record[0].edges()
94

95
            subj_id = nodes[0].properties["id"]
96
            path = []
97
            for i, edge in enumerate(edges):
98
                dest = nodes[i + 1]
99
                dest_id = dest.properties["id"]
100
                path.append(edge.relation)
101
                path.append(dest_id)
102

103
            paths = rel_map[subj_id] if subj_id in rel_map else []
104
            paths.append(path)
105
            rel_map[subj_id] = paths
106

107
        return rel_map
108

109
    def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
110
        """Add triplet."""
111
        query = """
112
            MERGE (n1:`%s` {id:$subj})
113
            MERGE (n2:`%s` {id:$obj})
114
            MERGE (n1)-[:`%s`]->(n2)
115
        """
116

117
        prepared_statement = query % (
118
            self._node_label,
119
            self._node_label,
120
            rel.replace(" ", "_").upper(),
121
        )
122

123
        # Call FalkorDB with prepared statement
124
        self._driver.query(prepared_statement, params={"subj": subj, "obj": obj})
125

126
    def delete(self, subj: str, rel: str, obj: str) -> None:
127
        """Delete triplet."""
128

129
        def delete_rel(subj: str, obj: str, rel: str) -> None:
130
            rel = rel.replace(" ", "_").upper()
131
            query = f"""
132
                MATCH (n1:`{self._node_label}`)-[r:`{rel}`]->(n2:`{self._node_label}`)
133
                WHERE n1.id = $subj AND n2.id = $obj DELETE r
134
            """
135

136
            # Call FalkorDB with prepared statement
137
            self._driver.query(query, params={"subj": subj, "obj": obj})
138

139
        def delete_entity(entity: str) -> None:
140
            query = f"MATCH (n:`{self._node_label}`) WHERE n.id = $entity DELETE n"
141

142
            # Call FalkorDB with prepared statement
143
            self._driver.query(query, params={"entity": entity})
144

145
        def check_edges(entity: str) -> bool:
146
            query = f"""
147
                MATCH (n1:`{self._node_label}`)--()
148
                WHERE n1.id = $entity RETURN count(*)
149
            """
150

151
            # Call FalkorDB with prepared statement
152
            result = self._driver.query(
153
                query, params={"entity": entity}, read_only=True
154
            )
155
            return bool(result.result_set)
156

157
        delete_rel(subj, obj, rel)
158
        if not check_edges(subj):
159
            delete_entity(subj)
160
        if not check_edges(obj):
161
            delete_entity(obj)
162

163
    def refresh_schema(self) -> None:
164
        """
165
        Refreshes the FalkorDB graph schema information.
166
        """
167
        node_properties = self.query("CALL DB.PROPERTYKEYS()")
168
        relationships = self.query("CALL DB.RELATIONSHIPTYPES()")
169

170
        self.schema = f"""
171
        Properties: {node_properties}
172
        Relationships: {relationships}
173
        """
174

175
    def get_schema(self, refresh: bool = False) -> str:
176
        """Get the schema of the FalkorDBGraph store."""
177
        if self.schema and not refresh:
178
            return self.schema
179
        self.refresh_schema()
180
        logger.debug(f"get_schema() schema:\n{self.schema}")
181
        return self.schema
182

183
    def query(self, query: str, params: Optional[Dict[str, Any]] = None) -> Any:
184
        result = self._driver.query(query, params=params)
185
        return result.result_set
186

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.