colossalai

Форк
0
299 строк · 10.7 Кб
1
import argparse
2
import json
3
import os
4

5
import torch
6
import torch.distributed as dist
7
from huggingface_hub import snapshot_download
8
from model.modeling_openmoe import OpenMoeForCausalLM, set_openmoe_args
9
from model.openmoe_policy import OpenMoeForCausalLMPolicy
10
from torch.utils.data import Dataset
11
from tqdm import tqdm
12
from transformers import T5Tokenizer
13
from transformers.models.llama import LlamaConfig
14
from utils import PerformanceEvaluator, get_model_numel
15

16
import colossalai
17
from colossalai.accelerator import get_accelerator
18
from colossalai.booster import Booster
19
from colossalai.booster.plugin.moe_hybrid_parallel_plugin import MoeHybridParallelPlugin
20
from colossalai.cluster import DistCoordinator
21
from colossalai.moe.layers import apply_load_balance
22
from colossalai.moe.manager import MOE_MANAGER
23
from colossalai.moe.utils import skip_init
24
from colossalai.nn.optimizer import HybridAdam
25

26

27
def move_to_cuda(batch, device):
28
    return {k: v.to(device) for k, v in batch.items()}
29

30

31
def load_ckpt(repo_name: str, model: OpenMoeForCausalLM, booster: Booster):
32
    ckpt_path = snapshot_download(repo_name)
33
    # single ckpt
34
    if os.path.exists(os.path.join(ckpt_path, "pytorch_model.bin")):
35
        ckpt_path = os.path.join(ckpt_path, "pytorch_model.bin")
36
    # shard ckpt
37
    elif os.path.exists(os.path.join(ckpt_path, "pytorch_model.bin.index.json")):
38
        ckpt_path = os.path.join(ckpt_path, "pytorch_model.bin.index.json")
39
    else:
40
        raise ValueError(f"Invalid checkpoint path: {ckpt_path}")
41
    booster.load_model(model, ckpt_path)
42

43

44
class RandomDataset(Dataset):
45
    def __init__(
46
        self, num_samples: int = 1000, max_length: int = 2048, vocab_size: int = 256384, tokenizer: T5Tokenizer = None
47
    ):
48
        self.num_samples = num_samples
49
        self.max_length = max_length
50
        if os.path.exists("./mock_data.json"):
51
            self.input_ids = []
52
            self.attention_mask = []
53
            with open("./mock_data.json", "r") as f:
54
                data = json.load(f)
55
            for v in data.values():
56
                d = v["text"]
57
                encode = tokenizer(
58
                    "<pad>" + d,
59
                    return_tensors="pt",
60
                    add_special_tokens=False,
61
                    max_length=max_length,
62
                    truncation=True,
63
                    padding="max_length",
64
                )
65
                self.input_ids.append(encode["input_ids"])
66
                self.attention_mask.append(encode["attention_mask"])
67
            self.input_ids = torch.cat(self.input_ids, dim=0).to(get_accelerator().get_current_device())
68
            self.attention_mask = torch.cat(self.attention_mask, dim=0).to(get_accelerator().get_current_device())
69
            repeat_times = num_samples // self.input_ids.shape[0] + 1
70
            self.input_ids = self.input_ids.repeat(repeat_times, 1)[:num_samples]
71
            self.attention_mask = self.attention_mask.repeat(repeat_times, 1)[:num_samples]
72
        else:
73
            self.input_ids = torch.randint(
74
                0, vocab_size, (num_samples, max_length), device=get_accelerator().get_current_device()
75
            )
76
            self.attention_mask = torch.ones_like(self.input_ids)
77

78
    def __len__(self):
79
        return self.num_samples
80

81
    def __getitem__(self, idx):
82
        return {
83
            "input_ids": self.input_ids[idx],
84
            "attention_mask": self.attention_mask[idx],
85
            "labels": self.input_ids[idx],
86
        }
87

88

89
def parse_args():
90
    # basic settings
91
    parser = argparse.ArgumentParser()
92
    parser.add_argument(
93
        "--model_name",
94
        type=str,
95
        default="base",
96
        choices=["base", "8b"],
97
        help="Path to pretrained model or model identifier from huggingface.co/models.",
98
    )
