# pytorch-lightning
# 769 lines · 30.5 KB
1# Copyright The Lightning AI team.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import sys
16import time
17import warnings
18from copy import deepcopy
19from functools import partial, wraps
20from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type, Union
21
22from deepdiff import DeepHash, Delta
23
24from lightning.app.core.queues import BaseQueue
25from lightning.app.storage.drive import Drive, _maybe_create_drive
26from lightning.app.storage.path import Path
27from lightning.app.storage.payload import Payload
28from lightning.app.utilities.app_helpers import _is_json_serializable, _LightningAppRef, is_overridden
29from lightning.app.utilities.app_status import WorkStatus
30from lightning.app.utilities.component import _is_flow_context, _sanitize_state
31from lightning.app.utilities.enum import (
32CacheCallsKeys,
33WorkFailureReasons,
34WorkStageStatus,
35WorkStopReasons,
36make_status,
37)
38from lightning.app.utilities.exceptions import LightningWorkException
39from lightning.app.utilities.introspection import _is_init_context
40from lightning.app.utilities.network import find_free_network_port
41from lightning.app.utilities.packaging.build_config import BuildConfig
42from lightning.app.utilities.packaging.cloud_compute import (
43_CLOUD_COMPUTE_STORE,
44CloudCompute,
45_CloudComputeStore,
46_maybe_create_cloud_compute,
47)
48from lightning.app.utilities.proxies import Action, LightningWorkSetAttrProxy, ProxyWorkRun, WorkRunExecutor, unwrap
49
50if TYPE_CHECKING:
51from lightning.app.frontend import Frontend
52
53
class LightningWork:
    # Protected attributes (leading "_") that are nevertheless tracked as part of the work state.
    _INTERNAL_STATE_VARS = (
        "_paths",
        "_host",
        "_port",
        "_url",
        "_restarting",
        "_internal_ip",
        "_public_ip",
    )

    # Executor type associated with this work class; defaults to ``WorkRunExecutor``.
    _run_executor_cls: Type[WorkRunExecutor] = WorkRunExecutor
    # TODO: Move to spawn for all Operating System.
    # "spawn" on macOS and Windows, "fork" everywhere else.
    _start_method = "fork" if sys.platform not in ("darwin", "win32") else "spawn"
