glusterfs

Форк
0
519 строк · 13.3 Кб
1
/*
2
   Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
3
   This file is part of GlusterFS.
4

5
   This file is licensed to you under your choice of the GNU Lesser
6
   General Public License, version 3 or any later version (LGPLv3 or
7
   later), or the GNU General Public License, version 2 (GPLv2), in all
8
   cases as published by the Free Software Foundation.
9
*/
10

11
#include <glusterfs/glusterfs.h>
12
#include <glusterfs/logging.h>
13
#include <glusterfs/syscall.h>
14
#include <zlib.h>
15

16
#include "cdc.h"
17
#include "cdc-mem-types.h"
18

19
/* gzip header looks something like this
20
 * (RFC 1952)
21
 *
22
 * +---+---+---+---+---+---+---+---+---+---+
23
 * |ID1|ID2|CM |FLG|     MTIME     |XFL|OS |
24
 * +---+---+---+---+---+---+---+---+---+---+
25
 *
26
 * Data is usually sent without this header i.e
27
 * Data sent = <compressed-data> + trailer(8)
28
 * The trailer contains the checksum.
29
 *
30
 * gzip_header is added only during debugging.
31
 * Refer to the function cdc_dump_iovec_to_disk
32
 */
33
static const char gzip_header[10] = {'\037', '\213', Z_DEFLATED,  0, 0, 0, 0,
34
                                     0,      0,      GF_CDC_OS_ID};
35

36
static int32_t
37
cdc_next_iovec(cdc_info_t *ci)
38
{
39
    int ret = -1;
40

41
    ci->ncount++;
42
    /* check for iovec overflow -- should not happen */
43
    if (caa_unlikely(ci->ncount == MAX_IOVEC)) {
44
        gf_log(THIS->name, GF_LOG_ERROR,
45
               "Zlib output buffer overflow"
46
               " ->ncount (%d) | ->MAX_IOVEC (%d)",
47
               ci->ncount, MAX_IOVEC);
48
        goto out;
49
    }
50

51
    ret = 0;
52

53
out:
54
    return ret;
55
}
56

57
/*
 * Store the low 32 bits of x into string[0..3], least significant byte
 * first (little-endian). Bits of x above bit 31 are ignored.
 *
 * string must point to at least 4 writable bytes.
 */
static void
cdc_put_long(unsigned char *string, unsigned long x)
{
    int i = 0;

    for (i = 0; i < 4; i++)
        string[i] = (unsigned char)((x >> (8 * i)) & 0xff);
}
65

66
/*
 * Read a 32-bit little-endian value from buf[0..3] and return it as an
 * unsigned long. buf is only read, hence the const qualifier.
 *
 * buf must point to at least 4 readable bytes.
 */
static unsigned long
cdc_get_long(const unsigned char *buf)
{
    unsigned long value = 0;
    int i = 0;

    /* fold from the most significant byte (buf[3]) down to buf[0] */
    for (i = 3; i >= 0; i--)
        value = (value << 8) | (unsigned long)buf[i];

    return value;
}
72

73
static int32_t
74
cdc_init_gzip_trailer(cdc_info_t *ci)
75
{
76
    int ret = -1;
77
    char *buf = NULL;
78

79
    ret = cdc_next_iovec(ci);
80
    if (ret)
81
        goto out;
82

83
    buf = CURR_VEC(ci).iov_base = (char *)GF_CALLOC(1, GF_CDC_VALIDATION_SIZE,
84
                                                    gf_cdc_mt_gzip_trailer_t);
85

86
    if (!CURR_VEC(ci).iov_base)
87
        goto out;
88

89
    CURR_VEC(ci).iov_len = GF_CDC_VALIDATION_SIZE;
90

91
    cdc_put_long((unsigned char *)&buf[0], ci->crc);
92
    cdc_put_long((unsigned char *)&buf[4], ci->stream.total_in);
93

94
    ret = 0;
95

96
out:
97
    return ret;
98
}
99

100
static int32_t
101
cdc_alloc_iobuf_and_init_vec(xlator_t *this, cdc_info_t *ci, int size)
102
{
103
    int ret = -1;
104
    int alloc_len = 0;
105
    struct iobuf *iobuf = NULL;
106

107
    ret = cdc_next_iovec(ci);
108
    if (caa_unlikely(ret))
109
        goto out;
110

111
    alloc_len = size ? size : ci->buffer_size;
112

113
    iobuf = iobuf_get2(this->ctx->iobuf_pool, alloc_len);
114
    if (caa_unlikely(!iobuf))
115
        goto out;
116

117
    ret = iobref_add(ci->iobref, iobuf);
118
    if (caa_unlikely(ret))
119
        goto out;
120

121
    /* Initialize this iovec */
122
    CURR_VEC(ci).iov_base = iobuf->ptr;
123
    CURR_VEC(ci).iov_len = alloc_len;
124

125
    ret = 0;
126

127
out:
128
    return ret;
129
}
130