99
    parser.add_argument(
100
        "--batch_size",
101
        type=int,
102
        default=4,
103
        help="Batch size (per dp group) for the training dataloader.",
104
    )
105
    parser.add_argument(
106
        "--seq_length",
107
        type=int,
108
        default=2048,
109
        help="sequence length for the training dataloader.",
110
    )
111
    parser.add_argument("--seed", type=int, default=42, help="A seed for reproducible training.")
112
    parser.add_argument(
113
        "--plugin",
114
        type=str,
115
        default="hybrid",
116
        help="parallel plugin",
117
    )
118
    # hybrid plugin
119
    parser.add_argument("--pp_size", type=int, default=2, help="pp size")
120
    parser.add_argument("--dp_size", type=int, default=1, help="dp size")
121
    parser.add_argument("--ep_size", type=int, default=2, help="ep size")
122
    parser.add_argument("--zero_stage", type=int, default=2, help="zero stage in hybrid plugin")
123
    parser.add_argument("--microbatch_size", type=int, default=1, help="microbatch size")
124
    parser.add_argument("--extra_dp_size", type=int, default=1)
125
    # kernel
126
    parser.add_argument(
127
        "--use_kernel",
128
        action="store_true",
129
        help="Use kernel optim. Need to install flash attention, apex, triton to enable all kernel optimizations.",
130
    )
131
    # bench
132
    parser.add_argument("--warmup", type=int, default=20)
133
    parser.add_argument("--active", type=int, default=20)
134
    # load balance
135
    parser.add_argument("--load_balance", action="store_true")
136

137
    # overlap communication
138
    parser.add_argument("--overlap_comm", action="store_true")
139
    # hierarchical all-to-all
140
    parser.add_argument("--hierarchical_alltoall", action="store_true")
141
    args = parser.parse_args()
142
    return args
143

144

145
def main():
146
    args = parse_args()
147

148
    # Launch ColossalAI
149
    colossalai.launch_from_torch(config={}, seed=args.seed)
150
    coordinator = DistCoordinator()
151

152
    # Set plugin
153
    booster_kwargs = {}
154
    hybrid_dict = {
155
        "tp_size": 1,
156
        "custom_policy": OpenMoeForCausalLMPolicy(),
157
        "enable_fused_normalization": args.use_kernel,
158
        "enable_jit_fused": args.use_kernel,
159
        "precision": "bf16",
160
        "zero_stage": args.zero_stage,
161
    }
162
    mgr_dict = {}
163
    if args.plugin == "ep":
164
        dp_size = dist.get_world_size()
165
        plugin = MoeHybridParallelPlugin(
166
            pp_size=1,
167
            **hybrid_dict,
168
        )
169
        MOE_MANAGER.setup(
170
            parallel="EP",
171
            max_ep_size=dp_size,
172
            **mgr_dict,
173
        )
174
    elif args.plugin == "ep_zero":
175
        dp_size = dist.get_world_size()
176
        use_ep_inside = False
177
        plugin = MoeHybridParallelPlugin(
178
            pp_size=1,
179
            extra_dp_size=args.extra_dp_size,
180
            use_ep_inside=use_ep_inside,
181
            **hybrid_dict,
182
        )
183
        MOE_MANAGER.setup(
184
            parallel="EP",
185
            max_ep_size=dp_size // args.extra_dp_size,
186
            use_ep_inside=use_ep_inside,
187
            **mgr_dict,
188
        )
189
    elif args.plugin == "hybrid":
190
        dp_size = dist.get_world_size() // args.pp_size
191
        plugin = MoeHybridParallelPlugin(
192
            pp_size=args.pp_size,
193
            zero_stage=args.zero_stage,
194
            microbatch_size=args.microbatch_size,
195
            **hybrid_dict,
196
        )
197
        MOE_MANAGER.setup(
198
            parallel="EP",
199
            mode="fixed",
200
            fixed_dp_size=args.dp_size,
201
            fixed_ep_size=args.ep_size,
202
            fixed_pp_size=args.pp_size,
203
            **mgr_dict,
204
        )
