import os
import time
import unittest

import hypothesis.strategies as st
from hypothesis import given, assume
import numpy as np

from caffe2.proto import caffe2_pb2
from caffe2.python import core, workspace, muji, dyndep
import caffe2.python.hypothesis_test_util as hu

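# Pull in the NCCL operators, which are built as a separate dynamic
# library rather than compiled into the core Caffe2 binary.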
dyndep.InitOpsLibrary('@/caffe2/caffe2/contrib/nccl:nccl_ops')


def gpu_device(i):
    # Build a DeviceOption that pins an operator or blob to GPU i.
    device_option = caffe2_pb2.DeviceOption()
    device_option.device_type = workspace.GpuDeviceType
    device_option.device_id = i
    return device_option


def benchmark(ws, net, warmups=5, iters=100):
    # Warm-up runs keep one-time setup costs out of the measurement.
    for _ in range(warmups):
        ws.run(net)
    plan = core.Plan("plan")
    plan.AddStep(core.ExecutionStep("test-step", net, iters))
    before = time.time()
    ws.run(plan)
    after = time.time()
    print("Timing network, time taken per-iteration: {:.6f}ms".format((
        after - before) / float(iters) * 1000.0))
    return after - before
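

# Each test below runs an NCCL collective across n GPUs and compares the
# GPU results against a plain NumPy reference implementation through
# assertReferenceChecks from hypothesis_test_util.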
@unittest.skipIf(not workspace.has_cuda_support, "NCCL only on CUDA GPU")
class NCCLOpsTest(hu.HypothesisTestCase):
    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           in_place=st.booleans())
    def test_nccl_allreduce(self, n, m, in_place):
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        prefix = "" if in_place else "o"
        outputs = [str("{}x_{}".format(prefix, i)) for i in range(n)]
        op = core.CreateOperator("NCCLAllreduce", inputs, outputs)
        input_device_options = {name: gpu_device(i)
                                for i, name in enumerate(inputs)}

        def allreduce(*args):
            assert len(args) == n
            # Allreduce leaves every participant with the elementwise sum.
            output = np.sum(args, axis=0)
            return [output for _ in range(n)]

        outputs = self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            allreduce, input_device_options)
        for output in outputs:
            np.testing.assert_array_equal(outputs[0], output)
            self.assertEqual(outputs[0].tobytes(), output.tobytes())
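
    # Broadcast: every GPU ends up with a copy of the root GPU's tensor;
    # the op writes in place over its inputs.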
    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           root=st.integers(min_value=0,
                            max_value=workspace.NumGpuDevices() - 1))
    def test_nccl_broadcast(self, n, m, root):
        # The drawn root must index one of the n participating GPUs.
        assume(root < n)
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLBroadcast", inputs, inputs, root=root)
        input_device_options = {name: gpu_device(i)
                                for i, name in enumerate(inputs)}

        def broadcast(*args):
            assert len(args) == n
            return [args[root] for _ in range(n)]

        self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            broadcast, input_device_options)
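
    # Reduce: only the root receives the elementwise sum of the inputs.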
    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000),
           # NCCL Reduce seems to deadlock for non-zero roots.
           root=st.integers(min_value=0, max_value=0),
           in_place=st.booleans())
    def test_nccl_reduce(self, n, m, root, in_place):
        assume(in_place is False or root == 0)
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        # In place, the result overwrites the root's input blob;
        # otherwise it goes to a fresh output blob.
        op = core.CreateOperator(
            "NCCLReduce", inputs,
            inputs[root] if in_place else b"o", root=root)
        input_device_options = {name: gpu_device(i)
                                for i, name in enumerate(inputs)}

        def reduce(*args):
            assert len(args) == n
            return [np.sum(args, axis=0)]

        self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            reduce, input_device_options)
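
    # AllGather: each GPU contributes one tensor, and every GPU receives
    # all n of them stacked along a new leading axis.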
    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000))
    def test_nccl_allgather(self, n, m):
        xs = [np.random.randn(m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        outputs = [str("o_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLAllGather", inputs, outputs)
        input_device_options = {name: gpu_device(i)
                                for i, name in enumerate(inputs)}

        def allgather(*args):
            assert len(args) == n
            return [np.stack(args, axis=0) for _ in range(n)]

        outputs = self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            allgather, input_device_options)
        for output in outputs:
            np.testing.assert_array_equal(outputs[0], output)
            self.assertEqual(outputs[0].tobytes(), output.tobytes())
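
    # ReduceScatter: the (n, m) inputs are summed elementwise and the
    # result is split across GPUs, so GPU i keeps row i of the sum.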
    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=1, max_value=1000))
    def test_nccl_reduce_scatter(self, n, m):
        xs = [np.random.randn(n, m).astype(np.float32) for i in range(n)]
        inputs = [str("x_{}".format(i)) for i in range(n)]
        outputs = [str("o_{}".format(i)) for i in range(n)]
        op = core.CreateOperator("NCCLReduceScatter", inputs, outputs)
        input_device_options = {name: gpu_device(i)
                                for i, name in enumerate(inputs)}

        def reduce_scatter(*args):
            assert len(args) == n
            reduced = sum(args)
            assert len(reduced.shape) > 1
            ref = [reduced[i, :] for i in range(n)]
            return ref

        self.assertReferenceChecks(
            hu.gpu_do, op, [xs[i] for i, _ in enumerate(inputs)],
            reduce_scatter, input_device_options)
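
    # Stress test for scheduler deadlocks: every GPU accumulates locally,
    # then a single NCCLReduce forces cross-device synchronization. The
    # leading underscore keeps it out of the default unittest collection.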
    @given(n=st.integers(min_value=2, max_value=workspace.NumGpuDevices()),
           m=st.integers(min_value=100000, max_value=100000),
           iters=st.integers(min_value=1, max_value=100),
           net_type=st.sampled_from(["dag", "async_dag", "simple"]))
    def _test_nccl_sync(self, n, m, iters, net_type):
        inputs = [str("x_{}".format(i)) for i in range(n)]
        extra_inputs = [str("xe_{}".format(i)) for i in range(n)]
        net = core.Net("asdf")
        net.Proto().type = net_type
        net.Proto().num_workers = n
        for i in range(n):
            net.ConstantFill([], inputs[i], shape=[m], value=0.0,
                             device_option=gpu_device(i))
            net.ConstantFill([], extra_inputs[i], shape=[m], value=1.0,
                             device_option=gpu_device(i))
            for _ in range(iters):
                net.Sum([inputs[i], extra_inputs[i]], [inputs[i]],
                        device_option=gpu_device(i))
        net.NCCLReduce(inputs, [inputs[0]], device_option=gpu_device(0))
        self.ws.run(net)
        # After `iters` additions of 1.0 on each of the n GPUs, the
        # reduced blob should hold iters * n everywhere.
        np.testing.assert_array_equal(
            self.ws.blobs[inputs[0]].fetch(),
            np.full(shape=(m,), fill_value=iters * n, dtype=np.float32))
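
    # Benchmark comparing NCCLAllreduce against muji's allreduce, which
    # is built from ordinary Caffe2 CUDA ops without NCCL; opt in by
    # setting the CAFFE2_BENCHMARK environment variable.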
    @unittest.skipIf(not os.environ.get("CAFFE2_BENCHMARK"), "Benchmark")
    def test_timings(self):
        for n in range(2, workspace.NumGpuDevices()):
            for in_place in [False, True]:
                # int(1e7): np.random.randn requires integer dimensions.
                xs = [np.random.randn(int(1e7)).astype(np.float32)
                      for i in range(n)]
                inputs = [str("x_{}".format(i)) for i in range(n)]
                prefix = "" if in_place else "o"
                outputs = [str("{}x_{}".format(prefix, i))
                           for i in range(n)]
                net = core.Net("test")
                net.NCCLAllreduce(inputs, outputs)
                net.RunAllOnGPU()
                for i in range(n):
                    self.ws.create_blob(inputs[i]).feed(xs[i], gpu_device(i))
                self.ws.run(net)
                net_time = benchmark(self.ws, net)
                vanilla = core.Net("vanilla")
                muji.Allreduce(vanilla, inputs)
                vanilla_time = benchmark(self.ws, vanilla)
                print("Speedup for NCCL: {:.2f}".format(
                    vanilla_time / net_time))
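

# Standard test-runner entry point, assumed here so the module can be
# executed directly as well as through a test collector.
if __name__ == "__main__":
    unittest.main()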