llama-index

Форк
0
677 строк · 25.4 Кб
1
"""NebulaGraph graph store index."""
2

3
import logging
4
import os
5
from string import Template
6
from typing import Any, Dict, List, Optional
7

8
from tenacity import retry, stop_after_attempt, wait_random_exponential
9

10
from llama_index.legacy.graph_stores.types import GraphStore
11

12
QUOTE = '"'
13
RETRY_TIMES = 3
14
WAIT_MIN_SECONDS = 0.5
15
WAIT_MAX_SECONDS = 10
16

17
logger = logging.getLogger(__name__)
18

19

20
rel_query_sample_edge = Template(
21
    """
22
MATCH ()-[e:`$edge_type`]->()
23
RETURN [src(e), dst(e)] AS sample_edge LIMIT 1
24
"""
25
)
26

27
rel_query_edge_type = Template(
28
    """
29
MATCH (m)-[:`$edge_type`]->(n)
30
  WHERE id(m) == $quote$src_id$quote AND id(n) == $quote$dst_id$quote
31
RETURN "(:" + tags(m)[0] + ")-[:$edge_type]->(:" + tags(n)[0] + ")" AS rels
32
"""
33
)
34

35

36
def hash_string_to_rank(string: str) -> int:
37
    # get signed 64-bit hash value
38
    signed_hash = hash(string)
39

40
    # reduce the hash value to a 64-bit range
41
    mask = (1 << 64) - 1
42
    signed_hash &= mask
43

44
    # convert the signed hash value to an unsigned 64-bit integer
45
    if signed_hash & (1 << 63):
46
        unsigned_hash = -((signed_hash ^ mask) + 1)
47
    else:
48
        unsigned_hash = signed_hash
49

50
    return unsigned_hash
51

52

53
def prepare_subjs_param(
54
    subjs: Optional[List[str]], vid_type: str = "FIXED_STRING(256)"
55
) -> Dict:
56
    """Prepare parameters for query."""
57
    if subjs is None:
58
        return {}
59
    from nebula3.common import ttypes
60

61
    subjs_list = []
62
    subjs_byte = ttypes.Value()
63

64
    # filter non-digit string for INT64 vid type
65
    if vid_type == "INT64":
66
        subjs = [subj for subj in subjs if subj.isdigit()]
67
        if len(subjs) == 0:
68
            logger.warning(
69
                f"KG is with INT64 vid type, but no digit string is provided."
70
                f"Return empty subjs, and no query will be executed."
71
                f"subjs: {subjs}"
72
            )
73
            return {}
74
    for subj in subjs:
75
        if not isinstance(subj, str):
76
            raise TypeError(f"Subject should be str, but got {type(subj).__name__}.")
77
        subj_byte = ttypes.Value()
78
        if vid_type == "INT64":
79
            assert subj.isdigit(), (
80
                "Subject should be a digit string in current "
81
                "graph store, where vid type is INT64."
82
            )
83
            subj_byte.set_iVal(int(subj))
84
        else:
85
            subj_byte.set_sVal(subj)
86
        subjs_list.append(subj_byte)
87
    subjs_nlist = ttypes.NList(values=subjs_list)
88
    subjs_byte.set_lVal(subjs_nlist)
89
    return {"subjs": subjs_byte}
90

91

92
def escape_str(value: str) -> str:
93
    """Escape String for NebulaGraph Query."""
94
    patterns = {
95
        '"': " ",
96
    }
97
    for pattern in patterns:
98
        if pattern in value:
99
            value = value.replace(pattern, patterns[pattern])
100
    if value[0] == " " or value[-1] == " ":
101
        value = value.strip()
102

103
    return value
104

105

106
class NebulaGraphStore(GraphStore):
107
    """NebulaGraph graph store."""
108

109
    def __init__(
110
        self,
111
        session_pool: Optional[Any] = None,
112
        space_name: Optional[str] = None,
113
        edge_types: Optional[List[str]] = ["relationship"],
114
        rel_prop_names: Optional[List[str]] = ["relationship,"],
115
        tags: Optional[List[str]] = ["entity"],
116
        tag_prop_names: Optional[List[str]] = ["name,"],
117
        include_vid: bool = True,
118
        session_pool_kwargs: Optional[Dict[str, Any]] = {},
119
        **kwargs: Any,
120
    ) -> None:
