#!/usr/bin/env python3
#
# Measure distributed training iteration time.
#
# This program performs a sweep over a) a number of model architectures, and
# b) an increasing number of processes. This produces a 1-GPU baseline,
# an 8-GPU baseline (if applicable), as well as measurements for however
# many processes can participate in training.
#
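# For reference, a launch on each participating machine might look like the
# following. The script name, host, port, and world size below are
# illustrative placeholders, not values prescribed by this program:
#
#   RANK=<global rank> python benchmark.py \
#       --world-size 16 \
#       --master-addr node0.example.com \
#       --master-port 29500 \
#       --distributed-backend nccl
#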

import argparse
import itertools
import json
import os
import shlex
import subprocess
import sys
import time

import numpy as np
import torchvision

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim


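# Helpers for exchanging and comparing small pieces of metadata (command
# output, timing measurements) across ranks via the global process group.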
def allgather_object(obj):
    out = [None for _ in range(dist.get_world_size())]
    dist.all_gather_object(out, obj)
    return out


def allgather_run(cmd):
    proc = subprocess.run(shlex.split(cmd), capture_output=True)
    assert proc.returncode == 0
    return allgather_object(proc.stdout.decode("utf-8"))


def allequal(iterator):
    iterator = iter(iterator)
    try:
        first = next(iterator)
    except StopIteration:
        return True
    return all(first == rest for rest in iterator)


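# Time a fixed number of training iterations (forward, loss, backward,
# optimizer step) on one process group, optionally wrapping the model in
# DistributedDataParallel, and return the per-iteration wall-clock times
# with the warmup iterations discarded.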
def benchmark_process_group(pg, benchmark, use_ddp_for_single_rank=True):
    torch.manual_seed(pg.rank())
    torch.cuda.manual_seed(pg.rank())

    model = benchmark.create_model()
    data = [(benchmark.generate_inputs(), benchmark.generate_target())]
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), 0.001, momentum=0.9, weight_decay=1e-4)
    if use_ddp_for_single_rank or pg.size() > 1:
        model = torch.nn.parallel.DistributedDataParallel(
            model,
            device_ids=[torch.cuda.current_device()],
            broadcast_buffers=False,
            process_group=pg,
            bucket_cap_mb=benchmark.bucket_size,
        )

    measurements = []
    warmup_iterations = 5
    measured_iterations = 10
    for inputs, target in data * (warmup_iterations + measured_iterations):
        start = time.time()
        output = model(*inputs)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        torch.cuda.synchronize()
        measurements.append(time.time() - start)

    # Throw away measurements for warmup iterations
    return measurements[warmup_iterations:]


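# Run a single benchmark configuration: the given ranks form a dedicated
# process group and execute the timing loop, after which the measurements
# are gathered from all ranks.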
def run_benchmark(benchmark, ranks, opts):
    group = dist.new_group(ranks=ranks, backend=benchmark.distributed_backend)
    measurements = []
    if dist.get_rank() in set(ranks):
        if not opts:
            opts = {}
        measurements = benchmark_process_group(group, benchmark, **opts)
    dist.destroy_process_group(group)
    dist.barrier()

    # Aggregate measurements for better estimation of percentiles
    return list(itertools.chain(*allgather_object(measurements)))


def sweep(benchmark):
    # Synthesize the set of benchmarks to run.
    # This list contains tuples of ("string prefix", [rank...], opts).
    benchmarks = []

    def append_benchmark(prefix, ranks, opts=None):
        prefix = f"{len(ranks):4} GPUs -- {prefix}"
        benchmarks.append((prefix, ranks, opts))

    def local_print(msg):
        if dist.get_rank() == 0:
            print(msg, end="", flush=True)  # noqa: E999

    def print_header():
        local_print("\n")
        local_print("%22s" % "")
        for p in [50, 75, 90, 95]:
            local_print("%14s%10s" % ("sec/iter", "ex/sec"))
        local_print("\n")

    def print_measurements(prefix, nelem, measurements):
        measurements = sorted(measurements)
        local_print("%8s:" % prefix)
        for p in [50, 75, 90, 95]:
            v = np.percentile(measurements, p)
            local_print("  p%02d:  %1.3fs  %6d/s" % (p, v, nelem / v))
        local_print("\n")

    # Every process runs once by itself to warm up (CUDA init, etc.).
    append_benchmark("  warmup", [dist.get_rank()], {"use_ddp_for_single_rank": False})

    # Single-machine baselines
    append_benchmark("  no ddp", range(1), {"use_ddp_for_single_rank": False})
    append_benchmark("   1M/1G", range(1))
    append_benchmark("   1M/2G", range(2))
    append_benchmark("   1M/4G", range(4))

    # Multi-machine benchmarks
    for i in range(1, (dist.get_world_size() // 8) + 1):
        append_benchmark("   %dM/8G" % i, range(i * 8))

    # Run benchmarks in order of increasing number of GPUs
    print_header()
    results = []
    for prefix, ranks, opts in sorted(benchmarks, key=lambda tup: len(tup[1])):
        # Turn the range into a materialized list.
        ranks = list(ranks)
        measurements = run_benchmark(benchmark, ranks, opts)
        if "warmup" not in prefix:
            print_measurements(prefix, benchmark.batch_size, measurements)
            results.append({"ranks": ranks, "measurements": measurements})

    return results


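# Minimal interface a benchmark must implement: construct a model and
# synthesize one batch of inputs and targets.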
class Benchmark:
    def __init__(self, device, distributed_backend, bucket_size):
        self.device = device
        self.batch_size = 32
        self.distributed_backend = distributed_backend
        self.bucket_size = bucket_size

    def __str__(self):
        raise NotImplementedError

    def create_model(self):
        raise NotImplementedError

    def generate_inputs(self):
        raise NotImplementedError

    def generate_target(self):
        raise NotImplementedError


class TorchvisionBenchmark(Benchmark):
    def __init__(self, device, distributed_backend, bucket_size, model):
        super().__init__(
            device,
            distributed_backend,
            bucket_size,
        )
        self.model = model

    def __str__(self):
        return f"{self.model} with batch size {self.batch_size}"

    def create_model(self):
        return torchvision.models.__dict__[self.model]().to(self.device)

    def generate_inputs(self):
        return [torch.rand([self.batch_size, 3, 224, 224], device=self.device)]

    def generate_target(self):
        return torch.tensor([1] * self.batch_size, dtype=torch.long, device=self.device)


def main():
    parser = argparse.ArgumentParser(description="PyTorch distributed benchmark suite")
    parser.add_argument("--rank", type=int, default=os.environ["RANK"])
    parser.add_argument("--world-size", type=int, required=True)
    parser.add_argument("--distributed-backend", type=str, default="nccl")
    parser.add_argument("--bucket-size", type=int, default=25)
    parser.add_argument("--master-addr", type=str, required=True)
    parser.add_argument("--master-port", type=str, required=True)
    parser.add_argument("--model", type=str)
    parser.add_argument(
        "--json", type=str, metavar="PATH", help="Write file with benchmark results"
    )
    args = parser.parse_args()

    num_gpus_per_node = torch.cuda.device_count()
    assert num_gpus_per_node == 8, "Expected 8 GPUs per machine"

    # The global process group is used only for communicating benchmark
    # metadata, such as measurements. It is not used for the benchmark itself.
    dist.init_process_group(
        backend="gloo",
        init_method=f"tcp://{args.master_addr}:{args.master_port}",
        rank=args.rank,
        world_size=args.world_size,
    )

    output = allgather_run("nvidia-smi topo -m")
    if not allequal(output):
        print('Output of "nvidia-smi topo -m" differs between machines')
        sys.exit(1)

    if args.rank == 0:
        print("-----------------------------------")
        print("PyTorch distributed benchmark suite")
        print("-----------------------------------")
        print()
        print(f"* PyTorch version: {torch.__version__}")
        print(f"* CUDA version: {torch.version.cuda}")
        print(f"* Distributed backend: {args.distributed_backend}")
        print(f"* Maximum bucket size: {args.bucket_size}MB")
        print()
        print("--- nvidia-smi topo -m ---")
        print()
        print(output[0])
        print("--------------------------")
        print()

    torch.cuda.set_device(dist.get_rank() % 8)
    device = torch.device("cuda:%d" % (dist.get_rank() % 8))

    benchmarks = []
    if args.model:
        benchmarks.append(
            TorchvisionBenchmark(
                device=device,
                distributed_backend=args.distributed_backend,
                bucket_size=args.bucket_size,
                model=args.model,
            )
        )
    else:
        for model in ["resnet50", "resnet101", "resnext50_32x4d", "resnext101_32x8d"]:
            benchmarks.append(
                TorchvisionBenchmark(
                    device=device,
                    distributed_backend=args.distributed_backend,
                    bucket_size=args.bucket_size,
                    model=model,
                )
            )

    benchmark_results = []
    for benchmark in benchmarks:
        if args.rank == 0:
            print(f"\nBenchmark: {str(benchmark)}")
        result = sweep(benchmark)
        benchmark_results.append(
            {
                "model": benchmark.model,
                "batch_size": benchmark.batch_size,
                "result": result,
            }
        )

    # Write a file with the benchmark results if requested
    if args.rank == 0 and args.json:
        report = {
            "pytorch_version": torch.__version__,
            "cuda_version": torch.version.cuda,
            "distributed_backend": args.distributed_backend,
            "bucket_size": args.bucket_size,
            "benchmark_results": benchmark_results,
        }
        with open(args.json, "w") as f:
            json.dump(report, f)


if __name__ == "__main__":
    main()