test_c10d_spawn_gloo.py

# Owner(s): ["oncall: distributed"]

import copy
import os
import sys
import tempfile

import test_c10d_spawn
import torch
import torch.distributed as c10d
import torch.nn as nn
from test_c10d_spawn import _torch_dist_nn_available, TestDistributedNNFunctions
from torch.testing._internal.common_cuda import TEST_CUDA, TEST_MULTIGPU
from torch.testing._internal.common_distributed import requires_gloo, \
    create_device, skip_if_lt_x_gpu
from torch.testing._internal.common_utils import TestCase, run_tests, skip_but_pass_in_sandcastle_if, TEST_WITH_DEV_DBG_ASAN

# Fails on Python-3.9, see https://github.com/pytorch/pytorch/issues/51619
if sys.version_info < (3, 9):
    class ProcessGroupShareTensorTest(test_c10d_spawn.AbstractProcessGroupShareTensorTest, TestCase):

        @classmethod
        def opts(cls, threads=2):
            opts = c10d.ProcessGroupGloo._Options()
            opts._timeout = 5.0
            opts._devices = [create_device(interface='lo')]
            opts._threads = threads
            return opts

        @classmethod
        def _init_pg_gloo(cls, rank, filename, world_size):
            store = c10d.FileStore(filename, world_size)
            backend = c10d.ProcessGroupGloo(
                store, rank, world_size, ProcessGroupShareTensorTest.opts())
            # set process group backends manually
            c10d.init_process_group(backend="gloo", store=store, rank=rank, world_size=world_size)
            pg = c10d.distributed_c10d._get_default_group()
            pg._register_backend(torch.device("cpu"), c10d.ProcessGroup.BackendType.GLOO, backend)
            pg._register_backend(torch.device("cuda"), c10d.ProcessGroup.BackendType.GLOO, backend)

            return pg

        @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "At least 2 CUDA GPUS needed")
        def test_shared_broadcast_gloo(self):
            self._test_multiprocess(
                ProcessGroupShareTensorTest._test_broadcast_process,
                [torch.ones(2, 2).to(i) * i for i in range(self.world_size)],
                ProcessGroupShareTensorTest._init_pg_gloo,
                1)

        @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "At least 2 CUDA GPUS needed")
        def test_shared_allreduce_gloo(self):
            self._test_multiprocess(
                ProcessGroupShareTensorTest._test_allreduce_process,
                [torch.ones(2, 2).to(i) for i in range(self.world_size)],
                ProcessGroupShareTensorTest._init_pg_gloo,
                1)

        @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "At least 2 CUDA GPUS needed")
        def test_shared_allgather_gloo(self):
            self._test_multiprocess(
                ProcessGroupShareTensorTest._test_allgather_process,
                [torch.ones(2, 2).to(i) * i for i in range(self.world_size)],
                ProcessGroupShareTensorTest._init_pg_gloo,
                self.world_size)

        @classmethod
        def _test_allgather_chunk_process(
                cls, rank, filename, shared_tensor, world_size, init_pg, c2p, p2c):
            pg = init_pg(rank, filename, world_size)
            chunks = torch.chunk(shared_tensor, world_size, dim=0)
            x = chunks[rank]
            ys = [torch.zeros_like(x) for _ in range(world_size)]
            pg.allgather(ys, x).wait()
            c2p.put((rank, chunks[0].to("cpu"), ys[0].to("cpu")))
            c2p.put((rank, chunks[1].to("cpu"), ys[1].to("cpu")))
            p2c.get()

        @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "At least 2 CUDA GPUS needed")
        def test_shared_allgather_chunk_gloo(self):
            self._test_multiprocess(
                ProcessGroupShareTensorTest._test_allgather_chunk_process,
                torch.tensor(range(4)).reshape(2, 2),
                ProcessGroupShareTensorTest._init_pg_gloo,
                self.world_size)


