pytorch-lightning

Форк
0
137 строк · 5.2 Кб
1
# Copyright The Lightning AI team.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import multiprocessing
16
from typing import Any, List, Optional
17

18
import lightning.app
19
from lightning.app.core import constants
20
from lightning.app.core.queues import QueuingSystem
21
from lightning.app.runners.backends.backend import Backend, WorkManager
22
from lightning.app.utilities.enum import WorkStageStatus
23
from lightning.app.utilities.network import _check_service_url_is_ready, find_free_network_port
24
from lightning.app.utilities.port import disable_port, enable_port
25
from lightning.app.utilities.proxies import ProxyWorkRun, WorkRunner
26

27

28
class MultiProcessWorkManager(WorkManager):
    """Run a single LightningWork in a child process created with ``multiprocessing``.

    The child process executes a :class:`WorkRunner` wired to the app's queues
    for this particular work.
    """

    def __init__(self, app, work):
        self.app = app
        self.work = work
        # Both are populated by ``start()``; ``None`` means the work was never started.
        self._process = None
        self._work_runner = None

    def start(self):
        """Build the WorkRunner for ``self.work`` and launch it in a new process.

        The process is created from the multiprocessing context selected by
        ``work._start_method``.
        """
        name = self.work.name
        self._work_runner = WorkRunner(
            work=self.work,
            work_name=name,
            caller_queue=self.app.caller_queues[name],
            delta_queue=self.app.delta_queue,
            readiness_queue=self.app.readiness_queue,
            error_queue=self.app.error_queue,
            request_queue=self.app.request_queues[name],
            response_queue=self.app.response_queues[name],
            copy_request_queue=self.app.copy_request_queues[name],
            copy_response_queue=self.app.copy_response_queues[name],
            flow_to_work_delta_queue=self.app.flow_to_work_delta_queues[name],
            run_executor_cls=self.work._run_executor_cls,
        )

        context = multiprocessing.get_context(self.work._start_method)
        self._process = context.Process(target=self._work_runner)
        self._process.start()

    def kill(self):
        """Terminate the work's process; does nothing if it was never started."""
        if self._process is not None:
            self._process.terminate()

    def restart(self):
        """Launch a fresh process for a work whose previous process has exited.

        While the new process is created, ``work.run`` is temporarily swapped from
        its ``ProxyWorkRun`` wrapper back to the wrapped callable
        (``work_run.work_run``). NOTE(review): presumably this keeps the wrapper
        out of what the child receives — confirm. The wrapper is always put
        back, even if ``start()`` raises.

        Raises:
            AssertionError: if the current process is still alive.
            RuntimeError: if ``start()`` was never called.
        """
        assert not self.is_alive()
        if self._work_runner is None:
            raise RuntimeError(f"Cannot restart work {self.work.name!r}: it was never started.")

        work = self._work_runner.work
        # Un-wrap ProxyRun.
        proxy_run = work.run if isinstance(work.run, ProxyWorkRun) else None
        if proxy_run is not None:
            work.run = proxy_run.work_run
        work._restarting = True
        try:
            self.start()
        finally:
            # Restore the wrapper even when process creation fails.
            if proxy_run is not None:
                work.run = proxy_run

    def is_alive(self) -> bool:
        """Return whether the work's process is running (``False`` if never started)."""
        return self._process is not None and self._process.is_alive()

74

75
class MultiProcessingBackend(Backend):
    """Backend that runs each LightningWork in a local child process.

    Works are launched through ``MultiProcessWorkManager`` instances stored in
    ``app.processes``. The backend is constructed with the multiprocessing
    queuing system and queue id ``"0"``.
    """

    def __init__(self, entrypoint_file: str):
        super().__init__(
            entrypoint_file=entrypoint_file,
            queues=QueuingSystem.MULTIPROCESS,
            queue_id="0",
        )

    def create_work(self, app, work) -> None:
        """Launch ``work`` in a new process and register its manager in ``app.processes``.

        When ``LIGHTNING_CLOUDSPACE_HOST`` is set, the work's port is replaced with
        a free local port, its host becomes ``0.0.0.0`` and its future URL points
        at the cloudspace host. Afterwards URLs are resolved and the app layout
        is refreshed.
        """
        cloudspace_host = constants.LIGHTNING_CLOUDSPACE_HOST
        if cloudspace_host is not None:
            # NOTE: any port previously configured on the work is overwritten here.
            work._port = find_free_network_port()
            work._host = "0.0.0.0"  # noqa: S104
            work._future_url = f"https://{work.port}-{cloudspace_host}"

        manager = MultiProcessWorkManager(app, work)
        app.processes[work.name] = manager
        manager.start()

        self.resolve_url(app)
        app._update_layout()

    def update_work_statuses(self, works) -> None:
        """Intentionally a no-op for this backend."""

    def stop_all_works(self, works: List["lightning.app.LightningWork"]) -> None:
        """Intentionally a no-op for this backend."""

    def resolve_url(self, app, base_url: Optional[str] = None) -> None:
        """Assign ``work._url`` for every started work whose service responds.

        A work is considered when its stage is RUNNING or SUCCEEDED, it has no URL
        yet and it has a port. Its ``_future_url`` is preferred; otherwise
        ``http://<host>:<port>`` is used. ``base_url`` is accepted for interface
        compatibility and is not used.
        """
        started_stages = (WorkStageStatus.RUNNING, WorkStageStatus.SUCCEEDED)
        for work in app.works:
            if work.status.stage not in started_stages:
                continue
            if work._url != "" or not work._port:
                continue
            candidate = work._future_url or f"http://{work._host}:{work._port}"
            if _check_service_url_is_ready(candidate, metadata=f"Checking {work.name}"):
                work._url = candidate

    def stop_work(self, app, work: "lightning.app.LightningWork") -> None:
        """Terminate the process backing ``work``.

        Raises ``KeyError`` if ``work`` has no entry in ``app.processes``.
        """
        app.processes[work.name].kill()

    def delete_work(self, app, work: "lightning.app.LightningWork") -> None:
        """Delete ``work`` by delegating to ``self.stop_work``."""
        self.stop_work(app, work)

115

116
class CloudMultiProcessingBackend(MultiProcessingBackend):
    """Multiprocessing backend that exposes each work through a port opened with ``enable_port``."""

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
        # Ports opened by ``create_work`` and not yet disabled by ``stop_work``;
        # tracked so they can be closed on termination.
        self.ports = []

    def create_work(self, app, work) -> None:
        """Open a port for ``work``, point the work at it, then launch it via the parent backend."""
        work._host = "0.0.0.0"  # noqa: S104
        opened = enable_port()
        self.ports.append(opened.port)
        work._port = opened.port
        work._future_url = f"https://{opened.host}"
        return super().create_work(app, work)

    def stop_work(self, app, work: "lightning.app.LightningWork") -> None:
        """Disable the work's port, stop tracking it, then terminate the work's process."""
        released = work._port
        disable_port(released)
        self.ports = [tracked for tracked in self.ports if tracked != released]
        return super().stop_work(app, work)

    def delete_work(self, app, work: "lightning.app.LightningWork") -> None:
        """Delete ``work`` by delegating to ``self.stop_work``."""
        self.stop_work(app, work)

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.