pytorch

Форк
0
/
upload_metrics.py 
184 строки · 6.6 Кб
1
import datetime
2
import inspect
3
import os
4
import time
5
import uuid
6

7
from decimal import Decimal
8
from typing import Any, Dict
9
from warnings import warn
10

11
# boto3 is an optional dependency. When it cannot be imported we simply
# skip the upload step instead of failing.
# The check lives in this module so that callers never have to think
# about whether boto3 is available.
try:
    import boto3  # type: ignore[import]
except ImportError as e:
    # Optional dependency missing: leave uploads switched off, but say why.
    EMIT_METRICS = False
    print(f"Unable to import boto3. Will not be emitting metrics.... Reason: {e}")
else:
    EMIT_METRICS = True
22

23

24
class EnvVarMetric:
    """
    A metric whose value is read from an environment variable.

    Attributes:
        name: Key under which the value is reported.
        env_var: Name of the environment variable to read.
        required: When True, value() raises if the variable is unset or empty.
        type_conversion_fn: Optional callable applied to the raw string; when
            it is not provided the raw string is returned as-is.
    """

    name: str
    env_var: str
    required: bool = True
    # Used to cast the value of the env_var to the correct type (defaults to str)
    type_conversion_fn: Any = None

    def __init__(
        self,
        name: str,
        env_var: str,
        required: bool = True,
        type_conversion_fn: Any = None,
    ) -> None:
        self.name, self.env_var = name, env_var
        self.required = required
        self.type_conversion_fn = type_conversion_fn

    def value(self) -> Any:
        """
        Read the environment variable and return its (optionally converted) value.

        Returns:
            None when the variable is unset/empty and the metric is optional;
            otherwise the raw string, passed through type_conversion_fn if set.

        Raises:
            ValueError: If the variable is unset/empty and the metric is required.
        """
        raw = os.environ.get(self.env_var)

        # Github CI will set some env vars to an empty string, so an empty
        # string is treated the same as an unset variable.
        is_unset = raw is None or raw == ""

        if is_unset and self.required:
            raise ValueError(
                f"Missing {self.name}. Please set the {self.env_var} "
                "environment variable to pass in this value."
            )
        if is_unset:
            return None

        convert = self.type_conversion_fn
        return convert(raw) if convert else raw
60

61

62
# Stats attached to every metric emitted by this process, keyed by name.
global_metrics: Dict[str, Any] = {}


def add_global_metric(metric_name: str, metric_value: Any) -> None:
    """
    Register a stat to be emitted alongside every metric from this process.

    A metric passed directly to emit_metric under the same name takes
    precedence over the value stored here. Registering the same name twice
    keeps only the most recent value.
    """
    global_metrics.update({metric_name: metric_value})
72

73

