oracledb-exporter-async
218 lines · 6.5 KB
'''
Metric collector module.

Terminology:
    Metric - a module artifact. This is what is sent to Prometheus.
    Task - the source data for metric formation. A task is a single query
        to the database, which is executed according to its own schedule.
        Multiple metrics can be described within one task.
        Tasks are described in the TOML file within the [[metrics]] section.
    Job - a background task execution mechanism provided
        by the APScheduler package.
'''
13
14import sqlalchemy as sa
15import tomllib
16import prometheus_client
17
18from apscheduler.triggers import cron
19
20from odbe import extensions as ext
21
22
# Default polling period for tasks that define neither an 'interval'
# nor a 'cron' schedule (see _create_job).
DEFAULT_TRIGGER_INTERVAL = 30  # seconds


# --- global task registry; filled once by load(), jobs reference tasks
# by their index in this list
_tasks_registry = []
28
29
def app():
    '''Return the application instance attached to the scheduler extension.'''
    return ext.scheduler.app
33
34
def collect_metrics(task: dict):
    '''Push the task's DB response into its Prometheus metric objects.

    For every metric name declared in 'metricsdesc' the matching value is
    taken from 'response' (DB column names come back lowercased), the
    label values are extracted, and the corresponding object in 'metrics'
    is updated: Counters are incremented, everything else is set.

    Args:
        task - scheduled task dict carrying at least 'metricsdesc',
            'labels', 'response' and 'metrics' (name -> metric object),
            e.g. {'metricsdesc': {'value_1': '...'},
                  'labels': ['label_1'],
                  'response': {'value_1': 1, 'label_1': 'First label'},
                  'metrics': {'value_1': prometheus_client.Gauge(...)}}
    '''
    labels = task.get('labels', [])
    response = task.get('response', {})
    metric_objects = task.get('metrics', {})
    for name in task.get('metricsdesc', []):
        metric = metric_objects.get(name)
        if not metric:
            continue
        # --- DB responses always use lowercase keys
        raw = response.get(name.lower(), 0.0)
        if raw is None:  # NULL from the database counts as zero
            raw = 0.0
        label_values = [response.get(label.lower()) for label in labels]
        if label_values:
            metric = metric.labels(*label_values)
        if isinstance(metric, prometheus_client.Counter):
            metric.inc(float(raw))
        else:
            metric.set(float(raw))
84
85
def execute(index: int):
    '''
    Job executor - background request to the database.

    Looks up the task by index in the global registry, runs its SQL
    request, stores the first result row as the task's 'response' and
    hands the task to collect_metrics().

    Args:
        index - task index in the global registry
    '''
    with app().app_context():
        task = _tasks_registry[index]
        context = task.get('context')
        # --- 'metricsdesc' is a mapping, so the fallback must be a dict:
        # the previous default of [] has no .keys() and crashed for tasks
        # without a 'metricsdesc' section
        metrics = str(list(task.get('metricsdesc', {}).keys()))
        app().logger.info(f'Request for {context}{metrics}')
        # --- make sql request to DB; only the first row is used.
        # response example:
        # {
        #     'value_1': 1,
        #     'value_2': 2,
        #     'label_1': 'First label',
        #     'label_2': 'Second label'
        # }
        request = task['request']
        result = ext.db.session.execute(sa.text(request)).all()
        if len(result) == 0:
            app().logger.warning(
                f'Response for {context}{metrics} has no rows in answer')
            return
        response = result[0]._asdict()
        app().logger.info(
            f'Response for {context}{metrics}: {response}')
        task['response'] = response
        collect_metrics(task)
118
119
def _create_metrics(task: dict) -> dict:
    '''
    Build the Prometheus metric objects for one task.

    Each key of the task's 'metricsdesc' mapping becomes one metric named
    '<context>_<key>'. The class is chosen from 'metricstype': 'counter'
    gives a Counter, anything else (or no entry) gives a Gauge. All
    metrics of the task share the same label names.

    Args:
        task - task description
    Returns:
        dict mapping each 'metricsdesc' key to its metric object,
        e.g. a prometheus_client.Gauge
    '''
    context = task.get('context', 'odbe')
    descriptions = task.get('metricsdesc', [])
    type_by_name = task.get('metricstype', {})
    labels = task.get('labels', [])
    created = {}
    for name in descriptions:
        if type_by_name.get(name, 'gauge') == 'counter':
            factory = prometheus_client.Counter
        else:
            factory = prometheus_client.Gauge
        created[name] = factory(
            f'{context}_{name}', descriptions[name], labels)
    return created
150
151
152def _read_tasks(path: str):
153'''
154Read tasks into the global list.
155
156Args:
157path - path to the TOML file with metric descriptions
158'''
159file = open(path, 'rb')
160# TODO
161# make file validation
162tasks = tomllib.load(file).get('metric', [])
163if len(tasks) == 0:
164app().logger.warn(
165f'There are no metrics load from {path}')
166return tasks
167
168
def _create_job(task: dict, func_args: list, job_id: str):
    '''
    Schedule a background job for the task.

    A task with a 'cron' key is scheduled with a crontab trigger;
    otherwise an interval trigger is used, with the period taken from
    the task's 'interval' key or DEFAULT_TRIGGER_INTERVAL.

    Args:
        task - task description
        func_args - list of arguments for the execute function
        job_id - job id
    '''
    context = task.get('context', 'task')
    # --- everything except the trigger is identical for both schedules
    shared = dict(
        func=execute,
        args=func_args,
        id=job_id,
        name=f'{context}_{job_id}',
        replace_existing=True,
        misfire_grace_time=app().config['MISFIRE_GRACE_TIME'],
    )
    if 'cron' in task:
        return ext.scheduler.add_job(
            trigger=cron.CronTrigger.from_crontab(task['cron']),
            **shared,
        )
    return ext.scheduler.add_job(
        trigger='interval',
        seconds=task.get('interval', DEFAULT_TRIGGER_INTERVAL),
        **shared,
    )
204
205
def load(metrics_path: str):
    '''
    Load metric tasks and schedule one job per task.

    Tasks are appended to the global registry; each job receives the
    task's registry index so execute() can find it later.

    Args:
        metrics_path - path to the TOML file with metric descriptions
    '''
    _tasks_registry.extend(_read_tasks(metrics_path))

    for idx, entry in enumerate(_tasks_registry):
        scheduled = _create_job(entry, func_args=[idx], job_id=str(idx))
        entry['metrics'] = _create_metrics(entry)
        app().logger.info(f'job {scheduled.name} scheduled')
219