69
def __init__(
    self,
    parallel: bool = False,
    cache_calls: bool = True,
    raise_exception: bool = True,
    host: str = "127.0.0.1",
    port: Optional[int] = None,
    local_build_config: Optional[BuildConfig] = None,
    cloud_build_config: Optional[BuildConfig] = None,
    cloud_compute: Optional[CloudCompute] = None,
    run_once: Optional[bool] = None,  # TODO: Remove run_once
    start_with_flow: bool = True,
):
    """LightningWork, or Work in short, is a building block for long-running jobs.

    The LightningApp runs its :class:`~lightning.app.core.flow.LightningFlow` component
    within an infinite loop and tracks the ``LightningWork`` status updates.

    Use LightningWork for third-party services or for launching heavy jobs such as
    downloading data, training or serving a model.

    Each LightningWork is running in its own independent process. Works are self-isolated from the rest,
    e.g. any state changes happening within the work will be reflected within the flow but not the other way around.

    Arguments:
        parallel: Whether to run in parallel mode or not. When False, the flow waits for the work to finish.
        cache_calls: Whether the ``run`` method should cache its input arguments and not run again when provided
            with the same arguments in subsequent calls.
        raise_exception: Whether to re-raise an exception in the flow when raised from within the work run method.
        host: Bind socket to this host
        port: Bind socket to this port. By default, this is None and a free port is selected the first time the
            ``port`` property is accessed (typically within your run method).
        local_build_config: The local BuildConfig isn't used until Lightning supports DockerRuntime.
        cloud_build_config: The cloud BuildConfig enables user to easily configure machine before running this work.
        cloud_compute: The CloudCompute used to select the cloud hardware for this work. Defaults to
            ``CloudCompute()`` when not provided.
        run_once: Deprecated in favor of cache_calls. This will be removed soon.
        start_with_flow: Whether the work should be started at the same time as the root flow. Only applies to works
            defined in ``__init__``.

    **Learn More About Lightning Work Inner Workings**

    .. raw:: html

        <div class="display-card-container">
            <div class="row">

    .. displayitem::
        :header: The Lightning Work inner workings.
        :description: Learn more Lightning Work.
        :col_css: col-md-4
        :button_link: ../../core_api/lightning_work/index.html
        :height: 180
        :tag: Basic

    .. raw:: html

            </div>
        </div>
        <br />

    """
    from lightning.app.runners.backends.backend import Backend

    if run_once is not None:
        # stacklevel=2 attributes the warning to the code that passed ``run_once``.
        warnings.warn(
            "The `run_once` argument to LightningWork is deprecated in favor of `cache_calls` and will be removed"
            " in the next version. Use `cache_calls` instead.",
            stacklevel=2,
        )
    # The deprecated ``run_once`` takes precedence over ``cache_calls`` when it is provided.
    self._cache_calls = run_once if run_once is not None else cache_calls
    # Names of the attributes that are part of the state. This set has to exist before any state
    # attribute is assigned below, because ``_default_setattr`` registers new names into it.
    self._state = {
        "_host",
        "_port",
        "_url",
        "_future_url",
        "_internal_ip",
        "_public_ip",
        "_restarting",
        "_cloud_compute",
        "_display_name",
    }
    self._parallel: bool = parallel
    self._host: str = host
    self._port: Optional[int] = port
    self._url: str = ""
    self._future_url: str = ""  # The cache URL is meant to defer resolving the url values.
    self._internal_ip: str = ""
    self._public_ip: str = ""
    # setattr_replacement is used by the multiprocessing runtime to send the latest changes to the main coordinator
    self._setattr_replacement: Optional[Callable[[str, Any], None]] = None
    self._name: str = ""
    self._display_name: str = ""
    # The ``self._calls`` is used to track whether the run
    # method with a given set of input arguments has already been called.
    # Example of its usage:
    # {
    #     'latest_call_hash': '167fe2e',
    #     '167fe2e': {
    #         'statuses': [
    #             {'stage': 'pending', 'timestamp': 1659433519.851271},
    #             {'stage': 'running', 'timestamp': 1659433519.956482},
    #             {'stage': 'stopped', 'timestamp': 1659433520.055768},
    #         ]
    #     },
    #     ...
    # }
    self._calls: dict = {CacheCallsKeys.LATEST_CALL_HASH: None}
    self._changes: dict = {}
    self._raise_exception = raise_exception
    self._paths: dict = {}
    self._request_queue: Optional[BaseQueue] = None
    self._response_queue: Optional[BaseQueue] = None
    self._restarting: bool = False
    self._start_with_flow = start_with_flow
    self._local_build_config = local_build_config or BuildConfig()
    self._cloud_build_config = cloud_build_config or BuildConfig()
    self._cloud_compute = cloud_compute or CloudCompute()
    # tuple instead of a list so that it cannot be modified without using the setter
    self._lightningignore: Tuple[str, ...] = ()
    self._backend: Optional[Backend] = None
    # Fail early when the subclass did not override ``run``.
    self._check_run_is_implemented()
    self._on_init_end()
189
@property
def url(self) -> str:
    """The url currently stored for this work (empty string until one is set)."""
    return self._url

@url.setter
def url(self, url: str) -> None:
    """Store ``url`` as the url of this work."""
    self._url = url
198
@property
def host(self) -> str:
    """The host this work binds to, as given to the constructor."""
    return self._host
203
@property
def port(self) -> int:
    """The port of this work.

    When no port has been set yet, a free network port is looked up, stored and returned.

    """
    if self._port is not None:
        return self._port
    self._port = find_free_network_port()
    return self._port
209
@property
def internal_ip(self) -> str:
    """The internal ip address of this LightningWork, reachable by other Work locally and in the cloud.

    The value is the empty string by default; the address only becomes available once the work runs.
    Locally, the address is 127.0.0.1 and in the cloud it will be determined by the cluster.

    """
    return self._internal_ip
219
@property
def public_ip(self) -> str:
    """The public ip address of this LightningWork, reachable from the internet.

    The value is the empty string by default; the address only becomes available once the work runs.
    Locally, this address is undefined (empty string) and in the cloud it will be determined by the cluster.

    """
    return self._public_ip
