pytorch

Форк
0
/
test_c10d_spawn_gloo.py 
341 строка · 12.0 Кб
1
# Owner(s): ["oncall: distributed"]
2

3
import copy
4
import os
5
import sys
6
import tempfile
7

8
import test_c10d_spawn
9
from test_c10d_spawn import _torch_dist_nn_available, TestDistributedNNFunctions
10

11
import torch
12
import torch.distributed as c10d
13
import torch.nn as nn
14
from torch.testing._internal.common_cuda import TEST_CUDA, TEST_MULTIGPU
15
from torch.testing._internal.common_distributed import (
16
    create_device,
17
    requires_gloo,
18
    skip_if_lt_x_gpu,
19
)
20
from torch.testing._internal.common_utils import (
21
    run_tests,
22
    skip_but_pass_in_sandcastle_if,
23
    TEST_WITH_DEV_DBG_ASAN,
24
    TestCase,
25
)
26

27

28
# Fails on Python-3.9, see https://github.com/pytorch/pytorch/issues/51619
29
if sys.version_info < (3, 9):
30

31
    class ProcessGroupShareTensorTest(
32
        test_c10d_spawn.AbstractProcessGroupShareTensorTest, TestCase
33
    ):
34
        @classmethod
35
        def opts(cls, threads=2):
36
            opts = c10d.ProcessGroupGloo._Options()
37
            opts._timeout = 5.0
38
            opts._devices = [create_device(interface="lo")]
39
            opts._threads = threads
40
            return opts
41

42
        @classmethod
43
        def _init_pg_gloo(cls, rank, filename, world_size):
44
            store = c10d.FileStore(filename, world_size)
45
            backend = c10d.ProcessGroupGloo(
46
                store, rank, world_size, ProcessGroupShareTensorTest.opts()
47
            )
48
            # set process group backends manually
49
            c10d.init_process_group(
50
                backend="gloo", store=store, rank=rank, world_size=world_size
51
            )
52
            pg = c10d.distributed_c10d._get_default_group()
53
            pg._register_backend(
54
                torch.device("cpu"), c10d.ProcessGroup.BackendType.GLOO, backend
55
            )
56
            pg._register_backend(
57
                torch.device("cuda"), c10d.ProcessGroup.BackendType.GLOO, backend
58
            )
59

60
            return pg
61

62
        @skip_but_pass_in_sandcastle_if(
63
            not TEST_MULTIGPU, "At least 2 CUDA GPUS needed"
64
        )
65
        def test_shared_broadcast_gloo(self):
66
            self._test_multiprocess(
67
                ProcessGroupShareTensorTest._test_broadcast_process,
68
                [torch.ones(2, 2).to(i) * i for i in range(self.world_size)],
69
                ProcessGroupShareTensorTest._init_pg_gloo,
70
                1,
71
            )
72

73
        @skip_but_pass_in_sandcastle_if(
74
            not TEST_MULTIGPU, "At least 2 CUDA GPUS needed"
75
        )
76
        def test_shared_allreduce_gloo(self):
77
            self._test_multiprocess(
78
                ProcessGroupShareTensorTest._test_allreduce_process,
79
                [torch.ones(2, 2).to(i) for i in range(self.world_size)],
80
                ProcessGroupShareTensorTest._init_pg_gloo,
81
                1,
82
            )
83

84
        @skip_but_pass_in_sandcastle_if(
85
            not TEST_MULTIGPU, "At least 2 CUDA GPUS needed"
86
        )
87
        def test_shared_allgather_gloo(self):
88
            self._test_multiprocess(
89
                ProcessGroupShareTensorTest._test_allgather_process,
90
                [torch.ones(2, 2).to(i) * i for i in range(self.world_size)],
91
                ProcessGroupShareTensorTest._init_pg_gloo,
92
                self.world_size,
93
            )
94

95
        @classmethod
96
        def _test_allgather_chunk_process(
97
            cls, rank, filename, shared_tensor, world_size, init_pg, c2p, p2c
98
        ):
99
            pg = init_pg(rank, filename, world_size)
100
            chunks = torch.chunk(shared_tensor, world_size, dim=0)
101
            x = chunks[rank]
102
            ys = [torch.zeros_like(x) for _ in range(world_size)]
103
            pg.allgather(ys, x).wait()
104
            c2p.put((rank, chunks[0].to("cpu"), ys[0].to("cpu")))
105
            c2p.put((rank, chunks[1].to("cpu"), ys[1].to("cpu")))
106
            p2c.get()
107