121
        """Initialize NebulaGraph graph store.
122

123
        Args:
124
            session_pool: NebulaGraph session pool.
125
            space_name: NebulaGraph space name.
126
            edge_types: Edge types.
127
            rel_prop_names: Relation property names corresponding to edge types.
128
            tags: Tags.
129
            tag_prop_names: Tag property names corresponding to tags.
130
            session_pool_kwargs: Keyword arguments for NebulaGraph session pool.
131
            **kwargs: Keyword arguments.
132
        """
133
        try:
134
            import nebula3  # noqa
135
        except ImportError:
136
            raise ImportError(
137
                "Please install NebulaGraph Python client first: "
138
                "`pip install nebula3-python`"
139
            )
140
        assert space_name is not None, "space_name should be provided."
141
        self._space_name = space_name
142
        self._session_pool_kwargs = session_pool_kwargs
143

144
        self._session_pool: Any = session_pool
145
        if self._session_pool is None:
146
            self.init_session_pool()
147

148
        self._vid_type = self._get_vid_type()
149

150
        self._tags = tags or ["entity"]
151
        self._edge_types = edge_types or ["rel"]
152
        self._rel_prop_names = rel_prop_names or ["predicate,"]
153
        if len(self._edge_types) != len(self._rel_prop_names):
154
            raise ValueError(
155
                "edge_types and rel_prop_names to define relation and relation name"
156
                "should be provided, yet with same length."
157
            )
158
        if len(self._edge_types) == 0:
159
            raise ValueError("Length of `edge_types` should be greater than 0.")
160

161
        if tag_prop_names is None or len(self._tags) != len(tag_prop_names):
162
            raise ValueError(
163
                "tag_prop_names to define tag and tag property name should be "
164
                "provided, yet with same length."
165
            )
166

167
        if len(self._tags) == 0:
168
            raise ValueError("Length of `tags` should be greater than 0.")
169

170
        # for building query
171
        self._edge_dot_rel = [
172
            f"`{edge_type}`.`{rel_prop_name}`"
173
            for edge_type, rel_prop_name in zip(self._edge_types, self._rel_prop_names)
174
        ]
175

176
        self._edge_prop_map = {}
177
        for edge_type, rel_prop_name in zip(self._edge_types, self._rel_prop_names):
178
            self._edge_prop_map[edge_type] = [
179
                prop.strip() for prop in rel_prop_name.split(",")
180
            ]
181

182
        # cypher string like: map{`follow`: "degree", `serve`: "start_year,end_year"}
183
        self._edge_prop_map_cypher_string = (
184
            "map{"
185
            + ", ".join(
186
                [
187
                    f"`{edge_type}`: \"{','.join(rel_prop_names)}\""
188
                    for edge_type, rel_prop_names in self._edge_prop_map.items()
189
                ]
190
            )
191
            + "}"
192
        )
193

194
        # build tag_prop_names map
195
        self._tag_prop_names_map = {}
196
        for tag, prop_names in zip(self._tags, tag_prop_names or []):
197
            if prop_names is not None:
198
                self._tag_prop_names_map[tag] = f"`{tag}`.`{prop_names}`"
199
        self._tag_prop_names: List[str] = list(
200
            {
201
                prop_name.strip()
202
                for prop_names in tag_prop_names or []
203
                if prop_names is not None
204
                for prop_name in prop_names.split(",")
205
            }
206
        )
207

208
        self._include_vid = include_vid
209

210
    def init_session_pool(self) -> Any:
211
        """Return NebulaGraph session pool."""
212
        from nebula3.Config import SessionPoolConfig
213
        from nebula3.gclient.net.SessionPool import SessionPool
214

215
        # ensure "NEBULA_USER", "NEBULA_PASSWORD", "NEBULA_ADDRESS" are set
216
        # in environment variables
217
        if not all(
218
            key in os.environ
219
            for key in ["NEBULA_USER", "NEBULA_PASSWORD", "NEBULA_ADDRESS"]
220
        ):
