oracledb-exporter-async

Форк
0
218 строк · 6.5 Кб
1
'''
Metric collector module

Terminology:
Metric - a module artifact. This is what is sent to Prometheus.
Task - the source data for metric formation. A task is a single query
    to the database, which is executed according to its own schedule.
    Multiple metrics can be described within one task.
    Tasks are described in the TOML file within the [[metrics]] section.
Job - a background task execution mechanism provided
    by the APScheduler package.
'''

import tomllib

import prometheus_client
import sqlalchemy as sa
from apscheduler.triggers import cron

from odbe import extensions as ext


# Polling period used for tasks that declare neither 'cron' nor 'interval'.
DEFAULT_TRIGGER_INTERVAL = 30  # seconds


# --- global task registry
_tasks_registry = []
28

29

30
def app():
    '''Application instance that owns the scheduler extension.'''
    return ext.scheduler.app
33

34

35
def collect_metrics(task: dict):
    '''Publish metric values taken from a task's DB response.

    For every metric declared in 'metricsdesc' the matching value is
    looked up in 'response' (DB column names always come back in
    lowercase) and pushed into the matching prometheus_client object
    from 'metrics'.  Counters are incremented, any other metric type
    is set.  A missing or NULL value is reported as 0.0.

    Args:
        task - scheduled task with DB response, e.g.:
        {
            'context': 'context_no_label',
            'labels': [ 'label_1', 'label_2' ]
            'request': "SELECT 1 as value_1, 2 as value_2, "
                "'First label' as label_1, "
                "'Second label' as label_2 FROM DUAL",
            'metricsdesc': {
                'value_1': 'Simple example returning always 1.',
                'value_2': 'Same but returning always 2.'
                },
            'response': {
                'value_1': 1,
                'value_2': 2,
                'label_1': 'First label',
                'label_2': 'Second label'
                },
            'metrics': {
                'value_1': prometheus_client.Gauge(),
                'value_2': prometheus_client.Counter(),
                }
        }
    '''
    metricsdesc = task.get('metricsdesc', [])
    labels = task.get('labels', [])
    response = task.get('response', {})
    metrics = task.get('metrics', {})
    for name in metricsdesc:
        metric_obj = metrics.get(name)
        if not metric_obj:
            # No metric object was created for this name - nothing to update.
            continue
        # lower(): the DB response always uses lowercase column names.
        raw = response.get(name.lower(), 0.0)
        # The DB may return NULL; normalise it to 0.0.
        value = 0.0 if raw is None else raw
        label_vals = [response.get(label.lower()) for label in labels]
        if label_vals:
            metric_obj = metric_obj.labels(*label_vals)
        if isinstance(metric_obj, prometheus_client.Counter):
            metric_obj.inc(float(value))
        else:
            metric_obj.set(float(value))
84

85

86
def execute(index: int):
    '''
    Job executor - background request to the database.

    Looks the task up by its registry index, runs its SQL query inside
    the application context, takes the first row of the result and
    feeds it to collect_metrics().  Logs a warning and returns when the
    query yields no rows.

    Args:
        index - task index in global registry
    '''
    with app().app_context():
        task = _tasks_registry[index]
        context = task.get('context')
        # Bug fix: the default must be a dict - calling .keys() on the
        # previous list default raised AttributeError when the task had
        # no 'metricsdesc' key.
        metrics = str(list(task.get('metricsdesc', {}).keys()))
        app().logger.info(f'Request for {context}{metrics}')
        # --- make sql request to DB
        #     take first row in response only
        #     response example:
        #     {
        #       'value_1': 1,
        #       'value_2': 2,
        #       'label_1': 'First label',
        #       'label_2': 'Second label'
        #       }
        request = task['request']
        result = ext.db.session.execute(sa.text(request)).all()
        if len(result) == 0:
            app().logger.warning(
                    f'Response for {context}{metrics} has no rows in answer')
            return
        response = result[0]._asdict()
        app().logger.info(
                f'Response for {context}{metrics}:  {response}')
        task['response'] = response
        collect_metrics(task)
118

119

120
def _create_metrics(task: dict) -> dict:
    '''
    Create metrics for the task.

    One task can describe one or multiple metrics,
    which are generated based on the response from the database.
    Metrics are described in the 'metricsdesc' field:
    each key represents a metric name, each value its description.
    The metric type comes from 'metricstype' ('counter' maps to
    prometheus_client.Counter, anything else to Gauge).

    Args:
        task - task description
    Returns:
        Dictionary of metrics, where the key is the metric name from
        'metricsdesc',
        and the value is the metric object, e.g., 'prometheus_client.Gauge'
    '''
    metrics = {}
    context = task.get('context', 'odbe')
    # Fix: 'metricsdesc' is a mapping (name -> description), so the
    # default must be a dict, not a list.
    metricsdesc = task.get('metricsdesc', {})
    metricstype = task.get('metricstype', {})
    labels = task.get('labels', [])
    for name, desc in metricsdesc.items():
        # 'counter' is the only non-default type; everything else is a gauge.
        if metricstype.get(name, 'gauge') == 'counter':
            klass = prometheus_client.Counter
        else:
            klass = prometheus_client.Gauge
        # Metric names are namespaced by the task context.
        full_name = f'{context}_{name}'
        metrics[name] = klass(full_name, desc, labels)
    return metrics
150

151

152
def _read_tasks(path: str):
153
    '''
154
    Read tasks into the global list.
155

156
    Args:
157
        path - path to the TOML file with metric descriptions
158
    '''
159
    file = open(path, 'rb')
160
    # TODO
161
    # make file validation
162
    tasks = tomllib.load(file).get('metric', [])
163
    if len(tasks) == 0:
164
        app().logger.warn(
165
                f'There are no metrics load from {path}')
166
    return tasks
167

168

169
def _create_job(task: dict, func_args: list, job_id: str):
    '''
    Schedule a background job for the task.

    A task carrying a 'cron' field is scheduled from its crontab
    expression; otherwise an interval trigger is used, taking the
    period from the 'interval' field and falling back to
    DEFAULT_TRIGGER_INTERVAL seconds.

    Args:
        task - task description
        func_args - list of arguments for the execute function
        job_id - job id
    Returns:
        The scheduled job object.
    '''
    context = task.get('context', 'task')
    # Keyword arguments shared by both trigger variants.
    common = dict(
        func=execute,
        args=func_args,
        id=job_id,
        name=f'{context}_{job_id}',
        replace_existing=True,
        misfire_grace_time=app().config['MISFIRE_GRACE_TIME'],
    )
    if 'cron' in task:
        trigger = cron.CronTrigger.from_crontab(task['cron'])
        return ext.scheduler.add_job(trigger=trigger, **common)
    seconds = task.get('interval', DEFAULT_TRIGGER_INTERVAL)
    return ext.scheduler.add_job(trigger='interval', seconds=seconds, **common)
204

205

206
def load(metrics_path: str):
    '''
    Load metric tasks and put each one on the schedule.

    Reads the task descriptions into the global registry, then for
    every registered task schedules its job and creates its
    prometheus metric objects.

    Args:
        metrics_path - path to the TOML file with metric descriptions
    '''
    _tasks_registry.extend(_read_tasks(metrics_path))

    for idx, item in enumerate(_tasks_registry):
        scheduled = _create_job(item, func_args=[idx], job_id=str(idx))
        item['metrics'] = _create_metrics(item)
        app().logger.info(f'job {scheduled.name} scheduled')
219

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.