pytorch-lightning

Форк
0
861 строка · 32.3 Кб
1
# Copyright The Lightning AI team.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import inspect
16
import warnings
17
from copy import deepcopy
18
from datetime import datetime
19
from types import FrameType
20
from typing import TYPE_CHECKING, Any, Dict, Generator, Iterable, List, Optional, Tuple, Union, cast
21

22
from deepdiff import DeepHash
23

24
from lightning.app.core.work import LightningWork
25
from lightning.app.frontend import Frontend
26
from lightning.app.storage.drive import Drive, _maybe_create_drive
27
from lightning.app.storage.path import Path
28
from lightning.app.utilities.app_helpers import _is_json_serializable, _LightningAppRef, _set_child_name, is_overridden
29
from lightning.app.utilities.component import _sanitize_state
30
from lightning.app.utilities.exceptions import ExitAppException, LightningFlowException
31
from lightning.app.utilities.introspection import _is_init_context, _is_run_context
32
from lightning.app.utilities.packaging.cloud_compute import CloudCompute, _maybe_create_cloud_compute
33

34
if TYPE_CHECKING:
35
    from lightning.app.runners.backends.backend import Backend
36

37

38
class LightningFlow:
39
    _INTERNAL_STATE_VARS = {
40
        # Internal protected variables that are still part of the state (even though they are prefixed with "_")
41
        "_paths",
42
        "_layout",
43
    }
44

45
    def __init__(self) -> None:
46
        """The LightningFlow is used by the :class:`~lightning.app.core.app.LightningApp` to coordinate and manage
47
        long- running jobs contained, the :class:`~lightning.app.core.work.LightningWork`.
48

49
        A LightningFlow is characterized by:
50

51
        * A set of state variables.
52
        * Long-running jobs (:class:`~lightning.app.core.work.LightningWork`).
53
        * Its children ``LightningFlow`` or ``LightningWork`` with their state variables.
54

55
        **State variables**
56

57
        The LightningFlow are special classes whose attributes require to be
58
        json-serializable (e.g., int, float, bool, list, dict, ...).
59

60
        They also may not reach into global variables unless they are constant.
61

62
        The attributes need to be all defined in `__init__` method,
63
        and eventually assigned to different values throughout the lifetime of the object.
64
        However, defining new attributes outside of `__init__` is not allowed.
65

66
        Attributes taken together represent the state of the component.
67
        Components are capable of retrieving their state and that of their
68
        children recursively at any time. They are also capable of setting
69
        an externally provided state recursively to its children.
70

71
        **Execution model and work**
72

73
        The entry point for execution is the ``run`` method at the root component.
74
        The ``run`` method of the root component may call the ``run`` method of its children, and the children
75
        may call the ``run`` methods of their children and so on.
76

77
        The ``run`` method of the root component is called repeatedly in a while loop forever until the app gets
78
        terminated. In this programming model (reminiscent of React, Vue or Streamlit from the JavaScript world),
79
        the values of the state variables, or their changes, are translated into actions throughout the component
80
        hierarchy. This means the flow of execution will only be affected by state changes in a component or one of
81
        its children, and otherwise remain idempotent.
82

83
        The actions themselves are self-contained within :class:`~lightning.app.core.work.LightningWork`.
84
        The :class:`~lightning.app.core.work.LightningWork` are typically used for long-running jobs,
85
        like downloading a dataset, performing a query, starting a computationally heavy script.
86
        While one may access any state variable in a LightningWork from a LightningFlow, one may not
87
        directly call methods of other components from within a LightningWork as LightningWork can't have any children.
88
        This limitation allows applications to be distributed at scale.
89

90
        **Component hierarchy and App**
91

92
        Given the above characteristics, a root LightningFlow, potentially containing
93
        children components, can be passed to an App object and its execution
94
        can be distributed (each LightningWork will be run within its own process
95
        or different arrangements).
96

97
        Example:
98

99
            >>> from lightning.app import LightningFlow
100
            >>> class RootFlow(LightningFlow):
101
            ...     def __init__(self):
102
            ...         super().__init__()
103
            ...         self.counter = 0
104
            ...     def run(self):
105
            ...         self.counter += 1
106
            ...
107
            >>> flow = RootFlow()
108
            >>> flow.run()
109
            >>> assert flow.counter == 1
110
            >>> assert flow.state["vars"]["counter"] == 1
111

112
        """
