# llama-index
# 677 lines · 25.4 KB
"""NebulaGraph graph store index."""

import logging
import os
from string import Template
from typing import Any, Dict, List, Optional

from tenacity import retry, stop_after_attempt, wait_random_exponential

from llama_index.legacy.graph_stores.types import GraphStore

QUOTE = '"'
RETRY_TIMES = 3
WAIT_MIN_SECONDS = 0.5
WAIT_MAX_SECONDS = 10

logger = logging.getLogger(__name__)

# Parameterized nGQL snippets used by NebulaGraphStore.refresh_schema() to
# discover how edge types connect tags.

# Fetch one (src, dst) vid pair for an edge type, sampling its endpoints.
rel_query_sample_edge = Template(
    """
MATCH ()-[e:`$edge_type`]->()
RETURN [src(e), dst(e)] AS sample_edge LIMIT 1
"""
)

# Render "(:src_tag)-[:edge_type]->(:dst_tag)" for the sampled endpoint pair.
# $quote is '"' for string vid spaces and empty for INT64 vid spaces.
rel_query_edge_type = Template(
    """
MATCH (m)-[:`$edge_type`]->(n)
WHERE id(m) == $quote$src_id$quote AND id(n) == $quote$dst_id$quote
RETURN "(:" + tags(m)[0] + ")-[:$edge_type]->(:" + tags(n)[0] + ")" AS rels
"""
)
34
35
def hash_string_to_rank(string: str) -> int:
    """Map a string to a signed 64-bit integer, used as a NebulaGraph edge rank.

    Takes the low 64 bits of Python's built-in ``hash()`` and reinterprets
    that bit pattern as a two's-complement signed value, so the result always
    fits NebulaGraph's int64 rank field.

    NOTE(review): ``hash()`` on strings is salted per process
    (PYTHONHASHSEED), so ranks are stable only within a single run — confirm
    that cross-session edge deletion by rank is not required.
    """
    # Keep only the low 64 bits of the (arbitrary-precision) hash.
    low64 = hash(string) & ((1 << 64) - 1)
    # Reinterpret the 64-bit pattern as signed: values with the top bit set
    # wrap around to the negative range.
    if low64 >= 1 << 63:
        low64 -= 1 << 64
    return low64
51
52
def prepare_subjs_param(
    subjs: Optional[List[str]], vid_type: str = "FIXED_STRING(256)"
) -> Dict:
    """Build the ``$subjs`` parameter map for a NebulaGraph query.

    Returns ``{"subjs": <NList value>}`` wrapping each subject as a nebula3
    thrift Value, or ``{}`` when there is nothing to query (subjs is None, or
    the space uses INT64 vids and no subject is a digit string).
    """
    if subjs is None:
        return {}
    from nebula3.common import ttypes

    wrapped_values = []
    param_value = ttypes.Value()

    if vid_type == "INT64":
        # Only digit strings can be INT64 vids; drop everything else.
        subjs = [candidate for candidate in subjs if candidate.isdigit()]
        if not subjs:
            logger.warning(
                f"KG is with INT64 vid type, but no digit string is provided."
                f"Return empty subjs, and no query will be executed."
                f"subjs: {subjs}"
            )
            return {}

    for subj in subjs:
        if not isinstance(subj, str):
            raise TypeError(f"Subject should be str, but got {type(subj).__name__}.")
        item = ttypes.Value()
        if vid_type == "INT64":
            assert subj.isdigit(), (
                "Subject should be a digit string in current "
                "graph store, where vid type is INT64."
            )
            item.set_iVal(int(subj))
        else:
            item.set_sVal(subj)
        wrapped_values.append(item)

    param_value.set_lVal(ttypes.NList(values=wrapped_values))
    return {"subjs": param_value}
90
91
def escape_str(value: str) -> str:
    """Escape a string for use inside a NebulaGraph query.

    Replaces double quotes with spaces (quotes would terminate the quoted
    vid/property literal the caller builds), then strips whitespace when the
    result starts or ends with a space.

    Args:
        value: Raw string to sanitize.

    Returns:
        The sanitized string; the empty string passes through unchanged.
    """
    patterns = {
        '"': " ",
    }
    for pattern, replacement in patterns.items():
        if pattern in value:
            value = value.replace(pattern, replacement)
    # Bug fix: guard against empty input — the original indexed value[0] /
    # value[-1] unconditionally and raised IndexError on "".
    if value and (value[0] == " " or value[-1] == " "):
        value = value.strip()

    return value
