# pytorch — caffe2 load/save operator tests (868 lines · 32.4 KB)
1import hypothesis.strategies as st
2from hypothesis import given, assume, settings
3import io
4import math
5import numpy as np
6import os
7import struct
8import unittest
9from pathlib import Path
10from typing import Dict, Generator, List, NamedTuple, Optional, Tuple, Type
11from caffe2.proto import caffe2_pb2
12from caffe2.proto.caffe2_pb2 import BlobSerializationOptions
13from caffe2.python import core, test_util, workspace
14
# Device pool for the hypothesis strategies below: always exercise CPU, and
# additionally exercise the GPU device type when this build has GPU support.
if workspace.has_gpu_support:
    DEVICES = [caffe2_pb2.CPU, workspace.GpuDeviceType]
    max_gpuid = workspace.NumGpuDevices() - 1
else:
    DEVICES = [caffe2_pb2.CPU]
    max_gpuid = 0
21
22
class MiniDBEntry(NamedTuple):
    """One (key, value) record read out of a minidb file."""
    # Entry key: the blob name, possibly followed by a "#%<chunk-id>" suffix.
    key: str
    # Size in bytes of the serialized value payload.
    value_size: int
26
27
28# Utility class for other loading tests, don't add test functions here
29# Inherit from this test instead. If you add a test here,
30# each derived class will inherit it as well and cause test duplication
class TestLoadSaveBase(test_util.TestCase):
    """Shared save/load round-trip machinery for DB-backend-specific tests."""

    def __init__(self, methodName, db_type='minidb'):
        # db_type selects the caffe2 DB backend (e.g. "minidb") used by every
        # Save/Load operator created in these tests.
        super().__init__(methodName)
        self._db_type = db_type

    @settings(deadline=None)
    @given(src_device_type=st.sampled_from(DEVICES),
           src_gpu_id=st.integers(min_value=0, max_value=max_gpuid),
           dst_device_type=st.sampled_from(DEVICES),
           dst_gpu_id=st.integers(min_value=0, max_value=max_gpuid))
    def load_save(self, src_device_type, src_gpu_id,
                  dst_device_type, dst_gpu_id):
        """Save blobs fed on one device, then reload them onto another.

        Exercises keep_device / load_all combinations and checks that the
        reloaded tensors keep their dtype, contents, and device detail.
        """
        workspace.ResetWorkspace()
        dtypes = [np.float16, np.float32, np.float64, bool, np.int8,
                  np.int16, np.int32, np.int64, np.uint8, np.uint16]
        arrays = [np.random.permutation(6).reshape(2, 3).astype(T)
                  for T in dtypes]
        # A nonzero gpu_id only makes sense for GPU device types; skip others.
        assume(core.IsGPUDeviceType(src_device_type) or src_gpu_id == 0)
        assume(core.IsGPUDeviceType(dst_device_type) or dst_gpu_id == 0)
        src_device_option = core.DeviceOption(
            src_device_type, src_gpu_id)
        dst_device_option = core.DeviceOption(
            dst_device_type, dst_gpu_id)

        for i, arr in enumerate(arrays):
            self.assertTrue(workspace.FeedBlob(str(i), arr, src_device_option))
            self.assertTrue(workspace.HasBlob(str(i)))

        # Saves the blobs to a local db.
        tmp_folder = self.make_tempdir()
        op = core.CreateOperator(
            "Save",
            [str(i) for i in range(len(arrays))], [],
            absolute_path=1,
            db=str(tmp_folder / "db"), db_type=self._db_type)
        self.assertTrue(workspace.RunOperatorOnce(op))

        # Reset the workspace so that anything we load is surely loaded
        # from the serialized proto.
        workspace.ResetWorkspace()
        self.assertEqual(len(workspace.Blobs()), 0)

        def _LoadTest(keep_device, device_type, gpu_id, blobs, loadAll):
            """A helper subfunction to test keep and not keep."""
            op = core.CreateOperator(
                "Load",
                [], blobs,
                absolute_path=1,
                db=str(tmp_folder / "db"), db_type=self._db_type,
                device_option=dst_device_option,
                keep_device=keep_device,
                load_all=loadAll)
            self.assertTrue(workspace.RunOperatorOnce(op))
            for i, arr in enumerate(arrays):
                self.assertTrue(workspace.HasBlob(str(i)))
                fetched = workspace.FetchBlob(str(i))
                self.assertEqual(fetched.dtype, arr.dtype)
                np.testing.assert_array_equal(
                    workspace.FetchBlob(str(i)), arr)
                proto = caffe2_pb2.BlobProto()
                proto.ParseFromString(workspace.SerializeBlob(str(i)))
                self.assertTrue(proto.HasField('tensor'))
                self.assertEqual(proto.tensor.device_detail.device_type,
                                 device_type)
                if core.IsGPUDeviceType(device_type):
                    self.assertEqual(proto.tensor.device_detail.device_id,
                                     gpu_id)

        blobs = [str(i) for i in range(len(arrays))]
        # Load using device option stored in the proto, i.e.
        # src_device_option
        _LoadTest(1, src_device_type, src_gpu_id, blobs, 0)
        # Load again, but this time load into dst_device_option.
        _LoadTest(0, dst_device_type, dst_gpu_id, blobs, 0)
        # Load back to the src_device_option to see if both paths are able
        # to reallocate memory.
        _LoadTest(1, src_device_type, src_gpu_id, blobs, 0)
        # Reset the workspace, and load directly into the dst_device_option.
        workspace.ResetWorkspace()
        _LoadTest(0, dst_device_type, dst_gpu_id, blobs, 0)

        # Test load all which loads all blobs in the db into the workspace.
        workspace.ResetWorkspace()
        _LoadTest(1, src_device_type, src_gpu_id, [], 1)
        # Load again making sure that overwrite functionality works.
        _LoadTest(1, src_device_type, src_gpu_id, [], 1)
        # Load again with different device.
        _LoadTest(0, dst_device_type, dst_gpu_id, [], 1)
        workspace.ResetWorkspace()
        _LoadTest(0, dst_device_type, dst_gpu_id, [], 1)
        workspace.ResetWorkspace()
        _LoadTest(1, src_device_type, src_gpu_id, blobs, 1)
        workspace.ResetWorkspace()
        _LoadTest(0, dst_device_type, dst_gpu_id, blobs, 1)

    def saveFile(
        self, tmp_folder: Path, db_name: str, db_type: str, start_blob_id: int
    ) -> Tuple[str, List[np.ndarray]]:
        """Feed one blob per supported dtype and save them all to a new db.

        Blobs are named str(start_blob_id), str(start_blob_id + 1), ...
        Returns the db file path and the arrays that were saved.
        """
        dtypes = [np.float16, np.float32, np.float64, bool, np.int8,
                  np.int16, np.int32, np.int64, np.uint8, np.uint16]
        arrays = [np.random.permutation(6).reshape(2, 3).astype(T)
                  for T in dtypes]

        for i, arr in enumerate(arrays):
            self.assertTrue(workspace.FeedBlob(str(i + start_blob_id), arr))
            self.assertTrue(workspace.HasBlob(str(i + start_blob_id)))

        # Saves the blobs to a local db.
        tmp_file = str(tmp_folder / db_name)
        op = core.CreateOperator(
            "Save",
            [str(i + start_blob_id) for i in range(len(arrays))], [],
            absolute_path=1,
            db=tmp_file, db_type=db_type)
        workspace.RunOperatorOnce(op)
        return tmp_file, arrays
