transformers
test_multi_node_model_parallel.py
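"""Multi-node model-parallelism integration tests for the SageMaker Hugging Face estimator."""
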
import json
import os
import subprocess
import unittest
from ast import literal_eval

import pytest
from parameterized import parameterized, parameterized_class

from . import is_sagemaker_available


if is_sagemaker_available():
    from sagemaker import Session, TrainingJobAnalytics
    from sagemaker.huggingface import HuggingFace


# These tests launch real multi-node SageMaker training jobs, so they are
# skipped unless TEST_SAGEMAKER=True is set in the environment.
@pytest.mark.skipif(
    literal_eval(os.getenv("TEST_SAGEMAKER", "False")) is not True,
    reason="Skipping test because it should only be run when releasing a minor transformers version",
)
@pytest.mark.usefixtures("sm_env")
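# Two variants of the same roberta-large GLUE job: the smdistributed
# model-parallelism example script and the stock run_glue.py Trainer script.
# "results" holds the KPI thresholds asserted after training.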
@parameterized_class(
    [
        {
            "framework": "pytorch",
            "script": "run_glue_model_parallelism.py",
            "model_name_or_path": "FacebookAI/roberta-large",
            "instance_type": "ml.p3dn.24xlarge",
            "results": {"train_runtime": 1600, "eval_accuracy": 0.3, "eval_loss": 1.2},
        },
        {
            "framework": "pytorch",
            "script": "run_glue.py",
            "model_name_or_path": "FacebookAI/roberta-large",
            "instance_type": "ml.p3dn.24xlarge",
            "results": {"train_runtime": 1600, "eval_accuracy": 0.3, "eval_loss": 1.2},
        },
    ]
)
class MultiNodeTest(unittest.TestCase):
    def setUp(self):
        if self.framework == "pytorch":
            subprocess.run(
                f"cp ./examples/pytorch/text-classification/run_glue.py {self.env.test_path}/run_glue.py".split(),
                encoding="utf-8",
                check=True,
            )
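        # the sm_env fixture (requested via usefixtures above) must have set self.env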
        assert hasattr(self, "env")

    def create_estimator(self, instance_count):
        # configuration for running training on smdistributed Model Parallel
        mpi_options = {
            "enabled": True,
            "processes_per_host": 8,
        }
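        # model-parallel settings: split the model into 4 partitions and pipeline
        # 4 microbatches through them ("interleaved"), with data parallelism (ddp)
        # layered on top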
        smp_options = {
            "enabled": True,
            "parameters": {
                "microbatches": 4,
                "placement_strategy": "spread",
                "pipeline": "interleaved",
                "optimize": "speed",
                "partitions": 4,
                "ddp": True,
            },
        }

        # both the MPI launcher config and the model-parallel config are passed to
        # SageMaker via the estimator's distribution argument
        distribution = {"smdistributed": {"modelparallel": smp_options}, "mpi": mpi_options}

        name_extension = "trainer" if self.script == "run_glue.py" else "smtrainer"
        # create the estimator
        return HuggingFace(
            entry_point=self.script,
            source_dir=self.env.test_path,
            role=self.env.role,
            image_uri=self.env.image_uri,
            base_job_name=f"{self.env.base_job_name}-{instance_count}-smp-{name_extension}",
            instance_count=instance_count,
            instance_type=self.instance_type,
            debugger_hook_config=False,
            hyperparameters={
                **self.env.hyperparameters,
                "model_name_or_path": self.model_name_or_path,
                "max_steps": 500,
            },
            metric_definitions=self.env.metric_definitions,
            distribution=distribution,
            py_version="py36",
        )

    def save_results_as_csv(self, job_name):
        # export the job's CloudWatch metrics so they can be shared, e.g. in a PR
        TrainingJobAnalytics(job_name).export_csv(f"{self.env.test_path}/{job_name}_metrics.csv")

    # @parameterized.expand([(2,), (4,),])
    @parameterized.expand([(1,)])
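    # run on a single instance by default; larger instance counts are kept
    # commented out above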
    def test_script(self, instance_count):
        # create estimator
        estimator = self.create_estimator(instance_count)

        # run training
        estimator.fit()

        # result dataframe
        result_metrics_df = TrainingJobAnalytics(estimator.latest_training_job.name).dataframe()

        # extract kpis
        eval_accuracy = list(result_metrics_df[result_metrics_df.metric_name == "eval_accuracy"]["value"])
        eval_loss = list(result_metrics_df[result_metrics_df.metric_name == "eval_loss"]["value"])
        # get train time from the SageMaker job; this includes starting, preprocessing, and stopping time.
        # default to a huge sentinel so a missing field fails the runtime assertion below
        train_runtime = (
            Session().describe_training_job(estimator.latest_training_job.name).get("TrainingTimeInSeconds", 999999)
        )

        # assert kpis
        assert train_runtime <= self.results["train_runtime"]
        assert all(t >= self.results["eval_accuracy"] for t in eval_accuracy)
        assert all(t <= self.results["eval_loss"] for t in eval_loss)

        # dump test results into a json file to share in the PR
        with open(f"{estimator.latest_training_job.name}.json", "w") as outfile:
            json.dump({"train_time": train_runtime, "eval_accuracy": eval_accuracy, "eval_loss": eval_loss}, outfile)