glusterfs

Форк
0
2079 строк · 50.9 Кб
1
/*
2
 *   Copyright (c) 2018 Red Hat, Inc. <http://www.redhat.com>
3
 *   This file is part of GlusterFS.
4
 *
5
 *   This file is licensed to you under your choice of the GNU Lesser
6
 *   General Public License, version 3 or any later version (LGPLv3 or
7
 *   later), or the GNU General Public License, version 2 (GPLv2), in all
8
 *   cases as published by the Free Software Foundation.
9
 */
10

11
#include "cloudsync.h"
12
#include "cloudsync-common.h"
13
#include <glusterfs/call-stub.h>
14
#include "cloudsync-autogen-fops.h"
15

16
#include <string.h>
17
#include <dlfcn.h>
18

19
static void
20
cs_cleanup_private(cs_private_t *priv)
21
{
22
    if (priv) {
23
        if (priv->stores) {
24
            priv->stores->fini(priv->stores->config);
25
            GF_FREE(priv->stores);
26
        }
27

28
        pthread_spin_destroy(&priv->lock);
29
        GF_FREE(priv);
30
    }
31

32
    return;
33
}
34

35
/*
 * Known remote-store plugins. The table is terminated by an entry whose
 * name is NULL; lookups compare the configured store type against .name
 * and load the matching .library.
 */
static struct cs_plugin plugins[] = {
    {
        .name = "cloudsyncs3",
        .library = "cloudsyncs3.so",
        .description = "cloudsync s3 store.",
    },
#if defined(__linux__)
    {
        .name = "cvlt",
        .library = "cloudsynccvlt.so",
        .description = "Commvault content store.",
    },
#endif
    {
        .name = NULL,
    },
};
46

47
int
48
cs_init(xlator_t *this)
49
{
50
    cs_private_t *priv = NULL;
51
    gf_boolean_t per_vol = _gf_false;
52
    int ret = 0;
53
    char *libpath = NULL;
54
    store_methods_t *store_methods = NULL;
55
    void *handle = NULL;
56
    char *temp_str = NULL;
57
    int index = 0;
58
    char *libname = NULL;
59

60
    priv = GF_CALLOC(1, sizeof(*priv), gf_cs_mt_cs_private_t);
61
    if (!priv) {
62
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "insufficient memory");
63
        goto out;
64
    }
65

66
    priv->this = this;
67

68
    this->local_pool = mem_pool_new(cs_local_t, 512);
69
    if (!this->local_pool) {
70
        gf_msg(this->name, GF_LOG_ERROR, 0, ENOMEM, "initialisation failed.");
71
        ret = -1;
72
        goto out;
73
    }
74

75
    this->private = priv;
76

77
    GF_OPTION_INIT("cloudsync-remote-read", priv->remote_read, bool, out);
78

79
    /* temp workaround. Should be configurable through glusterd*/
80
    per_vol = _gf_true;
81

82
    if (per_vol) {
83
        if (dict_get_str_sizen(this->options, "cloudsync-storetype",
84
                               &temp_str) == 0) {
85
            for (index = 0; plugins[index].name; index++) {
86
                if (!strcmp(temp_str, plugins[index].name)) {
87
                    libname = plugins[index].library;
88
                    break;
89
                }
90
            }
91
        } else {
92
            ret = 0;
93
        }
94

95
        if (!libname) {
96
            gf_msg(this->name, GF_LOG_WARNING, 0, 0, "no plugin enabled");
97
            ret = 0;
98
            goto out;
99
        }
100

101
        ret = gf_asprintf(&libpath, "%s/%s", CS_PLUGINDIR, libname);
102
        if (ret == -1) {
103
            goto out;
104
        }
105

106
        handle = dlopen(libpath, RTLD_NOW);
107
        if (!handle) {
108
            gf_msg(this->name, GF_LOG_WARNING, 0, 0,
109
                   "could not "
110
                   "load the required library. %s",
111
                   dlerror());
112
            ret = 0;
113
            goto out;
114
        } else {
115
            gf_msg(this->name, GF_LOG_INFO, 0, 0,
116
                   "loading library:%s successful", libname);
117
        }
118

119
        priv->stores = GF_CALLOC(1, sizeof(struct cs_remote_stores),
120
                                 gf_cs_mt_cs_remote_stores_t);
121
        if (!priv->stores) {
122
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
123
                   "Could not "
124
                   "allocate memory for priv->stores");
125
            ret = -1;
126
            goto out;
127
        }
128

129
        (void)dlerror(); /* clear out previous error string */
130

131
        /* load library methods */
132
        store_methods = (store_methods_t *)dlsym(handle, "store_ops");
133
        if (!store_methods) {
134
            gf_msg(this->name, GF_LOG_ERROR, 0, 0, "null store_methods %s",
135
                   dlerror());
136
            ret = -1;
137
            goto out;
138
        }
139

140
        (void)dlerror();
141

142
        if (priv->remote_read) {
143
            priv->stores->rdfop = store_methods->fop_remote_read;
144
            if (!priv->stores->rdfop) {
145
                gf_msg(this->name, GF_LOG_ERROR, 0, 0,
146
                       "failed to get"
147
                       " read fop %s",
148
                       dlerror());
149
                ret = -1;
150
                goto out;
151
            }
152
        }
153

154
        priv->stores->dlfop = store_methods->fop_download;
155
        if (!priv->stores->dlfop) {
156
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
157
                   "failed to get"
158
                   " download fop %s",
159
                   dlerror());
160
            ret = -1;
161
            goto out;
162
        }
163

164
        (void)dlerror();
165
        priv->stores->init = store_methods->fop_init;
166
        if (!priv->stores->init) {
167
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
168
                   "failed to get"
169
                   " init fop %s",
170
                   dlerror());
171
            ret = -1;
172
            goto out;
173
        }
174

175
        (void)dlerror();
176
        priv->stores->reconfigure = store_methods->fop_reconfigure;
177
        if (!priv->stores->reconfigure) {
178
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
179
                   "failed to get"
180
                   " reconfigure fop %s",
181
                   dlerror());
182
            ret = -1;
183
            goto out;
184
        }
185

186
        priv->stores->handle = handle;
187

188
        priv->stores->config = (void *)((priv->stores->init)(this));
189
        if (!priv->stores->config) {
190
            gf_msg(this->name, GF_LOG_ERROR, 0, 0, "null config");
191
            ret = -1;
192
            goto out;
193
        }
194
    }
195

196
    ret = 0;
197

198
out:
199
    if (ret == -1) {
200
        if (this->local_pool) {
201
            mem_pool_destroy(this->local_pool);
202
            this->local_pool = NULL;
203
        }
204

205
        cs_cleanup_private(priv);
206

207
        if (handle) {
208
            dlclose(handle);
209
        }
210
    }
211

212
    GF_FREE(libpath);
213

214
    return ret;
215
}
216

217
int
218
cs_forget(xlator_t *this, inode_t *inode)
219
{
220
    uint64_t ctx_int = 0;
221
    cs_inode_ctx_t *ctx = NULL;
222

223
    inode_ctx_del(inode, this, &ctx_int);
224
    if (!ctx_int)
225
        return 0;
226

227
    ctx = (cs_inode_ctx_t *)(uintptr_t)ctx_int;
228

229
    GF_FREE(ctx);
230
    return 0;
231
}
232

233
void
234
cs_fini(xlator_t *this)
235
{
236
    cs_private_t *priv = NULL;
237
    priv = this->private;
238

239
    cs_cleanup_private(priv);
240
}
241

242
int
243
cs_reconfigure(xlator_t *this, dict_t *options)
244
{
245
    cs_private_t *priv = NULL;
246
    int ret = 0;
247

248
    priv = this->private;
249
    if (!priv) {
250
        ret = -1;
251
        goto out;
252
    }
253

254
    GF_OPTION_RECONF("cloudsync-remote-read", priv->remote_read, options, bool,
255
                     out);
256

257
    /* needed only for per volume configuration*/
258
    ret = priv->stores->reconfigure(this, options);
259

260
out:
261
    return ret;
262
}
263

264
int32_t
265
cs_mem_acct_init(xlator_t *this)
266
{
267
    int ret = -1;
268

269
    GF_VALIDATE_OR_GOTO("cloudsync", this, out);
270

271
    ret = xlator_mem_acct_init(this, gf_cs_mt_end);
272

273
    if (ret != 0) {
274
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "Memory accounting init failed");
275
        return ret;
276
    }
