import copy

import pytest
import torch
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.testing import assert_close

import colossalai
from colossalai.testing import rerun_if_address_is_in_use, spawn
from colossalai.testing.random import seed_all
from colossalai.zero import LowLevelZeroOptimizer


class MlpModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = nn.Linear(12, 24)
        self.linear2 = nn.Linear(24, 12)

    def forward(self, x):
        x = self.linear1(x)
        x = self.linear2(x)
        return x

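# Tolerance-aware comparison helper: fp16/bf16 use loosened tolerances, while
# fp32 leaves rtol/atol as None so assert_close falls back to its dtype-based
# default tolerances.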
def loose_close(a, b, dtype: torch.dtype = torch.float32):
    rtol = None
    atol = None
    if dtype is torch.float16:
        rtol = 5e-2
        atol = 5e-4
    elif dtype is torch.bfloat16:
        rtol = 4e-3
        atol = 4e-3

    a = a.detach().to(dtype)
    b = b.detach().to(dtype).to(a.device)

    assert_close(a, b, rtol=rtol, atol=atol)


def exam_zero_1_torch_ddp_ckpt():
    """
    Compare the optimizer state dict produced under ZeRO stage 1 with the one
    produced by plain torch DDP, then verify that the ZeRO optimizer can load
    a checkpoint saved by a vanilla torch optimizer.
    """
    local_rank = torch.distributed.get_rank()
    seed_all(1453)

    # create models
    torch_model = MlpModel().cuda()
    zero_model = copy.deepcopy(torch_model)

    torch_model = DDP(torch_model, static_graph=True)

    # create optimizer
    zero_optimizer = torch.optim.Adam(zero_model.parameters(), lr=1)

    # we only test stage 1 here
    # the state dicts of stage 1 and stage 2 are the same
    zero_optimizer = LowLevelZeroOptimizer(
        zero_optimizer, overlap_communication=True, initial_scale=1, reduce_bucket_size=262144
    )
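    # note: LowLevelZeroOptimizer shards optimizer states across ranks (ZeRO-1);
    # its state_dict() is expected to gather the full states so they can be
    # compared one-to-one with the torch optimizer's states below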

    torch_optimizer = torch.optim.Adam(torch_model.parameters(), lr=1)

    seed_all(1453 + local_rank)
    # create input data (different on each rank)
    input_data = torch.rand(4, 12).cuda()

    # forward
    zero_output = zero_model(input_data)
    torch_output = torch_model(input_data)

    # backward
    zero_optimizer.backward(zero_output.mean().float())
    torch_output.mean().backward()
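    # note: zero_optimizer.backward applies loss scaling (initial_scale=1 here)
    # and reduces gradients across ranks, mirroring DDP's gradient averaging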

    # step
    zero_optimizer.step()
    torch_optimizer.step()

    torch_state_dict = torch_optimizer.state_dict()
    zero_state_dict = zero_optimizer.state_dict()

    # examine the original state dict
    for torch_state, zero_state in zip(torch_state_dict["state"].values(), zero_state_dict["state"].values()):
        for t_v, z_v in zip(torch_state.values(), zero_state.values()):
            loose_close(t_v, z_v)

    # empty the optimizer state
    zero_optimizer.optim.state = []
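    # (emptying first ensures the load below actually restores every state
    # tensor instead of reusing the ones produced by the step above)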

    # zero optimizer loads a torch checkpoint
    zero_optimizer.load_state_dict(copy.deepcopy(torch_state_dict))
    zero_state_dict = zero_optimizer.state_dict()

    # examine the loaded state dict
    for torch_state, zero_state in zip(torch_state_dict["state"].values(), zero_state_dict["state"].values()):
        for t_v, z_v in zip(torch_state.values(), zero_state.values()):
            loose_close(t_v, z_v)


def run_dist(rank, world_size, port):
    colossalai.launch(config=dict(), rank=rank, world_size=world_size, port=port, host="localhost")

    exam_zero_1_torch_ddp_ckpt()


@pytest.mark.dist
@rerun_if_address_is_in_use()
def test_zero_ckpt():
    spawn(run_dist, 2)


if __name__ == "__main__":
    test_zero_ckpt()
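# To run manually (assuming 2 CUDA devices are available), execute this file
# directly with python; under pytest the test carries the `dist` marker, so it
# can be selected with `pytest -m dist`.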