74
def emit_metric(
    metric_name: str,
    metrics: Dict[str, Any],
) -> None:
    """
    Upload a metric to DynamoDB (and from there, Rockset).

    Even if EMIT_METRICS is set to False, this function will still run the code to
    validate and shape the metrics, skipping just the upload.

    Parameters:
        metric_name:
            Name of the metric. Every unique metric should have a different name
            and be emitted just once per run attempt.
            Metrics are namespaced by their module and the function that emitted them.
        metrics: The actual data to record.

    Some default values are populated from environment variables, which must be set
    for metrics to be emitted. (If a required one is not set, this function warns
    and becomes a noop.)

    Raises:
        ValueError: If `metrics` is None, or if the metrics (after merging in the
            global metrics) use a key reserved for the automatically populated
            fields.
    """

    if metrics is None:
        raise ValueError("You didn't ask to upload any metrics!")

    # Merge the given metrics with the global metrics, overwriting any duplicates
    # with the given metrics.
    metrics = {**global_metrics, **metrics}

    # We use these env vars to determine basic info about the workflow run.
    # By using env vars, we don't have to pass this info around to every function.
    # It also helps ensure that we only emit metrics during CI
    env_var_metrics = [
        EnvVarMetric("repo", "GITHUB_REPOSITORY"),
        EnvVarMetric("workflow", "GITHUB_WORKFLOW"),
        EnvVarMetric("build_environment", "BUILD_ENVIRONMENT", required=False),
        EnvVarMetric("job", "GITHUB_JOB"),
        EnvVarMetric("test_config", "TEST_CONFIG", required=False),
        EnvVarMetric("pr_number", "PR_NUMBER", required=False, type_conversion_fn=int),
        EnvVarMetric("run_id", "GITHUB_RUN_ID", type_conversion_fn=int),
        EnvVarMetric("run_number", "GITHUB_RUN_NUMBER", type_conversion_fn=int),
        EnvVarMetric("run_attempt", "GITHUB_RUN_ATTEMPT", type_conversion_fn=int),
        EnvVarMetric("job_id", "JOB_ID", type_conversion_fn=int),
        EnvVarMetric("job_name", "JOB_NAME"),
    ]

    # Use info about the function that invoked this one as a namespace and a way to filter metrics.
    calling_frame = inspect.currentframe().f_back  # type: ignore[union-attr]
    calling_frame_info = inspect.getframeinfo(calling_frame)  # type: ignore[arg-type]
    calling_file = os.path.basename(calling_frame_info.filename)
    calling_module = inspect.getmodule(calling_frame).__name__  # type: ignore[union-attr]
    calling_function = calling_frame_info.function

    # Read each env var exactly once. Optional metrics that are unset come back
    # as None and are left out; a legitimate falsy value such as 0 is kept.
    env_values: Dict[str, Any] = {}
    try:
        for env_metric in env_var_metrics:
            env_value = env_metric.value()
            if env_value is not None:
                env_values[env_metric.name] = env_value
    except ValueError as e:
        # A required env var is missing (or could not be converted), which
        # typically means we are not running in CI: skip instead of failing.
        warn(f"Not emitting metrics for {metric_name}. {e}")
        return

    # An aware UTC "now" replaces the deprecated utcnow(); the format string has
    # no timezone directive, so the emitted text is unchanged.
    timestamp = datetime.datetime.now(datetime.timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S.%f"
    )

    reserved_metrics = {
        "metric_name": metric_name,
        "calling_file": calling_file,
        "calling_module": calling_module,
        "calling_function": calling_function,
        "timestamp": timestamp,
        **env_values,
    }

    # Prefix key with metric name and timestamp to derisk chance of a uuid1 name collision
    reserved_metrics[
        "dynamo_key"
    ] = f"{metric_name}_{int(time.time())}_{uuid.uuid1().hex}"

    # Ensure the metrics dict doesn't contain any reserved keys, and report every
    # offending key at once.
    used_reserved_keys = [k for k in metrics if k in reserved_metrics]
    if used_reserved_keys:
        raise ValueError(
            f"Metrics dict contains reserved keys: [{', '.join(used_reserved_keys)}]"
        )

    # boto3 doesn't support uploading float values to DynamoDB, so convert them all to decimals.
    metrics = _convert_float_values_to_decimals(metrics)

    if EMIT_METRICS:
        try:
            session = boto3.Session(region_name="us-east-1")
            session.resource("dynamodb").Table("torchci-metrics").put_item(
                Item={
                    **reserved_metrics,
                    **metrics,
                }
            )
        except Exception as e:
            # We don't want to fail the job if we can't upload the metric.
            # We still raise the ValueErrors outside this try block since those indicate improperly configured metrics
            warn(f"Error uploading metric {metric_name} to DynamoDB: {e}")
            return
    else:
        print(f"Not emitting metrics for {metric_name}. Boto wasn't imported.")
169

170

171
def _convert_float_values_to_decimals(data: Dict[str, Any]) -> Dict[str, Any]:
172
    # Attempt to recurse
173
    def _helper(o: Any) -> Any:
174
        if isinstance(o, float):
175
            return Decimal(str(o))
176
        if isinstance(o, list):
177
            return [_helper(v) for v in o]
178
        if isinstance(o, dict):
179
            return {_helper(k): _helper(v) for k, v in o.items()}
180
        if isinstance(o, tuple):
181
            return tuple(_helper(v) for v in o)
182
        return o
183

184
    return {k: _helper(v) for k, v in data.items()}
185

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.