277
out:
278
    return ret;
279
}
280

281
int32_t
282
cs_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
283
            off_t off, dict_t *xdata)
284
{
285
    int ret = 0;
286
    int op_errno = ENOMEM;
287

288
    if (!xdata) {
289
        xdata = dict_new();
290
        if (!xdata) {
291
            gf_msg(this->name, GF_LOG_ERROR, 0, ENOMEM,
292
                   "failed to create "
293
                   "dict");
294
            goto err;
295
        }
296
    } else {
297
        dict_ref(xdata);
298
    }
299

300
    ret = dict_set_uint32(xdata, GF_CS_OBJECT_STATUS, 1);
301
    if (ret) {
302
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
303
               "dict_set failed key:"
304
               " %s",
305
               GF_CS_OBJECT_STATUS);
306
        goto err;
307
    }
308

309
    STACK_WIND(frame, default_readdirp_cbk, FIRST_CHILD(this),
310
               FIRST_CHILD(this)->fops->readdirp, fd, size, off, xdata);
311
    dict_unref(xdata);
312
    return 0;
313
err:
314
    STACK_UNWIND_STRICT(readdirp, frame, -1, op_errno, NULL, NULL);
315
    if (xdata) {
316
        dict_unref(xdata);
317
    }
318
    return 0;
319
}
320

321
int32_t
322
cs_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
323
                int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
324
                struct iatt *postbuf, dict_t *xdata)
325
{
326
    cs_local_t *local = NULL;
327
    int ret = 0;
328
    uint64_t val = 0;
329

330
    local = frame->local;
331

332
    local->call_cnt++;
333

334
    if (op_ret == -1) {
335
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "truncate failed");
336
        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
337
        if (ret == 0) {
338
            if (val == GF_CS_ERROR) {
339
                gf_msg(this->name, GF_LOG_ERROR, 0, 0,
340
                       "could not get file state, unwinding");
341
                op_ret = -1;
342
                op_errno = EIO;
343
                goto unwind;
344
            } else {
345
                __cs_inode_ctx_update(this, local->loc.inode, val);
346
                gf_msg(this->name, GF_LOG_INFO, 0, 0, " state = %" PRIu64, val);
347

348
                if (local->call_cnt == 1 &&
349
                    (val == GF_CS_REMOTE || val == GF_CS_DOWNLOADING)) {
350
                    gf_msg(this->name, GF_LOG_WARNING, 0, 0,
351
                           "will repair and download "
352
                           "the file, current state : %" PRIu64,
353
                           val);
354
                    goto repair;
355
                } else {
356
                    gf_msg(this->name, GF_LOG_ERROR, 0, 0,
357
                           "second truncate, Unwinding");
358
                    goto unwind;
359
                }
360
            }
361
        } else {
362
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
363
                   "file state "
364
                   "could not be figured, unwinding");
365
            goto unwind;
366
        }
367
    } else {
368
        /* successful write => file is local */
369
        __cs_inode_ctx_update(this, local->loc.inode, GF_CS_LOCAL);
370
        gf_msg(this->name, GF_LOG_INFO, 0, 0,
371
               "state : GF_CS_LOCAL"
372
               ", truncate successful");
373

374
        goto unwind;
375
    }
376

377
repair:
378
    ret = locate_and_execute(frame);
379
    if (ret) {
380
        goto unwind;
381
    }
382

383
    return 0;
384

385
unwind:
386
    CS_STACK_UNWIND(truncate, frame, op_ret, op_errno, prebuf, postbuf, xdata);
387
    return 0;
388
}
389

390
int32_t
391
cs_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
392
            dict_t *xdata)
393
{
394
    cs_local_t *local = NULL;
395
    int ret = 0;
396
    cs_inode_ctx_t *ctx = NULL;
397
    gf_cs_obj_state state = -1;
398

399
    VALIDATE_OR_GOTO(frame, err);
400
    VALIDATE_OR_GOTO(this, err);
401
    VALIDATE_OR_GOTO(loc, err);
402

403
    local = cs_local_init(this, frame, loc, NULL, GF_FOP_TRUNCATE);
404
    if (!local) {
405
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "local init failed");
406
        goto err;
407
    }
408

409
    __cs_inode_ctx_get(this, loc->inode, &ctx);
410

411
    if (ctx)
412
        state = __cs_get_file_state(loc->inode, ctx);
413
    else
414
        state = GF_CS_LOCAL;
415

416
    local->xattr_req = xdata ? dict_ref(xdata) : (xdata = dict_new());
417

418
    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
419
    if (ret) {
420
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
421
               "dict_set failed key:"
422
               " %s",
423
               GF_CS_OBJECT_STATUS);
424
        goto err;
425
    }
426

427
    local->stub = fop_truncate_stub(frame, cs_resume_truncate, loc, offset,
428
                                    xdata);
429
    if (!local->stub) {
430
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "insufficient memory");
431
        goto err;
432
    }
433

434
    if (state == GF_CS_LOCAL) {
435
        STACK_WIND(frame, cs_truncate_cbk, FIRST_CHILD(this),
436
                   FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);
437

438
    } else {
439
        local->call_cnt++;
440
        ret = locate_and_execute(frame);
441
        if (ret) {
442
            goto err;
443
        }
444
    }
445

446
    return 0;
447
err:
448
    CS_STACK_UNWIND(truncate, frame, -1, ENOMEM, NULL, NULL, NULL);
449
    return 0;
450
}
451

452
int32_t
453
cs_statfs_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
454
              int32_t op_errno, struct statvfs *buf, dict_t *xdata)
455
{
456
    STACK_UNWIND_STRICT(statfs, frame, op_ret, op_errno, buf, xdata);
457
    return 0;
458
}
459

460
int32_t
461
cs_statfs(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
462
{
463
    STACK_WIND(frame, cs_statfs_cbk, FIRST_CHILD(this),
464
               FIRST_CHILD(this)->fops->statfs, loc, xdata);
465
    return 0;
466
}
467

468
int32_t
469
cs_getxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
470
                int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata)
471
{
472
    STACK_UNWIND_STRICT(getxattr, frame, op_ret, op_errno, dict, xdata);
473
    return 0;
474
}
475

476
int32_t
477
cs_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, const char *name,
478
            dict_t *xattr_req)
479
{
480
    STACK_WIND(frame, cs_getxattr_cbk, FIRST_CHILD(this),
481
               FIRST_CHILD(this)->fops->getxattr, loc, name, xattr_req);
482
    return 0;
483
}
484

485
int32_t
486
cs_setxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
487
                int32_t op_ret, int32_t op_errno, dict_t *xdata)
488
{
489
    cs_local_t *local = NULL;
490

491
    local = frame->local;
492

493
    if (local->locked)
494
        cs_inodelk_unlock(frame);
495

496
    CS_STACK_UNWIND(setxattr, frame, op_ret, op_errno, xdata);
497

498
    return 0;
499
}
500

501
int32_t
502
cs_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
503
            int32_t flags, dict_t *xdata)
504
{
505
    data_t *tmp = NULL;
506
    cs_local_t *local = NULL;
507
    int ret = 0;
508

509
    VALIDATE_OR_GOTO(frame, err);
510
    VALIDATE_OR_GOTO(this, err);
511

512
    local = cs_local_init(this, frame, loc, NULL, GF_FOP_SETXATTR);
513
    if (!local) {
514
        ret = -1;
515
        goto err;
516
    }
517

518
    local->xattr_req = xdata ? dict_ref(xdata) : (xdata = dict_new());
519

520
    tmp = dict_get_sizen(dict, GF_CS_OBJECT_UPLOAD_COMPLETE);
521
    if (tmp) {
522
        /* Value of key should be the atime */
523
        local->stub = fop_setxattr_stub(frame, cs_resume_setxattr, loc, dict,
524
                                        flags, xdata);
525

526
        if (!local->stub)
527
            goto err;
528

529
        ret = locate_and_execute(frame);
530
        if (ret) {
531
            goto err;
532
        }
533

534
        return 0;
535
    }
536

537
    STACK_WIND(frame, cs_setxattr_cbk, FIRST_CHILD(this),
538
               FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata);
539
    return 0;
540
err:
541
    CS_STACK_UNWIND(setxattr, frame, -1, errno, NULL);
542
    return 0;
543
}
544