229
def _on_init_end(self) -> None:
    """Notify both build configs that this work finished its initialization.

    The cloud build config additionally receives the cloud compute of the work.

    """
    local_config = self._local_build_config
    local_config.on_work_init(self)
    cloud_config = self._cloud_build_config
    cloud_config.on_work_init(self, self._cloud_compute)
233
@staticmethod
def _is_state_attribute(name: str) -> bool:
    """Tell whether an attribute called ``name`` belongs to the state.

    Public attributes are part of the state; protected (prefixed by '_') and private (prefixed by '__')
    ones are not, unless they are listed in the `_INTERNAL_STATE_VARS` class variable.

    """
    if not name.startswith("_"):
        return True
    return name in LightningWork._INTERNAL_STATE_VARS
243
@property
def name(self) -> str:
    """The name of this LightningWork (empty string until one is assigned)."""
    return self._name
248
@property
def display_name(self) -> str:
    """The display name of the LightningWork in the cloud.

    The display name needs to be set before the run method of the work is called.

    """
    return self._display_name

@display_name.setter
def display_name(self, display_name: str) -> None:
    """Set the display name of the LightningWork in the cloud.

    Raises:
        RuntimeError: If the work has already started and ``display_name`` differs from the current value.

    """
    if self.has_started:
        # Re-assigning the same value after start is tolerated; changing it is not.
        if self._display_name != display_name:
            raise RuntimeError("The display name can be set only before the work has started.")
        return
    self._display_name = display_name
265
@property
def cache_calls(self) -> bool:
    """Whether ``run`` caches its input arguments and is skipped when called again with the same arguments."""
    return self._cache_calls
271
@property
def parallel(self) -> bool:
    """Whether the work runs in parallel mode.

    When this is False, the flow waits for the work to finish.

    """
    return self._parallel
280
@property
def local_build_config(self) -> BuildConfig:
    """The build config used locally."""
    return self._local_build_config

@local_build_config.setter
def local_build_config(self, build_config: BuildConfig) -> None:
    """Replace the local build config and notify the new config about this work."""
    self._local_build_config = build_config
    # Read the config back from the instance, then let it hook into this work.
    stored_config = self._local_build_config
    stored_config.on_work_init(self)
289
@property
def cloud_build_config(self) -> BuildConfig:
    """The cloud build config used to prepare the selected cloud hardware."""
    return self._cloud_build_config

@cloud_build_config.setter
def cloud_build_config(self, build_config: BuildConfig) -> None:
    """Replace the cloud build config and notify it about this work and its cloud compute."""
    self._cloud_build_config = build_config
    # Read the config back from the instance, then let it hook into this work.
    stored_config = self._cloud_build_config
    stored_config.on_work_init(self, cloud_compute=self._cloud_compute)
299
@property
def cloud_compute(self) -> CloudCompute:
    """Returns the cloud compute used to select the cloud hardware."""
    return self._cloud_compute

@cloud_compute.setter
def cloud_compute(self, cloud_compute: CloudCompute) -> None:
    """Sets the cloud compute used to select the cloud hardware.

    When the id of the new cloud compute differs from the current one, the name of this work is removed
    from the store entry registered under the current id before the new cloud compute is assigned.

    """
    current_id = self._cloud_compute.id
    new_id = cloud_compute.id
    if current_id != new_id:
        # A new ID: this work no longer belongs to the store entry of its previous cloud compute.
        # NOTE(review): the lookup raises KeyError when ``current_id`` was never registered in
        # ``_CLOUD_COMPUTE_STORE`` -- confirm callers always register first.
        compute_store: _CloudComputeStore = _CLOUD_COMPUTE_STORE[current_id]
        compute_store.remove(self.name)
    self._cloud_compute = cloud_compute
314
@property
def lightningignore(self) -> Tuple[str, ...]:
    """Programmatic equivalent of the ``.lightningignore`` file."""
    return self._lightningignore

@lightningignore.setter
def lightningignore(self, lightningignore: Tuple[str, ...]) -> None:
    """Replace the ignore patterns.

    Raises:
        RuntimeError: If a backend has already been attached to this work.

    """
    if self._backend is None:
        self._lightningignore = lightningignore
        return
    raise RuntimeError(
        f"Your app has been already dispatched, so modifying the `{self.name}.lightningignore` does not have an"
        " effect"
    )
