# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import subprocess
from typing import Any, Callable, List, Optional

from lightning_utilities.core.imports import RequirementCache
from typing_extensions import override

import lightning.pytorch as pl
from lightning.fabric.plugins import ClusterEnvironment
from lightning.fabric.strategies.launchers.subprocess_script import (
    _basic_subprocess_cmd,
    _hydra_subprocess_cmd,
    _launch_process_observer,
)
from lightning.fabric.utilities.distributed import _set_num_threads_if_needed
from lightning.pytorch.strategies.launchers.launcher import _Launcher
from lightning.pytorch.trainer.connectors.signal_connector import _SIGNUM

log = logging.getLogger(__name__)
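# RequirementCache defers the "is hydra-core installed?" check until its first boolean evaluation.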
_HYDRA_AVAILABLE = RequirementCache("hydra-core")


class _SubprocessScriptLauncher(_Launcher):
    r"""A process launcher that invokes the current script as many times as desired in a single node.

    This launcher needs to be invoked on each node.
    In its default behavior, the main process in each node then spawns N-1 child processes via :func:`subprocess.Popen`,
    where N is the number of devices (e.g., GPUs) per node. It is very similar to how :mod:`torch.distributed.run`
    launches processes.

    For example, if the script gets invoked with the command

    .. code-block:: bash

        python train.py --devices 4

    the launcher will create three additional subprocesses that get called like so:

    .. code-block:: bash

        LOCAL_RANK=1 python train.py --devices 4
        LOCAL_RANK=2 python train.py --devices 4
        LOCAL_RANK=3 python train.py --devices 4

    It is implied that the main process which launched the others has ``LOCAL_RANK=0``.
    Besides the local rank, the following environment variables also get set; unlike the local rank, these
    are determined by the cluster environment:

    1. `MASTER_ADDR`: The IP address of the main node.
    2. `MASTER_PORT`: The port number of the main node through which all processes communicate.
    3. `NODE_RANK`: The index of the node the current process is running on. Ranges from 0 to ``num_nodes - 1``.
    4. `WORLD_SIZE`: The total number of processes across all nodes, i.e., ``num_processes * num_nodes``.

    Arguments:
        cluster_environment: A cluster environment that provides access to world size, node rank, etc.
        num_processes: The number of processes to launch in the current node.
        num_nodes: The total number of nodes that participate in this process group.

    """

    def __init__(self, cluster_environment: ClusterEnvironment, num_processes: int, num_nodes: int) -> None:
        super().__init__()
        self.cluster_environment = cluster_environment
        self.num_processes = num_processes
        self.num_nodes = num_nodes
        self.procs: List[subprocess.Popen] = []  # launched child subprocesses, does not include the launcher

    @property
    @override
    def is_interactive_compatible(self) -> bool:
        return False

    @override
    def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any:
        """Creates new processes, then calls the given function.

        Arguments:
            function: A callback function to execute after all processes have been created.
                It is up to the implementation of this function to synchronize the processes, e.g., with barriers.
            *args: Optional positional arguments to be passed to the given function.
            trainer: Optional reference to the :class:`~lightning.pytorch.trainer.trainer.Trainer`.
            **kwargs: Optional keyword arguments to be passed to the given function.

        """
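        # Fail fast if the requested process/node counts are inconsistent with the cluster environment.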
        self.cluster_environment.validate_settings(num_devices=self.num_processes, num_nodes=self.num_nodes)
        if not self.cluster_environment.creates_processes_externally:
            self._call_children_scripts()
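            # Observe the spawned subprocesses from a separate thread so the whole group can be
            # torn down if any child terminates unexpectedly.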
            _launch_process_observer(self.procs)

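        # Cap intra-op thread counts so several processes on one node do not oversubscribe the CPU.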
        _set_num_threads_if_needed(num_processes=self.num_processes)
        return function(*args, **kwargs)

    @override
    def kill(self, signum: _SIGNUM) -> None:
        for proc in self.procs:
            log.info(f"pid {os.getpid()} killing {proc.pid} with {signum}")
            # sending a signal to an already-terminated subprocess is a no-op
            proc.send_signal(signum)

    def _call_children_scripts(self) -> None:
        # bookkeeping of spawned processes
        self._check_can_spawn_children()
        self.procs = []  # reset in case it's called twice

        # DDP environment variables
        os.environ["MASTER_ADDR"] = self.cluster_environment.main_address
        os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port)

        # allow the user to pass the node rank
        os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank())
        os.environ["LOCAL_RANK"] = str(self.cluster_environment.local_rank())
        os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}"
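        # Spawn local ranks 1..N-1; the current process keeps LOCAL_RANK=0.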
        for local_rank in range(1, self.num_processes):
            env_copy = os.environ.copy()
            env_copy["LOCAL_RANK"] = f"{local_rank}"

            # remove env var if global seed not set
            if os.environ.get("PL_GLOBAL_SEED") is None and "PL_GLOBAL_SEED" in env_copy:
                del env_copy["PL_GLOBAL_SEED"]

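            # Hydra changes the working directory at runtime, so a Hydra run needs a specialized
            # relaunch command that accounts for the changed cwd.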
            hydra_in_use = False
            cwd: Optional[str] = None
            if _HYDRA_AVAILABLE:
                from hydra.core.hydra_config import HydraConfig

                hydra_in_use = HydraConfig.initialized()

            if hydra_in_use:
                command, cwd = _hydra_subprocess_cmd(local_rank)
            else:
                command = _basic_subprocess_cmd()

            new_process = subprocess.Popen(command, env=env_copy, cwd=cwd)
            self.procs.append(new_process)

    def _check_can_spawn_children(self) -> None:
        if len(self.procs) > 0:
            raise RuntimeError("The launcher can only create subprocesses once.")
        if self.cluster_environment.local_rank() != 0:
            raise RuntimeError(
                "Lightning attempted to launch new distributed processes with `local_rank > 0`. This should not happen."
                " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user,"
                " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented."
            )
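
# Minimal usage sketch (illustrative only; `train_fn` stands in for a user-defined callable):
#
#     from lightning.fabric.plugins.environments import LightningEnvironment
#
#     launcher = _SubprocessScriptLauncher(LightningEnvironment(), num_processes=4, num_nodes=1)
#     launcher.launch(train_fn)  # spawns LOCAL_RANK=1..3 as subprocesses, then runs train_fn here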