545
int32_t
546
cs_fgetxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
547
                 int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata)
548
{
549
    STACK_UNWIND_STRICT(fgetxattr, frame, op_ret, op_errno, dict, xdata);
550
    return 0;
551
}
552

553
int32_t
554
cs_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name,
555
             dict_t *xdata)
556
{
557
    STACK_WIND(frame, cs_fgetxattr_cbk, FIRST_CHILD(this),
558
               FIRST_CHILD(this)->fops->fgetxattr, fd, name, xdata);
559
    return 0;
560
}
561

562
int32_t
563
cs_fsetxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
564
                 int32_t op_ret, int32_t op_errno, dict_t *xdata)
565
{
566
    STACK_UNWIND_STRICT(fsetxattr, frame, op_ret, op_errno, xdata);
567
    return 0;
568
}
569

570
int32_t
571
cs_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
572
             int32_t flags, dict_t *xdata)
573
{
574
    STACK_WIND(frame, cs_fsetxattr_cbk, FIRST_CHILD(this),
575
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
576
    return 0;
577
}
578

579
int32_t
580
cs_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
581
              int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
582
              dict_t *xdata)
583
{
584
    STACK_UNWIND_STRICT(unlink, frame, op_ret, op_errno, preparent, postparent,
585
                        xdata);
586
    return 0;
587
}
588

589
int32_t
590
cs_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
591
          dict_t *xattr_req)
592
{
593
    cs_local_t *local = NULL;
594
    int ret = 0;
595

596
    local = cs_local_init(this, frame, loc, NULL, GF_FOP_UNLINK);
597
    if (!local)
598
        goto err;
599

600
    local->xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new();
601

602
    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
603
    if (ret) {
604
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
605
               "dict_set failed key:"
606
               " %s",
607
               GF_CS_OBJECT_STATUS);
608
        goto err;
609
    }
610
    STACK_WIND(frame, cs_unlink_cbk, FIRST_CHILD(this),
611
               FIRST_CHILD(this)->fops->unlink, loc, flags, local->xattr_req);
612
    return 0;
613
err:
614
    CS_STACK_UNWIND(unlink, frame, -1, errno, NULL, NULL, NULL);
615
    return 0;
616
}
617

618
int32_t
619
cs_open_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
620
            int32_t op_errno, fd_t *fd, dict_t *xdata)
621
{
622
    int ret = 0;
623
    uint64_t val = 0;
624

625
    if (op_ret == 0) {
626
        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
627
        if (!ret) {
628
            ret = __cs_inode_ctx_update(this, fd->inode, val);
629
            if (ret) {
630
                gf_msg(this->name, GF_LOG_ERROR, 0, 0, "ctx update failed");
631
            }
632
        }
633
    } else {
634
        cs_inode_ctx_reset(this, fd->inode);
635
    }
636

637
    CS_STACK_UNWIND(open, frame, op_ret, op_errno, fd, xdata);
638
    return 0;
639
}
640

641
int32_t
642
cs_open(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
643
        fd_t *fd, dict_t *xattr_req)
644
{
645
    cs_local_t *local = NULL;
646
    int ret = 0;
647

648
    local = cs_local_init(this, frame, NULL, fd, GF_FOP_OPEN);
649
    if (!local)
650
        goto err;
651

652
    local->xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new();
653

654
    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
655
    if (ret) {
656
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
657
               "dict_set failed key:"
658
               " %s",
659
               GF_CS_OBJECT_STATUS);
660
        goto err;
661
    }
662

663
    STACK_WIND(frame, cs_open_cbk, FIRST_CHILD(this),
664
               FIRST_CHILD(this)->fops->open, loc, flags, fd, local->xattr_req);
665
    return 0;
666
err:
667
    CS_STACK_UNWIND(open, frame, -1, errno, NULL, NULL);
668
    return 0;
669
}
670

671
int32_t
672
cs_fstat_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
673
             int32_t op_errno, struct iatt *buf, dict_t *xdata)
674
{
675
    int ret = 0;
676
    uint64_t val = 0;
677
    fd_t *fd = NULL;
678
    cs_local_t *local = NULL;
679

680
    local = frame->local;
681

682
    fd = local->fd;
683

684
    if (op_ret == 0) {
685
        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
686
        if (!ret) {
687
            gf_msg_debug(this->name, 0, "state %" PRIu64, val);
688
            ret = __cs_inode_ctx_update(this, fd->inode, val);
689
            if (ret) {
690
                gf_msg(this->name, GF_LOG_ERROR, 0, 0, "ctx update failed");
691
            }
692
        }
693
    } else {
694
        cs_inode_ctx_reset(this, fd->inode);
695
    }
696

697
    CS_STACK_UNWIND(fstat, frame, op_ret, op_errno, buf, xdata);
698

699
    return 0;
700
}
701

702
int32_t
703
cs_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xattr_req)
704
{
705
    cs_local_t *local = NULL;
706
    int ret = 0;
707

708
    local = cs_local_init(this, frame, NULL, fd, GF_FOP_FSTAT);
709
    if (!local)
710
        goto err;
711

712
    if (fd->inode->ia_type == IA_IFDIR)
713
        goto wind;
714

715
    local->xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new();
716

717
    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
718
    if (ret) {
719
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
720
               "dict_set failed key:"
721
               " %s",
722
               GF_CS_OBJECT_STATUS);
723
        goto err;
724
    }
725

726
wind:
727
    STACK_WIND(frame, cs_fstat_cbk, FIRST_CHILD(this),
728
               FIRST_CHILD(this)->fops->fstat, fd, local->xattr_req);
729
    return 0;
730
err:
731
    CS_STACK_UNWIND(fstat, frame, -1, errno, NULL, NULL);
732
    return 0;
733
}
734

735
cs_local_t *
736
cs_local_init(xlator_t *this, call_frame_t *frame, loc_t *loc, fd_t *fd,
737
              glusterfs_fop_t fop)
738
{
739
    cs_local_t *local = NULL;
740
    int ret = 0;
741

742
    local = mem_get0(this->local_pool);
743
    if (!local)
744
        goto out;
745

746
    if (loc) {
747
        ret = loc_copy(&local->loc, loc);
748
        if (ret)
749
            goto out;
750
    }
751

752
    if (fd) {
753
        local->fd = fd_ref(fd);
754
    }
755

756
    local->op_ret = -1;
757
    local->op_errno = EUCLEAN;
758
    local->fop = fop;
759
    local->dloffset = 0;
760
    frame->local = local;
761
    local->locked = _gf_false;
762
    local->call_cnt = 0;
763
out:
764
    if (ret) {
765
        if (local)
766
            mem_put(local);
767
        local = NULL;
768
    }
769

770
    return local;
771
}
772

773
/*
 * Create a frame for lock traffic by copying parent_frame and deriving its
 * lk_owner from the parent's root. Returns NULL when the copy fails.
 */
call_frame_t *
cs_lock_frame(call_frame_t *parent_frame)
{
    call_frame_t *lock_frame = copy_frame(parent_frame);

    if (lock_frame != NULL) {
        set_lk_owner_from_ptr(&lock_frame->root->lk_owner, parent_frame->root);
    }

    return lock_frame;
}
788

789
void
790
cs_lock_wipe(call_frame_t *lock_frame)
791
{
792
    CS_STACK_DESTROY(lock_frame);
793
}
794

795
int32_t
796
cs_inodelk_unlock_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
797
                      int32_t op_ret, int32_t op_errno, dict_t *xdata)
798
{
799
    cs_lock_wipe(frame);
800

801
    return 0;
802
}
803

