lz4

Форк
0
/
test-lz4-list.py 
267 строк · 10.9 Кб
1
#!/usr/bin/env python3
2
import subprocess
3
import time
4
import glob
5
import os
6
import tempfile
7
import unittest
8
import sys
9

10
# Sizes of the generated sample inputs; generate_files() multiplies each by MIB.
SIZES = [3, 11]  # Always 2 sizes
MIB = 1048576
# Locate the lz4 binary: first next to this script's parent directory,
# otherwise under the sibling "programs" directory.
LZ4 = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "lz4"))
if not os.path.exists(LZ4):
    LZ4 = os.path.abspath(os.path.join(os.path.dirname(LZ4), "programs", "lz4"))
# Directory in which every test_list* file is created and later removed.
TEMP = tempfile.gettempdir()
16

17

18
class NVerboseFileInfo:
    """One row of the non-verbose ``lz4 --list`` output plus the sizes expected for it.

    The row must consist of exactly seven whitespace-separated columns:
    frames, type, block, compressed, uncompressed, ratio and filename.
    Any other shape is reported through ``errout``.
    """

    def __init__(self, line_in):
        self.line = line_in
        columns = line_in.split()
        if len(columns) != 7:
            errout(f"Unexpected line: {line_in}")
        (self.frames, self.type, self.block, self.compressed,
         self.uncompressed, self.ratio, self.filename) = columns

        # Pick the raw input file(s) whose on-disk size is the expected
        # uncompressed size: every generated input for the concatenated
        # archives, otherwise the single file named by the part of the
        # archive name before the first "-".
        if "concat-all" in self.filename or "2f--content-size" in self.filename:
            source_names = [f"test_list_{i}M" for i in SIZES]
        else:
            source_names = [self.filename.split("-")[0]]
        self.exp_unc_size = sum(os.path.getsize(f"{TEMP}/{name}") for name in source_names)

        # The expected compressed size is simply the archive's size on disk.
        self.exp_comp_size = os.path.getsize(f"{TEMP}/{self.filename}")
34

35

36
class TestNonVerbose(unittest.TestCase):
37
    @classmethod
38
    def setUpClass(self):
39
        self.nvinfo_list = []
40
        test_list_files = glob.glob(f"{TEMP}/test_list_*.lz4")
41
        # One of the files has 2 frames so duplicate it in this list to map each frame 1 to a single file
42
        for i, filename in enumerate(test_list_files):
43
            for i, line in enumerate(execute(f"{LZ4} --list -m {filename}", print_output=True)):
44
                if i > 0:
45
                    self.nvinfo_list.append(NVerboseFileInfo(line))
46

47
    def test_frames(self):
48
        all_concat_frames = 0
49
        all_concat_index = None
50
        for i, nvinfo in enumerate(self.nvinfo_list):
51
            if "concat-all" in nvinfo.filename:
52
                all_concat_index = i
53
            elif "2f--content-size" in nvinfo.filename:
54
                self.assertEqual("2", nvinfo.frames, nvinfo.line)
55
                all_concat_frames += 2
56
            else:
57
                self.assertEqual("1", nvinfo.frames, nvinfo.line)
58
                all_concat_frames += 1
59
        self.assertNotEqual(None, all_concat_index, "Couldn't find concat-all file index.")
60
        self.assertEqual(self.nvinfo_list[all_concat_index].frames, str(all_concat_frames), self.nvinfo_list[all_concat_index].line)
61

62
    def test_frame_types(self):
63
        for nvinfo in self.nvinfo_list:
64
            if "-lz4f-" in nvinfo.filename:
65
                self.assertEqual(nvinfo.type, "LZ4Frame", nvinfo.line)
66
            elif "-legc-" in nvinfo.filename:
67
                self.assertEqual(nvinfo.type, "LegacyFrame", nvinfo.line)
68
            elif "-skip-" in nvinfo.filename:
69
                self.assertEqual(nvinfo.type, "SkippableFrame", nvinfo.line)
70

71
    def test_block(self):
72
        for nvinfo in self.nvinfo_list:
73
            # if "-leg" in nvinfo.filename or "-skip" in nvinfo.filename:
74
            #     self.assertEqual(nvinfo.block, "-", nvinfo.line)
75
            if "--BD" in nvinfo.filename:
76
                self.assertRegex(nvinfo.block, "^B[0-9]+D$", nvinfo.line)
77
            elif "--BI" in nvinfo.filename:
78
                self.assertRegex(nvinfo.block, "^B[0-9]+I$", nvinfo.line)
79

80
    def test_compressed_size(self):
81
        for nvinfo in self.nvinfo_list:
82
            self.assertEqual(nvinfo.compressed, to_human(nvinfo.exp_comp_size), nvinfo.line)
83

84
    def test_ratio(self):
85
        for nvinfo in self.nvinfo_list:
86
            if "--content-size" in nvinfo.filename:
87
                self.assertEqual(nvinfo.ratio, f"{float(nvinfo.exp_comp_size) / float(nvinfo.exp_unc_size) * 100:.2f}%", nvinfo.line)
88

89
    def test_uncompressed_size(self):
90
        for nvinfo in self.nvinfo_list:
91
            if "--content-size" in nvinfo.filename:
92
                self.assertEqual(nvinfo.uncompressed, to_human(nvinfo.exp_unc_size), nvinfo.line)
93

94

95
class VerboseFileInfo:
    """Parsed verbose listing of one file.

    ``lines`` is expected to hold the file name first, then one line that is
    ignored (the header), then one line per frame.  Each frame line is split
    on whitespace and stored as a dict keyed by column name, with the raw
    text kept under ``"line"``.
    """

    def __init__(self, lines):
        self.frame_list = []
        # Left empty here; TestVerbose.setUpClass assigns the real mapping.
        self.file_frame_map = []

        columns = ("frame", "type", "block", "checksum", "compressed", "uncompressed", "ratio")
        rows = iter(lines)
        try:
            self.filename = next(rows)
        except StopIteration:
            # Nothing to parse: ``filename`` stays unset, as with an empty input.
            return
        next(rows, None)  # Discard the header line, if there is one.
        for row in rows:
            # zip() stops at the shorter side, so short rows yield partial records.
            record = dict(zip(columns, row.split()))
            record["line"] = row
            self.frame_list.append(record)
110

111

112
class TestVerbose(unittest.TestCase):
113
    @classmethod
114
    def setUpClass(self):
115
        # Even do we're listing 2 files to test multiline working as expected.
116
        # we're only really interested in testing the output of the concat-all file.
117
        self.vinfo_list = []
118
        start = end = 0
119
        test_list_SM_lz4f = glob.glob(f"{TEMP}/test_list_*M-lz4f-2f--content-size.lz4")
120
        for i, filename in enumerate(test_list_SM_lz4f):
121
            output = execute(f"{LZ4} --list -m -v {TEMP}/test_list_concat-all.lz4 {filename}", print_output=True)
122
            for i, line in enumerate(output):
123
                if line.startswith("test_list"):
124
                    if start != 0 and end != 0:
125
                        self.vinfo_list.append(VerboseFileInfo(output[start:end]))
126
                    start = i
127
                if not line:
128
                    end = i
129
        self.vinfo_list.append(VerboseFileInfo(output[start:end]))
130
        # Populate file_frame_map as a reference of the expected info
131
        concat_file_list = glob.glob(f"{TEMP}/test_list_[!concat]*.lz4")
132
        # One of the files has 2 frames so duplicate it in this list to map each frame 1 to a single file
133
        for i, filename in enumerate(concat_file_list):
134
            if "2f--content-size" in filename:
135
                concat_file_list.insert(i, filename)
136
                break
137
        self.cvinfo = self.vinfo_list[0]
138
        self.cvinfo.file_frame_map = concat_file_list
139
        self.cvinfo.compressed_size = os.path.getsize(f"{TEMP}/test_list_concat-all.lz4")
140

141
    def test_frame_number(self):
142
        for vinfo in self.vinfo_list:
143
            for i, frame_info in enumerate(vinfo.frame_list):
144
                self.assertEqual(frame_info["frame"], str(i + 1), frame_info["line"])
145

146
    def test_block(self):
147
        for i, frame_info in enumerate(self.cvinfo.frame_list):