131
static void
132
cdc_init_zlib_output_stream(cdc_info_t *ci, int size)
133
{
134
    ci->stream.next_out = (unsigned char *)CURR_VEC(ci).iov_base;
135
    ci->stream.avail_out = size ? size : ci->buffer_size;
136
}
137

138
/* This routine is for testing and debugging only.
139
 * Data written = header(10) + <compressed-data> + trailer(8)
140
 * So each gzip dump file is at least 18 bytes in size.
141
 */
142
static void
143
cdc_dump_iovec_to_disk(xlator_t *this, cdc_info_t *ci, const char *file)
144
{
145
    int i = 0;
146
    int fd = 0;
147
    size_t written = 0;
148
    size_t total_written = 0;
149

150
    fd = open(file, O_WRONLY | O_CREAT | O_TRUNC, 0777);
151
    if (fd < 0) {
152
        gf_log(this->name, GF_LOG_ERROR, "Cannot open file: %s", file);
153
        return;
154
    }
155

156
    written = sys_write(fd, (char *)gzip_header, 10);
157
    total_written += written;
158
    for (i = 0; i < ci->ncount; i++) {
159
        written = sys_write(fd, (char *)ci->vec[i].iov_base,
160
                            ci->vec[i].iov_len);
161
        total_written += written;
162
    }
163

164
    gf_log(this->name, GF_LOG_DEBUG, "dump'd %zu bytes to %s", total_written,
165
           GF_CDC_DEBUG_DUMP_FILE);
166

167
    sys_close(fd);
168
}
169

170
static int32_t
171
cdc_flush_libz_buffer(xlator_t *this, cdc_info_t *ci,
172
                      int (*libz_func)(z_streamp, int), int flush)
173
{
174
    int32_t ret = Z_OK;
175
    int done = 0;
176
    unsigned int deflate_len = 0;
177

178
    for (;;) {
179
        deflate_len = ci->buffer_size - ci->stream.avail_out;
180

181
        if (deflate_len != 0) {
182
            CURR_VEC(ci).iov_len = deflate_len;
183

184
            ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
185
            if (ret) {
186
                ret = Z_MEM_ERROR;
187
                break;
188
            }
189

190
            /* Re-position Zlib output buffer */
191
            cdc_init_zlib_output_stream(ci, 0);
192
        }
193

194
        if (done) {
195
            ci->ncount--;
196
            break;
197
        }
198

199
        ret = libz_func(&ci->stream, flush);
200

201
        if (ret == Z_BUF_ERROR) {
202
            ret = Z_OK;
203
            ci->ncount--;
204
            break;
205
        }
206

207
        done = (ci->stream.avail_out != 0 || ret == Z_STREAM_END);
208

209
        if (ret != Z_OK && ret != Z_STREAM_END)
210
            break;
211
    }
212

213
    return ret;
214
}
215

216
static int32_t
217
do_cdc_compress(struct iovec *vec, xlator_t *this, cdc_info_t *ci)
218
{
219
    int ret = -1;
220

221
    ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
222
    if (ret)
223
        goto out;
224

225
    /* setup output buffer */
226
    cdc_init_zlib_output_stream(ci, 0);
227

228
    /* setup input buffer */
229
    ci->stream.next_in = (unsigned char *)vec->iov_base;
230
    ci->stream.avail_in = vec->iov_len;
231

232
    ci->crc = crc32(ci->crc, (const Bytef *)vec->iov_base, vec->iov_len);
233

234
    gf_log(this->name, GF_LOG_DEBUG, "crc=%lu len=%d buffer_size=%d", ci->crc,
235
           ci->stream.avail_in, ci->buffer_size);
236

237
    /* compress !! */
238
    while (ci->stream.avail_in != 0) {
239
        if (ci->stream.avail_out == 0) {
240
            CURR_VEC(ci).iov_len = ci->buffer_size;
241

242
            ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
243
            if (ret)
244
                break;
245

246
            /* Re-position Zlib output buffer */
247
            cdc_init_zlib_output_stream(ci, 0);
248
        }
249

250
        ret = deflate(&ci->stream, Z_NO_FLUSH);
251
        if (ret != Z_OK)
252
            break;
253
    }
254

255
out:
256
    return ret;
257
}
258