804
int
805
cs_inodelk_unlock(call_frame_t *main_frame)
806
{
807
    xlator_t *this = NULL;
808
    struct gf_flock flock = {
809
        0,
810
    };
811
    call_frame_t *lock_frame = NULL;
812
    cs_local_t *lock_local = NULL;
813
    cs_local_t *main_local = NULL;
814
    int ret = 0;
815

816
    this = main_frame->this;
817
    main_local = main_frame->local;
818

819
    lock_frame = cs_lock_frame(main_frame);
820
    if (!lock_frame)
821
        goto out;
822

823
    lock_local = cs_local_init(this, lock_frame, NULL, NULL, 0);
824
    if (!lock_local)
825
        goto out;
826

827
    ret = cs_build_loc(&lock_local->loc, main_frame);
828
    if (ret) {
829
        goto out;
830
    }
831

832
    flock.l_type = F_UNLCK;
833

834
    main_local->locked = _gf_false;
835

836
    STACK_WIND(lock_frame, cs_inodelk_unlock_cbk, FIRST_CHILD(this),
837
               FIRST_CHILD(this)->fops->inodelk, CS_LOCK_DOMAIN,
838
               &lock_local->loc, F_SETLKW, &flock, NULL);
839

840
    return 0;
841

842
out:
843
    gf_msg(this->name, GF_LOG_ERROR, 0, 0,
844
           "Stale lock would be found on"
845
           " server");
846

847
    if (lock_frame)
848
        cs_lock_wipe(lock_frame);
849

850
    return 0;
851
}
852

853
int
854
cs_download_task(void *arg)
855
{
856
    call_frame_t *frame = NULL;
857
    xlator_t *this = NULL;
858
    cs_private_t *priv = NULL;
859
    int ret = -1;
860
    char *sign_req = NULL;
861
    fd_t *fd = NULL;
862
    cs_local_t *local = NULL;
863
    dict_t *dict = NULL;
864

865
    frame = (call_frame_t *)arg;
866

867
    this = frame->this;
868

869
    priv = this->private;
870

871
    if (!priv->stores) {
872
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
873
               "No remote store "
874
               "plugins found");
875
        ret = -1;
876
        goto out;
877
    }
878

879
    local = frame->local;
880

881
    if (local->fd)
882
        fd = fd_anonymous(local->fd->inode);
883
    else
884
        fd = fd_anonymous(local->loc.inode);
885

886
    if (!fd) {
887
        gf_msg("CS", GF_LOG_ERROR, 0, 0, "fd creation failed");
888
        ret = -1;
889
        goto out;
890
    }
891

892
    local->dlfd = fd;
893
    local->dloffset = 0;
894

895
    dict = dict_new();
896
    if (!dict) {
897
        gf_msg(this->name, GF_LOG_ERROR, 0, ENOMEM,
898
               "failed to create "
899
               "dict");
900
        ret = -1;
901
        goto out;
902
    }
903

904
    ret = dict_set_uint32(dict, GF_CS_OBJECT_DOWNLOADING, 1);
905
    if (ret) {
906
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "dict_set failed");
907
        ret = -1;
908
        goto out;
909
    }
910

911
    ret = syncop_fsetxattr(this, local->fd, dict, 0, NULL, NULL);
912
    if (ret) {
913
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
914
               "fsetxattr failed "
915
               "key %s",
916
               GF_CS_OBJECT_DOWNLOADING);
917
        ret = -1;
918
        goto out;
919
    }
920
    /*this calling method is for per volume setting */
921
    ret = priv->stores->dlfop(frame, priv->stores->config);
922
    if (ret) {
923
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
924
               "download failed"
925
               ", remotepath: %s",
926
               local->remotepath);
927

928
        /*using dlfd as it is anonymous and have RDWR flag*/
929
        ret = syncop_ftruncate(FIRST_CHILD(this), local->dlfd, 0, NULL, NULL,
930
                               NULL, NULL);
931
        if (ret) {
932
            gf_msg(this->name, GF_LOG_ERROR, 0, -ret, "ftruncate failed");
933
        } else {
934
            gf_msg_debug(this->name, 0, "ftruncate succeed");
935
        }
936

937
        ret = -1;
938
        goto out;
939
    } else {
940
        gf_msg(this->name, GF_LOG_INFO, 0, 0,
941
               "download success, path"
942
               " : %s",
943
               local->remotepath);
944

945
        ret = syncop_fremovexattr(this, local->fd, GF_CS_OBJECT_REMOTE, NULL,
946
                                  NULL);
947
        if (ret) {
948
            gf_msg(this->name, GF_LOG_ERROR, 0, -ret,
949
                   "removexattr failed, remotexattr");
950
            ret = -1;
951
            goto out;
952
        } else {
953
            gf_msg_debug(this->name, 0,
954
                         "fremovexattr success, "
955
                         "path : %s",
956
                         local->remotepath);
957
        }
958

959
        ret = syncop_fremovexattr(this, local->fd, GF_CS_OBJECT_DOWNLOADING,
960
                                  NULL, NULL);
961
        if (ret) {
962
            gf_msg(this->name, GF_LOG_ERROR, 0, -ret,
963
                   "removexattr failed, downloading xattr, path %s",
964
                   local->remotepath);
965
            ret = -1;
966
            goto out;
967
        } else {
968
            gf_msg_debug(this->name, 0,
969
                         "fremovexattr success"
970
                         " path  %s",
971
                         local->remotepath);
972
        }
973
    }
974

975
out:
976
    GF_FREE(sign_req);
977

978
    if (dict)
979
        dict_unref(dict);
980

981
    if (fd) {
982
        fd_unref(fd);
983
        local->dlfd = NULL;
984
    }
985

986
    return ret;
987
}
988

989
int
990
cs_download(call_frame_t *frame)
991
{
992
    int ret = 0;
993
    cs_local_t *local = NULL;
994
    xlator_t *this = NULL;
995

996
    local = frame->local;
997
    this = frame->this;
998

999
    if (!local->remotepath) {
1000
        ret = -1;
1001
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1002
               "remote path not"
1003
               " available. Check posix logs to resolve");
1004
        goto out;
1005
    }
1006

1007
    ret = cs_download_task((void *)frame);
1008
out:
1009
    return ret;
1010
}
1011

1012
int
1013
cs_set_xattr_req(call_frame_t *frame)
1014
{
1015
    cs_local_t *local = NULL;
1016
    GF_UNUSED int ret = 0;
1017

1018
    local = frame->local;
1019

1020
    /* When remote reads are performed (i.e. reads on remote store),
1021
     * there needs to be a way to associate a file on gluster volume
1022
     * with its correspnding file on the remote store. In order to do
1023
     * that, a unique key can be maintained as an xattr
1024
     * (GF_CS_XATTR_ARCHIVE_UUID)on the stub file on gluster bricks.
1025
     * This xattr should be provided to the plugin to
1026
     * perform the read fop on the correct file. This assumes that the file
1027
     * hierarchy and name need not be the same on remote store as that of
1028
     * the gluster volume.
1029
     */
1030
    ret = dict_set_sizen_str_sizen(local->xattr_req, GF_CS_XATTR_ARCHIVE_UUID,
1031
                                   "1");
1032

1033
    return 0;
1034
}
1035

1036
int
1037
cs_update_xattrs(call_frame_t *frame, dict_t *xdata)
1038
{
1039
    cs_local_t *local = NULL;
1040
    xlator_t *this = NULL;
1041
    int size = -1;
1042
    GF_UNUSED int ret = 0;
1043

1044
    local = frame->local;
1045
    this = frame->this;
1046

1047
    local->xattrinfo.lxattr = GF_CALLOC(1, sizeof(cs_loc_xattr_t),
1048
                                        gf_cs_mt_cs_lxattr_t);
1049
    if (!local->xattrinfo.lxattr) {
1050
        local->op_ret = -1;
1051
        local->op_errno = ENOMEM;
1052
        goto err;
1053
    }
1054

1055
    gf_uuid_copy(local->xattrinfo.lxattr->gfid, local->loc.gfid);
1056

1057
    if (local->remotepath) {
1058
        local->xattrinfo.lxattr->file_path = gf_strdup(local->remotepath);
1059
        if (!local->xattrinfo.lxattr->file_path) {
1060
            local->op_ret = -1;
1061
            local->op_errno = ENOMEM;
1062
            goto err;
1063
        }
1064
    }
1065

1066
    ret = dict_get_gfuuid(xdata, GF_CS_XATTR_ARCHIVE_UUID,
1067
                          &(local->xattrinfo.lxattr->uuid));
1068

1069
    if (ret) {
1070
        gf_uuid_clear(local->xattrinfo.lxattr->uuid);
1071
    }
1072
    size = strlen(this->name) - strlen("-cloudsync") + 1;
1073
    local->xattrinfo.lxattr->volname = GF_CALLOC(1, size, gf_common_mt_char);
1074
    if (!local->xattrinfo.lxattr->volname) {
1075
        local->op_ret = -1;
1076
        local->op_errno = ENOMEM;
1077
        goto err;
1078
    }
1079
    strncpy(local->xattrinfo.lxattr->volname, this->name, size - 1);
1080
    local->xattrinfo.lxattr->volname[size - 1] = '\0';
1081

1082
    return 0;
1083
err:
1084
    cs_xattrinfo_wipe(local);
1085
    return -1;
1086
}
1087