108
        @skip_but_pass_in_sandcastle_if(
109
            not TEST_MULTIGPU, "At least 2 CUDA GPUS needed"
110
        )
111
        def test_shared_allgather_chunk_gloo(self):
112
            self._test_multiprocess(
113
                ProcessGroupShareTensorTest._test_allgather_chunk_process,
114
                torch.tensor(range(4)).reshape(2, 2),
115
                ProcessGroupShareTensorTest._init_pg_gloo,
116
                self.world_size,
117
            )
118

119

120
class DistributedDataParallelSingleProcessTest(TestCase):
121
    def setUp(self):
122
        self.rank = 0
123
        self.world_size = 1
124
        self.file = tempfile.NamedTemporaryFile(delete=False)  # noqa: P201
125

126
    def tearDown(self):
127
        try:
128
            os.remove(self.file.name)
129
        except OSError:
130
            pass
131

132
    def _test_base(self, net, inp, check_allclose=True):
133
        store = c10d.FileStore(self.file.name, self.world_size)
134
        c10d.init_process_group(
135
            backend="gloo", store=store, rank=self.rank, world_size=self.world_size
136
        )
137
        process_group = c10d.distributed_c10d._get_default_group()
138
        if inp[0].is_cuda:
139
            device_ids = [torch.cuda.current_device()]
140
        else:
141
            device_ids = None
142

143
        ddp = nn.parallel.DistributedDataParallel(
144
            copy.deepcopy(net), device_ids=device_ids, process_group=process_group
145
        )
146

147
        net_opt = torch.optim.Adam(net.parameters(), lr=0.001)
148
        ddp_opt = torch.optim.Adam(ddp.parameters(), lr=0.001)
149

150
        for i, j in zip(ddp.parameters(), net.parameters()):
151
            self.assertTrue(i.allclose(j))
152

153
        for _ in range(10):
154
            net_out = net(*inp)
155
            ddp_out = ddp(*inp)
156

157
            net_out.sum().backward()
158
            ddp_out.sum().backward()
159

160
            net_opt.step()
161
            ddp_opt.step()
162

163
        if check_allclose:
164
            for i, j in zip(ddp.parameters(), net.parameters()):
165
                self.assertTrue(i.allclose(j))
166

167
    @requires_gloo()
168
    def test_cpu(self):
169
        self._test_base(nn.Linear(2, 2), [torch.randn(30, 2)])
170

171
    @requires_gloo()
172
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "At least 1 CUDA GPUS needed")
173
    def test_cuda(self):
174
        self._test_base(nn.Linear(2, 2).to(0), [torch.randn(30, 2).to(0)])
175

176
    @requires_gloo()
177
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "At least 1 CUDA GPUS needed")
178
    def test_rnn(self):
179
        # This test is inspired by the bug reported in
180
        # https://github.com/pytorch/pytorch/issues/36268
181
        BATCH_SIZE = 12  # Divisible by 2, 3, 4
182
        INPUT_DIM = 256
183
        OUTPUT_DIM = 256
184
        HIDDEN_DIM = 256
185
        N_LAYERS = 3
186
        SEQ_LEN = 100
187

188
        class Net(nn.Module):
189
            def __init__(self, input_dim, hidden_dim, output_dim, hidden_layers):
190
                super().__init__()
191
                self.input_dim = input_dim
192
                self.hidden_dim = hidden_dim
193
                self.output_dim = output_dim
194
                self.hidden_layers = hidden_layers
195

196
                self.lstm = nn.LSTM(
197
                    input_dim, hidden_dim, hidden_layers, batch_first=True
198
                )
199
                self.h2o = nn.Linear(hidden_dim, output_dim)
200

201
            def forward(self, x, y):
202
                self.lstm.flatten_parameters()
203
                h_t, _ = self.lstm(x)
204
                output = self.h2o(h_t)
205
                loss = nn.functional.mse_loss(output, y)
206
                return loss
207

208
        net = Net(INPUT_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS).to(0)
209
        inp = [
210
            torch.randn((BATCH_SIZE, SEQ_LEN, INPUT_DIM)).to(0),
211
            torch.rand((BATCH_SIZE, SEQ_LEN, OUTPUT_DIM)).to(0),
212
        ]
213

214
        # Not checking result allclose as the parameter inconsistency exist
215
        # prior to this change. See #37079
216
        self._test_base(net, inp, check_allclose=False)
217

218

219
# Skip dev-asan as torch + multiprocessing spawn have known issues
220
if not TEST_WITH_DEV_DBG_ASAN:
221

222
    class TestDistributedNNFunctionsGloo(TestDistributedNNFunctions):
223
        # Test Common Ops First.
