# Copyright (c) 2024, EleutherAI
# This file is based on code by the authors denoted below and has been modified from its original version.
#
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# copied from fairseq/fairseq/data/indexed_dataset.py
# Removed IndexedRawTextDataset since it relied on Fairseq dictionary
# other slight modifications to remove fairseq dependencies
# Added document index to index file and made it accessible.
#    An empty sentence no longer separates documents.
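
# A minimal usage sketch (file names here are hypothetical). The builder API
# writes a .bin data file plus a .idx index; make_dataset then loads the pair
# by its common basename:
#
#     builder = make_builder("docs.bin", impl="mmap", vocab_size=50257)
#     builder.add_item(np.array([10, 11, 12], dtype=np.uint16))
#     builder.end_document()
#     builder.finalize("docs.idx")
#
#     dataset = make_dataset("docs", impl="mmap", skip_warmup=True)
#     tokens = dataset[0]  # -> array([10, 11, 12], dtype=uint16)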

import os
import shutil
import struct
from functools import lru_cache
from itertools import accumulate

import numpy as np
import torch

from megatron import print_rank_0


def __best_fitting_dtype(vocab_size=None):
    # token ids fit in uint16 when the vocabulary (plus headroom) stays under 2**16
    if vocab_size is not None and vocab_size < 65500:
        return np.uint16
    else:
        return np.int32


def infer_dataset_impl(path):
    if IndexedDataset.exists(path):
        with open(index_file_path(path), "rb") as f:
            magic = f.read(8)
            if magic == IndexedDataset._HDR_MAGIC:
                return "cached"
            elif magic == MMapIndexedDataset.Index._HDR_MAGIC[:8]:
                return "mmap"
            else:
                return None
    else:
        print(f"Dataset does not exist: {path}")
        print(
            "Path should be a basename that both .idx and .bin can be appended to get full filenames."
        )
        return None


def make_builder(out_file, impl, vocab_size=None):
    if impl == "mmap":
        return MMapIndexedDatasetBuilder(
            out_file, dtype=__best_fitting_dtype(vocab_size)
        )
    else:
        return IndexedDatasetBuilder(out_file)


def make_dataset(path, impl, skip_warmup=False):
    if not IndexedDataset.exists(path):
        print(f"Dataset does not exist: {path}")
        print(
            "Path should be a basename that both .idx and .bin can be appended to get full filenames."
        )
        return None
    if impl == "infer":
        impl = infer_dataset_impl(path)
    if impl == "lazy" and IndexedDataset.exists(path):
        return IndexedDataset(path)
    elif impl == "cached" and IndexedDataset.exists(path):
        return IndexedCachedDataset(path)
    elif impl == "mmap" and MMapIndexedDataset.exists(path):
        return MMapIndexedDataset(path, skip_warmup)
    print(f"Unknown dataset implementation: {impl}")
    return None


def dataset_exists(path, impl):
    if impl == "mmap":
        return MMapIndexedDataset.exists(path)
    else:
        return IndexedDataset.exists(path)


def read_longs(f, n):
    a = np.empty(n, dtype=np.int64)
    f.readinto(a)
    return a


def write_longs(f, a):
    f.write(np.array(a, dtype=np.int64))


# Mapping from on-disk dtype codes to numpy dtypes.
dtypes = {
    1: np.uint8,
    2: np.int8,
    3: np.int16,
    4: np.int32,
    5: np.int64,
    6: np.float32,
    7: np.float64,
    8: np.uint16,
}


def code(dtype):
    for k in dtypes.keys():
        if dtypes[k] == dtype:
            return k
    raise ValueError(dtype)


def index_file_path(prefix_path):
    return prefix_path + ".idx"


def data_file_path(prefix_path):
    return prefix_path + ".bin"


def create_doc_idx(sizes):
    doc_idx = [0]
    for i, s in enumerate(sizes):
        if s == 0:
            doc_idx.append(i + 1)
    return doc_idx
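

# Worked example: sizes [3, 0, 4, 0] (in the legacy format a zero-size entry
# marked a document break) yield doc_idx [0, 2, 4].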


class IndexedDataset(torch.utils.data.Dataset):
    """Loader for IndexedDataset"""

    _HDR_MAGIC = b"TNTIDX\x00\x00"

    def __init__(self, path):
        super().__init__()
        self.path = path
        self.data_file = None
        self.read_index(path)

    def read_index(self, path):
        with open(index_file_path(path), "rb") as f:
            magic = f.read(8)
            assert magic == self._HDR_MAGIC, (
                "Index file doesn't match expected format. "
                "Make sure that --dataset-impl is configured properly."
            )
            version = f.read(8)
            assert struct.unpack("<Q", version) == (1,)
            code, self.element_size = struct.unpack("<QQ", f.read(16))
            self.dtype = dtypes[code]
            self._len, self.s = struct.unpack("<QQ", f.read(16))
            self.doc_count = struct.unpack("<Q", f.read(8))
            self.dim_offsets = read_longs(f, self._len + 1)
            self.data_offsets = read_longs(f, self._len + 1)
            self.sizes = read_longs(f, self.s)
            self.doc_idx = read_longs(f, self.doc_count)
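
    # On-disk layout of the legacy .idx file parsed above (and written by
    # IndexedDatasetBuilder.finalize below): 8-byte magic, then uint64 fields
    # for version, dtype code, element size, entry count, size count and doc
    # count, followed by dim_offsets, data_offsets, sizes and doc_idx as
    # int64 arrays.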

    def read_data(self, path):
        self.data_file = open(data_file_path(path), "rb", buffering=0)

    def check_index(self, i):
        if i < 0 or i >= self._len:
            raise IndexError("index out of range")

    def __del__(self):
        if self.data_file:
            self.data_file.close()

    # @lru_cache(maxsize=8)
    def __getitem__(self, idx):
        if not self.data_file:
            self.read_data(self.path)
        if isinstance(idx, int):
            i = idx
            self.check_index(i)
            tensor_size = self.sizes[self.dim_offsets[i] : self.dim_offsets[i + 1]]
            a = np.empty(tensor_size, dtype=self.dtype)
            self.data_file.seek(self.data_offsets[i] * self.element_size)
            self.data_file.readinto(a)
            return a
        elif isinstance(idx, slice):
            start, stop, step = idx.indices(len(self))
            if step != 1:
                raise ValueError("Slices into indexed_dataset must be contiguous")
            sizes = self.sizes[self.dim_offsets[start] : self.dim_offsets[stop]]
            size = sum(sizes)
            a = np.empty(size, dtype=self.dtype)
            self.data_file.seek(self.data_offsets[start] * self.element_size)
            self.data_file.readinto(a)
            # split the contiguous read back into per-sentence arrays, e.g.
            # sizes [2, 3] -> offsets [2, 5] -> split point [2]
            offsets = list(accumulate(sizes))
            sents = np.split(a, offsets[:-1])
            return sents

    def __len__(self):
        return self._len

    def num_tokens(self, index):
        return self.sizes[index]

    def size(self, index):
        return self.sizes[index]

    @staticmethod
    def exists(path):
        return os.path.exists(index_file_path(path)) and os.path.exists(
            data_file_path(path)
        )

    @property
    def supports_prefetch(self):
        return False  # avoid prefetching to save memory


class IndexedCachedDataset(IndexedDataset):
    def __init__(self, path):
        super().__init__(path)
        self.cache = None
        self.cache_index = {}

    @property
    def supports_prefetch(self):
        return True

    def prefetch(self, indices):
        if all(i in self.cache_index for i in indices):
            return
        if not self.data_file:
            self.read_data(self.path)
        indices = sorted(set(indices))
        total_size = 0
        for i in indices:
            total_size += self.data_offsets[i + 1] - self.data_offsets[i]
        self.cache = np.empty(total_size, dtype=self.dtype)
        ptx = 0
        self.cache_index.clear()
        for i in indices:
            self.cache_index[i] = ptx
            size = self.data_offsets[i + 1] - self.data_offsets[i]
            a = self.cache[ptx : ptx + size]
            self.data_file.seek(self.data_offsets[i] * self.element_size)
            self.data_file.readinto(a)
            ptx += size
        if self.data_file:
            # close and delete data file after prefetch so we can pickle
            self.data_file.close()
            self.data_file = None

    # @lru_cache(maxsize=8)
    def __getitem__(self, idx):
        if isinstance(idx, int):
            i = idx
            self.check_index(i)
            tensor_size = self.sizes[self.dim_offsets[i] : self.dim_offsets[i + 1]]
            a = np.empty(tensor_size, dtype=self.dtype)
            ptx = self.cache_index[i]
            np.copyto(a, self.cache[ptx : ptx + a.size])
            return a
        elif isinstance(idx, slice):
            # Hack just to make this work; can optimize later if necessary
            sents = []
            for i in range(*idx.indices(len(self))):
                sents.append(self[i])
            return sents


class IndexedDatasetBuilder(object):
    element_sizes = {
        np.uint8: 1,
        np.int8: 1,
        np.int16: 2,
        np.int32: 4,
        np.int64: 8,
        np.float32: 4,
        np.float64: 8,
    }

    def __init__(self, out_file, dtype=np.int32):
        self.out_file = open(out_file, "wb")
        self.dtype = dtype
        self.data_offsets = [0]
        self.dim_offsets = [0]
        self.sizes = []
        self.element_size = self.element_sizes[self.dtype]
        self.doc_idx = [0]

    def add_item(self, np_array):
        assert isinstance(np_array, np.ndarray) and np_array.dtype == self.dtype
        bytes = self.out_file.write(np_array)
        self.data_offsets.append(self.data_offsets[-1] + bytes / self.element_size)
        for s in np_array.shape:
            self.sizes.append(s)
        self.dim_offsets.append(self.dim_offsets[-1] + len(np_array.shape))

    def end_document(self):
        self.doc_idx.append(len(self.sizes))

    def merge_file_(self, another_file):
        index = IndexedDataset(another_file)
        assert index.dtype == self.dtype

        begin = self.data_offsets[-1]
        for offset in index.data_offsets[1:]:
            self.data_offsets.append(begin + offset)
        self.sizes.extend(index.sizes)
        begin = self.dim_offsets[-1]
        for dim_offset in index.dim_offsets[1:]:
            self.dim_offsets.append(begin + dim_offset)

        with open(data_file_path(another_file), "rb") as f:
            while True:
                data = f.read(1024)
                if data:
                    self.out_file.write(data)
                else:
                    break

    def finalize(self, index_file):
        self.out_file.close()
        index = open(index_file, "wb")
        index.write(b"TNTIDX\x00\x00")
        index.write(struct.pack("<Q", 1))
        index.write(struct.pack("<QQ", code(self.dtype), self.element_size))
        index.write(struct.pack("<QQ", len(self.data_offsets) - 1, len(self.sizes)))
        index.write(struct.pack("<Q", len(self.doc_idx)))
        write_longs(index, self.dim_offsets)
        write_longs(index, self.data_offsets)
        write_longs(index, self.sizes)
        write_longs(index, self.doc_idx)
        index.close()


def _warmup_mmap_file(path):
    with open(path, "rb") as stream:
        while stream.read(100 * 1024 * 1024):
            pass


class MMapIndexedDataset(torch.utils.data.Dataset):
    class Index(object):
        _HDR_MAGIC = b"MMIDIDX\x00\x00"

        @classmethod
        def writer(cls, path, dtype):
            class _Writer(object):
                def __enter__(self):
                    self._file = open(path, "wb")

                    # Write magic string so we can check the file format when opening it again.
                    self._file.write(cls._HDR_MAGIC)
                    # Write version number
                    # Little endian unsigned 64 Bit integer
                    self._file.write(struct.pack("<Q", 1))
                    # Little endian unsigned 8 Bit integer
                    self._file.write(struct.pack("<B", code(dtype)))

                    return self

                @staticmethod
                def _get_pointers(sizes):
                    pointers = np.zeros(len(sizes), dtype=np.int64)
                    sizes = np.array(sizes, dtype=np.int64)

                    # exclusive prefix sum of sizes, scaled to byte offsets
                    np.cumsum(sizes[:-1], out=pointers[1:])
                    pointers = pointers * dtype().itemsize

                    return pointers
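
                # For example, sizes [3, 2] with dtype np.int32 give byte
                # pointers [0, 12]: entry k starts at itemsize * sum(sizes[:k]).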

                def write(self, sizes, doc_idx):
                    pointers = self._get_pointers(sizes)

                    # Little endian unsigned 64 Bit integer
                    self._file.write(struct.pack("<Q", len(sizes)))
                    # Little endian unsigned 64 Bit integer
                    self._file.write(struct.pack("<Q", len(doc_idx)))

                    sizes = np.array(sizes, dtype=np.int32)
                    self._file.write(sizes.tobytes(order="C"))
                    del sizes

                    pointers = np.array(pointers, dtype=np.int64)
                    self._file.write(pointers.tobytes(order="C"))
                    del pointers

                    doc_idx = np.array(doc_idx, dtype=np.int64)
                    self._file.write(doc_idx.tobytes(order="C"))

                def __exit__(self, exc_type, exc_val, exc_tb):
                    self._file.close()

            return _Writer()
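
        # Resulting .idx layout, mirrored by the reader below: 9-byte magic,
        # uint64 version, uint8 dtype code, uint64 entry count, uint64 doc
        # count, then sizes (int32), pointers (int64) and doc_idx (int64).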

        def __init__(self, path, skip_warmup=False):
            with open(path, "rb") as stream:
                magic_test = stream.read(9)
                assert self._HDR_MAGIC == magic_test, (
                    "Index file doesn't match expected format. "
                    "Make sure that --dataset-impl is configured properly."
                )
                # Little endian unsigned 64 Bit integer
                version = struct.unpack("<Q", stream.read(8))
                assert (1,) == version

                # Little endian unsigned 8 Bit integer
                (dtype_code,) = struct.unpack("<B", stream.read(1))
                self._dtype = dtypes[dtype_code]
                self._dtype_size = self._dtype().itemsize

                self._len = struct.unpack("<Q", stream.read(8))[0]
                self._doc_count = struct.unpack("<Q", stream.read(8))[0]
                offset = stream.tell()

            if not skip_warmup:
                print_rank_0("    warming up index mmap file...")
                _warmup_mmap_file(path)

            self._bin_buffer_mmap = np.memmap(path, mode="r", order="C")
            self._bin_buffer = memoryview(self._bin_buffer_mmap)
            print_rank_0("    reading sizes...")
            self._sizes = np.frombuffer(
                self._bin_buffer, dtype=np.int32, count=self._len, offset=offset
            )
            print_rank_0("    reading pointers...")
            self._pointers = np.frombuffer(
                self._bin_buffer,
                dtype=np.int64,
                count=self._len,
                offset=offset + self._sizes.nbytes,
            )
            print_rank_0("    reading document index...")
            self._doc_idx = np.frombuffer(
                self._bin_buffer,
                dtype=np.int64,
                count=self._doc_count,
                offset=offset + self._sizes.nbytes + self._pointers.nbytes,
            )

        def __del__(self):
            self._bin_buffer_mmap._mmap.close()
            del self._bin_buffer_mmap

        @property
        def dtype(self):
            return self._dtype

        @property
        def sizes(self):
            return self._sizes

        @property
        def doc_idx(self):
            return self._doc_idx

        @lru_cache(maxsize=8)
        def __getitem__(self, i):
            return self._pointers[i], self._sizes[i]

        def __len__(self):
            return self._len

    def __init__(self, path, skip_warmup=False):
        super().__init__()

        self._path = None
        self._index = None
        self._bin_buffer = None

        self._do_init(path, skip_warmup)

    def __getstate__(self):
        return self._path

    def __setstate__(self, state):
        self._do_init(state, skip_warmup=True)

    def _do_init(self, path, skip_warmup):
        self._path = path
        self._index = self.Index(index_file_path(self._path), skip_warmup)

        if not skip_warmup:
            print_rank_0("    warming up data mmap file...")
            _warmup_mmap_file(data_file_path(self._path))
        print_rank_0("    creating numpy buffer of mmap...")
        self._bin_buffer_mmap = np.memmap(
            data_file_path(self._path), mode="r", order="C"
        )
        print_rank_0("    creating memory view of numpy buffer...")
        self._bin_buffer = memoryview(self._bin_buffer_mmap)

    def __del__(self):
        self._bin_buffer_mmap._mmap.close()
        del self._bin_buffer_mmap
        del self._index

    def __len__(self):
        return len(self._index)

    # @lru_cache(maxsize=8)
    def __getitem__(self, idx):
        if isinstance(idx, int):
            ptr, size = self._index[idx]
            np_array = np.frombuffer(
                self._bin_buffer, dtype=self._index.dtype, count=size, offset=ptr
            )
            return np_array
        elif isinstance(idx, slice):
            start, stop, step = idx.indices(len(self))
            if step != 1:
                raise ValueError("Slices into indexed_dataset must be contiguous")
            ptr = self._index._pointers[start]
            sizes = self._index._sizes[idx]
            offsets = list(accumulate(sizes))
            total_size = sum(sizes)
            np_array = np.frombuffer(
                self._bin_buffer, dtype=self._index.dtype, count=total_size, offset=ptr
            )
            sents = np.split(np_array, offsets[:-1])
            return sents

    def get(self, idx, offset=0, length=None):
        """Retrieves a single item from the dataset with the option to only
        return a portion of the item.

        get(idx) is the same as [idx] but get() does not support slicing.
        """
        ptr, size = self._index[idx]
        if length is None:
            length = size - offset
        ptr += offset * np.dtype(self._index.dtype).itemsize
        np_array = np.frombuffer(
            self._bin_buffer, dtype=self._index.dtype, count=length, offset=ptr
        )
        return np_array
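
    # For example, get(i, offset=1, length=2) returns elements 1..2 of item i
    # without materializing the whole entry.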

    @property
    def sizes(self):
        return self._index.sizes

    @property
    def doc_idx(self):
        return self._index.doc_idx

    def get_doc_idx(self):
        return self._index._doc_idx

    def set_doc_idx(self, doc_idx_):
        self._index._doc_idx = doc_idx_

    @property
    def supports_prefetch(self):
        return False

    @staticmethod
    def exists(path):
        return os.path.exists(index_file_path(path)) and os.path.exists(
            data_file_path(path)
        )


class MMapIndexedDatasetBuilder(object):
    def __init__(self, out_file, dtype=np.int64):
        self._data_file = open(out_file, "wb")
        self._dtype = dtype
        self._sizes = []
        self._doc_idx = [0]

    @property
    def dtype(self):
        return self._dtype

    def add_item(self, np_array):
        assert isinstance(np_array, np.ndarray) and np_array.dtype == self.dtype
        self._data_file.write(np_array.tobytes(order="C"))
        self._sizes.append(np_array.size)

    def end_document(self):
        self._doc_idx.append(len(self._sizes))

    def merge_file_(self, another_file):
        # Concatenate index
        index = MMapIndexedDataset.Index(index_file_path(another_file))
        assert index.dtype == self._dtype

        for size in index.sizes:
            self._sizes.append(size)

        # Concatenate data
        with open(data_file_path(another_file), "rb") as f:
            shutil.copyfileobj(f, self._data_file)

    def finalize(self, index_file):
        self._data_file.close()

        with MMapIndexedDataset.Index.writer(index_file, self._dtype) as index:
            index.write(self._sizes, self._doc_idx)
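

if __name__ == "__main__":
    # Minimal round-trip smoke test; a sketch only (temporary paths and token
    # values are hypothetical, and importing this module requires megatron).
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        base = os.path.join(tmp, "example")
        # vocab_size < 65500 selects np.uint16 via __best_fitting_dtype
        builder = make_builder(data_file_path(base), impl="mmap", vocab_size=50257)
        builder.add_item(np.array([10, 11, 12], dtype=np.uint16))
        builder.add_item(np.array([13, 14], dtype=np.uint16))
        builder.end_document()
        builder.finalize(index_file_path(base))

        dataset = make_dataset(base, impl="mmap", skip_warmup=True)
        assert list(dataset[0]) == [10, 11, 12]
        assert list(dataset[1]) == [13, 14]
        assert list(dataset.doc_idx) == [0, 2]
        print("round-trip OK")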