1088
int
1089
cs_serve_readv(call_frame_t *frame, off_t offset, size_t size, uint32_t flags)
1090
{
1091
    xlator_t *this = NULL;
1092
    cs_private_t *priv = NULL;
1093
    int ret = -1;
1094
    fd_t *fd = NULL;
1095
    cs_local_t *local = NULL;
1096

1097
    local = frame->local;
1098
    this = frame->this;
1099
    priv = this->private;
1100

1101
    if (!local->remotepath) {
1102
        ret = -1;
1103
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1104
               "remote path not"
1105
               " available. Check posix logs to resolve");
1106
        goto out;
1107
    }
1108

1109
    if (!priv->stores) {
1110
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1111
               "No remote store "
1112
               "plugins found");
1113
        ret = -1;
1114
        goto out;
1115
    }
1116

1117
    if (local->fd) {
1118
        fd = fd_anonymous(local->fd->inode);
1119
    } else {
1120
        fd = fd_anonymous(local->loc.inode);
1121
    }
1122

1123
    local->xattrinfo.size = size;
1124
    local->xattrinfo.offset = offset;
1125
    local->xattrinfo.flags = flags;
1126

1127
    if (!fd) {
1128
        gf_msg("CS", GF_LOG_ERROR, 0, 0, "fd creation failed");
1129
        ret = -1;
1130
        goto out;
1131
    }
1132

1133
    local->dlfd = fd;
1134
    local->dloffset = offset;
1135

1136
    /*this calling method is for per volume setting */
1137
    ret = priv->stores->rdfop(frame, priv->stores->config);
1138
    if (ret) {
1139
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1140
               "read failed"
1141
               ", remotepath: %s",
1142
               local->remotepath);
1143
        ret = -1;
1144
        goto out;
1145
    } else {
1146
        gf_msg(this->name, GF_LOG_INFO, 0, 0,
1147
               "read success, path"
1148
               " : %s",
1149
               local->remotepath);
1150
    }
1151

1152
out:
1153
    if (fd) {
1154
        fd_unref(fd);
1155
        local->dlfd = NULL;
1156
    }
1157
    return ret;
1158
}
1159

1160
int32_t
1161
cs_readv_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
1162
             int32_t op_errno, struct iovec *vector, int32_t count,
1163
             struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
1164
{
1165
    cs_local_t *local = NULL;
1166
    int ret = 0;
1167
    uint64_t val = 0;
1168
    fd_t *fd = NULL;
1169

1170
    local = frame->local;
1171
    fd = local->fd;
1172

1173
    local->call_cnt++;
1174

1175
    if (op_ret == -1) {
1176
        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
1177
        if (ret == 0) {
1178
            if (val == GF_CS_ERROR) {
1179
                gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1180
                       "could not get file state, unwinding");
1181
                op_ret = -1;
1182
                op_errno = EIO;
1183
                goto unwind;
1184
            } else {
1185
                __cs_inode_ctx_update(this, fd->inode, val);
1186
                gf_msg(this->name, GF_LOG_INFO, 0, 0, " state = %" PRIu64, val);
1187

1188
                if (local->call_cnt == 1 &&
1189
                    (val == GF_CS_REMOTE || val == GF_CS_DOWNLOADING)) {
1190
                    gf_msg(this->name, GF_LOG_INFO, 0, 0,
1191
                           " will read from remote : %" PRIu64, val);
1192
                    goto repair;
1193
                } else {
1194
                    gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1195
                           "second readv, Unwinding");
1196
                    goto unwind;
1197
                }
1198
            }
1199
        } else {
1200
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1201
                   "file state "
1202
                   "could not be figured, unwinding");
1203
            goto unwind;
1204
        }
1205
    } else {
1206
        /* successful readv => file is local */
1207
        __cs_inode_ctx_update(this, fd->inode, GF_CS_LOCAL);
1208
        gf_msg(this->name, GF_LOG_INFO, 0, 0,
1209
               "state : GF_CS_LOCAL"
1210
               ", readv successful");
1211

1212
        goto unwind;
1213
    }
1214

1215
repair:
1216
    ret = locate_and_execute(frame);
1217
    if (ret) {
1218
        goto unwind;
1219
    }
1220

1221
    return 0;
1222

1223
unwind:
1224
    CS_STACK_UNWIND(readv, frame, op_ret, op_errno, vector, count, stbuf,
1225
                    iobref, xdata);
1226

1227
    return 0;
1228
}
1229

1230
int32_t
1231
cs_resume_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
1232
                off_t offset, uint32_t flags, dict_t *xdata)
1233
{
1234
    int ret = 0;
1235

1236
    ret = cs_resume_postprocess(this, frame, fd->inode);
1237
    if (ret) {
1238
        goto unwind;
1239
    }
1240

1241
    cs_inodelk_unlock(frame);
1242

1243
    STACK_WIND(frame, cs_readv_cbk, FIRST_CHILD(this),
1244
               FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, xdata);
1245

1246
    return 0;
1247

1248
unwind:
1249
    cs_inodelk_unlock(frame);
1250

1251
    cs_common_cbk(frame);
1252

1253
    return 0;
1254
}
1255

1256
int32_t
1257
cs_resume_remote_readv(call_frame_t *frame, xlator_t *this, fd_t *fd,
1258
                       size_t size, off_t offset, uint32_t flags, dict_t *xdata)
1259
{
1260
    int ret = 0;
1261
    cs_local_t *local = NULL;
1262
    gf_cs_obj_state state = -1;
1263
    cs_inode_ctx_t *ctx = NULL;
1264

1265
    cs_inodelk_unlock(frame);
1266

1267
    local = frame->local;
1268
    if (!local) {
1269
        ret = -1;
1270
        goto unwind;
1271
    }
1272

1273
    __cs_inode_ctx_get(this, fd->inode, &ctx);
1274

1275
    state = __cs_get_file_state(fd->inode, ctx);
1276
    if (state == GF_CS_ERROR) {
1277
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1278
               "status is GF_CS_ERROR."
1279
               " Aborting readv");
1280
        local->op_ret = -1;
1281
        local->op_errno = EREMOTE;
1282
        ret = -1;
1283
        goto unwind;
1284
    }
1285

1286
    /* Serve readv from remote store only if it is remote. */
1287
    gf_msg_debug(this->name, 0, "status of file %s is %d",
1288
                 local->remotepath ? local->remotepath : "", state);
1289

1290
    /* We will reach this condition if local inode ctx had REMOTE
1291
     * state when the control was in cs_readv but after stat
1292
     * we got an updated state saying that the file is LOCAL.
1293
     */
1294
    if (state == GF_CS_LOCAL) {
1295
        STACK_WIND(frame, cs_readv_cbk, FIRST_CHILD(this),
1296
                   FIRST_CHILD(this)->fops->readv, fd, size, offset, flags,
1297
                   xdata);
1298
    } else if (state == GF_CS_REMOTE) {
1299
        ret = cs_resume_remote_readv_postprocess(this, frame, fd->inode, offset,
1300
                                                 size, flags);
1301
        /* Failed to submit the remote readv fop to plugin */
1302
        if (ret) {
1303
            local->op_ret = -1;
1304
            local->op_errno = EREMOTE;
1305
            goto unwind;
1306
        }
1307
        /* When the file is in any other intermediate state,
1308
         * we should not perform remote reads.
1309
         */
1310
    } else {
1311
        local->op_ret = -1;
1312
        local->op_errno = EINVAL;
1313
        goto unwind;
1314
    }
1315

1316
    return 0;
1317

1318
unwind:
1319
    cs_common_cbk(frame);
1320

1321
    return 0;
1322
}
1323

1324
int32_t
1325
cs_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
1326
         off_t offset, uint32_t flags, dict_t *xdata)
