# pytorch-lightning
#
# Fork
# 0
# 769 lines · 30.5 KB
# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
import sys
16
import time
17
import warnings
18
from copy import deepcopy
19
from functools import partial, wraps
20
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type, Union
21

22
from deepdiff import DeepHash, Delta
23

24
from lightning.app.core.queues import BaseQueue
25
from lightning.app.storage.drive import Drive, _maybe_create_drive
26
from lightning.app.storage.path import Path
27
from lightning.app.storage.payload import Payload
28
from lightning.app.utilities.app_helpers import _is_json_serializable, _LightningAppRef, is_overridden
29
from lightning.app.utilities.app_status import WorkStatus
30
from lightning.app.utilities.component import _is_flow_context, _sanitize_state
31
from lightning.app.utilities.enum import (
32
    CacheCallsKeys,
33
    WorkFailureReasons,
34
    WorkStageStatus,
35
    WorkStopReasons,
36
    make_status,
37
)
38
from lightning.app.utilities.exceptions import LightningWorkException
39
from lightning.app.utilities.introspection import _is_init_context
40
from lightning.app.utilities.network import find_free_network_port
41
from lightning.app.utilities.packaging.build_config import BuildConfig
42
from lightning.app.utilities.packaging.cloud_compute import (
43
    _CLOUD_COMPUTE_STORE,
44
    CloudCompute,
45
    _CloudComputeStore,
46
    _maybe_create_cloud_compute,
47
)
48
from lightning.app.utilities.proxies import Action, LightningWorkSetAttrProxy, ProxyWorkRun, WorkRunExecutor, unwrap
49

50
if TYPE_CHECKING:
51
    from lightning.app.frontend import Frontend
52

53