328
@property
def status(self) -> WorkStatus:
    """Return the current status of the work.

    All statuses are stored in the state. When the latest call hash has no entry, a ``NOT_STARTED``
    status stamped with the current time is returned.

    """
    latest_hash = self._calls[CacheCallsKeys.LATEST_CALL_HASH]
    if latest_hash not in self._calls:
        return WorkStatus(stage=WorkStageStatus.NOT_STARTED, timestamp=time.time())

    # deltas aren't necessarily coming in the expected order.
    ordered = sorted(self._calls[latest_hash]["statuses"], key=lambda entry: entry["timestamp"])
    newest = ordered[-1]
    if newest.get("reason") == WorkFailureReasons.TIMEOUT:
        # Timeouts are reported as one aggregated status carrying a count.
        return self._aggregate_status_timeout(ordered)
    return WorkStatus(**newest)
346
@property
def statuses(self) -> List[WorkStatus]:
    """Return every status recorded for the latest call, ordered by timestamp."""
    latest_hash = self._calls[CacheCallsKeys.LATEST_CALL_HASH]
    if latest_hash not in self._calls:
        return []
    # deltas aren't necessarily coming in the expected order.
    ordered = sorted(self._calls[latest_hash]["statuses"], key=lambda entry: entry["timestamp"])
    return [WorkStatus(**entry) for entry in ordered]
357
@property
def has_started(self) -> bool:
    """Whether the current stage of the work is anything other than ``NOT_STARTED``."""
    current_stage = self.status.stage
    return current_stage != WorkStageStatus.NOT_STARTED
362
@property
def has_stopped(self) -> bool:
    """Whether the current stage of the work is ``STOPPED``."""
    current_stage = self.status.stage
    return current_stage == WorkStageStatus.STOPPED
367
@property
def has_succeeded(self) -> bool:
    """Whether the current stage of the work is ``SUCCEEDED``."""
    current_stage = self.status.stage
    return current_stage == WorkStageStatus.SUCCEEDED
372
@property
def has_failed(self) -> bool:
    """Whether the current stage of the work is ``FAILED``."""
    current_stage = self.status.stage
    return current_stage == WorkStageStatus.FAILED
377
@property
def has_timeout(self) -> bool:
    """Whether the work has failed and the reason of the failure is a timeout."""
    if not self.has_failed:
        return False
    return self.status.reason == WorkFailureReasons.TIMEOUT
382
@property
def is_running(self) -> bool:
    """Whether the current stage of the work is ``RUNNING``."""
    current_stage = self.status.stage
    return current_stage == WorkStageStatus.RUNNING
387
@property
def is_pending(self) -> bool:
    """Whether the current stage of the work is ``PENDING``."""
    current_stage = self.status.stage
    return current_stage == WorkStageStatus.PENDING
392
@property
def num_timeouts(self) -> int:
    """Return the number of timeout statuses since the latest succeeded run.

    The count is taken from the current status; it is 0 when that status is not a timeout.

    """
    latest = self.status
    return latest.count if latest.reason == WorkFailureReasons.TIMEOUT else 0
400
@property
def num_successes(self) -> int:
    """Return how many ``SUCCEEDED`` statuses are recorded across all calls whose key starts with ``run:``."""
    # FIXME: Resolve this within single process runtime.
    total = 0
    for key, call in self._calls.items():
        if not key.startswith("run:"):
            continue
        total += sum(1 for entry in call["statuses"] if entry["stage"] == WorkStageStatus.SUCCEEDED)
    return total
415
416def _get_property_if_exists(self, name: str) -> Union[property, None]:
417attr = getattr(self.__class__, name, None)
418return attr if isinstance(attr, property) else None
419
def __setattr__(self, name: str, value: Any) -> None:
    """Route attribute assignment.

    A class-level property with a setter handles the assignment itself. Everything else goes to
    ``_setattr_replacement`` when one is installed, otherwise to ``_default_setattr``.

    """
    prop = self._get_property_if_exists(name)
    if prop is None or prop.fset is None:
        handler = getattr(self, "_setattr_replacement", None) or self._default_setattr
        handler(name, value)
        return
    prop.fset(self, value)