1327
{
1328
    int op_errno = ENOMEM;
1329
    cs_local_t *local = NULL;
1330
    int ret = 0;
1331
    cs_inode_ctx_t *ctx = NULL;
1332
    gf_cs_obj_state state = -1;
1333
    cs_private_t *priv = NULL;
1334

1335
    VALIDATE_OR_GOTO(frame, err);
1336
    VALIDATE_OR_GOTO(this, err);
1337
    VALIDATE_OR_GOTO(fd, err);
1338

1339
    priv = this->private;
1340

1341
    local = cs_local_init(this, frame, NULL, fd, GF_FOP_READ);
1342
    if (!local) {
1343
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "local init failed");
1344
        goto err;
1345
    }
1346

1347
    __cs_inode_ctx_get(this, fd->inode, &ctx);
1348

1349
    if (ctx)
1350
        state = __cs_get_file_state(fd->inode, ctx);
1351
    else
1352
        state = GF_CS_LOCAL;
1353

1354
    local->xattr_req = xdata ? dict_ref(xdata) : (xdata = dict_new());
1355

1356
    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_STATUS, 1);
1357
    if (ret) {
1358
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1359
               "dict_set failed key:"
1360
               " %s",
1361
               GF_CS_OBJECT_STATUS);
1362
        goto err;
1363
    }
1364

1365
    if (priv->remote_read) {
1366
        local->stub = fop_readv_stub(frame, cs_resume_remote_readv, fd, size,
1367
                                     offset, flags, xdata);
1368
    } else {
1369
        local->stub = fop_readv_stub(frame, cs_resume_readv, fd, size, offset,
1370
                                     flags, xdata);
1371
    }
1372
    if (!local->stub) {
1373
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "insufficient memory");
1374
        goto err;
1375
    }
1376

1377
    if (state == GF_CS_LOCAL) {
1378
        STACK_WIND(frame, cs_readv_cbk, FIRST_CHILD(this),
1379
                   FIRST_CHILD(this)->fops->readv, fd, size, offset, flags,
1380
                   xdata);
1381
    } else {
1382
        local->call_cnt++;
1383
        ret = locate_and_execute(frame);
1384
        if (ret) {
1385
            goto err;
1386
        }
1387
    }
1388

1389
    return 0;
1390

1391
err:
1392
    CS_STACK_UNWIND(readv, frame, -1, op_errno, NULL, -1, NULL, NULL, NULL);
1393

1394
    return 0;
1395
}
1396

1397
int
1398
cs_resume_remote_readv_postprocess(xlator_t *this, call_frame_t *frame,
1399
                                   inode_t *inode, off_t offset, size_t size,
1400
                                   uint32_t flags)
1401
{
1402
    int ret = 0;
1403

1404
    ret = cs_serve_readv(frame, offset, size, flags);
1405

1406
    return ret;
1407
}
1408

1409
int
1410
cs_stat_check_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
1411
                  int op_errno, struct iatt *stbuf, dict_t *xdata)
1412
{
1413
    cs_local_t *local = NULL;
1414
    call_stub_t *stub = NULL;
1415
    char *filepath = NULL;
1416
    int ret = 0;
1417
    inode_t *inode = NULL;
1418
    uint64_t val = 0;
1419

1420
    local = frame->local;
1421

1422
    if (op_ret == -1) {
1423
        local->op_ret = op_ret;
1424
        local->op_errno = op_errno;
1425
        gf_msg(this->name, GF_LOG_ERROR, 0, op_errno, "stat check failed");
1426
        goto err;
1427
    } else {
1428
        if (local->fd)
1429
            inode = local->fd->inode;
1430
        else
1431
            inode = local->loc.inode;
1432

1433
        if (!inode) {
1434
            local->op_ret = -1;
1435
            local->op_errno = EINVAL;
1436
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1437
                   "null inode "
1438
                   "returned");
1439
            goto err;
1440
        }
1441

1442
        ret = dict_get_uint64(xdata, GF_CS_OBJECT_STATUS, &val);
1443
        if (ret == 0) {
1444
            if (val == GF_CS_ERROR) {
1445
                cs_inode_ctx_reset(this, inode);
1446
                local->op_ret = -1;
1447
                local->op_errno = EIO;
1448
                gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1449
                       "status = GF_CS_ERROR. failed to get "
1450
                       " file state");
1451
                goto err;
1452
            } else {
1453
                ret = __cs_inode_ctx_update(this, inode, val);
1454
                gf_msg_debug(this->name, 0, "status : %" PRIu64, val);
1455
                if (ret) {
1456
                    gf_msg(this->name, GF_LOG_ERROR, 0, 0, "ctx update failed");
1457
                    local->op_ret = -1;
1458
                    local->op_errno = ENOMEM;
1459
                    goto err;
1460
                }
1461
            }
1462
        } else {
1463
            gf_msg_debug(this->name, 0, "status not found in dict");
1464
            local->op_ret = -1;
1465
            local->op_errno = ENOMEM;
1466
            goto err;
1467
        }
1468

1469
        ret = dict_get_str_sizen(xdata, GF_CS_OBJECT_REMOTE, &filepath);
1470
        if (filepath) {
1471
            gf_msg_debug(this->name, 0, "filepath returned %s", filepath);
1472
            local->remotepath = gf_strdup(filepath);
1473
            if (!local->remotepath) {
1474
                local->op_ret = -1;
1475
                local->op_errno = ENOMEM;
1476
                goto err;
1477
            }
1478
        } else {
1479
            gf_msg_debug(this->name, 0, "NULL filepath");
1480
        }
1481

1482
        ret = cs_update_xattrs(frame, xdata);
1483
        if (ret)
1484
            goto err;
1485

1486
        local->op_ret = 0;
1487
        local->xattr_rsp = dict_ref(xdata);
1488
        memcpy(&local->stbuf, stbuf, sizeof(struct iatt));
1489
    }
1490

1491
    stub = local->stub;
1492
    local->stub = NULL;
1493
    call_resume(stub);
1494

1495
    return 0;
1496
err:
1497
    cs_inodelk_unlock(frame);
1498

1499
    cs_common_cbk(frame);
1500

1501
    return 0;
1502
}
1503

1504
int
1505
cs_do_stat_check(call_frame_t *main_frame)
1506
{
1507
    cs_local_t *local = NULL;
1508
    xlator_t *this = NULL;
1509
    int ret = 0;
1510

1511
    local = main_frame->local;
1512
    this = main_frame->this;
1513

1514
    ret = dict_set_uint32(local->xattr_req, GF_CS_OBJECT_REPAIR, 256);
1515
    if (ret) {
1516
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "dict_set failed");
1517
        goto err;
1518
    }
1519

1520
    cs_set_xattr_req(main_frame);
1521

1522
    if (local->fd) {
1523
        STACK_WIND(main_frame, cs_stat_check_cbk, FIRST_CHILD(this),
1524
                   FIRST_CHILD(this)->fops->fstat, local->fd, local->xattr_req);
1525
    } else {
1526
        STACK_WIND(main_frame, cs_stat_check_cbk, FIRST_CHILD(this),
1527
                   FIRST_CHILD(this)->fops->stat, &local->loc,
1528
                   local->xattr_req);
1529
    }
1530

1531
    return 0;
1532

1533
err:
1534
    cs_inodelk_unlock(main_frame);
1535

1536
    cs_common_cbk(main_frame);
1537

1538
    return 0;
1539
}
1540

1541
void
1542
cs_common_cbk(call_frame_t *frame)
1543
{
1544
    glusterfs_fop_t fop = -1;
1545
    cs_local_t *local = NULL;
1546

1547
    local = frame->local;
1548

1549
    fop = local->fop;
1550

1551
    /*Note: Only the failure case needs to be handled here. Since for
1552
     * successful stat check the fop will resume anyway. The unwind can
1553
     * happen from the fop_cbk and each cbk can unlock the inodelk in case
1554
     * a lock was taken before. The lock status can be stored in frame */
1555

1556
    /* for failure case  */
1557

1558
    /*TODO: add other fops*/
1559
    switch (fop) {
1560
        case GF_FOP_WRITE:
1561
            CS_STACK_UNWIND(writev, frame, local->op_ret, local->op_errno, NULL,
1562
                            NULL, NULL);
1563
            break;
1564

1565
        case GF_FOP_SETXATTR:
1566
            CS_STACK_UNWIND(setxattr, frame, local->op_ret, local->op_errno,
1567
                            NULL);
1568
            break;
1569
        case GF_FOP_READ:
1570
            CS_STACK_UNWIND(readv, frame, local->op_ret, local->op_errno, NULL,
1571
                            0, NULL, NULL, NULL);
1572
            break;
1573
        case GF_FOP_FTRUNCATE:
1574
            CS_STACK_UNWIND(ftruncate, frame, local->op_ret, local->op_errno,
1575
                            NULL, NULL, NULL);
1576
            break;
1577

1578
        case GF_FOP_TRUNCATE:
1579
            CS_STACK_UNWIND(truncate, frame, local->op_ret, local->op_errno,
1580
                            NULL, NULL, NULL);
1581
            break;
1582
        default:
1583
            break;
1584
    }
1585

1586
    return;
1587
}
1588