148
            if "--BD" in self.cvinfo.file_frame_map[i]:
149
                self.assertRegex(self.cvinfo.frame_list[i]["block"], "^B[0-9]+D$", self.cvinfo.frame_list[i]["line"])
150
            elif "--BI" in self.cvinfo.file_frame_map[i]:
151
                self.assertEqual(self.cvinfo.frame_list[i]["block"], "^B[0-9]+I$", self.cvinfo.frame_list[i]["line"])
152

153
    def test_checksum(self):
154
        for i, frame_info in enumerate(self.cvinfo.frame_list):
155
            if "-lz4f-" in self.cvinfo.file_frame_map[i] and "--no-frame-crc" not in self.cvinfo.file_frame_map[i]:
156
                self.assertEqual(self.cvinfo.frame_list[i]["checksum"], "XXH32", self.cvinfo.frame_list[i]["line"])
157

158
    def test_uncompressed(self):
159
        for i, frame_info in enumerate(self.cvinfo.frame_list):
160
            ffm = self.cvinfo.file_frame_map[i]
161
            if "-2f-" not in ffm and "--content-size" in ffm:
162
                expected_size_unc = int(ffm[ffm.rindex("_") + 1:ffm.index("M")]) * 1048576
163
                self.assertEqual(self.cvinfo.frame_list[i]["uncompressed"], str(expected_size_unc), self.cvinfo.frame_list[i]["line"])
164

165
    def test_ratio(self):
166
        for i, frame_info in enumerate(self.cvinfo.frame_list):
167
            if "--content-size" in self.cvinfo.file_frame_map[i]:
168
                self.assertEqual(self.cvinfo.frame_list[i]['ratio'],
169
                                 f"{float(self.cvinfo.frame_list[i]['compressed']) / float(self.cvinfo.frame_list[i]['uncompressed']) * 100:.2f}%",
170
                                 self.cvinfo.frame_list[i]["line"])
171

172

173
def to_human(size):
    """Format a byte count with two decimals and a binary unit suffix.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit ('Y') is reached, e.g. ``1024 -> "1.00K"`` and ``500 -> "500.00"``.

    Args:
        size: Number of bytes (int or float).

    Returns:
        The scaled value followed by '', 'K', 'M', 'G', 'T', 'P', 'E', 'Z' or 'Y'.
    """
    units = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    largest = len(units) - 1
    index = 0
    # Stop dividing once the largest unit is reached, so that huge values stay
    # expressed in 'Y' instead of being scaled down one step too far.
    while size >= 1024.0 and index < largest:
        size /= 1024.0
        index += 1
    return f"{size:.2f}{units[index]}"
179

180

181
def log(text):
    """Print *text* (a string) prefixed with the current local date and time."""
    stamp = time.strftime("%Y/%m/%d %H:%M:%S")
    print(" - ".join((stamp, text)))
183

184

185
def errout(text, err=1):
    """Log *text* and terminate the process.

    Args:
        text: Message passed to ``log`` before exiting.
        err: Exit status handed to ``sys.exit`` (defaults to 1).

    Raises:
        SystemExit: always.
    """
    log(text)
    # sys.exit() is used instead of the bare exit() helper: the latter is
    # installed by the ``site`` module for interactive use and is not
    # guaranteed to exist (e.g. under ``python -S``).
    sys.exit(err)
188

189