427
def _default_setattr(self, name: str, value: Any) -> None:
    """Default assignment logic: validate ``value``, register state attributes and store the attribute.

    Raises:
        AttributeError: When a new state attribute is created outside of ``__init__``, when a ``Payload`` is
            assigned inside ``__init__``, or when a state attribute is not JSON-serializable.
        LightningWorkException: When a ``LightningWork`` or ``LightningFlow`` is assigned as a state attribute.

    """
    from lightning.app.core.flow import LightningFlow

    # Allow the run method to be patched with ProxyWorkRun (done by certain Runtime implementations).
    may_replace_run = name == "run" and (
        isinstance(value, ProxyWorkRun)
        or (unwrap(value) == unwrap(self.run))
        or (isinstance(value, partial) and value.func.__name__ == "_dynamic_run_wrapper")
    )

    proxy_assignment = isinstance(value, LightningWorkSetAttrProxy)
    in_init = _is_init_context(self)

    # Outside of ``__init__`` only already known state attributes may be assigned. The checks are kept
    # in this order on purpose: ``self._state`` is only consulted once we know we are not in ``__init__``.
    if not in_init and name not in self._state and name not in self._paths:
        if self._is_state_attribute(name) and not may_replace_run:
            raise AttributeError(f"Cannot set attributes that were not defined in __init__: {name}.")

    # Strings using the ``lit://`` scheme are turned into ``Path`` objects.
    if isinstance(value, str) and value.startswith("lit://"):
        value = Path(value)

    if not self._is_state_attribute(name):
        # Non-state attributes are stored without any further bookkeeping.
        super().__setattr__(name, value)
        return

    if isinstance(value, (LightningFlow, LightningWork)):
        raise LightningWorkException(
            "A ``LightningWork`` isn't allowed to take any children "
            f"such as ``LightningWork`` or ``LightningFlow``. Found {value}."
        )

    if isinstance(value, Path):
        value._attach_work(work=self)
        value._attach_queues(self._request_queue, self._response_queue)  # type: ignore[arg-type]
        value._name = name
        # In the init context, the full name of the Flow and Work is not known, i.e., we can't serialize
        # the path without losing the information of origin and consumer. Hence, we delay the serialization
        # of the path object until the app is instantiated.
        if not in_init:
            self._paths[name] = value.to_dict()
        self._state.add(name)

    elif isinstance(value, Payload):
        if in_init:
            raise AttributeError("The Payload object should be set only within the run method of the work.")
        value._attach_work(work=self)
        value._name = name
        self._state.add(name)

    elif isinstance(value, Drive):
        # Store a copy of the drive that is bound to this work.
        value = deepcopy(value)
        value.component_name = self.name
        self._state.add(name)

    elif may_replace_run or proxy_assignment:
        # enable overriding the run method (dispatcher)
        pass

    elif _is_json_serializable(value):
        self._state.add(name)

    else:
        raise AttributeError(
            f"Only JSON-serializable attributes are currently supported"
            f" (str, int, float, bool, tuple, list, dict etc.) to be part of {self} state. "
            f"Found the attribute {name} with {value} instead. \n"
            "HINT: Private attributes defined as follows `self._x = y` won't be shared between components "
            "and therefore don't need to be JSON-serializable. If you need to include non-JSON serializable "
            "objects in the state, you can use the `lightning.app.storage.Payload` API."
        )

    super().__setattr__(name, value)
501
def __getattribute__(self, name: str) -> Any:
    """Look up ``name`` and wrap the ``run`` method for caching when call caching is enabled.

    Raises:
        AttributeError: With a hint about ``super().__init__()`` when the failing lookup is for ``_state``;
            otherwise the original error is re-raised.

    """
    try:
        attr = object.__getattribute__(self, name)
    except AttributeError as ex:
        missing_state = str(ex).endswith("'_state'")
        if not missing_state:
            raise ex
        raise AttributeError(f"Did you forget to call super().__init__() in {self}")

    # A run method already replaced by a proxy is returned untouched.
    if isinstance(attr, ProxyWorkRun):
        return attr

    is_run_method = callable(attr) and getattr(attr, "__name__", "") == "run"
    if is_run_method and getattr(self, "_cache_calls", False):
        # disable while building the class.
        return self._wrap_run_for_caching(attr)
    return attr
