pytorch-lightning

Форк
0
206 строк · 7.0 Кб
1
"""Demo of a simple transformer language model.
2

3
Code is adapted from the PyTorch examples at
4
https://github.com/pytorch/examples/blob/main/word_language_model
5

6
"""
7

8
import math
9
import os
10
from pathlib import Path
11
from typing import Dict, List, Optional, Tuple
12

13
import torch
14
import torch.nn as nn
15
import torch.nn.functional as F
16
from lightning_utilities.core.imports import RequirementCache
17
from torch import Tensor
18
from torch.nn.modules import MultiheadAttention
19
from torch.utils.data import DataLoader, Dataset
20

21
from lightning.pytorch import LightningModule
22

23
# Availability flag for the optional `requests` package. `WikiText2.download` checks its truthiness before
# importing `requests` and raises `ModuleNotFoundError(str(_REQUESTS_AVAILABLE))` when it is falsy.
_REQUESTS_AVAILABLE = RequirementCache("requests")
24

25

26
# Expose the private initializer under the public ``reset_parameters`` name, but only when the class has the
# private method and does not already define the public one.
# See https://github.com/pytorch/pytorch/issues/107909
if not hasattr(MultiheadAttention, "reset_parameters") and hasattr(MultiheadAttention, "_reset_parameters"):
    MultiheadAttention.reset_parameters = MultiheadAttention._reset_parameters
29

30

31
class Transformer(nn.Module):
    """Encoder-decoder language model built on top of ``nn.Transformer``.

    Token ids are embedded, scaled by ``sqrt(ninp)``, run through the positional encoder and the transformer,
    and finally projected to log-probabilities over the vocabulary.

    Args:
        vocab_size: Number of embedding rows and width of the output projection.
        ninp: Embedding size; also passed as ``d_model`` to the transformer.
        nhead: Number of attention heads.
        nhid: Passed as ``dim_feedforward`` to the transformer.
        nlayers: Number of layers used for both the encoder and the decoder stack.
        dropout: Dropout probability shared by the positional encoder and the transformer.

    """

    def __init__(
        self,
        vocab_size: int = 33278,  # default for WikiText2
        ninp: int = 200,
        nhead: int = 2,
        nhid: int = 200,
        nlayers: int = 2,
        dropout: float = 0.2,
    ) -> None:
        super().__init__()
        # Plain (non-module) attributes.
        self.ninp = ninp
        self.vocab_size = vocab_size
        self.src_mask = None

        # Sub-modules are created in a fixed order so parameter initialisation consumes the RNG identically.
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        self.embedding = nn.Embedding(vocab_size, ninp)
        self.transformer = nn.Transformer(
            d_model=ninp,
            nhead=nhead,
            num_encoder_layers=nlayers,
            num_decoder_layers=nlayers,
            dim_feedforward=nhid,
            dropout=dropout,
            batch_first=True,
        )
        self.decoder = nn.Linear(ninp, vocab_size)

    def forward(self, inputs: Tensor, target: Tensor, mask: Optional[Tensor] = None) -> Tensor:
        """Return log-probabilities for every target position.

        Args:
            inputs: 2-D tensor of token ids fed to the encoder side.
            target: Token ids fed to the decoder side; assumed to be already shifted w.r.t. ``inputs``.
            mask: Optional mask forwarded as ``tgt_mask``. When omitted, a causal mask sized by the second
                dimension of ``inputs`` is built.

        Returns:
            Log-softmax output reshaped to ``(-1, vocab_size)``.

        """
        _, seq_len = inputs.shape

        if mask is None:
            # Additive causal mask: 0.0 on and below the diagonal, -inf strictly above it.
            visible = torch.ones(seq_len, seq_len, device=inputs.device).tril().bool()
            mask = torch.zeros(seq_len, seq_len, dtype=torch.float, device=inputs.device)
            mask = mask.masked_fill(~visible, float("-inf"))

        scale = math.sqrt(self.ninp)
        src = self.pos_encoder(self.embedding(inputs) * scale)
        target = self.pos_encoder(self.embedding(target) * scale)

        decoded = self.transformer(src, target, tgt_mask=mask)
        log_probs = F.log_softmax(self.decoder(decoded), dim=-1)
        return log_probs.view(-1, self.vocab_size)
