quick-start-connectors
36 строк · 1.0 Кб
1import logging
2from typing import Any
3from .client import get_client
4
5logger = logging.getLogger(__name__)
6
7
def serialize_results(data, mappings=None):
    """
    Serialize a list of dictionaries by renaming keys and stringifying values.

    Parameters:
    - data (list): An iterable of dictionaries to be serialized.
    - mappings (dict): Maps original keys to the keys used in the output.
      Keys that are absent from the mapping are kept unchanged. ``None``
      (the default) means no keys are renamed.

    Returns:
    list: One new dictionary per input item. Values that are lists are joined
    into a single ", "-separated string of their stringified elements; every
    other value is converted with ``str()``.
    """
    # A ``None`` sentinel replaces the former mutable ``{}`` default, so no
    # dict object is shared between calls and an explicit ``None`` is accepted.
    if mappings is None:
        mappings = {}

    def serialize_value(value):
        # Only real ``list`` instances are flattened; tuples, sets and other
        # containers fall through to plain ``str()``.
        if isinstance(value, list):
            return ", ".join(str(element) for element in value)
        return str(value)

    def serialize_item(item):
        # If two source keys map to the same output key, the later one wins.
        return {
            (mappings[key] if key in mappings else key): serialize_value(value)
            for key, value in item.items()
        }

    return [serialize_item(item) for item in data]
32
33
def search(query) -> list[dict[str, Any]]:
    """
    Run ``query`` against the client and return the serialized results.

    Parameters:
    - query: The search query, passed unchanged to the client's ``search`` method.

    Returns:
    list: The client's results after ``serialize_results`` has processed them
    using the client's ``fields_mapping``.
    """
    connector = get_client()
    raw_hits = connector.search(query)
    return serialize_results(raw_hits, connector.fields_mapping)
37