517
def __getattr__(self, item: str) -> Any:
    """Fallback lookup: rebuild a ``Path`` from its serialized form stored in ``_paths``.

    Outside of the init context, a name found in ``_paths`` yields a fresh ``Path`` attached to this work
    and its queues. Any other name is delegated to ``__getattribute__``.

    """
    known_paths = self.__dict__.get("_paths", {})
    if item not in known_paths or _is_init_context(self):
        return self.__getattribute__(item)

    restored = Path.from_dict(self._paths[item])
    restored._attach_work(work=self)
    restored._attach_queues(self._request_queue, self._response_queue)  # type: ignore[arg-type]
    return restored
525
def _call_hash(self, fn: Callable, args: Any, kwargs: Any) -> str:
    """Compute a short hash identifying a call by its arguments.

    Args:
        fn: The function being called. It does not take part in the hash.
        args: The positional arguments of the call. A leading reference to this work is ignored.
        kwargs: The keyword arguments of the call.

    Returns:
        The first seven characters of the DeepHash of the arguments.

    """
    # Drop a leading ``self``. Identity is used instead of ``==`` so that the ``__eq__`` of an arbitrary
    # first argument is never invoked (array-like objects have no unambiguous truth value for ``==``).
    hash_args = args[1:] if args and args[0] is self else args
    call_obj = {"args": hash_args, "kwargs": kwargs}
    # Note: Generate a hash as 167fe2e.
    # Seven was selected after checking upon Github default SHA length
    # and to minimize hidden state size.
    return str(DeepHash(call_obj)[call_obj])[:7]
533
def _wrap_run_for_caching(self, fn: Callable) -> Callable:
    """Return a wrapper around ``fn`` that remembers the result of each distinct call in ``self._calls``.

    A call whose hash already has a stored ``"ret"`` returns that value without invoking ``fn`` again.

    """

    @wraps(fn)
    def cached_run(*args: Any, **kwargs: Any) -> Any:
        call_hash = self._call_hash(fn, args, kwargs)

        if call_hash in self._calls and "ret" in self._calls[call_hash]:
            return self._calls[call_hash]["ret"]

        # Mark the call as entered before running it.
        self._calls[call_hash] = {}
        outcome = fn(*args, **kwargs)
        self._calls[call_hash] = {"ret": outcome}
        return outcome

    return cached_run
555
@property
def changes(self) -> dict:
    """A shallow copy of the changes currently recorded on this work."""
    snapshot = self._changes.copy()
    return snapshot
559
@property
def state(self) -> dict:
    """Returns the current state of this LightningWork.

    The result holds the sanitized state variables under ``"vars"``, a shallow copy of the calls under
    ``"calls"`` and an empty ``"changes"`` dict.

    """
    tracked = {attr_name: getattr(self, attr_name) for attr_name in self._state}
    return {
        "vars": _sanitize_state(tracked),
        # this may have the challenge that ret cannot be pickled, we'll need to handle this
        "calls": self._calls.copy(),
        "changes": {},
    }
569
@property
def state_vars(self) -> dict:
    """Return only the sanitized state variables, wrapped under the ``"vars"`` key."""
    tracked = {attr_name: getattr(self, attr_name) for attr_name in self._state}
    return {"vars": _sanitize_state(tracked)}
573
@property
def state_with_changes(self) -> dict:
    """Return the state like ``state`` does, but with the recorded changes under ``"changes"``."""
    tracked = {attr_name: getattr(self, attr_name) for attr_name in self._state}
    return {
        "vars": _sanitize_state(tracked),
        # this may have the challenge that ret cannot be pickled, we'll need to handle this
        "calls": self._calls.copy(),
        "changes": self.changes,
    }
582
def set_state(self, provided_state: dict) -> None:
    """Load ``provided_state`` (with the keys ``"vars"``, ``"changes"`` and ``"calls"``) into this work."""
    for attr_name, attr_value in provided_state["vars"].items():
        # A dict may describe a drive; if it is still a dict afterwards it may describe a cloud compute.
        if isinstance(attr_value, dict):
            attr_value = _maybe_create_drive(self.name, attr_value)
        if isinstance(attr_value, dict):
            attr_value = _maybe_create_cloud_compute(attr_value)
        setattr(self, attr_name, attr_value)

    self._changes = provided_state["changes"]

    incoming_calls = provided_state["calls"]
    # Note, this is handled by the flow only.
    if _is_flow_context():
        self._cleanup_calls(incoming_calls)

    self._calls = incoming_calls