74

75

76
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to the input, then apply dropout.

    The encoding table is created lazily on the first forward call, on the device of that first input, and
    cached as a plain tensor attribute.

    Args:
        dim: Size of the last dimension of the encoding table.
        dropout: Dropout probability applied to the encoded output.
        max_len: Number of positions pre-computed in the table.

    """

    def __init__(self, dim: int, dropout: float = 0.1, max_len: int = 5000) -> None:
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)
        self.dim = dim
        self.max_len = max_len
        self.pe: Optional[Tensor] = None

    def forward(self, x: Tensor) -> Tensor:
        """Return ``dropout(x + pe[: x.size(0)])``.

        The table has shape ``(max_len, 1, dim)`` and is sliced by the size of the *first* dimension of ``x``,
        then broadcast over the second one.
        NOTE(review): this means positions follow dim 0 of ``x``; confirm callers pass the layout they intend.
        """
        if self.pe is None:
            # 1) can't use buffer, see https://github.com/pytorch/pytorch/issues/68407
            # 2) can't use parameter because pe gets sliced and DDP requires all params to participate in forward
            # 3) can't make it a `requires_grad=False` parameter because FSDP in PyTorch < 2.1 needs all params to
            # require grad
            self.pe = self._init_pos_encoding(device=x.device)

        # The sum must be bound back to `x`; as a bare expression it would be discarded and the module would
        # return the input without any positional information.
        x = x + self.pe[: x.size(0), :]
        return self.dropout(x)

    def _init_pos_encoding(self, device: torch.device) -> Tensor:
        """Build the ``(max_len, 1, dim)`` sinusoidal table on ``device``."""
        pe = torch.zeros(self.max_len, self.dim, device=device)
        position = torch.arange(0, self.max_len, dtype=torch.float, device=device).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.dim, 2, device=device).float() * (-math.log(10000.0) / self.dim))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # For an odd `dim` there is one fewer odd column than even columns, so trim the frequencies to fit.
        pe[:, 1::2] = torch.cos(angles[:, : self.dim // 2])
        pe = pe.unsqueeze(0).transpose(0, 1)
        return pe
103

104

105
class WikiText2(Dataset):
    """Mini version of WikiText2.

    The corpus is stored as ``<data_dir>/wikitext-2.txt``, tokenized with ``tokenize`` and served as
    consecutive, non-overlapping blocks of ``block_size`` token ids together with the same block shifted by one.

    Args:
        data_dir: Directory that holds (or will hold) ``wikitext-2.txt``.
        block_size: Number of tokens per sample.
        download: Whether to call :meth:`download` before tokenizing.

    """

    def __init__(self, data_dir: Path = Path("./data"), block_size: int = 35, download: bool = True) -> None:
        super().__init__()
        self.path = data_dir / "wikitext-2.txt"
        if download:
            self.download(self.path)
        self.data, self.dictionary = tokenize(self.path)
        self.block_size = block_size

    @property
    def vocab_size(self) -> int:
        """Number of distinct tokens in the dictionary."""
        return len(self.dictionary)

    def __len__(self) -> int:
        # One block is held back so the shifted target of the last sample is always full length.
        # Clamp at zero: a negative value would make `len()` raise for very small corpora.
        return max(len(self.data) // self.block_size - 1, 0)

    def __getitem__(self, index: int) -> Tuple[Tensor, Tensor]:
        """Return the ``index``-th block and the same block shifted one token to the right."""
        start = index * self.block_size
        end = start + self.block_size
        inputs = self.data[start:end]
        target = self.data[(start + 1) : (end + 1)]
        return inputs, target

    @staticmethod
    def download(destination: Path) -> None:
        """Fetch the training text to ``destination`` unless the file already exists.

        Raises:
            ModuleNotFoundError: If the ``requests`` package is not available.
            requests.HTTPError: If the server answers with an error status.

        """
        if not _REQUESTS_AVAILABLE:
            raise ModuleNotFoundError(str(_REQUESTS_AVAILABLE))

        import requests

        os.makedirs(destination.parent, exist_ok=True)
        url = "https://raw.githubusercontent.com/pytorch/examples/main/word_language_model/data/wikitext-2/train.txt"
        if os.path.exists(destination):
            return

        # Fetch before opening the file: a failed request must not leave an empty file behind, because the
        # existence check above would then skip every later download attempt.
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        # Write as UTF-8 explicitly; the file is read back as UTF-8 regardless of the platform default.
        with open(destination, "w", encoding="utf8") as f:
            f.write(response.text)
143

144

145
class Dictionary:
    """Two-way mapping between words and consecutive integer ids, assigned in insertion order."""

    def __init__(self) -> None:
        self.word2idx: Dict[str, int] = {}
        self.idx2word: List[str] = []

    def add_word(self, word: str) -> int:
        """Register ``word`` if it is new and return its id."""
        idx = self.word2idx.get(word)
        if idx is None:
            # A new word takes the next free slot at the end of the list.
            idx = len(self.idx2word)
            self.idx2word.append(word)
            self.word2idx[word] = idx
        return idx

    def __len__(self) -> int:
        """Number of distinct words registered so far."""
        return len(self.idx2word)
158

159

160
def tokenize(path: Path) -> Tuple[Tensor, Dictionary]:
    """Convert a whitespace-separated text file into a flat tensor of token ids.

    Each line is split on whitespace and terminated with an ``"<eos>"`` token. Ids are assigned in order of
    first appearance.

    Args:
        path: Location of the UTF-8 encoded text file.

    Returns:
        A 1-D ``int64`` tensor with the ids of all tokens in file order, and the dictionary that maps
        between words and ids. An empty file yields an empty tensor.

    Raises:
        FileNotFoundError: If ``path`` does not exist.

    """
    # Validate explicitly instead of with `assert`, which is stripped when Python runs with `-O`.
    if not os.path.exists(path):
        raise FileNotFoundError(f"No such file: {path}")

    dictionary = Dictionary()
    ids: List[int] = []

    # `add_word` returns the id of both new and known words, so a single pass over the file is enough to
    # build the vocabulary and encode the text.
    with open(path, encoding="utf8") as f:
        for line in f:
            ids.extend(dictionary.add_word(word) for word in line.split())
            ids.append(dictionary.add_word("<eos>"))

    return torch.tensor(ids, dtype=torch.int64), dictionary
182

183

184
class LightningTransformer(LightningModule):
    """Lightning wrapper that trains ``Transformer`` on ``WikiText2`` with an NLL loss and plain SGD.

    Args:
        vocab_size: Vocabulary size forwarded to ``Transformer``.

    """

    def __init__(self, vocab_size: int = 33278) -> None:
        super().__init__()
        self.model = Transformer(vocab_size=vocab_size)

    def forward(self, inputs: Tensor, target: Tensor) -> Tensor:
        """Delegate to the wrapped model."""
        return self.model(inputs, target)

    def training_step(self, batch: Tuple[Tensor, Tensor], batch_idx: int) -> Tensor:
        """Compute the negative log-likelihood of the flattened targets for one batch."""
        inputs, target = batch
        log_probs = self(inputs, target)
        flat_target = target.view(-1)
        return torch.nn.functional.nll_loss(log_probs, flat_target)

    def configure_optimizers(self) -> torch.optim.Optimizer:
        """Optimize the wrapped model's parameters with SGD at a learning rate of 0.1."""
        params = self.model.parameters()
        return torch.optim.SGD(params, lr=0.1)

    def prepare_data(self) -> None:
        """Instantiate the dataset with ``download=True``; the instance itself is discarded."""
        WikiText2(download=True)

    def train_dataloader(self) -> DataLoader:
        """Wrap a freshly built ``WikiText2`` in a ``DataLoader`` with default settings."""
        return DataLoader(WikiText2())
207

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.