54
class LightningWork:
    _INTERNAL_STATE_VARS = (
        # Internal protected variables that are still part of the state (even though they are prefixed with "_")
        "_paths",
        "_host",
        "_port",
        "_url",
        "_restarting",
        "_internal_ip",
        "_public_ip",
    )

    _run_executor_cls: Type[WorkRunExecutor] = WorkRunExecutor
    # TODO: Move to spawn for all Operating System.
    _start_method = "spawn" if sys.platform in ("darwin", "win32") else "fork"

    def __init__(
        self,
        parallel: bool = False,
        cache_calls: bool = True,
        raise_exception: bool = True,
        host: str = "127.0.0.1",
        port: Optional[int] = None,
        local_build_config: Optional[BuildConfig] = None,
        cloud_build_config: Optional[BuildConfig] = None,
        cloud_compute: Optional[CloudCompute] = None,
        run_once: Optional[bool] = None,  # TODO: Remove run_once
        start_with_flow: bool = True,
    ):
        """LightningWork, or Work in short, is a building block for long-running jobs.

        The LightningApp runs its :class:`~lightning.app.core.flow.LightningFlow` component
        within an infinite loop and tracks the ``LightningWork`` status updates.

        Use LightningWork for third-party services or for launching heavy jobs such as
        downloading data, training or serving a model.

        Each LightningWork runs in its own independent process. Works are self-isolated from the rest,
        e.g., any state changes happening within the work will be reflected within the flow but not the other way around.

        Arguments:
            parallel: Whether to run in parallel mode or not. When False, the flow waits for the work to finish.
            cache_calls: Whether the ``run`` method should cache its input arguments and not run again when provided
                with the same arguments in subsequent calls.
            raise_exception: Whether to re-raise an exception in the flow when raised from within the work run method.
            host: Bind socket to this host.
            port: Bind socket to this port. By default, this is ``None`` and a free network port is picked the first
                time :attr:`port` is accessed.
            local_build_config: The local BuildConfig isn't used until Lightning supports DockerRuntime.
            cloud_build_config: The cloud BuildConfig enables user to easily configure machine before running this work.
            cloud_compute: The cloud hardware used to run this work. Defaults to ``CloudCompute()``.
            run_once: Deprecated in favor of cache_calls. This will be removed soon.
            start_with_flow: Whether the work should be started at the same time as the root flow. Only applies to works
                defined in ``__init__``.

        **Learn More About Lightning Work Inner Workings**

        .. raw:: html

            <div class="display-card-container">
                <div class="row">

        .. displayitem::
            :header: The Lightning Work inner workings.
            :description: Learn more about Lightning Work.
            :col_css: col-md-4
            :button_link: ../../core_api/lightning_work/index.html
            :height: 180
            :tag: Basic

        .. raw:: html

                </div>
            </div>
            <br />

        """
        from lightning.app.runners.backends.backend import Backend

        if run_once is not None:
            warnings.warn(
                "The `run_once` argument to LightningWork is deprecated in favor of `cache_calls` and will be removed"
                " in the next version. Use `cache_calls` instead."
            )
        self._cache_calls = run_once if run_once is not None else cache_calls
        # Names of the attributes serialized into ``state["vars"]``. Public attributes assigned later are added to
        # this set by ``_default_setattr``.
        self._state = {
            "_host",
            "_port",
            "_url",
            "_future_url",
            "_internal_ip",
            "_public_ip",
            "_restarting",
            "_cloud_compute",
            "_display_name",
        }
        self._parallel: bool = parallel
        self._host: str = host
        self._port: Optional[int] = port
        self._url: str = ""
        self._future_url: str = ""  # The cache URL is meant to defer resolving the url values.
        self._internal_ip: str = ""
        self._public_ip: str = ""
        # setattr_replacement is used by the multiprocessing runtime to send the latest changes to the main coordinator
        self._setattr_replacement: Optional[Callable[[str, Any], None]] = None
        self._name: str = ""
        self._display_name: str = ""
        # The ``self._calls`` is used to track whether the run
        # method with a given set of input arguments has already been called.
        # Example of its usage:
        # {
        #   'latest_call_hash': '167fe2e',
        #   '167fe2e': {
        #       'statuses': [
        #           {'stage': 'pending', 'timestamp': 1659433519.851271},
        #           {'stage': 'running', 'timestamp': 1659433519.956482},
        #           {'stage': 'stopped', 'timestamp': 1659433520.055768}]}
        #        ]
        #    },
        #    ...
        # }
        self._calls: dict = {CacheCallsKeys.LATEST_CALL_HASH: None}
        self._changes: dict = {}
        self._raise_exception = raise_exception
        self._paths: dict = {}
        self._request_queue: Optional[BaseQueue] = None
        self._response_queue: Optional[BaseQueue] = None
        self._restarting: bool = False
        self._start_with_flow = start_with_flow
        self._local_build_config = local_build_config or BuildConfig()
        self._cloud_build_config = cloud_build_config or BuildConfig()
        self._cloud_compute = cloud_compute or CloudCompute()
        # tuple instead of a list so that it cannot be modified without using the setter
        self._lightningignore: Tuple[str, ...] = ()
        self._backend: Optional[Backend] = None
        self._check_run_is_implemented()
        self._on_init_end()

    @property
    def url(self) -> str:
        """Returns the current url of the work."""
        return self._url

    @url.setter
    def url(self, url: str) -> None:
        self._url = url

    @property
    def host(self) -> str:
        """Returns the current host of the work."""
        return self._host

    @property
    def port(self) -> int:
        """Returns the port of the work.

        If no port was provided, a free network port is picked on first access and reused afterwards.
        """
        if self._port is None:
            self._port = find_free_network_port()
        return self._port

    @property
    def internal_ip(self) -> str:
        """The internal ip address of this LightningWork, reachable by other Work locally and in the cloud.

        By default, this attribute returns the empty string and the ip address will only be returned once the work runs.
        Locally, the address is 127.0.0.1 and in the cloud it will be determined by the cluster.

        """
        return self._internal_ip

    @property
    def public_ip(self) -> str:
        """The public ip address of this LightningWork, reachable from the internet.

        By default, this attribute returns the empty string and the ip address will only be returned once the work runs.
        Locally, this address is undefined (empty string) and in the cloud it will be determined by the cluster.

        """
        return self._public_ip

    def _on_init_end(self) -> None:
        """Notify both build configs that this work has finished initializing."""
        self._local_build_config.on_work_init(self)
        self._cloud_build_config.on_work_init(self, self._cloud_compute)

    @staticmethod
    def _is_state_attribute(name: str) -> bool:
        """Every public attribute is part of the state by default and all protected (prefixed by '_') or private
        (prefixed by '__') attributes are not.

        Exceptions are listed in the `_INTERNAL_STATE_VARS` class variable.

        """
        return name in LightningWork._INTERNAL_STATE_VARS or not name.startswith("_")

    @property
    def name(self) -> str:
        """Returns the name of the LightningWork."""
        return self._name

    @property
    def display_name(self) -> str:
        """Returns the display name of the LightningWork in the cloud.

        The display name needs to be set before the run method of the work is called.

        """
        return self._display_name

    @display_name.setter
    def display_name(self, display_name: str) -> None:
        """Sets the display name of the LightningWork in the cloud.

        Raises:
            RuntimeError: If the work has already started and a different display name is provided.
        """
        if not self.has_started:
            self._display_name = display_name
        elif self._display_name != display_name:
            raise RuntimeError("The display name can be set only before the work has started.")

    @property
    def cache_calls(self) -> bool:
        """Returns whether the ``run`` method should cache its input arguments and not run again when provided with the
        same arguments in subsequent calls."""
        return self._cache_calls

    @property
    def parallel(self) -> bool:
        """Whether to run in parallel mode or not.

        When parallel is False, the flow waits for the work to finish.

        """
        return self._parallel

    @property
    def local_build_config(self) -> BuildConfig:
        """Returns the local build config of this work."""
        return self._local_build_config

    @local_build_config.setter
    def local_build_config(self, build_config: BuildConfig) -> None:
        self._local_build_config = build_config
        self._local_build_config.on_work_init(self)

    @property
    def cloud_build_config(self) -> BuildConfig:
        """Returns the cloud build config used to prepare the selected cloud hardware."""
        return self._cloud_build_config

    @cloud_build_config.setter
    def cloud_build_config(self, build_config: BuildConfig) -> None:
        self._cloud_build_config = build_config
        self._cloud_build_config.on_work_init(self, cloud_compute=self._cloud_compute)

    @property
    def cloud_compute(self) -> CloudCompute:
        """Returns the cloud compute used to select the cloud hardware."""
        return self._cloud_compute

    @cloud_compute.setter
    def cloud_compute(self, cloud_compute: CloudCompute) -> None:
        """Sets the cloud compute used to select the cloud hardware.

        When the new compute has a different id, this work is removed from the store of the previous compute.
        """
        current_id = self._cloud_compute.id
        new_id = cloud_compute.id
        if current_id != new_id:
            # The previous compute is only present in the store once ``_register_cloud_compute`` has run; before
            # that there is nothing to remove (and a plain ``[]`` lookup would raise ``KeyError``).
            compute_store: Optional[_CloudComputeStore] = _CLOUD_COMPUTE_STORE.get(current_id)
            if compute_store is not None:
                compute_store.remove(self.name)
        self._cloud_compute = cloud_compute

    @property
    def lightningignore(self) -> Tuple[str, ...]:
        """Programmatic equivalent of the ``.lightningignore`` file."""
        return self._lightningignore

    @lightningignore.setter
    def lightningignore(self, lightningignore: Tuple[str, ...]) -> None:
        if self._backend is not None:
            raise RuntimeError(
                f"Your app has been already dispatched, so modifying the `{self.name}.lightningignore` does not have an"
                " effect"
            )
        self._lightningignore = lightningignore

    @property
    def status(self) -> WorkStatus:
        """Return the current status of the work.

        All statuses are stored in the state. When the latest status is a timeout, the timeouts since the last
        success are aggregated into a single status (see ``_aggregate_status_timeout``).

        """
        call_hash = self._calls[CacheCallsKeys.LATEST_CALL_HASH]
        if call_hash in self._calls:
            statuses = self._calls[call_hash]["statuses"]
            # deltas aren't necessarily coming in the expected order.
            statuses = sorted(statuses, key=lambda x: x["timestamp"])
            latest_status = statuses[-1]
            if latest_status.get("reason") == WorkFailureReasons.TIMEOUT:
                return self._aggregate_status_timeout(statuses)
            return WorkStatus(**latest_status)
        return WorkStatus(stage=WorkStageStatus.NOT_STARTED, timestamp=time.time())

    @property
    def statuses(self) -> List[WorkStatus]:
        """Return all the statuses of the latest call, sorted by timestamp."""
        call_hash = self._calls[CacheCallsKeys.LATEST_CALL_HASH]
        if call_hash in self._calls:
            statuses = self._calls[call_hash]["statuses"]
            # deltas aren't necessarily coming in the expected order.
            statuses = sorted(statuses, key=lambda x: x["timestamp"])
            return [WorkStatus(**status) for status in statuses]
        return []

    @property
    def has_started(self) -> bool:
        """Return whether the work has started."""
        return self.status.stage != WorkStageStatus.NOT_STARTED

    @property
    def has_stopped(self) -> bool:
        """Return whether the work has stopped."""
        return self.status.stage == WorkStageStatus.STOPPED

    @property
    def has_succeeded(self) -> bool:
        """Return whether the work has succeeded."""
        return self.status.stage == WorkStageStatus.SUCCEEDED

    @property
    def has_failed(self) -> bool:
        """Return whether the work has failed."""
        return self.status.stage == WorkStageStatus.FAILED

    @property
    def has_timeout(self) -> bool:
        """Return whether the work has timed out."""
        return self.has_failed and self.status.reason == WorkFailureReasons.TIMEOUT

    @property
    def is_running(self) -> bool:
        """Return whether the work is running."""
        return self.status.stage == WorkStageStatus.RUNNING

    @property
    def is_pending(self) -> bool:
        """Return whether the work is pending."""
        return self.status.stage == WorkStageStatus.PENDING

    @property
    def num_timeouts(self) -> int:
        """Return the number of timeout statuses since the latest succeeded run."""
        status = self.status
        if status.reason == WorkFailureReasons.TIMEOUT:
            return status.count
        return 0

    @property
    def num_successes(self) -> int:
        """Returns the number of successful runs, summed over every call entry whose key starts with ``run:``."""
        # FIXME: Resolve this within  single process runtime.
        return sum(
            1
            for key, call in self._calls.items()
            if key.startswith("run:")
            for status in call["statuses"]
            if status["stage"] == WorkStageStatus.SUCCEEDED
        )

    def _get_property_if_exists(self, name: str) -> Union[property, None]:
        """Return the class-level ``property`` named ``name``, or ``None`` if there is no such property."""
        attr = getattr(self.__class__, name, None)
        return attr if isinstance(attr, property) else None

    def __setattr__(self, name: str, value: Any) -> None:
        # Properties with a setter are honoured; everything else goes through the (possibly replaced) setattr hook.
        property_object = self._get_property_if_exists(name)
        if property_object is not None and property_object.fset is not None:
            property_object.fset(self, value)
        else:
            setattr_fn = getattr(self, "_setattr_replacement", None) or self._default_setattr
            setattr_fn(name, value)

    def _default_setattr(self, name: str, value: Any) -> None:
        """Validate ``value`` and register ``name`` in the work state before storing the attribute.

        Raises:
            AttributeError: If a new state attribute is introduced outside ``__init__``, if a ``Payload`` is set in
                ``__init__``, or if a state value is not JSON-serializable.
            LightningWorkException: If the value is a ``LightningFlow`` or ``LightningWork``.
        """
        from lightning.app.core.flow import LightningFlow

        # Allow the run method to be patched with ProxyWorkRun (done by certain Runtime implementations).
        allowed_to_set_run = name == "run" and (
            isinstance(value, ProxyWorkRun)
            or (unwrap(value) == unwrap(self.run))
            or (isinstance(value, partial) and value.func.__name__ == "_dynamic_run_wrapper")
        )

        is_proxy_setattr = isinstance(value, LightningWorkSetAttrProxy)
        is_init_context = _is_init_context(self)

        if (
            not is_init_context
            and name not in self._state
            and name not in self._paths
            and self._is_state_attribute(name)
            and not allowed_to_set_run
        ):
            raise AttributeError(f"Cannot set attributes that were not defined in __init__: {name}.")

        if isinstance(value, str) and value.startswith("lit://"):
            value = Path(value)

        if self._is_state_attribute(name):
            if isinstance(value, (LightningFlow, LightningWork)):
                raise LightningWorkException(
                    "A ``LightningWork`` isn't allowed to take any children "
                    f"such as ``LightningWork`` or ``LightningFlow``. Found {value}."
                )

            if isinstance(value, Path):
                value._attach_work(work=self)
                value._attach_queues(self._request_queue, self._response_queue)  # type: ignore[arg-type]
                value._name = name
                # In the init context, the full name of the Flow and Work is not known, i.e., we can't serialize
                # the path without losing the information of origin and consumer. Hence, we delay the serialization
                # of the path object until the app is instantiated.
                if not is_init_context:
                    self._paths[name] = value.to_dict()
                self._state.add(name)

            elif isinstance(value, Payload):
                if is_init_context:
                    raise AttributeError("The Payload object should be set only within the run method of the work.")
                value._attach_work(work=self)
                value._name = name
                self._state.add(name)

            elif isinstance(value, Drive):
                # Each work keeps its own copy so that ``component_name`` does not leak into other components.
                value = deepcopy(value)
                value.component_name = self.name
                self._state.add(name)

            elif allowed_to_set_run or is_proxy_setattr:
                # enable overriding the run method (dispatcher)
                pass

            elif _is_json_serializable(value):
                self._state.add(name)

            else:
                raise AttributeError(
                    f"Only JSON-serializable attributes are currently supported"
                    f" (str, int, float, bool, tuple, list, dict etc.) to be part of {self} state. "
                    f"Found the attribute {name} with {value} instead. \n"
                    "HINT: Private attributes defined as follows `self._x = y` won't be shared between components "
                    "and therefore don't need to be JSON-serializable. If you need to include non-JSON serializable "
                    "objects in the state, you can use the `lightning.app.storage.Payload` API."
                )

        super().__setattr__(name, value)

    def __getattribute__(self, name: str) -> Any:
        try:
            attr = object.__getattribute__(self, name)
        except AttributeError as ex:
            # A missing ``_state`` almost always means the subclass skipped ``LightningWork.__init__``.
            if str(ex).endswith("'_state'"):
                raise AttributeError(f"Did you forget to call super().__init__() in {self}") from ex
            raise

        if isinstance(attr, ProxyWorkRun):
            return attr

        if callable(attr) and getattr(attr, "__name__", "") == "run" and getattr(self, "_cache_calls", False):
            # disable while building the class.
            return self._wrap_run_for_caching(attr)
        return attr

    def __getattr__(self, item: str) -> Any:
        # Paths are stored serialized in ``self._paths``; rebuild a live ``Path`` attached to this work on access.
        if item in self.__dict__.get("_paths", {}) and not _is_init_context(self):
            path = Path.from_dict(self._paths[item])
            path._attach_work(work=self)
            path._attach_queues(self._request_queue, self._response_queue)  # type: ignore[arg-type]
            return path
        return self.__getattribute__(item)

    def _call_hash(self, fn: Callable, args: Any, kwargs: Any) -> str:
        """Return a short (7 characters) hash of the call arguments, ignoring a leading ``self``."""
        # Identity check: ``==`` would invoke the argument's ``__eq__``, which for array-like objects returns an
        # array whose truth value is ambiguous and raises.
        hash_args = args[1:] if args and args[0] is self else args
        call_obj = {"args": hash_args, "kwargs": kwargs}
        # Note: Generate a hash as 167fe2e.
        # Seven was selected after checking upon Github default SHA length
        # and to minimize hidden state size.
        return str(DeepHash(call_obj)[call_obj])[:7]

    def _wrap_run_for_caching(self, fn: Callable) -> Callable:
        """Wrap ``fn`` so that repeated calls with the same arguments return the cached result.

        The result is stored under ``self._calls[<call hash>]["ret"]``. The entry is reset to ``{}`` before ``fn``
        runs, so a call that raises leaves no cached result and is executed again on the next call.
        """

        @wraps(fn)
        def new_fn(*args: Any, **kwargs: Any) -> Any:
            call_hash = self._call_hash(fn, args, kwargs)

            entered = call_hash in self._calls
            returned = entered and "ret" in self._calls[call_hash]

            if returned:
                entry = self._calls[call_hash]
                return entry["ret"]

            self._calls[call_hash] = {}

            result = fn(*args, **kwargs)

            self._calls[call_hash] = {"ret": result}

            return result

        return new_fn

    @property
    def changes(self) -> dict:
        """Returns a shallow copy of the pending changes."""
        return self._changes.copy()

    @property
    def state(self) -> dict:
        """Returns the current state of this LightningWork."""
        return {
            "vars": _sanitize_state({el: getattr(self, el) for el in self._state}),
            # this may have the challenge that ret cannot be pickled, we'll need to handle this
            "calls": self._calls.copy(),
            "changes": {},
        }

    @property
    def state_vars(self) -> dict:
        """Returns only the ``vars`` part of the state."""
        return {"vars": _sanitize_state({el: getattr(self, el) for el in self._state})}

    @property
    def state_with_changes(self) -> dict:
        """Returns the current state, including the pending changes."""
        return {
            "vars": _sanitize_state({el: getattr(self, el) for el in self._state}),
            # this may have the challenge that ret cannot be pickled, we'll need to handle this
            "calls": self._calls.copy(),
            "changes": self.changes,
        }

    def set_state(self, provided_state: dict) -> None:
        """Restore this work from ``provided_state`` (a dict with ``vars``, ``changes`` and ``calls`` keys)."""
        for k, v in provided_state["vars"].items():
            # Dict values may be serialized ``Drive`` / ``CloudCompute`` objects; rebuild them when they are.
            if isinstance(v, Dict):
                v = _maybe_create_drive(self.name, v)
            if isinstance(v, Dict):
                v = _maybe_create_cloud_compute(v)
            setattr(self, k, v)

        self._changes = provided_state["changes"]

        # Note, this is handled by the flow only.
        if _is_flow_context():
            self._cleanup_calls(provided_state["calls"])

        self._calls = provided_state["calls"]

    @staticmethod
    def _cleanup_calls(calls: Dict[str, Any]) -> None:
        """Compact the status history of every call entry in ``calls``, in place.

        If the latest status of a call is ``SUCCEEDED``, only that status is kept (with an integer timestamp).
        Otherwise, exact duplicate statuses are removed while preserving timestamp order.
        """
        for call_hash, call in calls.items():
            # The ``latest_call_hash`` entry only points to a call; it is not a call entry itself.
            if call_hash == CacheCallsKeys.LATEST_CALL_HASH:
                continue
            # Entries without statuses (or with an empty list) have nothing to compact.
            if "statuses" not in call or not call["statuses"]:
                continue

            # Filter the statuses by timestamp
            statuses = sorted(call["statuses"], key=lambda x: x["timestamp"])

            # If the latest status is succeeded, then drop everything before.
            if statuses[-1]["stage"] == WorkStageStatus.SUCCEEDED:
                status = statuses[-1]
                status["timestamp"] = int(status["timestamp"])
                call["statuses"] = [status]
            else:
                # TODO: Some status are being duplicated,
                # this seems related to the StateObserver.
                # Statuses are dicts (unhashable), hence the list-based de-duplication.
                final_statuses = []
                for status in statuses:
                    if status not in final_statuses:
                        final_statuses.append(status)
                call["statuses"] = final_statuses

    def start(self) -> None:
        """Starts LightningWork component via CloudCompute.

        Raises:
            Exception: If the work has already been stopped.
        """
        if self.status.stage == WorkStageStatus.STOPPED:
            raise Exception("A work can be started only once for now.")

        # This enables to start the run method with a phony input and exit.
        self.run(Action(method="start"))

    def run(self, *args: Any, **kwargs: Any) -> None:
        """Override to add your own logic.

        Raises:
            LightningPlatformException: If resource exceeds platform quotas or other constraints.

        """

    def on_exception(self, exception: BaseException) -> None:
        """Override to customize how to handle exception in the run method.

        By default, the exception is re-raised when ``raise_exception`` was ``True``.
        """
        if self._raise_exception:
            raise exception

    def _aggregate_status_timeout(self, statuses: List[Dict]) -> WorkStatus:
        """Method used to return the first request and the total count of timeout after the latest succeeded status.

        ``statuses`` is expected to be sorted by timestamp and to end with a timeout status.
        """
        succeeded_statuses = [
            status_idx for status_idx, status in enumerate(statuses) if status["stage"] == WorkStageStatus.SUCCEEDED
        ]
        if succeeded_statuses:
            succeed_status_id = succeeded_statuses[-1] + 1
            statuses = statuses[succeed_status_id:]
        timeout_statuses = [status for status in statuses if status.get("reason") == WorkFailureReasons.TIMEOUT]
        assert statuses[0]["stage"] == WorkStageStatus.PENDING
        # Report the latest timeout, but timestamped at the first request after the last success.
        status = {**timeout_statuses[-1], "timestamp": statuses[0]["timestamp"]}
        return WorkStatus(**status, count=len(timeout_statuses))

    def on_exit(self) -> None:
        """Override this hook to add your logic when the work is exiting.

        Note: This hook is not guaranteed to be called when running in the cloud.

        """
        pass

    def stop(self) -> None:
        """Stops LightningWork component and shuts down hardware provisioned via CloudCompute.

        This can only be called from a ``LightningFlow``.

        Raises:
            RuntimeError: If the work is not attached to a backend.

        """
        if not self._backend:
            raise RuntimeError(f"Only the `LightningFlow` can request this work ({self.name!r}) to stop.")
        if self.status.stage == WorkStageStatus.STOPPED:
            return
        # NOTE(review): if ``run`` was never called, the latest call hash is still ``None`` (as set in ``__init__``)
        # and the lookup below raises ``KeyError`` — confirm whether stopping an unstarted work should be supported.
        latest_hash = self._calls[CacheCallsKeys.LATEST_CALL_HASH]
        stop_status = make_status(WorkStageStatus.STOPPED, reason=WorkStopReasons.PENDING)
        self._calls[latest_hash]["statuses"].append(stop_status)
        app = _LightningAppRef().get_current()
        self._backend.stop_work(app, self)  # type: ignore[arg-type]

    def delete(self) -> None:
        """Delete LightningWork component and shuts down hardware provisioned via CloudCompute.

        Locally, the work.delete() behaves as work.stop().

        Raises:
            Exception: If the work is not attached to a backend.

        """
        if not self._backend:
            raise Exception(
                "Can't delete the work, it looks like it isn't attached to a LightningFlow. "
                "Make sure to assign the Work to a flow instance."
            )
        app = _LightningAppRef().get_current()
        self._backend.delete_work(app, self)

    def _check_run_is_implemented(self) -> None:
        """Raise ``TypeError`` if the subclass does not override ``run``."""
        if not is_overridden("run", instance=self, parent=LightningWork):
            raise TypeError(
                f"The work `{self.__class__.__name__}` is missing the `run()` method. This is required. Implement it"
                " first and then call it in your Flow."
            )

    def _register_cloud_compute(self) -> None:
        """Record this work's name in the store entry of its cloud compute, creating the entry if needed."""
        internal_id = self.cloud_compute.id
        assert internal_id
        if internal_id not in _CLOUD_COMPUTE_STORE:
            _CLOUD_COMPUTE_STORE[internal_id] = _CloudComputeStore(id=internal_id, component_names=[])
        _CLOUD_COMPUTE_STORE[internal_id].add_component_name(self.name)

    def apply_flow_delta(self, delta: Delta) -> None:
        """Override to customize how the flow should update the work state.

        Only ``values_changed`` and ``type_changed`` operations are accepted. The delta is applied on top of
        ``self.state["vars"]`` and every resulting variable is written back, through its property setter when one
        exists and through ``_default_setattr`` otherwise.

        Raises:
            Exception: If the delta contains any other kind of operation.
        """
        # TODO: Add support for thread safe locking over JSON Serializable objects.
        delta_dict = delta.to_dict()
        if any(k not in ("values_changed", "type_changed") for k in delta_dict):
            raise Exception(
                "A forbidden operation to update the work from the flow was detected."
                f" Found {delta_dict}, only `values_changed` and `type_changed` are currently allowed."
            )

        new_vars = self.state["vars"] + delta
        for name, value in new_vars.items():
            property_object = self._get_property_if_exists(name)
            if property_object is not None and property_object.fset is not None:
                property_object.fset(self, value)
            else:
                self._default_setattr(name, value)

    def configure_layout(self) -> Union[None, str, "Frontend"]:
        """Configure the UI of this LightningWork.

        You can either

        1.  Return a single :class:`~lightning.app.frontend.frontend.Frontend` object to serve a user interface
            for this Work.
        2.  Return a string containing a URL to act as the user interface for this Work.
        3.  Return ``None`` to indicate that this Work doesn't currently have a user interface.

        **Example:** Serve a static directory (with at least a file index.html inside).

        .. code-block:: python

            from lightning.app.frontend import StaticWebFrontend


            class Work(LightningWork):
                def configure_layout(self):
                    return StaticWebFrontend("path/to/folder/to/serve")

        **Example:** Use an external URL as the user interface.

        .. code-block:: python

            class Work(LightningWork):
                def configure_layout(self):
                    return "https://lightning.ai"

        If you don't implement ``configure_layout``, Lightning will use ``self.url``.

        Note:
            This hook gets called at the time of app creation and then again as part of the loop. If desired, a
            returned URL can depend on the state. This is not the case if the work returns a
            :class:`~lightning.app.frontend.frontend.Frontend`. These need to be provided at the time of app creation
            in order for the runtime to start the server.

        """
770

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.