pytorch / nccl_ops_test.py · 187 lines · 7.9 KB
import unittest
import hypothesis.strategies as st
from hypothesis import given, assume
import numpy as np
import time
import os
from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace, muji, dyndep
import caffe2.python.hypothesis_test_util as hu

np.random.seed(1)

dyndep.InitOpsLibrary('@/caffe2/caffe2/contrib/nccl:nccl_ops')


def gpu_device(i):
    # Build a DeviceOption that pins an operator to GPU i.
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = workspace.GpuDeviceType
    device_option.device_id = i
    return device_option


def benchmark(ws, net, warmups=5, iters=100):
    # Warm up the net, then time `iters` iterations through an execution plan
    # and report the average per-iteration latency.
    for _ in range(warmups):
        ws.run(net)
    plan = core.Plan("plan")
    plan.AddStep(core.ExecutionStep("test-step", net, iters))
    before = time.time()
    ws.run(plan)
    after = time.time()
    print("Timing network, time taken per-iteration: {:.6f}ms".format((
        after - before) / float(iters) * 1000.0))
    return after - before


@unittest.skipIf(not workspace.has_cuda_support, "NCCL only on CUDA GPU")
class NCCLOpsTest(hu.HypothesisTestCase):
    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           in_place=st.booleans())
    def test_nccl_allreduce(self, n, m, in_place):
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        prefix = "" if in_place else "o"
        outputs = [str("{}x_{}".format(prefix, i)) for i in range(n)]
        op = core.CreateOperator("NCCLAllreduce", inputs, outputs)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def allreduce(*args):
            assert len(args) == n
            output = np.sum(args, axis=0)
            return [output for _ in range(n)]

        outputs = self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            allreduce, input_device_options)
        for output in outputs:
            np.testing.assert_array_equal(outputs[0], output)
            self.assertEqual(outputs[0].tobytes(), output.tobytes())

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           root=st.integers(min_value=0,
                            max_value=workspace.NumGpuDevices() - 1))
    def test_nccl_broadcast(self, n, m, root):
        assume(root < n)
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLBroadcast", inputs, inputs, root=root)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def broadcast(*args):
            assert len(args) == n
            return [args[root] for _ in range(n)]

        self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            broadcast, input_device_options)

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           # NCCL Reduce seems to deadlock for non-zero roots.
           root=st.integers(min_value=0, max_value=0),
           in_place=st.booleans())
    def test_nccl_reduce(self, n, m, root, in_place):
        assume(in_place is False or root == 0)
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        op = core.CreateOperator(
            "NCCLReduce", inputs,
            inputs[root] if in_place else b"o", root=root)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def reduce(*args):
            assert len(args) == n
            return [np.sum(args, axis=0)]

        self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            reduce, input_device_options)

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000))
    def test_nccl_allgather(self, n, m):
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        outputs = [str("o_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLAllGather", inputs, outputs)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def allgather(*args):
            assert len(args) == n
            return [np.stack(args, axis=0) for _ in range(n)]

        outputs = self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            allgather, input_device_options)
        for output in outputs:
            np.testing.assert_array_equal(outputs[0], output)
            self.assertEqual(outputs[0].tobytes(), output.tobytes())

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000))
    def test_nccl_reduce_scatter(self, n, m):
        xs = [np.random.randn(n, m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        outputs = [str("o_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLReduceScatter", inputs, outputs)
        input_device_options = {n: gpu_device(i) for i, n in enumerate(inputs)}

        def reduce_scatter(*args):
            assert len(args) == n
            reduced = sum(args)
            assert len(reduced.shape) > 1
            ref = [reduced[i, :] for i in range(n)]
            return ref

        self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            reduce_scatter, input_device_options)

    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=100000, max_value=100000),
           iters=st.integers(min_value=1, max_value=100),
           net_type=st.sampled_from(["dag", "async_dag", "simple"]))
    def _test_nccl_sync(self, n, m, iters, net_type):
        # The leading underscore keeps the test runner from collecting this
        # case; it exercises cross-device synchronization around NCCLReduce.
        inputs = [str("x_{}".format(i)) for i in range(n)]
        extra_inputs = [str("xe_{}".format(i)) for i in range(n)]
        net = core.Net("asdf")
        net.Proto().type = net_type
        net.Proto().num_workers = n
        for i in range(n):
            net.ConstantFill([], inputs[i], shape=[m], value=0.0,
                             device_option=gpu_device(i))
            net.ConstantFill([], extra_inputs[i], shape=[m], value=1.0,
                             device_option=gpu_device(i))
            for _ in range(iters):
                net.Sum([inputs[i], extra_inputs[i]], [inputs[i]],
                        device_option=gpu_device(i))
        net.NCCLReduce(inputs, [inputs[0]], device_option=gpu_device(0))
        self.ws.run(net)
        np.testing.assert_array_equal(
            self.ws.blobs[inputs[0]].fetch(),
            np.full(shape=(m,), fill_value=iters * n, dtype=np.float32))

    @unittest.skipIf(not os.environ.get("CAFFE2_BENCHMARK"), "Benchmark")
    def test_timings(self):
        for n in range(2, workspace.NumGpuDevices()):
            for in_place in [False, True]:
                # np.random.randn requires an integer size, so avoid the
                # float literal 1e7 here.
                xs = [np.random.randn(10 ** 7).astype(np.float32)
                      for i in range(n)]
                inputs = [str("x_{}".format(i)) for i in range(n)]
                prefix = "" if in_place else "o"
                outputs = [str("{}x_{}".format(prefix, i)) for i in range(n)]

                net = core.Net("test")
                net.NCCLAllreduce(inputs, outputs)
                net.RunAllOnGPU()
                for i in range(n):
                    self.ws.create_blob(inputs[i]).feed(xs[i], gpu_device(i))
                self.ws.run(net)
                net_time = benchmark(self.ws, net)
                vanilla = core.Net("vanilla")
                muji.Allreduce(vanilla, inputs)
                vanilla_time = benchmark(self.ws, vanilla)
                print("Speedup for NCCL: {:.2f}".format(
                    vanilla_time / net_time))

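# Note: the benchmark in test_timings is opt-in and only runs when the
# CAFFE2_BENCHMARK environment variable is set (see the skipIf decorator
# above). A typical, hypothetical invocation would be:
#   CAFFE2_BENCHMARK=1 python -m pytest nccl_ops_test.py -k timings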