148
149
class TestLoadSave(TestLoadSaveBase):
    """Concrete Save/Load operator tests (runs against the default minidb)."""

    def testLoadSave(self):
        """Run the hypothesis-driven round-trip from the base class."""
        self.load_save()

    def testRepeatedArgs(self):
        """Passing the same input blob to Save twice must be rejected."""
        dtypes = [np.float16, np.float32, np.float64, bool, np.int8,
                  np.int16, np.int32, np.int64, np.uint8, np.uint16]
        arrays = [np.random.permutation(6).reshape(2, 3).astype(T)
                  for T in dtypes]

        for i, arr in enumerate(arrays):
            self.assertTrue(workspace.FeedBlob(str(i), arr))
            self.assertTrue(workspace.HasBlob(str(i)))

        # Saves the blobs to a local db, listing every input twice.
        tmp_folder = self.make_tempdir()
        op = core.CreateOperator(
            "Save",
            [str(i) for i in range(len(arrays))] * 2, [],
            absolute_path=1,
            db=str(tmp_folder / "db"), db_type=self._db_type)
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(op)

    def testLoadExcessblobs(self):
        """Load must fail when the output list disagrees with db contents."""
        tmp_folder = self.make_tempdir()
        tmp_file, arrays = self.saveFile(tmp_folder, "db", self._db_type, 0)

        # Requesting each blob twice is an error.
        op = core.CreateOperator(
            "Load",
            [], [str(i) for i in range(len(arrays))] * 2,
            absolute_path=1,
            db=tmp_file, db_type=self._db_type,
            load_all=False)
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(op)

        # With load_all=True the output list must match the db keys exactly;
        # a wrong-sized / wrong-named output list is an error.
        op = core.CreateOperator(
            "Load",
            [], [str(len(arrays) + i) for i in [-1, 0]],
            absolute_path=1,
            db=tmp_file, db_type=self._db_type,
            load_all=True)
        with self.assertRaises(RuntimeError):
            workspace.ResetWorkspace()
            workspace.RunOperatorOnce(op)

        op = core.CreateOperator(
            "Load",
            [], [str(len(arrays) + i) for i in range(2)],
            absolute_path=1,
            db=tmp_file, db_type=self._db_type,
            load_all=True)
        with self.assertRaises(RuntimeError):
            workspace.ResetWorkspace()
            workspace.RunOperatorOnce(op)

    def testTruncatedFile(self):
        """Loading from a db whose tail has been cut off must raise."""
        tmp_folder = self.make_tempdir()
        tmp_file, arrays = self.saveFile(tmp_folder, "db", self._db_type, 0)

        # Drop the last 20 bytes of the db file.  Note: opening with 'wb+'
        # (as this test originally did) truncates the file to zero length
        # first, so seek(20, SEEK_END) would leave a 20-NUL-byte file rather
        # than a truncated db; 'rb+' with a negative offset truncates for real.
        with open(tmp_file, 'rb+') as fdest:
            fdest.seek(-20, os.SEEK_END)
            fdest.truncate()

        op = core.CreateOperator(
            "Load",
            [], [str(i) for i in range(len(arrays))],
            absolute_path=1,
            db=tmp_file, db_type=self._db_type,
            load_all=False)
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(op)

        op = core.CreateOperator(
            "Load",
            [], [],
            absolute_path=1,
            db=tmp_file, db_type=self._db_type,
            load_all=True)
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(op)

    def testBlobNameOverrides(self):
        """blob_name_overrides renames blobs at save time; source_blob_names
        selects/renames them at load time.
        """
        original_names = ['blob_a', 'blob_b', 'blob_c']
        new_names = ['x', 'y', 'z']
        blobs = [np.random.permutation(6) for i in range(3)]
        for i, blob in enumerate(blobs):
            self.assertTrue(workspace.FeedBlob(original_names[i], blob))
            self.assertTrue(workspace.HasBlob(original_names[i]))
        self.assertEqual(len(workspace.Blobs()), 3)

        # Saves the blobs to a local db.
        tmp_folder = self.make_tempdir()
        # strip_prefix and blob_name_overrides are mutually exclusive.
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "Save", original_names, [],
                    absolute_path=1,
                    strip_prefix='.temp',
                    blob_name_overrides=new_names,
                    db=str(tmp_folder / "db"),
                    db_type=self._db_type
                )
            )
        self.assertTrue(
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "Save", original_names, [],
                    absolute_path=1,
                    blob_name_overrides=new_names,
                    db=str(tmp_folder / "db"),
                    db_type=self._db_type
                )
            )
        )
        self.assertTrue(workspace.ResetWorkspace())
        self.assertEqual(len(workspace.Blobs()), 0)
        self.assertTrue(
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "Load", [], [],
                    absolute_path=1,
                    db=str(tmp_folder / "db"),
                    db_type=self._db_type,
                    load_all=1
                )
            )
        )
        self.assertEqual(len(workspace.Blobs()), 3)
        for i, name in enumerate(new_names):
            self.assertTrue(workspace.HasBlob(name))
            self.assertTrue((workspace.FetchBlob(name) == blobs[i]).all())
        # moved here per @cxj's suggestion
        load_new_names = ['blob_x', 'blob_y', 'blob_z']
        # load 'x' into 'blob_x'
        self.assertTrue(
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "Load", [], load_new_names[0:1],
                    absolute_path=1,
                    db=str(tmp_folder / "db"),
                    db_type=self._db_type,
                    source_blob_names=new_names[0:1]
                )
            )
        )
        # we should have 'x/y/z' (from the load_all above) and 'blob_x' now
        self.assertEqual(len(workspace.Blobs()), 4)
        for i, name in enumerate(load_new_names[0:1]):
            self.assertTrue(workspace.HasBlob(name))
            self.assertTrue((workspace.FetchBlob(name) == blobs[i]).all())
        self.assertTrue(
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "Load", [], load_new_names[0:3],
                    absolute_path=1,
                    db=str(tmp_folder / "db"),
                    db_type=self._db_type,
                    source_blob_names=new_names[0:3]
                )
            )
        )
        # we should have 'x/y/z' and 'blob_x/y/z' now
        self.assertEqual(len(workspace.Blobs()), 6)
        for i, name in enumerate(load_new_names[0:3]):
            self.assertTrue(workspace.HasBlob(name))
            self.assertTrue((workspace.FetchBlob(name) == blobs[i]).all())

    def testMissingFile(self):
        """Loading from a nonexistent db path must raise."""
        tmp_folder = self.make_tempdir()
        tmp_file = tmp_folder / "missing_db"

        op = core.CreateOperator(
            "Load",
            [], [],
            absolute_path=1,
            db=str(tmp_file), db_type=self._db_type,
            load_all=True)
        with self.assertRaises(RuntimeError):
            try:
                workspace.RunOperatorOnce(op)
            except RuntimeError as e:
                # Echo the error for easier debugging, then re-raise so the
                # assertRaises check still sees it.
                print(e)
                raise

    def testLoadMultipleFilesGivenSourceBlobNames(self):
        """Load named blobs from several dbs in a single operator run."""
        tmp_folder = self.make_tempdir()
        db_file_1, arrays_1 = self.saveFile(tmp_folder, "db1", self._db_type, 0)
        db_file_2, arrays_2 = self.saveFile(
            tmp_folder, "db2", self._db_type, len(arrays_1)
        )
        db_files = [db_file_1, db_file_2]
        blobs_names = [str(i) for i in range(len(arrays_1) + len(arrays_2))]

        workspace.ResetWorkspace()
        self.assertEqual(len(workspace.Blobs()), 0)
        self.assertTrue(
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "Load",
                    [], blobs_names,
                    absolute_path=1,
                    dbs=db_files, db_type=self._db_type,
                    source_blob_names=blobs_names
                )
            )
        )
        self.assertEqual(len(workspace.Blobs()), len(blobs_names))
        for i in range(len(arrays_1)):
            np.testing.assert_array_equal(
                workspace.FetchBlob(str(i)), arrays_1[i]
            )
        for i in range(len(arrays_2)):
            np.testing.assert_array_equal(
                workspace.FetchBlob(str(i + len(arrays_1))), arrays_2[i]
            )

    def testLoadAllMultipleFiles(self):
        """load_all=True over several dbs loads the union of their blobs."""
        tmp_folder = self.make_tempdir()
        db_file_1, arrays_1 = self.saveFile(tmp_folder, "db1", self._db_type, 0)
        db_file_2, arrays_2 = self.saveFile(
            tmp_folder, "db2", self._db_type, len(arrays_1)
        )
        db_files = [db_file_1, db_file_2]

        workspace.ResetWorkspace()
        self.assertEqual(len(workspace.Blobs()), 0)
        self.assertTrue(
            workspace.RunOperatorOnce(
                core.CreateOperator(
                    "Load",
                    [], [],
                    absolute_path=1,
                    dbs=db_files, db_type=self._db_type,
                    load_all=True
                )
            )
        )
        self.assertEqual(len(workspace.Blobs()), len(arrays_1) + len(arrays_2))
        for i in range(len(arrays_1)):
            np.testing.assert_array_equal(
                workspace.FetchBlob(str(i)), arrays_1[i]
            )
        for i in range(len(arrays_2)):
            np.testing.assert_array_equal(
                workspace.FetchBlob(str(i + len(arrays_1))), arrays_2[i]
            )

    def testLoadAllMultipleFilesWithSameKey(self):
        """load_all over dbs that share blob keys must raise."""
        tmp_folder = self.make_tempdir()
        db_file_1, arrays_1 = self.saveFile(tmp_folder, "db1", self._db_type, 0)
        db_file_2, arrays_2 = self.saveFile(tmp_folder, "db2", self._db_type, 0)

        db_files = [db_file_1, db_file_2]
        workspace.ResetWorkspace()
        self.assertEqual(len(workspace.Blobs()), 0)
        op = core.CreateOperator(
            "Load",
            [], [],
            absolute_path=1,
            dbs=db_files, db_type=self._db_type,
            load_all=True)
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(op)

    def testLoadRepeatedFiles(self):
        """Listing the same db twice with explicit outputs must raise."""
        tmp_folder = self.make_tempdir()
        tmp_file, arrays = self.saveFile(tmp_folder, "db", self._db_type, 0)

        db_files = [tmp_file, tmp_file]
        workspace.ResetWorkspace()
        self.assertEqual(len(workspace.Blobs()), 0)
        op = core.CreateOperator(
            "Load",
            [], [str(i) for i in range(len(arrays))],
            absolute_path=1,
            dbs=db_files, db_type=self._db_type,
            load_all=False)
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(op)

    def testLoadWithDBOptions(self) -> None:
        """db_options is not supported by this backend, so Load must raise."""
        tmp_folder = self.make_tempdir()
        tmp_file, arrays = self.saveFile(tmp_folder, "db", self._db_type, 0)

        db_files = [tmp_file, tmp_file]
        workspace.ResetWorkspace()
        self.assertEqual(len(workspace.Blobs()), 0)

        db_options = b"test_db_options"
        op = core.CreateOperator(
            "Load",
            [], [str(i) for i in range(len(arrays))],
            absolute_path=1,
            dbs=db_files, db_type=self._db_type,
            load_all=False,
            db_options=db_options,
        )
        with self.assertRaises(RuntimeError):
            workspace.RunOperatorOnce(op)

    def create_test_blobs(
        self, size: int = 1234, feed: bool = True
    ) -> List[Tuple[str, np.ndarray]]:
        """Build one named random array per supported dtype.

        When feed is True the arrays are also fed into the workspace.
        Returns (name, array) pairs.
        """
        def int_array(dtype: Type[np.integer], size: int) -> np.ndarray:
            # Random integers spanning the full representable range.
            info = np.iinfo(dtype)
            return np.random.randint(info.min, info.max, size, dtype=dtype)

        def float_array(dtype: Type[np.floating], size: int) -> np.ndarray:
            return np.random.random_sample(size).astype(dtype)

        blobs = [
            ("int8_data", int_array(np.int8, size)),
            ("int16_data", int_array(np.int16, size)),
            ("int32_data", int_array(np.int32, size)),
            ("int64_data", int_array(np.int64, size)),
            ("uint8_data", int_array(np.uint8, size)),
            ("uint16_data", int_array(np.uint16, size)),
            ("float16_data", float_array(np.float16, size)),
            ("float32_data", float_array(np.float32, size)),
            ("float64_data", float_array(np.float64, size)),
        ]

        if feed:
            for name, data in blobs:
                workspace.FeedBlob(name, data)

        return blobs

    def load_blobs(
        self,
        blob_names: List[str],
        dbs: List[str],
        db_type: Optional[str] = None
    ) -> None:
        """Reset the workspace and load blob_names from the given dbs."""
        workspace.ResetWorkspace()
        self.assertEqual(len(workspace.Blobs()), 0)
        load_op = core.CreateOperator(
            "Load",
            [],
            blob_names,
            absolute_path=1,
            dbs=dbs,
            db_type=db_type or self._db_type,
        )
        self.assertTrue(workspace.RunOperatorOnce(load_op))
        self.assertEqual(len(workspace.Blobs()), len(blob_names))

    def load_and_check_blobs(
        self,
        blobs: List[Tuple[str, np.ndarray]],
        dbs: List[str],
        db_type: Optional[str] = None
    ) -> None:
        """Load the named blobs and verify their contents round-tripped."""
        self.load_blobs([name for name, data in blobs], dbs, db_type)
        for name, data in blobs:
            np.testing.assert_array_equal(workspace.FetchBlob(name), data)

    def _read_minidb_entries(
        self, path: Path
    ) -> Generator[MiniDBEntry, None, None]:
        """Read the entry information out of a minidb file.

        Each record is a little-endian (key_len, value_len) int32 pair
        followed by the key bytes and the value bytes; the value payload is
        skipped rather than read.
        """
        header = struct.Struct("=ii")
        with path.open("rb") as f:
            while True:
                buf = f.read(header.size)
                if not buf:
                    break
                if len(buf) < header.size:
                    raise Exception("early EOF in minidb header")
                (key_len, value_len) = header.unpack(buf)
                if key_len < 0 or value_len < 0:
                    raise Exception(
                        f"invalid minidb header: ({key_len}, {value_len})"
                    )
                key = f.read(key_len)
                if len(key) < key_len:
                    raise Exception("early EOF in minidb key")
                f.seek(value_len, io.SEEK_CUR)
                yield MiniDBEntry(key=key.decode("utf-8"), value_size=value_len)

    def _read_chunk_info(self, path: Path) -> Dict[str, List[MiniDBEntry]]:
        """Read a minidb file and return the names of each blob and how many
        chunks are stored for that blob.
        """
        chunk_id_separator = "#%"
        results: Dict[str, List[MiniDBEntry]] = {}
        for entry in self._read_minidb_entries(path):
            # Strip the "#%<chunk-id>" suffix, if any, to recover the blob
            # name.  str.rsplit always returns at least one element, so the
            # old `if len(parts) == 0` branch was dead code and is gone.
            blob_name = entry.key.rsplit(chunk_id_separator, 1)[0]
            results.setdefault(blob_name, []).append(entry)

        return results

    def _test_save_with_chunk_size(
        self, num_elems: int, chunk_size: int, expected_num_chunks: int,
    ) -> None:
        """Save blobs with the given chunk_size, reload them, and verify the
        on-disk chunk count per blob.
        """
        tmp_folder = self.make_tempdir()
        tmp_file = str(tmp_folder / "save.output")

        blobs = self.create_test_blobs(num_elems)

        # Saves the blobs to a local db.
        save_op = core.CreateOperator(
            "Save",
            [name for name, data in blobs],
            [],
            absolute_path=1,
            db=tmp_file,
            db_type=self._db_type,
            chunk_size=chunk_size,
        )
        self.assertTrue(workspace.RunOperatorOnce(save_op))

        self.load_and_check_blobs(blobs, [tmp_file])

        blob_chunks = self._read_chunk_info(Path(tmp_file))
        for blob_name, chunks in blob_chunks.items():
            self.assertEqual(len(chunks), expected_num_chunks)

    def testSaveWithChunkSize(self) -> None:
        num_elems = 1234
        chunk_size = 32
        expected_num_chunks = math.ceil(num_elems / chunk_size)
        self._test_save_with_chunk_size(
            num_elems=num_elems,
            chunk_size=chunk_size,
            expected_num_chunks=expected_num_chunks,
        )

    def testSaveWithDefaultChunkSize(self) -> None:
        # This is the default value of the --caffe2_tensor_chunk_size flag from
        # core/blob_serialization.cc
        #
        # Test with just slightly more than this to ensure that 2 chunks are
        # used.
        default_chunk_size = 1000000
        self._test_save_with_chunk_size(
            num_elems=default_chunk_size + 10,
            chunk_size=-1,
            expected_num_chunks=2,
        )

    def testSaveWithNoChunking(self) -> None:
        # chunk_size=0 disables chunking entirely, even above the default
        # chunk threshold.
        default_chunk_size = 1000000
        self._test_save_with_chunk_size(
            num_elems=default_chunk_size + 10,
            chunk_size=0,
            expected_num_chunks=1,
        )

    def testSaveWithOptions(self) -> None:
        """Per-blob SerializationOptions: first matching regex wins."""
        tmp_folder = self.make_tempdir()
        tmp_file = str(tmp_folder / "save.output")

        num_elems = 1234
        blobs = self.create_test_blobs(num_elems)

        # Saves the blobs to a local db.
        save_op = core.CreateOperator(
            "Save",
            [name for name, data in blobs],
            [],
            absolute_path=1,
            db=tmp_file,
            db_type=self._db_type,
            chunk_size=40,
            options=caffe2_pb2.SerializationOptions(
                options=[
                    BlobSerializationOptions(
                        blob_name_regex="int16_data", chunk_size=10
                    ),
                    BlobSerializationOptions(
                        blob_name_regex=".*16_data", chunk_size=20
                    ),
                    BlobSerializationOptions(
                        blob_name_regex="float16_data", chunk_size=30
                    ),
                ],
            ),
        )
        self.assertTrue(workspace.RunOperatorOnce(save_op))

        self.load_and_check_blobs(blobs, [tmp_file])

        blob_chunks = self._read_chunk_info(Path(tmp_file))
        # We explicitly set a chunk_size of 10 for int16_data
        self.assertEqual(
            len(blob_chunks["int16_data"]), math.ceil(num_elems / 10)
        )
        # uint16_data should match the .*16_data pattern, and get a size of 20
        self.assertEqual(
            len(blob_chunks["uint16_data"]), math.ceil(num_elems / 20)
        )
        # float16_data should also match the .*16_data pattern, and get a size
        # of 20.  The explicit float16_data rule came after the .*16_data
        # pattern, so it has lower precedence and will be ignored.
        self.assertEqual(
            len(blob_chunks["float16_data"]), math.ceil(num_elems / 20)
        )
        # int64_data will get the default chunk_size of 40
        self.assertEqual(
            len(blob_chunks["int64_data"]), math.ceil(num_elems / 40)
        )

    def testSaveWithDBOptions(self) -> None:
        """db_options is accepted by Save and does not break the round trip."""
        num_elems = 1234
        chunk_size = 32
        expected_num_chunks = math.ceil(num_elems / chunk_size)

        tmp_folder = self.make_tempdir()
        tmp_file = str(tmp_folder / "save.output")

        blobs = self.create_test_blobs(num_elems)

        db_options = b"test_db_options"
        # Saves the blobs to a local db.
        save_op = core.CreateOperator(
            "Save",
            [name for name, data in blobs],
            [],
            absolute_path=1,
            db=tmp_file,
            db_type=self._db_type,
            chunk_size=chunk_size,
            db_options=db_options,
        )
        self.assertTrue(workspace.RunOperatorOnce(save_op))

        self.load_and_check_blobs(blobs, [tmp_file])

        blob_chunks = self._read_chunk_info(Path(tmp_file))
        for blob_name, chunks in blob_chunks.items():
            self.assertEqual(len(chunks), expected_num_chunks)

    def testSaveFloatToBfloat16(self) -> None:
        """FLOAT_BFLOAT16 serialization shrinks the payload and is lossy."""
        tmp_folder = self.make_tempdir()
        tmp_file = str(tmp_folder / "save.output")

        # Create 2 blobs with the same float data
        float_data = np.random.random_sample(4000).astype(np.float32)
        workspace.FeedBlob("float1", float_data)
        workspace.FeedBlob("float2", float_data)
        blob_names = ["float1", "float2"]

        # Serialize the data, using bfloat16 serialization for one of the blobs
        save_op = core.CreateOperator(
            "Save",
            blob_names,
            [],
            absolute_path=1,
            db=tmp_file,
            db_type=self._db_type,
            options=caffe2_pb2.SerializationOptions(
                options=[
                    BlobSerializationOptions(
                        blob_name_regex="float1",
                        float_format=BlobSerializationOptions.FLOAT_BFLOAT16,
                    ),
                ],
            ),
        )
        self.assertTrue(workspace.RunOperatorOnce(save_op))

        # As long as fbgemm was available for us to perform bfloat16 conversion,
        # the serialized data for float1 should be almost half the size of float2
        if workspace.has_fbgemm:
            blob_chunks = self._read_chunk_info(Path(tmp_file))
            self.assertEqual(len(blob_chunks["float1"]), 1, blob_chunks["float1"])
            self.assertEqual(len(blob_chunks["float2"]), 1, blob_chunks["float2"])
            self.assertLess(
                blob_chunks["float1"][0].value_size,
                0.6 * blob_chunks["float2"][0].value_size
            )

        self.load_blobs(blob_names, [tmp_file])

        # float2 should be exactly the same as the input data
        np.testing.assert_array_equal(workspace.FetchBlob("float2"), float_data)
        # float1 should be close-ish to the input data (bfloat16 is lossy)
        np.testing.assert_array_almost_equal(
            workspace.FetchBlob("float1"), float_data, decimal=2
        )

    def testEstimateBlobSizes(self) -> None:
        """EstimateAllBlobSizes estimates should be close to the real sizes."""
        # Create some blobs to test with
        float_data = np.random.random_sample(4000).astype(np.float32)
        workspace.FeedBlob("float1", float_data)
        workspace.FeedBlob("float2", float_data)
        workspace.FeedBlob(
            "float3", np.random.random_sample(2).astype(np.float32)
        )
        workspace.FeedBlob(
            "ui16", np.random.randint(0, 0xffff, size=1024, dtype=np.uint16)
        )

        # Estimate the serialized size of the data.
        # Request bfloat16 serialization for one of the float blobs, just to
        # exercise size estimation when using this option.
        options = caffe2_pb2.SerializationOptions(
            options=[
                BlobSerializationOptions(
                    blob_name_regex="float1",
                    float_format=BlobSerializationOptions.FLOAT_BFLOAT16,
                    chunk_size=500,
                ),
            ],
        )
        get_blobs_op = core.CreateOperator(
            "EstimateAllBlobSizes",
            [],
            ["blob_names", "blob_sizes"],
            options=options,
        )
        self.assertTrue(workspace.RunOperatorOnce(get_blobs_op))
        blob_names = workspace.FetchBlob("blob_names")
        blob_sizes = workspace.FetchBlob("blob_sizes")

        sizes_by_name: Dict[str, int] = {}
        for idx, name in enumerate(blob_names):
            sizes_by_name[name.decode("utf-8")] = blob_sizes[idx]

        # Note that the output blob list will include our output blob names.
        expected_blobs = [
            "float1", "float2", "float3", "ui16",
            "blob_names", "blob_sizes"
        ]
        self.assertEqual(set(sizes_by_name.keys()), set(expected_blobs))

        def check_expected_blob_size(
            name: str, num_elems: int, elem_size: int, num_chunks: int = 1
        ) -> None:
            # The estimation code applies a fixed 50 byte per-chunk overhead to
            # account for the extra space required for other fixed TensorProto
            # message fields.
            per_chunk_overhead = 50
            expected_size = (
                (num_chunks * (len(name) + per_chunk_overhead))
                + (num_elems * elem_size)
            )
            self.assertEqual(
                sizes_by_name[name],
                expected_size,
                f"expected size mismatch for {name}"
            )

        check_expected_blob_size("ui16", 1024, 3)
        check_expected_blob_size("float2", 4000, 4)
        check_expected_blob_size("float3", 2, 4)

        # Our serialization options request to split float1 into 500-element
        # chunks when saving it.  If fbgemm is available then the float1 blob
        # will be serialized using 2 bytes per element instead of 4 bytes.
        float1_num_chunks = 4000 // 500
        if workspace.has_fbgemm:
            check_expected_blob_size("float1", 4000, 2, float1_num_chunks)
        else:
            check_expected_blob_size("float1", 4000, 4, float1_num_chunks)

        check_expected_blob_size("blob_names", len(expected_blobs), 50)
        check_expected_blob_size("blob_sizes", len(expected_blobs), 8)

        # Now actually save the blobs so we can compare our estimates
        # to how big the serialized data actually is.
        tmp_folder = self.make_tempdir()
        tmp_file = str(tmp_folder / "save.output")
        save_op = core.CreateOperator(
            "Save",
            list(sizes_by_name.keys()),
            [],
            absolute_path=1,
            db=tmp_file,
            db_type=self._db_type,
            options=options,
        )
        self.assertTrue(workspace.RunOperatorOnce(save_op))

        blob_chunks = self._read_chunk_info(Path(tmp_file))
        saved_sizes: Dict[str, int] = {}
        for blob_name, chunks in blob_chunks.items():
            total_size = sum(chunk.value_size for chunk in chunks)
            saved_sizes[blob_name] = total_size

        # For sanity checking, ensure that our estimates aren't
        # extremely far off
        for name in expected_blobs:
            estimated_size = sizes_by_name[name]
            saved_size = saved_sizes[name]
            difference = abs(estimated_size - saved_size)
            error_pct = 100.0 * (difference / saved_size)
            print(
                f"{name}: estimated={estimated_size} actual={saved_size} "
                f"error={error_pct:.2f}%"
            )
            # Don't check the blob_names blob. It is a string tensor, and we
            # can't estimate string tensor sizes very well without knowing the
            # individual string lengths. (Currently it requires 102 bytes to
            # save, but we estimate 360).
            if name == "blob_names":
                continue
            # Check that we are within 100 bytes, or within 25%
            # We are generally quite close for tensors with fixed-width fields
            # (like float), but a little farther off for tensors that use varint
            # encoding.
            if difference > 100:
                self.assertLess(error_pct, 25.0)
866
# Allow running this test file directly (outside a test runner).
if __name__ == '__main__':
    unittest.main()