259
int32_t
260
cdc_compress(xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci, dict_t **xdata)
261
{
262
    int ret = -1;
263
    int i = 0;
264

265
    ci->iobref = iobref_new();
266
    if (!ci->iobref)
267
        goto out;
268

269
    if (!*xdata) {
270
        *xdata = dict_new();
271
        if (!*xdata) {
272
            gf_log(this->name, GF_LOG_ERROR,
273
                   "Cannot allocate xdata"
274
                   " dict");
275
            goto out;
276
        }
277
    }
278

279
    /* Initialize defalte */
280
    ret = deflateInit2(&ci->stream, priv->cdc_level, Z_DEFLATED,
281
                       priv->window_size, priv->mem_level, Z_DEFAULT_STRATEGY);
282

283
    if (ret) {
284
        gf_log(this->name, GF_LOG_ERROR, "unable to init Zlib (retval: %d)",
285
               ret);
286
        goto out;
287
    }
288

289
    /* data */
290
    for (i = 0; i < ci->count; i++) {
291
        ret = do_cdc_compress(&ci->vector[i], this, ci);
292
        if (ret != Z_OK)
293
            goto deflate_cleanup_out;
294
    }
295

296
    /* flush zlib buffer */
297
    ret = cdc_flush_libz_buffer(this, ci, deflate, Z_FINISH);
298
    if (!(ret == Z_OK || ret == Z_STREAM_END)) {
299
        gf_log(this->name, GF_LOG_ERROR, "Compression Error: ret (%d)", ret);
300
        ret = -1;
301
        goto deflate_cleanup_out;
302
    }
303

304
    /* trailer */
305
    ret = cdc_init_gzip_trailer(ci);
306
    if (ret)
307
        goto deflate_cleanup_out;
308

309
    gf_log(this->name, GF_LOG_DEBUG, "Compressed %ld to %ld bytes",
310
           ci->stream.total_in, ci->stream.total_out);
311

312
    ci->nbytes = ci->stream.total_out + GF_CDC_VALIDATION_SIZE;
313

314
    /* set deflated canary value for identification */
315
    ret = dict_set_int32(*xdata, GF_CDC_DEFLATE_CANARY_VAL, 1);
316
    if (ret) {
317
        /* Send uncompressed data if we can't _tell_ the client
318
         * that deflated data is on its way. So, we just log
319
         * the failure and continue as usual.
320
         */
321
        gf_log(this->name, GF_LOG_ERROR,
322
               "Data deflated, but could not set canary"
323
               " value in dict for identification");
324
    }
325

326
    /* This is to be used in testing */
327
    if (priv->debug) {
328
        cdc_dump_iovec_to_disk(this, ci, GF_CDC_DEBUG_DUMP_FILE);
329
    }
330

331
deflate_cleanup_out:
332
    (void)deflateEnd(&ci->stream);
333

334
out:
335
    return ret;
336
}
337

338
/* deflate content is checked by the presence of a canary
339
 * value in the dict as the key
340
 */
341
static int32_t
342
cdc_check_content_for_deflate(dict_t *xdata)
343
{
344
    return dict_get(xdata, GF_CDC_DEFLATE_CANARY_VAL) ? -1 : 0;
345
}
346

347
/* Return the value stored in the first four bytes of the trailer. */
static unsigned long
cdc_extract_crc(char *trailer)
{
    unsigned char *crc_bytes = (unsigned char *)trailer;

    return cdc_get_long(crc_bytes);
}
352

353
/* Return the value stored in bytes 4..7 of the trailer. */
static unsigned long
cdc_extract_size(char *trailer)
{
    unsigned char *size_bytes = (unsigned char *)(trailer + 4);

    return cdc_get_long(size_bytes);
}
358

359
static int32_t
360
cdc_validate_inflate(cdc_info_t *ci, unsigned long crc, unsigned long len)
361
{
362
    return !((crc == ci->crc)
363
             /* inflated length is hidden inside
364
              * Zlib stream struct */
365
             && (len == ci->stream.total_out));
366
}
367

