# Owner(s): ["oncall: distributed"]

import copy
import json
import os
import pickle
import random
import re
import signal
import sys
import tempfile
import threading
import time
import warnings
from contextlib import contextmanager
from datetime import datetime, timedelta
from enum import auto, Enum
from itertools import chain, product
from unittest import mock, SkipTest

import torch
import torch.distributed as c10d


if not c10d.is_available() or not c10d.is_nccl_available():
    print("c10d NCCL not available, skipping tests", file=sys.stderr)
    sys.exit(0)

from typing import Dict, List

import test_c10d_common
from test_c10d_common import ConvNet, DoubleGpuNet, gpus_for_rank, ModuleForDdpCommHook

import torch.distributed as dist
import torch.distributed.algorithms.ddp_comm_hooks.default_hooks as default
import torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook as powerSGD
import torch.nn.functional as F
import torch.testing._internal.common_utils as common
from torch import nn
from torch._C._distributed_c10d import OpType
from torch.nn.parallel import DistributedDataParallel
from torch.testing._internal.common_cuda import TEST_MULTIGPU
from torch.testing._internal.common_distributed import (
    get_timeout,
    init_multigpu_helper,
    MultiProcessTestCase,
    requires_gloo,
    requires_nccl,
    requires_nccl_version,
    skip_if_lt_x_gpu,
    skip_if_rocm,
    TEST_SKIPS,
    with_dist_debug_levels,
    with_nccl_blocking_wait,
)
from torch.testing._internal.common_utils import (
    instantiate_parametrized_tests,
    parametrize,
    retry_on_connect_failures,
    run_tests,
    skip_but_pass_in_sandcastle,
    skip_but_pass_in_sandcastle_if,
    TEST_CUDA,
    TEST_WITH_DEV_DBG_ASAN,
    TEST_WITH_ROCM,
    TestCase,
)


if TEST_WITH_DEV_DBG_ASAN:
    print(
        "Skip ASAN as torch + multiprocessing spawn have known issues", file=sys.stderr
    )
    sys.exit(0)

# bfloat16 is only supported by CUDA 11+
BFLOAT16_AVAILABLE = torch.cuda.is_available() and (
    (torch.version.cuda is not None and int(torch.version.cuda.split(".")[0]) >= 11)
    or torch.version.hip is not None
)
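# (Note: torch.version.cuda is a version string such as "11.8", so the check above
# parses the major version from the first dot-separated field; ROCm builds set
# torch.version.hip instead and are treated as supporting bfloat16 as well.)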


class RendezvousEnvTest(TestCase):
    @retry_on_connect_failures
    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "No GPUs available, skipping test")
    def test_common_errors(self):
        vars = {
            "WORLD_SIZE": "1",
            "RANK": "0",
            "MASTER_ADDR": "127.0.0.1",
            "MASTER_PORT": str(common.find_free_port()),
        }

        class Env:
            def __init__(self, vars):
                self.env_patcher = mock.patch.dict(os.environ, vars, clear=True)

            def __enter__(self):
                self.env_patcher.start()

            def __exit__(self, type, value, traceback):
                self.env_patcher.stop()

        def without(d, key):
            d = d.copy()
            d.pop(key)
            return d

        def withouts(d, keys):
            d = d.copy()
            for key in keys:
                d.pop(key)
            return d

        with Env(without(vars, "WORLD_SIZE")):
            self.assertEqual(None, os.environ.get("WORLD_SIZE"))
            with self.assertRaisesRegex(ValueError, "WORLD_SIZE expected"):
                gen = c10d.rendezvous("env://")
                next(gen)
            c10d.init_process_group(backend="nccl", world_size=1)
            self.assertEqual(c10d.get_rank(), 0)
            self.assertEqual(c10d.get_world_size(), 1)
            c10d.destroy_process_group()

        with Env(without(vars, "RANK")):
            self.assertEqual(None, os.environ.get("RANK"))
            with self.assertRaisesRegex(ValueError, "RANK expected"):
                gen = c10d.rendezvous("env://")
                next(gen)
            c10d.init_process_group(backend="nccl", rank=0)
            self.assertEqual(c10d.get_rank(), 0)
            self.assertEqual(c10d.get_world_size(), 1)
            c10d.destroy_process_group()

        with Env(withouts(vars, ["RANK", "WORLD_SIZE"])):
            self.assertEqual(None, os.environ.get("RANK"))
            self.assertEqual(None, os.environ.get("WORLD_SIZE"))
            c10d.init_process_group(backend="nccl", rank=0, world_size=1)
            self.assertEqual(c10d.get_rank(), 0)
            self.assertEqual(c10d.get_world_size(), 1)
            c10d.destroy_process_group()

        with Env(vars):
            c10d.init_process_group(backend="nccl")
            self.assertEqual(c10d.get_rank(), 0)
            self.assertEqual(c10d.get_world_size(), 1)
            c10d.destroy_process_group()

        with Env(without(vars, "MASTER_ADDR")):
            self.assertEqual(None, os.environ.get("MASTER_ADDR"))
            with self.assertRaisesRegex(ValueError, "MASTER_ADDR expected"):
                gen = c10d.rendezvous("env://")
                next(gen)

        with Env(without(vars, "MASTER_PORT")):
            self.assertEqual(None, os.environ.get("MASTER_PORT"))
            with self.assertRaisesRegex(ValueError, "MASTER_PORT expected"):
                gen = c10d.rendezvous("env://")
                next(gen)

        with Env(without(vars, "WORLD_SIZE")):
            self.assertEqual(None, os.environ.get("WORLD_SIZE"))
            gen = c10d.rendezvous(f"env://?world_size={1}")
            _, _, size = next(gen)
            self.assertEqual(size, 1)

        with Env(without(vars, "RANK")):
            self.assertEqual(None, os.environ.get("RANK"))
            gen = c10d.rendezvous(f"env://?rank={0}")
            _, rank, _ = next(gen)
            self.assertEqual(rank, 0)

        with Env(withouts(vars, ["RANK", "WORLD_SIZE"])):
            self.assertEqual(None, os.environ.get("RANK"))
            self.assertEqual(None, os.environ.get("WORLD_SIZE"))
            gen = c10d.rendezvous(f"env://?rank={0}&world_size={1}")
            _, rank, size = next(gen)
            self.assertEqual(rank, 0)
            self.assertEqual(size, 1)
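        # Taken together, these cases show that for the env:// handler the
        # `?rank=` / `?world_size=` query parameters can substitute for the RANK
        # and WORLD_SIZE environment variables, while MASTER_ADDR / MASTER_PORT
        # are required in the environment here.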


class TimeoutTest(test_c10d_common.AbstractTimeoutTest, TestCase):
    @requires_nccl()
    @retry_on_connect_failures
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "No GPUs available, skipping test")
    def test_default_store_timeout_nccl(self):
        self._test_default_store_timeout("nccl")


class ProcessGroupNCCLNoGPUTest(TestCase):
    MAIN_PROCESS_RANK = 0

    def setUp(self):
        self.rank = self.MAIN_PROCESS_RANK
        self.world_size = 1
        self.file = tempfile.NamedTemporaryFile(delete=False)

    def tearDown(self):
        pass

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(TEST_CUDA, "GPUs are available, skipping test")
    def test_init_no_gpus(self):
        store = c10d.FileStore(self.file.name, self.world_size)
        with self.assertRaisesRegex(
            ValueError, "ProcessGroupNCCL is only supported with GPUs, no GPUs found!"
        ):
            c10d.ProcessGroupNCCL(store, self.rank, self.world_size)


class ProcessGroupNCCLGroupTest(MultiProcessTestCase):
    def _create_process_group_nccl(self, store, opts, device_id=None):
        # create nccl processgroup with opts
        c10d.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
            pg_options=opts,
            device_id=device_id,
        )
        pg = c10d.distributed_c10d._get_default_group()
        return pg

    def opts(self, high_priority_stream=False):
        opts = c10d.ProcessGroupNCCL.Options()
        opts.is_high_priority_stream = high_priority_stream
        return opts

    def setUp(self):
        super().setUp()
        # Need to skip return code checking for these tests since the child
        # processes don't exit cleanly in some cuda versions
        self.skip_return_code_checks = [
            self.test_nan_assert_float16.__wrapped__,
            self.test_nan_assert_float32.__wrapped__,
            self.test_nan_assert_float64.__wrapped__,
            self.test_nan_assert_bfloat16.__wrapped__,
        ]

        # TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
        # that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
        # self.num_gpus = torch.cuda.device_count()
        self._spawn_processes()

    def tearDown(self):
        super().tearDown()
        try:
            os.remove(self.file_name)
        except OSError:
            pass

    @property
    def world_size(self):
        return 2

    @property
    def rank_to_GPU(self):
        # return rank to GPU map
        return init_multigpu_helper(self.world_size, "nccl")
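    # Note: init_multigpu_helper partitions the visible GPUs evenly among ranks
    # and returns a dict of {rank: [device indices]}; e.g. with 4 visible GPUs
    # and world_size 2 it would map rank 0 -> [0, 1] and rank 1 -> [2, 3]
    # (illustrative; the exact assignment comes from the helper).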
    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 1 GPU")
    @skip_if_lt_x_gpu(1)
    def test_nccl_dist_backend_error(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        self._create_process_group_nccl(store, self.opts())

        # Both rank 0 and 1 will use the same CUDA device resulting in ncclInvalidUsage
        with self.assertRaises(dist.DistBackendError) as cm:
            dist.broadcast(torch.tensor([1, 2, 3]).cuda(), 0)
        self.assertTrue(isinstance(cm.exception, dist.DistError))

        self.assertIsInstance(cm.exception, RuntimeError)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_abort_pg(self):
        # Disable ASYNC_ERROR_HANDLING for this test to ensure we can programmatically
        # abort the process group.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"

        store = c10d.FileStore(self.file_name, self.world_size)
        self._create_process_group_nccl(store, self.opts())
        device = self.rank_to_GPU[self.rank][0]

        t = torch.rand(10, 10, device=device)
        # First allreduce to initialize state.
        dist.all_reduce(t)

        def abortpg():
            c10d.distributed_c10d._get_default_group()._get_backend(
                torch.device(device)
            )._shutdown()

        # Initialize DDP to ensure "destroy_process_group" will not call
        # ProcessGroupNCCL destructor since DDP holds a reference to process group.
        # Run a single iteration of DDP to initialize state.
        model = DistributedDataParallel(
            torch.nn.Linear(10, 10).to(device), device_ids=[device]
        )
        model(t).sum().backward()

        # Now simulate collective getting stuck and abort gets us unstuck
        if self.rank == 0:
            dist.all_reduce(t)

            # Schedule thread before we get stuck to abort pg.
            thread = threading.Thread(target=abortpg)
            thread.start()

            # We would get stuck here due to d2h if we didn't abort.
            t_cpu = t.cpu()

            thread.join()
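    # The pattern in test_abort_pg: a pending collective on one rank would block
    # the device-to-host copy (`t.cpu()`) indefinitely, so a separate thread calls
    # `_shutdown()` on the NCCL backend to abort the communicator and unblock the
    # stream.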
    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_close_pg(self):
        # Disable ASYNC_ERROR_HANDLING for this test to ensure we can programmatically
        # abort the process group.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"

        store = c10d.FileStore(self.file_name, self.world_size)
        pg = self._create_process_group_nccl(store, self.opts())
        device = self.rank_to_GPU[self.rank][0]

        t = torch.rand(10, 10, device=device)
        # First allreduce to initialize state.
        pg.allreduce(t)

        # Destroy pg and validate pg is no longer valid
        dist.destroy_process_group()
        with self.assertRaises(dist.DistBackendError):
            pg.allreduce([t])

        del pg

    CUDA_12_AND_ABOVE = torch.cuda.is_available() and (
        torch.version.cuda is not None and int(torch.version.cuda.split(".")[0]) >= 12
    )

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(
        not (TEST_MULTIGPU and CUDA_12_AND_ABOVE),
        "NCCL test requires 2+ GPUs and Device side assert could cause unexpected errors in lower versions of CUDA",
    )
    @parametrize("type", [torch.float16, torch.float32, torch.float64, torch.bfloat16])
    @skip_if_rocm
    def test_nan_assert(self, type):
        # Expecting a device-side error when NaN is detected
        os.environ["TORCH_NCCL_NAN_CHECK"] = "1"
        store = c10d.FileStore(self.file_name, self.world_size)
        pg = self._create_process_group_nccl(store, self.opts())
        device = self.rank_to_GPU[self.rank][0]
        size = (10, 10)
        nan_tensor = torch.full(size, self.rank, dtype=type, device=device)
        # randomly pick a NaN element
        i = random.randint(0, nan_tensor.size(0) - 1)
        j = random.randint(0, nan_tensor.size(1) - 1)
        nan_tensor[i, j] = float("nan")
        with self.assertRaises(RuntimeError):
            pg.allreduce(nan_tensor)
        dist.destroy_process_group()
        # reset env
        os.environ["TORCH_NCCL_NAN_CHECK"] = "0"
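    # TORCH_NCCL_NAN_CHECK=1 makes ProcessGroupNCCL scan collective inputs for
    # NaNs before launching the kernel and fail (here surfacing as RuntimeError)
    # if one is found; the NaN tests flip the env var on and off around each
    # collective so later tests see the default behavior.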
    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nan_rank_filter(self):
        # Putting NaN at recv buffer, program should not fail as NaN checker
        # should not check on receive buffer
        os.environ["TORCH_NCCL_NAN_CHECK"] = "1"
        store = c10d.FileStore(self.file_name, self.world_size)
        device = torch.device("cuda:%d" % self.rank)
        c10d.init_process_group(
            backend="nccl", store=store, rank=self.rank, world_size=self.world_size
        )
        t = torch.ones(3, 4, dtype=torch.bfloat16, device=device)
        if self.rank != 0:
            # Putting NaN at recv buffer
            t[1, 1] = float("nan")
        # Against broadcast
        c10d.broadcast(t, 0)
        # Against P2P
        if self.rank == 0:
            c10d.send(t, 1)
        elif self.rank == 1:
            c10d.recv(t, 0)
        c10d.destroy_process_group()
        # reset env
        os.environ["TORCH_NCCL_NAN_CHECK"] = "0"

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nan_check(self):
        # Not expecting an error, NaN check should not make legit code fail
        os.environ["TORCH_NCCL_NAN_CHECK"] = "1"
        store = c10d.FileStore(self.file_name, self.world_size)
        device = torch.device("cuda:%d" % self.rank)
        c10d.init_process_group(
            backend="nccl", store=store, rank=self.rank, world_size=self.world_size
        )
        x = torch.ones((10,), dtype=torch.bfloat16, device=device) * self.rank
        t = torch.ones(3, 4, dtype=torch.bfloat16, device=device)
        c10d.broadcast(x, src=0)
        c10d.all_reduce(t)
        c10d.barrier()
        c10d.destroy_process_group()
        # reset env
        os.environ["TORCH_NCCL_NAN_CHECK"] = "0"

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_destruct_before_terminate_pg(self):
        # Disable ASYNC_ERROR_HANDLING for this test to ensure we can programmatically
        # abort the process group.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"
        store = c10d.FileStore(self.file_name, self.world_size)
        pg = self._create_process_group_nccl(store, self.opts())
        device = self.rank_to_GPU[self.rank][0]

        t = torch.rand(10, 10, device=device)
        # First allreduce to initialize state.
        pg.allreduce(t)
        # force destruction before terminating comms, destructor would terminate comms
        del pg

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_abort_in_destroy_pg(self):
        # Disable ASYNC_ERROR_HANDLING for this test to ensure we can programmatically
        # abort the process group.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"

        store = c10d.FileStore(self.file_name, self.world_size)
        pg = self._create_process_group_nccl(store, self.opts())
        device = self.rank_to_GPU[self.rank][0]

        t = torch.rand(10, 10, device=device)
        # First allreduce to initialize state.
        pg.allreduce(t)

        # Destroy pg and validate pg is NOT in working condition since
        # we have shutdown comms
        dist.destroy_process_group()
        with self.assertRaises(dist.DistBackendError):
            pg.allreduce([t])

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(
        torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs"
    )
    def test_close_multi_pg_unordered(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        pg = self._create_process_group_nccl(store, self.opts())
        device = self.rank_to_GPU[self.rank][0]
        t = torch.rand(10, 10, device=device)
        # First allreduce to initialize default PG's communicator.
        pg.allreduce(t).wait()
        new_pg1 = c10d.new_group([0, 1])
        new_pg2 = c10d.new_group([0, 1])
        if self.rank == 0 or self.rank == 1:
            t1 = torch.rand(10, 10, device=device)
            t2 = torch.rand(10, 10, device=device)
            new_pg1.allreduce(t1).wait()
            new_pg2.allreduce(t2).wait()
        if self.rank == 0:
            dist.destroy_process_group(new_pg2)
            # force destruction of pg2 first
            del new_pg2
            dist.destroy_process_group(new_pg1)
            del new_pg1
        if self.rank == 1:
            c10d.destroy_process_group(new_pg1)
            # force destruction of pg1 first
            del new_pg1
            dist.destroy_process_group(new_pg2)
            del new_pg2
        dist.destroy_process_group()

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(
        torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs"
    )
    def test_abort_in_destroy_multi_pgs(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        pg = self._create_process_group_nccl(store, self.opts())
        device = self.rank_to_GPU[self.rank][0]
        t = torch.rand(10, 10, device=device)
        # First allreduce to initialize default PG's communicator.
        pg.allreduce(t).wait()
        new_pg1 = c10d.new_group([0, 1])
        new_pg2 = c10d.new_group([0, 1])
        t1 = torch.rand(10, 10, device=device)
        t2 = torch.rand(10, 10, device=device)
        new_pg1.allreduce(t1).wait()
        new_pg2.allreduce(t2).wait()
        backend = pg._get_backend(torch.device(device))
        # default PG's backend should have a split count of 2
        self.assertEqual(backend.comm_split_count(), 2)
        # shutdown all NCCL PGs in one shot
        dist.destroy_process_group()

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(
        torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs"
    )
    def test_abort_in_destroy_mixed_empty_pgs(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        pg = self._create_process_group_nccl(store, self.opts())
        device = self.rank_to_GPU[self.rank][0]
        t = torch.rand(10, 10, device=device)
        # First allreduce to initialize default PG's communicator.
        pg.allreduce(t).wait()
        # PG1 is a PG without comms initialized, since we don't call collective on it
        new_pg1 = c10d.new_group([0, 1])
        new_pg2 = c10d.new_group([0, 1])
        t2 = torch.rand(10, 10, device=device)

        new_pg2.allreduce(t2).wait()
        backend = pg._get_backend(torch.device(device))
        # default PG's backend should have a split count of 1
        self.assertEqual(backend.comm_split_count(), 1)
        # shutdown all NCCL PGs in one shot
        dist.destroy_process_group()
    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(
        torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs"
    )
    def test_file_store_check(self):
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"
        os.environ["TORCH_NCCL_ENABLE_MONITORING"] = "0"
        # FileStore check() would be executed
        os.environ["TORCH_NCCL_DUMP_ON_TIMEOUT"] = "1"
        os.environ["TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC"] = "0"

        # self.file_name is created using "delete=False"
        # e.g., self.file_name = tempfile.NamedTemporaryFile(delete=False).name
        store = dist.FileStore(self.file_name, self.world_size)
        dist.init_process_group(
            backend="nccl", rank=self.rank, world_size=self.world_size, store=store
        )
        pg = dist.distributed_c10d._get_default_group()
        self.assertEqual(pg.rank(), self.rank)
        self.assertEqual(pg.size(), self.world_size)
        # give enough time for check() to be executed multiple times
        time.sleep(2)
        dist.destroy_process_group()

    def _check_nccl_timeout(self, expected_timeout):
        pg = dist.distributed_c10d._get_default_group()
        options = pg._get_backend(torch.device(f"cuda:{self.rank}")).options
        self.assertEqual(options._timeout, expected_timeout)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_CUDA, "No GPUs available, skipping test")
    def test_init_process_group_nccl_timeout(self):
        # nccl is handled 'specially' inside init_process_group and its options class
        # is different from the options used by other PGs. There are specific edge
        # cases for nccl that need to be tested.

        store = c10d.FileStore(self.file_name, self.world_size)
        base_opts = dict(
            backend="nccl", store=store, rank=self.rank, world_size=self.world_size
        )

        # test the default value coming from the `init_process_group` kwarg default
        dist.init_process_group(**base_opts)
        self._check_nccl_timeout(torch.distributed.constants.default_pg_nccl_timeout)
        dist.destroy_process_group()

        # test that the `timeout` kwarg takes effect
        new_timeout = timedelta(seconds=123)
        dist.init_process_group(**base_opts, timeout=new_timeout)
        self._check_nccl_timeout(new_timeout)
        dist.destroy_process_group()

        # test that a timeout value provided via the `pg_options` kwarg is ignored and
        # issues a warning, with the 'timeout' kwarg (or its kwdefault) taking precedence
        opts = dist.ProcessGroupNCCL.Options()
        opts._timeout = timedelta(seconds=123)
        with warnings.catch_warnings(record=True) as w:
            dist.init_process_group(**base_opts, pg_options=opts)
            # TODO(whc) i verified that we are indeed emitting this warning, and i can't figure out why i can't catch it.
            # self.assertEqual(len(w), 1)
            # self.assertTrue("pg_options._timeout was specified" in str(w[-1].message))
        self._check_nccl_timeout(torch.distributed.constants.default_pg_nccl_timeout)
        dist.destroy_process_group()

        # test that a timeout value provided via the `pg_options` kwarg is ignored and
        # issues a warning, with the 'timeout' kwarg taking precedence
        opts = dist.ProcessGroupNCCL.Options()
        opts._timeout = timedelta(seconds=123)
        dist.init_process_group(
            **base_opts, pg_options=opts, timeout=timedelta(seconds=1240)
        )
        self._check_nccl_timeout(timedelta(seconds=1240))
        dist.destroy_process_group()
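    # Precedence established above: the `timeout` kwarg (or its default,
    # torch.distributed.constants.default_pg_nccl_timeout) always wins; a timeout
    # set on `pg_options._timeout` is ignored with a warning.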
    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @parametrize("backend", [None, "nccl"])
    def test_set_nccl_pg_timeout(self, backend):
        store = c10d.FileStore(self.file_name, self.world_size)
        opts = dict(
            backend=backend,
            store=store,
            rank=self.rank,
            world_size=self.world_size,
            timeout=timedelta(seconds=123),
        )
        dist.init_process_group(**opts)
        pg = dist.distributed_c10d._get_default_group()
        pg.allreduce(torch.rand(10).cuda(self.rank))
        self._check_nccl_timeout(timedelta(seconds=123))
        pg._get_backend(torch.device(f"cuda:{self.rank}"))._set_default_timeout(
            timedelta(seconds=23)
        )
        self._check_nccl_timeout(timedelta(seconds=23))
        pg.allreduce(torch.rand(10).cuda(self.rank))
        c10d.distributed_c10d._set_pg_timeout(timedelta(seconds=252), pg)
        self._check_nccl_timeout(timedelta(seconds=252))

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @parametrize("backend", [None, "nccl"])
    def test_extend_nccl_pg_timeout(self, backend):
        torch.cuda.set_device(self.rank)
        store = c10d.FileStore(self.file_name, self.world_size)
        opts = dict(
            backend=backend,
            store=store,
            rank=self.rank,
            world_size=self.world_size,
            timeout=timedelta(seconds=123),
        )
        dist.init_process_group(**opts)
        pg = dist.distributed_c10d._get_default_group()
        backend = pg._get_backend(torch.device(f"cuda:{self.rank}"))
        w = pg.allreduce(torch.rand(10).cuda(self.rank))
        self.assertTrue(backend._verify_work_timeout(w, timedelta(seconds=123)))
        w.wait()
        backend._set_default_timeout(timedelta(seconds=3))
        if self.rank == 0:
            # Ideally we would sleep for a very long time here, but that is not
            # feasible in a unit test, so this only covers a small window.
            time.sleep(5)
            pg.allreduce(torch.rand(10).cuda(self.rank))
            time.sleep(5)
            pg.allreduce(torch.rand(5).cuda(self.rank))
            w = pg.allreduce(torch.rand(10).cuda(self.rank))
            self.assertTrue(backend._verify_work_timeout(w, timedelta(seconds=3)))
            w.wait()
        else:
            dist.distributed_c10d._add_ephemeral_timeout_for_all_pgs(
                timedelta(seconds=10)
            )
            w1 = pg.allreduce(torch.rand(10).cuda(self.rank))
            w2 = pg.allreduce(torch.rand(5).cuda(self.rank))
            self.assertTrue(backend._verify_work_timeout(w1, timedelta(seconds=13)))
            self.assertTrue(backend._verify_work_timeout(w2, timedelta(seconds=13)))
            w1.wait()
            dist.distributed_c10d._add_ephemeral_timeout_for_all_pgs(
                timedelta(seconds=5)
            )
            # Since this is not a blocking wait, use a sync here to leave enough
            # time for the watchdog to reset the first timeout extension.
            torch.cuda.synchronize(torch.device(f"cuda:{self.rank}"))
            w = pg.allreduce(torch.rand(10).cuda(self.rank))
            self.assertTrue(backend._verify_work_timeout(w, timedelta(seconds=8)))
            w.wait()
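    # The timeouts verified above are the current default timeout plus any pending
    # ephemeral extension: 3s + 10s = 13s for w1/w2, and 3s + 5s = 8s once the first
    # extension has been consumed and a new 5s extension is added.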
    @requires_nccl_version((2, 18), "Need NCCL 2.18+ for ncclCommSplit")
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_comm_split_optimization(self):
        # Test that new groups containing all world ranks use the "transparent"
        # `ncclCommSplit` optimization.
        store = c10d.FileStore(self.file_name, self.world_size)
        pg = self._create_process_group_nccl(store, self.opts())

        # Test lazy splitting behavior across each per-device backend.
        for device in self.rank_to_GPU[self.rank]:
            backend = pg._get_backend(torch.device(device))

            # split doesn't happen unless the original process group has lazily
            # created communicators, so first verify we haven't split even when
            # making the new group and running an operation on the original pg.
            ng = c10d.new_group()
            tensor = torch.tensor([self.rank]).cuda(device)
            pg.broadcast(tensor, 0)
            self.assertEqual(backend.comm_split_count(), 0)

            # The new group will force a split of the original on first use.
            ng.broadcast(tensor, 0)
            self.assertEqual(backend.comm_split_count(), 1)
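    # In lazy mode, `ncclCommSplit` can only run once the parent communicator
    # exists and the new group issues its first collective, which is why the
    # split count above stays at 0 until `ng.broadcast(...)` runs.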
    @requires_nccl_version((2, 18), "Need NCCL 2.18+ for ncclCommSplit")
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @skip_but_pass_in_sandcastle_if(
        torch.cuda.nccl.version()[-1] == "x", "NCCL test not for NCCLX"
    )
    def test_comm_split_subgroup(self):
        # Test `ncclCommSplit` for smaller subgroups of the world when
        # we've passed a specific device_id to init_process_group.
        store = c10d.FileStore(self.file_name, self.world_size)
        device = torch.device(f"cuda:{self.rank}")
        pg = self._create_process_group_nccl(store, self.opts(), device_id=device)
        backend = pg._get_backend(torch.device(device))

        tensor = torch.full((1,), self.rank).cuda(device)
        original_tensor = tensor.clone()
        ng = c10d.new_group([0])

        # comm split happens eagerly since device_id is passed to init_process_group.
        self.assertEqual(backend.comm_split_count(), 1)
        if self.rank == 0:
            dist.broadcast(tensor, 0, group=ng)

        # no additional comm split happens after a collective.
        self.assertEqual(backend.comm_split_count(), 1)
        self.assertEqual(tensor, original_tensor)
        dist.destroy_process_group()

    @requires_nccl_version((2, 18), "Need NCCL 2.18+ for ncclCommSplit")
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_comm_split_group(self):
        # Test `ncclCommSplit` for smaller subgroups of the world when
        # we've passed a specific device_id to init_process_group.
        store = c10d.FileStore(self.file_name, self.world_size)
        device = torch.device(f"cuda:{self.rank}")
        pg = self._create_process_group_nccl(store, self.opts(), device_id=device)
        backend = pg._get_backend(torch.device(device))

        tensor = torch.full((1,), self.rank).cuda(device)
        ng1 = c10d.split_group(pg, [[0, 1]])
        backend1 = pg._get_backend(torch.device(device))

        # check basic options are the same between parent and child
        self.assertEqual(backend.options._timeout, backend1.options._timeout)
        self.assertEqual(
            backend.options.is_high_priority_stream,
            backend1.options.is_high_priority_stream,
        )
        self.assertEqual(ng1.group_desc, "default_pg:split:0")

        # comm split happens eagerly since device_id is passed to init_process_group.
        self.assertEqual(backend.comm_split_count(), 1)
        dist.broadcast(tensor, 0, group=ng1)
        self.assertEqual(tensor, torch.full((1,), 0))

        ng2 = c10d.split_group(pg, [[0, 1]])
        self.assertEqual(ng2.group_desc, "default_pg:split:1")
        self.assertEqual(backend.comm_split_count(), 2)

        dist.destroy_process_group()

    @requires_nccl_version((2, 18), "Need NCCL 2.18+ for ncclCommSplit")
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_non_blocking_init(self):
        # Test creating a pg using nonblocking mode but not eagerly
        os.environ["TORCH_NCCL_USE_COMM_NONBLOCKING"] = "1"
        os.environ["TORCH_NCCL_NONBLOCKING_TIMEOUT"] = "100"
        store = c10d.FileStore(self.file_name, self.world_size)
        device = self.rank_to_GPU[self.rank][0]
        pg = self._create_process_group_nccl(store, self.opts())
        backend = pg._get_backend(torch.device(device))
        self.assertEqual(backend.comm_split_count(), 0)
        reduce_tensor = torch.rand(10, 10, device=device)
        # Run an allreduce, which should trigger a comm init for pg
        pg.allreduce(reduce_tensor).wait()
        new_pg = c10d.new_group()
        # even after pg's collective call, new pg's comm is not initialized until its own collective calls
        self.assertEqual(backend.comm_split_count(), 0)
        broadcast_tensor = torch.tensor([self.rank]).cuda(device)
        new_pg.broadcast(broadcast_tensor, 0).wait()
        self.assertEqual(backend.comm_split_count(), 1)
        dist.destroy_process_group()

    @requires_nccl_version((2, 18), "Need NCCL 2.18+ for ncclCommSplit")
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_non_blocking_with_eager_init(self):
        # Test creating a pg eagerly with nonblocking mode when
        # we've passed a specific device_id to init_process_group.
        os.environ["TORCH_NCCL_USE_COMM_NONBLOCKING"] = "1"
        os.environ["TORCH_NCCL_NONBLOCKING_TIMEOUT"] = "100"
        store = c10d.FileStore(self.file_name, self.world_size)
        device = torch.device(f"cuda:{self.rank}")
        # bound device to trigger eager init mode
        pg = self._create_process_group_nccl(store, self.opts(), device_id=device)
        backend = pg._get_backend(torch.device(device))
        self.assertEqual(backend.comm_split_count(), 0)
        reduce_tensor = torch.rand(10, 10, device=device)
        # Run an allreduce; comm should have already started initializing,
        # but allreduce is issued to the CUDA stream only after the initialization succeeds
        pg.allreduce(reduce_tensor).wait()
        new_pg = c10d.new_group()
        # new pg's comm is initialized eagerly
        self.assertEqual(backend.comm_split_count(), 1)
        broadcast_tensor = torch.tensor([self.rank]).cuda(device)
        new_pg.broadcast(broadcast_tensor, 0).wait()
        self.assertEqual(backend.comm_split_count(), 1)
        dist.destroy_process_group()
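    # The two tests above contrast lazy vs. eager init under nonblocking NCCL
    # mode: without a bound device_id, the split only happens on the new group's
    # first collective; with device_id passed to init_process_group, the child
    # communicator is created (and the split counted) eagerly at group creation.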
    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_get_uid(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        device = torch.device(f"cuda:{self.rank}")
        pg = self._create_process_group_nccl(store, self.opts(), device_id=device)
        from torch.distributed.distributed_c10d import _get_process_group_uid

        self.assertEqual(_get_process_group_uid(pg), 0)
        pg_2 = c10d.new_group([0, 1])
        self.assertEqual(_get_process_group_uid(pg_2), 1)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_set_process_group_desc(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        device = torch.device(f"cuda:{self.rank}")
        pg_default = self._create_process_group_nccl(
            store, self.opts(), device_id=device
        )
        self.assertEqual(pg_default.group_desc, "default_pg")
        pg_1 = c10d.new_group([0, 1], group_desc="test_purpose")
        self.assertEqual(pg_1.group_desc, "test_purpose")
        pg_2 = c10d.new_group([0, 1])
        self.assertEqual(pg_2.group_desc, "undefined")


class DistributedDataParallelTest(
    test_c10d_common.CommonDistributedDataParallelTest, MultiProcessTestCase
):
    def setUp(self):
        super().setUp()
        # TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
        # that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
        self._spawn_processes()

    def _get_process_group(self):
        store = self._get_store()
        c10d.init_process_group(
            "nccl", store=store, rank=self.rank, world_size=self.world_size
        )
        return c10d.distributed_c10d._get_default_group()

    def _test_nccl_backend(
        self, devices, device_ids, multi_device=False, gradient_as_bucket_view=False
    ):
        process_group = self._get_process_group()
        self._test_ddp_with_process_group(
            process_group, devices, device_ids, multi_device, gradient_as_bucket_view
        )

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nccl_propagate_error_reason(self):
        # Need to use TORCH_NCCL_BLOCKING_WAIT and not ASYNC_ERROR_HANDLING,
        # otherwise process will be taken down and we can't check for errors.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"
        os.environ["TORCH_NCCL_BLOCKING_WAIT"] = "1"
        # Need to disable TORCH_NCCL_DUMP_ON_TIMEOUT otherwise this test times out
        os.environ["TORCH_NCCL_DUMP_ON_TIMEOUT"] = "0"
        store = c10d.FileStore(self.file_name, self.world_size)
        # provide sufficient timeout to initialize NCCL comm.
        pg = c10d.ProcessGroupNCCL(
            store, self.rank, self.world_size, timeout=timedelta(seconds=15)
        )
        pg_gloo = c10d.ProcessGroupGloo(store, self.rank, self.world_size)
        pg.barrier().wait(timedelta(seconds=5))
        # Simulate stuckness in rank 0.
        if self.rank == 0:
            pg_gloo.barrier().wait()
        inp = torch.ones(1).cuda(self.rank)

        if self.rank != 0:
            # Time out due to rank 0 not calling into allreduce.
            with self.assertRaises(dist.DistBackendError):
                pg.allreduce([inp]).wait(timedelta(seconds=5))

            # Now when nonzero rank attempts to use communicator, original failure reason should be logged.
            try:
                pg.allreduce([torch.ones(2).cuda(self.rank)]).wait()
            except dist.DistBackendError as e:
                self.assertTrue("aborted" in str(e))
            else:
                self.fail("Expected error to be raised!")

            # Unblock rank 0
            pg_gloo.barrier().wait()

        # TODO: We can also test that if rank 0 attempts to use the communicator,
        # then we should error out with the info that it was aborted due to
        # timeout on another rank. Although this would only be the case after
        # the watchdog has run on the rank, and there is no reliable way
        # to confirm it has run.
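    # With TORCH_NCCL_BLOCKING_WAIT=1, `work.wait(timeout)` blocks the caller and
    # surfaces NCCL failures synchronously as dist.DistBackendError, which is what
    # lets the test above assert on the abort reason instead of the process being
    # torn down by the async error handler.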
    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nccl_backend_multi_device_ids_not_allowed(self):
        int_devices = list(range(torch.cuda.device_count()))
        devices = [torch.device("cuda:" + str(i)) for i in int_devices]
        with self.assertRaisesRegex(
            ValueError, "device_ids can only be None or contain a single element."
        ):
            self._test_nccl_backend(devices, int_devices)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nccl_backend_single_device_module_device_ids_None(self):
        self._test_nccl_backend(None, None)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nccl_backend_single_device_module_empty_device_ids(self):
        # This tests the backward compatibility of accepting an empty list as `device_ids`,
        # although we no longer document this in favor of the default value of `None`,
        # which is consistent with multi-device modules and CPU modules.
        self._test_nccl_backend(None, [])

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_nccl_backend_multi_device_module_device_ids_None(self):
        int_devices = gpus_for_rank(self.world_size)[self.rank][:2]
        devices = [torch.device("cuda:" + str(i)) for i in int_devices]
        self._test_nccl_backend(devices, None, multi_device=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nccl_backend_1gpu_module_device_ids_integer_list(self):
        int_devices = gpus_for_rank(self.world_size)[self.rank][:1]
        devices = [torch.device("cuda:" + str(i)) for i in int_devices]
        self._test_nccl_backend(devices, int_devices)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nccl_backend_1gpu_module_device_ids_torch_device_list(self):
        int_devices = gpus_for_rank(self.world_size)[self.rank][:1]
        devices = [torch.device("cuda:" + str(i)) for i in int_devices]
        self._test_nccl_backend(devices, devices)

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_nccl_backend_2gpu_module(self):
        int_devices = gpus_for_rank(self.world_size)[self.rank][:2]
        devices = [torch.device("cuda:" + str(i)) for i in int_devices]
        self._test_nccl_backend(devices, None, multi_device=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(8)
    def test_nccl_backend_4gpu_module(self):
        int_devices = gpus_for_rank(self.world_size)[self.rank][:4]
        devices = [torch.device("cuda:" + str(i)) for i in int_devices]
        self._test_nccl_backend(devices, None, multi_device=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_ddp_multi_device_module_config(self):
        gpus = gpus_for_rank(self.world_size)[self.rank]

        self.assertTrue(len(gpus) >= 2, "expecting at least 2 gpus per process")

        process_group = self._get_process_group()

        gpus = gpus[:2]
        model = DoubleGpuNet(gpus)

        with self.assertRaisesRegex(
            ValueError,
            "DistributedDataParallel device_ids and output_device arguments only work with "
            "single-device/multiple-device GPU modules or CPU modules",
        ):
            ddp_model = DistributedDataParallel(
                model, output_device=gpus[1], process_group=process_group
            )

        with self.assertRaisesRegex(
            ValueError, "device_ids can only be None or contain a single element."
        ):
            ddp_model = DistributedDataParallel(
                model, device_ids=gpus, process_group=process_group
            )

        with self.assertRaisesRegex(
            ValueError, "input module must be on the same type of devices"
        ):
            model.fc1 = model.fc1.cpu()
            ddp_model = DistributedDataParallel(model, process_group=process_group)

        model = model.cpu()
        with self.assertRaisesRegex(
            ValueError, "device_ids can only be None or contain a single element."
        ):
            ddp_model = DistributedDataParallel(
                model, device_ids=gpus, process_group=process_group
            )

    def _test_fp16(self, gradient_as_bucket_view=False):
        process_group = self._get_process_group()

        gpus = gpus_for_rank(self.world_size)[self.rank]
        model = nn.Linear(1, 1, bias=False).cuda(gpus[0]).half()
        nn.init.constant_(model.weight, 1)
        ddp_model = DistributedDataParallel(
            model,
            device_ids=[gpus[0]],
            process_group=process_group,
            bucket_cap_mb=0.001,
            gradient_as_bucket_view=gradient_as_bucket_view,
        )

        # Input 2**15, so that the gradients will overflow with a
        # world_size of 2, unless we normalize the gradient by the
        # world_size before the reduction
        input = torch.tensor([[2**15]]).cuda(gpus[0]).half()

        # Step model
        ddp_model.train()
        output = ddp_model(input)
        loss = output.sum()
        loss.backward()

        self.assertFalse(any(torch.isinf(p.grad).any() for p in ddp_model.parameters()))
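    # Why 2**15: with weight 1 and input 2**15, each rank's local gradient is
    # 2**15 = 32768; a raw fp16 allreduce sum over 2 ranks would be 2**16 = 65536,
    # which overflows fp16 (max finite value is about 65504). Normalizing by the
    # world size before the reduction keeps the averaged gradient finite, which
    # is what the isinf assertion above checks.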
    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_fp16(self):
        self._test_fp16()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_fp16_grad_is_view(self):
        self._test_fp16(gradient_as_bucket_view=True)

    def _test_arbitrary_forward_return_value(self, gradient_as_bucket_view=False):
        """
        Note: this test can be sped up by only running it on a CPU module
        once DistributedDataParallel supports them.
        """
        process_group = self._get_process_group()

        class ForwardReturnValueModule(nn.Module):
            def __init__(self) -> None:
                super().__init__()
                self.fc1 = nn.Linear(2, 10, bias=False)
                self.fc2 = nn.Linear(10, 4, bias=False)
                self.fc3 = nn.Linear(4, 4, bias=False)
                self.relu = nn.ReLU()

            def forward(self, x, fn):
                x = self.relu(self.fc1(x))
                x = self.relu(self.fc2(x))
                # The first softmax does NOT include fc3 in its autograd graph
                # whereas the second softmax DOES. If we pass only the first
                # tensor we see in the output to the reducer, it marks the
                # gradient for fc3 as ready (because it doesn't show up). If
                # downstream uses of this return value choose to differentiate
                # against the second output tensor, it would still receive a
                # gradient and a callback for this tensor, resulting in a crash.
                return fn(
                    F.softmax(x, dim=1),
                    F.softmax(self.fc3(x), dim=1),
                )

        device_id = gpus_for_rank(self.world_size)[self.rank][0]
        model = DistributedDataParallel(
            ForwardReturnValueModule().float().to(device_id),
            device_ids=[device_id],
            process_group=process_group,
            gradient_as_bucket_view=gradient_as_bucket_view,
        )

        batch_size = 4
        criterion = nn.CrossEntropyLoss()
        input = torch.rand([batch_size, 2], dtype=torch.float)
        target = torch.LongTensor([random.randrange(4) for _ in range(batch_size)]).to(
            device_id
        )

        # Always run "backward" to ensure the reducer is called by autograd.
        # If we don't correctly capture the output tensors from the return value,
        # the reducer won't see a hook for the unused parameter, and throw an error.
        # The correct capture is what we're testing in this function.
        def test(box, unbox):
            output = model(input, fn=box)
            loss = criterion(unbox(output), target)
            loss.backward()

        # Test with identity return value
        test(
            box=lambda x, y: (x, y),
            unbox=lambda obj: obj[1],
        )

        # Test with list return value
        test(
            box=lambda x, y: ["foo", x, "bar", y],
            unbox=lambda obj: obj[3],
        )

        # Test with tuple return value
        test(
            box=lambda x, y: ("foo", x, "bar", y),
            unbox=lambda obj: obj[3],
        )

        # Test with dict return value
        test(
            box=lambda x, y: {"foo": "bar", "a": x, "b": y},
            unbox=lambda obj: obj["b"],
        )

        # Test with list with dict return value
        test(
            box=lambda x, y: ["foo", "bar", {"a": x, "b": y}],
            unbox=lambda obj: obj[2]["b"],
        )

        # Test with dict with list return value
        test(
            box=lambda x, y: {"foo": "bar", "list": [0, x, 1, y]},
            unbox=lambda obj: obj["list"][3],
        )

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_arbitrary_forward_return_value(self):
        self._test_arbitrary_forward_return_value()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_arbitrary_forward_return_value_grad_is_view(self):
        self._test_arbitrary_forward_return_value(gradient_as_bucket_view=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_ddp_with_lazy_parameters(self):
        process_group = self._get_process_group()
        with self.assertRaisesRegex(
            RuntimeError, "Modules with uninitialized parameters"
        ):
            DistributedDataParallel(
                torch.nn.LazyLinear(10), process_group=process_group
            )
    def _test_find_unused_parameters_kwarg(self, gradient_as_bucket_view=False):
        """
        Note: this test can be sped up by only running it on a CPU module
        once DistributedDataParallel supports them.
        """
        torch.cuda.set_device(self.rank)
        dist.init_process_group(
            backend="nccl",
            world_size=self.world_size,
            rank=self.rank,
            init_method=f"file://{self.file_name}",
        )
        process_group = c10d.distributed_c10d._get_default_group()

        class FindUnusedParametersModule(nn.Module):
            def __init__(self) -> None:
                super().__init__()
                self.fc1 = nn.Linear(2, 10, bias=False)
                self.fc2 = nn.Linear(10, 4, bias=False)
                self.fc3 = nn.Linear(4, 4, bias=False)
                self.relu = nn.ReLU()

            def forward(self, x):
                x = self.relu(self.fc1(x))
                x = self.relu(self.fc2(x))
                # Return the fc3 module so that the caller can invoke it
                # outside of the forward function. While this is bad practice,
                # we can use it to trigger a reducer error.
                return (F.softmax(x, dim=1), self.fc3)

        device_id = gpus_for_rank(self.world_size)[self.rank][0]
        batch_size = 4
        criterion = nn.CrossEntropyLoss()
        input = torch.rand([batch_size, 2], dtype=torch.float)
        target = torch.LongTensor([random.randrange(4) for _ in range(batch_size)]).to(
            device_id
        )

        ddp_model = None

        def test_find_unused_parameters(
            find_unused_parameters, test_default=False, gradient_as_bucket_view=False
        ):
            if test_default:
                model = DistributedDataParallel(
                    FindUnusedParametersModule().float().to(device_id),
                    device_ids=[device_id],
                    process_group=process_group,
                    gradient_as_bucket_view=gradient_as_bucket_view,
                )
            else:
                model = DistributedDataParallel(
                    FindUnusedParametersModule().float().to(device_id),
                    device_ids=[device_id],
                    process_group=process_group,
                    find_unused_parameters=find_unused_parameters,
                    gradient_as_bucket_view=gradient_as_bucket_view,
                )
            nonlocal ddp_model
            ddp_model = model

            output, fc3 = model(input)
            output = fc3(output)
            loss = criterion(output, target)
            loss.backward()

        # First test that find_unused_parameters=True triggers an error when
        # `backward` is called (because fc3 is an unused parameter and will
        # therefore be marked ready twice).
        try:
            test_find_unused_parameters(
                True, gradient_as_bucket_view=gradient_as_bucket_view
            )
        except Exception as ex:
            self.assertTrue(
                str(ex).startswith(
                    "Expected to mark a variable ready only once.",
                )
            )
            unused_index = 2
            unused_index_str = f"Parameter at index {unused_index}"
            model = ddp_model.module
            for module_name, module in model.named_modules():
                if module == model.fc3:
                    for parameter_name, _ in module.named_parameters(recurse=False):
                        unused_fqn = f"{module_name}.{parameter_name}"
                        # Only one such parameter in model.fc3, since bias=False
                        break

            if dist.get_debug_level() != dist.DebugLevel.OFF:
                unused_index_str += f" with name {unused_fqn}"

            self.assertTrue(unused_index_str in str(ex))
        else:
            self.fail("Expected exception")

        dist.barrier(process_group)

        # Then test that the default behavior can be overridden by setting
        # `find_unused_parameters=False`.
        try:
            test_find_unused_parameters(
                False, gradient_as_bucket_view=gradient_as_bucket_view
            )
        except Exception as ex:
            self.fail(f"Unexpected exception: {ex}")

        # Test find_unused_parameters defaults to False
        try:
            test_find_unused_parameters(
                True, test_default=True, gradient_as_bucket_view=gradient_as_bucket_view
            )
        except Exception as ex:
            self.fail(f"Unexpected exception: {ex}")
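    # Summary of the three cases above: find_unused_parameters=True errors here
    # because fc3 is returned from forward and invoked outside it (its parameter
    # gets marked ready twice), while find_unused_parameters=False and the
    # default (also False) both tolerate that usage.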
    # TODO: Combine the following tests once https://github.com/pytorch/pytorch/issues/55967
    # is resolved.
    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @with_dist_debug_levels(levels=["DETAIL"])
    def test_find_unused_parameters_kwarg_debug_detail(self):
        self._test_find_unused_parameters_kwarg()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @with_dist_debug_levels(levels=["INFO"])
    def test_find_unused_parameters_kwarg_debug_info(self):
        self._test_find_unused_parameters_kwarg()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @with_dist_debug_levels(levels=["OFF"])
    def test_find_unused_parameters_kwarg_debug_off(self):
        self._test_find_unused_parameters_kwarg()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @with_dist_debug_levels(levels=["DETAIL"])
    def test_find_unused_parameters_kwarg_grad_is_view_debug_detail(self):
        self._test_find_unused_parameters_kwarg(gradient_as_bucket_view=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @with_dist_debug_levels(levels=["INFO"])
    def test_find_unused_parameters_kwarg_grad_is_view_debug_info(self):
        self._test_find_unused_parameters_kwarg(gradient_as_bucket_view=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @with_dist_debug_levels(levels=["OFF"])
    def test_find_unused_parameters_kwarg_grad_is_view_debug_off(self):
        self._test_find_unused_parameters_kwarg(gradient_as_bucket_view=True)

    def _test_multiple_outputs_multiple_backward(self, gradient_as_bucket_view=False):
        """
        Note: this test can be sped up by only running it on a CPU module
        once DistributedDataParallel supports them.
        """
        process_group = self._get_process_group()

        class MultipleOutputModule(nn.Module):
            def __init__(self) -> None:
                super().__init__()

                def define_module():
                    return nn.Sequential(
                        nn.Linear(2, 10, bias=False),
                        nn.ReLU(),
                        nn.Linear(10, 4, bias=False),
                        nn.ReLU(),
                    )

                self.module0 = define_module()
                self.module1 = define_module()

            def forward(self, x):
                return (
                    F.softmax(self.module0(x), dim=1),
                    F.softmax(self.module1(x), dim=1),
                )

        device_id = gpus_for_rank(self.world_size)[self.rank][0]
        model = DistributedDataParallel(
            MultipleOutputModule().float().to(device_id),
            device_ids=[device_id],
            process_group=process_group,
            gradient_as_bucket_view=gradient_as_bucket_view,
        )

        batch_size = 4
        criterion = nn.CrossEntropyLoss()
        input = torch.rand([batch_size, 2], dtype=torch.float)
        target = torch.LongTensor([random.randrange(4) for _ in range(batch_size)]).to(
            device_id
        )

        # Compute loss and gradients for both outputs
        output1, output2 = model(input)
        loss1 = criterion(output1, target)
        loss1.backward()
        loss2 = criterion(output2, target)
        loss2.backward()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_multiple_outputs_multiple_backward(self):
        self._test_multiple_outputs_multiple_backward()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_multiple_outputs_multiple_backward_grad_is_view(self):
        self._test_multiple_outputs_multiple_backward(gradient_as_bucket_view=True)
    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_no_grad(self):
        """
        Note: this test can be sped up by only running it on a CPU module
        once DistributedDataParallel supports them.
        """
        process_group = self._get_process_group()

        class NoGradModule(nn.Module):
            def __init__(self) -> None:
                super().__init__()
                self.fc1 = nn.Linear(2, 10, bias=False)
                self.fc2 = nn.Linear(10, 4, bias=False)
                self.relu = nn.ReLU()

            def forward(self, x):
                x = self.relu(self.fc1(x))
                x = self.relu(self.fc2(x))
                return F.softmax(x, dim=1)

        device_id = gpus_for_rank(self.world_size)[self.rank][0]
        model = DistributedDataParallel(
            NoGradModule().float().to(device_id),
            device_ids=[device_id],
            process_group=process_group,
        )

        batch_size = 4
        input = torch.rand([batch_size, 2], dtype=torch.float)

        def check_no_grads():
            for p in model.parameters():
                self.assertTrue(p.requires_grad)
                self.assertIsNone(p.grad)

        # After initialization, no parameter has their gradient set.
        check_no_grads()

        # Run `forward` function with torch.no_grad()
        with torch.no_grad():
            output = model(input)
            self.assertTrue(isinstance(output, torch.Tensor))

        # No parameter should have their gradient set.
        check_no_grads()

    def _test_accumulate_gradients_module(self, gradient_as_bucket_view=False):
        # This is NOT the recommended way to implement accumulating grads, but
        # we would like to make sure DDP does not mess up with the underlying
        # module.
        int_devices = gpus_for_rank(self.world_size)[self.rank][:1]
        devices = [torch.device("cuda:" + str(i)) for i in int_devices]
        process_group = self._get_process_group()
        global_batch_size = self.world_size

        model, ddp_model, input, target = self._prepare_single_device_module(
            process_group, devices, devices, global_batch_size, gradient_as_bucket_view
        )

        def step_model(model, input, target):
            model.train()
            output = model(input)
            loss = F.mse_loss(output, target.to(output.device))
            loss.backward()

        # ensure accumulate grads works with no_grad
        with torch.no_grad():
            ddp_model.train()
            ddp_model.module(input)

        # Check two model parameters over 4 iterations.
        # Use 4 iterations because we alternate between reducing and
        # not reducing and want to make sure we switch both ways.
        for iteration in range(4):
            step_model(model, input, target)

            if iteration % 2 == 0:
                # Skip gradients sync without calling prepare_for_backward
                step_model(
                    ddp_model.module,
                    input[self.rank : (self.rank + 1)],
                    target[self.rank : (self.rank + 1)],
                )
                for i, j in zip(model.parameters(), ddp_model.parameters()):
                    self.assertNotEqual(i.grad, j.grad)
            else:
                step_model(
                    ddp_model,
                    input[self.rank : (self.rank + 1)],
                    target[self.rank : (self.rank + 1)],
                )
                for i, j in zip(model.parameters(), ddp_model.parameters()):
                    self.assertEqual(i.grad, j.grad, rtol=1.3e-06, atol=5e-5)

            # Shuffle the input so that DDP input is different
            torch.manual_seed(1337 + iteration)
            input = input[torch.randperm(global_batch_size)]
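    # Note: the supported way to accumulate gradients without reduction is DDP's
    # `no_sync()` context manager rather than calling `ddp_model.module` directly;
    # a minimal sketch (illustrative):
    #
    #     with ddp_model.no_sync():
    #         loss_a.backward()  # grads accumulate locally, no allreduce
    #     loss_b.backward()      # first backward outside no_sync triggers the allreduce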
    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_accumulate_gradients_module(self):
        self._test_accumulate_gradients_module()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_accumulate_gradients_module_with_grad_is_view(self):
        self._test_accumulate_gradients_module(gradient_as_bucket_view=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_failure_recovery(self):
        process_group = self._get_process_group()

        # need to create a separate file for the recovered FileStore, because
        # the original one will be deleted when destructing the first FileStore.
        recovery_filename = self.file_name + "_recovery"

        if self.rank == 0:
            # the file will be deleted by the recovered FileStore
            open(recovery_filename, "w").close()

        # not necessary to run barrier here, as DDP will synchronize

        class TestModel(nn.Module):
            def __init__(self) -> None:
                super().__init__()
                self.fc1 = nn.Linear(2, 10, bias=False)
                self.fc2 = nn.Linear(10, 4, bias=False)
                self.relu = nn.ReLU()

            def forward(self, x):
                x = self.relu(self.fc1(x))
                x = self.relu(self.fc2(x))
                return F.softmax(x, dim=1)

        device_id = gpus_for_rank(self.world_size)[self.rank][0]
        model = TestModel().float().to(device_id)
        ddp = DistributedDataParallel(
            model,
            device_ids=[device_id],
            process_group=process_group,
        )

        batch_size = 4
        criterion = nn.CrossEntropyLoss()
        input = torch.rand([batch_size, 2], dtype=torch.float)
        target = torch.LongTensor([random.randrange(4) for _ in range(batch_size)]).to(
            device_id
        )

        for _ in range(6):
            output = ddp(input)
            loss = criterion(output, target)
            loss.backward()

        del ddp
        c10d.destroy_process_group(process_group)

        store = c10d.FileStore(recovery_filename, self.world_size)
        c10d.init_process_group(
            "nccl", store=store, rank=self.rank, world_size=self.world_size
        )
        process_group = c10d.distributed_c10d._get_default_group()
        ddp = DistributedDataParallel(
            model,
            device_ids=[device_id],
            process_group=process_group,
        )

        input = torch.rand([batch_size, 2], dtype=torch.float)
        target = torch.LongTensor([random.randrange(4) for _ in range(batch_size)]).to(
            device_id
        )
        for _ in range(6):
            output = ddp(input)
            loss = criterion(output, target)
            loss.backward()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_pass_default_pg(self):
        dist.init_process_group(
            "nccl",
            init_method=f"file://{self.file_name}",
            world_size=self.world_size,
            rank=self.rank,
        )

        default_pg = c10d.distributed_c10d._get_default_group()
        dist.destroy_process_group(default_pg)
        self.assertFalse(dist.is_initialized())

    def _test_grad_layout(self, replica_devices, layer_devs, local_batch_size):
        process_group = self._get_process_group()

        global_batch_size = local_batch_size * self.world_size

        # Carry out some trials with small buckets and some with big buckets.
        bucketsizes = (0.000001, 25)
        # Tuples of lists. Each list describes per-layer characteristics for one trial.
        layer_formats = (
            [torch.contiguous_format] * 4,
            [torch.channels_last] * 2 + [torch.contiguous_format] * 2,
            [torch.channels_last] * 4,
        )
        layer_dtypes = (
            [torch.float] * 4,
            [torch.float] * 2 + [torch.half] * 2,
            [torch.half] * 4,
        )

        input_dev = layer_devs[0] if isinstance(layer_devs, list) else layer_devs
        target_dev = layer_devs[-1] if isinstance(layer_devs, list) else layer_devs
        input = torch.randn(
            (global_batch_size, 8, 8, 8), device=input_dev, dtype=torch.float
        )
        target = torch.randn(
            (global_batch_size, 8, 4, 4), device=target_dev, dtype=torch.float
        )
        local_batch_start = self.rank * local_batch_size
        local_batch_end = (self.rank + 1) * local_batch_size

        # Reducer.cpp sneakily creates one "initial bucket" that ignores the "bucket_cap_mb"
        # argument. The following makes sure the initial bucket also complies.
        @contextmanager
        def first_bucket_size(ddp_bucket_mb):
            old_DEFAULT_FIRST_BUCKET_BYTES = dist._DEFAULT_FIRST_BUCKET_BYTES
            dist._DEFAULT_FIRST_BUCKET_BYTES = int(ddp_bucket_mb * 1.0e6)
            try:
                yield
            finally:
                dist._DEFAULT_FIRST_BUCKET_BYTES = old_DEFAULT_FIRST_BUCKET_BYTES
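        # (The Reducer kicks off its first allreduce early using a separate,
        # smaller "first bucket" sized by dist._DEFAULT_FIRST_BUCKET_BYTES,
        # roughly 1 MiB by default, so the helper above patches that constant to
        # match each trial's bucket_cap_mb.)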
1597with torch.backends.cudnn.flags(1598enabled=True, deterministic=True, benchmark=False1599):1600for formats, dtypes, bucketsize in product(1601layer_formats, layer_dtypes, bucketsizes1602):1603with first_bucket_size(bucketsize):1604model_msg = f"rank = {self.rank} formats = {formats} dtypes = {dtypes} bucketsize = {bucketsize} "1605try:1606m = ConvNet(layer_devs, formats, dtypes)1607m_ddp = DistributedDataParallel(1608copy.deepcopy(m),1609device_ids=replica_devices,1610process_group=process_group,1611bucket_cap_mb=bucketsize,1612)1613opt = torch.optim.SGD(m.parameters(), lr=0.1)1614opt_ddp = torch.optim.SGD(m_ddp.parameters(), lr=0.1)1615has_half = any(p.dtype is torch.half for p in m.parameters())1616tol = 1.0e-3 if has_half else 1.0e-51617except BaseException:1618# Prints case-specific debugging info to narrow down failing case.1619print(1620"Caught exception during model creation for " + model_msg,1621flush=True,1622)1623raise1624# 3 iters: First iter creates grads, second iter retests after rebucketing,1625# third iter tries zeroed grads.1626for it in range(3):1627iter_msg = f"iter = {it} " + model_msg1628named_msg = iter_msg1629try:1630F.mse_loss(m(input).float(), target).backward()1631F.mse_loss(1632m_ddp(input[local_batch_start:local_batch_end]).float(),1633target[local_batch_start:local_batch_end],1634).backward()1635for i, ((layer_name, m_child), m_ddp_child) in enumerate(1636zip(m.named_children(), m_ddp.module.children())1637):1638named_msg = layer_name + ".weight" + " " + iter_msg1639self.assertTrue(1640m_child.weight.grad.is_contiguous(1641memory_format=formats[i]1642),1643named_msg,1644)1645self.assertTrue(1646m_ddp_child.weight.grad.is_contiguous(1647memory_format=formats[i]1648),1649named_msg,1650)1651for j, ((param_name, p), p_ddp) in enumerate(1652zip(1653m_child.named_parameters(),1654m_ddp_child.parameters(),1655)1656):1657named_msg = (1658layer_name + "." + param_name + " " + iter_msg1659)1660self.assertEqual(1661p.grad, p_ddp.grad, rtol=tol, atol=tol1662)1663opt.step()1664opt_ddp.step()1665if it == 0:1666for p, p_ddp in zip(m.parameters(), m_ddp.parameters()):1667p.grad = None1668p_ddp.grad = None1669else:1670m.zero_grad()1671m_ddp.zero_grad()1672except BaseException:1673# Makes sure we still get info if an error occurred somewhere other than the asserts.1674print(1675"Caught exception during iterations at " + named_msg,1676flush=True,1677)1678raise1679

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_grad_layout_1devicemodule_1replicaperprocess(self):
        dev0 = torch.device("cuda:" + str(gpus_for_rank(self.world_size)[self.rank][0]))
        # Tells DDP to use just one device.
        replica_devices = [dev0]
        # Tells _test_grad_layout to construct ConvNet with all layers on this process's first assigned device.
        layer_devs = dev0
        local_batch_size = 8
        self._test_grad_layout(replica_devices, layer_devs, local_batch_size)

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    @skip_if_rocm
    def test_grad_layout_2devicemodule(self):
        int_devices = gpus_for_rank(self.world_size)[self.rank][:2]
        dev0 = torch.device("cuda:" + str(int_devices[0]))
        dev1 = torch.device("cuda:" + str(int_devices[1]))
        # DDP's default behavior for a multi-device module is "don't replicate."
        replica_devices = None
        # Tells _test_grad_layout to construct this process's ConvNet on 2 devices, with 2 layers on each device.
        layer_devs = [dev0] * 2 + [dev1] * 2
        local_batch_size = 8
        self._test_grad_layout(replica_devices, layer_devs, local_batch_size)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_param_layout_mismatch_error(self):
        process_group = self._get_process_group()

        dev0 = torch.device("cuda:" + str(gpus_for_rank(self.world_size)[self.rank][0]))
        layer_devs = dev0
        layer_formats = (
            [torch.contiguous_format] * 4
            if self.rank == 0
            else [torch.channels_last] * 4
        )
        layer_dtypes = [torch.float] * 4

        m = ConvNet(layer_devs, layer_formats, layer_dtypes)
        if self.rank == 0:
            m_ddp = DistributedDataParallel(
                m, device_ids=[dev0], process_group=process_group
            )
        else:
            with self.assertRaisesRegex(
                RuntimeError,
                ".* appears not to match strides of the same param in process 0",
            ):
                m_ddp = DistributedDataParallel(
                    m, device_ids=[dev0], process_group=process_group
                )

    def _gpu_model_with_ddp_comm_hook(
        self,
        process_group,
        hook=None,
        gradient_as_bucket_view=False,
        state=None,
        static_graph=False,
    ):
        device_id = gpus_for_rank(self.world_size)[self.rank][0]
        gpu_model = DistributedDataParallel(
            ModuleForDdpCommHook().to(device_id),
            device_ids=[device_id],
            process_group=process_group,
            gradient_as_bucket_view=gradient_as_bucket_view,
            static_graph=static_graph,
        )

        # Register a DDP communication hook if any.
        if hook is not None:
            gpu_model.register_comm_hook(state, hook)

        return gpu_model

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_ddp_comm_hook_future_passing_gpu_nccl(self):
        """
        This unit test verifies whether the Future object is passed properly using the nccl backend.
        The hook callback function creates a Future object and sets a value on it.
        """
        process_group = self._get_process_group()

        # Get GPU model with simple_hook registered.
        gpu_model = self._gpu_model_with_ddp_comm_hook(process_group, self._simple_hook)

        # check whether the grads are equal to what simple_hook's then callback returns.
        # without the comm_hook, result would be 0.25 * torch.ones(2, 2).
        self._run_and_verify_hook(gpu_model, 8, 2 * torch.ones(2, 2))

    def _test_ddp_comm_hook_allreduce_hook_nccl(
        self, gradient_as_bucket_view=False, static_graph=False
    ):
        """
        This unit test verifies whether a DDP communication hook that just calls
        allreduce gives the same result as the case of no hook registered.
        Without the then callback, the future_value in reducer is no longer
        a PyObject, and this unit test verifies future_value is properly checked.
        """
        process_group = self._get_process_group()

        def allreduce_hook(
            state: object, bucket: dist.GradBucket
        ) -> torch.futures.Future[torch.Tensor]:
            tensors = [bucket.buffer() / self.world_size]
            return (
                process_group.allreduce(tensors)
                .get_future()
                .then(lambda fut: fut.value()[0])
            )

        # Get GPU model with allreduce_hook registered.
        gpu_model = self._gpu_model_with_ddp_comm_hook(
            process_group, allreduce_hook, gradient_as_bucket_view, static_graph
        )

        # check whether the grads are equal to what DDP without hook would return.
        self._run_and_verify_hook(gpu_model, 8, 0.25 * torch.ones(2, 2))
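
    # For reference: the hand-written allreduce_hook above matches, in effect, the
    # stock implementation shipped in the `default` hooks module imported at the
    # top of this file. A minimal sketch (not collected as a test; the model and
    # process-group arguments are illustrative) of wiring the stock hook onto an
    # arbitrary DDP model instead:
    @staticmethod
    def _example_register_default_allreduce_hook(ddp_model, process_group):
        # default.allreduce_hook averages each gradient bucket across the
        # process group, reproducing plain DDP numerics.
        ddp_model.register_comm_hook(process_group, default.allreduce_hook)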

    def _test_default_ddp_comm_hooks_nccl(self, gradient_as_bucket_view=False):
        """
        This unit test verifies whether default Python DDP communication hooks ALLREDUCE, FP16_COMPRESS
        and BF16_COMPRESS can give the same result as the case of no hook registered.
        """
        process_group = self._get_process_group()

        # For these default DDP comm hooks, the only state is the process group.
        state = process_group
        hook_options = [default.allreduce_hook, default.fp16_compress_hook]
        if (
            not TEST_WITH_ROCM
            and BFLOAT16_AVAILABLE
            and c10d.is_nccl_available()
            and torch.cuda.nccl.version() >= (2, 10)
        ):
            hook_options.append(default.bf16_compress_hook)
        for hook in hook_options:
            # Get GPU model with the hook registered.
            # The first arg 'process_group' is used for initializing the test environment,
            # so it cannot be replaced by 'state', although they have the same value.
            gpu_model = self._gpu_model_with_ddp_comm_hook(
                process_group, hook, gradient_as_bucket_view, state
            )

            # check whether the grads are equal to what DDP without hook would return.
            self._run_and_verify_hook(gpu_model, 8, 0.25 * torch.ones(2, 2))
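
    # Illustrative sketch of what a compression hook like default.fp16_compress_hook
    # does (the real implementation lives in
    # torch.distributed.algorithms.ddp_comm_hooks.default_hooks and differs in
    # detail): cast the bucket to half precision before the allreduce to halve
    # communication volume, then cast back on completion. Not collected as a test.
    @staticmethod
    def _example_fp16_compress_sketch(process_group, bucket):
        world_size = process_group.size()
        # Pre-divide so that the allreduce sum yields an average.
        compressed = bucket.buffer().to(torch.float16).div_(world_size)
        fut = process_group.allreduce([compressed]).get_future()

        def decompress(fut):
            # Restore the bucket's original dtype after communication.
            return fut.value()[0].to(bucket.buffer().dtype)

        return fut.then(decompress)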

    def _test_fp16_compress_wrapper(self, gradient_as_bucket_view=False):
        """
        This unit test verifies whether wrapping the ALLREDUCE and POWER_SGD hooks with
        the FP16_WRAPPER can give the same result as when there is no hook registered.
        """
        process_group = self._get_process_group()
        powerSGD_state = powerSGD.PowerSGDState(process_group=process_group)

        hook_args = [
            (powerSGD.powerSGD_hook, powerSGD_state),
            (default.allreduce_hook, process_group),
        ]

        for hook, state in hook_args:
            gpu_model = self._gpu_model_with_ddp_comm_hook(
                process_group,
                default.fp16_compress_wrapper(hook),
                gradient_as_bucket_view,
                state,
            )

            # check whether the grads are equal to what DDP without hook would return.
            self._run_and_verify_hook(gpu_model, 8, 0.25 * torch.ones(2, 2))

    def _test_bf16_compress_wrapper(self, gradient_as_bucket_view=False):
        """
        This unit test verifies whether wrapping the ALLREDUCE and POWER_SGD hooks with
        the BF16_WRAPPER can give the same result as when there is no hook registered.
        """
        process_group = self._get_process_group()
        powerSGD_state = powerSGD.PowerSGDState(process_group=process_group)

        hook_args = [
            (powerSGD.powerSGD_hook, powerSGD_state),
            (default.allreduce_hook, process_group),
        ]

        for hook, state in hook_args:
            gpu_model = self._gpu_model_with_ddp_comm_hook(
                process_group,
                default.bf16_compress_wrapper(hook),
                gradient_as_bucket_view,
                state,
            )

            # check whether the grads are equal to what DDP without hook would return.
            self._run_and_verify_hook(gpu_model, 8, 0.25 * torch.ones(2, 2))

    def _test_powerSGD_ddp_comm_hook_nccl(self, gradient_as_bucket_view=False):
        """
        This unit test verifies whether Python DDP communication hook POWER_SGD
        can give the same result as the case of no hook registered.
        """
        process_group = self._get_process_group()

        # Get GPU model with the hook registered.
        # Test the hook with different algorithmic configs.
        for use_error_feedback, warm_start, batch_tensors_with_same_shape in product(
            [True, False],
            [True, False],
            [True, False],
        ):
            state = powerSGD.PowerSGDState(
                process_group=process_group,
                matrix_approximation_rank=1,
                use_error_feedback=use_error_feedback,
                warm_start=warm_start,
                batch_tensors_with_same_shape=batch_tensors_with_same_shape,
            )
            for hook in [powerSGD.powerSGD_hook, powerSGD.batched_powerSGD_hook]:
                gpu_model = self._gpu_model_with_ddp_comm_hook(
                    process_group, hook, gradient_as_bucket_view, state
                )

                # check whether the grads are equal to what DDP without hook would return.
                self._run_and_verify_hook(gpu_model, 8, 0.25 * torch.ones(2, 2))
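
    # Sketch of how PowerSGD would typically be enabled outside this harness
    # (names are illustrative; not collected as a test). A small
    # matrix_approximation_rank trades gradient fidelity for bandwidth;
    # start_powerSGD_iter (a real PowerSGDState knob, with a much larger
    # default) delays compression past the noisy early iterations.
    @staticmethod
    def _example_enable_powerSGD(ddp_model, process_group):
        state = powerSGD.PowerSGDState(
            process_group=process_group,
            matrix_approximation_rank=2,
            start_powerSGD_iter=10,
        )
        ddp_model.register_comm_hook(state, powerSGD.powerSGD_hook)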

    def _test_builtin_ddp_comm_hooks_nccl(self, gradient_as_bucket_view=False):
        """
        This unit test verifies whether built-in C++ DDP communication hooks ALLREDUCE and FP16_COMPRESS
        can give the same result as the case of no hook registered.
        """
        process_group = self._get_process_group()

        for comm_hook_type in [
            dist.BuiltinCommHookType.ALLREDUCE,
            dist.BuiltinCommHookType.FP16_COMPRESS,
        ]:
            # Get GPU model with the built-in communication hook.
            gpu_model = self._gpu_model_with_builtin_ddp_comm_hook(
                process_group, comm_hook_type, gradient_as_bucket_view
            )

            # check whether the grads are equal to what DDP without hook would return.
            self._run_and_verify_hook(gpu_model, 8, 0.25 * torch.ones(2, 2))

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_ddp_comm_hook_allreduce_hook_nccl(self):
        self._test_ddp_comm_hook_allreduce_hook_nccl()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_default_ddp_comm_hooks_nccl(self):
        self._test_default_ddp_comm_hooks_nccl()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_fp16_compress_wrapper_nccl(self):
        self._test_fp16_compress_wrapper()

    @requires_nccl()
    @requires_nccl_version((2, 10), "Need NCCL 2.10+ for BF16_COMPRESS")
    @skip_but_pass_in_sandcastle_if(
        not BFLOAT16_AVAILABLE,
        "BFloat16 is only supported by CUDA 11+",
    )
    @skip_if_lt_x_gpu(2)
    def test_bf16_compress_wrapper_nccl(self):
        self._test_bf16_compress_wrapper()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_builtin_ddp_comm_hooks_nccl(self):
        self._test_builtin_ddp_comm_hooks_nccl()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_powerSGD_ddp_comm_hook_nccl(self):
        self._test_powerSGD_ddp_comm_hook_nccl()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_ddp_comm_hook_allreduce_hook_nccl_grad_is_view(self):
        self._test_ddp_comm_hook_allreduce_hook_nccl(gradient_as_bucket_view=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_ddp_comm_hook_allreduce_hook_nccl_static_graph(self):
        self._test_ddp_comm_hook_allreduce_hook_nccl(static_graph=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_default_ddp_comm_hooks_nccl_is_view(self):
        self._test_default_ddp_comm_hooks_nccl(gradient_as_bucket_view=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_fp16_compress_wrapper_is_view(self):
        self._test_fp16_compress_wrapper(gradient_as_bucket_view=True)

    @requires_nccl()
    @requires_nccl_version((2, 10), "Need NCCL 2.10+ for BF16_COMPRESS")
    @skip_but_pass_in_sandcastle_if(
        not BFLOAT16_AVAILABLE,
        "BFloat16 is only supported by CUDA 11+",
    )
    @skip_if_lt_x_gpu(2)
    def test_bf16_compress_wrapper_is_view(self):
        self._test_bf16_compress_wrapper(gradient_as_bucket_view=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_builtin_ddp_comm_hooks_nccl_grad_is_view(self):
        self._test_builtin_ddp_comm_hooks_nccl(gradient_as_bucket_view=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_powerSGD_ddp_comm_hook_nccl_grad_is_view(self):
        self._test_powerSGD_ddp_comm_hook_nccl(gradient_as_bucket_view=True)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_ddp_comm_hook_allreduce_with_then_hook_nccl(self):
        """
        This unit test verifies whether a DDP communication hook that calls allreduce and then
        multiplies the result by ten and divides by two gives the expected result.
        """
        process_group = self._get_process_group()

        def allreduce_with_then_hook(
            state: object, bucket: dist.GradBucket
        ) -> torch.futures.Future[torch.Tensor]:
            tensors = [bucket.buffer() / self.world_size]
            fut = process_group.allreduce(tensors).get_future()

            def mult(fut):
                # Multiply the result by 10.
                return 10 * fut.value()[0]

            def div(fut):
                # Divide the result by 2.
                return 0.5 * fut.value()

            return fut.then(mult).then(div)

        # Get GPU model with allreduce_with_then_hook registered.
        gpu_model = self._gpu_model_with_ddp_comm_hook(
            process_group, allreduce_with_then_hook
        )

        # check whether the grads are equal to what allreduce returns multiplied by 5.
        # without the comm_hook, result would be still 0.25 * torch.ones(2, 2).
        self._run_and_verify_hook(gpu_model, 8, 1.25 * torch.ones(2, 2))
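
    # The .then() chaining above works on any torch.futures.Future, not just
    # ones produced by a process group. A CPU-only sketch of the same
    # 10x-then-0.5x pipeline (not collected as a test):
    @staticmethod
    def _example_future_then_chaining():
        fut = torch.futures.Future()
        chained = fut.then(lambda f: 10 * f.value()).then(lambda f: 0.5 * f.value())
        fut.set_result(torch.ones(2, 2))
        # 1 * 10 * 0.5 == 5, mirroring the 1.25 == 0.25 * 5 expectation above.
        assert torch.equal(chained.wait(), 5 * torch.ones(2, 2))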

    class AcceptsParam(torch.nn.Module):
        def __init__(self, p, factor):
            super().__init__()
            self.a = p
            self.f = factor

        def forward(self, input):
            return input + self.a * self.f

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_ddp_weight_sharing(self):
        process_group = self._get_process_group()

        size = 2048 * 2048
        dev = self.rank
        world = self.world_size

        p = torch.nn.Parameter(torch.randn(size, requires_grad=True))

        for try_set_to_none, use_bucket_view in product((False, True), (False, True)):
            m = torch.nn.Sequential(
                self.AcceptsParam(p, dev + 1), self.AcceptsParam(p, dev + 1)
            ).cuda(dev)

            m = torch.nn.parallel.DistributedDataParallel(
                m,
                bucket_cap_mb=1,
                gradient_as_bucket_view=use_bucket_view,
                device_ids=[dev],
                process_group=process_group,
            )

            for i in range(3):
                m.zero_grad(set_to_none=try_set_to_none)
                m(1).sum().backward()

                # Each param value is multiplied by "rank + 1" twice in forward, so the grad
                # values produced by a particular rank should be 2. * (rank + 1).
                # Summing these over ranks and dividing by world size gives the expected result:
                analytic = torch.full_like(
                    p, 2.0 * (world * (world + 1.0) / 2.0) / world, device=dev
                )
                for name, p in m.named_parameters():
                    self.assertEqual(
                        p.grad,
                        analytic,
                        "mismatch at "
                        + name
                        + ".grad for "
                        + f"set_to_none = {try_set_to_none}, use_bucket_view = {use_bucket_view}",
                    )
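
    # Worked example of the analytic gradient above for world == 2: rank 0
    # contributes 2 * (0 + 1) = 2, rank 1 contributes 2 * (1 + 1) = 4, so the
    # allreduce-averaged gradient is (2 + 4) / 2 = 3, which matches
    # 2 * (world * (world + 1) / 2) / world = 2 * 3 / 2 = 3.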

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_ddp_packed_sequence(self):
        """
        Tests that DDP with ``device_ids`` specified can run a forward and
        backward pass with ``PackedSequence`` s with parity compared to a local
        version of the model.
        """
        store = c10d.FileStore(self.file_name, self.world_size)
        process_group = dist.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
        )
        seqs = ["sequence_sequence", "seq", "sequence"]
        vocab = ["<pad>"] + sorted({ch for seq in seqs for ch in seq})
        vectorized_seqs = [[vocab.index(tok) for tok in seq] for seq in seqs]
        # Set the seed to make the embedding and LSTM deterministic (even
        # across ranks since DDP broadcasts parameters from rank 0)
        torch.manual_seed(0)
        embed = nn.Embedding(len(vocab), 4)  # keep on CPU
        lstm = nn.LSTM(input_size=4, hidden_size=2, batch_first=True).to(self.rank)
        lstm_ddp = DistributedDataParallel(
            copy.deepcopy(lstm),
            device_ids=[self.rank],
            process_group=process_group,
        )
        for p1, p2 in zip(lstm.parameters(), lstm_ddp.module.parameters()):
            self.assertEqual(p1, p2)
        seq_lengths = torch.LongTensor(list(map(len, vectorized_seqs)))
        seq_tensor = torch.Tensor(
            torch.zeros((len(vectorized_seqs), seq_lengths.max()))
        ).long()
        for i, (seq, seq_len) in enumerate(zip(vectorized_seqs, seq_lengths)):
            seq_tensor[i, :seq_len] = torch.LongTensor(seq)
        seq_lengths, permutation_idx = seq_lengths.sort(0, descending=True)
        seq_tensor = seq_tensor[permutation_idx]
        embedded_seq_tensor = embed(seq_tensor)
        packed_input = torch.nn.utils.rnn.pack_padded_sequence(
            embedded_seq_tensor,
            seq_lengths,
            batch_first=True,
        )
        packed_input_ddp = torch.nn.utils.rnn.pack_padded_sequence(
            embedded_seq_tensor.detach().clone(),
            seq_lengths,
            batch_first=True,
        )
        # Move the input to GPU explicitly for the local model
        packed_output, (ht, ct) = lstm(packed_input.to(self.rank))
        # Let DDP move the input to GPU internally
        packed_output_ddp, (ht_ddp, ct_ddp) = lstm_ddp(packed_input_ddp)
        self.assertEqual(packed_output.data, packed_output_ddp.data)
        self.assertEqual(ht, ht_ddp)
        self.assertEqual(ct, ct_ddp)
        packed_output.data.sum().backward()
        packed_output_ddp.data.sum().backward()
        for p1, p2 in zip(lstm.parameters(), lstm_ddp.parameters()):
            self.assertEqual(p1.grad, p2.grad)
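
    # Standalone, CPU-only sketch of the PackedSequence round trip exercised
    # above, for readers unfamiliar with pack_padded_sequence (not collected
    # as a test):
    @staticmethod
    def _example_pack_padded_sequence():
        # Two sequences of lengths 3 and 1, padded into a (2, 3, 4) batch and
        # ordered longest-first, as pack_padded_sequence expects by default.
        padded = torch.randn(2, 3, 4)
        lengths = torch.tensor([3, 1])
        packed = torch.nn.utils.rnn.pack_padded_sequence(
            padded, lengths, batch_first=True
        )
        # Unpacking recovers the padded layout and the original lengths.
        _, out_lengths = torch.nn.utils.rnn.pad_packed_sequence(
            packed, batch_first=True
        )
        assert torch.equal(out_lengths, lengths)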

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_channels_last_contig(self):
        process_group = self._get_process_group()
        device = torch.device(f"cuda:{self.rank}")
        tensor = torch.ones((2, 16, 768, 1152), dtype=torch.float32, device=device).to(
            memory_format=torch.channels_last
        )
        process_group.broadcast([tensor]).wait()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_ddp_complex_params(self):
        class FFTModel(nn.Module):
            def __init__(self, hin, win, n_features):
                super().__init__()
                self.hin = hin
                self.win = win
                self.weight = nn.Parameter(
                    torch.ones(
                        (n_features, n_features, hin, win // 2 + 1), dtype=torch.cfloat
                    )
                )

            def forward(self, x):
                xc = torch.fft.rfft2(
                    x, s=(self.hin, self.win), dim=(-2, -1), norm="ortho"
                )
                xcw = torch.einsum("nchw,cohw->nohw", xc, self.weight)
                x = torch.fft.irfft2(xcw, dim=(-2, -1), norm="ortho")
                return x

        process_group = self._get_process_group()
        device_id = gpus_for_rank(self.world_size)[self.rank][0]
        N, C, H, W = 1, 16, 64, 64
        ddp_model = DistributedDataParallel(
            FFTModel(hin=H, win=W, n_features=C).to(device_id),
            device_ids=[device_id],
            process_group=process_group,
        )
        optimizer = torch.optim.Adam(ddp_model.parameters(), lr=0.001)

        inp = torch.ones((N, C, H, W), dtype=torch.float32)

        # train step
        out = ddp_model(inp)
        loss = torch.sum(out)
        loss.backward()
        optimizer.step()

        torch.cuda.synchronize(device=device_id)


class WorkHookTest(MultiProcessTestCase):
    @property
    def world_size(self):
        return 2

    def setUp(self):
        super().setUp()
        # set TORCH_NCCL_ENABLE_TIMING to enable timing for CUDAEvents
        # in ProcessGroup Work
        os.environ["TORCH_NCCL_ENABLE_TIMING"] = "1"
        self._spawn_processes()

    def tearDown(self):
        super().tearDown()
        del os.environ["TORCH_NCCL_ENABLE_TIMING"]
        try:
            os.remove(self.file_name)
        except OSError:
            pass

    def _get_store(self):
        return dist.FileStore(self.file_name, self.world_size)

    def _get_process_group(self):
        store = self._get_store()
        c10d.init_process_group(
            "nccl", store=store, rank=self.rank, world_size=self.world_size
        )
        return c10d.distributed_c10d._get_default_group()

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_on_completion_hook_broadcast(self):
        pg = self._get_process_group()
        num_hook_fired = 0
        durations: List[float] = []

        def hook(work_info: torch._C._distributed_c10d.WorkInfo):
            nonlocal num_hook_fired, durations
            num_hook_fired += 1
            durations.append(work_info.active_duration.total_seconds())

        pg._register_on_completion_hook(hook)
        tensor = torch.ones([2, 3]).cuda(self.rank) * self.rank
        pg.broadcast([tensor]).wait()
        pg.broadcast([tensor]).wait()

        # N.B.: destroy_process_group is necessary to wait for
        # all pending works to finish.
        c10d.destroy_process_group(pg)

        self.assertEqual(num_hook_fired, 2)
        self.assertEqual(len(durations), 2)
        for duration in durations:
            self.assertTrue(duration > 0)

        self.assertEqual(tensor, torch.zeros([2, 3]).cuda(self.rank))
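
    # The on-completion hook API used above is private (note the leading
    # underscore) and may change; the general pattern, as exercised by this
    # class, is: register the hook before issuing work, then tear down the
    # group so all pending work (and therefore all hook invocations) has
    # finished before asserting. A condensed sketch:
    #
    #   pg._register_on_completion_hook(lambda info: log(info.op_type))
    #   pg.broadcast([t]).wait()         # hook fires once this work completes
    #   c10d.destroy_process_group(pg)   # flushes pending work and hooks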

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_on_completion_hook_mixed_ops(self):
        pg = self._get_process_group()
        num_hook_fired = 0
        durations: List[float] = []

        def hook(work_info: torch._C._distributed_c10d.WorkInfo):
            nonlocal num_hook_fired, durations
            num_hook_fired += 1
            durations.append(work_info.active_duration.total_seconds())

        pg._register_on_completion_hook(hook)
        tensor = torch.ones([2, 3]).cuda(self.rank)
        tensor_list = [torch.empty_like(tensor) for _ in range(self.world_size)]
        # intentionally using async ops.
        pg.allreduce(tensor)
        pg.allgather(tensor_list, tensor)
        pg.allreduce(tensor)

        # N.B.: destroy_process_group is necessary to wait for
        # all pending works to finish.
        c10d.destroy_process_group(pg)

        self.assertEqual(num_hook_fired, 3)
        self.assertEqual(len(durations), 3)
        for duration in durations:
            self.assertTrue(duration > 0)

        self.assertEqual(
            tensor,
            torch.ones([2, 3]).cuda(self.rank) * self.world_size * self.world_size,
        )

        self.assertEqual(
            tensor_list,
            [
                torch.ones([2, 3]).cuda(self.rank) * self.world_size
                for _ in range(self.world_size)
            ],
        )

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_on_completion_hook_with_ddp(self):
        pg = self._get_process_group()
        num_hook_fired: Dict[int, int] = {}
        durations: Dict[OpType, List[float]] = {}

        def hook(work_info: torch._C._distributed_c10d.WorkInfo):
            nonlocal num_hook_fired, durations
            op_type = work_info.op_type
            if op_type not in num_hook_fired:
                num_hook_fired[op_type] = 0
                durations[op_type] = []
            num_hook_fired[op_type] += 1
            durations[op_type].append(work_info.active_duration.total_seconds())

        pg._register_on_completion_hook(hook)

        nlayers = 10
        net = nn.Sequential(
            *[nn.Linear(1000, 1000, bias=False) for _ in range(nlayers)]
        ).to(self.rank)

        ddp = DistributedDataParallel(
            net,
            device_ids=[self.rank],
            process_group=pg,
            bucket_cap_mb=1,
        )

        pg._wait_for_pending_works()

        # DDP is expected to synchronize model parameters by broadcasting
        # from rank0 to other ranks. However, this is DDP's internal implementation,
        # which is subject to change in future versions.
        self.assertTrue(num_hook_fired[OpType.BROADCAST] > 0)
        ctor_allreduce = (
            num_hook_fired[OpType.ALLREDUCE]
            if OpType.ALLREDUCE in num_hook_fired
            else 0
        )

        x = torch.zeros(2, 1000).cuda(self.rank)
        ddp(x).sum().backward()

        c10d.destroy_process_group(pg)

        self.assertTrue(OpType.ALLREDUCE in num_hook_fired)
        # The number of allreduce ops depends on DDP's internal implementation, but
        # there should be at least one allreduce.
        self.assertTrue(num_hook_fired[OpType.ALLREDUCE] - ctor_allreduce > 0)
        self.assertTrue(all(duration > 0 for duration in chain(*(durations.values()))))

    # Not testing FSDP due to https://github.com/pytorch/pytorch/issues/90848.
    # We cannot disable workCleanupLoop() as hooks are fired in that thread.

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_on_completion_hook_all_gather_object(self):
        torch.cuda.set_device(self.rank)

        pg = self._get_process_group()
        num_hook_fired: Dict[int, int] = {}
        durations: Dict[OpType, List[float]] = {}

        def hook(work_info: torch._C._distributed_c10d.WorkInfo):
            nonlocal num_hook_fired, durations
            op_type = work_info.op_type
            if op_type not in num_hook_fired:
                num_hook_fired[op_type] = 0
                durations[op_type] = []
            num_hook_fired[op_type] += 1
            durations[op_type].append(work_info.active_duration.total_seconds())

        pg._register_on_completion_hook(hook)

        obj = {"rank": self.rank, "world_size": self.world_size}
        obj_list = [None for _ in range(self.world_size)]

        c10d.all_gather_object(obj_list, obj, group=pg)

        for r, o in enumerate(obj_list):
            self.assertTrue(isinstance(o, dict))
            self.assertEqual(set(o.keys()), {"rank", "world_size"})
            self.assertEqual(o["rank"], r)
            self.assertEqual(o["world_size"], self.world_size)

        c10d.destroy_process_group(pg)

        self.assertTrue(OpType.ALLGATHER in num_hook_fired)
        self.assertEqual(len(num_hook_fired), 1)
        # two allgathers, one for size and another for values
        self.assertEqual(num_hook_fired[OpType.ALLGATHER], 2)
        self.assertTrue(all(duration > 0 for duration in durations[OpType.ALLGATHER]))

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_on_completion_hook_seq(self):
        pg = self._get_process_group()
        num_hook_fired = 0
        seq: int = -1
        work: int = 0

        def hook(work_info: torch._C._distributed_c10d.WorkInfo):
            nonlocal num_hook_fired, seq
            num_hook_fired += 1
            seq = work_info.seq

        pg._register_on_completion_hook(hook)
        tensor = torch.ones([2, 3]).cuda(self.rank) * self.rank
        work_count = 3
        for i in range(work_count):
            work += 1
            pg.broadcast([tensor]).wait()

        # N.B.: destroy_process_group is necessary to wait for
        # all pending works to finish.
        c10d.destroy_process_group(pg)

        self.assertEqual(num_hook_fired, work_count)
        self.assertEqual(work, seq)


class NcclErrorHandlingTest(MultiProcessTestCase):
    def setUp(self):
        super().setUp()
        # Need to skip return code checking for these tests since the child
        # processes don't exit cleanly.
        self.skip_return_code_checks = [
            self.test_nccl_errors_blocking_abort.__wrapped__,
            self.test_nccl_errors_blocking_sigkill.__wrapped__,
            self.test_nccl_errors_blocking_sigterm.__wrapped__,
            self.test_nccl_errors_blocking_nonzero_exit.__wrapped__,
        ]
        # TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
        # that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
        self._spawn_processes()

    def tearDown(self):
        super().tearDown()
        try:
            os.remove(self.file_name)
        except OSError:
            pass

    @property
    def op_timeout_sec(self):
        return 3

    @property
    def world_size(self):
        return 3

    @property
    def blocking_wait_error_msg(self):
        return "timeout"

    def _run_all_reduce(self, pg):
        pg.allreduce(torch.rand(10).cuda(self.rank))

    @requires_nccl()
    @requires_nccl_version((2, 4, 0), "Need NCCL 2.4+ for error checking")
    @skip_if_lt_x_gpu(3)
    @skip_if_rocm
    @skip_but_pass_in_sandcastle("Test does not pass when run locally")
    def test_nccl_errors_nonblocking(self):
        # Note: we unset and restore TORCH_NCCL_ASYNC_ERROR_HANDLING for this test
        # since test_c10d_common runs with async error handling by default, but this
        # tests behavior when it is not enabled.
        prev_nccl_async_error_handling = os.environ.get(
            "TORCH_NCCL_ASYNC_ERROR_HANDLING", None
        )
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "0"
        store = c10d.FileStore(self.file_name, self.world_size)
        process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)
        process_group.allreduce(torch.rand(10).cuda(self.rank))
        if self.rank == 0:
            # This allreduce does not block the Python thread, as allreduce enqueues
            # the cuda operation and wait only blocks the current cuda stream.
            work = process_group.allreduce(torch.rand(10).cuda(self.rank))
            work.wait()

            # Now the work scheduled next should hang forever since the previous
            # allreduce will never complete.
            t = threading.Thread(target=self._run_all_reduce, args=(process_group,))
            t.daemon = True
            t.start()
            t.join(int(get_timeout(self.id()) / 5))
            self.assertTrue(t.is_alive())

        if prev_nccl_async_error_handling is not None:
            os.environ[
                "TORCH_NCCL_ASYNC_ERROR_HANDLING"
            ] = prev_nccl_async_error_handling

    def _test_nccl_errors_blocking(self, func):
        store = c10d.FileStore(self.file_name, self.world_size)
        process_group = c10d.ProcessGroupNCCL(
            store,
            self.rank,
            self.world_size,
            timeout=timedelta(seconds=10),
        )
        process_group.allreduce(torch.rand(10).cuda(self.rank))
        if self.rank == 0:
            work = process_group.allreduce(torch.rand(10).cuda(self.rank))
            with self.assertRaisesRegex(dist.DistBackendError, ""):
                # It seems the error message would be different depending on
                # whether the test is run on a CI machine or a devGPU. Skipping
                # the error message check to make both sides happy.
                work.wait(timeout=timedelta(seconds=self.op_timeout_sec))
            # Run some GPU operations to make sure cuda has not gotten stuck.
            # It was observed cuda could get stuck if NCCL communicators were
            # not properly aborted before throwing RuntimeError.
            a = torch.rand(10).cuda(self.rank)
        elif self.rank == 1:
            # Clean up structures (e.g., files for FileStore) before going down.
            del process_group
            func()

    @with_nccl_blocking_wait
    @requires_nccl()
    @requires_nccl_version((2, 4, 0), "Need NCCL 2.4+ for error checking")
    @skip_if_lt_x_gpu(3)
    @skip_if_rocm
    def test_nccl_errors_blocking_clean_exit(self):
        self._test_nccl_errors_blocking(lambda: sys.exit(0))

    @with_nccl_blocking_wait
    @requires_nccl()
    @requires_nccl_version((2, 4, 0), "Need NCCL 2.4+ for error checking")
    @skip_if_lt_x_gpu(3)
    @skip_if_rocm
    def test_nccl_errors_blocking_nonzero_exit(self):
        self._test_nccl_errors_blocking(lambda: sys.exit(1))

    @with_nccl_blocking_wait
    @requires_nccl()
    @requires_nccl_version((2, 4, 0), "Need NCCL 2.4+ for error checking")
    @skip_if_lt_x_gpu(3)
    @skip_if_rocm
    @skip_but_pass_in_sandcastle(
        "Frequently times out see https://github.com/pytorch/pytorch/issues/58920"
    )
    def test_nccl_errors_blocking_abort(self):
        self._test_nccl_errors_blocking(lambda: os.abort())

    @with_nccl_blocking_wait
    @requires_nccl()
    @requires_nccl_version((2, 4, 0), "Need NCCL 2.4+ for error checking")
    @skip_if_lt_x_gpu(3)
    @skip_if_rocm
    def test_nccl_errors_blocking_sigkill(self):
        self._test_nccl_errors_blocking(lambda: os.kill(os.getpid(), signal.SIGKILL))

    @with_nccl_blocking_wait
    @requires_nccl()
    @requires_nccl_version((2, 4, 0), "Need NCCL 2.4+ for error checking")
    @skip_if_lt_x_gpu(3)
    @skip_if_rocm
    def test_nccl_errors_blocking_sigterm(self):
        self._test_nccl_errors_blocking(lambda: os.kill(os.getpid(), signal.SIGTERM))

    @with_nccl_blocking_wait
    @requires_nccl()
    @requires_nccl_version((2, 4, 0), "Need NCCL 2.4+ for error checking")
    @skip_if_lt_x_gpu(3)
    def test_nccl_blocking_wait_with_barrier(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        process_group = c10d.ProcessGroupNCCL(
            store,
            self.rank,
            self.world_size,
            timeout=timedelta(seconds=10),
        )
        process_group.barrier().wait()
        if self.rank == 0:
            with self.assertRaisesRegex(dist.DistBackendError, ""):
                # It seems the error message would be different depending on
                # whether the test is run on a CI machine or a devGPU. Skipping
                # the error message check to make both sides happy.
                process_group.barrier().wait(
                    timeout=timedelta(seconds=self.op_timeout_sec)
                )

    def _run_invalid_nccl_blocking_wait_env(self, val):
        os.environ["TORCH_NCCL_BLOCKING_WAIT"] = val
        store = c10d.FileStore(self.file_name, self.world_size)
        with self.assertRaises(RuntimeError):
            process_group = c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

    @requires_nccl()
    @skip_if_lt_x_gpu(3)
    def test_invalid_nccl_blocking_wait_env(self):
        self._run_invalid_nccl_blocking_wait_env("abc")
        self._run_invalid_nccl_blocking_wait_env("-1")
        self._run_invalid_nccl_blocking_wait_env("2147483647")
        self._run_invalid_nccl_blocking_wait_env("4294967295")

    @with_nccl_blocking_wait
    @requires_nccl()
    @requires_gloo()
    @skip_if_lt_x_gpu(3)
    def test_nccl_timeout(self):
        store = c10d.FileStore(self.file_name, self.world_size)

        # Initialize process_group.
        process_group = c10d.ProcessGroupNCCL(
            store, self.rank, self.world_size, timeout=timedelta(seconds=10)
        )
        # Control gloo pg used as a go-ahead signal/barrier
        # to coordinate between ranks.
        pg_gloo = c10d.ProcessGroupGloo(store, self.rank, self.world_size)
        failed_collective_timeout = timedelta(milliseconds=100)
        process_group.allreduce(torch.rand(10).cuda(self.rank)).wait(
            timeout=timedelta(seconds=5)
        )

        if self.rank == 0:
            # This allreduce should time out.
            # Watchdog may abort timed out work, resulting in a NCCL error instead of operation timed out.
            with self.assertRaisesRegex(
                dist.DistBackendError, self.blocking_wait_error_msg
            ):
                process_group.allreduce(torch.rand(10).cuda(self.rank)).wait(
                    timeout=failed_collective_timeout
                )
            # Now do a barrier to tell other rank to go ahead.
            pg_gloo.barrier().wait()
        else:
            # Wait on rank 0 to fail.
            try:
                pg_gloo.barrier().wait()
            except Exception as e:
                raise ValueError(
                    f"Rank {self.rank} barrier timed out waiting for rank 0 with error: {str(e)}"
                ) from e


class CommTest(test_c10d_common.AbstractCommTest, MultiProcessTestCase):
    @property
    def device(self):
        return f"cuda:{self.rank}"

    def setUp(self):
        super().setUp()
        # TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
        # that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
        self._spawn_processes()

    def tearDown(self):
        super().tearDown()
        try:
            os.remove(self.file_name)
        except OSError:
            pass

    def _test_broadcast_coalesced(self, process_group, device, root_rank):
        half = torch.float16

        # No support for float16 for CPU tensors
        if device == torch.device("cpu"):
            half = torch.float32

        target = torch.arange(60, dtype=half, device=device).chunk(5)
        target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)
        target += torch.arange(60, dtype=half, device=device).chunk(5)
        target += torch.arange(60, dtype=torch.float64, device=device).chunk(5)
        target += torch.arange(60, dtype=half, device=device).chunk(5)
        target += torch.arange(60, dtype=torch.float32, device=device).chunk(5)

        # The tensors to pass to broadcast are identical to the target
        # only on the process that is the root of the broadcast.
        if self.rank == root_rank:
            tensors = [tensor.clone() for tensor in target]
        else:
            tensors = [torch.zeros_like(tensor) for tensor in target]

        if self.rank != root_rank:
            self.assertNotEqual(tensors, target)

        c10d._broadcast_coalesced(
            process_group, tensors, buffer_size=256, src=root_rank
        )

        if self.rank != root_rank:
            self.assertEqual(tensors, target)
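
    # For comparison, the coalesced broadcast above is semantically the same as
    # broadcasting each tensor on its own; _broadcast_coalesced (a private
    # helper) just packs tensors into flat buffers of at most buffer_size bytes
    # so that fewer collectives are launched. An uncoalesced sketch over the
    # default group (not collected as a test):
    @staticmethod
    def _example_uncoalesced_broadcast(tensors, root_rank):
        for t in tensors:
            c10d.broadcast(t, src=root_rank)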

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_broadcast_coalesced_nccl(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        c10d.init_process_group(
            backend="nccl", store=store, rank=self.rank, world_size=self.world_size
        )
        process_group = c10d.distributed_c10d._get_default_group()
        device = torch.device("cuda:%d" % self.rank)
        ranks = [0, 1]
        for root_rank in ranks:
            self._test_broadcast_coalesced(process_group, device, root_rank)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_all_reduce_coalesced_nccl(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        c10d.init_process_group(
            backend="nccl", store=store, rank=self.rank, world_size=self.world_size
        )
        process_group = c10d.distributed_c10d._get_default_group()
        device = torch.device("cuda:%d" % self.rank)
        tensors = [
            torch.full((60 + i,), self.rank + 1 + i, device=device, dtype=torch.float)
            for i in range(5)
        ]
        torch.distributed.all_reduce_coalesced(tensors, group=process_group)
        for i, t in enumerate(tensors):
            self.assertEqual(
                t,
                torch.full_like(
                    t, self.world_size * (i + (self.world_size + 1.0) / 2.0)
                ),
            )

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_all_reduce_coalesced_nccl_float8_errors(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        c10d.init_process_group(
            backend="nccl", store=store, rank=self.rank, world_size=self.world_size
        )
        process_group = c10d.distributed_c10d._get_default_group()
        device = torch.device("cuda:%d" % self.rank)
        tensors = [
            torch.full(
                (60 + i,), self.rank + 1 + i, device=device, dtype=torch.float
            ).to(torch.float8_e4m3fn)
            for i in range(5)
        ]
        with self.assertRaisesRegex(
            RuntimeError,
            # sic: "currenlty" matches the misspelling in the error message
            # raised at runtime, so it must stay as-is here.
            "Float8 dtypes are not currenlty supported for NCCL reductions",
        ):
            torch.distributed.all_reduce_coalesced(tensors, group=process_group)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_all_reduce_coalesced_manager_nccl(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        c10d.init_process_group(
            backend="nccl", store=store, rank=self.rank, world_size=self.world_size
        )
        process_group = c10d.distributed_c10d._get_default_group()
        device = torch.device("cuda:%d" % self.rank)
        tensors = [
            torch.full((60 + i,), self.rank + 1 + i, device=device, dtype=torch.float)
            for i in range(5)
        ]
        with torch.distributed._coalescing_manager(
            group=process_group, device=device, async_ops=True
        ) as cm:
            for tensor in tensors:
                torch.distributed.all_reduce(tensor)
        self.assertEqual(len(cm.works), 1)
        cm.wait()
        for i, t in enumerate(tensors):
            self.assertEqual(
                t,
                torch.full_like(
                    t, self.world_size * (i + (self.world_size + 1.0) / 2.0)
                ),
            )
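
    # The pattern above generalizes: collectives issued inside
    # _coalescing_manager (a private API) are batched into a single fused work
    # object, which is why len(cm.works) == 1; with async_ops=True nothing
    # blocks until cm.wait(). A condensed sketch:
    #
    #   with dist._coalescing_manager(group=pg, device=device, async_ops=True) as cm:
    #       for t in tensors:
    #           dist.all_reduce(t)
    #   cm.wait()  # block on the one fused work object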

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @skip_if_rocm
    def test_intra_node_comm_all_reduce(self):
        from torch._C._distributed_c10d import _get_intra_node_comm_usage_counter
        from torch.testing._internal.common_cuda import SM80OrLater

        for peer in range(self.world_size):
            if peer == self.rank:
                continue
            if not torch._C._cuda_canDeviceAccessPeer(self.rank, peer):
                raise SkipTest("Test requires p2p access")

        if not SM80OrLater:
            raise SkipTest("Test requires sm>=80")

        store = c10d.FileStore(self.file_name, self.world_size)
        os.environ["ENABLE_INTRA_NODE_COMM"] = "1"
        os.environ["TEST_INTRA_NODE_COMM"] = "1"
        torch.cuda.set_device(self.rank)
        c10d.init_process_group(
            backend="nccl", rank=self.rank, world_size=self.world_size, store=store
        )
        expect = self.world_size * (self.world_size - 1) // 2

        # IntraNodeComm currently only supports sum and bf16.
        # Verify that it is not used in the next two configurations.
        t = torch.full((4 * 1024 // 2,), self.rank).cuda()
        c10d.all_reduce(t, c10d.ReduceOp.SUM)
        self.assertTrue(t.eq(expect).all())
        self.assertEqual(_get_intra_node_comm_usage_counter(), 0)

        t = torch.full((4 * 1024 // 2,), self.rank, dtype=torch.bfloat16).cuda()
        c10d.all_reduce(t, c10d.ReduceOp.AVG)
        self.assertEqual(_get_intra_node_comm_usage_counter(), 0)

        # Verify that IntraNodeComm is used up to 10MB
        t = torch.full((4 * 1024 // 2,), self.rank, dtype=torch.bfloat16).cuda()
        c10d.all_reduce(t, c10d.ReduceOp.SUM)
        self.assertTrue(t.eq(expect).all())
        self.assertEqual(_get_intra_node_comm_usage_counter(), 1)

        t = torch.full((512 * 1024 // 2,), self.rank, dtype=torch.bfloat16).cuda()
        c10d.all_reduce(t, c10d.ReduceOp.SUM)
        self.assertTrue(t.eq(expect).all())
        self.assertEqual(_get_intra_node_comm_usage_counter(), 2)

        t = torch.full((10 * 1024**2 // 2,), self.rank, dtype=torch.bfloat16).cuda()
        c10d.all_reduce(t, c10d.ReduceOp.SUM)
        self.assertTrue(t.eq(expect).all())
        self.assertEqual(_get_intra_node_comm_usage_counter(), 3)

        # Verify that IntraNodeComm is not used beyond 10MB
        t = torch.full(
            (10 * 1024**2 // 2 + 1,), self.rank, dtype=torch.bfloat16
        ).cuda()
        c10d.all_reduce(t, c10d.ReduceOp.SUM)
        self.assertTrue(t.eq(expect).all())
        self.assertEqual(_get_intra_node_comm_usage_counter(), 3)

        c10d.destroy_process_group()
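
    # Worked example of the sizes and expectation above: `expect` is the sum
    # of all ranks, world_size * (world_size - 1) // 2 (for world_size == 2
    # that is 0 + 1 = 1). The element counts are divided by 2 because each
    # bf16 element is 2 bytes, so 10 * 1024**2 // 2 elements is exactly the
    # 10MB IntraNodeComm message-size cutoff exercised by the last two checks.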

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_sequence_num_set_default_pg_nccl(self):
        torch.cuda.set_device(self.rank)
        self._test_sequence_num_set_default_pg(backend="nccl")

    @skip_if_lt_x_gpu(2)
    @requires_nccl()
    def test_sequence_num_incremented_nccl_default(self):
        self._test_sequence_num_incremented_default_group("nccl")

    @skip_if_lt_x_gpu(4)
    @requires_nccl()
    def test_sequence_num_incremented_nccl_subgroup(self):
        if self.world_size < 4:
            return skip_but_pass_in_sandcastle("Test requires world_size of at least 4")
        self._test_sequence_num_incremented_subgroup("nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_sequence_num_set_nccl_new_group(self):
        torch.cuda.set_device(self.rank)
        self._test_sequence_num_set_new_group(backend="nccl")

    def _test_pass_nccl_options(self, pg_opts):
        store = c10d.FileStore(self.file_name, self.world_size)
        # Test init_process_group accepts options
        dist.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
            pg_options=pg_opts,
        )

        # Test with new_group
        pg = c10d.new_group([0, 1], pg_options=pg_opts)
        # test the process group works as expected
        t = torch.tensor([self.rank + 1] * 10).cuda(self.rank)
        pg.allreduce(t).wait()
        expected_tensor = torch.tensor([3] * 10).cuda(self.rank)
        self.assertEqual(expected_tensor, t)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_pass_nccl_options_high_priority_stream(self):
        pg_opts = c10d.ProcessGroupNCCL.Options()
        pg_opts.is_high_priority_stream = True
        self._test_pass_nccl_options(pg_opts)

    @requires_nccl()
    @requires_nccl_version(
        (2, 18), "Need NCCL 2.18+ for configuring NCCL communicators"
    )
    @skip_if_lt_x_gpu(2)
    def test_pass_nccl_options_config(self):
        pg_opts = c10d.ProcessGroupNCCL.Options()
        pg_opts.config.max_ctas = 4
        pg_opts.config.min_ctas = 2
        pg_opts.config.cga_cluster_size = 2
        pg_opts.config.net_name = "Socket"
        pg_opts.config.split_share = 1
        nccl_debug_file = tempfile.NamedTemporaryFile()
        os.environ["NCCL_DEBUG"] = "INFO"
        os.environ["NCCL_DEBUG_FILE"] = nccl_debug_file.name

        # Tests functionality when passing nccl config
        self._test_pass_nccl_options(pg_opts)

        # Tests if comms were configured
        nccl_debug_file_content = nccl_debug_file.read()
        max_ctas = re.search(rb"Max CTAs.*(\d+)|$", nccl_debug_file_content).group(1)
        min_ctas = re.search(rb"Min CTAs.*(\d+)|$", nccl_debug_file_content).group(1)
        split_share = re.search(
            rb"Split share.*(\d+)|$", nccl_debug_file_content
        ).group(1)
        cga_cluster_size = re.search(
            rb"CGA cluster.*(\d+)|$", nccl_debug_file_content
        ).group(1)
        net_name = re.search(
            rb"Using network.([a-zA-Z]+)|$", nccl_debug_file_content
        ).group(1)
        self.assertEqual(pg_opts.config.max_ctas, int(max_ctas))
        self.assertEqual(pg_opts.config.min_ctas, int(min_ctas))
        self.assertEqual(pg_opts.config.cga_cluster_size, int(cga_cluster_size))
        self.assertEqual(pg_opts.config.net_name, net_name.decode())
        self.assertEqual(pg_opts.config.split_share, int(split_share))

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_nccl_barrier(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        c10d.init_process_group(
            backend="nccl", rank=self.rank, world_size=self.world_size, store=store
        )

        t = torch.tensor([self.rank + 1] * 10).cuda(2 * self.rank)
        c10d.all_reduce(t)
        expected_tensor = torch.tensor([3] * 10).cuda(2 * self.rank)
        self.assertEqual(expected_tensor, t)

        # Test with new_group
        pg = c10d.new_group([0, 1])
        t = torch.tensor([self.rank + 1] * 10).cuda(2 * self.rank)
        pg.allreduce(t).wait()
        self.assertEqual(expected_tensor, t)

        pg = c10d.new_group([0])
        if self.rank == 0:
            t = torch.tensor([self.rank + 1] * 10).cuda(2 * self.rank)
            expected_tensor = torch.tensor([self.rank + 1] * 10).cuda(2 * self.rank)
            pg.allreduce(t).wait()
            self.assertEqual(expected_tensor, t)

        pg = c10d.new_group([1])
        if self.rank == 1:
            t = torch.tensor([self.rank + 1] * 10).cuda(2 * self.rank)
            expected_tensor = torch.tensor([self.rank + 1] * 10).cuda(2 * self.rank)
            pg.allreduce(t).wait()
            self.assertEqual(expected_tensor, t)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nccl_barrier_device_ids(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        c10d.init_process_group(
            backend="nccl", rank=self.rank, world_size=self.world_size, store=store
        )

        c10d.barrier(device_ids=[self.rank])

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nccl_barrier_device_ids_function_argument(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        c10d.init_process_group(
            backend="nccl", rank=self.rank, world_size=self.world_size, store=store
        )

        with self.assertRaisesRegex(TypeError, "Invalid function argument"):
            c10d.barrier(device_ids=self.rank)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @with_dist_debug_levels(levels=["DETAIL"])
    def test_nccl_warn_not_in_group_debug_detail(self):
        self._test_warn_not_in_group(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @with_dist_debug_levels(levels=["INFO"])
    def test_nccl_warn_not_in_group_debug_info(self):
        self._test_warn_not_in_group(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @with_dist_debug_levels(levels=["OFF"])
    def test_nccl_warn_not_in_group_debug_off(self):
        self._test_warn_not_in_group(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_nccl_rank_membership(self):
        self._test_rank_membership(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_tensor_dtype_mismatch(self):
        self._test_tensor_dtype_mismatch(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_tensor_dtype_complex(self):
        self._test_tensor_dtype_complex(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_reduce_scatter_base_k(self):
        store = dist.FileStore(self.file_name, self.world_size)
        dist.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
        )
        output_tensor = torch.zeros(2, dtype=torch.int64).to(self.rank)
        input_tensors = torch.arange(self.world_size * 2, dtype=torch.int64).to(
            self.rank
        )
        input_tensors = torch.reshape(input_tensors, (self.world_size, 2))
        dist.reduce_scatter_tensor(output_tensor, input_tensors)
        self.assertEqual(output_tensor, input_tensors[self.rank] * self.world_size)
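
    # Worked example of the assertion above for world_size == 2: every rank
    # holds input [[0, 1], [2, 3]]; the elementwise sum across ranks is
    # [[0, 2], [4, 6]], and each rank keeps its own row, so rank 0 gets
    # [0, 2] == [0, 1] * 2 and rank 1 gets [4, 6] == [2, 3] * 2.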

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_reduce_scatter_tensor_coalesced(self):
        store = dist.FileStore(self.file_name, self.world_size)
        dist.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
        )
        output_tensors = torch.zeros(2, 2).to(self.rank)
        input_tensors = [torch.ones(2, 2).to(self.rank) for _ in range(self.world_size)]
        with dist._coalescing_manager():
            for i in range(self.world_size):
                dist.reduce_scatter_tensor(output_tensors[i], input_tensors[i])
        self.assertEqual(output_tensors, input_tensors[self.rank] * self.world_size)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_reduce_scatter_base_k_float8_errors(self):
        store = dist.FileStore(self.file_name, self.world_size)
        dist.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
        )
        output_tensor = (
            torch.zeros(2, dtype=torch.float32).to(torch.float8_e4m3fn).to(self.rank)
        )
        input_tensors = (
            torch.arange(self.world_size * 2, dtype=torch.float32)
            .to(torch.float8_e4m3fn)
            .to(self.rank)
        )
        input_tensors = torch.reshape(input_tensors, (self.world_size, 2))
        with self.assertRaisesRegex(
            RuntimeError,
            "Float8 dtypes are not currenlty supported for NCCL reductions",
        ):
            dist.reduce_scatter_tensor(output_tensor, input_tensors)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_reduce_scatter_tensor_coalesced_float8_errors(self):
        store = dist.FileStore(self.file_name, self.world_size)
        dist.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
        )
        output_tensors = torch.zeros(2, 2).to(torch.float8_e5m2).to(self.rank)
        input_tensors = [
            torch.ones(2, 2).to(torch.float8_e5m2).to(self.rank)
            for _ in range(self.world_size)
        ]

        with self.assertRaisesRegex(
            RuntimeError,
            "Float8 dtypes are not currenlty supported for NCCL reductions",
        ):
            with dist._coalescing_manager():
                for i in range(self.world_size):
                    dist.reduce_scatter_tensor(output_tensors[i], input_tensors[i])
            self.assertEqual(output_tensors, input_tensors[self.rank])


class SetDeviceMethod(Enum):
    TORCH_CUDA_SET = auto()  # torch.cuda.set_device
    COLLECTIVE_ARGUMENT = auto()  # broadcast_object_list(device=)


class NcclProcessGroupWithDispatchedCollectivesTests(
    test_c10d_common.ProcessGroupWithDispatchedCollectivesTests
):
    @requires_nccl()
    @skip_if_lt_x_gpu(1)
    def test_collectives(self):
        self._test_collectives(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(1)
    def test_allreduce_coalesced(self):
        self._test_allreduce_coalesced(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(1)
    def test_all_to_all_single(self):
        self._test_all_to_all_single(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(1)
    def test_allgather_base(self):
        store = dist.FileStore(self.file_name, self.world_size)
        dist.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
        )
        device = "cuda"
        tensor = torch.ones(10, 10, device=torch.device(device))
        output_tensor = torch.zeros(10, 10, device=torch.device(device))
        dist.all_gather_into_tensor(output_tensor, tensor)
        self.assertEqual(output_tensor, tensor)

    @requires_nccl()
    @skip_if_lt_x_gpu(1)
    @parametrize("float8_dtype", [torch.float8_e4m3fn, torch.float8_e5m2])
    def test_allgather_float8(self, float8_dtype):
        store = dist.FileStore(self.file_name, self.world_size)
        dist.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
        )
        device = "cuda"
        tensor = torch.ones(10, 16, device=torch.device(device)).to(float8_dtype)
        output_tensor = torch.zeros(10, 16, device=torch.device(device)).to(
            float8_dtype
        )
        dist.all_gather_into_tensor(output_tensor, tensor)
        self.assertEqual(output_tensor.view(torch.float32), tensor.view(torch.float32))


instantiate_parametrized_tests(NcclProcessGroupWithDispatchedCollectivesTests)


class LargeCommTest(test_c10d_common.AbstractLargeCommTest, MultiProcessTestCase):
    def setUp(self):
        super().setUp()
        # TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING hence tests
        # that use TORCH_NCCL_BLOCKING_WAIT will test it as expected.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
        self._spawn_processes()

    def tearDown(self):
        super().tearDown()
        try:
            os.remove(self.file_name)
        except OSError:
            pass

    @property
    def device(self):
        return self.rank

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_new_group_local_sync(self):
        self._test_new_group_local_sync(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_new_group_local_sync_sanity_check(self):
        self._test_new_group_local_sync_sanity_check(backend="nccl")

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_new_group_local_sync_duplicated_pg(self):
        self._test_new_group_local_sync_duplicate_pg(backend="nccl")

    def _init_two_pg2_subgroups(self, world_size: int = 4):
        if world_size != 4:
            raise NotImplementedError(
                f"need world size of 4 to get 2 subgroup PGs, but got world size of {world_size}"
            )
        store = c10d.FileStore(self.file_name, world_size)
        c10d.init_process_group(
            backend="nccl", store=store, rank=self.rank, world_size=world_size
        )
        # every rank creates the same sub groups,
        # including sub groups the current rank does not belong to
        a_group = c10d.new_group([0, 1])
        b_group = c10d.new_group([2, 3])
        return a_group if self.rank < 2 else b_group
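
    # Note on the pattern above: c10d.new_group is collective over the whole
    # default group, so every rank must call it for every subgroup in the same
    # order, even for subgroups it is not a member of. A condensed sketch of
    # the rule:
    #
    #   # on ALL ranks, regardless of membership:
    #   a_group = c10d.new_group([0, 1])
    #   b_group = c10d.new_group([2, 3])
    #   my_group = a_group if rank < 2 else b_group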

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_gather_subgroup(self):
        world_size = 4
        if self.rank >= world_size:
            # just easier to write the test for exactly 4 gpus, even if this
            # test class is later increased to 8 gpus
            return

        subgroup = self._init_two_pg2_subgroups(world_size)
        device = torch.device("cuda:%d" % self.rank)
        input = torch.ones((10,), device=device) * self.rank
        if self.rank == 0 or self.rank == 2:
            gather_list = [torch.empty_like(input) for _ in range(subgroup.size())]
            torch.distributed.gather(
                input,
                gather_list=gather_list,
                dst=self.rank,
                group=subgroup,
                async_op=False,
            )
            for src in range(len(gather_list)):
                expected = (torch.ones_like(input) * self.rank) + src
                self.assertEqual(gather_list[src], expected)
        else:
            torch.distributed.gather(
                input,
                gather_list=None,
                dst=self.rank - 1,
                group=subgroup,
                async_op=False,
            )

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_gather_object_subgroup(self):
        world_size = 4
        if self.rank >= world_size:
            # just easier to write the test for exactly 4 gpus, even if this
            # test class is later increased to 8 gpus
            return

        subgroup = self._init_two_pg2_subgroups(world_size)

        # discrepancy #1
        # have to set device or else gather_object gets the wrong device from
        # 'current_device = _get_pg_default_device(group)'
        torch.cuda.set_device(self.rank)

        input = {"rank": self.rank}
        if self.rank == 0 or self.rank == 2:
            # discrepancy #2
            # another weird thing: what's the point of making me specify some
            # empty objects in my list? an empty list should arguably be valid
            # too (but it throws an error).
            gather_list = [{}, {}]
            torch.distributed.gather_object(
                input, object_gather_list=gather_list, dst=self.rank, group=subgroup
            )
            for src in range(len(gather_list)):
                self.assertEqual(gather_list[src]["rank"], self.rank + src)
        else:
            torch.distributed.gather_object(
                input, object_gather_list=None, dst=self.rank - 1, group=subgroup
            )
    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_reduce_subgroup(self):
        world_size = 4
        if self.rank >= world_size:
            return
        subgroup = self._init_two_pg2_subgroups(world_size)
        device = torch.device("cuda:%d" % self.rank)
        x = torch.ones((10,), device=device) * self.rank
        if self.rank == 0 or self.rank == 2:
            expected = x + torch.ones((10,), device=device) * (self.rank + 1)
            c10d.reduce(x, dst=self.rank, group=subgroup, async_op=False)
            self.assertEqual(x, expected)
        else:
            c10d.reduce(x, dst=self.rank - 1, group=subgroup, async_op=False)

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    @parametrize("async_op", [True, False])
    def test_send_recv_subgroup(self, async_op):
        world_size = 4
        if self.rank >= world_size:
            return
        subgroup = self._init_two_pg2_subgroups(world_size)
        device = torch.device("cuda:%d" % self.rank)
        if self.rank == 0 or self.rank == 2:
            x = torch.empty((10,), device=device)
            if async_op:
                c10d.irecv(x, src=self.rank + 1, group=subgroup).wait()
            else:
                c10d.recv(x, src=self.rank + 1, group=subgroup)
            expected = torch.ones((10,), device=device) * (self.rank + 1)
            self.assertEqual(x, expected)
        else:
            x = torch.ones((10,), device=device) * self.rank
            if async_op:
                c10d.isend(x, dst=self.rank - 1, group=subgroup).wait()
            else:
                c10d.send(x, dst=self.rank - 1, group=subgroup)

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_broadcast_subgroup(self):
        world_size = 4
        if self.rank >= world_size:
            return
        subgroup = self._init_two_pg2_subgroups(world_size)
        device = torch.device("cuda:%d" % self.rank)
        if self.rank == 0 or self.rank == 2:
            x = torch.empty((10,), device=device)
            c10d.broadcast(x, src=self.rank + 1, group=subgroup)
            expected = torch.ones((10,), device=device) * (self.rank + 1)
            self.assertEqual(x, expected)
        else:
            x = torch.ones((10,), device=device) * self.rank
            c10d.broadcast(x, src=self.rank, group=subgroup)

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    @parametrize(
        "set_device",
        [SetDeviceMethod.TORCH_CUDA_SET, SetDeviceMethod.COLLECTIVE_ARGUMENT],
    )
    def test_send_recv_object_list_subgroup(self, set_device: SetDeviceMethod):
        world_size = 4
        if self.rank >= world_size:
            return
        subgroup = self._init_two_pg2_subgroups(world_size)
        if set_device == SetDeviceMethod.TORCH_CUDA_SET:
            torch.cuda.set_device(self.rank)
            device = None
        else:
            device = torch.device("cuda:%d" % self.rank)
        if self.rank == 0 or self.rank == 2:
            x = [{}]
            c10d.recv_object_list(x, src=self.rank + 1, group=subgroup, device=device)
            expected = [{"rank": self.rank + 1}]
            self.assertEqual(x, expected)
        else:
            x = [{"rank": self.rank}]
            c10d.send_object_list(x, dst=self.rank - 1, group=subgroup, device=device)

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    @parametrize(
        "set_device",
        [SetDeviceMethod.TORCH_CUDA_SET, SetDeviceMethod.COLLECTIVE_ARGUMENT],
    )
    def test_broadcast_object_list_subgroup(self, set_device: SetDeviceMethod):
        world_size = 4
        if self.rank >= world_size:
            return
        subgroup = self._init_two_pg2_subgroups(world_size)
        if set_device == SetDeviceMethod.TORCH_CUDA_SET:
            torch.cuda.set_device(self.rank)
            device = None
        else:
            device = torch.device("cuda:%d" % self.rank)
        if self.rank == 0 or self.rank == 2:
            x = [{}]
            c10d.broadcast_object_list(
                x, src=self.rank + 1, group=subgroup, device=device
            )
            expected = [{"rank": self.rank + 1}]
            self.assertEqual(x, expected)
        else:
            x = [{"rank": self.rank}]
            c10d.broadcast_object_list(x, src=self.rank, group=subgroup, device=device)

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_scatter_subgroup(self):
        world_size = 4
        if self.rank >= world_size:
            return
        subgroup = self._init_two_pg2_subgroups(world_size)
        device = torch.device("cuda:%d" % self.rank)
        x = torch.empty((10,), device=device)
        expected = torch.ones((10,), device=device) * self.rank
        if self.rank == 0 or self.rank == 2:
            c10d.scatter(x, scatter_list=None, src=self.rank + 1, group=subgroup)
        else:
            scatter_list = [
                torch.ones((10,), device=device) * (self.rank - 1),
                torch.ones((10,), device=device) * self.rank,
            ]
            c10d.scatter(x, scatter_list=scatter_list, src=self.rank, group=subgroup)
        self.assertEqual(x, expected)

    @requires_nccl()
    @skip_if_lt_x_gpu(4)
    def test_scatter_object_list_subgroup(self):
        world_size = 4
        if self.rank >= world_size:
            return
        subgroup = self._init_two_pg2_subgroups(world_size)
        torch.cuda.set_device(self.rank)
        scatter_object_output_list = [None]
        expected = [{"rank": self.rank}]
        if self.rank == 0 or self.rank == 2:
            c10d.scatter_object_list(
                scatter_object_output_list=scatter_object_output_list,
                scatter_object_input_list=None,
                src=self.rank + 1,
                group=subgroup,
            )
        else:
            scatter_object_input_list = [
                {"rank": self.rank - 1},
                {"rank": self.rank},
            ]
            c10d.scatter_object_list(
                scatter_object_output_list=scatter_object_output_list,
                scatter_object_input_list=scatter_object_input_list,
                src=self.rank,
                group=subgroup,
            )
        self.assertEqual(scatter_object_output_list, expected)


instantiate_parametrized_tests(LargeCommTest)


class SparseCollective(MultiProcessTestCase):
    @property
    def world_size(self):
        return 1

    def setUp(self):
        super().setUp()
        # TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING, so
        # tests that use TORCH_NCCL_BLOCKING_WAIT will exercise it as expected.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
        # self.num_gpus = torch.cuda.device_count()
        self._spawn_processes()

    def tearDown(self):
        super().tearDown()
        try:
            os.remove(self.file_name)
        except OSError:
            pass

    class ToyModel(nn.Module):
        def __init__(self, rank, vocab_size, embedding_dim):
            super().__init__()
            self.embedding = nn.Embedding(vocab_size, embedding_dim, sparse=True).to(
                rank
            )
            self.linear = nn.Linear(embedding_dim, 1).to(rank)

        def forward(self, inputs):
            embedded = self.embedding(inputs)
            # embedded shape: (batch_size, sequence_length, embedding_dim)
            flattened = torch.mean(embedded, dim=1)
            # flattened shape: (batch_size, embedding_dim)
            output = self.linear(flattened)
            # output shape: (batch_size, 1)
            return output

    @requires_nccl()
    @skip_if_lt_x_gpu(1)
    def test_ddp_set_sparse_metadata(self):
        store = dist.FileStore(self.file_name, self.world_size)
        dist.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
        )

        vocab_size = 5

        model = SparseCollective.ToyModel(
            self.rank, vocab_size=vocab_size, embedding_dim=10
        )
        ddp_model = DistributedDataParallel(model)
        inputs = torch.tensor([[1, 0, 0], [0, 0, 0], [0, 0, 0]]).to(self.rank)
        # set sparse metadata on the DDP model
        indices = torch.Tensor(list(range(vocab_size)))
        ddp_model._set_sparse_metadata({"embedding.weight": indices})
        # forward pass
        try:
            output = ddp_model(inputs)
            loss = output.sum()

            # backward pass
            loss.backward()
            # check the sparse gradient's indices against the metadata we set
            # above (the original used assertTrue here, whose second argument
            # is only a failure message, so it asserted nothing)
            self.assertEqual(
                ddp_model.module.embedding.weight.grad._indices()[0].unique(),
                indices.to(torch.long),
            )
        except RuntimeError as e:
            if "NCCL does not support all_reduce with sparse tensors" in str(e):
                pass
            else:
                # Rethrow the exception if it's a different error
                raise

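# The classes below exercise the NCCL "flight recorder".  As a rough sketch
# (assuming the private torch._C._distributed_c10d._dump_nccl_trace helper
# keeps the shape used throughout these tests), a recorded trace can be
# inspected like this:
#
#   import pickle
#   trace = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
#   for entry in trace["entries"]:
#       print(entry["profiling_name"], entry["state"])
#
# The recorder itself is configured through TORCH_NCCL_* environment
# variables, which NCCLTraceTestBase.setUp sets before spawning workers.
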
class NCCLTraceTestBase(MultiProcessTestCase):
    def setUp(self):
        super().setUp()
        os.environ[
            "TORCH_NCCL_ENABLE_TIMING"
        ] = "0"  # see 'timing_enabled' parametrized tests
        os.environ["TORCH_NCCL_TRACE_BUFFER_SIZE"] = "1000"
        os.environ["TORCH_NCCL_DUMP_ON_TIMEOUT"] = "1"
        self.tempdir = tempfile.TemporaryDirectory()
        os.environ["TORCH_NCCL_DEBUG_INFO_TEMP_FILE"] = self._trace_basename()
        os.environ["TORCH_NCCL_DEBUG_INFO_PIPE_FILE"] = self._trace_basename()
        self._spawn_processes()

    @classmethod
    def _run(
        cls,
        parent_conn,
        rank: int,
        test_name: str,
        file_name: str,
        parent_pipe,
        **kwargs,
    ) -> None:
        cls.parent = parent_conn
        super()._run(rank, test_name, file_name, parent_pipe)

    @property
    def local_device(self):
        return torch.device("cuda", self.rank_to_GPU[self.rank][0])

    def _join_processes(self, fn):
        # We need to patch sys.exit(), since skip_if uses sys.exit() and the
        # exit code from this process would otherwise not be caught.
        with mock.patch("sys.exit") as exit_mock:
            fn()
        super()._join_processes(fn)

    def _spawn_processes(self) -> None:
        proc = torch.multiprocessing.get_context("spawn").Process
        self.children_pipes = []
        parent_pipes = []
        for i in range(self.world_size):
            parent_conn, child_conn = torch.multiprocessing.Pipe()
            self.children_pipes.append(child_conn)
            parent_pipes.append(parent_conn)
        piter = iter(parent_pipes)

        def wrap(*positional, args, **kwargs):
            args = (next(piter), *args)
            return proc(*positional, args=args, **kwargs)

        self._start_processes(wrap)

    def _create_process_group_nccl(self):
        store = dist.FileStore(self.file_name, self.world_size)
        c10d.init_process_group(
            "nccl", world_size=self.world_size, rank=self.rank, store=store
        )
        pg = c10d.distributed_c10d._get_default_group()
        return pg

    def tearDown(self):
        super().tearDown()
        try:
            os.remove(self.file_name)
        except OSError:
            pass

    @property
    def world_size(self):
        return 2

    @property
    def rank_to_GPU(self):
        # return rank to GPU map
        return init_multigpu_helper(self.world_size, "nccl")

    def _trace_basename(self):
        # we pass the base name via the env; the dump util will append the rank
        return os.path.join(self.tempdir.name, "trace_")

    def _trace_name(self, rank):
        return self._trace_basename() + str(rank)

    def started_or_scheduled(self, timing_enabled):
        return "started" if timing_enabled else "scheduled"


class NCCLTraceTest(NCCLTraceTestBase):
    def _verify_trace(self, t, include_collectives, timing_enabled, is_json):
        ver = t["version"]
        self.assertEqual(ver, "2.3")
        pg_config = t["pg_config"]
        self.assertEqual(len(pg_config), 1)
        default_pg_info = pg_config["0"]
        self.assertIn("name", default_pg_info)
        self.assertIn("desc", default_pg_info)
        self.assertIn("ranks", default_pg_info)
        pg_status = t["pg_status"]
        self.assertEqual(len(pg_status), 1)
        self.assertEqual(str(pg_status["0"]["last_enqueued_collective"]), "2")
        self.assertEqual(str(pg_status["0"]["last_completed_collective"]), "2")
        self.assertEqual(
            str(pg_status["0"]["last_started_collective"]),
            "2" if timing_enabled else "-1",
        )
        global_ranks = pg_config["0"]["ranks"]
        self.assertEqual(len(json.loads(global_ranks)), self.world_size)
        if include_collectives:
            self.assertEqual(len(t["entries"]), 2)
            t = t["entries"]
            last = t[-1]
            self.assertEqual(last["process_group"], ("0", "default_pg"))
            self.assertEqual(last["state"], "completed")
            s = last["time_discovered_started_ns"]
            f = last["time_discovered_completed_ns"]
            self.assertEqual(last["record_id"], 1)
            self.assertIsNotNone(f)
            if timing_enabled:
                self.assertIsNotNone(s)
                self.assertTrue(s <= f)
            # we don't collect stack traces in JSON at the moment
            if not is_json:
                self.assertIn("test_c10d_nccl.py", str(last["frames"]))
            self.assertEqual(last["input_sizes"], ((3, 4),))
            self.assertEqual(last["input_dtypes"], ["Float"])
            self.assertEqual(last["output_sizes"], ((3, 4),))
            self.assertEqual(last["output_dtypes"], ["Float"])
            self.assertEqual(last["collective_seq_id"], 2)
            self.assertEqual(last["timeout_ms"], 600000)
            now = datetime.now()
            event_created_time = datetime.fromtimestamp(
                last["time_created_ns"] / 1000000000
            )
            before_test = now - timedelta(minutes=1)
            self.assertTrue(before_test < event_created_time < now)
            if timing_enabled:
                # very loose bounds, measured 0.036 ms on devgpu
                self.assertTrue(0 < last["duration_ms"] < 100)
            else:
                self.assertTrue("duration_ms" not in last)
        else:
            self.assertTrue("entries" not in t)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @parametrize("timing_enabled", [True, False])
    @parametrize("include_collectives", [True, False])
    def test_short_json(self, timing_enabled, include_collectives):
        if self.rank == self.MAIN_PROCESS_RANK:
            return
        pg = self._create_process_group_nccl()
        if timing_enabled:
            pg._enable_collectives_timing()
        device = self.local_device
        a = torch.full((3, 4), float(self.rank), device=device)
        for i in range(2):
            f = pg.allreduce(a)
        f.wait()
        torch.cuda.synchronize(device=device)
        # duration_ms is populated best-effort outside the dump() API, so give
        # the watchdog a moment to fill it in
        time.sleep(1)
        t = json.loads(
            torch._C._distributed_c10d._dump_nccl_trace_json(
                includeCollectives=include_collectives
            )
        )
        self._verify_trace(t, include_collectives, timing_enabled, True)
        dist.destroy_process_group()

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @parametrize("timing_enabled", [True, False])
    @parametrize("include_collectives", [True, False])
    def test_short_pickle(self, timing_enabled, include_collectives):
        if self.rank == self.MAIN_PROCESS_RANK:
            return
        pg = self._create_process_group_nccl()
        if timing_enabled:
            pg._enable_collectives_timing()
        device = self.local_device
        a = torch.full((3, 4), float(self.rank), device=device)
        for i in range(2):
            f = pg.allreduce(a)
        f.wait()
        torch.cuda.synchronize(device=device)
        # duration_ms is populated best-effort outside the dump() API, so give
        # the watchdog a moment to fill it in
        time.sleep(1)
        t = pickle.loads(
            torch._C._distributed_c10d._dump_nccl_trace(
                includeCollectives=include_collectives
            )
        )
        self._verify_trace(
            t,
            include_collectives=include_collectives,
            timing_enabled=timing_enabled,
            # pickle dumps do include stack frames, so verify them as well
            # (the original passed is_json=True here, skipping the frame check)
            is_json=False,
        )
        dist.destroy_process_group()

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_dump_pipe(self):
        def open_file_with_timeout(file_path, mode, timeout=1.0):
            start_time = time.time()
            while time.time() - start_time < timeout:
                if os.path.exists(file_path):
                    return open(file_path, mode)
                time.sleep(0.1)
            raise FileNotFoundError

        if self.rank == self.MAIN_PROCESS_RANK:
            for c in self.children_pipes:
                self.assertEqual(c.recv(), "next")

            dump_file = self._trace_name(rank=0)
            pipe_file = dump_file + ".pipe"
            with open_file_with_timeout(pipe_file, "w") as f:
                f.write("1\n")
            with open_file_with_timeout(dump_file, "rb", timeout=10.0) as f:
                self.assertTrue("all_reduce" in str(pickle.load(f)))

            for c in self.children_pipes:
                c.send("next")
            return

        pg = self._create_process_group_nccl()
        device = self.local_device
        a = torch.full((3, 4), float(self.rank), device=device)
        for i in range(2):
            f = pg.allreduce(a)
        f.wait()
        torch.cuda.synchronize(device=device)
        self.parent.send("next")
        self.parent.recv()

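    # The pipe protocol exercised above, in sketch form: the traced process
    # watches <TORCH_NCCL_DEBUG_INFO_PIPE_FILE><rank>.pipe, and writing a line
    # such as "1\n" into that pipe triggers a flight-recorder dump into
    # <TORCH_NCCL_DEBUG_INFO_TEMP_FILE><rank>, which can then be unpickled.
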
    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_long(self):
        os.environ["TORCH_NCCL_TRACE_BUFFER_SIZE"] = "10"
        if self.rank == self.MAIN_PROCESS_RANK:
            return
        pg = self._create_process_group_nccl()
        device = self.local_device
        a = torch.full((3, 4), float(self.rank), device=device)
        for i in range(2):
            # test some other primitives to make sure
            # their strings are valid
            xs = [torch.ones(3, 4, device=device)]
            pg.broadcast(xs).wait()
            pg.allreduce(xs).wait()
            pg.reduce(xs).wait()
            ys = [[torch.empty(3, 4, device=device) for _ in range(self.world_size)]]
            pg.allgather(ys, xs).wait()
            pg.reduce_scatter(xs, ys).wait()
            f = pg.allreduce(a)
        f.wait()
        torch.cuda.synchronize(device=device)
        t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
        t = t["entries"]
        self.assertEqual(len(t), 10)
        first = t[0]
        last = t[-1]
        self.assertEqual(last["profiling_name"], "nccl:all_reduce")
        self.assertEqual(last["state"], "completed")
        self.assertIn("test_c10d_nccl.py", str(last["frames"]))
        self.assertEqual(last["input_sizes"], ((3, 4),))
        self.assertEqual(last["input_dtypes"], ["Float"])
        self.assertEqual(last["output_sizes"], ((3, 4),))
        self.assertEqual(last["output_dtypes"], ["Float"])
        self.assertEqual(last["timeout_ms"], 600000)
        self.assertEqual(last["collective_seq_id"] - first["collective_seq_id"], 9)
        dist.destroy_process_group()

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    def test_trace_while_all_works_retired(self):
        os.environ["TORCH_NCCL_TRACE_BUFFER_SIZE"] = "10"
        if self.rank == self.MAIN_PROCESS_RANK:
            return
        pg = self._create_process_group_nccl()
        device = self.local_device
        # send more works than the buffer size to overwrite the previous entries
        for i in range(12):
            a = [torch.ones(3, 4, device=device)]
            pg.broadcast(a).wait()
        torch.cuda.synchronize(device=device)

        # wait for all works to be retired
        pg._wait_for_pending_works()
        t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
        t = t["entries"]
        self.assertEqual(len(t), 10)
        last = t[-1]
        self.assertEqual(last["retired"], True)
        self.assertEqual(last["state"], "completed")

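    # Note: TORCH_NCCL_TRACE_BUFFER_SIZE bounds the flight recorder's ring
    # buffer, so once more works are enqueued than it holds, the oldest entries
    # are overwritten.  The two tests above rely on this: they set the buffer
    # size to 10, issue more than 10 ops, and still see exactly 10 entries.
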
    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @parametrize("timing_enabled", [True, False])
    @parametrize("only_active", [True, False])
    def test_trace_while_active(self, timing_enabled, only_active):
        if self.rank == self.MAIN_PROCESS_RANK:
            for c in self.children_pipes:
                self.assertEqual(c.recv(), "next")
            for c in self.children_pipes:
                c.send("next")
            return

        pg = self._create_process_group_nccl()
        if timing_enabled:
            pg._enable_collectives_timing()
        device = self.local_device
        with torch.cuda.device(device):
            a = torch.full((3, 4), float(self.rank), device=device)

            pg.allreduce(a).wait()
            e = torch.cuda.Event()
            e.record()
            if self.rank != 0:
                pg.allreduce(a).wait()
            e.synchronize()
            t = pickle.loads(
                torch._C._distributed_c10d._dump_nccl_trace(onlyActive=only_active)
            )
            t = t["entries"]
            if only_active:
                if self.rank == 0:
                    self.assertEqual(len(t), 0)
                else:
                    self.assertEqual(len(t), 1)
            else:
                if self.rank == 0:
                    self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce")
                    self.assertEqual(t[-1]["collective_seq_id"], 1)
                    self.assertEqual(t[-1]["state"], "completed")
                else:
                    self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce")
                    self.assertEqual(t[-1]["collective_seq_id"], 2)
                    self.assertEqual(
                        t[-1]["state"], self.started_or_scheduled(timing_enabled)
                    )

            self.parent.send("next")
            self.assertEqual("next", self.parent.recv())
            if self.rank == 0:
                pg.allreduce(a).wait()
            torch.cuda.synchronize(device=device)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @parametrize("timing_enabled", [True, False])
    def test_trace_while_stuck(self, timing_enabled):
        if self.rank == self.MAIN_PROCESS_RANK:
            for c in self.children_pipes:
                self.assertEqual(c.recv(), "next")
            for c in self.children_pipes:
                c.send("next")
            return

        pg = self._create_process_group_nccl()
        if timing_enabled:
            pg._enable_collectives_timing()

        device = self.local_device
        with torch.cuda.device(device):
            a = torch.full((3, 4), float(self.rank), device=device)

            pg.allreduce(a).wait()
            e = torch.cuda.Event()
            e.record()

            def gather_trace():
                e.synchronize()
                # give the other thread some time to fill the cuda buffer
                time.sleep(5)
                t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
                t = t["entries"]
                self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce")
                if self.rank == 0:
                    self.assertEqual(t[-1]["collective_seq_id"], 1)
                    self.assertEqual(t[-1]["state"], "completed")
                else:
                    self.assertEqual(t[-1]["collective_seq_id"], 2)
                    self.assertEqual(
                        t[-1]["state"], self.started_or_scheduled(timing_enabled)
                    )
                    self.assertIsNone(t[-1]["time_discovered_completed_ns"])
                # this will eventually cause the missing rank 0
                # to continue, which will unblock the non-zero ranks
                self.parent.send("next")

            if self.rank != 0:
                pg.allreduce(a).wait()
                th = threading.Thread(target=gather_trace)
                th.start()
                # fill the cuda buffer; at around 1024 events
                # this will stall
                for i in range(2000):
                    a = a + a
                th.join()
            else:
                gather_trace()

            self.assertEqual("next", self.parent.recv())
            if self.rank == 0:
                pg.allreduce(a).wait()
            torch.cuda.synchronize(device=device)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @parametrize(
        "op_sizes_per_coalesce",
        [
            [(2, 3)],
            [(2, 3), (5, 5), (1,)],
        ],
    )
    @parametrize("timing_enabled", [True, False])
    def test_batched_send_recv(self, op_sizes_per_coalesce, timing_enabled):
        """
        'WorkEnqueue' was skipped for isend/irecv, leading to a segfault in
        dump_entries when update_state tried to use a destructed Work object's
        CUDA events.
        """

        if self.rank == self.MAIN_PROCESS_RANK:
            return
        pg = self._create_process_group_nccl()
        if timing_enabled:
            pg._enable_collectives_timing()

        num_coalesced_ops = 20
        ops_per_coalesce = len(op_sizes_per_coalesce)
        for i in range(num_coalesced_ops):
            ops = []
            for input_sizes in op_sizes_per_coalesce:
                tensor = torch.zeros(input_sizes).to(self.local_device)
                if self.rank == 0:
                    ops.append(dist.P2POp(dist.irecv, tensor, 1))
                elif self.rank == 1:
                    tensor *= 2
                    ops.append(dist.P2POp(dist.isend, tensor, 0))

            dist.batch_isend_irecv(ops).pop().wait()

        torch.cuda.synchronize(device=self.local_device)

        if timing_enabled:
            # wait for watchdog thread to process the queue of works
            time.sleep(1)

        t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
        self.assertEqual(len(t["entries"]), num_coalesced_ops * (ops_per_coalesce + 1))

        expected_record_id = 0
        expected_seq = 1
        expected_op_id = 1
        for seq in range(num_coalesced_ops):
            first_op = seq * (ops_per_coalesce + 1)
            coalesced_op = first_op + ops_per_coalesce
            for p2p_op_idx, input_sizes in zip(
                range(first_op, coalesced_op, 1), op_sizes_per_coalesce
            ):
                # the individual ops inside the coalescing group record the
                # individual op metadata, but not the timing info coming from
                # the actual coalesced kernel
                profiling_name = (
                    "nccl:recv 0<-1" if self.rank == 0 else "nccl:send 1->0"
                )
                self.assertEqual(
                    t["entries"][p2p_op_idx]["record_id"], expected_record_id
                )
                expected_record_id += 1
                self.assertEqual(
                    t["entries"][p2p_op_idx]["profiling_name"], profiling_name
                )
                self.assertEqual(
                    t["entries"][p2p_op_idx]["collective_seq_id"], expected_seq
                )
                self.assertEqual(t["entries"][p2p_op_idx]["op_id"], expected_op_id)
                expected_op_id += 1
                self.assertEqual(t["entries"][p2p_op_idx]["input_sizes"], [input_sizes])
                self.assertEqual(
                    t["entries"][p2p_op_idx]["output_sizes"], [input_sizes]
                )
                # duration doesn't get tagged onto individual ops yet, nor is
                # their state updated
                self.assertEqual(t["entries"][p2p_op_idx]["state"], "scheduled")
                self.assertTrue("duration_ms" not in t["entries"][p2p_op_idx])

            # the coalesced op has no metadata but indicates that coalescing was
            # used, and accurately reflects the timing and state info for the
            # whole group
            self.assertEqual(
                t["entries"][coalesced_op]["record_id"], expected_record_id
            )
            expected_record_id += 1
            self.assertEqual(
                t["entries"][coalesced_op]["profiling_name"], "nccl:coalesced"
            )
            self.assertEqual(
                t["entries"][coalesced_op]["collective_seq_id"], expected_seq
            )
            expected_seq += 1
            self.assertEqual(t["entries"][coalesced_op]["state"], "completed")
            self.assertEqual(t["entries"][coalesced_op]["input_sizes"], [])
            self.assertEqual(t["entries"][coalesced_op]["output_sizes"], [])
            if timing_enabled:
                duration = t["entries"][coalesced_op]["duration_ms"]
                self.assertTrue(0.001 < duration < 10000, duration)
            else:
                self.assertTrue("duration_ms" not in t["entries"][coalesced_op])
            self.assertEqual(t["entries"][coalesced_op]["timeout_ms"], 600000)

    @requires_nccl()
    @skip_but_pass_in_sandcastle_if(not TEST_MULTIGPU, "NCCL test requires 2+ GPUs")
    @parametrize(
        "op_sizes",
        [
            [(2, 3)],
            [(2, 3), (5, 5), (1,)],
        ],
    )
    @parametrize("timing_enabled", [True, False])
    def test_individual_send_recv(self, op_sizes, timing_enabled):
        """
        'WorkEnqueue' was skipped for isend/irecv, leading to a segfault in
        dump_entries when update_state tried to use a destructed Work object's
        CUDA events.
        """

        if self.rank == self.MAIN_PROCESS_RANK:
            return
        pg = self._create_process_group_nccl()
        if timing_enabled:
            pg._enable_collectives_timing()
        num_repeats = 10
        ops_per_repeat = len(op_sizes)
        for i in range(num_repeats):
            for input_sizes in op_sizes:
                tensor = torch.zeros(input_sizes).to(self.local_device)
                if self.rank == 0:
                    dist.recv(tensor, 1)
                elif self.rank == 1:
                    tensor *= 2
                    dist.send(tensor, 0)

        torch.cuda.synchronize(device=self.local_device)
        if timing_enabled:
            # wait for watchdog thread to process the queue of works
            time.sleep(1)

        t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())
        self.assertEqual(len(t["entries"]), num_repeats * ops_per_repeat)
        expected_seq = 1
        expected_op_id = 1
        for seq in range(num_repeats * ops_per_repeat):
            input_sizes = op_sizes[seq % ops_per_repeat]
            profiling_name = "nccl:recv 0<-1" if self.rank == 0 else "nccl:send 1->0"
            self.assertEqual(t["entries"][seq]["profiling_name"], profiling_name)
            self.assertEqual(t["entries"][seq]["p2p_seq_id"], expected_seq)
            expected_seq += 1
            self.assertEqual(t["entries"][seq]["op_id"], expected_op_id)
            expected_op_id += 1
            self.assertEqual(t["entries"][seq]["input_sizes"], [input_sizes])
            self.assertEqual(t["entries"][seq]["output_sizes"], [input_sizes])
            self.assertEqual(t["entries"][seq]["state"], "completed")

            if timing_enabled:
                duration = t["entries"][seq]["duration_ms"]
                self.assertTrue(0.001 < duration < 10000, duration)
            else:
                self.assertTrue("duration_ms" not in t["entries"][seq])

    # TODO(whc) support and test coalesced collectives that use the c++
    # start/end group mechanism instead of the python coalescing manager

    # TODO(whc) test out other ops (and combinations of ops, if that's valid?)
    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @parametrize("timing_enabled", [True, False])
    def test_coalescing_manager_collective(self, timing_enabled):
        """
        The coalescing manager api works by accumulating operations in python
        via a contextmanager, and then making one call into c++ to an
        <op>_coalesced API. It has limited op support and was added recently to
        avoid the overhead of making individual py-cpp calls; this complicates
        flight recording.

        For now, flight recording of coalescing_manager collectives is less
        detailed than that of cpp coalesced collectives.
        """
        if self.rank == self.MAIN_PROCESS_RANK:
            return
        pg = self._create_process_group_nccl()
        if timing_enabled:
            pg._enable_collectives_timing()

        output_tensors = torch.zeros(2, 2).to(self.rank)
        input_tensors = [torch.ones(2, 2).to(self.rank) for _ in range(self.world_size)]

        # TODO(whc) make this work with a bigger world size
        self.assertEqual(self.world_size, 2, self.world_size)

        with dist._coalescing_manager():
            for i in range(self.world_size):
                dist.reduce_scatter_tensor(output_tensors[i], input_tensors[i])
        self.assertEqual(output_tensors, input_tensors[self.rank] * self.world_size)

        torch.cuda.synchronize(device=self.rank)

        if timing_enabled:
            # wait for watchdog thread to process the queue of works
            time.sleep(1)

        t = pickle.loads(torch._C._distributed_c10d._dump_nccl_trace())

        self.assertEqual(
            len(t["entries"]), 1
        )  # a single entry for the whole reduce_scatter_tensor_coalesced group
        self.assertEqual(
            t["entries"][0]["profiling_name"], "nccl:reduce_scatter_tensor_coalesced"
        )
        self.assertEqual(t["entries"][0]["collective_seq_id"], 1)
        self.assertEqual(t["entries"][0]["input_sizes"], [[2, 2], [2, 2]])
        self.assertEqual(t["entries"][0]["output_sizes"], [[2], [2]])
        self.assertEqual(t["entries"][0]["state"], "completed")
        if timing_enabled:
            duration = t["entries"][0]["duration_ms"]
            self.assertTrue(0.001 < duration < 10000, duration)
        else:
            self.assertTrue("duration_ms" not in t["entries"][0])


def check_if_test_is_skipped(fn):
    def wrapper(self, *args, **kwargs):
        for skip in TEST_SKIPS.values():
            if self.processes[0].exitcode == skip.exit_code:
                return MultiProcessTestCase._check_return_codes(self, *args, **kwargs)
        return fn(self, *args, **kwargs)

    return wrapper

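# Several tests below assert an exit code of -6 for aborting ranks: that is
# termination by SIGABRT, since the NCCL heartbeat monitor aborts the process
# after dumping debug info.  A quick sanity check of the encoding:
#
#   assert -signal.SIGABRT == -6  # SIGABRT is 6 on Linux
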
class NCCLTraceTestDumpOnTimeoutBase(NCCLTraceTestBase):
    timeout_sec = 1

    def _create_process_group_nccl(self):
        store = dist.FileStore(self.file_name, self.world_size)
        c10d.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
            timeout=timedelta(seconds=NCCLTraceTestDumpOnTimeoutBase.timeout_sec),
        )
        pg = c10d.distributed_c10d._get_default_group()
        return pg

    @check_if_test_is_skipped
    def _check_return_codes(self, elapsed_time):
        # the base test infra assumes processes exit with matching return codes,
        # but we want rank0 to abort and rank1 to exit cleanly in this test
        self.assertEqual(self.processes[0].exitcode, -6)
        self.assertEqual(self.processes[1].exitcode, 0)

    def _wait_process(self, rank, timeout):
        try:
            self.processes[rank].join(timeout)
            return self.processes[rank].exitcode
        except TimeoutError:
            return None


@skip_but_pass_in_sandcastle
class NCCLTraceTestDumpOnTimeout(NCCLTraceTestDumpOnTimeoutBase):
    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    @parametrize("timing_enabled", [True, False])
    def test_timeout_dumps(self, timing_enabled):
        # dump on the heartbeat monitor thread
        os.environ["TORCH_NCCL_COORD_CHECK_MILSEC"] = "1000"
        # make rank0 crash quickly after detecting the timeout
        os.environ["TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC"] = "1"

        if self.rank == self.MAIN_PROCESS_RANK:
            # wait for rank0 to crash before looking for its output file
            # we rely on rank0 holding off its abort long enough to dump the debug info
            self.assertEqual(self._wait_process(0, timeout=90), -6)
            with open(self._trace_name(rank=0), "rb") as f:
                t = pickle.load(f)
                t = t["entries"]
                self.assertEqual(len(t), 2)
                self.assertEqual(t[0]["collective_seq_id"], 1)
                self.assertEqual(t[0]["state"], "completed")
                self.assertEqual(t[1]["collective_seq_id"], 2)
                self.assertEqual(
                    t[1]["state"], self.started_or_scheduled(timing_enabled)
                )

            self.assertFalse(os.path.exists(self._trace_name(rank=1)))

            return

        pg = self._create_process_group_nccl()
        if timing_enabled:
            # we force-disabled timing in setUp, since there is no 'disable' function
            pg._enable_collectives_timing()

        device = self.local_device
        with torch.cuda.device(device):
            a = torch.full((3, 4), float(self.rank), device=device)

            pg.allreduce(a).wait()
            if self.rank == 0:
                pg.allreduce(a).wait()

            # rank 0 will crash before it passes the sync, but rank1 will exit
            # quickly and cleanly
            torch.cuda.synchronize(device=device)


instantiate_parametrized_tests(ProcessGroupNCCLGroupTest)
instantiate_parametrized_tests(NCCLTraceTestDumpOnTimeout)
instantiate_parametrized_tests(NCCLTraceTest)

@skip_but_pass_in_sandcastle
class NCCLTraceTestTimeoutDumpOnStuckRanks(NCCLTraceTestDumpOnTimeoutBase):
    @check_if_test_is_skipped
    def _check_return_codes(self, elapsed_time):
        # the base test infra assumes processes exit with matching return codes,
        # but in this test both rank0 and rank1 are expected to abort
        self.assertEqual(self.processes[0].exitcode, -6)
        self.assertEqual(self.processes[1].exitcode, -6)

    @requires_nccl()
    @skip_if_lt_x_gpu(2)
    def test_timeout_dumps_on_stuck_ranks(self):
        # need rank0 to crash quicker after detecting timeout
        os.environ["TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC"] = "1"
        # restore this env var to its prior default in case another test changed it
        os.environ["TORCH_NCCL_COORD_CHECK_MILSEC"] = "1000"

        if self.rank == self.MAIN_PROCESS_RANK:
            # wait for both rank0 and rank1 to crash before looking for both
            # ranks' output files; we rely on rank1 sleeping long enough to
            # dump the debug info.
            self.assertEqual(self._wait_process(0, timeout=90), -6)
            self.assertEqual(self._wait_process(1, timeout=90), -6)
            self.assertTrue(os.path.exists(self._trace_name(rank=1)))
            self.assertTrue(os.path.exists(self._trace_name(rank=0)))
            with open(self._trace_name(rank=0), "rb") as f:
                t = pickle.load(f)
                t = t["entries"]
                self.assertEqual(len(t), 2)
            with open(self._trace_name(rank=1), "rb") as f:
                t = pickle.load(f)
                t = t["entries"]
                self.assertEqual(len(t), 1)
                self.assertEqual(t[0]["collective_seq_id"], 1)
                self.assertEqual(t[0]["state"], "completed")
            return

        pg = self._create_process_group_nccl()
        device = self.local_device
        with torch.cuda.device(device):
            a = torch.full((3, 4), float(self.rank), device=device)

            pg.allreduce(a).wait()
            if self.rank == 0:
                pg.allreduce(a).wait()

            # rank 0 will get stuck, time out, and then signal a timeout to all ranks.
            torch.cuda.synchronize(device=device)

            if self.rank == 1:
                # Force rank 1 to idle so that it will eventually time out as well
                # after getting the global signal to dump the debugging info.
                time.sleep(600)


@skip_but_pass_in_sandcastle
class NcclErrorDumpTest(NCCLTraceTestBase):
    def _wait_process(self, rank, timeout):
        try:
            self.processes[rank].join(timeout)
            return self.processes[rank].exitcode
        except TimeoutError:
            return None

    @check_if_test_is_skipped
    def _check_return_codes(self, elapsed_time):
        # the base test infra assumes processes exit with matching return codes,
        # but we want rank0 to abort with an exception and rank1 to exit with code 1
        self.assertEqual(self.processes[0].exitcode, -6)
        self.assertEqual(self.processes[1].exitcode, 1)

    @requires_nccl()
    @requires_nccl_version((2, 4, 0), "Need NCCL 2.4+ for error checking")
    @skip_if_lt_x_gpu(2)
    @skip_if_rocm
    def test_nccl_errors_dump(self):
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
        os.environ["TORCH_NCCL_TRACE_BUFFER_SIZE"] = "1000"
        os.environ["TORCH_NCCL_DUMP_ON_TIMEOUT"] = "1"
        # need rank0 to dump before aborting
        os.environ["TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC"] = "5"

        if self.rank == self.MAIN_PROCESS_RANK:
            # wait for both rank0 and rank1 to crash before looking for the dump
            self.assertEqual(self._wait_process(0, timeout=90), -6)
            self.assertEqual(self._wait_process(1, timeout=90), 1)
            # verify that the trace file exists for rank0
            self.assertTrue(os.path.exists(self._trace_name(rank=0)))
            return

        store = c10d.FileStore(self.file_name, self.world_size)
        process_group = c10d.ProcessGroupNCCL(
            store,
            self.rank,
            self.world_size,
            timeout=timedelta(seconds=10),
        )
        process_group.allreduce(torch.rand(10).cuda(self.rank))
        if self.rank == 0:
            work = process_group.allreduce(torch.rand(10).cuda(self.rank))
            # expect an error to be raised
            with self.assertRaisesRegex(dist.DistBackendError, ""):
                # Block the current stream on the NCCL stream
                work.wait()
                # Run some GPU operations
                a = torch.rand(10).cuda(self.rank)
        elif self.rank == 1:
            # Clean up structures (e.g., files for FileStore) before going down
            del process_group
            sys.exit(1)


# tests that need to be run with a larger world size
class ProcessGroupNCCLLargerScaleTest(MultiProcessTestCase):
    def _create_process_group_nccl(self, store, opts, device_id=None):
        # create nccl processgroup with opts
        c10d.init_process_group(
            "nccl",
            world_size=self.world_size,
            rank=self.rank,
            store=store,
            pg_options=opts,
            device_id=device_id,
        )
        pg = c10d.distributed_c10d._get_default_group()
        return pg

    def opts(self, high_priority_stream=False):
        opts = c10d.ProcessGroupNCCL.Options()
        opts.is_high_priority_stream = high_priority_stream
        return opts

    def setUp(self):
        super().setUp()
        # TORCH_NCCL_BLOCKING_WAIT overrides TORCH_NCCL_ASYNC_ERROR_HANDLING, so
        # tests that use TORCH_NCCL_BLOCKING_WAIT will exercise it as expected.
        os.environ["TORCH_NCCL_ASYNC_ERROR_HANDLING"] = "1"
        # self.num_gpus = torch.cuda.device_count()
        self._spawn_processes()

    def tearDown(self):
        super().tearDown()
        try:
            os.remove(self.file_name)
        except OSError:
            pass

    @property
    def world_size(self):
        return 8

    @property
    def rank_to_GPU(self):
        # return rank to GPU map
        return init_multigpu_helper(self.world_size, "nccl")

    @requires_nccl_version((2, 18), "Need NCCL 2.18+ for ncclCommSplit")
    @skip_if_lt_x_gpu(8)
    def test_comm_split_group_larger_scale(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        device = torch.device(f"cuda:{self.rank}")
        pg = self._create_process_group_nccl(store, self.opts(), device_id=device)
        backend = pg._get_backend(torch.device(device))

        tensor = torch.full((1,), self.rank).cuda(device)
        ng1 = c10d.split_group(pg, [[0, 1], [2, 3, 4, 5, 6, 7]])
        backend1 = ng1._get_backend(torch.device(device))

        # comm split happens eagerly since device_id is passed to init_process_group.
        self.assertEqual(backend.comm_split_count(), 1)
        # dist.broadcast takes the source rank on the global process group
        if self.rank < 2:
            dist.broadcast(tensor, 0, group=ng1)
            self.assertEqual(tensor, torch.full((1,), 0))
        else:
            dist.broadcast(tensor, 2, group=ng1)
            self.assertEqual(tensor, torch.full((1,), 2))

        # test a split with only one colored group; the other ranks perform a
        # no-color split.
        ng2 = c10d.split_group(pg, [[5, 6, 7]])
        self.assertEqual(backend.comm_split_count(), 2)

        if self.rank >= 5:
            tensor2 = torch.full((1,), self.rank).cuda(device)
            dist.broadcast(tensor2, 7, group=ng2)
            self.assertEqual(tensor2, torch.full((1,), 7))
        else:
            self.assertEqual(ng2, None)
        # a barrier and a cuda sync before destroying all pgs.
        dist.barrier(pg)
        torch.cuda.synchronize()
        dist.destroy_process_group()

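    # As the test above demonstrates, c10d.split_group performs an eager
    # ncclCommSplit: ranks named in a split list get a handle to the new
    # subgroup back, while ranks left out of every list take part in a
    # no-color split and receive None.
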
    @requires_nccl_version((2, 18), "Need NCCL 2.18+ for ncclCommSplit")
    @skip_if_lt_x_gpu(8)
    def test_comm_recursive_split_group(self):
        store = c10d.FileStore(self.file_name, self.world_size)
        device = torch.device(f"cuda:{self.rank}")
        pg = self._create_process_group_nccl(store, self.opts(), device_id=device)
        backend = pg._get_backend(torch.device(device))

        # split the default PG into 2 subgroups; each subgroup (ng1) has 4 ranks.
        tensor1 = torch.full((1,), self.rank).cuda(device)
        ng1 = c10d.split_group(pg, [[0, 1, 2, 3], [4, 5, 6, 7]])
        backend1 = ng1._get_backend(torch.device(device))
        if self.rank < 4:
            dist.broadcast(tensor1, 0, group=ng1)
            self.assertEqual(tensor1, torch.full((1,), 0))
        else:
            dist.broadcast(tensor1, 4, group=ng1)
            self.assertEqual(tensor1, torch.full((1,), 4))

        # comm split happens eagerly since device_id is passed to init_process_group.
        self.assertEqual(backend.comm_split_count(), 1)
        self.assertEqual(backend1.comm_split_count(), 0)

        # further split ng1 into 2 subgroups; each subgroup (ng2) has 2 ranks.
        tensor2 = torch.full((1,), self.rank).cuda(device)
        ng2 = c10d.split_group(ng1, [[0, 1], [2, 3]])
        backend2 = ng2._get_backend(torch.device(device))
        self.assertEqual(backend.comm_split_count(), 1)
        self.assertEqual(backend1.comm_split_count(), 1)
        self.assertEqual(backend2.comm_split_count(), 0)

        # execute collective calls within each 2-rank pg
        if self.rank == 0 or self.rank == 1:
            dist.broadcast(tensor2, 1, group=ng2)
            self.assertEqual(tensor2, torch.full((1,), 1))

        if self.rank == 2 or self.rank == 3:
            dist.broadcast(tensor2, 2, group=ng2)
            self.assertEqual(tensor2, torch.full((1,), 2))

        if self.rank == 4 or self.rank == 5:
            dist.broadcast(tensor2, 5, group=ng2)
            self.assertEqual(tensor2, torch.full((1,), 5))

        if self.rank == 6 or self.rank == 7:
            dist.broadcast(tensor2, 6, group=ng2)
            self.assertEqual(tensor2, torch.full((1,), 6))
        # a barrier and a cuda sync before destroying all pgs.
        dist.barrier(pg)
        torch.cuda.synchronize()
        dist.destroy_process_group()


if __name__ == "__main__":
    assert (
        not torch.cuda._initialized
    ), "test_distributed must not have initialized CUDA context on main process"

    run_tests()