221
            raise ValueError(
222
                "NEBULA_USER, NEBULA_PASSWORD, NEBULA_ADDRESS should be set in "
223
                "environment variables when NebulaGraph Session Pool is not "
224
                "directly passed."
225
            )
226
        graphd_host, graphd_port = os.environ["NEBULA_ADDRESS"].split(":")
227
        session_pool = SessionPool(
228
            os.environ["NEBULA_USER"],
229
            os.environ["NEBULA_PASSWORD"],
230
            self._space_name,
231
            [(graphd_host, int(graphd_port))],
232
        )
233

234
        seesion_pool_config = SessionPoolConfig()
235
        session_pool.init(seesion_pool_config)
236
        self._session_pool = session_pool
237
        return self._session_pool
238

239
    def _get_vid_type(self) -> str:
240
        """Get vid type."""
241
        return (
242
            self.execute(f"DESCRIBE SPACE {self._space_name}")
243
            .column_values("Vid Type")[0]
244
            .cast()
245
        )
246

247
    def __del__(self) -> None:
248
        """Close NebulaGraph session pool."""
249
        self._session_pool.close()
250

251
    @retry(
252
        wait=wait_random_exponential(min=WAIT_MIN_SECONDS, max=WAIT_MAX_SECONDS),
253
        stop=stop_after_attempt(RETRY_TIMES),
254
    )
255
    def execute(self, query: str, param_map: Optional[Dict[str, Any]] = {}) -> Any:
256
        """Execute query.
257

258
        Args:
259
            query: Query.
260
            param_map: Parameter map.
261

262
        Returns:
263
            Query result.
264
        """
265
        from nebula3.Exception import IOErrorException
266
        from nebula3.fbthrift.transport.TTransport import TTransportException
267

268
        # Clean the query string by removing triple backticks
269
        query = query.replace("```", "").strip()
270

271
        try:
272
            result = self._session_pool.execute_parameter(query, param_map)
273
            if result is None:
274
                raise ValueError(f"Query failed. Query: {query}, Param: {param_map}")
275
            if not result.is_succeeded():
276
                raise ValueError(
277
                    f"Query failed. Query: {query}, Param: {param_map}"
278
                    f"Error message: {result.error_msg()}"
279
                )
280
            return result
281
        except (TTransportException, IOErrorException, RuntimeError) as e:
282
            logger.error(
283
                f"Connection issue, try to recreate session pool. Query: {query}, "
284
                f"Param: {param_map}"
285
                f"Error: {e}"
286
            )
287
            self.init_session_pool()
288
            logger.info(
289
                f"Session pool recreated. Query: {query}, Param: {param_map}"
290
                f"This was due to error: {e}, and now retrying."
291
            )
292
            raise
293

294
        except ValueError as e:
295
            # query failed on db side
296
            logger.error(
297
                f"Query failed. Query: {query}, Param: {param_map}"
298
                f"Error message: {e}"
299
            )
300
            raise
301
        except Exception as e:
302
            # other exceptions
303
            logger.error(
304
                f"Query failed. Query: {query}, Param: {param_map}"
305
                f"Error message: {e}"
306
            )
307
            raise
308

309
    @classmethod
310
    def from_dict(cls, config_dict: Dict[str, Any]) -> "GraphStore":
311
        """Initialize graph store from configuration dictionary.
312

313
        Args:
314
            config_dict: Configuration dictionary.
315

316
        Returns:
317
            Graph store.
318
        """
319
        return cls(**config_dict)
320

321
    @property
322
    def client(self) -> Any:
323
        """Return NebulaGraph session pool."""
324
        return self._session_pool
325

326
    @property
327
    def config_dict(self) -> dict:
328
        """Return configuration dictionary."""
329
        return {
330
            "session_pool": self._session_pool,
331
            "space_name": self._space_name,
332
            "edge_types": self._edge_types,
333
            "rel_prop_names": self._rel_prop_names,
334
            "session_pool_kwargs": self._session_pool_kwargs,
335
        }
336

337
    def get(self, subj: str) -> List[List[str]]:
338
        """Get triplets.
339

340
        Args:
341
            subj: Subject.
342

343
        Returns:
344
            Triplets.
345
        """
346
        rel_map = self.get_flat_rel_map([subj], depth=1)
347
        rels = list(rel_map.values())
348
        if len(rels) == 0:
349
            return []
350
        return rels[0]
351

352
    def get_flat_rel_map(
353
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
354
    ) -> Dict[str, List[List[str]]]:
355
        """Get flat rel map."""
356
        # The flat means for multi-hop relation path, we could get
357
        # knowledge like: subj -rel-> obj -rel-> obj <-rel- obj.
358
        # This type of knowledge is useful for some tasks.
359
        # +---------------------+---------------------------------------------...-----+
360
        # | subj                | flattened_rels                              ...     |
361
        # +---------------------+---------------------------------------------...-----+
362
        # | "{name:Tony Parker}"| "{name: Tony Parker}-[follow:{degree:95}]-> ...ili}"|
363
        # | "{name:Tony Parker}"| "{name: Tony Parker}-[follow:{degree:95}]-> ...r}"  |
364
        # ...
365
        rel_map: Dict[Any, List[Any]] = {}
366
        if subjs is None or len(subjs) == 0:
367
            # unlike simple graph_store, we don't do get_all here
368
            return rel_map
369

370
        # WITH map{`true`: "-[", `false`: "<-["} AS arrow_l,
371
        #      map{`true`: "]->", `false`: "]-"} AS arrow_r,
372
        #      map{`follow`: "degree", `serve`: "start_year,end_year"} AS edge_type_map
373
        # MATCH p=(start)-[e:follow|serve*..2]-()
374
        #     WHERE id(start) IN ["player100", "player101"]
375
        #   WITH start, id(start) AS vid, nodes(p) AS nodes, e AS rels,
376
        #     length(p) AS rel_count, arrow_l, arrow_r, edge_type_map
377
        #   WITH
378
        #     REDUCE(s = vid + '{', key IN [key_ in ["name"]
379
        #       WHERE properties(start)[key_] IS NOT NULL]  | s + key + ': ' +
380
        #         COALESCE(TOSTRING(properties(start)[key]), 'null') + ', ')
381
        #         + '}'
382
        #       AS subj,
383
        #     [item in [i IN RANGE(0, rel_count - 1) | [nodes[i], nodes[i + 1],
384
        #         rels[i], typeid(rels[i]) > 0, type(rels[i]) ]] | [
385
        #      arrow_l[tostring(item[3])] +
386
        #          item[4] + ':' +
387
        #          REDUCE(s = '{', key IN SPLIT(edge_type_map[item[4]], ',') |
388
        #            s + key + ': ' + COALESCE(TOSTRING(properties(item[2])[key]),
389
        #            'null') + ', ') + '}'
390
        #           +
391
        #      arrow_r[tostring(item[3])],
392
        #      REDUCE(s = id(item[1]) + '{', key IN [key_ in ["name"]
393
        #           WHERE properties(item[1])[key_] IS NOT NULL]  | s + key + ': ' +
394
        #           COALESCE(TOSTRING(properties(item[1])[key]), 'null') + ', ') + '}'
395
        #      ]
396
        #   ] AS rels
397
        #   WITH
398
        #       REPLACE(subj, ', }', '}') AS subj,
399
        #       REDUCE(acc = collect(NULL), l in rels | acc + l) AS flattened_rels
400
        #   RETURN
401
        #     subj,
402
        #     REPLACE(REDUCE(acc = subj,l in flattened_rels|acc + ' ' + l),
403
        #       ', }', '}')
404
        #       AS flattened_rels
405
        #   LIMIT 30
406

407
        # Based on self._include_vid
408
        # {name: Tim Duncan} or player100{name: Tim Duncan} for entity
409
        s_prefix = "vid + '{'" if self._include_vid else "'{'"
410
        s1 = "id(item[1]) + '{'" if self._include_vid else "'{'"
411

