7
from decimal import Decimal
8
from typing import Any, Dict
9
from warnings import warn
11
# boto3 is an optional dependency. If it's not installed,
12
# we'll just not emit the metrics.
13
# Keeping this logic here so that callers don't have to
17
import boto3 # type: ignore[import]
20
except ImportError as e:
21
print(f"Unable to import boto3. Will not be emitting metrics.... Reason: {e}")
28
# Used to cast the value of the env_var to the correct type (defaults to str)
29
type_conversion_fn: Any = None
35
required: bool = True,
36
type_conversion_fn: Any = None,
39
self.env_var = env_var
40
self.required = required
41
self.type_conversion_fn = type_conversion_fn
43
def value(self) -> Any:
44
value = os.environ.get(self.env_var)
46
# Github CI will set some env vars to an empty string
47
DEFAULT_ENVVAR_VALUES = [None, ""]
48
if value in DEFAULT_ENVVAR_VALUES:
53
f"Missing {self.name}. Please set the {self.env_var} "
54
"environment variable to pass in this value."
57
if self.type_conversion_fn:
58
return self.type_conversion_fn(value)
62
# Process-wide metrics that are merged into every metric emitted by this
# process. Populated through add_global_metric(); when a per-call metric uses
# the same name, the per-call value takes precedence in the merge.
global_metrics: Dict[str, Any] = {}
65
def add_global_metric(metric_name: str, metric_value: Any) -> None:
    """
    Adds stats that should be emitted with every metric by the current process.

    The value is stored in the module-level ``global_metrics`` dict, replacing
    any value previously registered under the same name. If an individual emit
    call specifies a metric with the same name, the per-call value overwrites
    the global one for that upload (per-call metrics are merged on top of
    ``global_metrics``).

    Args:
        metric_name: Key under which the value is stored.
        metric_value: The data to attach to every emitted metric.
    """
    global_metrics[metric_name] = metric_value
76
metrics: Dict[str, Any],
79
Upload a metric to DynamoDB (and from there, Rockset).
81
Even if EMIT_METRICS is set to False, this function will still run the code to
82
validate and shape the metrics, skipping just the upload.
86
Name of the metric. Every unique metric should have a different name
87
and be emitted just once per run attempt.
88
Metrics are namespaced by their module and the function that emitted them.
89
metrics: The actual data to record.
91
Some default values are populated from environment variables, which must be set
92
for metrics to be emitted. (If they're not set, this function becomes a noop):
96
raise ValueError("You didn't ask to upload any metrics!")
98
# Merge the given metrics with the global metrics, overwriting any duplicates
99
# with the given metrics.
100
metrics = {**global_metrics, **metrics}
102
# We use these env vars to determine basic info about the workflow run.
103
# By using env vars, we don't have to pass this info around to every function.
104
# It also helps ensure that we only emit metrics during CI
106
EnvVarMetric("repo", "GITHUB_REPOSITORY"),
107
EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
108
EnvVarMetric("build_environment", "BUILD_ENVIRONMENT", required=False),
109
EnvVarMetric("job", "GITHUB_JOB"),
110
EnvVarMetric("test_config", "TEST_CONFIG", required=False),
111
EnvVarMetric("pr_number", "PR_NUMBER", required=False, type_conversion_fn=int),
112
EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
113
EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
114
EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
115
EnvVarMetric("job_id", "JOB_ID", type_conversion_fn=int),
116
EnvVarMetric("job_name", "JOB_NAME"),
119
# Use info about the function that invoked this one as a namespace and a way to filter metrics.
120
calling_frame = inspect.currentframe().f_back # type: ignore[union-attr]
121
calling_frame_info = inspect.getframeinfo(calling_frame) # type: ignore[arg-type]
122
calling_file = os.path.basename(calling_frame_info.filename)
123
calling_module = inspect.getmodule(calling_frame).__name__ # type: ignore[union-attr]
124
calling_function = calling_frame_info.function
128
"metric_name": metric_name,
129
"calling_file": calling_file,
130
"calling_module": calling_module,
131
"calling_function": calling_function,
132
"timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f"),
133
**{m.name: m.value() for m in env_var_metrics if m.value()},
135
except ValueError as e:
136
warn(f"Not emitting metrics for {metric_name}. {e}")
139
# Prefix key with metric name and timestamp to reduce the chance of a uuid1 name collision
142
] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"
144
# Ensure the metrics dict doesn't contain any reserved keys
145
for key in reserved_metrics.keys():
146
used_reserved_keys = [k for k in metrics.keys() if k == key]
147
if used_reserved_keys:
148
raise ValueError(f"Metrics dict contains reserved keys: [{', '.join(key)}]")
150
# boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals.
151
metrics = _convert_float_values_to_decimals(metrics)
155
session = boto3.Session(region_name="us-east-1")
156
session.resource("dynamodb").Table("torchci-metrics").put_item(
162
except Exception as e:
163
# We don't want to fail the job if we can't upload the metric.
164
# We still raise the ValueErrors outside this try block since those indicate improperly configured metrics
165
warn(f"Error uploading metric {metric_name} to DynamoDB: {e}")
168
print(f"Not emitting metrics for {metric_name}. Boto wasn't imported.")
171
def _convert_float_values_to_decimals(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a copy of ``data`` in which every float value is a ``Decimal``.

    boto3 doesn't support uploading float values to DynamoDB, so each float is
    converted with ``Decimal(str(value))``. The conversion recurses through
    lists, tuples and dicts (both keys and values of nested dicts). The
    top-level keys of ``data`` are kept as they are, and every value of any
    other type is returned unchanged.

    Args:
        data: The metrics dict to convert. It is not modified.

    Returns:
        A new dict with the same top-level keys and converted values.
    """

    def _helper(o: Any) -> Any:
        if isinstance(o, float):
            # Go through str() so that e.g. 0.1 becomes Decimal("0.1") rather
            # than the full binary expansion Decimal(0.1) would produce.
            return Decimal(str(o))
        if isinstance(o, list):
            return [_helper(v) for v in o]
        if isinstance(o, dict):
            return {_helper(k): _helper(v) for k, v in o.items()}
        if isinstance(o, tuple):
            return tuple(_helper(v) for v in o)
        # Everything else (ints, strings, bools, None, ...) must pass through
        # untouched; without this the helper would turn those values into None.
        return o

    return {k: _helper(v) for k, v in data.items()}