1589
int
1590
cs_blocking_inodelk_cbk(call_frame_t *lock_frame, void *cookie, xlator_t *this,
1591
                        int32_t op_ret, int32_t op_errno, dict_t *xdata)
1592
{
1593
    cs_local_t *main_local = NULL;
1594
    call_frame_t *main_frame = NULL;
1595
    cs_local_t *lock_local = NULL;
1596

1597
    lock_local = lock_frame->local;
1598

1599
    main_frame = lock_local->main_frame;
1600
    main_local = main_frame->local;
1601

1602
    if (op_ret) {
1603
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "inodelk failed");
1604
        main_local->op_errno = op_errno;
1605
        main_local->op_ret = op_ret;
1606
        goto err;
1607
    }
1608

1609
    main_local->locked = _gf_true;
1610

1611
    cs_lock_wipe(lock_frame);
1612

1613
    cs_do_stat_check(main_frame);
1614

1615
    return 0;
1616
err:
1617
    cs_common_cbk(main_frame);
1618

1619
    cs_lock_wipe(lock_frame);
1620

1621
    return 0;
1622
}
1623

1624
int
1625
cs_build_loc(loc_t *loc, call_frame_t *frame)
1626
{
1627
    cs_local_t *local = NULL;
1628
    int ret = -1;
1629

1630
    local = frame->local;
1631

1632
    if (local->fd) {
1633
        loc->inode = inode_ref(local->fd->inode);
1634
        if (loc->inode) {
1635
            gf_uuid_copy(loc->gfid, loc->inode->gfid);
1636
            ret = 0;
1637
            goto out;
1638
        } else {
1639
            ret = -1;
1640
            goto out;
1641
        }
1642
    } else {
1643
        loc->inode = inode_ref(local->loc.inode);
1644
        if (loc->inode) {
1645
            gf_uuid_copy(loc->gfid, loc->inode->gfid);
1646
            ret = 0;
1647
            goto out;
1648
        } else {
1649
            ret = -1;
1650
            goto out;
1651
        }
1652
    }
1653
out:
1654
    return ret;
1655
}
1656

1657
int
1658
cs_blocking_inodelk(call_frame_t *parent_frame)
1659
{
1660
    call_frame_t *lock_frame = NULL;
1661
    cs_local_t *lock_local = NULL;
1662
    xlator_t *this = NULL;
1663
    struct gf_flock flock = {
1664
        0,
1665
    };
1666
    int ret = 0;
1667

1668
    this = parent_frame->this;
1669

1670
    lock_frame = cs_lock_frame(parent_frame);
1671
    if (!lock_frame) {
1672
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "insuffcient memory");
1673
        goto err;
1674
    }
1675

1676
    lock_local = cs_local_init(this, lock_frame, NULL, NULL, 0);
1677
    if (!lock_local) {
1678
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "local init failed");
1679
        goto err;
1680
    }
1681

1682
    lock_local->main_frame = parent_frame;
1683

1684
    flock.l_type = F_WRLCK;
1685

1686
    ret = cs_build_loc(&lock_local->loc, parent_frame);
1687
    if (ret) {
1688
        gf_msg(this->name, GF_LOG_ERROR, 0, 0, "build_loc failed");
1689
        goto err;
1690
    }
1691

1692
    STACK_WIND(lock_frame, cs_blocking_inodelk_cbk, FIRST_CHILD(this),
1693
               FIRST_CHILD(this)->fops->inodelk, CS_LOCK_DOMAIN,
1694
               &lock_local->loc, F_SETLKW, &flock, NULL);
1695

1696
    return 0;
1697
err:
1698
    if (lock_frame)
1699
        cs_lock_wipe(lock_frame);
1700

1701
    return -1;
1702
}
1703

1704
int
1705
locate_and_execute(call_frame_t *frame)
1706
{
1707
    int ret = 0;
1708

1709
    ret = cs_blocking_inodelk(frame);
1710

1711
    if (ret)
1712
        return -1;
1713
    else
1714
        return 0;
1715
}
1716

1717
int32_t
1718
cs_resume_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc,
1719
                   off_t offset, dict_t *xattr_req)
1720
{
1721
    cs_local_t *local = NULL;
1722
    int ret = 0;
1723

1724
    local = frame->local;
1725

1726
    ret = cs_resume_postprocess(this, frame, loc->inode);
1727
    if (ret) {
1728
        goto unwind;
1729
    }
1730

1731
    cs_inodelk_unlock(frame);
1732

1733
    STACK_WIND(frame, cs_truncate_cbk, FIRST_CHILD(this),
1734
               FIRST_CHILD(this)->fops->truncate, loc, offset,
1735
               local->xattr_req);
1736

1737
    return 0;
1738

1739
unwind:
1740
    cs_inodelk_unlock(frame);
1741

1742
    cs_common_cbk(frame);
1743

1744
    return 0;
1745
}
1746

1747
int32_t
1748
cs_resume_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
1749
                   dict_t *dict, int32_t flags, dict_t *xdata)
1750
{
1751
    cs_local_t *local = NULL;
1752
    cs_inode_ctx_t *ctx = NULL;
1753
    gf_cs_obj_state state = GF_CS_ERROR;
1754

1755
    local = frame->local;
1756

1757
    __cs_inode_ctx_get(this, loc->inode, &ctx);
1758

1759
    state = __cs_get_file_state(loc->inode, ctx);
1760

1761
    if (state == GF_CS_ERROR) {
1762
        /* file is already remote */
1763
        local->op_ret = -1;
1764
        local->op_errno = EINVAL;
1765
        gf_msg(this->name, GF_LOG_WARNING, 0, 0,
1766
               "file %s , could not figure file state", loc->path);
1767
        goto unwind;
1768
    }
1769

1770
    if (state == GF_CS_REMOTE) {
1771
        /* file is already remote */
1772
        local->op_ret = -1;
1773
        local->op_errno = EINVAL;
1774
        gf_msg(this->name, GF_LOG_WARNING, 0, EINVAL,
1775
               "file %s is already remote", loc->path);
1776
        goto unwind;
1777
    }
1778

1779
    if (state == GF_CS_DOWNLOADING) {
1780
        gf_msg(this->name, GF_LOG_WARNING, 0, 0,
1781
               " file is in downloading state.");
1782
        local->op_ret = -1;
1783
        local->op_errno = EINVAL;
1784
        goto unwind;
1785
    }
1786

1787
    STACK_WIND(frame, cs_setxattr_cbk, FIRST_CHILD(this),
1788
               FIRST_CHILD(this)->fops->setxattr, loc, dict, flags,
1789
               local->xattr_req);
1790

1791
    return 0;
1792
unwind:
1793
    cs_inodelk_unlock(frame);
1794

1795
    cs_common_cbk(frame);
1796

1797
    return 0;
1798
}
1799

1800
gf_cs_obj_state
1801
__cs_get_file_state(inode_t *inode, cs_inode_ctx_t *ctx)
1802
{
1803
    gf_cs_obj_state state = -1;
1804

1805
    if (!ctx)
1806
        return GF_CS_ERROR;
1807

1808
    LOCK(&inode->lock);
1809
    {
1810
        state = ctx->state;
1811
    }
1812
    UNLOCK(&inode->lock);
1813

1814
    return state;
1815
}
1816

1817
void
1818
__cs_inode_ctx_get(xlator_t *this, inode_t *inode, cs_inode_ctx_t **ctx)
1819
{
1820
    uint64_t ctxint = 0;
1821
    int ret = 0;
1822

1823
    LOCK(&inode->lock);
1824
    {
1825
        ret = __inode_ctx_get(inode, this, &ctxint);
1826
    }
1827
    UNLOCK(&inode->lock);
1828

1829
    if (ret)
1830
        *ctx = NULL;
1831
    else
1832
        *ctx = (cs_inode_ctx_t *)(uintptr_t)ctxint;
1833

1834
    return;
1835
}
1836