598
@staticmethod
def _cleanup_calls(calls: Dict[str, Any]) -> None:
    """Compact, in place, the status history of every call entry of ``calls``.

    For each entry other than the latest-call-hash pointer:

    * if the most recent status is ``SUCCEEDED``, only that status is kept (its timestamp truncated to int);
    * otherwise duplicated statuses are dropped while keeping the timestamp order.

    Entries without statuses are left untouched.

    Args:
        calls: The ``calls`` mapping of a work state. It is modified in place.

    """
    # 1: Collect all the call hashes. The pointer key is excluded with ``!=``: a parenthesised
    # single value such as ``(KEY)`` is not a tuple, so ``not in (KEY)`` is not a membership test.
    call_hashes = [key for key in calls if key != CacheCallsKeys.LATEST_CALL_HASH]

    for call_hash in call_hashes:
        entry = calls[call_hash]
        if "statuses" not in entry or not entry["statuses"]:
            # Nothing has been recorded for this call; an empty list has no latest status to inspect.
            continue

        # 2: Order the statuses by timestamp.
        ordered = sorted(entry["statuses"], key=lambda status: status["timestamp"])

        latest = ordered[-1]
        if latest["stage"] == WorkStageStatus.SUCCEEDED:
            # If the latest status is succeeded, then drop everything before.
            latest["timestamp"] = int(latest["timestamp"])
            entry["statuses"] = [latest]
        else:
            # TODO: Some status are being duplicated,
            # this seems related to the StateObserver.
            unique_statuses = []
            for status in ordered:
                if status not in unique_statuses:
                    unique_statuses.append(status)
            entry["statuses"] = unique_statuses
624
def start(self) -> None:
    """Starts LightningWork component via CloudCompute.

    Raises:
        Exception: If the current stage of the work is ``STOPPED``.

    """
    already_stopped = self.status.stage == WorkStageStatus.STOPPED
    if already_stopped:
        raise Exception("A work can be started only once for now.")

    # This enables to start the run method with a phony input and exit.
    start_action = Action(method="start")
    self.run(start_action)
632
def run(self, *args: Any, **kwargs: Any) -> None:
    """Hook holding the logic of the work; subclasses override it.

    Raises:
        LightningPlatformException: If resource exceeds platform quotas or other constraints.

    """
640
def on_exception(self, exception: BaseException) -> None:
    """Hook to customize how an exception raised in the run method is handled.

    By default the exception is re-raised when the work was created with ``raise_exception=True``.

    """
    if not self._raise_exception:
        return
    raise exception
645
def _aggregate_status_timeout(self, statuses: List[Dict]) -> WorkStatus:
    """Collapse the timeouts recorded after the latest succeeded status into a single status.

    The result is the last timeout status, stamped with the timestamp of the first status following the
    latest success and carrying the number of timeouts as ``count``.

    """
    # Index of the latest succeeded status, or -1 when there is none.
    last_success = -1
    for position, entry in enumerate(statuses):
        if entry["stage"] == WorkStageStatus.SUCCEEDED:
            last_success = position
    remaining = statuses[last_success + 1 :]

    timeouts = [entry for entry in remaining if entry.get("reason") == WorkFailureReasons.TIMEOUT]
    first = remaining[0]
    assert first["stage"] == WorkStageStatus.PENDING
    merged = {**timeouts[-1], "timestamp": first["timestamp"]}
    return WorkStatus(**merged, count=len(timeouts))
658
def on_exit(self) -> None:
    """Hook called when the work is exiting; override it to add your own logic.

    Note: This hook is not guaranteed to be called when running in the cloud.

    """
    return None
666
def stop(self) -> None:
    """Stops LightningWork component and shuts down hardware provisioned via CloudCompute.

    This can only be called from a ``LightningFlow``.

    Raises:
        RuntimeError: If no backend is attached to this work.

    """
    backend = self._backend
    if not backend:
        raise RuntimeError(f"Only the `LightningFlow` can request this work ({self.name!r}) to stop.")

    already_stopped = self.status.stage == WorkStageStatus.STOPPED
    if already_stopped:
        return

    # Record a pending "stopped" status on the latest call before asking the backend to stop.
    # NOTE(review): when the work was never run the latest hash is None and the lookup below
    # raises KeyError -- confirm whether stopping a not-started work should be supported.
    latest_hash = self._calls[CacheCallsKeys.LATEST_CALL_HASH]
    pending_stop = make_status(WorkStageStatus.STOPPED, reason=WorkStopReasons.PENDING)
    self._calls[latest_hash]["statuses"].append(pending_stop)

    current_app = _LightningAppRef().get_current()
    backend.stop_work(current_app, self)  # type: ignore[arg-type]
