colossalai
126 lines · 4.1 KB
1from time import time2from typing import Optional3
4import torch5import torch.distributed as dist6import torch.nn as nn7from torch import Tensor8
9from colossalai.logging import DistributedLogger10
11
def print_model_numel(logger: DistributedLogger, model: nn.Module) -> None:
    """Log the trainable-parameter count of ``model`` on rank 0.

    The count is rendered with binary-unit suffixes: ``B`` = 1024**3,
    ``M`` = 1024**2, ``K`` = 1024; counts below 1024 are printed raw.

    Args:
        logger: Distributed logger; the message is emitted on rank 0 only.
        model: Module whose ``requires_grad`` parameters are counted.
    """
    numel = sum(p.numel() for p in model.parameters() if p.requires_grad)
    # Pick the largest unit the count reaches; fall back to the raw number.
    for divisor, suffix in ((1024**3, " B"), (1024**2, " M"), (1024, " K")):
        if numel >= divisor:
            rendered = f"{numel / divisor:.2f}{suffix}"
            break
    else:
        rendered = f"{numel}"
    logger.info("Model param count: " + rendered + "\n", ranks=[0])
28
def get_model_numel(model: nn.Module) -> int:
    """Return the number of trainable parameters in ``model``.

    Only parameters with ``requires_grad`` set are counted.

    Args:
        model: Module whose parameters are counted.

    Returns:
        Total element count of all trainable parameters.
    """
    # Fix: the original annotated the return type as ``None`` while
    # actually returning the parameter count.
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
33
def divide(x: float, y: float) -> float:
    """Divide ``x`` by ``y``, mapping degenerate denominators to sentinels.

    Returns ``inf`` when ``y`` is zero and ``nan`` when ``y`` is positive
    infinity; otherwise the plain quotient ``x / y``.
    """
    if y != 0 and y != float("inf"):
        return x / y
    return float("inf") if y == 0 else float("nan")
41
@torch.no_grad()
def all_reduce_mean(x: float, world_size: int) -> float:
    """Average the scalar ``x`` across all ranks.

    Short-circuits when there is a single rank; otherwise all-reduces the
    value on the current CUDA device and divides the sum by ``world_size``.

    Args:
        x: Local scalar value.
        world_size: Total number of participating ranks.

    Returns:
        The mean of ``x`` over all ranks.
    """
    if world_size == 1:
        return x
    buf = torch.tensor([x], device=torch.cuda.current_device())
    dist.all_reduce(buf)
    return (buf / world_size).item()
51
class Timer:
    """Accumulating wall-clock stopwatch.

    ``duration`` sums the length of every completed ``start()``/``end()``
    interval until ``reset()`` is called.
    """

    def __init__(self) -> None:
        # Timestamp of the most recent start(); None while stopped.
        self.start_time: Optional[float] = None
        # Total seconds accumulated over completed intervals.
        self.duration: float = 0.0

    def start(self) -> None:
        """Open a new timed interval."""
        self.start_time = time()

    def end(self) -> None:
        """Close the current interval and add its length to ``duration``."""
        assert self.start_time is not None
        elapsed = time() - self.start_time
        self.start_time = None
        self.duration += elapsed

    def reset(self) -> None:
        """Zero the accumulated duration (leaves any open interval alone)."""
        self.duration = 0.0
69
class PerformanceEvaluator:
    """Callback for evaluating the training performance of the model.

    Args:
        model_numel: Number of trainable parameters of the model.
        enable_grad_checkpoint: Whether gradient checkpointing is enabled;
            adds one extra forward pass to the FLOP estimate to account for
            activation recomputation.
        ignore_steps: Number of warmup steps to exclude from measurement.
        dp_world_size: Data-parallel world size; defaults to the global
            world size when not given.
    """

    def __init__(
        self,
        model_numel: int,
        enable_grad_checkpoint: bool = False,
        ignore_steps: int = 0,
        dp_world_size: Optional[int] = None,
    ) -> None:
        self.model_numel = model_numel
        self.enable_grad_checkpoint = enable_grad_checkpoint
        self.ignore_steps = ignore_steps
        self.world_size = dist.get_world_size()
        # Fix: fall back to the global world size so on_fit_end never does
        # arithmetic with None (the original default crashed there).
        self.dp_world_size = dp_world_size if dp_world_size is not None else self.world_size
        self.disable: bool = False
        self.timer = Timer()
        self.num_samples: int = 0
        self.flop: int = 0

    def on_step_start(self, step: int) -> None:
        """Start timing a step, unless it falls inside the warmup window."""
        self.disable = self.ignore_steps > 0 and step < self.ignore_steps
        if self.disable:
            return
        # Synchronize so the timer measures completed GPU work only.
        torch.cuda.synchronize()
        self.timer.start()

    def on_step_end(self, input_ids: Tensor, **kwargs) -> None:
        """Stop timing and accumulate sample/FLOP counters for this step."""
        if self.disable:
            return
        torch.cuda.synchronize()
        self.timer.end()

        # assumes input_ids is (batch, seq_len) — TODO confirm against caller
        batch_size, seq_len = input_ids.shape

        self.num_samples += batch_size
        # ~2 * numel FLOPs per token for one forward pass; x3 for forward +
        # backward, plus one extra forward when recomputing activations.
        self.flop += batch_size * seq_len * self.model_numel * 2 * (3 + int(self.enable_grad_checkpoint))

    def on_fit_end(self) -> None:
        """Reduce timings across ranks and print throughput/TFLOPS on rank 0."""
        avg_duration = all_reduce_mean(self.timer.duration, self.world_size)
        # 1e-12 guards against division by zero when nothing was measured.
        avg_throughput = self.num_samples * self.dp_world_size / (avg_duration + 1e-12)
        mp_world_size = self.world_size // self.dp_world_size
        avg_tflops_per_gpu = self.flop / 1e12 / (avg_duration + 1e-12) / mp_world_size
        if dist.get_rank() == 0:
            print(
                f"num_samples: {self.num_samples}, dp_world_size: {self.dp_world_size}, flop: {self.flop}, avg_duration: {avg_duration}, "
                f"avg_throughput: {avg_throughput}")
            print(f"Throughput: {avg_throughput:.2f} samples/sec, TFLOPS per GPU: {avg_tflops_per_gpu:.2f}")