113
        self._state: set = set()
114
        self._name: str = ""
115
        self._flows: set = set()
116
        self._works: set = set()
117
        self._structures: set = set()
118
        self._calls: dict = {}
119
        self._changes: dict = {}
120
        self._layout: Union[List[Dict], Dict] = {}
121
        self._paths: dict = {}
122
        self._backend: Optional["Backend"] = None
123
        # tuple instead of a list so that it cannot be modified without using the setter
124
        self._lightningignore: Tuple[str, ...] = ()
125

126
    @property
127
    def name(self) -> str:
128
        """Return the current LightningFlow name."""
129
        return self._name or "root"
130

131
    def __setattr__(self, name: str, value: Any) -> None:
132
        attr = getattr(self.__class__, name, None)
133
        if isinstance(attr, property) and attr.fset is not None:
134
            return attr.fset(self, value)
135

136
        from lightning.app.structures import Dict as ComponentDict
137
        from lightning.app.structures import List as ComponentList
138

139
        if (
140
            not _is_init_context(self)
141
            and name not in self._state
142
            and name not in self._paths
143
            and (
144
                not isinstance(value, (LightningWork, LightningFlow))
145
                or (isinstance(value, (LightningWork, LightningFlow)) and not _is_run_context(self))
146
            )
147
            and name not in self._works.union(self._flows)
148
            and self._is_state_attribute(name)
149
        ):
150
            raise AttributeError(f"Cannot set attributes that were not defined in __init__: {name}")
151

152
        if isinstance(value, str) and value.startswith("lit://"):
153
            value = Path(value)
154

155
        if self._is_state_attribute(name):
156
            if hasattr(self, name):
157
                if name in self._flows and value != getattr(self, name):
158
                    raise AttributeError(f"Cannot set attributes as the flow can't be changed once defined: {name}")
159

160
                if name in self._works and value != getattr(self, name):
161
                    raise AttributeError(f"Cannot set attributes as the work can't be changed once defined: {name}")
162

163
            if isinstance(value, (list, dict)) and value:
164
                _type = (LightningFlow, LightningWork, ComponentList, ComponentDict)
165
                if isinstance(value, list) and all(isinstance(va, _type) for va in value):
166
                    value = ComponentList(*value)
167

168
                if isinstance(value, dict) and all(isinstance(va, _type) for va in value.values()):
169
                    value = ComponentDict(**value)
170

171
            if isinstance(value, LightningFlow):
172
                self._flows.add(name)
173
                _set_child_name(self, value, name)
174
                if name in self._state:
175
                    self._state.remove(name)
176
                # Attach the backend to the flow and its children work.
177
                if self._backend:
178
                    LightningFlow._attach_backend(value, self._backend)
179
                for work in value.works():
180
                    work._register_cloud_compute()
181

182
            elif isinstance(value, LightningWork):
183
                self._works.add(name)
184
                _set_child_name(self, value, name)
185
                if name in self._state:
186
                    self._state.remove(name)
187
                if self._backend:
188
                    self._backend._wrap_run_method(_LightningAppRef().get_current(), value)  # type: ignore[arg-type]
189
                value._register_cloud_compute()
190

191
            elif isinstance(value, (ComponentDict, ComponentList)):
192
                self._structures.add(name)
193
                _set_child_name(self, value, name)
194

195
                _backend = getattr(self, "backend", None)
196
                if _backend is not None:
197
                    value._backend = _backend
198

199
                for flow in value.flows:
200
                    if _backend is not None:
201
                        LightningFlow._attach_backend(flow, _backend)
202

203
                for work in value.works:
204
                    work._register_cloud_compute()
205
                    if _backend is not None:
206
                        _backend._wrap_run_method(_LightningAppRef().get_current(), work)
207

208
            elif isinstance(value, Path):
209
                # In the init context, the full name of the Flow and Work is not known, i.e., we can't serialize
210
                # the path without losing the information of origin and consumer. Hence, we delay the serialization
211
                # of the path object until the app is instantiated.
212
                if not _is_init_context(self):
213
                    self._paths[name] = value.to_dict()
214
                self._state.add(name)
215

216
            elif isinstance(value, Drive):
217
                value = deepcopy(value)
218
                value.component_name = self.name
219
                self._state.add(name)
220

221
            elif isinstance(value, CloudCompute):
222
                self._state.add(name)