205
    else:
206
        raise ValueError(f"Invalid plugin {args.plugin}")
207
    coordinator.print_on_master(f"Set plugin as {plugin}")
208

209
    # Build OpenMoe model
210
    repo_name = "hpcai-tech/openmoe-" + args.model_name
211
    config = LlamaConfig.from_pretrained(repo_name)
212
    set_openmoe_args(
213
        config,
214
        num_experts=config.num_experts,
215
        moe_layer_interval=config.moe_layer_interval,
216
        enable_load_balance=args.load_balance,
217
        enable_kernel=args.use_kernel,
218
        enable_comm_overlap=args.overlap_comm,
219
        enable_hierarchical_alltoall=args.hierarchical_alltoall,
220
    )
221
    with skip_init():
222
        model = OpenMoeForCausalLM(config)
223
    coordinator.print_on_master(f"Finish init model with config:\n{config}")
224

225
    # Enable gradient checkpointing
226
    model.gradient_checkpointing_enable()
227

228
    # Prepare tokenizer and dataloader
229
    tokenizer = T5Tokenizer.from_pretrained("google/umt5-small")
230
    dataset = RandomDataset(
231
        num_samples=args.batch_size * (args.warmup + args.active + 1) * dp_size,
232
        max_length=args.seq_length,
233
        tokenizer=tokenizer,
234
    )
235
    dataloader = plugin.prepare_dataloader(dataset, batch_size=args.batch_size)
236

237
    # Set optimizer
238
    optimizer = HybridAdam(model.parameters(), weight_decay=0.01, lr=1e-5)
239

240
    model_numel = get_model_numel(model)
241
    performance_evaluator = PerformanceEvaluator(
242
        model_numel,
243
        enable_grad_checkpoint=True,
244
        ignore_steps=args.warmup,
245
        dp_world_size=dp_size,
246
    )
247

248
    # Set booster
249
    booster = Booster(plugin=plugin, **booster_kwargs)
250
    load_ckpt(repo_name, model, booster)
251
    model, optimizer, _, dataloader, _ = booster.boost(model=model, optimizer=optimizer, dataloader=dataloader)
252
    use_pipeline = isinstance(booster.plugin, MoeHybridParallelPlugin) and booster.plugin.pp_size > 1
253
    is_pp_last_stage = use_pipeline and booster.plugin.stage_manager.is_last_stage()
254
    coordinator.print_on_master(f"Finish init booster")
255

256
    # Start finetuning
257
    coordinator.print_on_master(f"Start training")
258
    model.train()
259
    train_dataloader_iter = iter(dataloader)
260
    total_len = len(train_dataloader_iter) - 1
261
    exmaple_data = next(train_dataloader_iter)
262
    with tqdm(range(total_len), disable=not coordinator.is_master()) as pbar:
263
        for step in pbar:
264
            performance_evaluator.on_step_start(step)
265
            if use_pipeline:
266
                # Forward pass
267
                outputs = booster.execute_pipeline(
268
                    train_dataloader_iter,
269
                    model,
270
                    lambda x, y: x.loss,
271
                    optimizer,
272
                    return_loss=True,
273
                    return_outputs=True,
274
                )
275
                # Backward and optimize
276
                if is_pp_last_stage:
277
                    loss = outputs["loss"]
278
                    pbar.set_postfix({"loss": loss.item()})
279
            else:
280
                # Forward pass
281
                data = next(train_dataloader_iter)
282
                data = move_to_cuda(data, torch.cuda.current_device())
283
                outputs = model(**data)
284
                loss = outputs["loss"]
285
                # Backward
286
                booster.backward(loss, optimizer)
287
                pbar.set_postfix({"loss": loss.item()})
288

289
            optimizer.step()
290
            optimizer.zero_grad()
291
            performance_evaluator.on_step_end(exmaple_data["input_ids"])
292
            if (step == args.warmup // 2) and args.load_balance:
293
                coordinator.print_on_master(f"Apply load balance")
294
                apply_load_balance(model, optimizer)
295
    performance_evaluator.on_fit_end()
296

297

298
if __name__ == "__main__":
299
    main()
300

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.