# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from re import escape
from unittest import mock
from unittest.mock import ANY, Mock

import pytest
import torch
from lightning.fabric.accelerators import CPUAccelerator, CUDAAccelerator
from lightning.fabric.strategies import DeepSpeedStrategy
from torch.optim import Optimizer

from tests_fabric.helpers.runif import RunIf


@pytest.fixture()
def deepspeed_config():
    return {
        "optimizer": {"type": "SGD", "params": {"lr": 3e-5}},
        "scheduler": {
            "type": "WarmupLR",
            "params": {"last_batch_iteration": -1, "warmup_min_lr": 0, "warmup_max_lr": 3e-5, "warmup_num_steps": 100},
        },
    }


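# ZeRO stage 2 shards optimizer states and gradients across ranks (stage 3 additionally shards
# the parameters); `zero_allow_untested_optimizer` lets DeepSpeed wrap optimizers outside its
# validated set, such as the plain SGD configured above.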
@pytest.fixture()
def deepspeed_zero_config(deepspeed_config):
    return {**deepspeed_config, "zero_allow_untested_optimizer": True, "zero_optimization": {"stage": 2}}


@RunIf(deepspeed=True)
def test_deepspeed_only_compatible_with_cuda():
    """Test that the DeepSpeed strategy raises an exception if an invalid accelerator is used."""
    strategy = DeepSpeedStrategy(accelerator=CPUAccelerator())
    with pytest.raises(RuntimeError, match="The DeepSpeed strategy is only supported on CUDA GPUs"):
        strategy.setup_environment()


@RunIf(deepspeed=True)
def test_deepspeed_with_invalid_config_path():
    """Test that passing an invalid config path raises an exception."""
    with pytest.raises(
        FileNotFoundError, match="You passed in a path to a DeepSpeed config but the path does not exist"
    ):
        DeepSpeedStrategy(config="invalid_path.json")


@RunIf(deepspeed=True)
def test_deepspeed_with_env_path(tmp_path, monkeypatch, deepspeed_config):
    """Test that the config gets loaded from the path set in the ``PL_DEEPSPEED_CONFIG_PATH`` env variable."""
    config_path = tmp_path / "temp.json"
    with open(config_path, "w") as f:
        f.write(json.dumps(deepspeed_config))
    monkeypatch.setenv("PL_DEEPSPEED_CONFIG_PATH", str(config_path))
    strategy = DeepSpeedStrategy()
    assert strategy.config == deepspeed_config


@RunIf(deepspeed=True)
def test_deepspeed_defaults():
    """Ensure that defaults are correctly set as a config for DeepSpeed if no arguments are passed."""
    strategy = DeepSpeedStrategy()
    assert strategy.config is not None
    assert isinstance(strategy.config["zero_optimization"], dict)
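    # DeepSpeed applies gradient accumulation internally, so Fabric's backward-sync control is unset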
    assert strategy._backward_sync_control is None


@RunIf(deepspeed=True)
def test_deepspeed_custom_activation_checkpointing_params():
    """Ensure that modified activation-checkpointing parameters are reflected in the DeepSpeed config."""
    ds = DeepSpeedStrategy(
        partition_activations=True,
        cpu_checkpointing=True,
        contiguous_memory_optimization=True,
        synchronize_checkpoint_boundary=True,
    )
    checkpoint_config = ds.config["activation_checkpointing"]
    assert checkpoint_config["partition_activations"]
    assert checkpoint_config["cpu_checkpointing"]
    assert checkpoint_config["contiguous_memory_optimization"]
    assert checkpoint_config["synchronize_checkpoint_boundary"]


@RunIf(deepspeed=True)
def test_deepspeed_config_zero_offload(deepspeed_zero_config):
    """Test the various ways optimizer-offloading can be configured."""
    # user-provided config without an offload setting
    strategy = DeepSpeedStrategy(config=deepspeed_zero_config)
    assert "offload_optimizer" not in strategy.config["zero_optimization"]

    # default config
    strategy = DeepSpeedStrategy()
    assert "offload_optimizer" not in strategy.config["zero_optimization"]

    # default config with `offload_optimizer` argument override
    strategy = DeepSpeedStrategy(offload_optimizer=True)
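    # these values mirror the defaults of the corresponding DeepSpeedStrategy arguments;
    # `nvme_path` only takes effect when offloading to NVMe rather than CPU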
    assert strategy.config["zero_optimization"]["offload_optimizer"] == {
        "buffer_count": 4,
        "device": "cpu",
        "nvme_path": "/local_nvme",
        "pin_memory": False,
    }

    # externally configured through config
    deepspeed_zero_config["zero_optimization"]["offload_optimizer"] = False
    strategy = DeepSpeedStrategy(config=deepspeed_zero_config)
    assert strategy.config["zero_optimization"]["offload_optimizer"] is False


@RunIf(deepspeed=True)
@mock.patch("deepspeed.initialize")
def test_deepspeed_setup_module(init_mock):
    """Test that the DeepSpeed strategy can set up the model for inference (no optimizer required)."""
    model = Mock()
    model.parameters.return_value = []
    strategy = DeepSpeedStrategy()
    strategy.parallel_devices = [torch.device("cuda", 1)]
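    # `deepspeed.initialize` returns a 4-tuple of (engine, optimizer, dataloader, lr_scheduler)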
    init_mock.return_value = [Mock()] * 4  # mock to make tuple unpacking work

    strategy.setup_module(model)
    init_mock.assert_called_with(
        args=ANY,
        config=strategy.config,
        model=model,
        model_parameters=ANY,
        optimizer=None,
        dist_init_required=False,
    )


@RunIf(deepspeed=True)
def test_deepspeed_requires_joint_setup():
    """Test that the DeepSpeed strategy does not support setting up model and optimizer independently."""
    strategy = DeepSpeedStrategy()
    with pytest.raises(
        NotImplementedError, match=escape("does not support setting up the module and optimizer(s) independently")
    ):
        strategy.setup_optimizer(Mock())


@RunIf(deepspeed=True)
def test_deepspeed_save_checkpoint_storage_options(tmp_path):
    """Test that the DeepSpeed strategy does not accept storage options for saving checkpoints."""
    strategy = DeepSpeedStrategy()
    with pytest.raises(TypeError, match=escape("DeepSpeedStrategy.save_checkpoint(..., storage_options=...)` is not")):
        strategy.save_checkpoint(path=tmp_path, state=Mock(), storage_options=Mock())


@RunIf(deepspeed=True)
def test_deepspeed_save_checkpoint_one_deepspeed_engine_required(tmp_path):
    """Test that the DeepSpeed strategy can only save one DeepSpeedEngine per checkpoint."""
    from deepspeed import DeepSpeedEngine

    strategy = DeepSpeedStrategy()

    # missing DeepSpeedEngine
    with pytest.raises(ValueError, match="Could not find a DeepSpeed model in the provided checkpoint state."):
        strategy.save_checkpoint(path=tmp_path, state={})
    with pytest.raises(ValueError, match="Could not find a DeepSpeed model in the provided checkpoint state."):
        strategy.save_checkpoint(path=tmp_path, state={"model": torch.nn.Linear(3, 3)})

    # multiple DeepSpeedEngine
    model1 = Mock(spec=torch.nn.Module)
    model1.modules.return_value = [Mock(spec=DeepSpeedEngine)]
    model2 = Mock(spec=torch.nn.Module)
    model2.modules.return_value = [Mock(spec=DeepSpeedEngine)]
    with pytest.raises(ValueError, match="Found multiple DeepSpeed engine modules in the given state."):
        strategy.save_checkpoint(path=tmp_path, state={"model1": model1, "model2": model2})


@RunIf(deepspeed=True)
def test_deepspeed_save_checkpoint_client_state_separation(tmp_path):
    """Test that the DeepSpeed engine and optimizer get separated from the client state."""
    from deepspeed import DeepSpeedEngine

    strategy = DeepSpeedStrategy()

    # Model only
    model = Mock(spec=DeepSpeedEngine, optimizer=None)
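    # the mock reports itself as its only submodule so the strategy's engine lookup finds it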
    model.modules.return_value = [model]
    strategy.save_checkpoint(path=tmp_path, state={"model": model, "test": "data"})
    # the client_state should not contain any deepspeed engine or deepspeed optimizer
    model.save_checkpoint.assert_called_with(tmp_path, client_state={"test": "data"}, tag="checkpoint")

    # Model and optimizer
    optimizer = Mock()
    model = Mock(spec=DeepSpeedEngine, optimizer=optimizer)
    model.modules.return_value = [model]
    strategy.save_checkpoint(path=tmp_path, state={"model": model, "optimizer": optimizer, "test": "data"})
    # the client_state should not contain any deepspeed engine or deepspeed optimizer
    model.save_checkpoint.assert_called_with(tmp_path, client_state={"test": "data"}, tag="checkpoint")


@RunIf(deepspeed=True)
def test_deepspeed_save_checkpoint_warn_colliding_keys(tmp_path):
    """Test that the strategy warns if there are keys in the user dict that collide internally with DeepSpeed."""
    from deepspeed import DeepSpeedEngine

    strategy = DeepSpeedStrategy()
    optimizer = Mock()
    model = Mock(spec=DeepSpeedEngine, optimizer=optimizer)
    model.modules.return_value = [model]
    # `mp_world_size` is an internal key
    with pytest.warns(UserWarning, match="Your state has keys that collide with DeepSpeed's internal"):
        strategy.save_checkpoint(path=tmp_path, state={"model": model, "optimizer": optimizer, "mp_world_size": 2})


@RunIf(deepspeed=True)
def test_deepspeed_load_checkpoint_validate_path(tmp_path):
    """Test that we validate the checkpoint path for a DeepSpeed checkpoint and give suggestions for user error."""
    strategy = DeepSpeedStrategy()
    with pytest.raises(FileNotFoundError, match="The provided path is not a valid DeepSpeed checkpoint"):
        strategy.load_checkpoint(path=tmp_path, state={"model": Mock()})

    # User tries to pass the subfolder as the path
    checkpoint_path = tmp_path / "checkpoint"
    checkpoint_path.mkdir()
    with pytest.raises(FileNotFoundError, match=f"Try to load using this parent directory instead: {tmp_path}"):
        strategy.load_checkpoint(path=checkpoint_path, state={"model": Mock()})

    # User tries to pass an individual file inside the checkpoint folder
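    # (zero_pp_rank_*_model_states.pt is a typical per-rank model-states shard in a DeepSpeed checkpoint folder)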
    checkpoint_path = checkpoint_path / "zero_pp_rank_0_mp_rank_00_model_states.pt"
    checkpoint_path.touch()
    with pytest.raises(FileNotFoundError, match=f"Try to load using this parent directory instead: {tmp_path}"):
        strategy.load_checkpoint(path=checkpoint_path, state={"model": Mock()})


@RunIf(deepspeed=True)
def test_deepspeed_load_checkpoint_no_state(tmp_path):
    """Test that DeepSpeed can't load the full state without access to a model instance from the user."""
    strategy = DeepSpeedStrategy()
    with pytest.raises(ValueError, match=escape("Got DeepSpeedStrategy.load_checkpoint(..., state=None")):
        strategy.load_checkpoint(path=tmp_path, state=None)
    with pytest.raises(ValueError, match=escape("Got DeepSpeedStrategy.load_checkpoint(..., state={})")):
        strategy.load_checkpoint(path=tmp_path, state={})


@RunIf(deepspeed=True)
@mock.patch("lightning.fabric.strategies.deepspeed._is_deepspeed_checkpoint", return_value=True)
def test_deepspeed_load_checkpoint_one_deepspeed_engine_required(_, tmp_path):
    """Test that the DeepSpeed strategy can only load one DeepSpeedEngine per checkpoint."""
    from deepspeed import DeepSpeedEngine

    strategy = DeepSpeedStrategy()

    # missing DeepSpeedEngine
    with pytest.raises(ValueError, match="Could not find a DeepSpeed model in the provided checkpoint state."):
        strategy.load_checkpoint(path=tmp_path, state={"other": "data"})
    with pytest.raises(ValueError, match="Could not find a DeepSpeed model in the provided checkpoint state."):
        strategy.load_checkpoint(path=tmp_path, state={"model": torch.nn.Linear(3, 3)})

    # multiple DeepSpeedEngine
    model1 = Mock(spec=torch.nn.Module)
    model1.modules.return_value = [Mock(spec=DeepSpeedEngine)]
    model2 = Mock(spec=torch.nn.Module)
    model2.modules.return_value = [Mock(spec=DeepSpeedEngine)]
    with pytest.raises(ValueError, match="Found multiple DeepSpeed engine modules in the given state."):
        strategy.load_checkpoint(path=tmp_path, state={"model1": model1, "model2": model2})


@RunIf(deepspeed=True)
def test_deepspeed_load_checkpoint_client_state_missing(tmp_path):
    """Test that the DeepSpeed strategy raises a custom error when client state couldn't be loaded by DeepSpeed."""
    from deepspeed import DeepSpeedEngine

    strategy = DeepSpeedStrategy()
    optimizer = Mock()
    model = Mock(spec=DeepSpeedEngine, optimizer=optimizer)
    model.modules.return_value = [model]

    # If the DeepSpeed engine fails to load the checkpoint file (e.g., file not found), it prints a warning and
    # returns None from its function call
    model.load_checkpoint.return_value = [None, None]

    # Check for our custom user error
    with pytest.raises(FileNotFoundError, match="The provided path is not a valid DeepSpeed checkpoint"):
        strategy.load_checkpoint(path=tmp_path, state={"model": model, "optimizer": optimizer, "test": "data"})


@RunIf(deepspeed=True)
@mock.patch("lightning.fabric.strategies.deepspeed._is_deepspeed_checkpoint", return_value=True)
def test_deepspeed_load_checkpoint_state_updated_with_client_state(_, tmp_path):
    """Test that the DeepSpeed strategy properly updates the state variables and returns additional metadata."""
    from deepspeed import DeepSpeedEngine

    strategy = DeepSpeedStrategy()
    optimizer = Mock()
    model = Mock(spec=DeepSpeedEngine, optimizer=optimizer)
    model.modules.return_value = [model]

    # the client state contains the additional user data that was provided when saving, plus some deepspeed metadata
    loaded_client_state = {"user_data": {"iteration": 5}, "deepspeed_metadata": "data"}
    model.load_checkpoint.return_value = [None, loaded_client_state]

    state = {"model": model, "user_data": {"iteration": 0}}
    metadata = strategy.load_checkpoint(path=tmp_path, state=state)

    # the user's state gets updated with the loaded value
    assert state == {"model": model, "user_data": {"iteration": 5}}
    # additional metadata gets separated from client state
    assert metadata == {"deepspeed_metadata": "data"}


@RunIf(deepspeed=True)
@pytest.mark.parametrize("optimizer_state_requested", [True, False])
@mock.patch("lightning.fabric.strategies.deepspeed._is_deepspeed_checkpoint", return_value=True)
def test_deepspeed_load_checkpoint_optimizer_state_requested(_, optimizer_state_requested, tmp_path):
    """Test that the DeepSpeed strategy loads the optimizer state only when requested."""
    from deepspeed import DeepSpeedEngine

    strategy = DeepSpeedStrategy()
    optimizer = Mock(spec=Optimizer)
    model = Mock(spec=DeepSpeedEngine, optimizer=optimizer)
    model.modules.return_value = [model]

    # required, otherwise mock cannot be unpacked
    model.load_checkpoint.return_value = [None, {}]

    state = {"model": model}
    if optimizer_state_requested:
        state["optimizer"] = optimizer

    strategy.load_checkpoint(path=tmp_path, state=state)
    model.load_checkpoint.assert_called_with(
        tmp_path,
        tag="checkpoint",
        load_optimizer_states=optimizer_state_requested,
        load_lr_scheduler_states=False,
        load_module_strict=True,
    )


@RunIf(deepspeed=True)
@pytest.mark.parametrize("stage", [1, 2, 3])
def test_deepspeed_load_checkpoint_raw_state_dict(stage, tmp_path):
    """Test that `load_checkpoint` can load raw state-dict checkpoints too."""
    strategy = DeepSpeedStrategy(stage=stage)

    model = torch.nn.Linear(3, 3)
    optimizer = torch.optim.Adam(model.parameters(), lr=1.0)
    torch.save(model.state_dict(), tmp_path / "model.ckpt")
    torch.save(optimizer.state_dict(), tmp_path / "optimizer.ckpt")

    new_model = torch.nn.Linear(3, 3)
    new_optimizer = torch.optim.Adam(new_model.parameters(), lr=2.0)

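    # `state` may also be a single module or optimizer when loading a raw torch checkpoint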
    strategy.load_checkpoint(tmp_path / "model.ckpt", state=new_model, strict=False)
    assert torch.equal(new_model.weight, model.weight)
    strategy.load_checkpoint(tmp_path / "optimizer.ckpt", state=new_optimizer, strict=False)
    assert new_optimizer.state_dict()["param_groups"][0]["lr"] == 1.0


@RunIf(deepspeed=True)
def test_errors_grad_clipping():
    """Test that manual gradient clipping raises an error, since DeepSpeed handles clipping internally."""
    strategy = DeepSpeedStrategy()
    with pytest.raises(
        NotImplementedError,
        match=(
            "DeepSpeed handles gradient clipping automatically within the optimizer. "
            "Make sure to set the `gradient_clipping` value in your Config."
        ),
    ):
        strategy.clip_gradients_norm(Mock(), Mock(), Mock(), Mock(), Mock())

    with pytest.raises(
        NotImplementedError,
        match=(
            "DeepSpeed handles gradient clipping automatically within the optimizer. "
            "Make sure to set the `gradient_clipping` value in your Config."
        ),
    ):
        strategy.clip_gradients_value(Mock(), Mock(), Mock())


@RunIf(deepspeed=True, mps=False)
def test_deepspeed_save_filter(tmp_path):
    """Test that passing a `filter` to `save_checkpoint` is rejected, since DeepSpeed serializes the state itself."""
    strategy = DeepSpeedStrategy()
    with pytest.raises(TypeError, match="manages the state serialization internally"):
        strategy.save_checkpoint(path=tmp_path, state={}, filter={})


@RunIf(deepspeed=True)
@pytest.mark.parametrize("device_indices", [[1], [1, 0], [0, 2], [3, 2, 1]])
def test_validate_parallel_devices_indices(device_indices):
    """Test that the strategy validates that it doesn't support selecting specific devices by index.

    DeepSpeed doesn't support it and needs the index to match the local rank of the process.

    """
    accelerator = Mock(spec=CUDAAccelerator)
    strategy = DeepSpeedStrategy(
        accelerator=accelerator, parallel_devices=[torch.device("cuda", i) for i in device_indices]
    )
    with pytest.raises(
        RuntimeError, match=escape(f"device indices {device_indices!r} don't match the local rank values of processes")
    ):
        strategy.setup_environment()
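    # the accelerator still gets set up with the first device before the validation error is raised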
    accelerator.setup_device.assert_called_once_with(torch.device("cuda", device_indices[0]))
412