1837
int
1838
__cs_inode_ctx_update(xlator_t *this, inode_t *inode, uint64_t val)
1839
{
1840
    cs_inode_ctx_t *ctx = NULL;
1841
    uint64_t ctxint = 0;
1842
    int ret = 0;
1843

1844
    LOCK(&inode->lock);
1845
    {
1846
        ret = __inode_ctx_get(inode, this, &ctxint);
1847
        if (ret) {
1848
            ctx = GF_CALLOC(1, sizeof(*ctx), gf_cs_mt_cs_inode_ctx_t);
1849
            if (!ctx) {
1850
                gf_msg(this->name, GF_LOG_ERROR, 0, 0, "ctx allocation failed");
1851
                ret = -1;
1852
                goto out;
1853
            }
1854

1855
            ctx->state = val;
1856

1857
            ctxint = (uint64_t)(uintptr_t)ctx;
1858

1859
            ret = __inode_ctx_set(inode, this, &ctxint);
1860
            if (ret) {
1861
                GF_FREE(ctx);
1862
                goto out;
1863
            }
1864
        } else {
1865
            ctx = (cs_inode_ctx_t *)(uintptr_t)ctxint;
1866

1867
            ctx->state = val;
1868
        }
1869
    }
1870

1871
out:
1872
    UNLOCK(&inode->lock);
1873

1874
    return ret;
1875
}
1876

1877
int
1878
cs_inode_ctx_reset(xlator_t *this, inode_t *inode)
1879
{
1880
    cs_inode_ctx_t *ctx = NULL;
1881
    uint64_t ctxint = 0;
1882

1883
    inode_ctx_del(inode, this, &ctxint);
1884
    if (!ctxint) {
1885
        return 0;
1886
    }
1887

1888
    ctx = (cs_inode_ctx_t *)(uintptr_t)ctxint;
1889

1890
    GF_FREE(ctx);
1891
    return 0;
1892
}
1893

1894
int
1895
cs_resume_postprocess(xlator_t *this, call_frame_t *frame, inode_t *inode)
1896
{
1897
    cs_local_t *local = NULL;
1898
    gf_cs_obj_state state = -1;
1899
    cs_inode_ctx_t *ctx = NULL;
1900
    int ret = 0;
1901

1902
    local = frame->local;
1903
    if (!local) {
1904
        ret = -1;
1905
        goto out;
1906
    }
1907

1908
    __cs_inode_ctx_get(this, inode, &ctx);
1909

1910
    state = __cs_get_file_state(inode, ctx);
1911
    if (state == GF_CS_ERROR) {
1912
        gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1913
               "status is GF_CS_ERROR."
1914
               " Aborting write");
1915
        local->op_ret = -1;
1916
        local->op_errno = EREMOTE;
1917
        ret = -1;
1918
        goto out;
1919
    }
1920

1921
    if (state == GF_CS_REMOTE || state == GF_CS_DOWNLOADING) {
1922
        gf_msg_debug(this->name, 0, "status is %d", state);
1923
        ret = cs_download(frame);
1924
        if (ret == 0) {
1925
            gf_msg_debug(this->name, 0, "Winding for Final Write");
1926
        } else {
1927
            gf_msg(this->name, GF_LOG_ERROR, 0, 0,
1928
                   " download failed, unwinding writev");
1929
            local->op_ret = -1;
1930
            local->op_errno = EREMOTE;
1931
            ret = -1;
1932
        }
1933
    }
1934
out:
1935
    return ret;
1936
}
1937

1938
int32_t
1939
cs_fdctx_to_dict(xlator_t *this, fd_t *fd, dict_t *dict)
1940
{
1941
    return 0;
1942
}
1943

1944
int32_t
1945
cs_inode(xlator_t *this)
1946
{
1947
    return 0;
1948
}
1949

1950
int32_t
1951
cs_inode_to_dict(xlator_t *this, dict_t *dict)
1952
{
1953
    return 0;
1954
}
1955

1956
int32_t
1957
cs_history(xlator_t *this)
1958
{
1959
    return 0;
1960
}
1961

1962
int32_t
1963
cs_fd(xlator_t *this)
1964
{
1965
    return 0;
1966
}
1967

1968
int32_t
1969
cs_fd_to_dict(xlator_t *this, dict_t *dict)
1970
{
1971
    return 0;
1972
}
1973

1974
int32_t
1975
cs_fdctx(xlator_t *this, fd_t *fd)
1976
{
1977
    return 0;
1978
}
1979

1980
int32_t
1981
cs_inodectx(xlator_t *this, inode_t *ino)
1982
{
1983
    return 0;
1984
}
1985

1986
int32_t
1987
cs_inodectx_to_dict(xlator_t *this, inode_t *ino, dict_t *dict)
1988
{
1989
    return 0;
1990
}
1991

1992
int32_t
1993
cs_priv_to_dict(xlator_t *this, dict_t *dict, char *brickname)
1994
{
1995
    return 0;
1996
}
1997

1998
int32_t
1999
cs_priv(xlator_t *this)
2000
{
2001
    return 0;
2002
}
2003

2004
int
2005
cs_notify(xlator_t *this, int event, void *data, ...)
2006
{
2007
    return default_notify(this, event, data);
2008
}
2009

2010
struct xlator_fops cs_fops = {
2011
    .stat = cs_stat,
2012
    .readdirp = cs_readdirp,
2013
    .truncate = cs_truncate,
2014
    .seek = cs_seek,
2015
    .statfs = cs_statfs,
2016
    .fallocate = cs_fallocate,
2017
    .discard = cs_discard,
2018
    .getxattr = cs_getxattr,
2019
    .writev = cs_writev,
2020
    .setxattr = cs_setxattr,
2021
    .fgetxattr = cs_fgetxattr,
2022
    .lookup = cs_lookup,
2023
    .fsetxattr = cs_fsetxattr,
2024
    .readv = cs_readv,
2025
    .ftruncate = cs_ftruncate,
2026
    .rchecksum = cs_rchecksum,
2027
    .unlink = cs_unlink,
2028
    .open = cs_open,
2029
    .fstat = cs_fstat,
2030
    .zerofill = cs_zerofill,
2031
};
2032

2033
struct xlator_cbks cs_cbks = {
2034
    .forget = cs_forget,
2035
};
2036

2037
struct xlator_dumpops cs_dumpops = {
2038
    .fdctx_to_dict = cs_fdctx_to_dict,
2039
    .inode = cs_inode,
2040
    .inode_to_dict = cs_inode_to_dict,
2041
    .history = cs_history,
2042
    .fd = cs_fd,
2043
    .fd_to_dict = cs_fd_to_dict,
2044
    .fdctx = cs_fdctx,
2045
    .inodectx = cs_inodectx,
2046
    .inodectx_to_dict = cs_inodectx_to_dict,
2047
    .priv_to_dict = cs_priv_to_dict,
2048
    .priv = cs_priv,
2049
};
2050

2051
struct volume_options cs_options[] = {
2052
    {.key = {"cloudsync-storetype"},
2053
     .type = GF_OPTION_TYPE_STR,
2054
     .description = "Defines which remote store is enabled"},
2055
    {.key = {"cloudsync-remote-read"},
2056
     .type = GF_OPTION_TYPE_BOOL,
2057
     .description = "Defines a remote read fop when on"},
2058
    {.key = {"cloudsync-store-id"},
2059
     .type = GF_OPTION_TYPE_STR,
2060
     .description = "Defines a volume wide store id"},
2061
    {.key = {"cloudsync-product-id"},
2062
     .type = GF_OPTION_TYPE_STR,
2063
     .description = "Defines a volume wide product id"},
2064
    {.key = {NULL}},
2065
};
2066

2067
xlator_api_t xlator_api = {
2068
    .init = cs_init,
2069
    .fini = cs_fini,
2070
    .notify = cs_notify,
2071
    .reconfigure = cs_reconfigure,
2072
    .mem_acct_init = cs_mem_acct_init,
2073
    .dumpops = &cs_dumpops,
2074
    .fops = &cs_fops,
2075
    .cbks = &cs_cbks,
2076
    .options = cs_options,
2077
    .identifier = "cloudsync",
2078
    .category = GF_TECH_PREVIEW,
2079
};
2080

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.