pytorch-lightning
60 строк · 2.2 Кб
1# Copyright The Lightning AI team.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import threading
16from datetime import datetime
17from typing import Optional
18
19from croniter import croniter
20from deepdiff import Delta
21
22from lightning.app.utilities.proxies import ComponentDelta
23
24
25class SchedulerThread(threading.Thread):
26# TODO (tchaton) Abstract this logic to a generic scheduling service.
27
28def __init__(self, app) -> None:
29super().__init__(daemon=True)
30self._exit_event = threading.Event()
31self._sleep_time = 1.0
32self._app = app
33
34def run(self) -> None:
35while not self._exit_event.is_set():
36self._exit_event.wait(self._sleep_time)
37self.run_once()
38
39def run_once(self):
40for call_hash in list(self._app._schedules.keys()):
41metadata = self._app._schedules[call_hash]
42start_time = datetime.fromisoformat(metadata["start_time"])
43current_date = datetime.now()
44next_event = croniter(metadata["cron_pattern"], start_time).get_next(datetime)
45# When the event is reached, send a delta to activate scheduling.
46if current_date > next_event:
47component_delta = ComponentDelta(
48id=metadata["name"],
49delta=Delta({
50"values_changed": {
51f"root['calls']['scheduling']['{call_hash}']['running']": {"new_value": True}
52}
53}),
54)
55self._app.delta_queue.put(component_delta)
56metadata["start_time"] = next_event.isoformat()
57
58def join(self, timeout: Optional[float] = None) -> None:
59self._exit_event.set()
60super().join(timeout)
61