223

224
            elif _is_json_serializable(value):
225
                self._state.add(name)
226

227
                if not isinstance(value, Path) and hasattr(self, "_paths") and name in self._paths:
228
                    # The attribute changed type from Path to another
229
                    self._paths.pop(name)
230

231
            else:
232
                raise AttributeError(
233
                    f"Only JSON-serializable attributes are currently supported"
234
                    f" (str, int, float, bool, tuple, list, dict etc.) to be part of {self} state. "
235
                    f"Found the attribute {name} with {value} instead. \n"
236
                    "HINT: Private attributes defined as follows `self._x = y` won't be shared between components "
237
                    "and therefore don't need to be JSON-serializable."
238
                )
239

240
        super().__setattr__(name, value)
241
        return None
242

243
    @staticmethod
244
    def _attach_backend(flow: "LightningFlow", backend: "Backend") -> None:
245
        """Attach the backend to all flows and its children."""
246
        flow._backend = backend
247

248
        for name in flow._structures:
249
            getattr(flow, name)._backend = backend
250

251
        for child_flow in flow.flows.values():
252
            child_flow._backend = backend
253
            for name in child_flow._structures:
254
                getattr(child_flow, name)._backend = backend
255

256
        app = _LightningAppRef().get_current()
257

258
        for child_work in flow.works():
259
            child_work._backend = backend
260
            backend._wrap_run_method(app, child_work)  # type: ignore[arg-type]
261

262
    def __getattr__(self, item: str) -> Any:
263
        if item in self.__dict__.get("_paths", {}):
264
            return Path.from_dict(self._paths[item])
265
        return self.__getattribute__(item)
266

267
    @property
268
    def ready(self) -> bool:
269
        """Override to customize when your App should be ready."""
270
        flows = self.flows
271
        return all(flow.ready for flow in flows.values()) if flows else True
272

273
    @property
274
    def changes(self) -> dict:
275
        return self._changes.copy()
276

277
    @property
278
    def state(self) -> dict:
279
        """Returns the current flow state along its children."""
280
        children_state = {child: getattr(self, child).state for child in self._flows}
281
        works_state = {work: getattr(self, work).state for work in self._works}
282
        return {
283
            "vars": _sanitize_state({el: getattr(self, el) for el in self._state}),
284
            # this may have the challenge that ret cannot be pickled, we'll need to handle this
285
            "calls": self._calls.copy(),
286
            "flows": children_state,
287
            "works": works_state,
288
            "structures": {child: getattr(self, child).state for child in self._structures},
289
            "changes": {},
290
        }
291

292
    @property
293
    def state_vars(self) -> dict:
294
        children_state = {child: getattr(self, child).state_vars for child in self._flows}
295
        works_state = {work: getattr(self, work).state_vars for work in self._works}
296
        return {
297
            "vars": _sanitize_state({el: getattr(self, el) for el in self._state}),
298
            "flows": children_state,
299
            "works": works_state,
300
            "structures": {child: getattr(self, child).state_vars for child in self._structures},
301
        }
302

303
    @property
304
    def state_with_changes(self) -> dict:
305
        children_state = {child: getattr(self, child).state_with_changes for child in self._flows}
306
        works_state = {work: getattr(self, work).state_with_changes for work in self._works}
307
        return {
308
            "vars": _sanitize_state({el: getattr(self, el) for el in self._state}),
309
            # this may have the challenge that ret cannot be pickled, we'll need to handle this
310
            "calls": self._calls.copy(),
311
            "flows": children_state,
312
            "works": works_state,
313
            "structures": {child: getattr(self, child).state_with_changes for child in self._structures},
314
            "changes": self.changes,
315
        }
316

317
    @property
318
    def flows(self) -> Dict[str, "LightningFlow"]:
319
        """Return its children LightningFlow."""
320
        flows = {}
321
        for el in sorted(self._flows):
322
            flow = getattr(self, el)
323
            flows[flow.name] = flow
324
            flows.update(flow.flows)
325
        for struct_name in sorted(self._structures):
326
            flows.update(getattr(self, struct_name).flows)
327
        return flows
328

329
    @property
330
    def lightningignore(self) -> Tuple[str, ...]:
331
        """Programmatic equivalent of the ``.lightningignore`` file."""
332
        return self._lightningignore
333

334
    @lightningignore.setter
