skypilot / torch_ddp_benchmark.py

# Adapted from https://github.com/pytorch/pytorch/tree/main/benchmarks/distributed/ddp
# The original program was written to be launched with `python -m torch.distributed.launch`,
# but could not be, because it is missing `local-rank` as an argument and also hardcodes
# the distributed communications library to `gloo`. This version allows `nccl` and is
# launched with `torchrun`, which handles initializing the process ranks instead of the
# script arguments and is the currently recommended way of writing/launching PyTorch
# distributed programs.
#
# Measure distributed training iteration time.
#
# This program performs a sweep over a) a number of model architectures, and
# b) an increasing number of processes. This produces a 1-GPU baseline,
# an 8-GPU baseline (if applicable), as well as measurements for however
# many processes can participate in training.
#

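# Example launch with torchrun (illustrative values only; adjust the node count,
# rendezvous address, and model for your cluster):
#
#   torchrun --nnodes=2 --nproc_per_node=8 --node_rank=0 \
#       --master_addr=10.0.0.1 --master_port=29500 \
#       torch_ddp_benchmark.py --distributed-backend nccl --model resnet50
#
# torchrun exports RANK, WORLD_SIZE, and LOCAL_RANK to the environment, which this
# script reads as argument defaults below.
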
import argparse
import itertools
import json
import os
import shlex
import subprocess
import sys
import time

import numpy as np
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torchvision

DEBUG = True


def debug_print(s):
    if DEBUG:
        print(s)


def allgather_object(obj):
    # Gather a picklable Python object from every rank in the global group.
    out = [None for _ in range(dist.get_world_size())]
    dist.all_gather_object(out, obj)
    return out


def allgather_run(cmd):
    # Run a shell command locally and gather its stdout from all ranks.
    proc = subprocess.run(shlex.split(cmd), capture_output=True)
    assert proc.returncode == 0
    return allgather_object(proc.stdout.decode("utf-8"))


def allequal(iterator):
    # True if all elements compare equal (vacuously true for an empty iterator).
    iterator = iter(iterator)
    try:
        first = next(iterator)
    except StopIteration:
        return True
    return all(first == rest for rest in iterator)


def benchmark_process_group(pg, benchmark, use_ddp_for_single_rank=True):
    torch.manual_seed(pg.rank())
    torch.cuda.manual_seed(pg.rank())

    model = benchmark.create_model()
    data = [(benchmark.generate_inputs(), benchmark.generate_target())]
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(),
                          0.001,
                          momentum=0.9,
                          weight_decay=1e-4)
    # Wrap in DDP unless this is a single-rank run that asked for the no-DDP baseline.
    if use_ddp_for_single_rank or pg.size() > 1:
        model = torch.nn.parallel.DistributedDataParallel(
            model,
            device_ids=[torch.cuda.current_device()],
            broadcast_buffers=False,
            process_group=pg,
            bucket_cap_mb=benchmark.bucket_size,
        )
    measurements = []
    warmup_iterations = 5
    measured_iterations = 10
    for inputs, target in data * (warmup_iterations + measured_iterations):
        start = time.time()
        output = model(*inputs)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        torch.cuda.synchronize()
        measurements.append(time.time() - start)

    # Throw away measurements for warmup iterations
    return measurements[warmup_iterations:]


def run_benchmark(benchmark, ranks, opts):
    group = dist.new_group(ranks=ranks, backend=benchmark.distributed_backend)
    measurements = []
    if dist.get_rank() in set(ranks):
        if not opts:
            opts = {}
        measurements = benchmark_process_group(group, benchmark, **opts)
    dist.destroy_process_group(group)
    dist.barrier()

    # Aggregate measurements for better estimation of percentiles
    return list(itertools.chain(*allgather_object(measurements)))


def sweep(benchmark):
    # Synthesize the set of benchmarks to run.
    # This list contains tuples of ("string prefix", [rank...], opts),
    # e.g. ("   8 GPUs --    1M/8G", range(8), None).
    benchmarks = []

    def append_benchmark(prefix, ranks, opts=None):
        prefix = f"{len(ranks):4} GPUs -- {prefix}"
        benchmarks.append((prefix, ranks, opts))

    def local_print(msg):
        if dist.get_rank() == 0:
            print(msg, end="", flush=True)  # noqa: E999

    def print_header():
        local_print("\n")
        local_print("%22s" % "")
        for p in [50, 75, 90, 95]:
            local_print("%14s%10s" % ("sec/iter", "ex/sec"))
        local_print("\n")

    def print_measurements(prefix, nelem, measurements):
        measurements = sorted(measurements)
        local_print("%8s:" % prefix)
        for p in [50, 75, 90, 95]:
            v = np.percentile(measurements, p)
            local_print("  p%02d:  %1.3fs  %6d/s" % (p, v, nelem / v))
        local_print("\n")

    # Every process runs once by itself to warm up (CUDA init, etc).
    append_benchmark("  warmup", [dist.get_rank()],
                     {"use_ddp_for_single_rank": False})

    # Single machine baselines
    append_benchmark("  no ddp", range(1), {"use_ddp_for_single_rank": False})
    append_benchmark("   1M/1G", range(1))
    append_benchmark("   1M/2G", range(2))
    append_benchmark("   1M/4G", range(4))

    # Multi-machine benchmarks
    for i in range(1, (dist.get_world_size() // 8) + 1):
        append_benchmark("   %dM/8G" % i, range(i * 8))

    # Run benchmarks in order of increasing number of GPUs
    print_header()
    results = []
    for prefix, ranks, opts in sorted(benchmarks, key=lambda tup: len(tup[1])):
        # Turn range into materialized list.
        ranks = list(ranks)
        measurements = run_benchmark(benchmark, ranks, opts)
        if "warmup" not in prefix:
            print_measurements(prefix, benchmark.batch_size, measurements)
            results.append({"ranks": ranks, "measurements": measurements})

    return results


class Benchmark:

    def __init__(self, device, distributed_backend, bucket_size):
        self.device = device
        self.batch_size = 32
        self.distributed_backend = distributed_backend
        self.bucket_size = bucket_size

    def __str__(self):
        raise NotImplementedError

    def create_model(self):
        raise NotImplementedError

    def generate_inputs(self):
        raise NotImplementedError

    def generate_target(self):
        raise NotImplementedError


class TorchvisionBenchmark(Benchmark):

    def __init__(self, device, distributed_backend, bucket_size, model):
        super().__init__(
            device,
            distributed_backend,
            bucket_size,
        )
        self.model = model

    def __str__(self):
        return f"{self.model} with batch size {self.batch_size}"

    def create_model(self):
        return torchvision.models.__dict__[self.model]().to(self.device)

    def generate_inputs(self):
        return [torch.rand([self.batch_size, 3, 224, 224], device=self.device)]

    def generate_target(self):
        return torch.tensor([1] * self.batch_size,
                            dtype=torch.long,
                            device=self.device)


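# Illustrative sketch (not part of the original script): the sweep works with any
# model that implements the Benchmark hooks above. A hypothetical subclass for a
# plain MLP on synthetic data could look like this:
#
#   class MLPBenchmark(Benchmark):
#
#       def __str__(self):
#           return f"3-layer MLP with batch size {self.batch_size}"
#
#       def create_model(self):
#           return nn.Sequential(nn.Linear(1024, 4096), nn.ReLU(),
#                                nn.Linear(4096, 4096), nn.ReLU(),
#                                nn.Linear(4096, 10)).to(self.device)
#
#       def generate_inputs(self):
#           return [torch.rand([self.batch_size, 1024], device=self.device)]
#
#       def generate_target(self):
#           return torch.randint(0, 10, [self.batch_size],
#                                dtype=torch.long,
#                                device=self.device)

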
def main():
    parser = argparse.ArgumentParser(
        description="PyTorch distributed benchmark suite")
    parser.add_argument("--rank", type=int, default=os.environ["RANK"])
    parser.add_argument("--world-size",
                        type=int,
                        default=os.environ["WORLD_SIZE"])
    parser.add_argument("--distributed-backend", type=str, default="nccl")
    parser.add_argument("--bucket-size", type=int, default=25)
    parser.add_argument("--model", type=str)
    parser.add_argument("--json",
                        type=str,
                        metavar="PATH",
                        help="Write file with benchmark results")
    parser.add_argument("--local-rank",
                        type=str,
                        default=os.environ["LOCAL_RANK"])
    args = parser.parse_args()

    num_gpus_per_node = torch.cuda.device_count()
    assert num_gpus_per_node == 8, "Expected 8 GPUs per machine"

    # The global process group is used only for communicating benchmark
    # metadata, like measurements. Not for benchmarking itself.
    torch.cuda.set_device(args.rank % 8)
    device = torch.device("cuda:%d" % (args.rank % 8))
    dist.init_process_group(
        backend=args.distributed_backend,
        rank=args.rank,
        world_size=args.world_size,
    )

    output = allgather_run("nvidia-smi topo -m")
    if not allequal(output):
        print('Output of "nvidia-smi topo -m" differs between machines')
        sys.exit(1)

    if args.rank == 0:
        print("-----------------------------------")
        print("PyTorch distributed benchmark suite")
        print("-----------------------------------")
        print("")
        print(f"* PyTorch version: {torch.__version__}")
        print(f"* CUDA version: {torch.version.cuda}")
        print(f"* Distributed backend: {args.distributed_backend}")
        print(f"* Maximum bucket size: {args.bucket_size}MB")
        print("")
        print("--- nvidia-smi topo -m ---")
        print("")
        print(output[0])
        print("--------------------------")
        print("")

    benchmarks = []
    if args.model:
        benchmarks.append(
            TorchvisionBenchmark(
                device=device,
                distributed_backend=args.distributed_backend,
                bucket_size=args.bucket_size,
                model=args.model,
            ))
    else:
        for model in [
                "resnet50", "resnet101", "resnext50_32x4d", "resnext101_32x8d"
        ]:
            benchmarks.append(
                TorchvisionBenchmark(
                    device=device,
                    distributed_backend=args.distributed_backend,
                    bucket_size=args.bucket_size,
                    model=model,
                ))

    benchmark_results = []
    for benchmark in benchmarks:
        if args.rank == 0:
            print(f"\nBenchmark: {str(benchmark)}")
        result = sweep(benchmark)
        benchmark_results.append({
            "model": benchmark.model,
            "batch_size": benchmark.batch_size,
            "result": result,
        })

    # Write file with benchmark results if applicable
    if args.rank == 0 and args.json:
        report = {
            "pytorch_version": torch.__version__,
            "cuda_version": torch.version.cuda,
            "distributed_backend": args.distributed_backend,
            "bucket_size": args.bucket_size,
            "benchmark_results": benchmark_results,
        }
        with open(args.json, "w") as f:
            json.dump(report, f)


if __name__ == "__main__":
    main()

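# Illustrative sketch (not part of the original script): the report written via
# `--json` can be post-processed offline. Field names below follow the `report`
# dict built in main(); the file path is hypothetical.
#
#   import json
#   import numpy as np
#
#   with open("report.json") as f:
#       report = json.load(f)
#   for bench in report["benchmark_results"]:
#       for run in bench["result"]:
#           p50 = np.percentile(run["measurements"], 50)
#           print(f"{bench['model']}: {len(run['ranks'])} GPUs, "
#                 f"p50 {p50:.3f} s/iter")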