224
        @requires_gloo()
225
        @skip_if_lt_x_gpu(2)
226
        @skip_but_pass_in_sandcastle_if(
227
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
228
        )
229
        def test_broadcast(self):
230
            self._test_broadcast("gloo")
231

232
        @requires_gloo()
233
        @skip_if_lt_x_gpu(2)
234
        @skip_but_pass_in_sandcastle_if(
235
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
236
        )
237
        def test_reduce(self):
238
            self._test_reduce("gloo")
239

240
        @requires_gloo()
241
        @skip_if_lt_x_gpu(2)
242
        @skip_but_pass_in_sandcastle_if(
243
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
244
        )
245
        def test_allreduce(self):
246
            self._test_allreduce("gloo")
247

248
        @requires_gloo()
249
        @skip_if_lt_x_gpu(2)
250
        @skip_but_pass_in_sandcastle_if(
251
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
252
        )
253
        def test_all_gather(self):
254
            self._test_all_gather("gloo")
255

256
        @requires_gloo()
257
        @skip_if_lt_x_gpu(2)
258
        @skip_but_pass_in_sandcastle_if(
259
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
260
        )
261
        def test_all_to_all(self):
262
            self._test_all_to_all("gloo")
263

264
        @requires_gloo()
265
        @skip_if_lt_x_gpu(2)
266
        @skip_but_pass_in_sandcastle_if(
267
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
268
        )
269
        def test_all_to_all_single(self):
270
            self._test_all_to_all_single("gloo")
271

272
        # Test Ops only supported in GLOO.
273
        @requires_gloo()
274
        @skip_if_lt_x_gpu(2)
275
        @skip_but_pass_in_sandcastle_if(
276
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
277
        )
278
        def test_gather(self):
279
            store = c10d.FileStore(self.file_name, self.world_size)
280
            # This is required because these functions calls directly to the .dist and needs
281
            # the world to be initialized
282
            c10d.init_process_group(
283
                store=store, rank=self.rank, world_size=self.world_size, backend="gloo"
284
            )
285
            device = torch.device(f"cuda:{self.rank}")
286
            x = torch.ones(5, 5, device=device) + self.rank
287
            x.requires_grad = True
288
            tensors = torch.distributed.nn.gather(x, 1)
289
            if self.rank == 1:
290
                for i, t in enumerate(tensors):
291
                    self.assertEqual(t, torch.ones(5, 5, device=device) + i)
292
            elif self.rank == 0:
293
                for i, t in enumerate(tensors):
294
                    zeros = torch.zeros(5, 5, device=device)
295
                    self.assertEqual(t, zeros)
296
            y = torch.sum(torch.stack(tensors), axis=0)
297
            z = y.sin().sum()
298
            z.backward()
299

300
            # Test gradient
301
            x_s = 3 * torch.ones(5, 5, device=device)
302
            self.assertEqual(x.grad, x_s.cos())
303

304
        @requires_gloo()
305
        @skip_if_lt_x_gpu(2)
306
        @skip_but_pass_in_sandcastle_if(
307
            not _torch_dist_nn_available, "torch.distributed.nn is not available"
308
        )
309
        def test_scatter(self):
310
            store = c10d.FileStore(self.file_name, self.world_size)
311
            # This is required because these functions calls directly to the .dist and needs
312
            # the world to be initialized
313
            c10d.init_process_group(
314
                store=store, rank=self.rank, world_size=self.world_size, backend="gloo"
315
            )
316
            device = torch.device(f"cuda:{self.rank}")
317
            x0 = torch.ones(5, 5, device=device)
318
            x1 = torch.ones(5, 5, device=device) + 1
319
            x0.requires_grad = True
320
            x1.requires_grad = True
321

322
            y = torch.distributed.nn.scatter([x0, x1], 1)
323
            if self.rank == 1:
324
                self.assertEqual(y, 1 + torch.ones(5, 5, device=device))
325
            elif self.rank == 0:
326
                self.assertEqual(y, torch.ones(5, 5, device=device))
327
            z = y.sin().sum()
328
            z.backward()
329

330
            # Test gradient
331
            if self.rank == 1:
332
                x0_s = torch.ones(5, 5, device=device).cos()
333
                x1_s = (2 * torch.ones(5, 5, device=device)).cos()
334
                self.assertEqual(x0.grad, x0_s)
335
                self.assertEqual(x1.grad, x1_s)
336
            if self.rank == 0:
337
                self.assertEqual(x0.grad, torch.zeros(5, 5, device=device))
338

339

340
if __name__ == "__main__":
341
    run_tests()
342

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.