12
LZ4 = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/../lz4")
13
if not os.path.exists(LZ4):
14
LZ4 = os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + "/../programs/lz4")
15
TEMP = tempfile.gettempdir()
18
class NVerboseFileInfo:
19
def __init__(self, line_in):
21
splitlines = line_in.split()
22
if len(splitlines) != 7:
23
errout(f"Unexpected line: {line_in}")
24
self.frames, self.type, self.block, self.compressed, self.uncompressed, self.ratio, self.filename = splitlines
27
if "concat-all" in self.filename or "2f--content-size" in self.filename:
29
self.exp_unc_size += os.path.getsize(f"{TEMP}/test_list_{i}M")
31
uncompressed_filename = self.filename.split("-")[0]
32
self.exp_unc_size += os.path.getsize(f"{TEMP}/{uncompressed_filename}")
33
self.exp_comp_size = os.path.getsize(f"{TEMP}/{self.filename}")
36
class TestNonVerbose(unittest.TestCase):
40
test_list_files = glob.glob(f"{TEMP}/test_list_*.lz4")
42
for i, filename in enumerate(test_list_files):
43
for i, line in enumerate(execute(f"{LZ4} --list -m {filename}", print_output=True)):
45
self.nvinfo_list.append(NVerboseFileInfo(line))
47
def test_frames(self):
49
all_concat_index = None
50
for i, nvinfo in enumerate(self.nvinfo_list):
51
if "concat-all" in nvinfo.filename:
53
elif "2f--content-size" in nvinfo.filename:
54
self.assertEqual("2", nvinfo.frames, nvinfo.line)
55
all_concat_frames += 2
57
self.assertEqual("1", nvinfo.frames, nvinfo.line)
58
all_concat_frames += 1
59
self.assertNotEqual(None, all_concat_index, "Couldn't find concat-all file index.")
60
self.assertEqual(self.nvinfo_list[all_concat_index].frames, str(all_concat_frames), self.nvinfo_list[all_concat_index].line)
62
def test_frame_types(self):
63
for nvinfo in self.nvinfo_list:
64
if "-lz4f-" in nvinfo.filename:
65
self.assertEqual(nvinfo.type, "LZ4Frame", nvinfo.line)
66
elif "-legc-" in nvinfo.filename:
67
self.assertEqual(nvinfo.type, "LegacyFrame", nvinfo.line)
68
elif "-skip-" in nvinfo.filename:
69
self.assertEqual(nvinfo.type, "SkippableFrame", nvinfo.line)
72
for nvinfo in self.nvinfo_list:
75
if "--BD" in nvinfo.filename:
76
self.assertRegex(nvinfo.block, "^B[0-9]+D$", nvinfo.line)
77
elif "--BI" in nvinfo.filename:
78
self.assertRegex(nvinfo.block, "^B[0-9]+I$", nvinfo.line)
80
def test_compressed_size(self):
81
for nvinfo in self.nvinfo_list:
82
self.assertEqual(nvinfo.compressed, to_human(nvinfo.exp_comp_size), nvinfo.line)
85
for nvinfo in self.nvinfo_list:
86
if "--content-size" in nvinfo.filename:
87
self.assertEqual(nvinfo.ratio, f"{float(nvinfo.exp_comp_size) / float(nvinfo.exp_unc_size) * 100:.2f}%", nvinfo.line)
89
def test_uncompressed_size(self):
90
for nvinfo in self.nvinfo_list:
91
if "--content-size" in nvinfo.filename:
92
self.assertEqual(nvinfo.uncompressed, to_human(nvinfo.exp_unc_size), nvinfo.line)
96
def __init__(self, lines):
99
self.file_frame_map = []
100
for i, line in enumerate(lines):
107
frame_info = dict(zip(["frame", "type", "block", "checksum", "compressed", "uncompressed", "ratio"], line.split()))
108
frame_info["line"] = line
109
self.frame_list.append(frame_info)
112
class TestVerbose(unittest.TestCase):
114
def setUpClass(self):
119
test_list_SM_lz4f = glob.glob(f"{TEMP}/test_list_*M-lz4f-2f--content-size.lz4")
120
for i, filename in enumerate(test_list_SM_lz4f):
121
output = execute(f"{LZ4} --list -m -v {TEMP}/test_list_concat-all.lz4 {filename}", print_output=True)
122
for i, line in enumerate(output):
123
if line.startswith("test_list"):
124
if start != 0 and end != 0:
125
self.vinfo_list.append(VerboseFileInfo(output[start:end]))
129
self.vinfo_list.append(VerboseFileInfo(output[start:end]))
131
concat_file_list = glob.glob(f"{TEMP}/test_list_[!concat]*.lz4")
133
for i, filename in enumerate(concat_file_list):
134
if "2f--content-size" in filename:
135
concat_file_list.insert(i, filename)
137
self.cvinfo = self.vinfo_list[0]
138
self.cvinfo.file_frame_map = concat_file_list
139
self.cvinfo.compressed_size = os.path.getsize(f"{TEMP}/test_list_concat-all.lz4")
141
def test_frame_number(self):
142
for vinfo in self.vinfo_list:
143
for i, frame_info in enumerate(vinfo.frame_list):
144
self.assertEqual(frame_info["frame"], str(i + 1), frame_info["line"])
146
def test_block(self):
147
for i, frame_info in enumerate(self.cvinfo.frame_list):
148
if "--BD" in self.cvinfo.file_frame_map[i]:
149
self.assertRegex(self.cvinfo.frame_list[i]["block"], "^B[0-9]+D$", self.cvinfo.frame_list[i]["line"])
150
elif "--BI" in self.cvinfo.file_frame_map[i]:
151
self.assertEqual(self.cvinfo.frame_list[i]["block"], "^B[0-9]+I$", self.cvinfo.frame_list[i]["line"])
153
def test_checksum(self):
154
for i, frame_info in enumerate(self.cvinfo.frame_list):
155
if "-lz4f-" in self.cvinfo.file_frame_map[i] and "--no-frame-crc" not in self.cvinfo.file_frame_map[i]:
156
self.assertEqual(self.cvinfo.frame_list[i]["checksum"], "XXH32", self.cvinfo.frame_list[i]["line"])
158
def test_uncompressed(self):
159
for i, frame_info in enumerate(self.cvinfo.frame_list):
160
ffm = self.cvinfo.file_frame_map[i]
161
if "-2f-" not in ffm and "--content-size" in ffm:
162
expected_size_unc = int(ffm[ffm.rindex("_") + 1:ffm.index("M")]) * 1048576
163
self.assertEqual(self.cvinfo.frame_list[i]["uncompressed"], str(expected_size_unc), self.cvinfo.frame_list[i]["line"])
165
def test_ratio(self):
166
for i, frame_info in enumerate(self.cvinfo.frame_list):
167
if "--content-size" in self.cvinfo.file_frame_map[i]:
168
self.assertEqual(self.cvinfo.frame_list[i]['ratio'],
169
f"{float(self.cvinfo.frame_list[i]['compressed']) / float(self.cvinfo.frame_list[i]['uncompressed']) * 100:.2f}%",
170
self.cvinfo.frame_list[i]["line"])
174
for unit in ['', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']:
178
return f"{size:.2f}{unit}"
182
print(time.strftime("%Y/%m/%d %H:%M:%S") + ' - ' + text)
185
def errout(text, err=1):
190
def execute(command, print_command=True, print_output=False, print_error=True):
191
if os.environ.get('QEMU_SYS'):
192
command = f"{os.environ['QEMU_SYS']} {command}"
195
popen = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
196
stdout_lines, stderr_lines = popen.communicate()
197
stderr_lines = stderr_lines.decode("utf-8")
198
stdout_lines = stdout_lines.decode("utf-8")
204
if popen.returncode is not None and popen.returncode != 0:
205
if stderr_lines and not print_output and print_error:
207
errout(f"Failed to run: {command}, {stdout_lines + stderr_lines}\n")
208
return (stdout_lines).splitlines()
211
def cleanup(silent=False):
212
for f in glob.glob(f"{TEMP}/test_list*"):
218
def datagen(file_name, size):
219
non_sparse_size = size // 2
220
sparse_size = size - non_sparse_size
221
with open(file_name, "wb") as f:
223
f.write(os.urandom(non_sparse_size))
230
filename = f"{TEMP}/test_list_{i}M"
231
log(f"Generating {filename}")
232
datagen(filename, i * MIB)
233
for j in ["--content-size", "-BI", "-BD", "-BX", "--no-frame-crc"]:
234
lz4file = f"{filename}-lz4f-1f{j}.lz4"
235
execute(f"{LZ4} {j} {filename} {lz4file}")
237
lz4file = f"{filename}-skip-1f.lz4"
239
skipbytes = bytes([80, 42, 77, 24]) + skipsize.to_bytes(4, byteorder='little', signed=False)
240
with open(lz4file, 'wb') as f:
242
f.write(os.urandom(skipsize))
244
lz4file = f"{filename}-legc-1f.lz4"
245
execute(f"{LZ4} -l {filename} {lz4file}")
248
file_list = glob.glob(f"{TEMP}/test_list_*-lz4f-1f--content-size.lz4")
249
with open(f"{TEMP}/test_list_{sum(SIZES)}M-lz4f-2f--content-size.lz4", 'ab') as outfile:
250
for fname in file_list:
251
with open(fname, 'rb') as infile:
252
outfile.write(infile.read())
255
file_list = glob.glob(f"{TEMP}/test_list_*.lz4")
256
with open(f"{TEMP}/test_list_concat-all.lz4", 'ab') as outfile:
257
for fname in file_list:
258
with open(fname, 'rb') as infile:
259
outfile.write(infile.read())
262
if __name__ == '__main__':
265
ret = unittest.main(verbosity=2, exit=False)
267
sys.exit(not ret.result.wasSuccessful())