335
    def lightningignore(self, lightningignore: Tuple[str, ...]) -> None:
336
        if self._backend is not None:
337
            raise RuntimeError(
338
                f"Your app has been already dispatched, so modifying the `{self.name}.lightningignore` does not have an"
339
                " effect"
340
            )
341
        self._lightningignore = lightningignore
342

343
    def works(self, recurse: bool = True) -> List[LightningWork]:
344
        """Return its :class:`~lightning.app.core.work.LightningWork`."""
345
        works = [getattr(self, el) for el in sorted(self._works)]
346
        if not recurse:
347
            return works
348
        for child_name in sorted(self._flows):
349
            for w in getattr(self, child_name).works(recurse=recurse):
350
                works.append(w)
351
        for struct_name in sorted(self._structures):
352
            for w in getattr(self, struct_name).works:
353
                works.append(w)
354
        return works
355

356
    def named_works(self, recurse: bool = True) -> List[Tuple[str, LightningWork]]:
357
        """Return its :class:`~lightning.app.core.work.LightningWork` with their names."""
358
        return [(w.name, w) for w in self.works(recurse=recurse)]
359

360
    def set_state(self, provided_state: Dict, recurse: bool = True) -> None:
361
        """Method to set the state to this LightningFlow, its children and
362
        :class:`~lightning.app.core.work.LightningWork`.
363

364
        Arguments:
365
            provided_state: The state to be reloaded
366
            recurse: Whether to apply the state down children.
367

368
        """
369
        for k, v in provided_state["vars"].items():
370
            if isinstance(v, Dict):
371
                v = _maybe_create_drive(self.name, v)
372
            if isinstance(v, Dict):
373
                v = _maybe_create_cloud_compute(v)
374
            setattr(self, k, v)
375
        self._changes = provided_state["changes"]
376
        self._calls.update(provided_state["calls"])
377

378
        if not recurse:
379
            return
380

381
        for child, state in provided_state["flows"].items():
382
            getattr(self, child).set_state(state)
383
        for work, state in provided_state["works"].items():
384
            getattr(self, work).set_state(state)
385
        for structure, state in provided_state["structures"].items():
386
            getattr(self, structure).set_state(state)
387

388
    def stop(self, end_msg: str = "") -> None:
389
        """Method used to exit the application."""
390
        if end_msg:
391
            print(end_msg)
392
        raise ExitAppException
393

394
    def fail(self, end_msg: str = "") -> None:
395
        """Method used to exit and fail the application."""
396
        if end_msg:
397
            print(end_msg)
398
        raise LightningFlowException
399

400
    def _exit(self, end_msg: str = "") -> None:
401
        """Used to exit the application.
402

403
        Private method.
404

405
        .. deprecated:: 1.9.0
406
            This function is deprecated and will be removed in 2.0.0. Use :meth:`stop` instead.
407

408
        """
409
        warnings.warn(
410
            DeprecationWarning(
411
                "This function is deprecated and will be removed in 2.0.0. Use `LightningFlow.stop` instead."
412
            )
413
        )
414

415
        return self.stop(end_msg=end_msg)
416

417
    @staticmethod
418
    def _is_state_attribute(name: str) -> bool:
419
        """Every public attribute is part of the state by default and all protected (prefixed by '_') or private
420
        (prefixed by '__') attributes are not.
421

422
        Exceptions are listed in the `_INTERNAL_STATE_VARS` class variable.
423

424
        """
425
        return name in LightningFlow._INTERNAL_STATE_VARS or not name.startswith("_")
426

427
    def run(self, *args: Any, **kwargs: Any) -> None:
428
        """Override with your own logic."""
429
        pass
430

431
    def schedule(
432
        self, cron_pattern: str, start_time: Optional[datetime] = None, user_key: Optional[str] = None
433
    ) -> bool:
434
        """The schedule method is used to run a part of the flow logic on timely manner.
435

436
        .. code-block:: python
437

438
            from lightning.app import LightningFlow
439

440

441
            class Flow(LightningFlow):
442
                def run(self):
443
                    if self.schedule("hourly"):
444
                        print("run some code every hour")
445

446
        Arguments:
447
            cron_pattern: The cron pattern to provide. Learn more at https://crontab.guru/.
448
            start_time: The start time of the cron job.
449
            user_key: Optional key used to improve the caching mechanism.
450

451
        A best practice is to avoid running a dynamic flow or work under the self.schedule method.
452
        Instead, instantiate them within the condition, but run them outside.
453

454
        .. code-block:: python
455

456
            from lightning.app import LightningFlow
457
            from lightning.app.structures import List
458

459

460
            class SchedulerDAG(LightningFlow):
461
                def __init__(self):
462
                    super().__init__()
463
                    self.dags = List()
464

465
                def run(self):
466
                    if self.schedule("hourly"):
467
                        self.dags.append(DAG(...))
468

469
                    for dag in self.dags:
470
                        payload = dag.run()
471

472
        **Learn more about Scheduling**
473

474
        .. raw:: html
475

476
            <div class="display-card-container">
477
                <div class="row">
478

479
        .. displayitem::
480
            :header: Schedule your components
481
            :description: Learn more scheduling.
482
            :col_css: col-md-4
483
            :button_link: ../../../glossary/scheduling.html
484
            :height: 180
485
            :tag: Basic
486

487
        .. displayitem::
488
            :header: Build your own DAG
489
            :description: Learn more DAG scheduling with examples.
490
            :col_css: col-md-4
491
            :button_link: ../../../examples/app/dag/dag.html
492
            :height: 180
493
            :tag: Basic
494

495
        .. raw:: html
496

497
                </div>
498
            </div>
499
            <br />
500

501
        """
502
        if not user_key:
503
            frame = cast(FrameType, inspect.currentframe()).f_back
504
            assert frame is not None
505
            cache_key = f"{cron_pattern}.{frame.f_code.co_filename}.{frame.f_lineno}"
506
        else:
507
            cache_key = user_key
508

509
        call_hash = f"{self.schedule.__name__}:{DeepHash(cache_key)[cache_key]}"
510

511
        if "scheduling" not in self._calls:
512
            self._calls["scheduling"] = {}
513

514
        entered = call_hash in self._calls["scheduling"]
515

516
        expr_aliases = {
517
            "midnight": "@midnight",
518
            "hourly": "@hourly",
519
            "daily": "@daily",
520
            "weekly": "@weekly",
521
            "monthly": "@monthly",
522
            "yearly": "@yearly",
523
            "annually": "@annually",
524
        }
525

526
        if cron_pattern in expr_aliases:
527
            cron_pattern = expr_aliases[cron_pattern]
528

529
        if not entered:
530
            if not start_time:
531
                start_time = datetime.now()
532

533
            schedule_metadata = {
534
                "running": False,
535
                "cron_pattern": cron_pattern,
536
                "start_time": str(start_time.isoformat()),
537
                "name": self.name,
538
            }
539

540
            self._calls["scheduling"][call_hash] = schedule_metadata
541
            app = _LightningAppRef().get_current()
542
            if app:
543
                app._register_schedule(call_hash, schedule_metadata)
544
            return True
545

546
        return self._calls["scheduling"][call_hash]["running"]
547

548
    def _enable_schedule(self, call_hash: str) -> None:
549
        self._calls["scheduling"][call_hash]["running"] = True
550

551
    def _disable_running_schedules(self) -> None:
552
        if "scheduling" not in self._calls:
553
            return
554
        for call_hash in self._calls["scheduling"]:
555
            self._calls["scheduling"][call_hash]["running"] = False
556

557
    def configure_layout(self) -> Union[Dict[str, Any], List[Dict[str, Any]], Frontend]:
558
        """Configure the UI layout of this LightningFlow.
559

560
        You can either
561

562
        1.  Return a single :class:`~lightning.app.frontend.frontend.Frontend` object to serve a user interface
563
            for this Flow.
564
        2.  Return a single dictionary to expose the UI of a child flow.
565
        3.  Return a list of dictionaries to arrange the children of this flow in one or multiple tabs.
566

567
        **Example:** Serve a static directory (with at least a file index.html inside).
568

569
        .. code-block:: python
570

571
            from lightning.app.frontend import StaticWebFrontend
572

573

574
            class Flow(LightningFlow):
575
                ...
576

577
                def configure_layout(self):
578
                    return StaticWebFrontend("path/to/folder/to/serve")
579

580
        **Example:** Serve a streamlit UI (needs the streamlit package to be installed).
581

582
        .. code-block:: python
583

584
            from lightning.app.frontend import StaticWebFrontend
585

586

587
            class Flow(LightningFlow):
588
                ...
589

590
                def configure_layout(self):
591
                    return StreamlitFrontend(render_fn=my_streamlit_ui)
592

593

594
            def my_streamlit_ui(state):
595
                # add your streamlit code here!
596
                import streamlit as st
597

598

599
        **Example:** Arrange the UI of my children in tabs (default UI by Lightning).
600

601
        .. code-block:: python
602

603
            class Flow(LightningFlow):
604
                def configure_layout(self):
605
                    return [
606
                        dict(name="First Tab", content=self.child0),
607
                        dict(name="Second Tab", content=self.child1),
608
                        dict(name="Lightning", content="https://lightning.ai"),
609
                    ]
610

611
        If you don't implement ``configure_layout``, Lightning will collect all children and display their UI in a tab
612
        (if they have their own ``configure_layout`` implemented).
613

614
        Note:
615
            This hook gets called at the time of app creation and then again as part of the loop. If desired, the
616
            returned layout configuration can depend on the state. The only exception are the flows that return a
617
            :class:`~lightning.app.frontend.frontend.Frontend`. These need to be provided at the time of app creation
618
            in order for the runtime to start the server.
619

620
        **Learn more about adding UI**
621

622
        .. raw:: html
623

624
            <div class="display-card-container">
625
                <div class="row">
626

627
        .. displayitem::
628
            :header: Add a web user interface (UI)
629
            :description: Learn more how to integrate several UIs.
630
            :col_css: col-md-4
631
            :button_link: ../../../workflows/add_web_ui/index.html
632
            :height: 180
633
            :tag: Basic
634

635
        .. raw:: html
636

637
                </div>
638
            </div>
639
            <br />
640

641
        """
642
        return [{"name": name, "content": component} for (name, component) in self.flows.items()]
643

644
    def experimental_iterate(self, iterable: Iterable, run_once: bool = True, user_key: str = "") -> Generator:
645
        """This method should always be used with any kind of iterable to ensure its fault tolerant.
646

647
        If you want your iterable to always be consumed from scratch, you shouldn't use this method.
648

649
        Arguments:
650
            iterable: Iterable to iterate over. The iterable shouldn't have side effects or be random.
651
            run_once: Whether to run the entire iteration only once.
652
                Otherwise, it would restart from the beginning.
653
            user_key: Key to be used to track the caching mechanism.
654

655
        """
656
        if not isinstance(iterable, Iterable):
657
            raise TypeError(f"An iterable should be provided to `self.iterate` method. Found {iterable}")
658

659
        # TODO: Find a better way. Investigated using __reduce__, but state change invalidate the cache.
660
        if not user_key:
661
            frame = cast(FrameType, inspect.currentframe()).f_back
662
            assert frame is not None
663
            cache_key = f"{frame.f_code.co_filename}.{frame.f_code.co_firstlineno}"
664
        else:
665
            cache_key = user_key
666

667
        call_hash = f"{self.experimental_iterate.__name__}:{DeepHash(cache_key)[cache_key]}"
668
        entered = call_hash in self._calls
669
        has_started = entered and self._calls[call_hash]["counter"] > 0
670
        has_finished = entered and self._calls[call_hash]["has_finished"]
671

672
        if has_finished:
673
            if not run_once:
674
                self._calls[call_hash].update({"counter": 0, "has_finished": False})
675
            else:
676
                return range(0)
677

678
        if not has_started:
679
            self._calls[call_hash] = {
680
                "name": self.experimental_iterate.__name__,
681
                "call_hash": call_hash,
682
                "counter": 0,
683
                "has_finished": False,
684
            }
685

686
        skip_counter = max(self._calls[call_hash]["counter"], 0)
687

688
        for counter, value in enumerate(iterable):
689
            if skip_counter:
690
                skip_counter -= 1
691
                continue
692
            self._calls[call_hash].update({"counter": counter})
693
            yield value
694

695
        self._calls[call_hash].update({"has_finished": True})
696

697
    def configure_commands(self) -> None:
698
        """Configure the commands of this LightningFlow.
699

700
        Returns a list of dictionaries mapping a command name to a flow method.
701

702
        .. code-block:: python
703

704
            class Flow(LightningFlow):
705
                def __init__(self):
706
                    super().__init__()
707
                    self.names = []
708

709
                def configure_commands(self):
710
                    return {"my_command_name": self.my_remote_method}
711

712
                def my_remote_method(self, name):
713
                    self.names.append(name)
714

715
        Once the app is running with the following command:
716

717
        .. code-block:: bash
718

719
            lightning_app run app app.py
720

721
        .. code-block:: bash
722

723
            lightning_app my_command_name --args name=my_own_name
724

725
        """