682
def delete(self) -> None:
    """Delete LightningWork component and shuts down hardware provisioned via CloudCompute.

    Locally, the work.delete() behaves as work.stop().

    Raises:
        Exception: If no backend is attached to this work.

    """
    backend = self._backend
    if not backend:
        raise Exception(
            "Can't delete the work, it looks like it isn't attached to a LightningFlow. "
            "Make sure to assign the Work to a flow instance."
        )
    current_app = _LightningAppRef().get_current()
    backend.delete_work(current_app, self)
696
def _check_run_is_implemented(self) -> None:
    """Ensure that the ``run`` method of ``LightningWork`` has been overridden.

    Raises:
        TypeError: If ``run`` is not overridden.

    """
    if is_overridden("run", instance=self, parent=LightningWork):
        return
    raise TypeError(
        f"The work `{self.__class__.__name__}` is missing the `run()` method. This is required. Implement it"
        " first and then call it in your Flow."
    )
703
def _register_cloud_compute(self) -> None:
    """Add the name of this work to the store entry of its cloud compute, creating the entry if needed."""
    compute_id = self.cloud_compute.id
    assert compute_id
    is_known = compute_id in _CLOUD_COMPUTE_STORE
    if not is_known:
        _CLOUD_COMPUTE_STORE[compute_id] = _CloudComputeStore(id=compute_id, component_names=[])
    store_entry = _CLOUD_COMPUTE_STORE[compute_id]
    store_entry.add_component_name(self.name)
710
def apply_flow_delta(self, delta: Delta) -> None:
    """Override to customize how the flow should update the work state.

    Only deltas that change existing values (possibly changing their type) are accepted.

    Args:
        delta: The delta produced by the flow, applied on top of the current state variables.

    Raises:
        Exception: If the delta contains any operation other than value or type changes.

    """
    # TODO: Add support for thread safe locking over JSON Serializable objects.
    # DeepDiff reports type changes under ``type_changes``; the former ``type_changed`` spelling is
    # still accepted so that nothing that used to pass is rejected.
    allowed_operations = ("values_changed", "type_changes", "type_changed")
    operations = delta.to_dict()
    if any(operation not in allowed_operations for operation in operations):
        raise Exception(
            "A forbidden operation to update the work from the flow was detected."
            f" Found {operations}, only `values_changed` and `type_changes` are currently allowed."
        )

    updated_vars = self.state["vars"] + delta
    for name, value in updated_vars.items():
        # Mirror ``__setattr__``, except that ``_setattr_replacement`` is deliberately bypassed.
        property_object = self._get_property_if_exists(name)
        if property_object is not None and property_object.fset is not None:
            property_object.fset(self, value)
        else:
            self._default_setattr(name, value)
727
def configure_layout(self) -> Union[None, str, "Frontend"]:
    """Configure the UI of this LightningWork.

    You can either

    1.  Return a single :class:`~lightning.app.frontend.frontend.Frontend` object to serve a user interface
        for this Work.
    2.  Return a string containing a URL to act as the user interface for this Work.
    3.  Return ``None`` to indicate that this Work doesn't currently have a user interface.

    **Example:** Serve a static directory (with at least a file index.html inside).

    .. code-block:: python

        from lightning.app.frontend import StaticWebFrontend


        class Work(LightningWork):
            def configure_layout(self):
                return StaticWebFrontend("path/to/folder/to/serve")

    **Example:** Use a URL as the user interface of the work.

    .. code-block:: python

        class Work(LightningWork):
            def configure_layout(self):
                return "https://lightning.ai"

    If you don't implement ``configure_layout``, Lightning will use ``self.url``.

    Note:
        This hook gets called at the time of app creation and then again as part of the loop. If desired, a
        returned URL can depend on the state. This is not the case if the work returns a
        :class:`~lightning.app.frontend.frontend.Frontend`. These need to be provided at the time of app creation
        in order for the runtime to start the server.

    """
770