# Adapted from https://github.com/pytorch/pytorch/tree/main/benchmarks/distributed/ddp
# The original program was written to be launched with `python -m torch.distributed.launch`,
# but could not be, because it was missing `local-rank` as an argument and also hardcoded
# the distributed communications backend to `gloo`. This version allows `nccl` as well and
# launches with `torchrun`, which initializes the process ranks itself instead of relying on
# script arguments and is the currently recommended way to write/launch PyTorch distributed
# programs.
#
# Measure distributed training iteration time.
#
# This program performs a sweep over a) a number of model architectures, and
# b) an increasing number of processes. This produces a 1-GPU baseline,
# an 8-GPU baseline (if applicable), as well as measurements for however
# many processes can participate in training.
#
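# A minimal launch sketch (illustrative only; the node count, rendezvous endpoint,
# and script file name below are placeholders, not part of this repository):
#
#   torchrun --nnodes=2 --nproc_per_node=8 \
#       --rdzv_backend=c10d --rdzv_endpoint=$HEAD_NODE_IP:29500 \
#       torch_ddp_benchmark.py --distributed-backend nccl
#
# torchrun sets the RANK, WORLD_SIZE, and LOCAL_RANK environment variables that
# this script reads as argument defaults.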
import argparse
import itertools
import json
import os
import shlex
import subprocess
import sys
import time

import numpy as np
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torchvision

DEBUG = True


def debug_print(s):
    if DEBUG:
        print(s)


def allgather_object(obj):
    out = [None for _ in range(dist.get_world_size())]
    dist.all_gather_object(out, obj)
    return out


def allgather_run(cmd):
    proc = subprocess.run(shlex.split(cmd), capture_output=True)
    assert proc.returncode == 0
    return allgather_object(proc.stdout.decode("utf-8"))


def allequal(iterator):
    iterator = iter(iterator)
    try:
        first = next(iterator)
    except StopIteration:
        return True
    return all(first == rest for rest in iterator)


def benchmark_process_group(pg, benchmark, use_ddp_for_single_rank=True):
    torch.manual_seed(pg.rank())
    torch.cuda.manual_seed(pg.rank())

    model = benchmark.create_model()
    data = [(benchmark.generate_inputs(), benchmark.generate_target())]
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(),
                          0.001,
                          momentum=0.9,
                          weight_decay=1e-4)
    # Skip the DDP wrapper for single-rank baselines (warmup and "no ddp"),
    # which pass use_ddp_for_single_rank=False; otherwise the flag is a no-op.
    if use_ddp_for_single_rank or pg.size() > 1:
        model = torch.nn.parallel.DistributedDataParallel(
            model,
            device_ids=[torch.cuda.current_device()],
            broadcast_buffers=False,
            process_group=pg,
            bucket_cap_mb=benchmark.bucket_size,
        )
    measurements = []
    warmup_iterations = 5
    measured_iterations = 10
    for inputs, target in data * (warmup_iterations + measured_iterations):
        start = time.time()
        output = model(*inputs)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        torch.cuda.synchronize()
        measurements.append(time.time() - start)

    # Throw away measurements for warmup iterations
    return measurements[warmup_iterations:]


def run_benchmark(benchmark, ranks, opts):
    group = dist.new_group(ranks=ranks, backend=benchmark.distributed_backend)
    measurements = []
    if dist.get_rank() in set(ranks):
        if not opts:
            opts = {}
        measurements = benchmark_process_group(group, benchmark, **opts)
    dist.destroy_process_group(group)
    dist.barrier()

    # Aggregate measurements for better estimation of percentiles
    return list(itertools.chain(*allgather_object(measurements)))


def sweep(benchmark):
    # Synthesize the set of benchmarks to run.
    # This list contains tuples of ("string prefix", [rank...]).
    benchmarks = []

    def append_benchmark(prefix, ranks, opts=None):
        prefix = f"{len(ranks):4} GPUs -- {prefix}"
        benchmarks.append((prefix, ranks, opts))

    def local_print(msg):
        if dist.get_rank() == 0:
            print(msg, end="", flush=True)  # noqa: E999

    def print_header():
        local_print("\n")
        local_print("%22s" % "")
        for p in [50, 75, 90, 95]:
            local_print("%14s%10s" % ("sec/iter", "ex/sec"))
        local_print("\n")

    def print_measurements(prefix, nelem, measurements):
        measurements = sorted(measurements)
        local_print("%8s:" % prefix)
        for p in [50, 75, 90, 95]:
            v = np.percentile(measurements, p)
            local_print("  p%02d:  %1.3fs  %6d/s" % (p, v, nelem / v))
        local_print("\n")

    # Every process runs once by itself to warm up (CUDA init, etc).
    append_benchmark("  warmup", [dist.get_rank()],
                     {"use_ddp_for_single_rank": False})

    # Single machine baselines
    append_benchmark("  no ddp", range(1), {"use_ddp_for_single_rank": False})
    append_benchmark("   1M/1G", range(1))
    append_benchmark("   1M/2G", range(2))
    append_benchmark("   1M/4G", range(4))

    # Multi-machine benchmarks
    for i in range(1, (dist.get_world_size() // 8) + 1):
        append_benchmark("   %dM/8G" % i, range(i * 8))

    # Run benchmarks in order of increasing number of GPUs
    print_header()
    results = []
    for prefix, ranks, opts in sorted(benchmarks, key=lambda tup: len(tup[1])):
        # Turn range into materialized list.
        ranks = list(ranks)
        measurements = run_benchmark(benchmark, ranks, opts)
        if "warmup" not in prefix:
            print_measurements(prefix, benchmark.batch_size, measurements)
            results.append({"ranks": ranks, "measurements": measurements})

    return results


class Benchmark:

    def __init__(self, device, distributed_backend, bucket_size):
        self.device = device
        self.batch_size = 32
        self.distributed_backend = distributed_backend
        self.bucket_size = bucket_size

    def __str__(self):
        raise NotImplementedError

    def create_model(self):
        raise NotImplementedError

    def generate_inputs(self):
        raise NotImplementedError

    def generate_target(self):
        raise NotImplementedError


class TorchvisionBenchmark(Benchmark):

    def __init__(self, device, distributed_backend, bucket_size, model):
        super().__init__(
            device,
            distributed_backend,
            bucket_size,
        )
        self.model = model

    def __str__(self):
        return f"{self.model} with batch size {self.batch_size}"

    def create_model(self):
        return torchvision.models.__dict__[self.model]().to(self.device)

    def generate_inputs(self):
        return [torch.rand([self.batch_size, 3, 224, 224], device=self.device)]

    def generate_target(self):
        return torch.tensor([1] * self.batch_size,
                            dtype=torch.long,
                            device=self.device)


def main():
    parser = argparse.ArgumentParser(
        description="PyTorch distributed benchmark suite")
    parser.add_argument("--rank", type=int, default=os.environ["RANK"])
    parser.add_argument("--world-size",
                        type=int,
                        default=os.environ["WORLD_SIZE"])
    parser.add_argument("--distributed-backend", type=str, default="nccl")
    parser.add_argument("--bucket-size", type=int, default=25)
    parser.add_argument("--model", type=str)
    parser.add_argument("--json",
                        type=str,
                        metavar="PATH",
                        help="Write file with benchmark results")
    parser.add_argument("--local-rank",
                        type=str,
                        default=os.environ["LOCAL_RANK"])
    args = parser.parse_args()

    num_gpus_per_node = torch.cuda.device_count()
    assert num_gpus_per_node == 8, "Expected 8 GPUs per machine"

    # The global process group used only for communicating benchmark
    # metadata, like measurements. Not for benchmarking itself.
    torch.cuda.set_device(args.rank % 8)
    device = torch.device("cuda:%d" % (args.rank % 8))
    dist.init_process_group(
        backend=args.distributed_backend,
        rank=args.rank,
        world_size=args.world_size,
    )

    output = allgather_run("nvidia-smi topo -m")
    if not allequal(output):
        print('Output of "nvidia-smi topo -m" differs between machines')
        sys.exit(1)

    if args.rank == 0:
        print("-----------------------------------")
        print("PyTorch distributed benchmark suite")
        print("-----------------------------------")
        print("")
        print(f"* PyTorch version: {torch.__version__}")
        print(f"* CUDA version: {torch.version.cuda}")
        print(f"* Distributed backend: {args.distributed_backend}")
        print(f"* Maximum bucket size: {args.bucket_size}MB")
        print("")
        print("--- nvidia-smi topo -m ---")
        print("")
        print(output[0])
        print("--------------------------")
        print("")

    benchmarks = []
    if args.model:
        benchmarks.append(
            TorchvisionBenchmark(
                device=device,
                distributed_backend=args.distributed_backend,
                bucket_size=args.bucket_size,
                model=args.model,
            ))
    else:
        for model in [
                "resnet50", "resnet101", "resnext50_32x4d", "resnext101_32x8d"
        ]:
            benchmarks.append(
                TorchvisionBenchmark(
                    device=device,
                    distributed_backend=args.distributed_backend,
                    bucket_size=args.bucket_size,
                    model=model,
                ))

    benchmark_results = []
    for benchmark in benchmarks:
        if args.rank == 0:
            print(f"\nBenchmark: {str(benchmark)}")
        result = sweep(benchmark)
        benchmark_results.append({
            "model": benchmark.model,
            "batch_size": benchmark.batch_size,
            "result": result,
        })

    # Write file with benchmark results if applicable
    if args.rank == 0 and args.json:
        report = {
            "pytorch_version": torch.__version__,
            "cuda_version": torch.version.cuda,
            "distributed_backend": args.distributed_backend,
            "bucket_size": args.bucket_size,
            "benchmark_results": benchmark_results,
        }
        with open(args.json, "w") as f:
            json.dump(report, f)


if __name__ == "__main__":
    main()