class DistributedDataParallelSingleProcessTest(TestCase):
    def setUp(self):
        self.rank = 0
        self.world_size = 1
        self.file = tempfile.NamedTemporaryFile(delete=False)  # noqa: P201

    def tearDown(self):
        try:
            os.remove(self.file.name)
        except OSError:
            pass

    def _test_base(self, net, inp, check_allclose=True):
        store = c10d.FileStore(self.file.name, self.world_size)
        c10d.init_process_group(backend="gloo", store=store, rank=self.rank, world_size=self.world_size)
        process_group = c10d.distributed_c10d._get_default_group()
        if inp[0].is_cuda:
            device_ids = [torch.cuda.current_device()]
        else:
            device_ids = None

        ddp = nn.parallel.DistributedDataParallel(
            copy.deepcopy(net),
            device_ids=device_ids,
            process_group=process_group
        )

        net_opt = torch.optim.Adam(net.parameters(), lr=0.001)
        ddp_opt = torch.optim.Adam(ddp.parameters(), lr=0.001)

        for i, j in zip(ddp.parameters(), net.parameters()):
            self.assertTrue(i.allclose(j))

        for _ in range(10):
            net_out = net(*inp)
            ddp_out = ddp(*inp)

            net_out.sum().backward()
            ddp_out.sum().backward()

            net_opt.step()
            ddp_opt.step()

        if check_allclose:
            for i, j in zip(ddp.parameters(), net.parameters()):
                self.assertTrue(i.allclose(j))

    @requires_gloo()
    def test_cpu(self):
        self._test_base(nn.Linear(2, 2), [torch.randn(30, 2)])

    @requires_gloo()
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "At least 1 CUDA GPUS needed")
    def test_cuda(self):
        self._test_base(nn.Linear(2, 2).to(0), [torch.randn(30, 2).to(0)])

    @requires_gloo()
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "At least 1 CUDA GPUS needed")
    def test_rnn(self):
        # This test is inspired by the bug reported in
        # https://github.com/pytorch/pytorch/issues/36268
        BATCH_SIZE = 12  # Divisible by 2, 3, 4
        INPUT_DIM = 256
        OUTPUT_DIM = 256
        HIDDEN_DIM = 256
        N_LAYERS = 3
        SEQ_LEN = 100

        class Net(nn.Module):
            def __init__(self, input_dim, hidden_dim, output_dim, hidden_layers):
                super().__init__()
                self.input_dim = input_dim
                self.hidden_dim = hidden_dim
                self.output_dim = output_dim
                self.hidden_layers = hidden_layers

                self.lstm = nn.LSTM(input_dim, hidden_dim, hidden_layers, batch_first=True)
                self.h2o = nn.Linear(hidden_dim, output_dim)

            def forward(self, x, y):
                self.lstm.flatten_parameters()
                h_t, _ = self.lstm(x)
                output = self.h2o(h_t)
                loss = nn.functional.mse_loss(output, y)
                return loss

        net = Net(INPUT_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS).to(0)
        inp = [
            torch.randn((BATCH_SIZE, SEQ_LEN, INPUT_DIM)).to(0),
            torch.rand((BATCH_SIZE, SEQ_LEN, OUTPUT_DIM)).to(0)
        ]

        # Not checking result allclose as the parameter inconsistency exists
        # prior to this change. See #37079
        self._test_base(net, inp, check_allclose=False)


# Skip dev-asan as torch + multiprocessing spawn have known issues
if not TEST_WITH_DEV_DBG_ASAN:
    class TestDistributedNNFunctionsGloo(TestDistributedNNFunctions):
        # Test Common Ops First.
        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(not _torch_dist_nn_available, "torch.distributed.nn is not available")
        def test_broadcast(self):
            self._test_broadcast("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(not _torch_dist_nn_available, "torch.distributed.nn is not available")
        def test_reduce(self):
            self._test_reduce("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(not _torch_dist_nn_available, "torch.distributed.nn is not available")
        def test_allreduce(self):
            self._test_allreduce("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(not _torch_dist_nn_available, "torch.distributed.nn is not available")
        def test_all_gather(self):
            self._test_all_gather("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(not _torch_dist_nn_available, "torch.distributed.nn is not available")
        def test_all_to_all(self):
            self._test_all_to_all("gloo")

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(not _torch_dist_nn_available, "torch.distributed.nn is not available")
        def test_all_to_all_single(self):
            self._test_all_to_all_single("gloo")

        # Test Ops only supported in GLOO.
        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(not _torch_dist_nn_available, "torch.distributed.nn is not available")
        def test_gather(self):
            store = c10d.FileStore(self.file_name, self.world_size)
            # This is required because these functions call directly into torch.distributed
            # and need the default process group to be initialized
            c10d.init_process_group(store=store, rank=self.rank, world_size=self.world_size, backend='gloo')
            device = torch.device(f"cuda:{self.rank}")
            x = torch.ones(5, 5, device=device) + self.rank
            x.requires_grad = True
            tensors = torch.distributed.nn.gather(x, 1)
            if self.rank == 1:
                for i, t in enumerate(tensors):
                    self.assertEqual(t, torch.ones(5, 5, device=device) + i)
            elif self.rank == 0:
                for i, t in enumerate(tensors):
                    zeros = torch.zeros(5, 5, device=device)
                    self.assertEqual(t, zeros)
            y = torch.sum(torch.stack(tensors), axis=0)
            z = y.sin().sum()
            z.backward()

            # Test gradient
            x_s = 3 * torch.ones(5, 5, device=device)
            self.assertEqual(x.grad, x_s.cos())

        @requires_gloo()
        @skip_if_lt_x_gpu(2)
        @skip_but_pass_in_sandcastle_if(not _torch_dist_nn_available, "torch.distributed.nn is not available")
        def test_scatter(self):
            store = c10d.FileStore(self.file_name, self.world_size)
            # This is required because these functions call directly into torch.distributed
            # and need the default process group to be initialized
            c10d.init_process_group(store=store, rank=self.rank, world_size=self.world_size, backend='gloo')
            device = torch.device(f"cuda:{self.rank}")
            x0 = torch.ones(5, 5, device=device)
            x1 = torch.ones(5, 5, device=device) + 1
            x0.requires_grad = True
            x1.requires_grad = True

            y = torch.distributed.nn.scatter([x0, x1], 1)
            if self.rank == 1:
                self.assertEqual(y, 1 + torch.ones(5, 5, device=device))
            elif self.rank == 0:
                self.assertEqual(y, torch.ones(5, 5, device=device))
            z = y.sin().sum()
            z.backward()

            # Test gradient
            if self.rank == 1:
                x0_s = torch.ones(5, 5, device=device).cos()
                x1_s = (2 * torch.ones(5, 5, device=device)).cos()
                self.assertEqual(x0.grad, x0_s)
                self.assertEqual(x1.grad, x1_s)
            if self.rank == 0:
                self.assertEqual(x0.grad, torch.zeros(5, 5, device=device))


if __name__ == '__main__':
    run_tests()