11
#include <glusterfs/glusterfs.h>
12
#include <glusterfs/logging.h>
13
#include <glusterfs/syscall.h>
17
#include "cdc-mem-types.h"
33
static const char gzip_header[10] = {'\037', '\213', Z_DEFLATED, 0, 0, 0, 0,
37
cdc_next_iovec(cdc_info_t *ci)
43
if (caa_unlikely(ci->ncount == MAX_IOVEC)) {
44
gf_log(THIS->name, GF_LOG_ERROR,
45
"Zlib output buffer overflow"
46
" ->ncount (%d) | ->MAX_IOVEC (%d)",
47
ci->ncount, MAX_IOVEC);
58
cdc_put_long(unsigned char *string, unsigned long x)
60
string[0] = (unsigned char)(x & 0xff);
61
string[1] = (unsigned char)((x & 0xff00) >> 8);
62
string[2] = (unsigned char)((x & 0xff0000) >> 16);
63
string[3] = (unsigned char)((x & 0xff000000) >> 24);
67
cdc_get_long(unsigned char *buf)
69
return ((unsigned long)buf[0]) | (((unsigned long)buf[1]) << 8) |
70
(((unsigned long)buf[2]) << 16) | (((unsigned long)buf[3]) << 24);
74
cdc_init_gzip_trailer(cdc_info_t *ci)
79
ret = cdc_next_iovec(ci);
83
buf = CURR_VEC(ci).iov_base = (char *)GF_CALLOC(1, GF_CDC_VALIDATION_SIZE,
84
gf_cdc_mt_gzip_trailer_t);
86
if (!CURR_VEC(ci).iov_base)
89
CURR_VEC(ci).iov_len = GF_CDC_VALIDATION_SIZE;
91
cdc_put_long((unsigned char *)&buf[0], ci->crc);
92
cdc_put_long((unsigned char *)&buf[4], ci->stream.total_in);
101
cdc_alloc_iobuf_and_init_vec(xlator_t *this, cdc_info_t *ci, int size)
105
struct iobuf *iobuf = NULL;
107
ret = cdc_next_iovec(ci);
108
if (caa_unlikely(ret))
111
alloc_len = size ? size : ci->buffer_size;
113
iobuf = iobuf_get2(this->ctx->iobuf_pool, alloc_len);
114
if (caa_unlikely(!iobuf))
117
ret = iobref_add(ci->iobref, iobuf);
118
if (caa_unlikely(ret))
122
CURR_VEC(ci).iov_base = iobuf->ptr;
123
CURR_VEC(ci).iov_len = alloc_len;
132
cdc_init_zlib_output_stream(cdc_info_t *ci, int size)
134
ci->stream.next_out = (unsigned char *)CURR_VEC(ci).iov_base;
135
ci->stream.avail_out = size ? size : ci->buffer_size;
143
cdc_dump_iovec_to_disk(xlator_t *this, cdc_info_t *ci, const char *file)
148
size_t total_written = 0;
150
fd = open(file, O_WRONLY | O_CREAT | O_TRUNC, 0777);
152
gf_log(this->name, GF_LOG_ERROR, "Cannot open file: %s", file);
156
written = sys_write(fd, (char *)gzip_header, 10);
157
total_written += written;
158
for (i = 0; i < ci->ncount; i++) {
159
written = sys_write(fd, (char *)ci->vec[i].iov_base,
161
total_written += written;
164
gf_log(this->name, GF_LOG_DEBUG, "dump'd %zu bytes to %s", total_written,
165
GF_CDC_DEBUG_DUMP_FILE);
171
cdc_flush_libz_buffer(xlator_t *this, cdc_info_t *ci,
172
int (*libz_func)(z_streamp, int), int flush)
176
unsigned int deflate_len = 0;
179
deflate_len = ci->buffer_size - ci->stream.avail_out;
181
if (deflate_len != 0) {
182
CURR_VEC(ci).iov_len = deflate_len;
184
ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
191
cdc_init_zlib_output_stream(ci, 0);
199
ret = libz_func(&ci->stream, flush);
201
if (ret == Z_BUF_ERROR) {
207
done = (ci->stream.avail_out != 0 || ret == Z_STREAM_END);
209
if (ret != Z_OK && ret != Z_STREAM_END)
217
do_cdc_compress(struct iovec *vec, xlator_t *this, cdc_info_t *ci)
221
ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
226
cdc_init_zlib_output_stream(ci, 0);
229
ci->stream.next_in = (unsigned char *)vec->iov_base;
230
ci->stream.avail_in = vec->iov_len;
232
ci->crc = crc32(ci->crc, (const Bytef *)vec->iov_base, vec->iov_len);
234
gf_log(this->name, GF_LOG_DEBUG, "crc=%lu len=%d buffer_size=%d", ci->crc,
235
ci->stream.avail_in, ci->buffer_size);
238
while (ci->stream.avail_in != 0) {
239
if (ci->stream.avail_out == 0) {
240
CURR_VEC(ci).iov_len = ci->buffer_size;
242
ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
247
cdc_init_zlib_output_stream(ci, 0);
250
ret = deflate(&ci->stream, Z_NO_FLUSH);
260
cdc_compress(xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci, dict_t **xdata)
265
ci->iobref = iobref_new();
272
gf_log(this->name, GF_LOG_ERROR,
273
"Cannot allocate xdata"
280
ret = deflateInit2(&ci->stream, priv->cdc_level, Z_DEFLATED,
281
priv->window_size, priv->mem_level, Z_DEFAULT_STRATEGY);
284
gf_log(this->name, GF_LOG_ERROR, "unable to init Zlib (retval: %d)",
290
for (i = 0; i < ci->count; i++) {
291
ret = do_cdc_compress(&ci->vector[i], this, ci);
293
goto deflate_cleanup_out;
297
ret = cdc_flush_libz_buffer(this, ci, deflate, Z_FINISH);
298
if (!(ret == Z_OK || ret == Z_STREAM_END)) {
299
gf_log(this->name, GF_LOG_ERROR, "Compression Error: ret (%d)", ret);
301
goto deflate_cleanup_out;
305
ret = cdc_init_gzip_trailer(ci);
307
goto deflate_cleanup_out;
309
gf_log(this->name, GF_LOG_DEBUG, "Compressed %ld to %ld bytes",
310
ci->stream.total_in, ci->stream.total_out);
312
ci->nbytes = ci->stream.total_out + GF_CDC_VALIDATION_SIZE;
315
ret = dict_set_int32(*xdata, GF_CDC_DEFLATE_CANARY_VAL, 1);
321
gf_log(this->name, GF_LOG_ERROR,
322
"Data deflated, but could not set canary"
323
" value in dict for identification");
328
cdc_dump_iovec_to_disk(this, ci, GF_CDC_DEBUG_DUMP_FILE);
332
(void)deflateEnd(&ci->stream);
342
cdc_check_content_for_deflate(dict_t *xdata)
344
return dict_get(xdata, GF_CDC_DEFLATE_CANARY_VAL) ? -1 : 0;
348
cdc_extract_crc(char *trailer)
350
return cdc_get_long((unsigned char *)&trailer[0]);
354
cdc_extract_size(char *trailer)
356
return cdc_get_long((unsigned char *)&trailer[4]);
360
cdc_validate_inflate(cdc_info_t *ci, unsigned long crc, unsigned long len)
362
return !((crc == ci->crc)
365
&& (len == ci->stream.total_out));
369
do_cdc_decompress(xlator_t *this, cdc_info_t *ci)
375
char *trailer = NULL;
379
unsigned long computed_crc = 0;
380
unsigned long computed_len = 0;
382
vec = THIS_VEC(ci, 0);
384
trailer = (char *)(((char *)vec.iov_base) + vec.iov_len -
385
GF_CDC_VALIDATION_SIZE);
388
computed_crc = cdc_extract_crc(trailer);
391
computed_len = cdc_extract_size(trailer);
393
gf_log(this->name, GF_LOG_DEBUG, "crc=%lu len=%lu buffer_size=%d",
394
computed_crc, computed_len, ci->buffer_size);
396
inflte = vec.iov_base;
397
len = vec.iov_len - GF_CDC_VALIDATION_SIZE;
400
ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
405
cdc_init_zlib_output_stream(ci, 0);
408
ci->stream.next_in = (unsigned char *)inflte;
409
ci->stream.avail_in = len;
411
while (ci->stream.avail_in != 0) {
412
if (ci->stream.avail_out == 0) {
413
CURR_VEC(ci).iov_len = ci->buffer_size;
415
ret = cdc_alloc_iobuf_and_init_vec(this, ci, 0);
420
cdc_init_zlib_output_stream(ci, 0);
423
ret = inflate(&ci->stream, Z_NO_FLUSH);
424
if (ret == Z_STREAM_ERROR)
429
ret = cdc_flush_libz_buffer(this, ci, inflate, Z_SYNC_FLUSH);
430
if (!(ret == Z_OK || ret == Z_STREAM_END)) {
431
gf_log(this->name, GF_LOG_ERROR, "Decompression Error: ret (%d)", ret);
439
for (i = 0; i < ci->ncount; i++) {
440
ci->crc = crc32(ci->crc, (const Bytef *)ci->vec[i].iov_base,
445
ret = cdc_validate_inflate(ci, computed_crc, computed_len);
447
gf_log(this->name, GF_LOG_ERROR,
448
"Checksum or length mismatched in inflated data");
456
cdc_decompress(xlator_t *this, cdc_priv_t *priv, cdc_info_t *ci, dict_t *xdata)
461
if (!cdc_check_content_for_deflate(xdata)) {
462
gf_log(this->name, GF_LOG_DEBUG,
463
"Content not deflated, passing through ...");
464
goto passthrough_out;
467
ci->iobref = iobref_new();
469
goto passthrough_out;
491
gf_log(this->name, GF_LOG_WARNING,
493
" multiple iovecs (%d in number)",
495
goto inflate_cleanup_out;
499
ret = inflateInit2(&ci->stream, priv->window_size);
501
gf_log(this->name, GF_LOG_ERROR, "Zlib: Unable to initialize inflate");
502
goto inflate_cleanup_out;
505
ret = do_cdc_decompress(this, ci);
507
goto inflate_cleanup_out;
509
ci->nbytes = ci->stream.total_out;
511
gf_log(this->name, GF_LOG_DEBUG, "Inflated %ld to %ld bytes",
512
ci->stream.total_in, ci->stream.total_out);
515
(void)inflateEnd(&ci->stream);