726
        raise NotImplementedError
727

728
    def configure_api(self) -> None:
729
        """Configure the API routes of the LightningFlow.
730

731
        Returns a list of HttpMethod such as Post or Get.
732

733
        .. code-block:: python
734

735
            from lightning.app import LightningFlow
736
            from lightning.app.api import Post
737

738
            from pydantic import BaseModel
739

740

741
            class HandlerModel(BaseModel):
742
                name: str
743

744

745
            class Flow(LightningFlow):
746
                def __init__(self):
747
                    super().__init__()
748
                    self.names = []
749

750
                def handler(self, config: HandlerModel) -> None:
751
                    self.names.append(config.name)
752

753
                def configure_api(self):
754
                    return [Post("/v1/api/request", self.handler)]
755

756
        Once the app is running, you can access the Swagger UI of the app
757
        under the ``/docs`` route.
758

759
        """
760
        raise NotImplementedError
761

762
    def state_dict(self) -> dict:
763
        """Returns the current flow state but not its children."""
764
        return {
765
            "vars": _sanitize_state({el: getattr(self, el) for el in self._state}),
766
            "calls": self._calls.copy(),
767
            "changes": {},
768
            "flows": {},
769
            "works": {},
770
            "structures": {},
771
        }
772

773
    def load_state_dict(
774
        self,
775
        flow_state: Dict[str, Any],
776
        children_states: Dict[str, Any],
777
        strict: bool = True,
778
    ) -> None:
779
        """Reloads the state of this flow and its children.
780

781
        .. code-block:: python
782

783

784
            class Work(LightningWork):
785
                def __init__(self):
786
                    super().__init__()
787
                    self.counter = 0
788

789
                def run(self):
790
                    self.counter += 1
791

792

793
            class Flow(LightningFlow):
794
                def run(self):
795
                    # dynamically create a work.
796
                    if not getattr(self, "w", None):
797
                        self.w = WorkReload()
798

799
                    self.w.run()
800

801
                def load_state_dict(self, flow_state, children_states, strict) -> None:
802
                    # 1: Re-instantiate the dynamic work
803
                    self.w = Work()
804

805
                    # 2: Make any states modification / migration.
806
                    ...
807

808
                    # 3: Call the parent ``load_state_dict`` to
809
                    # recursively reload the states.
810
                    super().load_state_dict(
811
                        flow_state,
812
                        children_states,
813
                        strict,
814
                    )
815

816
        Arguments:
817
            flow_state: The state of the current flow.
818
            children_states: The state of the dynamic children of this flow.
819
            strict: Whether to raise an exception if a dynamic
820
                children hasn't been re-created.
821

822
        """
823
        self.set_state(flow_state, recurse=False)
824
        direct_children_states = {k: v for k, v in children_states.items() if "." not in k}
825
        for child_name, state in direct_children_states.items():
826
            child = getattr(self, child_name, None)
827
            if isinstance(child, LightningFlow):
828
                lower_children_states = {
829
                    k.replace(child_name + ".", ""): v
830
                    for k, v in children_states.items()
831
                    if k.startswith(child_name) and k != child_name
832
                }
833
                child.load_state_dict(state, lower_children_states, strict=strict)
834
            elif isinstance(child, LightningWork):
835
                child.set_state(state)
836
            elif strict:
837
                raise ValueError(f"The component {child_name} wasn't instantiated for the component {self.name}")
838

839

840
class _RootFlow(LightningFlow):
841
    def __init__(self, work: LightningWork) -> None:
842
        super().__init__()
843
        self.work = work
844

845
    @property
846
    def ready(self) -> bool:
847
        ready = getattr(self.work, "ready", None)
848
        if ready is not None:
849
            return ready
850
        return self.work.url != ""
851

852
    def run(self) -> None:
853
        if self.work.has_succeeded:
854
            self.work.stop()
855
            self.stop()
856
        self.work.run()
857

858
    def configure_layout(self) -> list:
859
        if is_overridden("configure_layout", self.work):
860
            return [{"name": "Main", "content": self.work}]
861
        return []
862

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.