412
        query = (
413
            f"WITH map{{`true`: '-[', `false`: '<-['}} AS arrow_l,"
414
            f"     map{{`true`: ']->', `false`: ']-'}} AS arrow_r,"
415
            f"     {self._edge_prop_map_cypher_string} AS edge_type_map "
416
            f"MATCH p=(start)-[e:`{'`|`'.join(self._edge_types)}`*..{depth}]-() "
417
            f"  WHERE id(start) IN $subjs "
418
            f"WITH start, id(start) AS vid, nodes(p) AS nodes, e AS rels,"
419
            f"  length(p) AS rel_count, arrow_l, arrow_r, edge_type_map "
420
            f"WITH "
421
            f"  REDUCE(s = {s_prefix}, key IN [key_ in {self._tag_prop_names!s} "
422
            f"    WHERE properties(start)[key_] IS NOT NULL]  | s + key + ': ' + "
423
            f"      COALESCE(TOSTRING(properties(start)[key]), 'null') + ', ')"
424
            f"      + '}}'"
425
            f"    AS subj,"
426
            f"  [item in [i IN RANGE(0, rel_count - 1)|[nodes[i], nodes[i + 1],"
427
            f"      rels[i], typeid(rels[i]) > 0, type(rels[i]) ]] | ["
428
            f"    arrow_l[tostring(item[3])] +"
429
            f"      item[4] + ':' +"
430
            f"      REDUCE(s = '{{', key IN SPLIT(edge_type_map[item[4]], ',') | "
431
            f"        s + key + ': ' + COALESCE(TOSTRING(properties(item[2])[key]),"
432
            f"        'null') + ', ') + '}}'"
433
            f"      +"
434
            f"    arrow_r[tostring(item[3])],"
435
            f"    REDUCE(s = {s1}, key IN [key_ in "
436
            f"        {self._tag_prop_names!s} WHERE properties(item[1])[key_] "
437
            f"        IS NOT NULL]  | s + key + ': ' + "
438
            f"        COALESCE(TOSTRING(properties(item[1])[key]), 'null') + ', ')"
439
            f"        + '}}'"
440
            f"    ]"
441
            f"  ] AS rels "
442
            f"WITH "
443
            f"  REPLACE(subj, ', }}', '}}') AS subj,"
444
            f"  REDUCE(acc = collect(NULL), l in rels | acc + l) AS flattened_rels "
445
            f"RETURN "
446
            f"  subj,"
447
            f"  REPLACE(REDUCE(acc = subj, l in flattened_rels | acc + ' ' + l), "
448
            f"    ', }}', '}}') "
449
            f"    AS flattened_rels"
450
            f"  LIMIT {limit}"
451
        )
452
        subjs_param = prepare_subjs_param(subjs, self._vid_type)
453
        logger.debug(f"get_flat_rel_map()\nsubjs_param: {subjs},\nquery: {query}")
454
        if subjs_param == {}:
455
            # This happens when subjs is None after prepare_subjs_param()
456
            # Probably because vid type is INT64, but no digit string is provided.
457
            return rel_map
458
        result = self.execute(query, subjs_param)
459
        if result is None:
460
            return rel_map
461

462
        # get raw data
463
        subjs_ = result.column_values("subj") or []
464
        rels_ = result.column_values("flattened_rels") or []
465

466
        for subj, rel in zip(subjs_, rels_):
467
            subj_ = subj.cast()
468
            rel_ = rel.cast()
469
            if subj_ not in rel_map:
470
                rel_map[subj_] = []
471
            rel_map[subj_].append(rel_)
472
        return rel_map
473

474
    def get_rel_map(
475
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
476
    ) -> Dict[str, List[List[str]]]:
477
        """Get rel map."""
478
        # We put rels in a long list for depth>= 1, this is different from
479
        # SimpleGraphStore.get_rel_map() though.
480
        # But this makes more sense for multi-hop relation path.
481

482
        if subjs is not None:
483
            subjs = [
484
                escape_str(subj) for subj in subjs if isinstance(subj, str) and subj
485
            ]
486
            if len(subjs) == 0:
487
                return {}
488

489
        return self.get_flat_rel_map(subjs, depth, limit)
490

491
    def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
492
        """Add triplet."""
493
        # Note, to enable leveraging existing knowledge graph,
494
        # the (triplet -- property graph) mapping
