gpt-neox / initialize.py
# Copyright (c) 2024, EleutherAI
# This file is based on code by the authors denoted below and has been modified from its original version.
#
# Copyright (c) 2024, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Megatron initialization."""

import random
import os
import sys

import numpy as np
import torch

from megatron import fused_kernels
from megatron import mpu
from megatron.mpu import set_model_parallel_rank, set_model_parallel_world_size
from megatron.utils import print_rank_0

import deepspeed
import inspect


def initialize_megatron(neox_args, allow_no_cuda=False):
    """Initialize torch.distributed, set autoresume, and set random seeds.

    `allow_no_cuda` should not be set unless using Megatron for CPU-only
    data processing. In general this arg should not be set unless you know
    what you are doing.
    Returns a function to finalize distributed env initialization
    (optionally, only when args.lazy_mpu_init == True).
    """
    if not allow_no_cuda:
        # Make sure cuda is available.
        assert torch.cuda.is_available(), "Megatron requires CUDA."

    # torch.distributed initialization
    def finish_mpu_init():
        # Pytorch distributed.
        _initialize_distributed(neox_args=neox_args)

        # Random seeds for reproducibility.
        if neox_args.rank == 0:
            print("> setting random seeds to {} ...".format(neox_args.seed))
        _set_random_seed(neox_args.seed)

    # Check that the fused kernels are installed:
    if (
        neox_args.scaled_upper_triang_masked_softmax_fusion
        or neox_args.scaled_masked_softmax_fusion
        or neox_args.rope_fusion
    ):
        fused_kernels.load_fused_kernels()

    if neox_args.lazy_mpu_init:
        neox_args.use_cpu_initialization = True
        # Delayed initialization of DDP-related stuff.
        # We only set basic DDP globals
        set_model_parallel_world_size(neox_args.model_parallel_size)
        # and return a function for the external DDP manager to call when it has DDP initialized.
        set_model_parallel_rank(neox_args.rank)
        return finish_mpu_init
    else:
        # Megatron's MPU is the master. Complete initialization right away.
        finish_mpu_init()

        # Compile dataset C++ code.
        if neox_args.local_rank == 0:
            from megatron.data.data_utils import compile_helper

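            # Only local rank 0 builds the helpers, so each node avoids
            # launching concurrent `make` runs for the C++ dataset code.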
            compile_helper()

        # Write arguments to tensorboard.
        _write_args_to_tensorboard(neox_args=neox_args)
        # No continuation function.
        return None


def setup_deepspeed_random_and_activation_checkpointing(neox_args):
    """Optional DeepSpeed Activation Checkpointing features.

    Gives access to partition activations, contiguous memory optimizations,
    and CPU checkpointing.

    Activation checkpointing requires keeping track of the random states
    and setting the random seed for each MP process. Megatron uses
    mpu.get_cuda_rng_tracker and mpu.model_parallel_cuda_manual_seed
    for keeping track of the random states and setting the random seeds.
    Since they are used in places outside of activation checkpointing,
    we overwrite them to maintain consistency.

    This must be called before all the calls to mpu.model_parallel_cuda_manual_seed.
    """
    num_layers = neox_args.num_layers // neox_args.checkpoint_num_layers
    num_layers = (
        num_layers
        if neox_args.num_layers % neox_args.checkpoint_num_layers == 0
        else num_layers + 1
    )
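    # Ceiling division: e.g., num_layers=24 with checkpoint_num_layers=4 gives
    # 6 checkpoints, while num_layers=25 gives 7 so the trailing layers are
    # still covered.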

    deepspeed.checkpointing.configure(
        mpu,
        partition_activations=neox_args.partition_activations,
        contiguous_checkpointing=neox_args.contiguous_checkpointing,
        num_checkpoints=num_layers,
        checkpoint_in_cpu=neox_args.checkpoint_in_cpu,
        synchronize=neox_args.synchronize_each_layer,
        profile=neox_args.profile_backward,
    )



def _initialize_distributed(neox_args):
    """Initialize torch.distributed and mpu."""

    device_count = torch.cuda.device_count()
    if torch.distributed.is_initialized():
        if neox_args.rank == 0:
            print(
                "torch distributed is already initialized, "
                "skipping initialization ...",
                flush=True,
            )
        neox_args.rank = torch.distributed.get_rank()
        neox_args.world_size = torch.distributed.get_world_size()
    else:
        if neox_args.rank == 0:
            print("> initializing torch distributed ...", flush=True)
        # Manually set the device ids.
        if device_count > 0:
            device = neox_args.rank % device_count
            if neox_args.local_rank is not None:
                assert (
                    neox_args.local_rank == device
                ), "expected local-rank to be the same as rank % device-count."
            else:
                neox_args.local_rank = device
            torch.cuda.set_device(device)

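        # DeepSpeed sets up torch.distributed under the hood; with
        # auto_mpi_discovery=True it can also fall back to MPI to discover
        # ranks when the usual RANK/MASTER_ADDR env vars are absent.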
        deepspeed.init_distributed(
            dist_backend=neox_args.distributed_backend,
            auto_mpi_discovery=True,
            distributed_port=os.getenv("MASTER_PORT", "6000"),
            verbose=True,
        )

    # Set up the 3D topology.
    pp = neox_args.pipe_parallel_size if neox_args.pipe_parallel_size >= 1 else 1
    mp = neox_args.model_parallel_size if neox_args.model_parallel_size >= 1 else 1
    assert (
        neox_args.world_size % (pp * mp) == 0
    ), f"world_size={neox_args.world_size}, pp={pp}, mp={mp}"
    dp = neox_args.world_size // (pp * mp)

    from deepspeed.runtime.pipe.topology import PipeModelDataParallelTopology

    # This puts pipe on the outermost axis, then data, then model.
    # PipeModelDataParallelTopology is just a wrapper over ProcessTopology that predefines this order.
    topo = PipeModelDataParallelTopology(num_pp=pp, num_mp=mp, num_dp=dp)
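    # e.g., world_size=8 with pp=2 and mp=2 gives dp=2: rank 0 maps to
    # coordinate (pipe=0, data=0, model=0), rank 1 to (0, 0, 1), and so on,
    # with the model axis varying fastest and the pipe axis slowest.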

    # Offset base seeds for the interior pipeline stages.
    # TODO: adjust last stage too once IO is improved.
    stage_id = topo.get_coord(rank=torch.distributed.get_rank()).pipe
    if 0 < stage_id < topo.get_dim("pipe") - 1:
        offset = neox_args.seed + 1138
        neox_args.seed = offset + (stage_id * mp)
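        # e.g., with seed=1234 and mp=2, stage 1 gets 1234 + 1138 + 2 = 2374
        # and stage 2 gets 2376, so interior stages draw distinct random streams.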

    # Set the model-parallel / data-parallel communicators.
    if device_count > 0:
        if mpu.model_parallel_is_initialized():
            print(
                "_initialize_distributed() model parallel is already initialized",
                flush=True,
            )
        else:
            mpu.initialize_model_parallel(
                neox_args.model_parallel_size,
                topology=topo,
                fp32_allreduce=neox_args.fp32_allreduce,
            )

    # Init DeepSpeed activation checkpointing features.
    setup_deepspeed_random_and_activation_checkpointing(neox_args=neox_args)


def _init_autoresume(neox_args):
    """Set autoresume start time."""

    if neox_args.adlr_autoresume:
        print_rank_0("> enabling autoresume ...")
        sys.path.append(os.environ.get("SUBMIT_SCRIPTS", "."))
        try:
            from userlib.auto_resume import AutoResume
        except BaseException:
            print("> ADLR autoresume is not available, exiting ...", flush=True)
            sys.exit()
        neox_args.adlr_autoresume_object = AutoResume

    if neox_args.adlr_autoresume_object:
        torch.distributed.barrier()
        neox_args.adlr_autoresume_object.init()
        torch.distributed.barrier()


def _set_random_seed(seed):
    """Set random seed for reproducibility."""
    if seed is not None and seed > 0:
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        if torch.cuda.device_count() > 0:
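            # Also seed the per-model-parallel-rank CUDA RNG streams tracked by
            # mpu.get_cuda_rng_tracker (used e.g. for dropout under activation
            # checkpointing).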
            mpu.model_parallel_cuda_manual_seed(seed)
    else:
        raise ValueError("Seed ({}) should be a positive integer.".format(seed))


def _write_args_to_tensorboard(neox_args):
    """Write arguments to tensorboard."""
    if neox_args.tensorboard_writer:
        for arg_name in vars(neox_args):
            neox_args.tensorboard_writer.add_text(
                arg_name, str(getattr(neox_args, arg_name))
            )
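

# Example usage (a sketch): in gpt-neox the training entry point builds a
# NeoXArgs object from the YAML configs and then initializes Megatron; the
# NeoXArgs.consume_neox_args() helper referenced here is assumed from the
# gpt-neox arguments module:
#
#     from megatron.neox_arguments import NeoXArgs
#
#     neox_args = NeoXArgs.consume_neox_args()  # parse configs from the CLI
#     initialize_megatron(neox_args=neox_args)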