368
static int32_t
369
do_cdc_decompress(xlator_t *this, cdc_info_t *ci)
370
{
371
    int ret = -1;
372
    int i = 0;
373
    int len = 0;
374
    char *inflte = NULL;
375
    char *trailer = NULL;
376
    struct iovec vec = {
377
        0,
378
    };
379
    unsigned long computed_crc = 0;
380
    unsigned long computed_len = 0;
381

382
    vec = THIS_VEC(ci, 0);
383

384
    trailer = (char *)(((char *)vec.iov_base) + vec.iov_len -
385
                       GF_CDC_VALIDATION_SIZE);
386

387
    /* CRC of uncompressed data */
388
    computed_crc = cdc_extract_crc(trailer);
389

390
    /* size of uncomrpessed data */
391
    computed_len = cdc_extract_size(trailer);
392

393
    gf_log(this->name, GF_LOG_DEBUG, "crc=%lu len=%lu buffer_size=%d",
394
           computed_crc, computed_len, ci->buffer_size);
395

396
    inflte = vec.iov_base;
397
    len = vec.iov_len - GF_CDC_VALIDATION_SIZE;
398

399
    /* allocate buffer of the original length of the data */
400
    ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
401
    if (ret)
402
        goto out;
403

404
    /* setup output buffer */
405
    cdc_init_zlib_output_stream(ci, 0);
406

407
    /* setup input buffer */
408
    ci->stream.next_in = (unsigned char *)inflte;
409
    ci->stream.avail_in = len;
410

411
    while (ci->stream.avail_in != 0) {
412
        if (ci->stream.avail_out == 0) {
413
            CURR_VEC(ci).iov_len = ci->buffer_size;
414

415
            ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
416
            if (ret)
417
                break;
418

419
            /* Re-position Zlib output buffer */
420
            cdc_init_zlib_output_stream(ci, 0);
421
        }
422

423
        ret = inflate(&ci->stream, Z_NO_FLUSH);
424
        if (ret == Z_STREAM_ERROR)
425
            break;
426
    }
427

428
    /* flush zlib buffer */
429
    ret = cdc_flush_libz_buffer(this, ci, inflate, Z_SYNC_FLUSH);
430
    if (!(ret == Z_OK || ret == Z_STREAM_END)) {
431
        gf_log(this->name, GF_LOG_ERROR, "Decompression Error: ret (%d)", ret);
432
        ret = -1;
433
        goto out;
434
    }
435

436
    /* compute CRC of the uncompressed data to check for
437
     * correctness */
438

439
    for (i = 0; i < ci->ncount; i++) {
440
        ci->crc = crc32(ci->crc, (const Bytef *)ci->vec[i].iov_base,
441
                        ci->vec[i].iov_len);
442
    }
443

444
    /* validate inflated data */
445
    ret = cdc_validate_inflate(ci, computed_crc, computed_len);
446
    if (ret) {
447
        gf_log(this->name, GF_LOG_ERROR,
448
               "Checksum or length mismatched in inflated data");
449
    }
450

451
out:
452
    return ret;
453
}
454

455
int32_t
456
cdc_decompress(xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci, dict_t *xdata)
457
{
458
    int32_t ret = -1;
459

460
    /* check for deflate content */
461
    if (!cdc_check_content_for_deflate(xdata)) {
462
        gf_log(this->name, GF_LOG_DEBUG,
463
               "Content not deflated, passing through ...");
464
        goto passthrough_out;
465
    }
466

467
    ci->iobref = iobref_new();
468
    if (!ci->iobref)
469
        goto passthrough_out;
470

471
    /* do we need to do this? can we assume that one iovec
472
     * will hold per request data every time?
473
     *
474
     * server/client protocol seems to deal with a single
475
     * iovec even if op_ret > 1M. So, it looks ok to
476
     * assume that a single iovec will contain all the
477
     * data (This saves us a lot from finding the trailer
478
     * and the data since it could have been split-up onto
479
     * two adjacent iovec's.
480
     *
481
     * But, in case this translator is loaded above quick-read
482
     * for some reason, then it's entirely possible that we get
483
     * multiple iovec's...
484
     *
485
     * This case (handled below) is not tested. (by loading the
486
     * xlator below quick-read)
487
     */
488

489
    /* @@ I_HOPE_THIS_IS_NEVER_HIT */
490
    if (ci->count > 1) {
491
        gf_log(this->name, GF_LOG_WARNING,
492
               "unable to handle"
493
               " multiple iovecs (%d in number)",
494
               ci->count);
495
        goto inflate_cleanup_out;
496
        /* TODO: coallate all iovecs in one */
497
    }
498

499
    ret = inflateInit2(&ci->stream, priv->window_size);
500
    if (ret) {
501
        gf_log(this->name, GF_LOG_ERROR, "Zlib: Unable to initialize inflate");
502
        goto inflate_cleanup_out;
503
    }
504

505
    ret = do_cdc_decompress(this, ci);
506
    if (ret)
507
        goto inflate_cleanup_out;
508

509
    ci->nbytes = ci->stream.total_out;
510

511
    gf_log(this->name, GF_LOG_DEBUG, "Inflated %ld to %ld bytes",
512
           ci->stream.total_in, ci->stream.total_out);
513

514
inflate_cleanup_out:
515
    (void)inflateEnd(&ci->stream);
516

517
passthrough_out:
518
    return ret;
519
}
520

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.