firecracker
202 lines · 5.9 KB
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""Fixture to send metrics to AWS CloudWatch and validate Firecracker metrics

We use the aws-embedded-metrics library although it has some sharp corners,
namely:

1. It uses asyncio, which complicates the flushing a bit.

2. It has a stateful API. Setting dimensions will override previous ones.

Example:

set_dimensions("instance")
put_metric("duration", 1)
set_dimensions("cpu")
put_metric("duration", 1)

This will end with 2 identical metrics with dimension "cpu" (the last one). The
correct way of doing it is:

set_dimensions("instance")
put_metric("duration", 1)
flush()
set_dimensions("cpu")
put_metric("duration", 1)

This is not very intuitive, but we assume all metrics within a test will have
the same dimensions.

# Debugging

You can override the destination of the metrics to stdout with:

AWS_EMF_NAMESPACE=$USER-test
AWS_EMF_ENVIRONMENT=local ./tools/devtest test

# References:

- https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html
- https://github.com/awslabs/aws-embedded-metrics-python
"""

# Standard library
import asyncio
import json
import os
import socket
from urllib.parse import urlparse

# Third-party: aws-embedded-metrics (CloudWatch EMF client)
from aws_embedded_metrics.constants import DEFAULT_NAMESPACE
from aws_embedded_metrics.logger.metrics_logger_factory import create_metrics_logger

class MetricsWrapperDummy:
    """No-op metrics sink: accepts the full logger API and discards everything.

    Used when CloudWatch reporting is disabled so callers never need to
    special-case a missing metrics backend.
    """

    def set_dimensions(self, *args, **kwargs):
        """Accept and discard dimensions."""

    def put_metric(self, *args, **kwargs):
        """Accept and discard a datapoint."""

    def set_property(self, *args, **kwargs):
        """Accept and discard a property."""

    def flush(self):
        """Nothing buffered, nothing to flush."""

class MetricsWrapper:
    """Synchronous convenience wrapper around an async aws-embedded-metrics logger.

    Attribute lookups that the wrapper does not define (``set_dimensions``,
    ``put_metric``, ``set_property``, ...) are forwarded to the wrapped
    logger; ``flush`` is overridden to drive the logger's async flush to
    completion on a fresh event loop.
    """

    def __init__(self, logger):
        # The underlying aws_embedded_metrics MetricsLogger instance.
        self.logger = logger

    def __getattr__(self, attr):
        """Forward unknown attribute lookups to the wrapped logger.

        ``__getattr__`` is only invoked after normal lookup (instance dict
        and class) has already failed, so no ``self.__dict__`` membership
        check is needed. Raise for ``logger`` itself to avoid infinite
        recursion if the attribute is requested before __init__ sets it
        (e.g. during unpickling).
        """
        if attr == "logger":
            raise AttributeError(attr)
        return getattr(self.logger, attr)

    def flush(self):
        """Flush any remaining metrics, running the async flush to completion."""
        asyncio.run(self.logger.flush())

def get_metrics_logger():
    """Create and return a fresh metrics logger.

    When no EMF namespace is configured in the environment, metrics are
    silently discarded via the dummy wrapper instead of being emitted.
    """
    if os.environ.get("AWS_EMF_NAMESPACE") is None:
        return MetricsWrapperDummy()
    emf_logger = create_metrics_logger()
    # Keep dimensions across flushes; we assume one dimension set per test.
    emf_logger.reset_dimensions(False)
    return MetricsWrapper(emf_logger)

def emit_raw_emf(emf_msg: dict):
    """Emit a raw EMF log message to the local CloudWatch agent over UDP.

    Does nothing unless AWS_EMF_AGENT_ENDPOINT is configured. The message
    is mutated in place: log group/stream names and the metric namespace
    are filled in from the environment before the datagram is sent.
    """
    endpoint_str = os.environ.get("AWS_EMF_AGENT_ENDPOINT")
    if endpoint_str is None:
        return

    namespace = os.environ.get("AWS_EMF_NAMESPACE", DEFAULT_NAMESPACE)
    aws_meta = emf_msg["_aws"]
    aws_meta["LogGroupName"] = os.environ.get(
        "AWS_EMF_LOG_GROUP_NAME", f"{namespace}-metrics"
    )
    aws_meta["LogStreamName"] = os.environ.get("AWS_EMF_LOG_STREAM_NAME", "")
    for metric_directive in aws_meta["CloudWatchMetrics"]:
        metric_directive["Namespace"] = namespace

    endpoint = urlparse(endpoint_str)
    payload = (json.dumps(emf_msg) + "\n").encode("utf-8")
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (endpoint.hostname, endpoint.port))

# Map from each CloudWatch unit to the next-larger unit (one step = x1000).
UNIT_REDUCTIONS = {
    "Microseconds": "Milliseconds",
    "Milliseconds": "Seconds",
    "Bytes": "Kilobytes",
    "Kilobytes": "Megabytes",
    "Megabytes": "Gigabytes",
    "Gigabytes": "Terabytes",
    "Bits": "Kilobits",
    "Kilobits": "Megabits",
    "Megabits": "Gigabits",
    # Fixed typo: was "Terabit" (singular), which is inconsistent with every
    # other unit name and has no entry in UNIT_SHORTHANDS.
    "Gigabits": "Terabits",
    "Bytes/Second": "Kilobytes/Second",
    "Kilobytes/Second": "Megabytes/Second",
    "Megabytes/Second": "Gigabytes/Second",
    "Gigabytes/Second": "Terabytes/Second",
    "Bits/Second": "Kilobits/Second",
    "Kilobits/Second": "Megabits/Second",
    "Megabits/Second": "Gigabits/Second",
    "Gigabits/Second": "Terabits/Second",
}
# Inverse map: each unit to the next-smaller unit (one step = /1000).
INV_UNIT_REDUCTIONS = {v: k for k, v in UNIT_REDUCTIONS.items()}


# Short display suffix for each CloudWatch unit, used when pretty-printing.
UNIT_SHORTHANDS = {
    "Seconds": "s",
    "Microseconds": "μs",
    "Milliseconds": "ms",
    "Bytes": "B",
    "Kilobytes": "KB",
    "Megabytes": "MB",
    "Gigabytes": "GB",
    "Terabytes": "TB",
    "Bits": "Bit",
    "Kilobits": "KBit",
    "Megabits": "MBit",
    "Gigabits": "GBit",
    "Terabits": "TBit",
    "Percent": "%",
    "Count": "",
    "Bytes/Second": "B/s",
    "Kilobytes/Second": "KB/s",
    "Megabytes/Second": "MB/s",
    "Gigabytes/Second": "GB/s",
    "Terabytes/Second": "TB/s",
    "Bits/Second": "Bit/s",
    "Kilobits/Second": "KBit/s",
    "Megabits/Second": "MBit/s",
    "Gigabits/Second": "GBit/s",
    "Terabits/Second": "TBit/s",
    "Count/Second": "Hz",
    "None": "",
}


def reduce_value(value, unit):
    """Re-express *value* in the largest unit that keeps its magnitude >= 1.

    For example, ``reduce_value(1_000_000, "Bytes")`` returns
    ``(1.0, "Megabytes")``.

    Deliberately iterative rather than recursive: with floating-point
    precision quirks a recursive version risks ping-ponging forever between
    multiplying and dividing by 1000 (values alternating < 1 and >= 1000).
    """
    # Magnitude too small for this unit: step down to smaller units.
    while abs(value) < 1 and unit in INV_UNIT_REDUCTIONS:
        value, unit = value * 1000, INV_UNIT_REDUCTIONS[unit]
    # Magnitude too large for this unit: step up to larger units.
    while abs(value) >= 1000 and unit in UNIT_REDUCTIONS:
        value, unit = value / 1000, UNIT_REDUCTIONS[unit]
    return value, unit

def format_with_reduced_unit(value, unit):
    """Pretty-print *value*, choosing the largest unit that keeps it >= 1
    and appending that unit's shorthand.

    For example, ``format_with_reduced_unit(1_000_000, "Bytes")`` returns
    ``"1.00MB"``.
    """
    new_value, new_unit = reduce_value(value, unit)
    # Fall back to the full unit name when no shorthand is defined.
    suffix = UNIT_SHORTHANDS.get(new_unit, new_unit)
    return "{:.2f}{}".format(new_value, suffix)