495
        #   makes (n:1) edge_type.prop_name --> triplet.rel
496
        # thus we have to assume rel to be the first edge_type.prop_name
497
        # here in upsert_triplet().
498
        # This applies to the type of entity(tags) with subject and object, too,
499
        # thus we have to assume subj to be the first entity.tag_name
500

501
        # lower case subj, rel, obj
502
        subj = escape_str(subj)
503
        rel = escape_str(rel)
504
        obj = escape_str(obj)
505
        if self._vid_type == "INT64":
506
            assert all(
507
                [subj.isdigit(), obj.isdigit()]
508
            ), "Subject and object should be digit strings in current graph store."
509
            subj_field = subj
510
            obj_field = obj
511
        else:
512
            subj_field = f"{QUOTE}{subj}{QUOTE}"
513
            obj_field = f"{QUOTE}{obj}{QUOTE}"
514
        edge_field = f"{subj_field}->{obj_field}"
515

516
        edge_type = self._edge_types[0]
517
        rel_prop_name = self._rel_prop_names[0]
518
        entity_type = self._tags[0]
519
        rel_hash = hash_string_to_rank(rel)
520
        dml_query = (
521
            f"INSERT VERTEX `{entity_type}`(name) "
522
            f"  VALUES {subj_field}:({QUOTE}{subj}{QUOTE});"
523
            f"INSERT VERTEX `{entity_type}`(name) "
524
            f"  VALUES {obj_field}:({QUOTE}{obj}{QUOTE});"
525
            f"INSERT EDGE `{edge_type}`(`{rel_prop_name}`) "
526
            f"  VALUES "
527
            f"{edge_field}"
528
            f"@{rel_hash}:({QUOTE}{rel}{QUOTE});"
529
        )
530
        logger.debug(f"upsert_triplet()\nDML query: {dml_query}")
531
        result = self.execute(dml_query)
532
        assert (
533
            result and result.is_succeeded()
534
        ), f"Failed to upsert triplet: {subj} {rel} {obj}, query: {dml_query}"
535

536
    def delete(self, subj: str, rel: str, obj: str) -> None:
537
        """Delete triplet.
538
        1. Similar to upsert_triplet(),
539
           we have to assume rel to be the first edge_type.prop_name.
540
        2. After edge being deleted, we need to check if the subj or
541
           obj are isolated vertices,
542
           if so, delete them, too.
543
        """
544
        # lower case subj, rel, obj
545
        subj = escape_str(subj)
546
        rel = escape_str(rel)
547
        obj = escape_str(obj)
548

549
        if self._vid_type == "INT64":
550
            assert all(
551
                [subj.isdigit(), obj.isdigit()]
552
            ), "Subject and object should be digit strings in current graph store."
553
            subj_field = subj
554
            obj_field = obj
555
        else:
556
            subj_field = f"{QUOTE}{subj}{QUOTE}"
557
            obj_field = f"{QUOTE}{obj}{QUOTE}"
558
        edge_field = f"{subj_field}->{obj_field}"
559

560
        # DELETE EDGE serve "player100" -> "team204"@7696463696635583936;
561
        edge_type = self._edge_types[0]
562
        # rel_prop_name = self._rel_prop_names[0]
563
        rel_hash = hash_string_to_rank(rel)
564
        dml_query = f"DELETE EDGE `{edge_type}`" f"  {edge_field}@{rel_hash};"
565
        logger.debug(f"delete()\nDML query: {dml_query}")
566
        result = self.execute(dml_query)
567
        assert (
568
            result and result.is_succeeded()
569
        ), f"Failed to delete triplet: {subj} {rel} {obj}, query: {dml_query}"
570
        # Get isolated vertices to be deleted
571
        # MATCH (s) WHERE id(s) IN ["player700"] AND NOT (s)-[]-()
572
        # RETURN id(s) AS isolated
573
        query = (
574
            f"MATCH (s) "
575
            f"  WHERE id(s) IN [{subj_field}, {obj_field}] "
576
            f"  AND NOT (s)-[]-() "
577
            f"RETURN id(s) AS isolated"
578
        )