190
def execute(command, print_command=True, print_output=False, print_error=True):
    """Run *command* and return its standard output as a list of lines.

    The command string is split on single spaces (no shell is involved).  If
    the ``QEMU_SYS`` environment variable is set and non-empty, its value is
    prepended to the command.

    Args:
        command: Command line to run.
        print_command: Log the command before running it.
        print_output: Print captured stdout and stderr after the run.
        print_error: On failure, print stderr unless ``print_output`` already did.

    Returns:
        The decoded stdout split into lines.  On a non-zero exit status
        ``errout`` is called, which terminates the process.
    """
    qemu_prefix = os.environ.get('QEMU_SYS')
    if qemu_prefix:
        command = f"{qemu_prefix} {command}"
    if print_command:
        log("> " + command)

    finished = subprocess.run(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    err_text = finished.stderr.decode("utf-8")
    out_text = finished.stdout.decode("utf-8")

    if print_output:
        for captured in (out_text, err_text):
            if captured:
                print(captured)

    if finished.returncode not in (None, 0):
        # Avoid printing stderr twice when print_output already showed it.
        if err_text and print_error and not print_output:
            print(err_text)
        errout(f"Failed to run: {command}, {out_text + err_text}\n")
    return out_text.splitlines()
209

210

211
def cleanup(silent=False):
    """Delete every ``test_list*`` entry found in the temporary directory.

    Args:
        silent: When true, do not log each deleted path.
    """
    leftovers = glob.glob(f"{TEMP}/test_list*")
    for path in leftovers:
        if not silent:
            log(f"Deleting {path}")
        os.remove(path)
216

217

218
def datagen(file_name, size):
    """Create *file_name* with exactly *size* bytes of test data.

    The leading part of the file is skipped with a seek and never written, so
    it reads back as zero bytes; the trailing ``size // 2`` bytes come from
    ``os.urandom``.

    Args:
        file_name: Path of the file to create (overwritten if it exists).
        size: Total length of the file in bytes.
    """
    non_sparse_size = size // 2
    sparse_size = size - non_sparse_size
    with open(file_name, "wb") as f:
        f.seek(sparse_size)
        f.write(os.urandom(non_sparse_size))
        # A seek past the end only takes effect once data is written.  For
        # size == 1 the random part is empty, so pin the length explicitly.
        f.truncate(size)
224

225

226
def generate_files():
    """Create every input file and archive used by the tests inside TEMP.

    For each entry of SIZES a raw input ``test_list_<n>M`` is generated and
    then stored as: one LZ4 frame archive per tested option, one hand-written
    skippable frame, and one legacy-format archive.  Finally the
    --content-size archives are joined into a two-frame archive and every
    archive is joined into ``test_list_concat-all.lz4``.
    """

    def concatenate(pattern, destination):
        # Resolve the sources before the destination exists, so the output
        # file can never match its own pattern.  The destination is opened in
        # append mode.
        sources = glob.glob(pattern)
        with open(destination, 'ab') as outfile:
            for source in sources:
                with open(source, 'rb') as infile:
                    outfile.write(infile.read())

    # archive name format ~ <input>-<frametype>-<no_frames>f<create-args>.lz4 ~
    for size_mib in SIZES:
        raw_path = f"{TEMP}/test_list_{size_mib}M"
        log(f"Generating {raw_path}")
        datagen(raw_path, size_mib * MIB)

        # LZ4 frames: one archive per command-line option.
        for option in ("--content-size", "-BI", "-BD", "-BX", "--no-frame-crc"):
            execute(f"{LZ4} {option} {raw_path} {raw_path}-lz4f-1f{option}.lz4")

        # Skippable frame, written by hand: 4 magic bytes, the payload length
        # as a little-endian unsigned 32-bit integer, then random payload.
        payload_len = size_mib * 1024
        header = bytes([80, 42, 77, 24]) + payload_len.to_bytes(4, byteorder='little', signed=False)
        with open(f"{raw_path}-skip-1f.lz4", 'wb') as f:
            f.write(header)
            f.write(os.urandom(payload_len))

        # Legacy frame.
        execute(f"{LZ4} -l {raw_path} {raw_path}-legc-1f.lz4")

    # Concatenate --content-size files
    concatenate(f"{TEMP}/test_list_*-lz4f-1f--content-size.lz4",
                f"{TEMP}/test_list_{sum(SIZES)}M-lz4f-2f--content-size.lz4")

    # Concatenate all files
    concatenate(f"{TEMP}/test_list_*.lz4", f"{TEMP}/test_list_concat-all.lz4")
260

261

262
if __name__ == '__main__':
    # Remove leftovers from earlier runs before generating fresh files.
    cleanup()
    try:
        generate_files()
        ret = unittest.main(verbosity=2, exit=False)
    finally:
        # Always remove the generated files, even when generation or the test
        # run aborts (errout() terminates by raising SystemExit).
        cleanup(silent=True)
    # Exit status 0 when every test passed, 1 otherwise.
    sys.exit(not ret.result.wasSuccessful())
268

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.