104
105
class NebulaGraphStore(GraphStore):
    """NebulaGraph graph store.

    Stores (subject, relation, object) triplets in a NebulaGraph space:
    subjects/objects become vertices (vid derived from the entity string or
    INT64 digits), and the relation text is written as an edge property with
    an edge rank of ``hash_string_to_rank(rel)``.
    """

    # NOTE(review): `self.schema` is first assigned in refresh_schema();
    # calling get_schema(refresh=False) before any refresh would raise
    # AttributeError — confirm callers always refresh first.
    schema: str

    def __init__(
        self,
        session_pool: Optional[Any] = None,
        space_name: Optional[str] = None,
        # NOTE(review): mutable default arguments below ([], {}) are shared
        # across calls — callers must not mutate them.
        edge_types: Optional[List[str]] = ["relationship"],
        rel_prop_names: Optional[List[str]] = ["relationship,"],
        tags: Optional[List[str]] = ["entity"],
        tag_prop_names: Optional[List[str]] = ["name,"],
        include_vid: bool = True,
        session_pool_kwargs: Optional[Dict[str, Any]] = {},
        **kwargs: Any,
    ) -> None:
        """Initialize NebulaGraph graph store.

        Args:
            session_pool: NebulaGraph session pool.
            space_name: NebulaGraph space name.
            edge_types: Edge types.
            rel_prop_names: Relation property names corresponding to edge types.
            tags: Tags.
            tag_prop_names: Tag property names corresponding to tags
                (comma-separated property lists, parallel to ``tags``).
            include_vid: Whether to prefix entity renderings in rel maps
                with the vertex id.
            session_pool_kwargs: Keyword arguments for NebulaGraph session pool.
            **kwargs: Keyword arguments.

        Raises:
            ImportError: If the nebula3-python client is not installed.
            ValueError: If edge/tag name lists and their property-name lists
                have mismatched lengths or are empty.
        """
        try:
            import nebula3  # noqa
        except ImportError:
            raise ImportError(
                "Please install NebulaGraph Python client first: "
                "`pip install nebula3-python`"
            )
        assert space_name is not None, "space_name should be provided."
        self._space_name = space_name
        self._session_pool_kwargs = session_pool_kwargs

        self._session_pool: Any = session_pool
        if self._session_pool is None:
            self.init_session_pool()

        self._vid_type = self._get_vid_type()

        self._tags = tags or ["entity"]
        self._edge_types = edge_types or ["rel"]
        self._rel_prop_names = rel_prop_names or ["predicate,"]
        if len(self._edge_types) != len(self._rel_prop_names):
            raise ValueError(
                "edge_types and rel_prop_names to define relation and relation name"
                "should be provided, yet with same length."
            )
        if len(self._edge_types) == 0:
            raise ValueError("Length of `edge_types` should be greater than 0.")

        if tag_prop_names is None or len(self._tags) != len(tag_prop_names):
            raise ValueError(
                "tag_prop_names to define tag and tag property name should be "
                "provided, yet with same length."
            )

        if len(self._tags) == 0:
            raise ValueError("Length of `tags` should be greater than 0.")

        # for building query
        self._edge_dot_rel = [
            f"`{edge_type}`.`{rel_prop_name}`"
            for edge_type, rel_prop_name in zip(self._edge_types, self._rel_prop_names)
        ]

        # edge type -> list of its property names (split from the
        # comma-separated rel_prop_names entry).
        self._edge_prop_map = {}
        for edge_type, rel_prop_name in zip(self._edge_types, self._rel_prop_names):
            self._edge_prop_map[edge_type] = [
                prop.strip() for prop in rel_prop_name.split(",")
            ]

        # cypher string like: map{`follow`: "degree", `serve`: "start_year,end_year"}
        self._edge_prop_map_cypher_string = (
            "map{"
            + ", ".join(
                [
                    f"`{edge_type}`: \"{','.join(rel_prop_names)}\""
                    for edge_type, rel_prop_names in self._edge_prop_map.items()
                ]
            )
            + "}"
        )

        # build tag_prop_names map
        self._tag_prop_names_map = {}
        for tag, prop_names in zip(self._tags, tag_prop_names or []):
            if prop_names is not None:
                self._tag_prop_names_map[tag] = f"`{tag}`.`{prop_names}`"
        # Deduplicated flat list of all tag property names (set comprehension,
        # so order is unspecified).
        self._tag_prop_names: List[str] = list(
            {
                prop_name.strip()
                for prop_names in tag_prop_names or []
                if prop_names is not None
                for prop_name in prop_names.split(",")
            }
        )

        self._include_vid = include_vid

    def init_session_pool(self) -> Any:
        """Create and store a NebulaGraph session pool from environment vars.

        Returns:
            The initialized session pool (also stored on ``self``).

        Raises:
            ValueError: If the required environment variables are missing.
        """
        from nebula3.Config import SessionPoolConfig
        from nebula3.gclient.net.SessionPool import SessionPool

        # ensure "NEBULA_USER", "NEBULA_PASSWORD", "NEBULA_ADDRESS" are set
        # in environment variables
        if not all(
            key in os.environ
            for key in ["NEBULA_USER", "NEBULA_PASSWORD", "NEBULA_ADDRESS"]
        ):
            raise ValueError(
                "NEBULA_USER, NEBULA_PASSWORD, NEBULA_ADDRESS should be set in "
                "environment variables when NebulaGraph Session Pool is not "
                "directly passed."
            )
        graphd_host, graphd_port = os.environ["NEBULA_ADDRESS"].split(":")
        session_pool = SessionPool(
            os.environ["NEBULA_USER"],
            os.environ["NEBULA_PASSWORD"],
            self._space_name,
            [(graphd_host, int(graphd_port))],
        )

        # (sic: "seesion" typo in the local name, kept as-is)
        seesion_pool_config = SessionPoolConfig()
        session_pool.init(seesion_pool_config)
        self._session_pool = session_pool
        return self._session_pool

    def _get_vid_type(self) -> str:
        """Return the space's vid type, e.g. "FIXED_STRING(256)" or "INT64"."""
        return (
            self.execute(f"DESCRIBE SPACE {self._space_name}")
            .column_values("Vid Type")[0]
            .cast()
        )

    def __del__(self) -> None:
        """Close NebulaGraph session pool."""
        # NOTE(review): raises AttributeError if __init__ failed before the
        # pool was created — confirm acceptable.
        self._session_pool.close()

    @retry(
        wait=wait_random_exponential(min=WAIT_MIN_SECONDS, max=WAIT_MAX_SECONDS),
        stop=stop_after_attempt(RETRY_TIMES),
    )
    def execute(self, query: str, param_map: Optional[Dict[str, Any]] = {}) -> Any:
        """Execute query.

        Retried up to RETRY_TIMES with random exponential backoff (tenacity);
        on connection-class errors the session pool is recreated before the
        retry.

        Args:
            query: Query.
            param_map: Parameter map.

        Returns:
            Query result.

        Raises:
            ValueError: If the query fails on the database side.
        """
        from nebula3.Exception import IOErrorException
        from nebula3.fbthrift.transport.TTransport import TTransportException

        # Clean the query string by removing triple backticks
        # (LLM-generated queries are often fenced in markdown).
        query = query.replace("```", "").strip()

        try:
            result = self._session_pool.execute_parameter(query, param_map)
            if result is None:
                raise ValueError(f"Query failed. Query: {query}, Param: {param_map}")
            if not result.is_succeeded():
                raise ValueError(
                    f"Query failed. Query: {query}, Param: {param_map}"
                    f"Error message: {result.error_msg()}"
                )
            return result
        except (TTransportException, IOErrorException, RuntimeError) as e:
            # Connection-level failure: rebuild the pool, then re-raise so
            # tenacity retries against the fresh pool.
            logger.error(
                f"Connection issue, try to recreate session pool. Query: {query}, "
                f"Param: {param_map}"
                f"Error: {e}"
            )
            self.init_session_pool()
            logger.info(
                f"Session pool recreated. Query: {query}, Param: {param_map}"
                f"This was due to error: {e}, and now retrying."
            )
            raise

        except ValueError as e:
            # query failed on db side
            logger.error(
                f"Query failed. Query: {query}, Param: {param_map}"
                f"Error message: {e}"
            )
            raise
        except Exception as e:
            # other exceptions
            logger.error(
                f"Query failed. Query: {query}, Param: {param_map}"
                f"Error message: {e}"
            )
            raise

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> "GraphStore":
        """Initialize graph store from configuration dictionary.

        Args:
            config_dict: Configuration dictionary.

        Returns:
            Graph store.
        """
        return cls(**config_dict)

    @property
    def client(self) -> Any:
        """Return NebulaGraph session pool."""
        return self._session_pool

    @property
    def config_dict(self) -> dict:
        """Return configuration dictionary."""
        # NOTE(review): tags / tag_prop_names / include_vid are not included
        # here, so from_dict() round-trips fall back to their defaults —
        # confirm intended.
        return {
            "session_pool": self._session_pool,
            "space_name": self._space_name,
            "edge_types": self._edge_types,
            "rel_prop_names": self._rel_prop_names,
            "session_pool_kwargs": self._session_pool_kwargs,
        }

    def get(self, subj: str) -> List[List[str]]:
        """Get triplets.

        Args:
            subj: Subject.

        Returns:
            Triplets (the 1-hop flat rel map for the subject, or [] if none).
        """
        rel_map = self.get_flat_rel_map([subj], depth=1)
        rels = list(rel_map.values())
        if len(rels) == 0:
            return []
        return rels[0]

    def get_flat_rel_map(
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
    ) -> Dict[str, List[List[str]]]:
        """Get flat rel map: subject -> flattened relation-path strings."""
        # The flat means for multi-hop relation path, we could get
        # knowledge like: subj -rel-> obj -rel-> obj <-rel- obj.
        # This type of knowledge is useful for some tasks.
        # +---------------------+---------------------------------------------...-----+
        # | subj | flattened_rels ... |
        # +---------------------+---------------------------------------------...-----+
        # | "{name:Tony Parker}"| "{name: Tony Parker}-[follow:{degree:95}]-> ...ili}"|
        # | "{name:Tony Parker}"| "{name: Tony Parker}-[follow:{degree:95}]-> ...r}" |
        # ...
        rel_map: Dict[Any, List[Any]] = {}
        if subjs is None or len(subjs) == 0:
            # unlike simple graph_store, we don't do get_all here
            return rel_map

        # Readable form of the query built below:
        # WITH map{`true`: "-[", `false`: "<-["} AS arrow_l,
        #     map{`true`: "]->", `false`: "]-"} AS arrow_r,
        #     map{`follow`: "degree", `serve`: "start_year,end_year"} AS edge_type_map
        # MATCH p=(start)-[e:follow|serve*..2]-()
        #     WHERE id(start) IN ["player100", "player101"]
        # WITH start, id(start) AS vid, nodes(p) AS nodes, e AS rels,
        #     length(p) AS rel_count, arrow_l, arrow_r, edge_type_map
        # WITH
        #     REDUCE(s = vid + '{', key IN [key_ in ["name"]
        #         WHERE properties(start)[key_] IS NOT NULL] | s + key + ': ' +
        #             COALESCE(TOSTRING(properties(start)[key]), 'null') + ', ')
        #             + '}'
        #         AS subj,
        #     [item in [i IN RANGE(0, rel_count - 1) | [nodes[i], nodes[i + 1],
        #         rels[i], typeid(rels[i]) > 0, type(rels[i]) ]] | [
        #             arrow_l[tostring(item[3])] +
        #                 item[4] + ':' +
        #                 REDUCE(s = '{', key IN SPLIT(edge_type_map[item[4]], ',') |
        #                     s + key + ': ' + COALESCE(TOSTRING(properties(item[2])[key]),
        #                     'null') + ', ') + '}'
        #                 +
        #                 arrow_r[tostring(item[3])],
        #             REDUCE(s = id(item[1]) + '{', key IN [key_ in ["name"]
        #                 WHERE properties(item[1])[key_] IS NOT NULL] | s + key + ': ' +
        #                 COALESCE(TOSTRING(properties(item[1])[key]), 'null') + ', ') + '}'
        #             ]
        #     ] AS rels
        # WITH
        #     REPLACE(subj, ', }', '}') AS subj,
        #     REDUCE(acc = collect(NULL), l in rels | acc + l) AS flattened_rels
        # RETURN
        #     subj,
        #     REPLACE(REDUCE(acc = subj,l in flattened_rels|acc + ' ' + l),
        #         ', }', '}')
        #         AS flattened_rels
        #     LIMIT 30

        # Based on self._include_vid
        # {name: Tim Duncan} or player100{name: Tim Duncan} for entity
        s_prefix = "vid + '{'" if self._include_vid else "'{'"
        s1 = "id(item[1]) + '{'" if self._include_vid else "'{'"

        query = (
            f"WITH map{{`true`: '-[', `false`: '<-['}} AS arrow_l,"
            f" map{{`true`: ']->', `false`: ']-'}} AS arrow_r,"
            f" {self._edge_prop_map_cypher_string} AS edge_type_map "
            f"MATCH p=(start)-[e:`{'`|`'.join(self._edge_types)}`*..{depth}]-() "
            f" WHERE id(start) IN $subjs "
            f"WITH start, id(start) AS vid, nodes(p) AS nodes, e AS rels,"
            f" length(p) AS rel_count, arrow_l, arrow_r, edge_type_map "
            f"WITH "
            f" REDUCE(s = {s_prefix}, key IN [key_ in {self._tag_prop_names!s} "
            f" WHERE properties(start)[key_] IS NOT NULL] | s + key + ': ' + "
            f" COALESCE(TOSTRING(properties(start)[key]), 'null') + ', ')"
            f" + '}}'"
            f" AS subj,"
            f" [item in [i IN RANGE(0, rel_count - 1)|[nodes[i], nodes[i + 1],"
            f" rels[i], typeid(rels[i]) > 0, type(rels[i]) ]] | ["
            f" arrow_l[tostring(item[3])] +"
            f" item[4] + ':' +"
            f" REDUCE(s = '{{', key IN SPLIT(edge_type_map[item[4]], ',') | "
            f" s + key + ': ' + COALESCE(TOSTRING(properties(item[2])[key]),"
            f" 'null') + ', ') + '}}'"
            f" +"
            f" arrow_r[tostring(item[3])],"
            f" REDUCE(s = {s1}, key IN [key_ in "
            f" {self._tag_prop_names!s} WHERE properties(item[1])[key_] "
            f" IS NOT NULL] | s + key + ': ' + "
            f" COALESCE(TOSTRING(properties(item[1])[key]), 'null') + ', ')"
            f" + '}}'"
            f" ]"
            f" ] AS rels "
            f"WITH "
            f" REPLACE(subj, ', }}', '}}') AS subj,"
            f" REDUCE(acc = collect(NULL), l in rels | acc + l) AS flattened_rels "
            f"RETURN "
            f" subj,"
            f" REPLACE(REDUCE(acc = subj, l in flattened_rels | acc + ' ' + l), "
            f" ', }}', '}}') "
            f" AS flattened_rels"
            f" LIMIT {limit}"
        )
        subjs_param = prepare_subjs_param(subjs, self._vid_type)
        logger.debug(f"get_flat_rel_map()\nsubjs_param: {subjs},\nquery: {query}")
        if subjs_param == {}:
            # This happens when subjs is None after prepare_subjs_param()
            # Probably because vid type is INT64, but no digit string is provided.
            return rel_map
        result = self.execute(query, subjs_param)
        if result is None:
            return rel_map

        # get raw data
        subjs_ = result.column_values("subj") or []
        rels_ = result.column_values("flattened_rels") or []

        for subj, rel in zip(subjs_, rels_):
            subj_ = subj.cast()
            rel_ = rel.cast()
            if subj_ not in rel_map:
                rel_map[subj_] = []
            rel_map[subj_].append(rel_)
        return rel_map

    def get_rel_map(
        self, subjs: Optional[List[str]] = None, depth: int = 2, limit: int = 30
    ) -> Dict[str, List[List[str]]]:
        """Get rel map."""
        # We put rels in a long list for depth>= 1, this is different from
        # SimpleGraphStore.get_rel_map() though.
        # But this makes more sense for multi-hop relation path.

        if subjs is not None:
            # Sanitize inputs: drop non-strings/empties, escape the rest.
            subjs = [
                escape_str(subj) for subj in subjs if isinstance(subj, str) and subj
            ]
            if len(subjs) == 0:
                return {}

        return self.get_flat_rel_map(subjs, depth, limit)

    def upsert_triplet(self, subj: str, rel: str, obj: str) -> None:
        """Add triplet."""
        # Note, to enable leveraging existing knowledge graph,
        # the (triplet -- property graph) mapping
        # makes (n:1) edge_type.prop_name --> triplet.rel
        # thus we have to assume rel to be the first edge_type.prop_name
        # here in upsert_triplet().
        # This applies to the type of entity(tags) with subject and object, too,
        # thus we have to assume subj to be the first entity.tag_name

        # lower case subj, rel, obj
        subj = escape_str(subj)
        rel = escape_str(rel)
        obj = escape_str(obj)
        if self._vid_type == "INT64":
            assert all(
                [subj.isdigit(), obj.isdigit()]
            ), "Subject and object should be digit strings in current graph store."
            subj_field = subj
            obj_field = obj
        else:
            subj_field = f"{QUOTE}{subj}{QUOTE}"
            obj_field = f"{QUOTE}{obj}{QUOTE}"
        edge_field = f"{subj_field}->{obj_field}"

        edge_type = self._edge_types[0]
        rel_prop_name = self._rel_prop_names[0]
        entity_type = self._tags[0]
        # Edge rank = hash of the relation text, so distinct relations between
        # the same two vertices become distinct edges.
        rel_hash = hash_string_to_rank(rel)
        dml_query = (
            f"INSERT VERTEX `{entity_type}`(name) "
            f" VALUES {subj_field}:({QUOTE}{subj}{QUOTE});"
            f"INSERT VERTEX `{entity_type}`(name) "
            f" VALUES {obj_field}:({QUOTE}{obj}{QUOTE});"
            f"INSERT EDGE `{edge_type}`(`{rel_prop_name}`) "
            f" VALUES "
            f"{edge_field}"
            f"@{rel_hash}:({QUOTE}{rel}{QUOTE});"
        )
        logger.debug(f"upsert_triplet()\nDML query: {dml_query}")
        result = self.execute(dml_query)
        assert (
            result and result.is_succeeded()
        ), f"Failed to upsert triplet: {subj} {rel} {obj}, query: {dml_query}"

    def delete(self, subj: str, rel: str, obj: str) -> None:
        """Delete triplet.

        1. Similar to upsert_triplet(),
        we have to assume rel to be the first edge_type.prop_name.
        2. After edge being deleted, we need to check if the subj or
        obj are isolated vertices,
        if so, delete them, too.
        """
        # lower case subj, rel, obj
        subj = escape_str(subj)
        rel = escape_str(rel)
        obj = escape_str(obj)

        if self._vid_type == "INT64":
            assert all(
                [subj.isdigit(), obj.isdigit()]
            ), "Subject and object should be digit strings in current graph store."
            subj_field = subj
            obj_field = obj
        else:
            subj_field = f"{QUOTE}{subj}{QUOTE}"
            obj_field = f"{QUOTE}{obj}{QUOTE}"
        edge_field = f"{subj_field}->{obj_field}"

        # DELETE EDGE serve "player100" -> "team204"@7696463696635583936;
        # The rank must match the one computed at upsert time
        # (hash_string_to_rank(rel)) for the edge to be found.
        edge_type = self._edge_types[0]
        # rel_prop_name = self._rel_prop_names[0]
        rel_hash = hash_string_to_rank(rel)
        dml_query = f"DELETE EDGE `{edge_type}`" f" {edge_field}@{rel_hash};"
        logger.debug(f"delete()\nDML query: {dml_query}")
        result = self.execute(dml_query)
        assert (
            result and result.is_succeeded()
        ), f"Failed to delete triplet: {subj} {rel} {obj}, query: {dml_query}"
        # Get isolated vertices to be deleted
        # MATCH (s) WHERE id(s) IN ["player700"] AND NOT (s)-[]-()
        # RETURN id(s) AS isolated
        query = (
            f"MATCH (s) "
            f" WHERE id(s) IN [{subj_field}, {obj_field}] "
            f" AND NOT (s)-[]-() "
            f"RETURN id(s) AS isolated"
        )
        result = self.execute(query)
        isolated = result.column_values("isolated")
        if not isolated:
            return
        # DELETE VERTEX "player700" or DELETE VERTEX 700
        quote_field = QUOTE if self._vid_type != "INT64" else ""
        vertex_ids = ",".join(
            [f"{quote_field}{v.cast()}{quote_field}" for v in isolated]
        )
        dml_query = f"DELETE VERTEX {vertex_ids};"

        result = self.execute(dml_query)
        assert (
            result and result.is_succeeded()
        ), f"Failed to delete isolated vertices: {isolated}, query: {dml_query}"

    def refresh_schema(self) -> None:
        """
        Refreshes the NebulaGraph Store Schema.

        Rebuilds ``self.schema`` from SHOW TAGS / SHOW EDGES / DESCRIBE
        output, plus one sampled edge per edge type to render relationship
        patterns like "(:tag)-[:edge]->(:tag)".
        """
        tags_schema, edge_types_schema, relationships = [], [], []
        for tag in self.execute("SHOW TAGS").column_values("Name"):
            tag_name = tag.cast()
            tag_schema = {"tag": tag_name, "properties": []}
            r = self.execute(f"DESCRIBE TAG `{tag_name}`")
            props, types, comments = (
                r.column_values("Field"),
                r.column_values("Type"),
                r.column_values("Comment"),
            )
            for i in range(r.row_size()):
                # back compatible with old version of nebula-python
                property_defination = (
                    (props[i].cast(), types[i].cast())
                    if comments[i].is_empty()
                    else (props[i].cast(), types[i].cast(), comments[i].cast())
                )
                tag_schema["properties"].append(property_defination)
            tags_schema.append(tag_schema)
        for edge_type in self.execute("SHOW EDGES").column_values("Name"):
            edge_type_name = edge_type.cast()
            edge_schema = {"edge": edge_type_name, "properties": []}
            r = self.execute(f"DESCRIBE EDGE `{edge_type_name}`")
            props, types, comments = (
                r.column_values("Field"),
                r.column_values("Type"),
                r.column_values("Comment"),
            )
            for i in range(r.row_size()):
                # back compatible with old version of nebula-python
                property_defination = (
                    (props[i].cast(), types[i].cast())
                    if comments[i].is_empty()
                    else (props[i].cast(), types[i].cast(), comments[i].cast())
                )
                edge_schema["properties"].append(property_defination)
            edge_types_schema.append(edge_schema)

            # build relationships types
            sample_edge = self.execute(
                rel_query_sample_edge.substitute(edge_type=edge_type_name)
            ).column_values("sample_edge")
            if len(sample_edge) == 0:
                continue
            src_id, dst_id = sample_edge[0].cast()
            r = self.execute(
                rel_query_edge_type.substitute(
                    edge_type=edge_type_name,
                    src_id=src_id,
                    dst_id=dst_id,
                    quote="" if self._vid_type == "INT64" else QUOTE,
                )
            ).column_values("rels")
            if len(r) > 0:
                relationships.append(r[0].cast())

        self.schema = (
            f"Node properties: {tags_schema}\n"
            f"Edge properties: {edge_types_schema}\n"
            f"Relationships: {relationships}\n"
        )

    def get_schema(self, refresh: bool = False) -> str:
        """Get the schema of the NebulaGraph store."""
        if self.schema and not refresh:
            return self.schema
        self.refresh_schema()
        logger.debug(f"get_schema()\nschema: {self.schema}")
        return self.schema

    def query(self, query: str, param_map: Optional[Dict[str, Any]] = {}) -> Any:
        """Run a raw query and return the result as {column name: [cast values]}."""
        result = self.execute(query, param_map)
        columns = result.keys()
        d: Dict[str, list] = {}
        for col_num in range(result.col_size()):
            col_name = columns[col_num]
            col_list = result.column_values(col_name)
            d[col_name] = [x.cast() for x in col_list]
        return d
678