579
        result = self.execute(query)
580
        isolated = result.column_values("isolated")
581
        if not isolated:
582
            return
583
        # DELETE VERTEX "player700" or DELETE VERTEX 700
584
        quote_field = QUOTE if self._vid_type != "INT64" else ""
585
        vertex_ids = ",".join(
586
            [f"{quote_field}{v.cast()}{quote_field}" for v in isolated]
587
        )
588
        dml_query = f"DELETE VERTEX {vertex_ids};"
589

590
        result = self.execute(dml_query)
591
        assert (
592
            result and result.is_succeeded()
593
        ), f"Failed to delete isolated vertices: {isolated}, query: {dml_query}"
594

595
    def refresh_schema(self) -> None:
596
        """
597
        Refreshes the NebulaGraph Store Schema.
598
        """
599
        tags_schema, edge_types_schema, relationships = [], [], []
600
        for tag in self.execute("SHOW TAGS").column_values("Name"):
601
            tag_name = tag.cast()
602
            tag_schema = {"tag": tag_name, "properties": []}
603
            r = self.execute(f"DESCRIBE TAG `{tag_name}`")
604
            props, types, comments = (
605
                r.column_values("Field"),
606
                r.column_values("Type"),
607
                r.column_values("Comment"),
608
            )
609
            for i in range(r.row_size()):
610
                # back compatible with old version of nebula-python
611
                property_defination = (
612
                    (props[i].cast(), types[i].cast())
613
                    if comments[i].is_empty()
614
                    else (props[i].cast(), types[i].cast(), comments[i].cast())
615
                )
616
                tag_schema["properties"].append(property_defination)
617
            tags_schema.append(tag_schema)
618
        for edge_type in self.execute("SHOW EDGES").column_values("Name"):
619
            edge_type_name = edge_type.cast()
620
            edge_schema = {"edge": edge_type_name, "properties": []}
621
            r = self.execute(f"DESCRIBE EDGE `{edge_type_name}`")
622
            props, types, comments = (
623
                r.column_values("Field"),
624
                r.column_values("Type"),
625
                r.column_values("Comment"),
626
            )
627
            for i in range(r.row_size()):
628
                # back compatible with old version of nebula-python
629
                property_defination = (
630
                    (props[i].cast(), types[i].cast())
631
                    if comments[i].is_empty()
632
                    else (props[i].cast(), types[i].cast(), comments[i].cast())
633
                )
634
                edge_schema["properties"].append(property_defination)
635
            edge_types_schema.append(edge_schema)
636

637
            # build relationships types
638
            sample_edge = self.execute(
639
                rel_query_sample_edge.substitute(edge_type=edge_type_name)
640
            ).column_values("sample_edge")
641
            if len(sample_edge) == 0:
642
                continue
643
            src_id, dst_id = sample_edge[0].cast()
644
            r = self.execute(
645
                rel_query_edge_type.substitute(
646
                    edge_type=edge_type_name,
647
                    src_id=src_id,
648
                    dst_id=dst_id,
649
                    quote="" if self._vid_type == "INT64" else QUOTE,
650
                )
651
            ).column_values("rels")
652
            if len(r) > 0:
653
                relationships.append(r[0].cast())
654

655
        self.schema = (
656
            f"Node properties: {tags_schema}\n"
657
            f"Edge properties: {edge_types_schema}\n"
658
            f"Relationships: {relationships}\n"
659
        )
660

661
    def get_schema(self, refresh: bool = False) -> str:
662
        """Get the schema of the NebulaGraph store."""
663
        if self.schema and not refresh:
664
            return self.schema
665
        self.refresh_schema()
666
        logger.debug(f"get_schema()\nschema: {self.schema}")
667
        return self.schema
668

669
    def query(self, query: str, param_map: Optional[Dict[str, Any]] = {}) -> Any:
670
        result = self.execute(query, param_map)
671
        columns = result.keys()
672
        d: Dict[str, list] = {}
673
        for col_num in range(result.col_size()):
674
            col_name = columns[col_num]
675
            col_list = result.column_values(col_name)
676
            d[col_name] = [x.cast() for x in col_list]
677
        return d
678

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.