glusterfs

Форк
0
5339 строк · 140.2 Кб
1
/*
2
   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
3
   This file is part of GlusterFS.
4

5
   This file is licensed to you under your choice of the GNU Lesser
6
   General Public License, version 3 or any later version (LGPLv3 or
7
   later), or the GNU General Public License, version 2 (GPLv2), in all
8
   cases as published by the Free Software Foundation.
9
*/
10

11
#include "quota.h"
12
#include <glusterfs/statedump.h>
13
#include "quota-messages.h"
14
#include <glusterfs/events.h>
15

16
struct volume_options options[];
17

18
static int32_t
19
__quota_init_inode_ctx(inode_t *inode, xlator_t *this,
20
                       quota_inode_ctx_t **context)
21
{
22
    int32_t ret = -1;
23
    quota_inode_ctx_t *ctx = NULL;
24

25
    if (inode == NULL) {
26
        goto out;
27
    }
28

29
    QUOTA_ALLOC_OR_GOTO(ctx, quota_inode_ctx_t, out);
30

31
    LOCK_INIT(&ctx->lock);
32

33
    if (context != NULL) {
34
        *context = ctx;
35
    }
36

37
    INIT_LIST_HEAD(&ctx->parents);
38

39
    ret = __inode_ctx_put(inode, this, (uint64_t)(long)ctx);
40
    if (ret) {
41
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_SET_FAILED,
42
               "cannot set quota context "
43
               "in inode (gfid:%s)",
44
               uuid_utoa(inode->gfid));
45
        GF_FREE(ctx);
46
    }
47
out:
48
    return ret;
49
}
50

51
static int32_t
52
quota_inode_ctx_get(inode_t *inode, xlator_t *this, quota_inode_ctx_t **ctx,
53
                    char create_if_absent)
54
{
55
    int32_t ret = 0;
56
    uint64_t ctx_int;
57

58
    LOCK(&inode->lock);
59
    {
60
        ret = __inode_ctx_get(inode, this, &ctx_int);
61

62
        if ((ret == 0) && (ctx != NULL)) {
63
            *ctx = (quota_inode_ctx_t *)(unsigned long)ctx_int;
64
        } else if (create_if_absent) {
65
            ret = __quota_init_inode_ctx(inode, this, ctx);
66
        }
67
    }
68
    UNLOCK(&inode->lock);
69

70
    return ret;
71
}
72

73
int
74
quota_loc_fill(loc_t *loc, inode_t *inode, inode_t *parent, char *path)
75
{
76
    int ret = -1;
77

78
    if (!loc || (inode == NULL))
79
        return ret;
80

81
    if (inode) {
82
        loc->inode = inode_ref(inode);
83
        gf_uuid_copy(loc->gfid, inode->gfid);
84
    }
85

86
    if (parent) {
87
        loc->parent = inode_ref(parent);
88
    }
89

90
    if (path != NULL) {
91
        loc->path = gf_strdup(path);
92

93
        loc->name = strrchr(loc->path, '/');
94
        if (loc->name) {
95
            loc->name++;
96
        }
97
    }
98

99
    ret = 0;
100

101
    return ret;
102
}
103

104
int
105
quota_inode_loc_fill(inode_t *inode, loc_t *loc)
106
{
107
    char *resolvedpath = NULL;
108
    inode_t *parent = NULL;
109
    int ret = -1;
110
    xlator_t *this = NULL;
111

112
    if ((!inode) || (!loc)) {
113
        return ret;
114
    }
115

116
    this = THIS;
117

118
    if ((inode) && __is_root_gfid(inode->gfid)) {
119
        loc->parent = NULL;
120
        goto ignore_parent;
121
    }
122

123
    parent = inode_parent(inode, 0, NULL);
124
    if (!parent) {
125
        gf_msg_debug(this->name, 0,
126
                     "cannot find parent for "
127
                     "inode (gfid:%s)",
128
                     uuid_utoa(inode->gfid));
129
    }
130

131
ignore_parent:
132
    ret = inode_path(inode, NULL, &resolvedpath);
133
    if (ret < 0) {
134
        gf_msg_debug(this->name, 0,
135
                     "cannot construct path for "
136
                     "inode (gfid:%s)",
137
                     uuid_utoa(inode->gfid));
138
    }
139

140
    ret = quota_loc_fill(loc, inode, parent, resolvedpath);
141
    if (ret < 0) {
142
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
143
               "cannot fill loc");
144
        goto err;
145
    }
146

147
err:
148
    if (parent) {
149
        inode_unref(parent);
150
    }
151

152
    GF_FREE(resolvedpath);
153

154
    return ret;
155
}
156

157
int32_t
158
quota_local_cleanup(quota_local_t *local)
159
{
160
    if (local == NULL) {
161
        goto out;
162
    }
163

164
    loc_wipe(&local->loc);
165
    loc_wipe(&local->newloc);
166
    loc_wipe(&local->oldloc);
167
    loc_wipe(&local->validate_loc);
168

169
    inode_unref(local->inode);
170

171
    if (local->xdata)
172
        dict_unref(local->xdata);
173

174
    if (local->validate_xdata)
175
        dict_unref(local->validate_xdata);
176

177
    if (local->stub)
178
        call_stub_destroy(local->stub);
179

180
    LOCK_DESTROY(&local->lock);
181

182
    mem_put(local);
183
out:
184
    return 0;
185
}
186

187
static quota_local_t *
188
quota_local_new(void)
189
{
190
    quota_local_t *local = NULL;
191
    local = mem_get0(THIS->local_pool);
192
    if (local == NULL)
193
        goto out;
194

195
    LOCK_INIT(&local->lock);
196
    local->space_available = -1;
197

198
out:
199
    return local;
200
}
201

202
quota_dentry_t *
203
__quota_dentry_new(quota_inode_ctx_t *ctx, char *name, uuid_t par)
204
{
205
    quota_dentry_t *dentry = NULL;
206
    GF_UNUSED int32_t ret = 0;
207

208
    QUOTA_ALLOC_OR_GOTO(dentry, quota_dentry_t, err);
209

210
    INIT_LIST_HEAD(&dentry->next);
211

212
    dentry->name = gf_strdup(name);
213
    if (dentry->name == NULL) {
214
        GF_FREE(dentry);
215
        dentry = NULL;
216
        goto err;
217
    }
218

219
    gf_uuid_copy(dentry->par, par);
220

221
    if (ctx != NULL)
222
        list_add_tail(&dentry->next, &ctx->parents);
223

224
err:
225
    return dentry;
226
}
227

228
void
229
__quota_dentry_free(quota_dentry_t *dentry)
230
{
231
    if (dentry == NULL) {
232
        goto out;
233
    }
234

235
    list_del_init(&dentry->next);
236

237
    GF_FREE(dentry->name);
238
    GF_FREE(dentry);
239
out:
240
    return;
241
}
242

243
void
244
__quota_dentry_del(quota_inode_ctx_t *ctx, const char *name, uuid_t par)
245
{
246
    quota_dentry_t *dentry = NULL;
247
    quota_dentry_t *tmp = NULL;
248

249
    list_for_each_entry_safe(dentry, tmp, &ctx->parents, next)
250
    {
251
        if ((strcmp(dentry->name, name) == 0) &&
252
            (gf_uuid_compare(dentry->par, par) == 0)) {
253
            __quota_dentry_free(dentry);
254
            break;
255
        }
256
    }
257
}
258

259
void
260
quota_dentry_del(quota_inode_ctx_t *ctx, const char *name, uuid_t par)
261
{
262
    LOCK(&ctx->lock);
263
    {
264
        __quota_dentry_del(ctx, name, par);
265
    }
266
    UNLOCK(&ctx->lock);
267
}
268

269
static inode_t *
270
__quota_inode_parent(inode_t *inode, uuid_t pargfid, const char *name)
271
{
272
    inode_t *parent = NULL;
273

274
    parent = inode_parent(inode, pargfid, name);
275
    inode_unref(inode);
276
    return parent;
277
}
278

279
static inode_t *
280
quota_inode_parent(inode_t *inode, uuid_t pargfid, const char *name)
281
{
282
    inode_t *parent = NULL;
283

284
    parent = __quota_inode_parent(inode, pargfid, name);
285
    if (!parent)
286
        gf_msg_callingfn(THIS->name, GF_LOG_ERROR, 0, Q_MSG_PARENT_NULL,
287
                         "Failed to find "
288
                         "ancestor for inode (%s)",
289
                         uuid_utoa(inode->gfid));
290

291
    return parent;
292
}
293

294
int32_t
295
quota_inode_depth(inode_t *inode)
296
{
297
    int depth = 0;
298
    inode_t *cur_inode = NULL;
299

300
    cur_inode = inode_ref(inode);
301
    while (cur_inode && !__is_root_gfid(cur_inode->gfid)) {
302
        depth++;
303
        cur_inode = quota_inode_parent(cur_inode, 0, NULL);
304
        if (!cur_inode)
305
            depth = -1;
306
    }
307

308
    if (cur_inode)
309
        inode_unref(cur_inode);
310

311
    return depth;
312
}
313

314
int32_t
315
quota_find_common_ancestor(inode_t *inode1, inode_t *inode2,
316
                           uuid_t *common_ancestor)
317
{
318
    int32_t depth1 = 0;
319
    int32_t depth2 = 0;
320
    int32_t ret = -1;
321
    inode_t *cur_inode1 = NULL;
322
    inode_t *cur_inode2 = NULL;
323

324
    depth1 = quota_inode_depth(inode1);
325
    if (depth1 < 0)
326
        goto out;
327

328
    depth2 = quota_inode_depth(inode2);
329
    if (depth2 < 0)
330
        goto out;
331

332
    cur_inode1 = inode_ref(inode1);
333
    cur_inode2 = inode_ref(inode2);
334

335
    while (cur_inode1 && depth1 > depth2) {
336
        cur_inode1 = quota_inode_parent(cur_inode1, 0, NULL);
337
        depth1--;
338
    }
339

340
    while (cur_inode2 && depth2 > depth1) {
341
        cur_inode2 = quota_inode_parent(cur_inode2, 0, NULL);
342
        depth2--;
343
    }
344

345
    while (depth1 && cur_inode1 && cur_inode2 && cur_inode1 != cur_inode2) {
346
        cur_inode1 = quota_inode_parent(cur_inode1, 0, NULL);
347
        cur_inode2 = quota_inode_parent(cur_inode2, 0, NULL);
348
        depth1--;
349
    }
350

351
    if (cur_inode1 && cur_inode2) {
352
        gf_uuid_copy(*common_ancestor, cur_inode1->gfid);
353
        ret = 0;
354
    }
355
out:
356
    if (cur_inode1)
357
        inode_unref(cur_inode1);
358

359
    if (cur_inode2)
360
        inode_unref(cur_inode2);
361

362
    return ret;
363
}
364

365
void
366
check_ancestory_continue(struct list_head *parents, inode_t *inode,
367
                         int32_t op_ret, int32_t op_errno, void *data)
368
{
369
    call_frame_t *frame = NULL;
370
    quota_local_t *local = NULL;
371
    uint32_t link_count = 0;
372

373
    frame = data;
374
    local = frame->local;
375

376
    if (parents && list_empty(parents)) {
377
        gf_msg(THIS->name, GF_LOG_WARNING, EIO, Q_MSG_ANCESTRY_BUILD_FAILED,
378
               "Couldn't build ancestry for inode (gfid:%s). "
379
               "Without knowing ancestors till root, quota "
380
               "cannot be enforced. "
381
               "Hence, failing fop with EIO",
382
               uuid_utoa(inode->gfid));
383
        op_errno = EIO;
384
        op_ret = -1;
385
    }
386

387
    LOCK(&local->lock);
388
    {
389
        link_count = --local->link_count;
390
        if (op_ret < 0) {
391
            local->op_ret = op_ret;
392
            local->op_errno = op_errno;
393
        }
394
    }
395
    UNLOCK(&local->lock);
396

397
    if (link_count == 0)
398
        local->fop_continue_cbk(frame);
399
}
400

401
void
402
check_ancestory(call_frame_t *frame, inode_t *inode)
403
{
404
    inode_t *cur_inode = NULL;
405
    inode_t *parent = NULL;
406

407
    cur_inode = inode_ref(inode);
408
    while (cur_inode && !__is_root_gfid(cur_inode->gfid)) {
409
        parent = inode_parent(cur_inode, 0, NULL);
410
        if (!parent) {
411
            quota_build_ancestry(cur_inode, check_ancestory_continue, frame);
412
            inode_unref(cur_inode);
413
            return;
414
        }
415
        inode_unref(cur_inode);
416
        cur_inode = parent;
417
    }
418

419
    if (cur_inode) {
420
        inode_unref(cur_inode);
421
        check_ancestory_continue(NULL, NULL, 0, 0, frame);
422
    } else {
423
        check_ancestory_continue(NULL, NULL, -1, ESTALE, frame);
424
    }
425
}
426

427
void
428
check_ancestory_2_cbk(struct list_head *parents, inode_t *inode, int32_t op_ret,
429
                      int32_t op_errno, void *data)
430
{
431
    inode_t *this_inode = NULL;
432
    quota_inode_ctx_t *ctx = NULL;
433

434
    this_inode = data;
435

436
    if (op_ret < 0)
437
        goto out;
438

439
    if (parents == NULL || list_empty(parents)) {
440
        gf_msg(THIS->name, GF_LOG_WARNING, 0, Q_MSG_ENFORCEMENT_FAILED,
441
               "Couldn't build ancestry for inode (gfid:%s). "
442
               "Without knowing ancestors till root, quota "
443
               "cannot be enforced.",
444
               uuid_utoa(this_inode->gfid));
445
        goto out;
446
    }
447

448
    quota_inode_ctx_get(this_inode, THIS, &ctx, 0);
449
    if (ctx)
450
        ctx->ancestry_built = _gf_true;
451

452
out:
453
    inode_unref(this_inode);
454
}
455

456
void
457
check_ancestory_2(xlator_t *this, quota_local_t *local, inode_t *inode)
458
{
459
    inode_t *cur_inode = NULL;
460
    inode_t *parent = NULL;
461
    quota_inode_ctx_t *ctx = NULL;
462
    char *name = NULL;
463
    uuid_t pgfid = {0};
464

465
    name = (char *)local->loc.name;
466
    if (local->loc.parent) {
467
        gf_uuid_copy(pgfid, local->loc.parent->gfid);
468
    }
469

470
    cur_inode = inode_ref(inode);
471
    while (cur_inode && !__is_root_gfid(cur_inode->gfid)) {
472
        quota_inode_ctx_get(cur_inode, this, &ctx, 0);
473
        /* build ancestry is required only on the first lookup,
474
         * so stop crawling when the inode_ctx is set for an inode
475
         */
476
        if (ctx && ctx->ancestry_built)
477
            goto setctx;
478

479
        parent = inode_parent(cur_inode, pgfid, name);
480
        if (!parent) {
481
            quota_build_ancestry(cur_inode, check_ancestory_2_cbk,
482
                                 inode_ref(inode));
483
            goto out;
484
        }
485

486
        if (name != NULL) {
487
            name = NULL;
488
            gf_uuid_clear(pgfid);
489
        }
490

491
        inode_unref(cur_inode);
492
        cur_inode = parent;
493
    }
494

495
setctx:
496
    if (cur_inode && cur_inode != inode) {
497
        quota_inode_ctx_get(inode, this, &ctx, 0);
498
        if (ctx)
499
            ctx->ancestry_built = _gf_true;
500
    }
501
out:
502
    if (cur_inode)
503
        inode_unref(cur_inode);
504
}
505

506
static void
507
quota_link_count_decrement(call_frame_t *frame)
508
{
509
    call_frame_t *tmpframe = NULL;
510
    quota_local_t *local = NULL;
511
    call_stub_t *stub = NULL;
512
    int link_count = -1;
513

514
    local = frame->local;
515
    if (local && local->par_frame) {
516
        local = local->par_frame->local;
517
        tmpframe = frame;
518
    }
519

520
    if (local == NULL)
521
        goto out;
522

523
    LOCK(&local->lock);
524
    {
525
        link_count = --local->link_count;
526
        if (link_count == 0) {
527
            stub = local->stub;
528
            local->stub = NULL;
529
        }
530
    }
531
    UNLOCK(&local->lock);
532

533
    if (stub != NULL) {
534
        call_resume(stub);
535
    }
536

537
out:
538
    if (tmpframe) {
539
        local = tmpframe->local;
540
        tmpframe->local = NULL;
541

542
        STACK_DESTROY(frame->root);
543
        if (local)
544
            quota_local_cleanup(local);
545
    }
546

547
    return;
548
}
549

550
static void
551
quota_handle_validate_error(call_frame_t *frame, int32_t op_ret,
552
                            int32_t op_errno)
553
{
554
    quota_local_t *local;
555

556
    local = frame->local;
557
    if (local && local->par_frame)
558
        local = local->par_frame->local;
559

560
    if (local == NULL)
561
        goto out;
562

563
    if (op_ret < 0) {
564
        LOCK(&local->lock);
565
        {
566
            local->op_ret = op_ret;
567
            local->op_errno = op_errno;
568
        }
569
        UNLOCK(&local->lock);
570
    }
571
    /* we abort checking limits on this path to root */
572
    quota_link_count_decrement(frame);
573
out:
574
    return;
575
}
576

577
int32_t
578
quota_validate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
579
                   int32_t op_ret, int32_t op_errno, inode_t *inode,
580
                   struct iatt *buf, dict_t *xdata, struct iatt *postparent)
581
{
582
    quota_local_t *local = NULL;
583
    int32_t ret = 0;
584
    quota_inode_ctx_t *ctx = NULL;
585
    uint64_t value = 0;
586
    quota_meta_t size = {
587
        0,
588
    };
589

590
    local = frame->local;
591

592
    if (op_ret < 0) {
593
        goto unwind;
594
    }
595

596
    GF_ASSERT(local);
597
    GF_ASSERT(frame);
598
    GF_VALIDATE_OR_GOTO_WITH_ERROR("quota", this, unwind, op_errno, EINVAL);
599
    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, xdata, unwind, op_errno, EINVAL);
600

601
    ret = inode_ctx_get(local->validate_loc.inode, this, &value);
602

603
    ctx = (quota_inode_ctx_t *)(unsigned long)value;
604
    if ((ret == -1) || (ctx == NULL)) {
605
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_INODE_CTX_GET_FAILED,
606
               "quota context is"
607
               " not present in  inode (gfid:%s)",
608
               uuid_utoa(local->validate_loc.inode->gfid));
609
        op_errno = EINVAL;
610
        goto unwind;
611
    }
612

613
    ret = quota_dict_get_meta(xdata, QUOTA_SIZE_KEY, SLEN(QUOTA_SIZE_KEY),
614
                              &size);
615
    if (ret == -1) {
616
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_SIZE_KEY_MISSING,
617
               "quota size key not present "
618
               "in dict");
619
        op_errno = EINVAL;
620
    }
621

622
    local->just_validated = 1; /* so that we don't go into infinite
623
                                * loop of validation and checking
624
                                * limit when timeout is zero.
625
                                */
626
    LOCK(&ctx->lock);
627
    {
628
        ctx->size = size.size;
629
        ctx->validate_time = gf_time();
630
        ctx->file_count = size.file_count;
631
        ctx->dir_count = size.dir_count;
632
    }
633
    UNLOCK(&ctx->lock);
634

635
    quota_check_limit(frame, local->validate_loc.inode, this);
636
    return 0;
637

638
unwind:
639
    quota_handle_validate_error(frame, op_ret, op_errno);
640
    return 0;
641
}
642

643
static inline gf_boolean_t
644
quota_timeout(time_t t, time_t timeout)
645
{
646
    return (gf_time() - t) >= timeout;
647
}
648

649
/* Return: 1 if new entry added
650
 *         0 no entry added
651
 *        -1 on errors
652
 */
653
static int32_t
654
quota_add_parent(struct list_head *list, char *name, uuid_t pgfid)
655
{
656
    quota_dentry_t *entry = NULL;
657
    gf_boolean_t found = _gf_false;
658
    int ret = 0;
659

660
    if (!list_empty(list)) {
661
        list_for_each_entry(entry, list, next)
662
        {
663
            if (gf_uuid_compare(pgfid, entry->par) == 0) {
664
                found = _gf_true;
665
                goto out;
666
            }
667
        }
668
    }
669

670
    entry = __quota_dentry_new(NULL, name, pgfid);
671
    if (entry)
672
        list_add_tail(&entry->next, list);
673
    else
674
        ret = -1;
675

676
out:
677
    if (found)
678
        return 0;
679
    else if (ret == 0)
680
        return 1;
681
    else
682
        return -1;
683
}
684

685
/* This function iterates the parent list in inode
686
 * context and add unique parent to the list
687
 * Returns number of dentry added to the list, or -1 on errors
688
 */
689
static int32_t
690
quota_add_parents_from_ctx(quota_inode_ctx_t *ctx, struct list_head *list)
691
{
692
    int ret = 0;
693
    quota_dentry_t *dentry = NULL;
694
    int32_t count = 0;
695

696
    if (ctx == NULL || list == NULL)
697
        goto out;
698

699
    LOCK(&ctx->lock);
700
    {
701
        list_for_each_entry(dentry, &ctx->parents, next)
702
        {
703
            ret = quota_add_parent(list, dentry->name, dentry->par);
704
            if (ret == 1)
705
                count++;
706
            else if (ret == -1)
707
                break;
708
        }
709
    }
710
    UNLOCK(&ctx->lock);
711

712
out:
713
    return (ret == -1) ? -1 : count;
714
}
715

716
int32_t
717
quota_build_ancestry_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
718
                         int32_t op_ret, int32_t op_errno, gf_dirent_t *entries,
719
                         dict_t *xdata)
720
{
721
    inode_t *parent = NULL;
722
    inode_t *tmp_parent = NULL;
723
    inode_t *linked_inode = NULL;
724
    inode_t *tmp_inode = NULL;
725
    gf_dirent_t *entry = NULL;
726
    loc_t loc = {
727
        0,
728
    };
729
    quota_dentry_t *dentry = NULL;
730
    quota_dentry_t *tmp = NULL;
731
    quota_inode_ctx_t *ctx = NULL;
732
    struct list_head parents;
733
    quota_local_t *local = NULL;
734
    int ret;
735

736
    INIT_LIST_HEAD(&parents);
737

738
    local = frame->local;
739
    frame->local = NULL;
740

741
    if (op_ret < 0)
742
        goto err;
743

744
    if ((op_ret > 0) && (entries != NULL)) {
745
        list_for_each_entry(entry, &entries->list, list)
746
        {
747
            if (__is_root_gfid(entry->inode->gfid)) {
748
                /* The list contains a sub-list for each
749
                 * possible path to the target inode. Each
750
                 * sub-list starts with the root entry of the
751
                 * tree and is followed by the child entries
752
                 * for a particular path to the target entry.
753
                 * The root entry is an implied sub-list
754
                 * delimiter, as it denotes we have started
755
                 * processing a new path. Reset the parent
756
                 * pointer and continue
757
                 */
758

759
                tmp_parent = NULL;
760
            } else {
761
                /* For a non-root entry, link this inode */
762
                linked_inode = inode_link(entry->inode, tmp_parent,
763
                                          entry->d_name, &entry->d_stat);
764
                if (linked_inode) {
765
                    tmp_inode = entry->inode;
766
                    entry->inode = linked_inode;
767
                    inode_unref(tmp_inode);
768
                } else {
769
                    gf_msg(this->name, GF_LOG_WARNING, EINVAL,
770
                           Q_MSG_PARENT_NULL, "inode link failed");
771
                    op_errno = EINVAL;
772
                    goto err;
773
                }
774
            }
775

776
            gf_uuid_copy(loc.gfid, entry->d_stat.ia_gfid);
777

778
            loc.inode = inode_ref(entry->inode);
779
            loc.parent = inode_ref(tmp_parent);
780
            loc.name = entry->d_name;
781

782
            quota_fill_inodectx(this, entry->inode, entry->dict, &loc,
783
                                &entry->d_stat, &op_errno);
784

785
            /* For non-directory, posix_get_ancestry_non_directory
786
             * returns all hard-links that are represented by nodes
787
             * adjacent to each other in the dentry-list.
788
             * (Unlike the directory case where adjacent nodes
789
             * either have a parent/child relationship or belong to
790
             * different paths).
791
             */
792
            if (entry->inode->ia_type == IA_IFDIR)
793
                tmp_parent = entry->inode;
794

795
            loc_wipe(&loc);
796
        }
797
    }
798

799
    parent = inode_parent(local->loc.inode, 0, NULL);
800
    if (parent == NULL) {
801
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_PARENT_NULL,
802
               "parent is NULL");
803
        op_errno = EINVAL;
804
        goto err;
805
    }
806

807
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
808

809
    ret = quota_add_parents_from_ctx(ctx, &parents);
810
    if (ret == -1) {
811
        op_errno = errno;
812
        goto err;
813
    }
814

815
    if (list_empty(&parents)) {
816
        /* we built ancestry for a directory */
817
        list_for_each_entry(entry, &entries->list, list)
818
        {
819
            if (entry->inode == local->loc.inode)
820
                break;
821
        }
822

823
        /* Getting assertion here, need to investigate
824
           comment for now
825
           GF_ASSERT (&entry->list != &entries->list);
826
        */
827

828
        ret = quota_add_parent(&parents, entry->d_name, parent->gfid);
829
        if (ret == -1) {
830
            op_errno = errno;
831
            goto err;
832
        }
833
    }
834

835
    local->ancestry_cbk(&parents, local->loc.inode, 0, 0, local->ancestry_data);
836
    goto cleanup;
837

838
err:
839
    local->ancestry_cbk(NULL, NULL, -1, op_errno, local->ancestry_data);
840

841
cleanup:
842
    STACK_DESTROY(frame->root);
843
    quota_local_cleanup(local);
844

845
    if (parent != NULL) {
846
        inode_unref(parent);
847
        parent = NULL;
848
    }
849

850
    if (!list_empty(&parents)) {
851
        list_for_each_entry_safe(dentry, tmp, &parents, next)
852
        {
853
            __quota_dentry_free(dentry);
854
        }
855
    }
856

857
    return 0;
858
}
859

860
int
861
quota_build_ancestry(inode_t *inode, quota_ancestry_built_t ancestry_cbk,
862
                     void *data)
863
{
864
    fd_t *fd = NULL;
865
    quota_local_t *local = NULL;
866
    call_frame_t *new_frame = NULL;
867
    int op_errno = ENOMEM;
868
    int op_ret = -1;
869
    xlator_t *this = NULL;
870
    dict_t *xdata_req = NULL;
871

872
    this = THIS;
873

874
    xdata_req = dict_new();
875
    if (xdata_req == NULL)
876
        goto err;
877

878
    fd = fd_anonymous(inode);
879
    if (fd == NULL)
880
        goto err;
881

882
    new_frame = create_frame(this, this->ctx->pool);
883
    if (new_frame == NULL)
884
        goto err;
885

886
    local = quota_local_new();
887
    if (local == NULL)
888
        goto err;
889

890
    new_frame->root->uid = new_frame->root->gid = 0;
891
    new_frame->local = local;
892
    local->ancestry_cbk = ancestry_cbk;
893
    local->ancestry_data = data;
894
    local->loc.inode = inode_ref(inode);
895

896
    op_ret = dict_set_int8(xdata_req, QUOTA_LIMIT_KEY, 1);
897
    if (op_ret < 0) {
898
        op_errno = -op_ret;
899
        goto err;
900
    }
901

902
    op_ret = dict_set_int8(xdata_req, QUOTA_LIMIT_OBJECTS_KEY, 1);
903
    if (op_ret < 0) {
904
        op_errno = -op_ret;
905
        goto err;
906
    }
907

908
    op_ret = dict_set_int8(xdata_req, GET_ANCESTRY_DENTRY_KEY, 1);
909
    if (op_ret < 0) {
910
        op_errno = -op_ret;
911
        goto err;
912
    }
913

914
    /* This would ask posix layer to construct dentry chain till root
915
     * We don't need to do a opendir, we can use the anonymous fd
916
     * here for  the readidrp.
917
     * avoiding opendir also reduces the window size where another FOP
918
     * can be executed before completion of build ancestry
919
     */
920
    STACK_WIND(new_frame, quota_build_ancestry_cbk, FIRST_CHILD(this),
921
               FIRST_CHILD(this)->fops->readdirp, fd, 0, 0, xdata_req);
922

923
    op_ret = 0;
924

925
err:
926
    if (fd)
927
        fd_unref(fd);
928

929
    if (xdata_req)
930
        dict_unref(xdata_req);
931

932
    if (op_ret < 0) {
933
        ancestry_cbk(NULL, NULL, -1, op_errno, data);
934

935
        if (new_frame) {
936
            local = new_frame->local;
937
            new_frame->local = NULL;
938
            STACK_DESTROY(new_frame->root);
939
        }
940

941
        if (local)
942
            quota_local_cleanup(local);
943
    }
944

945
    return 0;
946
}
947

948
int
949
quota_validate(call_frame_t *frame, inode_t *inode, xlator_t *this,
950
               fop_lookup_cbk_t cbk_fn)
951
{
952
    quota_local_t *local = NULL;
953
    int ret = 0;
954
    dict_t *xdata = NULL;
955
    quota_priv_t *priv = NULL;
956

957
    local = frame->local;
958
    priv = this->private;
959

960
    LOCK(&local->lock);
961
    {
962
        loc_wipe(&local->validate_loc);
963

964
        ret = quota_inode_loc_fill(inode, &local->validate_loc);
965
        if (ret < 0) {
966
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENFORCEMENT_FAILED,
967
                   "cannot fill loc for inode (gfid:%s), hence "
968
                   "aborting quota-checks and continuing with fop",
969
                   uuid_utoa(inode->gfid));
970
        }
971
    }
972
    UNLOCK(&local->lock);
973

974
    if (ret < 0) {
975
        ret = -ENOMEM;
976
        goto err;
977
    }
978

979
    xdata = dict_new();
980
    if (xdata == NULL) {
981
        ret = -ENOMEM;
982
        goto err;
983
    }
984

985
    ret = dict_set_int8(xdata, QUOTA_SIZE_KEY, 1);
986
    if (ret < 0) {
987
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
988
               "dict set failed");
989
        ret = -ENOMEM;
990
        goto err;
991
    }
992

993
    ret = dict_set_str(xdata, "volume-uuid", priv->volume_uuid);
994
    if (ret < 0) {
995
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
996
               "dict set failed");
997
        ret = -ENOMEM;
998
        goto err;
999
    }
1000

1001
    ret = quota_enforcer_lookup(frame, this, xdata, cbk_fn);
1002
    if (ret < 0) {
1003
        ret = -ENOTCONN;
1004
        goto err;
1005
    }
1006

1007
    ret = 0;
1008
err:
1009
    if (xdata)
1010
        dict_unref(xdata);
1011

1012
    return ret;
1013
}
1014

1015
void
1016
quota_check_limit_continuation(struct list_head *parents, inode_t *inode,
1017
                               int32_t op_ret, int32_t op_errno, void *data)
1018
{
1019
    call_frame_t *frame = NULL;
1020
    xlator_t *this = NULL;
1021
    quota_local_t *local = NULL;
1022
    quota_local_t *par_local = NULL;
1023
    quota_dentry_t *entry = NULL;
1024
    inode_t *parent = NULL;
1025
    int parent_count = 0;
1026

1027
    frame = data;
1028
    local = frame->local;
1029
    this = THIS;
1030

1031
    if (local->par_frame)
1032
        par_local = local->par_frame->local;
1033
    else
1034
        par_local = local;
1035

1036
    if ((op_ret < 0) || list_empty(parents)) {
1037
        if (op_ret >= 0) {
1038
            gf_msg(this->name, GF_LOG_WARNING, EIO, Q_MSG_ANCESTRY_BUILD_FAILED,
1039
                   "Couldn't build ancestry for inode (gfid:%s). "
1040
                   "Without knowing ancestors till root, quota"
1041
                   "cannot be enforced. "
1042
                   "Hence, failing fop with EIO",
1043
                   uuid_utoa(inode->gfid));
1044
            op_errno = EIO;
1045
        }
1046

1047
        quota_handle_validate_error(frame, -1, op_errno);
1048
        goto out;
1049
    }
1050

1051
    list_for_each_entry(entry, parents, next)
1052
    {
1053
        parent_count++;
1054
    }
1055

1056
    LOCK(&par_local->lock);
1057
    {
1058
        par_local->link_count += (parent_count - 1);
1059
    }
1060
    UNLOCK(&par_local->lock);
1061

1062
    if (local->par_frame) {
1063
        list_for_each_entry(entry, parents, next)
1064
        {
1065
            parent = inode_find(inode->table, entry->par);
1066
            quota_check_limit(frame, parent, this);
1067
            inode_unref(parent);
1068
        }
1069
    } else {
1070
        list_for_each_entry(entry, parents, next)
1071
        {
1072
            parent = do_quota_check_limit(frame, inode, this, entry, _gf_true);
1073
            if (parent)
1074
                inode_unref(parent);
1075
            else
1076
                quota_link_count_decrement(frame);
1077
        }
1078
    }
1079

1080
out:
1081
    return;
1082
}
1083

1084
int32_t
1085
quota_check_object_limit(call_frame_t *frame, quota_inode_ctx_t *ctx,
1086
                         quota_priv_t *priv, inode_t *_inode, xlator_t *this,
1087
                         int32_t *op_errno, int just_validated,
1088
                         quota_local_t *local, gf_boolean_t *skip_check)
1089
{
1090
    int32_t ret = -1;
1091
    uint64_t timeout = 0;
1092
    char need_validate = 0;
1093
    gf_boolean_t hard_limit_exceeded = 0;
1094
    int64_t object_aggr_count = 0;
1095

1096
    GF_ASSERT(frame);
1097
    GF_ASSERT(priv);
1098
    GF_ASSERT(_inode);
1099
    GF_ASSERT(this);
1100
    GF_ASSERT(local);
1101

1102
    if (ctx != NULL && (ctx->object_hard_lim > 0 || ctx->object_soft_lim)) {
1103
        LOCK(&ctx->lock);
1104
        {
1105
            timeout = priv->soft_timeout;
1106

1107
            object_aggr_count = ctx->file_count + ctx->dir_count + 1;
1108
            if (((ctx->object_soft_lim >= 0) &&
1109
                 (object_aggr_count) > ctx->object_soft_lim)) {
1110
                timeout = priv->hard_timeout;
1111
            }
1112

1113
            if (!just_validated && quota_timeout(ctx->validate_time, timeout)) {
1114
                need_validate = 1;
1115
            } else if ((object_aggr_count) > ctx->object_hard_lim) {
1116
                hard_limit_exceeded = 1;
1117
            }
1118
        }
1119
        UNLOCK(&ctx->lock);
1120

1121
        if (need_validate && *skip_check != _gf_true) {
1122
            *skip_check = _gf_true;
1123
            ret = quota_validate(frame, _inode, this, quota_validate_cbk);
1124
            if (ret < 0) {
1125
                *op_errno = -ret;
1126
                *skip_check = _gf_false;
1127
            }
1128
            goto out;
1129
        }
1130

1131
        if (hard_limit_exceeded) {
1132
            local->op_ret = -1;
1133
            local->op_errno = EDQUOT;
1134
            *op_errno = EDQUOT;
1135
            goto out;
1136
        }
1137

1138
        /*We log usage only if quota limit is configured on
1139
           that inode
1140
        */
1141
        quota_log_usage(this, ctx, _inode, 0);
1142
    }
1143

1144
    ret = 0;
1145

1146
out:
1147
    return ret;
1148
}
1149

1150
int32_t
1151
quota_check_size_limit(call_frame_t *frame, quota_inode_ctx_t *ctx,
1152
                       quota_priv_t *priv, inode_t *_inode, xlator_t *this,
1153
                       int32_t *op_errno, int just_validated, int64_t delta,
1154
                       quota_local_t *local, gf_boolean_t *skip_check)
1155
{
1156
    int32_t ret = -1;
1157
    uint64_t timeout = 0;
1158
    char need_validate = 0;
1159
    gf_boolean_t hard_limit_exceeded = 0;
1160
    int64_t space_available = 0;
1161
    int64_t wouldbe_size = 0;
1162

1163
    GF_ASSERT(frame);
1164
    GF_ASSERT(priv);
1165
    GF_ASSERT(_inode);
1166
    GF_ASSERT(this);
1167
    GF_ASSERT(local);
1168

1169
    if (ctx != NULL && (ctx->hard_lim > 0 || ctx->soft_lim > 0)) {
1170
        wouldbe_size = ctx->size + delta;
1171

1172
        LOCK(&ctx->lock);
1173
        {
1174
            timeout = priv->soft_timeout;
1175

1176
            if ((ctx->soft_lim >= 0) && (wouldbe_size > ctx->soft_lim)) {
1177
                timeout = priv->hard_timeout;
1178
            }
1179

1180
            if (!just_validated && quota_timeout(ctx->validate_time, timeout)) {
1181
                need_validate = 1;
1182
            } else if (wouldbe_size >= ctx->hard_lim) {
1183
                hard_limit_exceeded = 1;
1184
            }
1185
        }
1186
        UNLOCK(&ctx->lock);
1187

1188
        if (need_validate && *skip_check != _gf_true) {
1189
            *skip_check = _gf_true;
1190
            ret = quota_validate(frame, _inode, this, quota_validate_cbk);
1191
            if (ret < 0) {
1192
                *op_errno = -ret;
1193
                *skip_check = _gf_false;
1194
            }
1195
            goto out;
1196
        }
1197

1198
        if (hard_limit_exceeded) {
1199
            local->op_ret = -1;
1200
            local->op_errno = EDQUOT;
1201

1202
            space_available = ctx->hard_lim - ctx->size;
1203

1204
            if (space_available < 0)
1205
                space_available = 0;
1206

1207
            if ((local->space_available < 0) ||
1208
                (local->space_available > space_available)) {
1209
                local->space_available = space_available;
1210
            }
1211

1212
            if (space_available == 0) {
1213
                *op_errno = EDQUOT;
1214
                goto out;
1215
            }
1216
        }
1217

1218
        /* We log usage only if quota limit is configured on
1219
           that inode. */
1220
        quota_log_usage(this, ctx, _inode, delta);
1221
    }
1222

1223
    ret = 0;
1224
out:
1225
    return ret;
1226
}
1227

1228
int32_t
1229
quota_check_limit(call_frame_t *frame, inode_t *inode, xlator_t *this)
1230
{
1231
    int32_t ret = -1, op_errno = EINVAL;
1232
    inode_t *_inode = NULL, *parent = NULL;
1233
    quota_inode_ctx_t *ctx = NULL;
1234
    quota_priv_t *priv = NULL;
1235
    quota_local_t *local = NULL;
1236
    quota_local_t *par_local = NULL;
1237
    char just_validated = 0;
1238
    int64_t delta = 0;
1239
    int8_t object_delta = 0;
1240
    uint64_t value = 0;
1241
    gf_boolean_t skip_check = _gf_false;
1242

1243
    GF_VALIDATE_OR_GOTO("quota", this, err);
1244
    GF_VALIDATE_OR_GOTO(this->name, frame, err);
1245
    GF_VALIDATE_OR_GOTO(this->name, inode, err);
1246

1247
    local = frame->local;
1248
    GF_VALIDATE_OR_GOTO(this->name, local, err);
1249

1250
    if (local->par_frame) {
1251
        par_local = local->par_frame->local;
1252
        GF_VALIDATE_OR_GOTO(this->name, par_local, err);
1253
    } else {
1254
        par_local = local;
1255
    }
1256

1257
    delta = par_local->delta;
1258
    object_delta = par_local->object_delta;
1259

1260
    GF_VALIDATE_OR_GOTO(this->name, par_local->stub, err);
1261
    /* Allow all the trusted clients
1262
     * Don't block the gluster internal processes like rebalance, gsyncd,
1263
     * self heal etc from the disk quotas.
1264
     *
1265
     * Method: Allow all the clients with PID negative. This is by the
1266
     * assumption that any kernel assigned pid doesn't have the negative
1267
     * number.
1268
     */
1269
    if (0 > frame->root->pid) {
1270
        ret = 0;
1271
        quota_link_count_decrement(frame);
1272
        goto done;
1273
    }
1274

1275
    priv = this->private;
1276

1277
    inode_ctx_get(inode, this, &value);
1278
    ctx = (quota_inode_ctx_t *)(unsigned long)value;
1279

1280
    _inode = inode_ref(inode);
1281

1282
    LOCK(&local->lock);
1283
    {
1284
        just_validated = local->just_validated;
1285
        local->just_validated = 0;
1286
    }
1287
    UNLOCK(&local->lock);
1288

1289
    do {
1290
        /* In a rename operation, enforce should be stopped at common
1291
           ancestor */
1292
        if (!gf_uuid_is_null(par_local->common_ancestor) &&
1293
            !gf_uuid_compare(_inode->gfid, par_local->common_ancestor)) {
1294
            quota_link_count_decrement(frame);
1295
            break;
1296
        }
1297

1298
        if (object_delta <= 0)
1299
            goto skip_check_object_limit;
1300

1301
        ret = quota_check_object_limit(frame, ctx, priv, _inode, this,
1302
                                       &op_errno, just_validated, par_local,
1303
                                       &skip_check);
1304
        if (skip_check == _gf_true)
1305
            goto done;
1306

1307
        if (ret) {
1308
            if (op_errno != EDQUOT)
1309
                gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_ENFORCEMENT_FAILED,
1310
                       "Failed to "
1311
                       "check quota object limit");
1312
            goto err;
1313
        }
1314

1315
    skip_check_object_limit:
1316
        ret = quota_check_size_limit(frame, ctx, priv, _inode, this, &op_errno,
1317
                                     just_validated, delta, par_local,
1318
                                     &skip_check);
1319
        if (skip_check == _gf_true)
1320
            goto done;
1321

1322
        if (ret) {
1323
            if (op_errno != EDQUOT)
1324
                gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_ENFORCEMENT_FAILED,
1325
                       "Failed to "
1326
                       "check quota size limit");
1327
            goto err;
1328
        }
1329

1330
        if (__is_root_gfid(_inode->gfid)) {
1331
            quota_link_count_decrement(frame);
1332
            break;
1333
        }
1334

1335
        parent = inode_parent(_inode, 0, NULL);
1336
        if (parent == NULL) {
1337
            ret = quota_build_ancestry(_inode, quota_check_limit_continuation,
1338
                                       frame);
1339
            if (ret < 0) {
1340
                op_errno = -ret;
1341
                goto err;
1342
            }
1343

1344
            break;
1345
        }
1346

1347
        inode_unref(_inode);
1348
        _inode = parent;
1349
        just_validated = 0;
1350

1351
        value = 0;
1352
        inode_ctx_get(_inode, this, &value);
1353
        ctx = (quota_inode_ctx_t *)(unsigned long)value;
1354
    } while (1);
1355

1356
done:
1357
    if (_inode != NULL) {
1358
        inode_unref(_inode);
1359
        _inode = NULL;
1360
    }
1361
    return 0;
1362

1363
err:
1364
    quota_handle_validate_error(frame, -1, op_errno);
1365

1366
    inode_unref(_inode);
1367
    return 0;
1368
}
1369

1370
inode_t *
1371
do_quota_check_limit(call_frame_t *frame, inode_t *inode, xlator_t *this,
1372
                     quota_dentry_t *dentry, gf_boolean_t force)
1373
{
1374
    int32_t ret = -1;
1375
    inode_t *parent = NULL;
1376
    call_frame_t *new_frame = NULL;
1377
    quota_local_t *new_local = NULL;
1378

1379
    parent = inode_parent(inode, dentry->par, dentry->name);
1380
    if (parent == NULL) {
1381
        if (force)
1382
            parent = inode_find(inode->table, dentry->par);
1383
        else
1384
            goto out;
1385
    }
1386
    if (parent == NULL)
1387
        goto out;
1388

1389
    new_frame = copy_frame(frame);
1390
    if (new_frame == NULL)
1391
        goto out;
1392

1393
    new_local = quota_local_new();
1394
    if (new_local == NULL)
1395
        goto out;
1396

1397
    new_frame->local = new_local;
1398
    new_local->par_frame = frame;
1399

1400
    quota_check_limit(new_frame, parent, this);
1401

1402
    ret = 0;
1403
out:
1404
    if (ret < 0) {
1405
        if (parent) {
1406
            /* Caller should decrement link_count, in case parent is
1407
             * NULL
1408
             */
1409
            quota_handle_validate_error(frame, -1, ENOMEM);
1410
        }
1411

1412
        if (new_frame) {
1413
            new_frame->local = NULL;
1414
            STACK_DESTROY(new_frame->root);
1415
        }
1416
    }
1417

1418
    return parent;
1419
}
1420

1421
static int
1422
quota_get_limits(xlator_t *this, dict_t *dict, int64_t *hard_lim,
1423
                 int64_t *soft_lim, int64_t *object_hard_limit,
1424
                 int64_t *object_soft_limit)
1425
{
1426
    quota_limits_t *limit = NULL;
1427
    quota_limits_t *object_limit = NULL;
1428
    quota_priv_t *priv = NULL;
1429
    int64_t soft_lim_percent = 0;
1430
    int64_t *ptr = NULL;
1431
    int ret = 0;
1432

1433
    if ((this == NULL) || (dict == NULL) || (hard_lim == NULL) ||
1434
        (soft_lim == NULL))
1435
        goto out;
1436

1437
    priv = this->private;
1438

1439
    ret = dict_get_bin(dict, QUOTA_LIMIT_KEY, (void **)&ptr);
1440
    limit = (quota_limits_t *)ptr;
1441

1442
    if (limit) {
1443
        *hard_lim = be64toh(limit->hl);
1444
        soft_lim_percent = be64toh(limit->sl);
1445
    }
1446

1447
    if (soft_lim_percent < 0) {
1448
        soft_lim_percent = priv->default_soft_lim;
1449
    }
1450

1451
    if ((*hard_lim > 0) && (soft_lim_percent > 0)) {
1452
        *soft_lim = (soft_lim_percent * (*hard_lim)) / 100;
1453
    }
1454

1455
    ret = dict_get_bin(dict, QUOTA_LIMIT_OBJECTS_KEY, (void **)&ptr);
1456
    if (ret)
1457
        return 0;
1458
    object_limit = (quota_limits_t *)ptr;
1459

1460
    if (object_limit) {
1461
        *object_hard_limit = be64toh(object_limit->hl);
1462
        soft_lim_percent = be64toh(object_limit->sl);
1463
    }
1464

1465
    if (soft_lim_percent < 0) {
1466
        soft_lim_percent = priv->default_soft_lim;
1467
    }
1468

1469
    if ((*object_hard_limit > 0) && (soft_lim_percent > 0)) {
1470
        *object_soft_limit = (soft_lim_percent * (*object_hard_limit)) / 100;
1471
    }
1472

1473
out:
1474
    return 0;
1475
}
1476

1477
int
1478
quota_fill_inodectx(xlator_t *this, inode_t *inode, dict_t *dict, loc_t *loc,
1479
                    struct iatt *buf, int32_t *op_errno)
1480
{
1481
    int32_t ret = -1;
1482
    char found = 0;
1483
    quota_inode_ctx_t *ctx = NULL;
1484
    quota_dentry_t *dentry = NULL;
1485
    uint64_t value = 0;
1486
    int64_t hard_lim = 0;
1487
    int64_t soft_lim = 0;
1488
    int64_t object_hard_limit = 0;
1489
    int64_t object_soft_limit = 0;
1490

1491
    quota_get_limits(this, dict, &hard_lim, &soft_lim, &object_hard_limit,
1492
                     &object_soft_limit);
1493

1494
    inode_ctx_get(inode, this, &value);
1495
    ctx = (quota_inode_ctx_t *)(unsigned long)value;
1496

1497
    if ((((ctx == NULL) || (ctx->hard_lim == hard_lim)) && (hard_lim < 0) &&
1498
         !QUOTA_REG_OR_LNK_FILE(buf->ia_type))) {
1499
        ret = 0;
1500
        goto out;
1501
    }
1502

1503
    ret = quota_inode_ctx_get(inode, this, &ctx, 1);
1504
    if ((ret == -1) || (ctx == NULL)) {
1505
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_INODE_CTX_GET_FAILED,
1506
               "cannot create quota "
1507
               "context in inode(gfid:%s)",
1508
               uuid_utoa(inode->gfid));
1509
        ret = -1;
1510
        *op_errno = ENOMEM;
1511
        goto out;
1512
    }
1513

1514
    LOCK(&ctx->lock);
1515
    {
1516
        ctx->hard_lim = hard_lim;
1517
        ctx->soft_lim = soft_lim;
1518
        ctx->object_hard_lim = object_hard_limit;
1519
        ctx->object_soft_lim = object_soft_limit;
1520

1521
        ctx->buf = *buf;
1522

1523
        if (!QUOTA_REG_OR_LNK_FILE(buf->ia_type)) {
1524
            goto unlock;
1525
        }
1526

1527
        /* do nothing if it is a nameless lookup */
1528
        if (loc->name == NULL || !loc->parent)
1529
            goto unlock;
1530

1531
        list_for_each_entry(dentry, &ctx->parents, next)
1532
        {
1533
            if ((strcmp(dentry->name, loc->name) == 0) &&
1534
                (gf_uuid_compare(loc->parent->gfid, dentry->par) == 0)) {
1535
                found = 1;
1536
                break;
1537
            }
1538
        }
1539

1540
        if (!found) {
1541
            dentry = __quota_dentry_new(ctx, (char *)loc->name,
1542
                                        loc->parent->gfid);
1543
            if (dentry == NULL) {
1544
                /*
1545
                  gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
1546
                          Q_MSG_ENOMEM,
1547
                          "cannot create a new dentry (par:%"
1548
-                                          PRId64", name:%s) for inode(ino:%"
1549
-                                          PRId64", gfid:%s)",
1550
-                                          uuid_utoa (local->loc.inode->gfid));
1551
                */
1552
                ret = -1;
1553
                *op_errno = ENOMEM;
1554
                goto unlock;
1555
            }
1556
        }
1557
    }
1558
unlock:
1559
    UNLOCK(&ctx->lock);
1560

1561
out:
1562
    return ret;
1563
}
1564

1565
/*
1566
 * return _gf_true if enforcement is needed and _gf_false otherwise
1567
 */
1568
gf_boolean_t
1569
should_quota_enforce(xlator_t *this, dict_t *dict, glusterfs_fop_t fop)
1570
{
1571
    int ret = 0;
1572

1573
    ret = dict_check_flag(dict, GF_INTERNAL_CTX_KEY, GF_DHT_HEAL_DIR);
1574

1575
    if (fop == GF_FOP_MKDIR && ret == DICT_FLAG_SET) {
1576
        return _gf_false;
1577
    } else if (ret == -ENOENT) {
1578
        gf_msg(this->name, GF_LOG_DEBUG, EINVAL, Q_MSG_INTERNAL_FOP_KEY_MISSING,
1579
               "No internal fop context present");
1580
        goto out;
1581
    }
1582
out:
1583
    return _gf_true;
1584
}
1585

1586
int32_t
1587
quota_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
1588
                 int32_t op_ret, int32_t op_errno, inode_t *inode,
1589
                 struct iatt *buf, dict_t *dict, struct iatt *postparent)
1590
{
1591
    quota_local_t *local = NULL;
1592
    inode_t *this_inode = NULL;
1593

1594
    local = frame->local;
1595
    frame->local = NULL;
1596

1597
    if (op_ret >= 0 && inode) {
1598
        this_inode = inode_ref(inode);
1599

1600
        op_ret = quota_fill_inodectx(this, inode, dict, &local->loc, buf,
1601
                                     &op_errno);
1602
        if (op_ret < 0)
1603
            op_errno = ENOMEM;
1604
    }
1605

1606
    QUOTA_STACK_UNWIND(lookup, frame, op_ret, op_errno, inode, buf, dict,
1607
                       postparent);
1608

1609
    if (op_ret < 0 || this_inode == NULL || gf_uuid_is_null(this_inode->gfid))
1610
        goto out;
1611

1612
    check_ancestory_2(this, local, this_inode);
1613

1614
out:
1615
    if (this_inode)
1616
        inode_unref(this_inode);
1617

1618
    quota_local_cleanup(local);
1619

1620
    return 0;
1621
}
1622

1623
int32_t
1624
quota_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xattr_req)
1625
{
1626
    quota_priv_t *priv = NULL;
1627
    int32_t ret = -1;
1628
    quota_local_t *local = NULL;
1629

1630
    priv = this->private;
1631

1632
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
1633

1634
    xattr_req = xattr_req ? dict_ref(xattr_req) : dict_new();
1635
    if (!xattr_req)
1636
        goto err;
1637

1638
    local = quota_local_new();
1639
    if (local == NULL) {
1640
        goto err;
1641
    }
1642

1643
    frame->local = local;
1644
    loc_copy(&local->loc, loc);
1645

1646
    ret = dict_set_int8(xattr_req, QUOTA_LIMIT_KEY, 1);
1647
    if (ret < 0) {
1648
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
1649
               "dict set of key for "
1650
               "hard-limit failed");
1651
        goto err;
1652
    }
1653

1654
    ret = dict_set_int8(xattr_req, QUOTA_LIMIT_OBJECTS_KEY, 1);
1655
    if (ret < 0) {
1656
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
1657
               "dict set of key for quota object limit failed");
1658
        goto err;
1659
    }
1660

1661
    STACK_WIND(frame, quota_lookup_cbk, FIRST_CHILD(this),
1662
               FIRST_CHILD(this)->fops->lookup, loc, xattr_req);
1663

1664
    ret = 0;
1665

1666
err:
1667
    if (xattr_req)
1668
        dict_unref(xattr_req);
1669

1670
    if (ret < 0) {
1671
        QUOTA_STACK_UNWIND(lookup, frame, -1, ENOMEM, NULL, NULL, NULL, NULL);
1672
    }
1673

1674
    return 0;
1675

1676
off:
1677
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->lookup,
1678
                    loc, xattr_req);
1679
    return 0;
1680
}
1681

1682
int32_t
1683
quota_writev_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
1684
                 int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
1685
                 struct iatt *postbuf, dict_t *xdata)
1686
{
1687
    int32_t ret = 0;
1688
    uint64_t ctx_int = 0;
1689
    quota_inode_ctx_t *ctx = NULL;
1690
    quota_local_t *local = NULL;
1691

1692
    local = frame->local;
1693

1694
    if ((op_ret < 0) || (local == NULL) || (postbuf == NULL)) {
1695
        goto out;
1696
    }
1697

1698
    ret = inode_ctx_get(local->loc.inode, this, &ctx_int);
1699
    if (ret) {
1700
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
1701
               "%s: failed to get the "
1702
               "context",
1703
               local->loc.path);
1704
        goto out;
1705
    }
1706

1707
    ctx = (quota_inode_ctx_t *)(unsigned long)ctx_int;
1708

1709
    if (ctx == NULL) {
1710
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
1711
               "quota context not set in %s (gfid:%s)", local->loc.path,
1712
               uuid_utoa(local->loc.inode->gfid));
1713
        goto out;
1714
    }
1715

1716
    LOCK(&ctx->lock);
1717
    {
1718
        ctx->buf = *postbuf;
1719
    }
1720
    UNLOCK(&ctx->lock);
1721

1722
out:
1723
    QUOTA_STACK_UNWIND(writev, frame, op_ret, op_errno, prebuf, postbuf, xdata);
1724

1725
    return 0;
1726
}
1727

1728
static int gf_quota_enforcer_log;
1729

1730
int32_t
1731
quota_writev_helper(call_frame_t *frame, xlator_t *this, fd_t *fd,
1732
                    struct iovec *vector, int32_t count, off_t off,
1733
                    uint32_t flags, struct iobref *iobref, dict_t *xdata)
1734
{
1735
    quota_local_t *local = NULL;
1736
    int32_t op_errno = EINVAL;
1737
    struct iovec *new_vector = NULL;
1738
    int32_t new_count = 0;
1739

1740
    local = frame->local;
1741

1742
    GF_VALIDATE_OR_GOTO("quota", local, unwind);
1743

1744
    if (local->op_ret == -1) {
1745
        op_errno = local->op_errno;
1746

1747
        if ((op_errno == EDQUOT) && (local->space_available > 0)) {
1748
            new_count = iov_subset(vector, count, 0, local->space_available,
1749
                                   &new_vector, 0);
1750
            if (new_count < 0) {
1751
                local->op_ret = -1;
1752
                local->op_errno = ENOMEM;
1753
                goto unwind;
1754
            }
1755

1756
            vector = new_vector;
1757
            count = new_count;
1758
        } else if (op_errno == ENOENT || op_errno == ESTALE) {
1759
            /* We may get ENOENT/ESTALE in case of below scenario
1760
             *     fd = open file.txt
1761
             *     unlink file.txt
1762
             *     write on fd
1763
             * Here build_ancestry can fail as the file is removed.
1764
             * For now ignore ENOENT/ESTALE with writes on active fd
1765
             * We need to re-visit this code once we understand
1766
             * how other file-system behave in this scenario
1767
             */
1768
            gf_msg_debug(this->name, 0,
1769
                         "quota enforcer failed "
1770
                         "with ENOENT/ESTALE on %s, cannot check "
1771
                         "quota limits and allowing writes",
1772
                         uuid_utoa(fd->inode->gfid));
1773
        } else if ((op_errno == EINVAL) &&
1774
                   !inode_parent(local->loc.inode, 0, NULL)) {
1775
            /* We may get INVAL with parent == NULL,
1776
             * in case of below scenario
1777
             *     1. enable quota
1778
             *     2. glusterfsd stop/start
1779
             *     3. nameless lookup
1780
             *     4. write on fd
1781
             * Here build_ancestry can fail as the file's pgfid
1782
             * is't exist.
1783
             * For now ignore EINVAL with writes on active fd
1784
             * untils the pgfid is created at name lookup
1785
             */
1786
            GF_LOG_OCCASIONALLY(gf_quota_enforcer_log, this->name,
1787
                                GF_LOG_CRITICAL,
1788
                                "Quota cannot be enforced as "
1789
                                "parent is not available and writes are being "
1790
                                "allowed without checking whether they are "
1791
                                "within quota limits. This can happen if Quota "
1792
                                "crawl is not complete. If crawl has been "
1793
                                "completed, please file a bug.");
1794
        } else {
1795
            goto unwind;
1796
        }
1797
    }
1798

1799
    STACK_WIND(frame, quota_writev_cbk, FIRST_CHILD(this),
1800
               FIRST_CHILD(this)->fops->writev, fd, vector, count, off, flags,
1801
               iobref, xdata);
1802

1803
    if (new_vector != NULL)
1804
        GF_FREE(new_vector);
1805

1806
    return 0;
1807

1808
unwind:
1809
    QUOTA_STACK_UNWIND(writev, frame, -1, op_errno, NULL, NULL, NULL);
1810
    return 0;
1811
}
1812

1813
int32_t
1814
quota_writev(call_frame_t *frame, xlator_t *this, fd_t *fd,
1815
             struct iovec *vector, int32_t count, off_t off, uint32_t flags,
1816
             struct iobref *iobref, dict_t *xdata)
1817
{
1818
    quota_priv_t *priv = NULL;
1819
    int32_t op_errno = EINVAL;
1820
    int32_t parents = 0;
1821
    int32_t fail_count = 0;
1822
    uint64_t size = 0;
1823
    quota_local_t *local = NULL;
1824
    quota_inode_ctx_t *ctx = NULL;
1825
    quota_dentry_t *dentry = NULL, *tmp = NULL;
1826
    call_stub_t *stub = NULL;
1827
    struct list_head head;
1828
    inode_t *par_inode = NULL;
1829

1830
    priv = this->private;
1831

1832
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
1833

1834
    INIT_LIST_HEAD(&head);
1835

1836
    GF_ASSERT(frame);
1837
    GF_VALIDATE_OR_GOTO("quota", this, unwind);
1838
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
1839

1840
    local = quota_local_new();
1841
    if (local == NULL) {
1842
        goto unwind;
1843
    }
1844

1845
    frame->local = local;
1846
    local->loc.inode = inode_ref(fd->inode);
1847

1848
    (void)quota_inode_ctx_get(fd->inode, this, &ctx, 0);
1849
    if (ctx == NULL) {
1850
        gf_msg_debug(this->name, 0,
1851
                     "quota context is NULL on inode"
1852
                     " (%s). If quota is not enabled recently and "
1853
                     "crawler has finished crawling, its an error",
1854
                     uuid_utoa(fd->inode->gfid));
1855
    }
1856

1857
    stub = fop_writev_stub(frame, quota_writev_helper, fd, vector, count, off,
1858
                           flags, iobref, xdata);
1859
    if (stub == NULL) {
1860
        op_errno = ENOMEM;
1861
        goto unwind;
1862
    }
1863

1864
    priv = this->private;
1865
    GF_VALIDATE_OR_GOTO(this->name, priv, unwind);
1866

1867
    parents = quota_add_parents_from_ctx(ctx, &head);
1868
    if (parents == -1) {
1869
        op_errno = errno;
1870
        goto unwind;
1871
    }
1872

1873
    size = iov_length(vector, count);
1874

1875
    LOCK(&local->lock);
1876
    {
1877
        local->delta = size;
1878
        local->object_delta = 0;
1879
        local->link_count = (parents != 0) ? parents : 1;
1880
        local->stub = stub;
1881
    }
1882
    UNLOCK(&local->lock);
1883

1884
    if (parents == 0) {
1885
        /* nameless lookup on this inode, allow quota to reconstruct
1886
         * ancestry as part of check_limit.
1887
         */
1888
        quota_check_limit(frame, fd->inode, this);
1889
    } else {
1890
        list_for_each_entry_safe(dentry, tmp, &head, next)
1891
        {
1892
            par_inode = do_quota_check_limit(frame, fd->inode, this, dentry,
1893
                                             _gf_false);
1894
            if (par_inode == NULL) {
1895
                if (ctx) {
1896
                    /* remove stale entry from inode ctx */
1897
                    quota_dentry_del(ctx, dentry->name, dentry->par);
1898
                    parents--;
1899
                    fail_count++;
1900
                }
1901
            } else {
1902
                inode_unref(par_inode);
1903
            }
1904
            __quota_dentry_free(dentry);
1905
        }
1906

1907
        if (parents == 0) {
1908
            LOCK(&local->lock);
1909
            {
1910
                local->link_count++;
1911
            }
1912
            UNLOCK(&local->lock);
1913
            quota_check_limit(frame, fd->inode, this);
1914
        }
1915

1916
        while (fail_count != 0) {
1917
            quota_link_count_decrement(frame);
1918
            fail_count--;
1919
        }
1920
    }
1921

1922
    return 0;
1923

1924
unwind:
1925
    QUOTA_STACK_UNWIND(writev, frame, -1, op_errno, NULL, NULL, NULL);
1926
    return 0;
1927

1928
off:
1929
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->writev,
1930
                    fd, vector, count, off, flags, iobref, xdata);
1931
    return 0;
1932
}
1933

1934
int32_t
1935
quota_mkdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
1936
                int32_t op_ret, int32_t op_errno, inode_t *inode,
1937
                struct iatt *buf, struct iatt *preparent,
1938
                struct iatt *postparent, dict_t *xdata)
1939
{
1940
    QUOTA_STACK_UNWIND(mkdir, frame, op_ret, op_errno, inode, buf, preparent,
1941
                       postparent, xdata);
1942
    return 0;
1943
}
1944

1945
int32_t
1946
quota_mkdir_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
1947
                   mode_t umask, dict_t *xdata)
1948
{
1949
    quota_local_t *local = NULL;
1950
    int32_t op_errno = EINVAL;
1951

1952
    local = frame->local;
1953

1954
    GF_VALIDATE_OR_GOTO("quota", local, unwind);
1955

1956
    op_errno = local->op_errno;
1957

1958
    if (local->op_ret == -1) {
1959
        goto unwind;
1960
    }
1961

1962
    STACK_WIND(frame, quota_mkdir_cbk, FIRST_CHILD(this),
1963
               FIRST_CHILD(this)->fops->mkdir, loc, mode, umask, xdata);
1964

1965
    return 0;
1966

1967
unwind:
1968
    QUOTA_STACK_UNWIND(mkdir, frame, -1, op_errno, NULL, NULL, NULL, NULL,
1969
                       NULL);
1970
    return 0;
1971
}
1972

1973
int32_t
1974
quota_mkdir(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
1975
            mode_t umask, dict_t *xdata)
1976
{
1977
    quota_priv_t *priv = NULL;
1978
    int32_t ret = 0, op_errno = 0;
1979
    quota_local_t *local = NULL;
1980
    call_stub_t *stub = NULL;
1981

1982
    priv = this->private;
1983
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
1984

1985
    if (!should_quota_enforce(this, xdata, GF_FOP_MKDIR)) {
1986
        gf_msg(this->name, GF_LOG_DEBUG, 0, Q_MSG_ENFORCEMENT_SKIPPED,
1987
               "Enforcement has been skipped(internal fop).");
1988
        goto off;
1989
    }
1990

1991
    local = quota_local_new();
1992
    if (local == NULL) {
1993
        op_errno = ENOMEM;
1994
        goto err;
1995
    }
1996

1997
    frame->local = local;
1998

1999
    ret = loc_copy(&local->loc, loc);
2000
    if (ret) {
2001
        op_errno = ENOMEM;
2002
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2003
               "loc_copy failed");
2004
        goto err;
2005
    }
2006

2007
    stub = fop_mkdir_stub(frame, quota_mkdir_helper, loc, mode, umask, xdata);
2008
    if (stub == NULL) {
2009
        op_errno = ENOMEM;
2010
        goto err;
2011
    }
2012

2013
    LOCK(&local->lock);
2014
    {
2015
        local->stub = stub;
2016
        local->delta = 0;
2017
        local->object_delta = 1;
2018
        local->link_count = 1;
2019
    }
2020
    UNLOCK(&local->lock);
2021

2022
    quota_check_limit(frame, loc->parent, this);
2023
    return 0;
2024

2025
err:
2026
    QUOTA_STACK_UNWIND(mkdir, frame, -1, op_errno, NULL, NULL, NULL, NULL,
2027
                       NULL);
2028

2029
    return 0;
2030

2031
off:
2032
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mkdir,
2033
                    loc, mode, umask, xdata);
2034

2035
    return 0;
2036
}
2037

2038
int32_t
2039
quota_create_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
2040
                 int32_t op_ret, int32_t op_errno, fd_t *fd, inode_t *inode,
2041
                 struct iatt *buf, struct iatt *preparent,
2042
                 struct iatt *postparent, dict_t *xdata)
2043
{
2044
    int32_t ret = -1;
2045
    quota_local_t *local = NULL;
2046
    quota_inode_ctx_t *ctx = NULL;
2047
    quota_dentry_t *dentry = NULL;
2048

2049
    local = frame->local;
2050
    if (op_ret < 0) {
2051
        goto unwind;
2052
    }
2053

2054
    ret = quota_inode_ctx_get(inode, this, &ctx, 1);
2055
    if ((ret == -1) || (ctx == NULL)) {
2056
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_INODE_CTX_GET_FAILED,
2057
               "cannot create quota "
2058
               "context in inode(gfid:%s)",
2059
               uuid_utoa(inode->gfid));
2060
        op_ret = -1;
2061
        op_errno = ENOMEM;
2062
        goto unwind;
2063
    }
2064

2065
    LOCK(&ctx->lock);
2066
    {
2067
        ctx->buf = *buf;
2068

2069
        dentry = __quota_dentry_new(ctx, (char *)local->loc.name,
2070
                                    local->loc.parent->gfid);
2071
        if (dentry == NULL) {
2072
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2073
                   "cannot create a new dentry "
2074
                   "(name:%s) for inode(gfid:%s)",
2075
                   local->loc.name, uuid_utoa(local->loc.inode->gfid));
2076
            op_ret = -1;
2077
            op_errno = ENOMEM;
2078
            goto unlock;
2079
        }
2080
    }
2081
unlock:
2082
    UNLOCK(&ctx->lock);
2083

2084
unwind:
2085
    QUOTA_STACK_UNWIND(create, frame, op_ret, op_errno, fd, inode, buf,
2086
                       preparent, postparent, xdata);
2087
    return 0;
2088
}
2089

2090
int32_t
2091
quota_create_helper(call_frame_t *frame, xlator_t *this, loc_t *loc,
2092
                    int32_t flags, mode_t mode, mode_t umask, fd_t *fd,
2093
                    dict_t *xdata)
2094
{
2095
    quota_local_t *local = NULL;
2096
    int32_t op_errno = EINVAL;
2097

2098
    local = frame->local;
2099

2100
    GF_VALIDATE_OR_GOTO("quota", local, unwind);
2101

2102
    if (local->op_ret == -1) {
2103
        op_errno = local->op_errno;
2104
        goto unwind;
2105
    }
2106

2107
    STACK_WIND(frame, quota_create_cbk, FIRST_CHILD(this),
2108
               FIRST_CHILD(this)->fops->create, loc, flags, mode, umask, fd,
2109
               xdata);
2110
    return 0;
2111

2112
unwind:
2113
    QUOTA_STACK_UNWIND(create, frame, -1, op_errno, NULL, NULL, NULL, NULL,
2114
                       NULL, NULL);
2115
    return 0;
2116
}
2117

2118
int32_t
2119
quota_create(call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
2120
             mode_t mode, mode_t umask, fd_t *fd, dict_t *xdata)
2121
{
2122
    quota_priv_t *priv = NULL;
2123
    int32_t ret = -1;
2124
    quota_local_t *local = NULL;
2125
    int32_t op_errno = 0;
2126
    call_stub_t *stub = NULL;
2127

2128
    priv = this->private;
2129

2130
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
2131
    QUOTA_WIND_FOR_INTERNAL_FOP(xdata, off);
2132

2133
    local = quota_local_new();
2134
    if (local == NULL) {
2135
        op_errno = ENOMEM;
2136
        goto err;
2137
    }
2138

2139
    frame->local = local;
2140

2141
    ret = loc_copy(&local->loc, loc);
2142
    if (ret) {
2143
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2144
               "loc_copy failed");
2145
        op_errno = ENOMEM;
2146
        goto err;
2147
    }
2148

2149
    stub = fop_create_stub(frame, quota_create_helper, loc, flags, mode, umask,
2150
                           fd, xdata);
2151
    if (stub == NULL) {
2152
        goto err;
2153
    }
2154

2155
    LOCK(&local->lock);
2156
    {
2157
        local->link_count = 1;
2158
        local->stub = stub;
2159
        local->delta = 0;
2160
        local->object_delta = 1;
2161
    }
2162
    UNLOCK(&local->lock);
2163

2164
    quota_check_limit(frame, loc->parent, this);
2165
    return 0;
2166
err:
2167
    QUOTA_STACK_UNWIND(create, frame, -1, op_errno, NULL, NULL, NULL, NULL,
2168
                       NULL, NULL);
2169

2170
    return 0;
2171

2172
off:
2173
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->create,
2174
                    loc, flags, mode, umask, fd, xdata);
2175
    return 0;
2176
}
2177

2178
int32_t
2179
quota_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
2180
                 int32_t op_ret, int32_t op_errno, struct iatt *preparent,
2181
                 struct iatt *postparent, dict_t *xdata)
2182
{
2183
    quota_local_t *local = NULL;
2184
    quota_inode_ctx_t *ctx = NULL;
2185
    uint64_t value = 0;
2186

2187
    if (op_ret < 0) {
2188
        goto out;
2189
    }
2190

2191
    local = (quota_local_t *)frame->local;
2192

2193
    inode_ctx_get(local->loc.inode, this, &value);
2194
    ctx = (quota_inode_ctx_t *)(unsigned long)value;
2195

2196
    if (ctx == NULL) {
2197
        gf_msg(this->name, GF_LOG_INFO, EINVAL, Q_MSG_INODE_CTX_GET_FAILED,
2198
               "quota context not set inode (gfid:%s)",
2199
               uuid_utoa(local->loc.gfid));
2200
        goto out;
2201
    }
2202

2203
    quota_dentry_del(ctx, local->loc.name, local->loc.parent->gfid);
2204

2205
out:
2206
    QUOTA_STACK_UNWIND(unlink, frame, op_ret, op_errno, preparent, postparent,
2207
                       xdata);
2208
    return 0;
2209
}
2210

2211
int32_t
2212
quota_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
2213
             dict_t *xdata)
2214
{
2215
    quota_priv_t *priv = NULL;
2216
    int32_t ret = -1;
2217
    quota_local_t *local = NULL;
2218

2219
    priv = this->private;
2220

2221
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
2222

2223
    local = quota_local_new();
2224
    if (local == NULL) {
2225
        goto err;
2226
    }
2227

2228
    frame->local = local;
2229

2230
    ret = loc_copy(&local->loc, loc);
2231
    if (ret) {
2232
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2233
               "loc_copy failed");
2234
        goto err;
2235
    }
2236

2237
    STACK_WIND(frame, quota_unlink_cbk, FIRST_CHILD(this),
2238
               FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
2239

2240
    ret = 0;
2241

2242
err:
2243
    if (ret == -1) {
2244
        QUOTA_STACK_UNWIND(unlink, frame, -1, 0, NULL, NULL, NULL);
2245
    }
2246

2247
    return 0;
2248

2249
off:
2250
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->unlink,
2251
                    loc, xflag, xdata);
2252
    return 0;
2253
}
2254

2255
int32_t
2256
quota_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
2257
               int32_t op_ret, int32_t op_errno, inode_t *inode,
2258
               struct iatt *buf, struct iatt *preparent,
2259
               struct iatt *postparent, dict_t *xdata)
2260
{
2261
    int32_t ret = -1;
2262
    quota_local_t *local = NULL;
2263
    quota_inode_ctx_t *ctx = NULL;
2264
    quota_dentry_t *dentry = NULL;
2265
    char found = 0;
2266

2267
    if (op_ret < 0) {
2268
        goto out;
2269
    }
2270

2271
    local = (quota_local_t *)frame->local;
2272

2273
    ret = quota_inode_ctx_get(inode, this, &ctx, 0);
2274
    if ((ret == -1) || (ctx == NULL)) {
2275
        gf_msg_debug(this->name, 0,
2276
                     "quota context is NULL on inode"
2277
                     " (%s). If quota is not enabled recently and "
2278
                     "crawler has finished crawling, its an error",
2279
                     uuid_utoa(inode->gfid));
2280
        goto out;
2281
    }
2282

2283
    LOCK(&ctx->lock);
2284
    {
2285
        list_for_each_entry(dentry, &ctx->parents, next)
2286
        {
2287
            if ((strcmp(dentry->name, local->loc.name) == 0) &&
2288
                (gf_uuid_compare(local->loc.parent->gfid, dentry->par) == 0)) {
2289
                found = 1;
2290

2291
                gf_msg_debug(this->name, 0,
2292
                             "new entry being"
2293
                             " linked (name:%s) for inode "
2294
                             "(gfid:%s) is already present "
2295
                             "in inode-dentry-list",
2296
                             dentry->name, uuid_utoa(local->loc.inode->gfid));
2297
                break;
2298
            }
2299
        }
2300

2301
        if (!found) {
2302
            dentry = __quota_dentry_new(ctx, (char *)local->loc.name,
2303
                                        local->loc.parent->gfid);
2304
            if (dentry == NULL) {
2305
                gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2306
                       "cannot create a new dentry (name:%s)"
2307
                       "for inode(gfid:%s)",
2308
                       local->loc.name, uuid_utoa(local->loc.inode->gfid));
2309
                op_ret = -1;
2310
                op_errno = ENOMEM;
2311
                goto unlock;
2312
            }
2313
        }
2314

2315
        ctx->buf = *buf;
2316
    }
2317
unlock:
2318
    UNLOCK(&ctx->lock);
2319

2320
out:
2321
    QUOTA_STACK_UNWIND(link, frame, op_ret, op_errno, inode, buf, preparent,
2322
                       postparent, xdata);
2323

2324
    return 0;
2325
}
2326

2327
int32_t
2328
quota_link_helper(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
2329
                  loc_t *newloc, dict_t *xdata)
2330
{
2331
    quota_local_t *local = NULL;
2332
    int32_t op_errno = EINVAL;
2333

2334
    local = frame->local;
2335

2336
    GF_VALIDATE_OR_GOTO("quota", local, unwind);
2337

2338
    op_errno = local->op_errno;
2339

2340
    if (local->op_ret == -1) {
2341
        goto unwind;
2342
    }
2343

2344
    STACK_WIND(frame, quota_link_cbk, FIRST_CHILD(this),
2345
               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
2346
    return 0;
2347

2348
unwind:
2349
    QUOTA_STACK_UNWIND(link, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL);
2350
    return 0;
2351
}
2352

2353
void
2354
quota_link_continue(call_frame_t *frame)
2355
{
2356
    int32_t ret = -1;
2357
    int32_t op_errno = EIO;
2358
    quota_local_t *local = NULL;
2359
    uuid_t common_ancestor = {0};
2360
    xlator_t *this = NULL;
2361
    quota_inode_ctx_t *ctx = NULL;
2362
    inode_t *src_parent = NULL;
2363
    inode_t *dst_parent = NULL;
2364

2365
    local = frame->local;
2366
    this = THIS;
2367

2368
    if (local->op_ret < 0) {
2369
        op_errno = local->op_errno;
2370
        goto err;
2371
    }
2372

2373
    if (local->xdata && dict_get(local->xdata, GLUSTERFS_INTERNAL_FOP_KEY)) {
2374
        /* Treat link as rename, crawl upwards only till common ancestor
2375
         */
2376
        ret = quota_find_common_ancestor(
2377
            local->oldloc.inode, local->newloc.parent, &common_ancestor);
2378
        if (ret < 0 || gf_uuid_is_null(common_ancestor)) {
2379
            gf_msg(this->name, GF_LOG_ERROR, ESTALE,
2380
                   Q_MSG_ANCESTRY_BUILD_FAILED,
2381
                   "failed to get "
2382
                   "common_ancestor for %s and %s",
2383
                   local->oldloc.path, local->newloc.path);
2384
            op_errno = ESTALE;
2385
            goto err;
2386
        }
2387
    } else {
2388
        /* Treat link as a new file.
2389
         * TODO: Currently marker accounts twice for the links created
2390
         * across directories.
2391
         * This needs re-visit if marker accounts only once
2392
         * for the links created across directories
2393
         */
2394
        if (local->oldloc.parent)
2395
            src_parent = inode_ref(local->oldloc.parent);
2396
        else
2397
            src_parent = inode_parent(local->oldloc.inode, 0, NULL);
2398
        dst_parent = local->newloc.parent;
2399

2400
        /* No need to check quota limit if src and dst parents are same
2401
         */
2402
        if (src_parent == dst_parent ||
2403
            gf_uuid_compare(src_parent->gfid, dst_parent->gfid) == 0) {
2404
            inode_unref(src_parent);
2405
            goto wind;
2406
        }
2407

2408
        inode_unref(src_parent);
2409
    }
2410

2411
    quota_inode_ctx_get(local->oldloc.inode, this, &ctx, 0);
2412
    if (ctx == NULL) {
2413
        gf_msg_debug(this->name, 0,
2414
                     "quota context is NULL on inode"
2415
                     " (%s). If quota is not enabled recently and "
2416
                     "crawler has finished crawling, its an error",
2417
                     uuid_utoa(local->oldloc.inode->gfid));
2418
    }
2419

2420
    LOCK(&local->lock);
2421
    {
2422
        local->link_count = 1;
2423
        local->delta = (ctx != NULL) ? ctx->buf.ia_blocks * 512 : 0;
2424
        local->object_delta = 1;
2425
        gf_uuid_copy(local->common_ancestor, common_ancestor);
2426
    }
2427
    UNLOCK(&local->lock);
2428

2429
    quota_check_limit(frame, local->newloc.parent, this);
2430
    return;
2431

2432
err:
2433
    QUOTA_STACK_UNWIND(link, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL);
2434
    return;
2435

2436
wind:
2437
    STACK_WIND(frame, quota_link_cbk, FIRST_CHILD(this),
2438
               FIRST_CHILD(this)->fops->link, &(local->oldloc),
2439
               &(local->newloc), local->xdata);
2440
    return;
2441
}
2442

2443
int32_t
2444
quota_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
2445
           dict_t *xdata)
2446
{
2447
    quota_priv_t *priv = NULL;
2448
    int32_t ret = -1;
2449
    int32_t op_errno = ENOMEM;
2450
    quota_local_t *local = NULL;
2451
    call_stub_t *stub = NULL;
2452

2453
    priv = this->private;
2454

2455
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
2456

2457
    local = quota_local_new();
2458
    if (local == NULL) {
2459
        goto err;
2460
    }
2461

2462
    frame->local = (void *)local;
2463

2464
    if (xdata)
2465
        local->xdata = dict_ref(xdata);
2466

2467
    ret = loc_copy(&local->loc, newloc);
2468
    if (ret == -1) {
2469
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2470
               "loc_copy failed");
2471
        goto err;
2472
    }
2473

2474
    ret = loc_copy(&local->oldloc, oldloc);
2475
    if (ret < 0) {
2476
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2477
               "loc_copy failed");
2478
        goto err;
2479
    }
2480

2481
    ret = loc_copy(&local->newloc, newloc);
2482
    if (ret < 0) {
2483
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2484
               "loc_copy failed");
2485
        goto err;
2486
    }
2487

2488
    /* No need to check quota limit if src and dst parents are same */
2489
    if (oldloc->parent && newloc->parent &&
2490
        !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) {
2491
        gf_msg_debug(this->name, GF_LOG_DEBUG,
2492
                     "link %s -> %s are "
2493
                     "in the same directory, so skip check limit",
2494
                     oldloc->path, newloc->path);
2495
        goto wind;
2496
    }
2497

2498
    stub = fop_link_stub(frame, quota_link_helper, oldloc, newloc, xdata);
2499
    if (stub == NULL) {
2500
        goto err;
2501
    }
2502

2503
    LOCK(&local->lock);
2504
    {
2505
        local->link_count = 2;
2506
        local->fop_continue_cbk = quota_link_continue;
2507
        local->stub = stub;
2508
    }
2509
    UNLOCK(&local->lock);
2510

2511
    check_ancestory(frame, newloc->parent);
2512

2513
    /* source parent can be NULL, so do check_ancestry on a file */
2514
    if (oldloc->parent)
2515
        check_ancestory(frame, oldloc->parent);
2516
    else
2517
        check_ancestory(frame, oldloc->inode);
2518

2519
    return 0;
2520

2521
err:
2522
    QUOTA_STACK_UNWIND(link, frame, -1, op_errno, NULL, NULL, NULL, NULL, NULL);
2523
    return 0;
2524

2525
off:
2526
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->link,
2527
                    oldloc, newloc, xdata);
2528
    return 0;
2529

2530
wind:
2531
    STACK_WIND(frame, quota_link_cbk, FIRST_CHILD(this),
2532
               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
2533
    return 0;
2534
}
2535

2536
int32_t
2537
quota_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
2538
                 int32_t op_ret, int32_t op_errno, struct iatt *buf,
2539
                 struct iatt *preoldparent, struct iatt *postoldparent,
2540
                 struct iatt *prenewparent, struct iatt *postnewparent,
2541
                 dict_t *xdata)
2542
{
2543
    int32_t ret = -1;
2544
    quota_local_t *local = NULL;
2545
    quota_inode_ctx_t *ctx = NULL;
2546
    quota_dentry_t *old_dentry = NULL, *dentry = NULL;
2547
    char new_dentry_found = 0;
2548

2549
    if (op_ret < 0) {
2550
        goto out;
2551
    }
2552

2553
    local = frame->local;
2554

2555
    GF_VALIDATE_OR_GOTO("quota", local, out);
2556

2557
    if (!QUOTA_REG_OR_LNK_FILE(local->oldloc.inode->ia_type))
2558
        goto out;
2559

2560
    ret = quota_inode_ctx_get(local->oldloc.inode, this, &ctx, 0);
2561
    if ((ret == -1) || (ctx == NULL)) {
2562
        gf_msg_debug(this->name, 0,
2563
                     "quota context is NULL on inode"
2564
                     " (%s). If quota is not enabled recently and "
2565
                     "crawler has finished crawling, its an error",
2566
                     uuid_utoa(local->oldloc.inode->gfid));
2567

2568
        goto out;
2569
    }
2570

2571
    LOCK(&ctx->lock);
2572
    {
2573
        list_for_each_entry(dentry, &ctx->parents, next)
2574
        {
2575
            if ((strcmp(dentry->name, local->oldloc.name) == 0) &&
2576
                (gf_uuid_compare(local->oldloc.parent->gfid, dentry->par) ==
2577
                 0)) {
2578
                old_dentry = dentry;
2579
            } else if ((strcmp(dentry->name, local->newloc.name) == 0) &&
2580
                       (gf_uuid_compare(local->newloc.parent->gfid,
2581
                                        dentry->par) == 0)) {
2582
                new_dentry_found = 1;
2583
                gf_msg_debug(this->name, 0,
2584
                             "new entry being "
2585
                             "linked (name:%s) for inode (gfid:%s) "
2586
                             "is in inode-dentry-list",
2587
                             dentry->name,
2588
                             uuid_utoa(local->oldloc.inode->gfid));
2589
            }
2590

2591
            if (old_dentry && new_dentry_found)
2592
                break;
2593
        }
2594

2595
        if (old_dentry != NULL) {
2596
            __quota_dentry_free(old_dentry);
2597
        } else {
2598
            gf_msg_debug(this->name, 0,
2599
                         "dentry corresponding"
2600
                         "the path just renamed (name:%s) is not"
2601
                         " present",
2602
                         local->oldloc.name);
2603
        }
2604

2605
        if (!new_dentry_found) {
2606
            dentry = __quota_dentry_new(ctx, (char *)local->newloc.name,
2607
                                        local->newloc.parent->gfid);
2608
            if (dentry == NULL) {
2609
                gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2610
                       "cannot create a new dentry (name:%s) "
2611
                       "for inode(gfid:%s)",
2612
                       local->newloc.name,
2613
                       uuid_utoa(local->newloc.inode->gfid));
2614
                op_ret = -1;
2615
                op_errno = ENOMEM;
2616
                goto unlock;
2617
            }
2618
        }
2619

2620
        ctx->buf = *buf;
2621
    }
2622
unlock:
2623
    UNLOCK(&ctx->lock);
2624

2625
out:
2626
    QUOTA_STACK_UNWIND(rename, frame, op_ret, op_errno, buf, preoldparent,
2627
                       postoldparent, prenewparent, postnewparent, xdata);
2628

2629
    return 0;
2630
}
2631

2632
int32_t
2633
quota_rename_helper(call_frame_t *frame, xlator_t *this, loc_t *oldloc,
2634
                    loc_t *newloc, dict_t *xdata)
2635
{
2636
    quota_local_t *local = NULL;
2637
    int32_t op_errno = EINVAL;
2638

2639
    local = frame->local;
2640

2641
    GF_VALIDATE_OR_GOTO("quota", local, unwind);
2642

2643
    op_errno = local->op_errno;
2644

2645
    if (local->op_ret == -1) {
2646
        goto unwind;
2647
    }
2648

2649
    STACK_WIND(frame, quota_rename_cbk, FIRST_CHILD(this),
2650
               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
2651

2652
    return 0;
2653

2654
unwind:
2655
    QUOTA_STACK_UNWIND(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL,
2656
                       NULL, NULL);
2657
    return 0;
2658
}
2659

2660
static int32_t
2661
quota_rename_get_size_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
2662
                          int32_t op_ret, int32_t op_errno, inode_t *inode,
2663
                          struct iatt *buf, dict_t *xdata,
2664
                          struct iatt *postparent)
2665
{
2666
    quota_local_t *local = NULL;
2667
    int32_t ret = 0;
2668
    int64_t *size = 0;
2669

2670
    GF_ASSERT(frame);
2671
    GF_VALIDATE_OR_GOTO_WITH_ERROR("quota", this, out, op_errno, EINVAL);
2672
    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, xdata, out, op_errno, EINVAL);
2673
    local = frame->local;
2674
    GF_ASSERT(local);
2675
    local->link_count = 1;
2676

2677
    if (op_ret < 0)
2678
        goto out;
2679

2680
    ret = dict_get_bin(xdata, QUOTA_SIZE_KEY, (void **)&size);
2681
    if (ret < 0) {
2682
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_SIZE_KEY_MISSING,
2683
               "size key not present in dict");
2684
        op_errno = EINVAL;
2685
        goto out;
2686
    }
2687
    local->delta = be64toh(*size);
2688
    local->object_delta = 1;
2689
    quota_check_limit(frame, local->newloc.parent, this);
2690
    return 0;
2691

2692
out:
2693
    quota_handle_validate_error(frame, -1, op_errno);
2694
    return 0;
2695
}
2696

2697
void
2698
quota_rename_continue(call_frame_t *frame)
2699
{
2700
    int32_t ret = -1;
2701
    int32_t op_errno = EIO;
2702
    quota_local_t *local = NULL;
2703
    uuid_t common_ancestor = {0};
2704
    xlator_t *this = NULL;
2705
    quota_inode_ctx_t *ctx = NULL;
2706

2707
    local = frame->local;
2708
    this = THIS;
2709

2710
    if (local->op_ret < 0) {
2711
        op_errno = local->op_errno;
2712
        goto err;
2713
    }
2714

2715
    ret = quota_find_common_ancestor(local->oldloc.parent, local->newloc.parent,
2716
                                     &common_ancestor);
2717
    if (ret < 0 || gf_uuid_is_null(common_ancestor)) {
2718
        gf_msg(this->name, GF_LOG_ERROR, ESTALE, Q_MSG_ANCESTRY_BUILD_FAILED,
2719
               "failed to get "
2720
               "common_ancestor for %s and %s",
2721
               local->oldloc.path, local->newloc.path);
2722
        op_errno = ESTALE;
2723
        goto err;
2724
    }
2725

2726
    LOCK(&local->lock);
2727
    {
2728
        local->link_count = 1;
2729
        gf_uuid_copy(local->common_ancestor, common_ancestor);
2730
    }
2731
    UNLOCK(&local->lock);
2732

2733
    if (QUOTA_REG_OR_LNK_FILE(local->oldloc.inode->ia_type)) {
2734
        ret = quota_inode_ctx_get(local->oldloc.inode, this, &ctx, 0);
2735
        if (ctx == NULL) {
2736
            gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
2737
                   "quota context not set in inode (gfid:%s), "
2738
                   "considering file size as zero while enforcing "
2739
                   "quota on new ancestry",
2740
                   uuid_utoa(local->oldloc.inode->gfid));
2741

2742
            local->delta = 0;
2743
            local->object_delta = 1;
2744
        } else {
2745
            /* FIXME: We need to account for the size occupied by
2746
             * this inode on the target directory. To avoid double
2747
             * accounting, we need to modify enforcer to perform
2748
             * quota_check_limit only up till the least common
2749
             * ancestor directory inode*/
2750

2751
            /* FIXME: The following code assumes that regular files
2752
             * and link files are present, in their entirety, in a
2753
             * single brick. This *assumption is invalid in the
2754
             * case of stripe.*/
2755

2756
            local->delta = ctx->buf.ia_blocks * 512;
2757
            local->object_delta = 1;
2758
        }
2759

2760
    } else if (IA_ISDIR(local->oldloc.inode->ia_type)) {
2761
        ret = quota_validate(frame, local->oldloc.inode, this,
2762
                             quota_rename_get_size_cbk);
2763
        if (ret) {
2764
            op_errno = -ret;
2765
            goto err;
2766
        }
2767

2768
        return;
2769
    }
2770

2771
    quota_check_limit(frame, local->newloc.parent, this);
2772
    return;
2773

2774
err:
2775
    QUOTA_STACK_UNWIND(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL,
2776
                       NULL, NULL);
2777
    return;
2778
}
2779

2780
int32_t
2781
quota_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
2782
             dict_t *xdata)
2783
{
2784
    quota_priv_t *priv = NULL;
2785
    int32_t ret = -1;
2786
    int32_t op_errno = ENOMEM;
2787
    quota_local_t *local = NULL;
2788
    call_stub_t *stub = NULL;
2789

2790
    priv = this->private;
2791

2792
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
2793

2794
    local = quota_local_new();
2795
    if (local == NULL) {
2796
        goto err;
2797
    }
2798

2799
    frame->local = local;
2800

2801
    ret = loc_copy(&local->oldloc, oldloc);
2802
    if (ret < 0) {
2803
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2804
               "loc_copy failed");
2805
        goto err;
2806
    }
2807

2808
    ret = loc_copy(&local->newloc, newloc);
2809
    if (ret < 0) {
2810
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2811
               "loc_copy failed");
2812
        goto err;
2813
    }
2814

2815
    /* No need to check quota limit if src and dst parents are same */
2816
    if (oldloc->parent && newloc->parent &&
2817
        !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) {
2818
        gf_msg_debug(this->name, 0,
2819
                     "rename %s -> %s are "
2820
                     "in the same directory, so skip check limit",
2821
                     oldloc->path, newloc->path);
2822
        goto wind;
2823
    }
2824

2825
    stub = fop_rename_stub(frame, quota_rename_helper, oldloc, newloc, xdata);
2826
    if (stub == NULL) {
2827
        goto err;
2828
    }
2829

2830
    LOCK(&local->lock);
2831
    {
2832
        /* link_count here tell how many check_ancestry should be done
2833
         * before continuing the FOP
2834
         */
2835
        local->link_count = 2;
2836
        local->stub = stub;
2837
        local->fop_continue_cbk = quota_rename_continue;
2838
    }
2839
    UNLOCK(&local->lock);
2840

2841
    check_ancestory(frame, newloc->parent);
2842
    check_ancestory(frame, oldloc->parent);
2843
    return 0;
2844

2845
err:
2846
    QUOTA_STACK_UNWIND(rename, frame, -1, op_errno, NULL, NULL, NULL, NULL,
2847
                       NULL, NULL);
2848
    return 0;
2849

2850
off:
2851
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename,
2852
                    oldloc, newloc, xdata);
2853
    return 0;
2854

2855
wind:
2856
    STACK_WIND(frame, quota_rename_cbk, FIRST_CHILD(this),
2857
               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
2858
    return 0;
2859
}
2860

2861
int32_t
2862
quota_symlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
2863
                  int32_t op_ret, int32_t op_errno, inode_t *inode,
2864
                  struct iatt *buf, struct iatt *preparent,
2865
                  struct iatt *postparent, dict_t *xdata)
2866
{
2867
    quota_local_t *local = NULL;
2868
    quota_inode_ctx_t *ctx = NULL;
2869
    quota_dentry_t *dentry = NULL;
2870
    int32_t ret = -1;
2871

2872
    if (op_ret < 0) {
2873
        goto out;
2874
    }
2875

2876
    local = frame->local;
2877

2878
    ret = quota_inode_ctx_get(local->loc.inode, this, &ctx, 1);
2879
    if ((ret == -1) || (ctx == NULL)) {
2880
        gf_msg_debug(this->name, 0,
2881
                     "quota context is NULL on inode"
2882
                     " (%s). If quota is not enabled recently and "
2883
                     "crawler has finished crawling, its an error",
2884
                     uuid_utoa(local->loc.inode->gfid));
2885

2886
        goto out;
2887
    }
2888

2889
    LOCK(&ctx->lock);
2890
    {
2891
        ctx->buf = *buf;
2892

2893
        dentry = __quota_dentry_new(ctx, (char *)local->loc.name,
2894
                                    local->loc.parent->gfid);
2895
        if (dentry == NULL) {
2896
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2897
                   "cannot create "
2898
                   "a new dentry (name:%s) for inode(gfid:%s)",
2899
                   local->loc.name, uuid_utoa(local->loc.inode->gfid));
2900
            op_ret = -1;
2901
            op_errno = ENOMEM;
2902
        }
2903
    }
2904
    UNLOCK(&ctx->lock);
2905

2906
out:
2907
    QUOTA_STACK_UNWIND(symlink, frame, op_ret, op_errno, inode, buf, preparent,
2908
                       postparent, xdata);
2909

2910
    return 0;
2911
}
2912

2913
int
2914
quota_symlink_helper(call_frame_t *frame, xlator_t *this, const char *linkpath,
2915
                     loc_t *loc, mode_t umask, dict_t *xdata)
2916
{
2917
    quota_local_t *local = NULL;
2918
    int32_t op_errno = EINVAL;
2919

2920
    local = frame->local;
2921

2922
    GF_VALIDATE_OR_GOTO("quota", local, unwind);
2923

2924
    if (local->op_ret == -1) {
2925
        op_errno = local->op_errno;
2926
        goto unwind;
2927
    }
2928

2929
    STACK_WIND(frame, quota_symlink_cbk, FIRST_CHILD(this),
2930
               FIRST_CHILD(this)->fops->symlink, linkpath, loc, umask, xdata);
2931
    return 0;
2932

2933
unwind:
2934
    QUOTA_STACK_UNWIND(symlink, frame, -1, op_errno, NULL, NULL, NULL, NULL,
2935
                       NULL);
2936
    return 0;
2937
}
2938

2939
int
2940
quota_symlink(call_frame_t *frame, xlator_t *this, const char *linkpath,
2941
              loc_t *loc, mode_t umask, dict_t *xdata)
2942
{
2943
    quota_priv_t *priv = NULL;
2944
    int32_t ret = -1;
2945
    int32_t op_errno = ENOMEM;
2946
    quota_local_t *local = NULL;
2947
    call_stub_t *stub = NULL;
2948

2949
    priv = this->private;
2950

2951
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
2952

2953
    local = quota_local_new();
2954
    if (local == NULL) {
2955
        goto err;
2956
    }
2957

2958
    frame->local = local;
2959

2960
    ret = loc_copy(&local->loc, loc);
2961
    if (ret < 0) {
2962
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
2963
               "loc_copy failed");
2964
        goto err;
2965
    }
2966

2967
    stub = fop_symlink_stub(frame, quota_symlink_helper, linkpath, loc, umask,
2968
                            xdata);
2969
    if (stub == NULL) {
2970
        goto err;
2971
    }
2972

2973
    LOCK(&local->lock);
2974
    {
2975
        local->stub = stub;
2976
        local->delta = strlen(linkpath);
2977
        local->object_delta = 1;
2978
        local->link_count = 1;
2979
    }
2980
    UNLOCK(&local->lock);
2981

2982
    quota_check_limit(frame, loc->parent, this);
2983
    return 0;
2984

2985
err:
2986
    QUOTA_STACK_UNWIND(symlink, frame, -1, op_errno, NULL, NULL, NULL, NULL,
2987
                       NULL);
2988

2989
    return 0;
2990

2991
off:
2992
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->symlink,
2993
                    linkpath, loc, umask, xdata);
2994
    return 0;
2995
}
2996

2997
int32_t
2998
quota_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
2999
                   int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
3000
                   struct iatt *postbuf, dict_t *xdata)
3001
{
3002
    quota_local_t *local = NULL;
3003
    quota_inode_ctx_t *ctx = NULL;
3004

3005
    if (op_ret < 0) {
3006
        goto out;
3007
    }
3008

3009
    local = frame->local;
3010

3011
    GF_VALIDATE_OR_GOTO("quota", local, out);
3012

3013
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
3014
    if (ctx == NULL) {
3015
        gf_msg_debug(this->name, 0,
3016
                     "quota context is NULL on inode"
3017
                     " (%s). If quota is not enabled recently and "
3018
                     "crawler has finished crawling, its an error",
3019
                     uuid_utoa(local->loc.inode->gfid));
3020
        goto out;
3021
    }
3022

3023
    LOCK(&ctx->lock);
3024
    {
3025
        ctx->buf = *postbuf;
3026
    }
3027
    UNLOCK(&ctx->lock);
3028

3029
out:
3030
    QUOTA_STACK_UNWIND(truncate, frame, op_ret, op_errno, prebuf, postbuf,
3031
                       xdata);
3032
    return 0;
3033
}
3034

3035
int32_t
3036
quota_truncate(call_frame_t *frame, xlator_t *this, loc_t *loc, off_t offset,
3037
               dict_t *xdata)
3038
{
3039
    quota_priv_t *priv = NULL;
3040
    int32_t ret = -1;
3041
    quota_local_t *local = NULL;
3042

3043
    priv = this->private;
3044

3045
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3046

3047
    local = quota_local_new();
3048
    if (local == NULL) {
3049
        goto err;
3050
    }
3051

3052
    frame->local = local;
3053

3054
    ret = loc_copy(&local->loc, loc);
3055
    if (ret < 0) {
3056
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
3057
               "loc_copy failed");
3058
        goto err;
3059
    }
3060

3061
    STACK_WIND(frame, quota_truncate_cbk, FIRST_CHILD(this),
3062
               FIRST_CHILD(this)->fops->truncate, loc, offset, xdata);
3063

3064
    return 0;
3065

3066
err:
3067
    QUOTA_STACK_UNWIND(truncate, frame, -1, ENOMEM, NULL, NULL, NULL);
3068

3069
    return 0;
3070
off:
3071
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->truncate,
3072
                    loc, offset, xdata);
3073
    return 0;
3074
}
3075

3076
int32_t
3077
quota_ftruncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3078
                    int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
3079
                    struct iatt *postbuf, dict_t *xdata)
3080
{
3081
    quota_local_t *local = NULL;
3082
    quota_inode_ctx_t *ctx = NULL;
3083

3084
    if (op_ret < 0) {
3085
        goto out;
3086
    }
3087

3088
    local = frame->local;
3089

3090
    GF_VALIDATE_OR_GOTO("quota", local, out);
3091

3092
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
3093
    if (ctx == NULL) {
3094
        gf_msg_debug(this->name, 0,
3095
                     "quota context is NULL on inode"
3096
                     " (%s). If quota is not enabled recently and "
3097
                     "crawler has finished crawling, its an error",
3098
                     uuid_utoa(local->loc.inode->gfid));
3099
        goto out;
3100
    }
3101

3102
    LOCK(&ctx->lock);
3103
    {
3104
        ctx->buf = *postbuf;
3105
    }
3106
    UNLOCK(&ctx->lock);
3107

3108
out:
3109
    QUOTA_STACK_UNWIND(ftruncate, frame, op_ret, op_errno, prebuf, postbuf,
3110
                       xdata);
3111
    return 0;
3112
}
3113

3114
int32_t
3115
quota_ftruncate(call_frame_t *frame, xlator_t *this, fd_t *fd, off_t offset,
3116
                dict_t *xdata)
3117
{
3118
    quota_priv_t *priv = NULL;
3119
    quota_local_t *local = NULL;
3120

3121
    priv = this->private;
3122

3123
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3124

3125
    local = quota_local_new();
3126
    if (local == NULL)
3127
        goto err;
3128

3129
    frame->local = local;
3130

3131
    local->loc.inode = inode_ref(fd->inode);
3132

3133
    STACK_WIND(frame, quota_ftruncate_cbk, FIRST_CHILD(this),
3134
               FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
3135

3136
    return 0;
3137
err:
3138
    QUOTA_STACK_UNWIND(ftruncate, frame, -1, ENOMEM, NULL, NULL, NULL);
3139

3140
    return 0;
3141

3142
off:
3143
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
3144
                    FIRST_CHILD(this)->fops->ftruncate, fd, offset, xdata);
3145
    return 0;
3146
}
3147

3148
static int32_t
3149
quota_send_dir_limit_to_cli(call_frame_t *frame, xlator_t *this, inode_t *inode,
3150
                            const char *name, const int namelen)
3151
{
3152
    int32_t ret = 0;
3153
    int dir_limit_len = 0;
3154
    char dir_limit[64] = {
3155
        0,
3156
    };
3157
    dict_t *dict = NULL;
3158
    quota_inode_ctx_t *ctx = NULL;
3159
    uint64_t value = 0;
3160
    quota_priv_t *priv = NULL;
3161

3162
    priv = this->private;
3163
    if (!priv->is_quota_on) {
3164
        dir_limit_len = snprintf(dir_limit, sizeof(dir_limit),
3165
                                 "Quota is disabled please turn on");
3166
        goto dict_set;
3167
    }
3168

3169
    ret = inode_ctx_get(inode, this, &value);
3170
    if (ret < 0)
3171
        goto out;
3172

3173
    ctx = (quota_inode_ctx_t *)(unsigned long)value;
3174
    dir_limit_len = snprintf(dir_limit, sizeof(dir_limit),
3175
                             "%" PRId64 ",%" PRId64, ctx->size, ctx->hard_lim);
3176

3177
dict_set:
3178
    dict = dict_new();
3179
    if (dict == NULL) {
3180
        ret = -1;
3181
        goto out;
3182
    }
3183

3184
    ret = dict_set_nstrn(dict, (char *)name, namelen, dir_limit, dir_limit_len);
3185
    if (ret < 0)
3186
        goto out;
3187

3188
    gf_msg_debug(this->name, 0, "str = %s", dir_limit);
3189

3190
    QUOTA_STACK_UNWIND(getxattr, frame, 0, 0, dict, NULL);
3191

3192
    ret = 0;
3193

3194
out:
3195
    if (dict)
3196
        dict_unref(dict);
3197
    return ret;
3198
}
3199

3200
int32_t
3201
quota_fgetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, const char *name,
3202
                dict_t *xdata)
3203
{
3204
    int32_t ret = 0;
3205

3206
    if (name && strcasecmp(name, "trusted.limit.list") == 0) {
3207
        ret = quota_send_dir_limit_to_cli(frame, this, fd->inode,
3208
                                          "trusted.limit.list",
3209
                                          SLEN("trusted.limit.list"));
3210
        if (ret == 0) {
3211
            return 0;
3212
        }
3213
    }
3214

3215
    STACK_WIND(frame, default_fgetxattr_cbk, FIRST_CHILD(this),
3216
               FIRST_CHILD(this)->fops->fgetxattr, fd, name, xdata);
3217
    return 0;
3218
}
3219

3220
int32_t
3221
quota_getxattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
3222
               const char *name, dict_t *xdata)
3223
{
3224
    int32_t ret = 0;
3225

3226
    if ((name != NULL) && strcasecmp(name, "trusted.limit.list") == 0) {
3227
        ret = quota_send_dir_limit_to_cli(frame, this, loc->inode,
3228
                                          "trusted.limit.list",
3229
                                          SLEN("trusted.limit.list"));
3230
        if (ret == 0)
3231
            return 0;
3232
    }
3233

3234
    STACK_WIND(frame, default_getxattr_cbk, FIRST_CHILD(this),
3235
               FIRST_CHILD(this)->fops->getxattr, loc, name, xdata);
3236
    return 0;
3237
}
3238

3239
int32_t
3240
quota_stat_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3241
               int32_t op_ret, int32_t op_errno, struct iatt *buf,
3242
               dict_t *xdata)
3243
{
3244
    quota_local_t *local = NULL;
3245
    quota_inode_ctx_t *ctx = NULL;
3246

3247
    if (op_ret < 0) {
3248
        goto out;
3249
    }
3250

3251
    local = frame->local;
3252

3253
    GF_VALIDATE_OR_GOTO("quota", local, out);
3254

3255
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
3256
    if (ctx == NULL) {
3257
        if (!IA_ISDIR(buf->ia_type)) {
3258
            gf_msg_debug(this->name, 0,
3259
                         "quota context is NULL on inode"
3260
                         " (%s). If quota is not enabled recently and "
3261
                         "crawler has finished crawling, its an error",
3262
                         uuid_utoa(local->loc.inode->gfid));
3263
        }
3264

3265
        goto out;
3266
    }
3267

3268
    if (buf) {
3269
        LOCK(&ctx->lock);
3270
        ctx->buf = *buf;
3271
        UNLOCK(&ctx->lock);
3272
    }
3273

3274
out:
3275
    QUOTA_STACK_UNWIND(stat, frame, op_ret, op_errno, buf, xdata);
3276
    return 0;
3277
}
3278

3279
int32_t
3280
quota_stat(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
3281
{
3282
    quota_priv_t *priv = NULL;
3283
    quota_local_t *local = NULL;
3284
    int32_t ret = -1;
3285

3286
    priv = this->private;
3287

3288
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3289

3290
    local = quota_local_new();
3291
    if (local == NULL) {
3292
        goto unwind;
3293
    }
3294

3295
    frame->local = local;
3296
    ret = loc_copy(&local->loc, loc);
3297
    if (ret < 0) {
3298
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
3299
               "loc_copy failed");
3300
        goto unwind;
3301
    }
3302

3303
    STACK_WIND(frame, quota_stat_cbk, FIRST_CHILD(this),
3304
               FIRST_CHILD(this)->fops->stat, loc, xdata);
3305
    return 0;
3306

3307
unwind:
3308
    QUOTA_STACK_UNWIND(stat, frame, -1, ENOMEM, NULL, NULL);
3309
    return 0;
3310

3311
off:
3312
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->stat,
3313
                    loc, xdata);
3314
    return 0;
3315
}
3316

3317
int32_t
3318
quota_fstat_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3319
                int32_t op_ret, int32_t op_errno, struct iatt *buf,
3320
                dict_t *xdata)
3321
{
3322
    quota_local_t *local = NULL;
3323
    quota_inode_ctx_t *ctx = NULL;
3324

3325
    if (op_ret < 0) {
3326
        goto out;
3327
    }
3328

3329
    local = frame->local;
3330

3331
    GF_VALIDATE_OR_GOTO("quota", local, out);
3332

3333
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
3334
    if (ctx == NULL) {
3335
        if (!IA_ISDIR(buf->ia_type)) {
3336
            gf_msg_debug(this->name, 0,
3337
                         "quota context is NULL on inode"
3338
                         " (%s). If quota is not enabled recently and "
3339
                         "crawler has finished crawling, its an error",
3340
                         uuid_utoa(local->loc.inode->gfid));
3341
        }
3342

3343
        goto out;
3344
    }
3345

3346
    if (buf) {
3347
        LOCK(&ctx->lock);
3348
        ctx->buf = *buf;
3349
        UNLOCK(&ctx->lock);
3350
    }
3351

3352
out:
3353
    QUOTA_STACK_UNWIND(fstat, frame, op_ret, op_errno, buf, xdata);
3354
    return 0;
3355
}
3356

3357
int32_t
3358
quota_fstat(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *xdata)
3359
{
3360
    quota_priv_t *priv = NULL;
3361
    quota_local_t *local = NULL;
3362

3363
    priv = this->private;
3364

3365
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3366

3367
    local = quota_local_new();
3368
    if (local == NULL) {
3369
        goto unwind;
3370
    }
3371

3372
    frame->local = local;
3373

3374
    local->loc.inode = inode_ref(fd->inode);
3375

3376
    STACK_WIND(frame, quota_fstat_cbk, FIRST_CHILD(this),
3377
               FIRST_CHILD(this)->fops->fstat, fd, xdata);
3378
    return 0;
3379

3380
unwind:
3381
    QUOTA_STACK_UNWIND(fstat, frame, -1, ENOMEM, NULL, NULL);
3382
    return 0;
3383

3384
off:
3385
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fstat,
3386
                    fd, xdata);
3387
    return 0;
3388
}
3389

3390
int32_t
3391
quota_readlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3392
                   int32_t op_ret, int32_t op_errno, const char *path,
3393
                   struct iatt *buf, dict_t *xdata)
3394
{
3395
    quota_local_t *local = NULL;
3396
    quota_inode_ctx_t *ctx = NULL;
3397

3398
    if (op_ret < 0) {
3399
        goto out;
3400
    }
3401

3402
    local = frame->local;
3403

3404
    GF_VALIDATE_OR_GOTO("quota", local, out);
3405

3406
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
3407
    if (ctx == NULL) {
3408
        gf_msg_debug(this->name, 0,
3409
                     "quota context is NULL on inode"
3410
                     " (%s). If quota is not enabled recently and "
3411
                     "crawler has finished crawling, its an error",
3412
                     uuid_utoa(local->loc.inode->gfid));
3413
        goto out;
3414
    }
3415

3416
    LOCK(&ctx->lock);
3417
    {
3418
        ctx->buf = *buf;
3419
    }
3420
    UNLOCK(&ctx->lock);
3421

3422
out:
3423
    QUOTA_STACK_UNWIND(readlink, frame, op_ret, op_errno, path, buf, xdata);
3424
    return 0;
3425
}
3426

3427
int32_t
3428
quota_readlink(call_frame_t *frame, xlator_t *this, loc_t *loc, size_t size,
3429
               dict_t *xdata)
3430
{
3431
    quota_priv_t *priv = NULL;
3432
    quota_local_t *local = NULL;
3433
    int32_t ret = -1;
3434

3435
    priv = this->private;
3436

3437
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3438

3439
    local = quota_local_new();
3440
    if (local == NULL) {
3441
        goto unwind;
3442
    }
3443

3444
    frame->local = local;
3445

3446
    ret = loc_copy(&local->loc, loc);
3447
    if (ret < 0) {
3448
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
3449
               "loc_copy failed");
3450
        goto unwind;
3451
    }
3452

3453
    STACK_WIND(frame, quota_readlink_cbk, FIRST_CHILD(this),
3454
               FIRST_CHILD(this)->fops->readlink, loc, size, xdata);
3455
    return 0;
3456

3457
unwind:
3458
    QUOTA_STACK_UNWIND(readlink, frame, -1, ENOMEM, NULL, NULL, NULL);
3459
    return 0;
3460

3461
off:
3462
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readlink,
3463
                    loc, size, xdata);
3464
    return 0;
3465
}
3466

3467
int32_t
3468
quota_readv_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3469
                int32_t op_ret, int32_t op_errno, struct iovec *vector,
3470
                int32_t count, struct iatt *buf, struct iobref *iobref,
3471
                dict_t *xdata)
3472
{
3473
    quota_local_t *local = NULL;
3474
    quota_inode_ctx_t *ctx = NULL;
3475

3476
    if (op_ret < 0) {
3477
        goto out;
3478
    }
3479

3480
    local = frame->local;
3481

3482
    GF_VALIDATE_OR_GOTO("quota", local, out);
3483

3484
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
3485
    if (ctx == NULL) {
3486
        gf_msg_debug(this->name, 0,
3487
                     "quota context is NULL on inode"
3488
                     " (%s). If quota is not enabled recently and "
3489
                     "crawler has finished crawling, its an error",
3490
                     uuid_utoa(local->loc.inode->gfid));
3491
        goto out;
3492
    }
3493

3494
    LOCK(&ctx->lock);
3495
    {
3496
        ctx->buf = *buf;
3497
    }
3498
    UNLOCK(&ctx->lock);
3499

3500
out:
3501
    QUOTA_STACK_UNWIND(readv, frame, op_ret, op_errno, vector, count, buf,
3502
                       iobref, xdata);
3503
    return 0;
3504
}
3505

3506
int32_t
3507
quota_readv(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
3508
            off_t offset, uint32_t flags, dict_t *xdata)
3509
{
3510
    quota_priv_t *priv = NULL;
3511
    quota_local_t *local = NULL;
3512

3513
    priv = this->private;
3514

3515
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3516

3517
    local = quota_local_new();
3518
    if (local == NULL) {
3519
        goto unwind;
3520
    }
3521

3522
    frame->local = local;
3523

3524
    local->loc.inode = inode_ref(fd->inode);
3525

3526
    STACK_WIND(frame, quota_readv_cbk, FIRST_CHILD(this),
3527
               FIRST_CHILD(this)->fops->readv, fd, size, offset, flags, xdata);
3528
    return 0;
3529

3530
unwind:
3531
    QUOTA_STACK_UNWIND(readv, frame, -1, ENOMEM, NULL, -1, NULL, NULL, NULL);
3532
    return 0;
3533

3534
off:
3535
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv,
3536
                    fd, size, offset, flags, xdata);
3537
    return 0;
3538
}
3539

3540
int32_t
3541
quota_fsync_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3542
                int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
3543
                struct iatt *postbuf, dict_t *xdata)
3544
{
3545
    quota_local_t *local = NULL;
3546
    quota_inode_ctx_t *ctx = NULL;
3547

3548
    if (op_ret < 0) {
3549
        goto out;
3550
    }
3551

3552
    local = frame->local;
3553

3554
    GF_VALIDATE_OR_GOTO("quota", local, out);
3555

3556
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
3557
    if (ctx == NULL) {
3558
        gf_msg_debug(this->name, 0,
3559
                     "quota context is NULL on inode"
3560
                     " (%s). If quota is not enabled recently and "
3561
                     "crawler has finished crawling, its an error",
3562
                     uuid_utoa(local->loc.inode->gfid));
3563
        goto out;
3564
    }
3565

3566
    LOCK(&ctx->lock);
3567
    {
3568
        ctx->buf = *postbuf;
3569
    }
3570
    UNLOCK(&ctx->lock);
3571

3572
out:
3573
    QUOTA_STACK_UNWIND(fsync, frame, op_ret, op_errno, prebuf, postbuf, xdata);
3574
    return 0;
3575
}
3576

3577
int32_t
3578
quota_fsync(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t flags,
3579
            dict_t *xdata)
3580
{
3581
    quota_priv_t *priv = NULL;
3582
    quota_local_t *local = NULL;
3583

3584
    priv = this->private;
3585

3586
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3587

3588
    local = quota_local_new();
3589
    if (local == NULL) {
3590
        goto unwind;
3591
    }
3592

3593
    local->loc.inode = inode_ref(fd->inode);
3594

3595
    frame->local = local;
3596

3597
    STACK_WIND(frame, quota_fsync_cbk, FIRST_CHILD(this),
3598
               FIRST_CHILD(this)->fops->fsync, fd, flags, xdata);
3599
    return 0;
3600

3601
unwind:
3602
    QUOTA_STACK_UNWIND(fsync, frame, -1, ENOMEM, NULL, NULL, NULL);
3603
    return 0;
3604

3605
off:
3606
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsync,
3607
                    fd, flags, xdata);
3608
    return 0;
3609
}
3610

3611
int32_t
3612
quota_setattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3613
                  int32_t op_ret, int32_t op_errno, struct iatt *statpre,
3614
                  struct iatt *statpost, dict_t *xdata)
3615
{
3616
    quota_local_t *local = NULL;
3617
    quota_inode_ctx_t *ctx = NULL;
3618

3619
    if (op_ret < 0) {
3620
        goto out;
3621
    }
3622

3623
    local = frame->local;
3624

3625
    GF_VALIDATE_OR_GOTO("quota", local, out);
3626

3627
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
3628
    if (ctx == NULL) {
3629
        if (!IA_ISDIR(statpost->ia_type)) {
3630
            gf_msg_debug(this->name, 0,
3631
                         "quota context is NULL on inode"
3632
                         " (%s). If quota is not enabled recently and "
3633
                         "crawler has finished crawling, its an error",
3634
                         uuid_utoa(local->loc.inode->gfid));
3635
        }
3636

3637
        goto out;
3638
    }
3639

3640
    if (statpost) {
3641
        LOCK(&ctx->lock);
3642
        ctx->buf = *statpost;
3643
        UNLOCK(&ctx->lock);
3644
    }
3645

3646
out:
3647
    QUOTA_STACK_UNWIND(setattr, frame, op_ret, op_errno, statpre, statpost,
3648
                       xdata);
3649
    return 0;
3650
}
3651

3652
int32_t
3653
quota_setattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
3654
              struct iatt *stbuf, int32_t valid, dict_t *xdata)
3655
{
3656
    quota_priv_t *priv = NULL;
3657
    quota_local_t *local = NULL;
3658
    int32_t ret = -1;
3659

3660
    priv = this->private;
3661

3662
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3663

3664
    local = quota_local_new();
3665
    if (local == NULL) {
3666
        goto unwind;
3667
    }
3668

3669
    frame->local = local;
3670

3671
    ret = loc_copy(&local->loc, loc);
3672
    if (ret < 0) {
3673
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
3674
               "loc_copy failed");
3675
        goto unwind;
3676
    }
3677

3678
    STACK_WIND(frame, quota_setattr_cbk, FIRST_CHILD(this),
3679
               FIRST_CHILD(this)->fops->setattr, loc, stbuf, valid, xdata);
3680
    return 0;
3681

3682
unwind:
3683
    QUOTA_STACK_UNWIND(setattr, frame, -1, ENOMEM, NULL, NULL, NULL);
3684
    return 0;
3685

3686
off:
3687
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setattr,
3688
                    loc, stbuf, valid, xdata);
3689
    return 0;
3690
}
3691

3692
int32_t
3693
quota_fsetattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3694
                   int32_t op_ret, int32_t op_errno, struct iatt *statpre,
3695
                   struct iatt *statpost, dict_t *xdata)
3696
{
3697
    quota_local_t *local = NULL;
3698
    quota_inode_ctx_t *ctx = NULL;
3699

3700
    if (op_ret < 0) {
3701
        goto out;
3702
    }
3703

3704
    local = frame->local;
3705

3706
    GF_VALIDATE_OR_GOTO("quota", local, out);
3707

3708
    quota_inode_ctx_get(local->loc.inode, this, &ctx, 0);
3709
    if (ctx == NULL) {
3710
        if (!IA_ISDIR(statpost->ia_type)) {
3711
            gf_msg_debug(this->name, 0,
3712
                         "quota context is NULL on inode"
3713
                         " (%s). If quota is not enabled recently and "
3714
                         "crawler has finished crawling, its an error",
3715
                         uuid_utoa(local->loc.inode->gfid));
3716
        }
3717

3718
        goto out;
3719
    }
3720

3721
    LOCK(&ctx->lock);
3722
    {
3723
        ctx->buf = *statpost;
3724
    }
3725
    UNLOCK(&ctx->lock);
3726

3727
out:
3728
    QUOTA_STACK_UNWIND(fsetattr, frame, op_ret, op_errno, statpre, statpost,
3729
                       xdata);
3730
    return 0;
3731
}
3732

3733
int32_t
3734
quota_fsetattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
3735
               struct iatt *stbuf, int32_t valid, dict_t *xdata)
3736
{
3737
    quota_priv_t *priv = NULL;
3738
    quota_local_t *local = NULL;
3739

3740
    priv = this->private;
3741

3742
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3743

3744
    local = quota_local_new();
3745
    if (local == NULL) {
3746
        goto unwind;
3747
    }
3748

3749
    frame->local = local;
3750

3751
    local->loc.inode = inode_ref(fd->inode);
3752

3753
    STACK_WIND(frame, quota_fsetattr_cbk, FIRST_CHILD(this),
3754
               FIRST_CHILD(this)->fops->fsetattr, fd, stbuf, valid, xdata);
3755
    return 0;
3756

3757
unwind:
3758
    QUOTA_STACK_UNWIND(fsetattr, frame, -1, ENOMEM, NULL, NULL, NULL);
3759
    return 0;
3760

3761
off:
3762
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->fsetattr,
3763
                    fd, stbuf, valid, xdata);
3764
    return 0;
3765
}
3766

3767
int32_t
3768
quota_mknod_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3769
                int32_t op_ret, int32_t op_errno, inode_t *inode,
3770
                struct iatt *buf, struct iatt *preparent,
3771
                struct iatt *postparent, dict_t *xdata)
3772
{
3773
    int32_t ret = -1;
3774
    quota_local_t *local = NULL;
3775
    quota_inode_ctx_t *ctx = NULL;
3776
    quota_dentry_t *dentry = NULL;
3777

3778
    local = frame->local;
3779
    if (op_ret < 0) {
3780
        goto unwind;
3781
    }
3782

3783
    ret = quota_inode_ctx_get(inode, this, &ctx, 1);
3784
    if ((ret == -1) || (ctx == NULL)) {
3785
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
3786
               "cannot create quota context in "
3787
               "inode(gfid:%s)",
3788
               uuid_utoa(inode->gfid));
3789
        op_ret = -1;
3790
        op_errno = ENOMEM;
3791
        goto unwind;
3792
    }
3793

3794
    LOCK(&ctx->lock);
3795
    {
3796
        ctx->buf = *buf;
3797

3798
        dentry = __quota_dentry_new(ctx, (char *)local->loc.name,
3799
                                    local->loc.parent->gfid);
3800
        if (dentry == NULL) {
3801
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
3802
                   "cannot create a new dentry "
3803
                   "(name:%s) for inode(gfid:%s)",
3804
                   local->loc.name, uuid_utoa(local->loc.inode->gfid));
3805
            op_ret = -1;
3806
            op_errno = ENOMEM;
3807
            goto unlock;
3808
        }
3809
    }
3810
unlock:
3811
    UNLOCK(&ctx->lock);
3812

3813
unwind:
3814
    QUOTA_STACK_UNWIND(mknod, frame, op_ret, op_errno, inode, buf, preparent,
3815
                       postparent, xdata);
3816
    return 0;
3817
}
3818

3819
int
3820
quota_mknod_helper(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
3821
                   dev_t rdev, mode_t umask, dict_t *xdata)
3822
{
3823
    quota_local_t *local = NULL;
3824
    int32_t op_errno = EINVAL;
3825

3826
    local = frame->local;
3827

3828
    GF_VALIDATE_OR_GOTO("quota", local, unwind);
3829

3830
    if (local->op_ret == -1) {
3831
        op_errno = local->op_errno;
3832
        goto unwind;
3833
    }
3834

3835
    STACK_WIND(frame, quota_mknod_cbk, FIRST_CHILD(this),
3836
               FIRST_CHILD(this)->fops->mknod, loc, mode, rdev, umask, xdata);
3837

3838
    return 0;
3839

3840
unwind:
3841
    QUOTA_STACK_UNWIND(mknod, frame, -1, op_errno, NULL, NULL, NULL, NULL,
3842
                       NULL);
3843
    return 0;
3844
}
3845

3846
int
3847
quota_mknod(call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
3848
            dev_t rdev, mode_t umask, dict_t *xdata)
3849
{
3850
    quota_priv_t *priv = NULL;
3851
    int32_t ret = -1;
3852
    quota_local_t *local = NULL;
3853
    call_stub_t *stub = NULL;
3854

3855
    priv = this->private;
3856

3857
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3858
    QUOTA_WIND_FOR_INTERNAL_FOP(xdata, off);
3859

3860
    local = quota_local_new();
3861
    if (local == NULL) {
3862
        goto err;
3863
    }
3864

3865
    frame->local = local;
3866

3867
    ret = loc_copy(&local->loc, loc);
3868
    if (ret) {
3869
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
3870
               "loc_copy failed");
3871
        goto err;
3872
    }
3873

3874
    stub = fop_mknod_stub(frame, quota_mknod_helper, loc, mode, rdev, umask,
3875
                          xdata);
3876
    if (stub == NULL) {
3877
        goto err;
3878
    }
3879

3880
    LOCK(&local->lock);
3881
    {
3882
        local->link_count = 1;
3883
        local->stub = stub;
3884
        local->delta = 0;
3885
        local->object_delta = 1;
3886
    }
3887
    UNLOCK(&local->lock);
3888

3889
    quota_check_limit(frame, loc->parent, this);
3890
    return 0;
3891

3892
err:
3893
    QUOTA_STACK_UNWIND(mknod, frame, -1, ENOMEM, NULL, NULL, NULL, NULL, NULL);
3894
    return 0;
3895

3896
off:
3897
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->mknod,
3898
                    loc, mode, rdev, umask, xdata);
3899
    return 0;
3900
}
3901

3902
int
3903
quota_setxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
3904
                   int op_ret, int op_errno, dict_t *xdata)
3905
{
3906
    quota_local_t *local = NULL;
3907
    quota_inode_ctx_t *ctx = NULL;
3908
    int ret = 0;
3909

3910
    if (op_ret < 0) {
3911
        goto out;
3912
    }
3913

3914
    local = frame->local;
3915
    if (!local)
3916
        goto out;
3917

3918
    ret = quota_inode_ctx_get(local->loc.inode, this, &ctx, 1);
3919
    if ((ret < 0) || (ctx == NULL)) {
3920
        op_errno = -1;
3921
        goto out;
3922
    }
3923

3924
    LOCK(&ctx->lock);
3925
    {
3926
        ctx->hard_lim = local->limit.hl;
3927
        ctx->soft_lim = local->limit.sl;
3928
        ctx->object_hard_lim = local->object_limit.hl;
3929
        ctx->object_soft_lim = local->object_limit.sl;
3930
    }
3931
    UNLOCK(&ctx->lock);
3932

3933
out:
3934
    QUOTA_STACK_UNWIND(setxattr, frame, op_ret, op_errno, xdata);
3935
    return 0;
3936
}
3937

3938
int
3939
quota_setxattr(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *dict,
3940
               int flags, dict_t *xdata)
3941
{
3942
    quota_priv_t *priv = NULL;
3943
    int op_errno = EINVAL;
3944
    int op_ret = -1;
3945
    int64_t hard_lim = -1;
3946
    int64_t soft_lim = -1;
3947
    int64_t object_hard_limit = -1;
3948
    int64_t object_soft_limit = -1;
3949
    quota_local_t *local = NULL;
3950
    gf_boolean_t internal_fop = _gf_false;
3951

3952
    priv = this->private;
3953

3954
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
3955

3956
    VALIDATE_OR_GOTO(frame, err);
3957
    VALIDATE_OR_GOTO(this, err);
3958
    VALIDATE_OR_GOTO(loc, err);
3959

3960
    if (xdata && dict_get_sizen(xdata, GLUSTERFS_INTERNAL_FOP_KEY))
3961
        internal_fop = _gf_true;
3962

3963
    if (frame->root->pid >= 0 && internal_fop == _gf_false) {
3964
        GF_IF_INTERNAL_XATTR_GOTO("trusted.glusterfs.quota*", dict, op_errno,
3965
                                  err);
3966
        GF_IF_INTERNAL_XATTR_GOTO("trusted.pgfid*", dict, op_errno, err);
3967
    }
3968

3969
    quota_get_limits(this, dict, &hard_lim, &soft_lim, &object_hard_limit,
3970
                     &object_soft_limit);
3971

3972
    if (hard_lim > 0 || object_hard_limit > 0) {
3973
        local = quota_local_new();
3974
        if (local == NULL) {
3975
            op_errno = ENOMEM;
3976
            goto err;
3977
        }
3978
        frame->local = local;
3979
        loc_copy(&local->loc, loc);
3980
    }
3981

3982
    if (hard_lim > 0) {
3983
        local->limit.hl = hard_lim;
3984
        local->limit.sl = soft_lim;
3985
    }
3986

3987
    if (object_hard_limit > 0) {
3988
        local->object_limit.hl = object_hard_limit;
3989
        local->object_limit.sl = object_soft_limit;
3990
    }
3991

3992
    STACK_WIND(frame, quota_setxattr_cbk, FIRST_CHILD(this),
3993
               FIRST_CHILD(this)->fops->setxattr, loc, dict, flags, xdata);
3994
    return 0;
3995
err:
3996
    QUOTA_STACK_UNWIND(setxattr, frame, op_ret, op_errno, NULL);
3997
    return 0;
3998

3999
off:
4000
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->setxattr,
4001
                    loc, dict, flags, xdata);
4002
    return 0;
4003
}
4004

4005
int
4006
quota_fsetxattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
4007
                    int op_ret, int op_errno, dict_t *xdata)
4008
{
4009
    quota_inode_ctx_t *ctx = NULL;
4010
    quota_local_t *local = NULL;
4011

4012
    if (op_ret < 0)
4013
        goto out;
4014

4015
    local = frame->local;
4016
    if (!local)
4017
        goto out;
4018

4019
    op_ret = quota_inode_ctx_get(local->loc.inode, this, &ctx, 1);
4020
    if ((op_ret < 0) || (ctx == NULL)) {
4021
        op_errno = ENOMEM;
4022
        goto out;
4023
    }
4024

4025
    LOCK(&ctx->lock);
4026
    {
4027
        ctx->hard_lim = local->limit.hl;
4028
        ctx->soft_lim = local->limit.sl;
4029
        ctx->object_hard_lim = local->object_limit.hl;
4030
        ctx->object_soft_lim = local->object_limit.sl;
4031
    }
4032
    UNLOCK(&ctx->lock);
4033

4034
out:
4035
    QUOTA_STACK_UNWIND(fsetxattr, frame, op_ret, op_errno, xdata);
4036
    return 0;
4037
}
4038

4039
int
4040
quota_fsetxattr(call_frame_t *frame, xlator_t *this, fd_t *fd, dict_t *dict,
4041
                int flags, dict_t *xdata)
4042
{
4043
    quota_priv_t *priv = NULL;
4044
    int32_t op_ret = -1;
4045
    int32_t op_errno = EINVAL;
4046
    quota_local_t *local = NULL;
4047
    int64_t hard_lim = -1;
4048
    int64_t soft_lim = -1;
4049
    int64_t object_hard_limit = -1;
4050
    int64_t object_soft_limit = -1;
4051

4052
    priv = this->private;
4053

4054
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
4055

4056
    VALIDATE_OR_GOTO(frame, err);
4057
    VALIDATE_OR_GOTO(this, err);
4058
    VALIDATE_OR_GOTO(fd, err);
4059

4060
    if (0 <= frame->root->pid) {
4061
        GF_IF_INTERNAL_XATTR_GOTO("trusted.glusterfs.quota*", dict, op_errno,
4062
                                  err);
4063
        GF_IF_INTERNAL_XATTR_GOTO("trusted.pgfid*", dict, op_errno, err);
4064
    }
4065

4066
    quota_get_limits(this, dict, &hard_lim, &soft_lim, &object_hard_limit,
4067
                     &object_soft_limit);
4068

4069
    if (hard_lim > 0 || object_hard_limit > 0) {
4070
        local = quota_local_new();
4071
        if (local == NULL) {
4072
            op_errno = ENOMEM;
4073
            goto err;
4074
        }
4075
        frame->local = local;
4076
        local->loc.inode = inode_ref(fd->inode);
4077
    }
4078

4079
    if (hard_lim > 0) {
4080
        local->limit.hl = hard_lim;
4081
        local->limit.sl = soft_lim;
4082
    }
4083

4084
    if (object_hard_limit > 0) {
4085
        local->object_limit.hl = object_hard_limit;
4086
        local->object_limit.sl = object_soft_limit;
4087
    }
4088

4089
    STACK_WIND(frame, quota_fsetxattr_cbk, FIRST_CHILD(this),
4090
               FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
4091
    return 0;
4092
err:
4093
    QUOTA_STACK_UNWIND(fsetxattr, frame, op_ret, op_errno, NULL);
4094
    return 0;
4095

4096
off:
4097
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
4098
                    FIRST_CHILD(this)->fops->fsetxattr, fd, dict, flags, xdata);
4099
    return 0;
4100
}
4101

4102
int
4103
quota_removexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
4104
                      int32_t op_ret, int32_t op_errno, dict_t *xdata)
4105
{
4106
    QUOTA_STACK_UNWIND(removexattr, frame, op_ret, op_errno, xdata);
4107
    return 0;
4108
}
4109

4110
int
4111
quota_removexattr(call_frame_t *frame, xlator_t *this, loc_t *loc,
4112
                  const char *name, dict_t *xdata)
4113
{
4114
    quota_priv_t *priv = NULL;
4115
    int32_t op_errno = EINVAL;
4116

4117
    priv = this->private;
4118

4119
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
4120

4121
    VALIDATE_OR_GOTO(this, err);
4122

4123
    /* all quota xattrs can be cleaned up by doing setxattr on special key.
4124
     * Hence its ok that we don't allow removexattr on quota keys here.
4125
     */
4126
    if (frame->root->pid >= 0) {
4127
        GF_IF_NATIVE_XATTR_GOTO("trusted.glusterfs.quota*", name, op_errno,
4128
                                err);
4129
        GF_IF_NATIVE_XATTR_GOTO("trusted.pgfid*", name, op_errno, err);
4130
    }
4131

4132
    VALIDATE_OR_GOTO(frame, err);
4133
    VALIDATE_OR_GOTO(loc, err);
4134

4135
    STACK_WIND(frame, quota_removexattr_cbk, FIRST_CHILD(this),
4136
               FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);
4137
    return 0;
4138

4139
err:
4140
    QUOTA_STACK_UNWIND(removexattr, frame, -1, op_errno, NULL);
4141
    return 0;
4142

4143
off:
4144
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
4145
                    FIRST_CHILD(this)->fops->removexattr, loc, name, xdata);
4146
    return 0;
4147
}
4148

4149
int
4150
quota_fremovexattr_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
4151
                       int32_t op_ret, int32_t op_errno, dict_t *xdata)
4152
{
4153
    QUOTA_STACK_UNWIND(fremovexattr, frame, op_ret, op_errno, xdata);
4154
    return 0;
4155
}
4156

4157
int
4158
quota_fremovexattr(call_frame_t *frame, xlator_t *this, fd_t *fd,
4159
                   const char *name, dict_t *xdata)
4160
{
4161
    quota_priv_t *priv = NULL;
4162
    int32_t op_ret = -1;
4163
    int32_t op_errno = EINVAL;
4164

4165
    priv = this->private;
4166

4167
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
4168

4169
    VALIDATE_OR_GOTO(frame, err);
4170
    VALIDATE_OR_GOTO(this, err);
4171
    VALIDATE_OR_GOTO(fd, err);
4172

4173
    if (frame->root->pid >= 0) {
4174
        GF_IF_NATIVE_XATTR_GOTO("trusted.glusterfs.quota*", name, op_errno,
4175
                                err);
4176
        GF_IF_NATIVE_XATTR_GOTO("trusted.pgfid*", name, op_errno, err);
4177
    }
4178
    STACK_WIND(frame, quota_fremovexattr_cbk, FIRST_CHILD(this),
4179
               FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);
4180
    return 0;
4181
err:
4182
    QUOTA_STACK_UNWIND(fremovexattr, frame, op_ret, op_errno, NULL);
4183
    return 0;
4184

4185
off:
4186
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
4187
                    FIRST_CHILD(this)->fops->fremovexattr, fd, name, xdata);
4188
    return 0;
4189
}
4190

4191
int32_t
4192
quota_statfs_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
4193
                 int32_t op_ret, int32_t op_errno, struct statvfs *buf,
4194
                 dict_t *xdata)
4195
{
4196
    inode_t *inode = NULL;
4197
    uint64_t value = 0;
4198
    int64_t usage = -1;
4199
    int64_t avail = -1;
4200
    int64_t blocks = 0;
4201
    quota_inode_ctx_t *ctx = NULL;
4202
    int ret = 0;
4203

4204
    inode = cookie;
4205

4206
    /* This fop will fail mostly in case of client disconnect,
4207
     * which is already logged. Hence, not logging here */
4208
    if (op_ret == -1)
4209
        goto unwind;
4210
    /*
4211
     * We should never get here unless quota_statfs (below) sent us a
4212
     * cookie, and it would only do so if the value was non-NULL.  This
4213
     * check is therefore just routine defensive coding.
4214
     */
4215

4216
    GF_VALIDATE_OR_GOTO("quota", inode, unwind);
4217

4218
    inode_ctx_get(inode, this, &value);
4219
    ctx = (quota_inode_ctx_t *)(unsigned long)value;
4220
    if (!ctx || ctx->hard_lim <= 0)
4221
        goto unwind;
4222

4223
    { /* statfs is adjusted in this code block */
4224
        usage = (ctx->size) / buf->f_bsize;
4225

4226
        blocks = ctx->hard_lim / buf->f_bsize;
4227
        buf->f_blocks = blocks;
4228

4229
        avail = buf->f_blocks - usage;
4230
        avail = max(avail, 0);
4231

4232
        buf->f_bfree = avail;
4233
        /*
4234
         * We have to assume that the total assigned quota
4235
         * won't cause us to dip into the reserved space,
4236
         * because dealing with the overcommitted cases is
4237
         * just too hairy (especially when different bricks
4238
         * might be using different reserved percentages and
4239
         * such).
4240
         */
4241
        buf->f_bavail = buf->f_bfree;
4242
    }
4243

4244
    xdata = xdata ? dict_ref(xdata) : dict_new();
4245
    if (!xdata)
4246
        goto unwind;
4247

4248
    ret = dict_set_int8(xdata, "quota-deem-statfs", 1);
4249
    if (-1 == ret)
4250
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, Q_MSG_ENOMEM,
4251
               "Dict set failed, deem-statfs option may "
4252
               "have no effect");
4253

4254
unwind:
4255
    QUOTA_STACK_UNWIND(statfs, frame, op_ret, op_errno, buf, xdata);
4256

4257
    if (xdata)
4258
        dict_unref(xdata);
4259

4260
    return 0;
4261
}
4262

4263
int32_t
4264
quota_statfs_helper(call_frame_t *frame, xlator_t *this, loc_t *loc,
4265
                    dict_t *xdata)
4266
{
4267
    quota_local_t *local = frame->local;
4268
    int op_errno = EINVAL;
4269

4270
    GF_VALIDATE_OR_GOTO("quota", local, err);
4271

4272
    if (-1 == local->op_ret) {
4273
        op_errno = local->op_errno;
4274
        goto err;
4275
    }
4276

4277
    STACK_WIND_COOKIE(frame, quota_statfs_cbk, local->inode, FIRST_CHILD(this),
4278
                      FIRST_CHILD(this)->fops->statfs, loc, xdata);
4279
    return 0;
4280
err:
4281
    QUOTA_STACK_UNWIND(statfs, frame, -1, op_errno, NULL, NULL);
4282

4283
    return 0;
4284
}
4285

4286
int32_t
4287
quota_statfs_validate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
4288
                          int32_t op_ret, int32_t op_errno, inode_t *inode,
4289
                          struct iatt *buf, dict_t *xdata,
4290
                          struct iatt *postparent)
4291
{
4292
    quota_local_t *local = NULL;
4293
    int32_t ret = 0;
4294
    quota_inode_ctx_t *ctx = NULL;
4295
    uint64_t value = 0;
4296
    quota_meta_t size = {
4297
        0,
4298
    };
4299

4300
    local = frame->local;
4301

4302
    if (op_ret < 0)
4303
        goto resume;
4304

4305
    GF_ASSERT(local);
4306
    GF_ASSERT(frame);
4307
    GF_VALIDATE_OR_GOTO_WITH_ERROR("quota", this, resume, op_errno, EINVAL);
4308
    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, xdata, resume, op_errno, EINVAL);
4309

4310
    ret = inode_ctx_get(local->validate_loc.inode, this, &value);
4311

4312
    ctx = (quota_inode_ctx_t *)(unsigned long)value;
4313
    if ((ret == -1) || (ctx == NULL)) {
4314
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_INODE_CTX_GET_FAILED,
4315
               "quota context is not present in inode (gfid:%s)",
4316
               uuid_utoa(local->validate_loc.inode->gfid));
4317
        op_errno = EINVAL;
4318
        goto resume;
4319
    }
4320

4321
    ret = quota_dict_get_meta(xdata, QUOTA_SIZE_KEY, SLEN(QUOTA_SIZE_KEY),
4322
                              &size);
4323
    if (ret == -1) {
4324
        gf_msg(this->name, GF_LOG_WARNING, EINVAL, Q_MSG_SIZE_KEY_MISSING,
4325
               "size key not present in "
4326
               "dict");
4327
        op_errno = EINVAL;
4328
    }
4329

4330
    LOCK(&ctx->lock);
4331
    {
4332
        ctx->size = size.size;
4333
        ctx->validate_time = gf_time();
4334
        ctx->file_count = size.file_count;
4335
        ctx->dir_count = size.dir_count;
4336
    }
4337
    UNLOCK(&ctx->lock);
4338

4339
resume:
4340
    local->op_errno = op_errno;
4341
    quota_link_count_decrement(frame);
4342
    return 0;
4343
}
4344

4345
void
4346
quota_get_limit_dir_continuation(struct list_head *parents, inode_t *inode,
4347
                                 int32_t op_ret, int32_t op_errno, void *data)
4348
{
4349
    call_frame_t *frame = NULL;
4350
    xlator_t *this = NULL;
4351
    quota_dentry_t *entry = NULL;
4352
    inode_t *parent = NULL;
4353

4354
    frame = data;
4355
    this = THIS;
4356

4357
    if ((op_ret < 0) || list_empty(parents)) {
4358
        if (op_ret >= 0) {
4359
            gf_msg(this->name, GF_LOG_WARNING, EIO, Q_MSG_ANCESTRY_BUILD_FAILED,
4360
                   "Couldn't build ancestry for inode (gfid:%s). "
4361
                   "Without knowing ancestors till root, quota "
4362
                   "cannot be enforced. "
4363
                   "Hence, failing fop with EIO",
4364
                   uuid_utoa(inode->gfid));
4365
            op_errno = EIO;
4366
        }
4367

4368
        quota_handle_validate_error(frame, -1, op_errno);
4369
        goto out;
4370
    }
4371

4372
    entry = list_first_entry(parents, quota_dentry_t, next);
4373
    parent = inode_find(inode->table, entry->par);
4374

4375
    quota_get_limit_dir(frame, parent, this);
4376

4377
    inode_unref(parent);
4378
out:
4379
    return;
4380
}
4381

4382
void
4383
quota_statfs_continue(call_frame_t *frame, xlator_t *this, inode_t *inode)
4384
{
4385
    quota_local_t *local = frame->local;
4386
    int ret = -1;
4387

4388
    LOCK(&local->lock);
4389
    {
4390
        local->inode = inode_ref(inode);
4391
    }
4392
    UNLOCK(&local->lock);
4393

4394
    ret = quota_validate(frame, local->inode, this, quota_statfs_validate_cbk);
4395
    if (0 > ret)
4396
        quota_handle_validate_error(frame, -1, -ret);
4397
}
4398

4399
void
4400
quota_get_limit_dir(call_frame_t *frame, inode_t *cur_inode, xlator_t *this)
4401
{
4402
    inode_t *inode = NULL;
4403
    inode_t *parent = NULL;
4404
    uint64_t value = 0;
4405
    quota_inode_ctx_t *ctx = NULL;
4406
    quota_local_t *local = frame->local;
4407

4408
    if (!cur_inode)
4409
        goto out;
4410

4411
    inode = inode_ref(cur_inode);
4412
    while (inode) {
4413
        value = 0;
4414
        inode_ctx_get(inode, this, &value);
4415

4416
        if (value) {
4417
            ctx = (quota_inode_ctx_t *)(unsigned long)value;
4418
            if (ctx->hard_lim > 0)
4419
                break;
4420
        }
4421

4422
        if (__is_root_gfid(inode->gfid))
4423
            goto off;
4424

4425
        parent = inode_parent(inode, 0, NULL);
4426
        if (!parent) {
4427
            (void)quota_build_ancestry(inode, quota_get_limit_dir_continuation,
4428
                                       frame);
4429
            goto out;
4430
        }
4431

4432
        inode_unref(inode);
4433
        inode = parent;
4434
    }
4435

4436
    quota_statfs_continue(frame, this, inode);
4437
    inode_unref(inode);
4438
    return;
4439

4440
off:
4441
    gf_msg_debug(this->name, 0, "No limit set on the inode or it's parents.");
4442

4443
    QUOTA_STACK_WIND_TAIL(frame, FIRST_CHILD(this),
4444
                          FIRST_CHILD(this)->fops->statfs, &local->loc,
4445
                          local->xdata);
4446
out:
4447
    inode_unref(inode);
4448

4449
    return;
4450
}
4451

4452
int32_t
4453
quota_statfs(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
4454
{
4455
    int op_errno = 0;
4456
    int ret = -1;
4457
    int8_t ignore_deem_statfs = 0;
4458
    quota_priv_t *priv = NULL;
4459
    quota_local_t *local = NULL;
4460
    call_stub_t *stub = NULL;
4461

4462
    priv = this->private;
4463
    GF_ASSERT(loc);
4464

4465
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
4466

4467
    ret = dict_get_int8(xdata, GF_INTERNAL_IGNORE_DEEM_STATFS,
4468
                        &ignore_deem_statfs);
4469
    ret = 0;
4470

4471
    if (ignore_deem_statfs)
4472
        goto off;
4473

4474
    if (priv->consider_statfs && loc->inode) {
4475
        local = quota_local_new();
4476
        if (!local) {
4477
            op_errno = ENOMEM;
4478
            goto err;
4479
        }
4480
        frame->local = local;
4481

4482
        ret = loc_copy(&local->loc, loc);
4483
        if (-1 == ret) {
4484
            op_errno = ENOMEM;
4485
            goto err;
4486
        }
4487

4488
        if (xdata)
4489
            local->xdata = dict_ref(xdata);
4490

4491
        stub = fop_statfs_stub(frame, quota_statfs_helper, &local->loc,
4492
                               local->xdata);
4493
        if (!stub) {
4494
            op_errno = ENOMEM;
4495
            goto err;
4496
        }
4497

4498
        LOCK(&local->lock);
4499
        {
4500
            local->link_count = 1;
4501
            local->stub = stub;
4502
        }
4503
        UNLOCK(&local->lock);
4504

4505
        quota_get_limit_dir(frame, loc->inode, this);
4506

4507
        return 0;
4508
    }
4509

4510
    /*
4511
     * We have to make sure that we never get to quota_statfs_cbk
4512
     * with a cookie that points to something other than an inode,
4513
     * which is exactly what would happen with STACK_UNWIND using
4514
     * that as a callback.  Therefore, use default_statfs_cbk in
4515
     * this case instead.
4516
     *
4517
     * Also if the option deem-statfs is not set to "on" don't
4518
     * bother calculating quota limit on / in statfs_cbk.
4519
     */
4520
    if (priv->consider_statfs)
4521
        gf_log(this->name, GF_LOG_ERROR,
4522
               "Missing inode, can't adjust for quota");
4523

4524
off:
4525
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->statfs,
4526
                    loc, xdata);
4527
    return 0;
4528

4529
err:
4530
    QUOTA_STACK_UNWIND(statfs, frame, -1, op_errno, NULL, NULL);
4531

4532
    return 0;
4533
}
4534

4535
int
4536
quota_readdirp_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
4537
                   int op_ret, int op_errno, gf_dirent_t *entries,
4538
                   dict_t *xdata)
4539
{
4540
    gf_dirent_t *entry = NULL;
4541
    quota_local_t *local = NULL;
4542
    loc_t loc = {
4543
        0,
4544
    };
4545

4546
    if (op_ret <= 0)
4547
        goto unwind;
4548

4549
    local = frame->local;
4550

4551
    list_for_each_entry(entry, &entries->list, list)
4552
    {
4553
        /* skip . and .. */
4554
        if (entry->inode == NULL || inode_dir_or_parentdir(entry))
4555
            continue;
4556

4557
        gf_uuid_copy(loc.gfid, entry->d_stat.ia_gfid);
4558
        loc.inode = inode_ref(entry->inode);
4559
        loc.parent = inode_ref(local->loc.inode);
4560
        gf_uuid_copy(loc.pargfid, loc.parent->gfid);
4561
        loc.name = entry->d_name;
4562

4563
        quota_fill_inodectx(this, entry->inode, entry->dict, &loc,
4564
                            &entry->d_stat, &op_errno);
4565

4566
        loc_wipe(&loc);
4567
    }
4568

4569
unwind:
4570
    QUOTA_STACK_UNWIND(readdirp, frame, op_ret, op_errno, entries, xdata);
4571

4572
    return 0;
4573
}
4574

4575
int
4576
quota_readdirp(call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
4577
               off_t offset, dict_t *dict)
4578
{
4579
    quota_priv_t *priv = NULL;
4580
    int ret = 0;
4581
    gf_boolean_t new_dict = _gf_false;
4582
    quota_local_t *local = NULL;
4583

4584
    priv = this->private;
4585

4586
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
4587

4588
    local = quota_local_new();
4589

4590
    if (local == NULL) {
4591
        goto err;
4592
    }
4593

4594
    frame->local = local;
4595

4596
    local->loc.inode = inode_ref(fd->inode);
4597

4598
    if (dict == NULL) {
4599
        dict = dict_new();
4600
        new_dict = _gf_true;
4601
    }
4602

4603
    if (dict) {
4604
        ret = dict_set_int8(dict, QUOTA_LIMIT_KEY, 1);
4605
        if (ret < 0) {
4606
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
4607
                   "dict set of key for hard-limit");
4608
            goto err;
4609
        }
4610
    }
4611

4612
    if (dict) {
4613
        ret = dict_set_int8(dict, QUOTA_LIMIT_OBJECTS_KEY, 1);
4614
        if (ret < 0) {
4615
            gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
4616
                   "dict set of key for hard-limit "
4617
                   "failed");
4618
            goto err;
4619
        }
4620
    }
4621

4622
    STACK_WIND(frame, quota_readdirp_cbk, FIRST_CHILD(this),
4623
               FIRST_CHILD(this)->fops->readdirp, fd, size, offset, dict);
4624

4625
    if (new_dict) {
4626
        dict_unref(dict);
4627
    }
4628

4629
    return 0;
4630
err:
4631
    STACK_UNWIND_STRICT(readdirp, frame, -1, EINVAL, NULL, NULL);
4632

4633
    if (new_dict) {
4634
        dict_unref(dict);
4635
    }
4636

4637
    return 0;
4638

4639
off:
4640
    STACK_WIND_TAIL(frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->readdirp,
4641
                    fd, size, offset, dict);
4642
    return 0;
4643
}
4644

4645
int32_t
4646
quota_fallocate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
4647
                    int32_t op_ret, int32_t op_errno, struct iatt *prebuf,
4648
                    struct iatt *postbuf, dict_t *xdata)
4649
{
4650
    int32_t ret = 0;
4651
    uint64_t ctx_int = 0;
4652
    quota_inode_ctx_t *ctx = NULL;
4653
    quota_local_t *local = NULL;
4654

4655
    local = frame->local;
4656

4657
    if ((op_ret < 0) || (local == NULL)) {
4658
        goto out;
4659
    }
4660

4661
    ret = inode_ctx_get(local->loc.inode, this, &ctx_int);
4662
    if (ret) {
4663
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
4664
               "%s: failed to get the context", local->loc.path);
4665
        goto out;
4666
    }
4667

4668
    ctx = (quota_inode_ctx_t *)(unsigned long)ctx_int;
4669

4670
    if (ctx == NULL) {
4671
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INODE_CTX_GET_FAILED,
4672
               "quota context not set in %s (gfid:%s)", local->loc.path,
4673
               uuid_utoa(local->loc.inode->gfid));
4674
        goto out;
4675
    }
4676

4677
    LOCK(&ctx->lock);
4678
    {
4679
        ctx->buf = *postbuf;
4680
    }
4681
    UNLOCK(&ctx->lock);
4682

4683
out:
4684
    QUOTA_STACK_UNWIND(fallocate, frame, op_ret, op_errno, prebuf, postbuf,
4685
                       xdata);
4686

4687
    return 0;
4688
}
4689

4690
int32_t
4691
quota_fallocate_helper(call_frame_t *frame, xlator_t *this, fd_t *fd,
4692
                       int32_t mode, off_t offset, size_t len, dict_t *xdata)
4693
{
4694
    quota_local_t *local = NULL;
4695
    int32_t op_errno = EINVAL;
4696

4697
    local = frame->local;
4698

4699
    GF_VALIDATE_OR_GOTO("quota", local, unwind);
4700

4701
    if (local->op_ret == -1) {
4702
        op_errno = local->op_errno;
4703
        if (op_errno == ENOENT || op_errno == ESTALE) {
4704
            /* We may get ENOENT/ESTALE in case of below scenario
4705
             *     fd = open file.txt
4706
             *     unlink file.txt
4707
             *     fallocate on fd
4708
             * Here build_ancestry can fail as the file is removed.
4709
             * For now ignore ENOENT/ESTALE on active fd
4710
             * We need to re-visit this code once we understand
4711
             * how other file-system behave in this scenario
4712
             */
4713
            gf_msg_debug(this->name, 0,
4714
                         "quota enforcer failed "
4715
                         "with ENOENT/ESTALE on %s, cannot check "
4716
                         "quota limits and allowing fallocate",
4717
                         uuid_utoa(fd->inode->gfid));
4718
        } else {
4719
            goto unwind;
4720
        }
4721
    }
4722

4723
    STACK_WIND(frame, quota_fallocate_cbk, FIRST_CHILD(this),
4724
               FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len,
4725
               xdata);
4726
    return 0;
4727

4728
unwind:
4729
    QUOTA_STACK_UNWIND(fallocate, frame, -1, op_errno, NULL, NULL, NULL);
4730
    return 0;
4731
}
4732

4733
int32_t
4734
quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
4735
                off_t offset, size_t len, dict_t *xdata)
4736
{
4737
    int32_t op_errno = EINVAL;
4738
    int32_t parents = 0;
4739
    int32_t fail_count = 0;
4740
    quota_local_t *local = NULL;
4741
    quota_inode_ctx_t *ctx = NULL;
4742
    quota_priv_t *priv = NULL;
4743
    quota_dentry_t *dentry = NULL;
4744
    quota_dentry_t *tmp = NULL;
4745
    call_stub_t *stub = NULL;
4746
    struct list_head head = {
4747
        0,
4748
    };
4749
    inode_t *par_inode = NULL;
4750

4751
    priv = this->private;
4752
    GF_VALIDATE_OR_GOTO(this->name, priv, unwind);
4753

4754
    WIND_IF_QUOTAOFF(priv->is_quota_on, off);
4755

4756
    INIT_LIST_HEAD(&head);
4757

4758
    GF_ASSERT(frame);
4759
    GF_VALIDATE_OR_GOTO("quota", this, unwind);
4760
    GF_VALIDATE_OR_GOTO(this->name, fd, unwind);
4761

4762
    local = quota_local_new();
4763
    if (local == NULL) {
4764
        goto unwind;
4765
    }
4766

4767
    frame->local = local;
4768
    local->loc.inode = inode_ref(fd->inode);
4769

4770
    (void)quota_inode_ctx_get(fd->inode, this, &ctx, 0);
4771
    if (ctx == NULL) {
4772
        gf_msg_debug(this->name, 0,
4773
                     "quota context is NULL on inode"
4774
                     " (%s). If quota is not enabled recently and "
4775
                     "crawler has finished crawling, its an error",
4776
                     uuid_utoa(local->loc.inode->gfid));
4777
    }
4778

4779
    stub = fop_fallocate_stub(frame, quota_fallocate_helper, fd, mode, offset,
4780
                              len, xdata);
4781
    if (stub == NULL) {
4782
        op_errno = ENOMEM;
4783
        goto unwind;
4784
    }
4785

4786
    priv = this->private;
4787
    GF_VALIDATE_OR_GOTO(this->name, priv, unwind);
4788

4789
    parents = quota_add_parents_from_ctx(ctx, &head);
4790
    if (parents == -1) {
4791
        op_errno = errno;
4792
        goto unwind;
4793
    }
4794

4795
    /*
4796
     * Note that by using len as the delta we're assuming the range from
4797
     * offset to offset+len has not already been allocated. This can result
4798
     * in ENOSPC errors attempting to allocate an already allocated range.
4799
     */
4800
    local->delta = len;
4801
    local->object_delta = 0;
4802
    local->stub = stub;
4803
    local->link_count = parents;
4804

4805
    if (parents == 0) {
4806
        local->link_count = 1;
4807
        quota_check_limit(frame, fd->inode, this);
4808
    } else {
4809
        list_for_each_entry_safe(dentry, tmp, &head, next)
4810
        {
4811
            par_inode = do_quota_check_limit(frame, fd->inode, this, dentry,
4812
                                             _gf_false);
4813
            if (par_inode == NULL) {
4814
                /* remove stale entry from inode_ctx */
4815
                quota_dentry_del(ctx, dentry->name, dentry->par);
4816
                parents--;
4817
                fail_count++;
4818
            } else {
4819
                inode_unref(par_inode);
4820
            }
4821
            __quota_dentry_free(dentry);
4822
        }
4823

4824
        if (parents == 0) {
4825
            LOCK(&local->lock);
4826
            {
4827
                local->link_count++;
4828
            }
4829
            UNLOCK(&local->lock);
4830
            quota_check_limit(frame, fd->inode, this);
4831
        }
4832

4833
        while (fail_count != 0) {
4834
            quota_link_count_decrement(frame);
4835
            fail_count--;
4836
        }
4837
    }
4838

4839
    return 0;
4840

4841
unwind:
4842
    QUOTA_STACK_UNWIND(fallocate, frame, -1, op_errno, NULL, NULL, NULL);
4843
    return 0;
4844

4845
off:
4846
    STACK_WIND_TAIL(frame, FIRST_CHILD(this),
4847
                    FIRST_CHILD(this)->fops->fallocate, fd, mode, offset, len,
4848
                    xdata);
4849
    return 0;
4850
}
4851

4852
void
4853
quota_log_helper(char **usage_str, int64_t cur_size, inode_t *inode,
4854
                 char **path, time_t *cur_time)
4855
{
4856
    xlator_t *this = THIS;
4857

4858
    if (!usage_str || !inode || !path || !cur_time) {
4859
        gf_log(this->name, GF_LOG_ERROR, "Received null argument");
4860
        return;
4861
    }
4862

4863
    *usage_str = gf_uint64_2human_readable(cur_size);
4864
    if (!(*usage_str))
4865
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, Q_MSG_ENOMEM,
4866
               "integer to string conversion failed Reason"
4867
               ":\"Cannot allocate memory\"");
4868

4869
    inode_path(inode, NULL, path);
4870
    if (!(*path))
4871
        *path = uuid_utoa(inode->gfid);
4872

4873
    *cur_time = gf_time();
4874
}
4875

4876
/* Logs if
4877
 *  i.   Usage crossed soft limit
4878
 *  ii.  Usage above soft limit and alert-time elapsed
4879
 */
4880
void
4881
quota_log_usage(xlator_t *this, quota_inode_ctx_t *ctx, inode_t *inode,
4882
                int64_t delta)
4883
{
4884
    time_t cur_time = 0;
4885
    char *usage_str = NULL;
4886
    char *path = NULL;
4887
    int64_t cur_size = 0;
4888
    quota_priv_t *priv = NULL;
4889

4890
    priv = this->private;
4891
    cur_size = ctx->size + delta;
4892

4893
    if ((ctx->soft_lim <= 0) || cur_size < ctx->soft_lim)
4894
        return;
4895

4896
    /* Usage crossed/reached soft limit */
4897
    if (DID_REACH_LIMIT(ctx->soft_lim, ctx->size, cur_size)) {
4898
        quota_log_helper(&usage_str, cur_size, inode, &path, &cur_time);
4899

4900
        gf_msg(this->name, GF_LOG_ALERT, 0, Q_MSG_CROSSED_SOFT_LIMIT,
4901
               "Usage crossed soft limit: "
4902
               "%s used by %s",
4903
               usage_str, path);
4904

4905
        gf_event(EVENT_QUOTA_CROSSED_SOFT_LIMIT,
4906
                 "Usage=%s;volume=%s;"
4907
                 "path=%s",
4908
                 usage_str, priv->volume_uuid, path);
4909

4910
        ctx->prev_log_time = cur_time;
4911

4912
    }
4913
    /* Usage is above soft limit */
4914
    else if (cur_size > ctx->soft_lim &&
4915
             quota_timeout(ctx->prev_log_time, priv->log_timeout)) {
4916
        quota_log_helper(&usage_str, cur_size, inode, &path, &cur_time);
4917

4918
        gf_msg(this->name, GF_LOG_ALERT, 0, Q_MSG_CROSSED_SOFT_LIMIT,
4919
               "Usage is above soft limit: %s used by %s", usage_str, path);
4920

4921
        gf_event(EVENT_QUOTA_CROSSED_SOFT_LIMIT,
4922
                 "Usage=%s;volume=%s;"
4923
                 "path=%s",
4924
                 usage_str, priv->volume_uuid, path);
4925

4926
        ctx->prev_log_time = cur_time;
4927
    }
4928

4929
    if (path)
4930
        GF_FREE(path);
4931

4932
    if (usage_str)
4933
        GF_FREE(usage_str);
4934
}
4935

4936
int32_t
4937
mem_acct_init(xlator_t *this)
4938
{
4939
    int ret = -1;
4940

4941
    if (!this)
4942
        return ret;
4943

4944
    ret = xlator_mem_acct_init(this, gf_quota_mt_end);
4945

4946
    if (ret != 0) {
4947
        gf_msg(this->name, GF_LOG_WARNING, ENOMEM, Q_MSG_ENOMEM,
4948
               "Memory accounting init failed");
4949
        return ret;
4950
    }
4951

4952
    return ret;
4953
}
4954

4955
int32_t
4956
quota_forget(xlator_t *this, inode_t *inode)
4957
{
4958
    int32_t ret = 0;
4959
    uint64_t ctx_int = 0;
4960
    quota_inode_ctx_t *ctx = NULL;
4961
    quota_dentry_t *dentry = NULL, *tmp;
4962

4963
    ret = inode_ctx_del(inode, this, &ctx_int);
4964

4965
    if (ret < 0) {
4966
        return 0;
4967
    }
4968

4969
    ctx = (quota_inode_ctx_t *)(long)ctx_int;
4970

4971
    LOCK(&ctx->lock);
4972
    {
4973
        list_for_each_entry_safe(dentry, tmp, &ctx->parents, next)
4974
        {
4975
            __quota_dentry_free(dentry);
4976
        }
4977
    }
4978
    UNLOCK(&ctx->lock);
4979

4980
    LOCK_DESTROY(&ctx->lock);
4981

4982
    GF_FREE(ctx);
4983

4984
    return 0;
4985
}
4986

4987
int
4988
notify(xlator_t *this, int event, void *data, ...)
4989
{
4990
    quota_priv_t *priv = NULL;
4991
    int ret = 0;
4992
    rpc_clnt_t *rpc = NULL;
4993
    gf_boolean_t conn_status = _gf_true;
4994
    xlator_t *victim = data;
4995

4996
    priv = this->private;
4997
    if (!priv || !priv->is_quota_on)
4998
        goto out;
4999

5000
    if (event == GF_EVENT_PARENT_DOWN) {
5001
        rpc = priv->rpc_clnt;
5002
        if (rpc) {
5003
            rpc_clnt_disable(rpc);
5004
            pthread_mutex_lock(&priv->conn_mutex);
5005
            {
5006
                conn_status = priv->conn_status;
5007
                while (conn_status) {
5008
                    (void)pthread_cond_wait(&priv->conn_cond,
5009
                                            &priv->conn_mutex);
5010
                    conn_status = priv->conn_status;
5011
                }
5012
            }
5013
            pthread_mutex_unlock(&priv->conn_mutex);
5014
            gf_log(this->name, GF_LOG_INFO,
5015
                   "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name);
5016
        }
5017
    }
5018

5019
out:
5020
    ret = default_notify(this, event, data);
5021
    return ret;
5022
}
5023

5024
int32_t
5025
init(xlator_t *this)
5026
{
5027
    int32_t ret = -1;
5028
    quota_priv_t *priv = NULL;
5029
    rpc_clnt_t *rpc = NULL;
5030

5031
    if ((this->children == NULL) || this->children->next) {
5032
        gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_INVALID_VOLFILE,
5033
               "FATAL: quota (%s) not configured with "
5034
               "exactly one child",
5035
               this->name);
5036
        return -1;
5037
    }
5038

5039
    if (this->parents == NULL) {
5040
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_INVALID_VOLFILE,
5041
               "dangling volume. check volfile");
5042
    }
5043

5044
    QUOTA_ALLOC_OR_GOTO(priv, quota_priv_t, err);
5045

5046
    LOCK_INIT(&priv->lock);
5047

5048
    this->private = priv;
5049

5050
    GF_OPTION_INIT("deem-statfs", priv->consider_statfs, bool, err);
5051
    GF_OPTION_INIT("server-quota", priv->is_quota_on, bool, err);
5052
    GF_OPTION_INIT("default-soft-limit", priv->default_soft_lim, percent, err);
5053
    GF_OPTION_INIT("soft-timeout", priv->soft_timeout, time, err);
5054
    GF_OPTION_INIT("hard-timeout", priv->hard_timeout, time, err);
5055
    GF_OPTION_INIT("alert-time", priv->log_timeout, time, err);
5056
    GF_OPTION_INIT("volume-uuid", priv->volume_uuid, str, err);
5057

5058
    this->local_pool = mem_pool_new(quota_local_t, 64);
5059
    if (!this->local_pool) {
5060
        ret = -1;
5061
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, Q_MSG_ENOMEM,
5062
               "failed to create local_t's memory pool");
5063
        goto err;
5064
    }
5065

5066
    pthread_mutex_init(&priv->conn_mutex, NULL);
5067
    pthread_cond_init(&priv->conn_cond, NULL);
5068
    priv->conn_status = _gf_false;
5069

5070
    if (priv->is_quota_on) {
5071
        rpc = quota_enforcer_init(this, this->options);
5072
        if (rpc == NULL) {
5073
            ret = -1;
5074
            gf_msg(this->name, GF_LOG_WARNING, 0,
5075
                   Q_MSG_QUOTA_ENFORCER_RPC_INIT_FAILED,
5076
                   "quota enforcer rpc init failed");
5077
            goto err;
5078
        }
5079

5080
        LOCK(&priv->lock);
5081
        {
5082
            priv->rpc_clnt = rpc;
5083
        }
5084
        UNLOCK(&priv->lock);
5085
    }
5086

5087
    ret = 0;
5088
err:
5089
    return ret;
5090
}
5091

5092
int
5093
reconfigure(xlator_t *this, dict_t *options)
5094
{
5095
    int32_t ret = -1;
5096
    quota_priv_t *priv = NULL;
5097
    gf_boolean_t quota_on = _gf_false;
5098
    rpc_clnt_t *rpc = NULL;
5099

5100
    priv = this->private;
5101

5102
    GF_OPTION_RECONF("deem-statfs", priv->consider_statfs, options, bool, out);
5103
    GF_OPTION_RECONF("server-quota", quota_on, options, bool, out);
5104
    GF_OPTION_RECONF("default-soft-limit", priv->default_soft_lim, options,
5105
                     percent, out);
5106
    GF_OPTION_RECONF("alert-time", priv->log_timeout, options, time, out);
5107
    GF_OPTION_RECONF("soft-timeout", priv->soft_timeout, options, time, out);
5108
    GF_OPTION_RECONF("hard-timeout", priv->hard_timeout, options, time, out);
5109

5110
    if (quota_on) {
5111
        priv->rpc_clnt = quota_enforcer_init(this, this->options);
5112
        if (priv->rpc_clnt == NULL) {
5113
            ret = -1;
5114
            gf_msg(this->name, GF_LOG_WARNING, 0,
5115
                   Q_MSG_QUOTA_ENFORCER_RPC_INIT_FAILED,
5116
                   "quota enforcer rpc init failed");
5117
            goto out;
5118
        }
5119

5120
    } else {
5121
        LOCK(&priv->lock);
5122
        {
5123
            rpc = priv->rpc_clnt;
5124
            priv->rpc_clnt = NULL;
5125
        }
5126
        UNLOCK(&priv->lock);
5127

5128
        if (rpc != NULL) {
5129
            // Quotad is shutdown when there is no started volume
5130
            // which has quota enabled. So, we should disable the
5131
            // enforcer client when quota is disabled on a volume,
5132
            // to avoid spurious reconnect attempts to a service
5133
            // (quotad), that is known to be down.
5134
            rpc_clnt_unref(rpc);
5135
        }
5136
    }
5137

5138
    priv->is_quota_on = quota_on;
5139

5140
    ret = 0;
5141
out:
5142
    return ret;
5143
}
5144

5145
int32_t
5146
quota_priv_dump(xlator_t *this)
5147
{
5148
    quota_priv_t *priv = NULL;
5149
    int32_t ret = -1;
5150

5151
    GF_ASSERT(this);
5152

5153
    priv = this->private;
5154
    if (!priv)
5155
        goto out;
5156

5157
    gf_proc_dump_add_section("xlators.features.quota.priv");
5158

5159
    ret = TRY_LOCK(&priv->lock);
5160
    if (ret)
5161
        goto out;
5162
    else {
5163
        gf_proc_dump_write("soft-timeout", "%ld", priv->soft_timeout);
5164
        gf_proc_dump_write("hard-timeout", "%ld", priv->hard_timeout);
5165
        gf_proc_dump_write("alert-time", "%ld", priv->log_timeout);
5166
        gf_proc_dump_write("quota-on", "%d", priv->is_quota_on);
5167
        gf_proc_dump_write("statfs", "%d", priv->consider_statfs);
5168
        gf_proc_dump_write("volume-uuid", "%s", priv->volume_uuid);
5169
        gf_proc_dump_write("validation-count", "%" PRIu64,
5170
                           priv->validation_count);
5171
    }
5172
    UNLOCK(&priv->lock);
5173

5174
out:
5175
    return 0;
5176
}
5177

5178
void
5179
fini(xlator_t *this)
5180
{
5181
    quota_priv_t *priv = NULL;
5182
    rpc_clnt_t *rpc = NULL;
5183

5184
    priv = this->private;
5185
    if (!priv)
5186
        return;
5187
    rpc = priv->rpc_clnt;
5188
    priv->rpc_clnt = NULL;
5189
    if (rpc) {
5190
        rpc_clnt_connection_cleanup(&rpc->conn);
5191
        rpc_clnt_unref(rpc);
5192
    }
5193

5194
    this->private = NULL;
5195
    LOCK_DESTROY(&priv->lock);
5196
    pthread_mutex_destroy(&priv->conn_mutex);
5197
    pthread_cond_destroy(&priv->conn_cond);
5198

5199
    GF_FREE(priv);
5200
    if (this->local_pool) {
5201
        mem_pool_destroy(this->local_pool);
5202
        this->local_pool = NULL;
5203
    }
5204
    return;
5205
}
5206

5207
struct xlator_fops fops = {
5208
    .statfs = quota_statfs,
5209
    .lookup = quota_lookup,
5210
    .writev = quota_writev,
5211
    .create = quota_create,
5212
    .mkdir = quota_mkdir,
5213
    .truncate = quota_truncate,
5214
    .ftruncate = quota_ftruncate,
5215
    .unlink = quota_unlink,
5216
    .symlink = quota_symlink,
5217
    .link = quota_link,
5218
    .rename = quota_rename,
5219
    .getxattr = quota_getxattr,
5220
    .fgetxattr = quota_fgetxattr,
5221
    .stat = quota_stat,
5222
    .fstat = quota_fstat,
5223
    .readlink = quota_readlink,
5224
    .readv = quota_readv,
5225
    .fsync = quota_fsync,
5226
    .setattr = quota_setattr,
5227
    .fsetattr = quota_fsetattr,
5228
    .mknod = quota_mknod,
5229
    .setxattr = quota_setxattr,
5230
    .fsetxattr = quota_fsetxattr,
5231
    .removexattr = quota_removexattr,
5232
    .fremovexattr = quota_fremovexattr,
5233
    .readdirp = quota_readdirp,
5234
    .fallocate = quota_fallocate,
5235
};
5236

5237
/* Callbacks: only the forget callback is set, to quota_forget. */
struct xlator_cbks cbks = {
    .forget = quota_forget,
};
5238

5239
struct xlator_dumpops dumpops = {
5240
    .priv = quota_priv_dump,
5241
};
5242
struct volume_options options[] = {
5243
    {
5244
        .key = {"enable"},
5245
        .type = GF_OPTION_TYPE_BOOL,
5246
        .default_value = "off",
5247
        .description = "enable is the volume option that can be used "
5248
                       "to turn on quota.",
5249
        .op_version = {1},
5250
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
5251
        .level = OPT_STATUS_BASIC,
5252
        .tags = {},
5253
    },
5254
    {
5255
        .key = {"deem-statfs"},
5256
        .type = GF_OPTION_TYPE_BOOL,
5257
        .default_value = "on",
5258
        .description = "If set to on, it takes quota limits into"
5259
                       " consideration while estimating fs size. (df command)"
5260
                       " (Default is on).",
5261
        .op_version = {2},
5262
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
5263
        .tags = {},
5264
    },
5265
    {
5266
        .key = {"server-quota"},
5267
        .type = GF_OPTION_TYPE_BOOL,
5268
        .default_value = "off",
5269
        .description = "Skip the quota enforcement if the feature is"
5270
                       " not turned on. This is not a user exposed option.",
5271
        .flags = OPT_FLAG_NONE,
5272
    },
5273
    {
5274
        .key = {"default-soft-limit"},
5275
        .type = GF_OPTION_TYPE_PERCENT,
5276
        .default_value = "80%",
5277
        .op_version = {3},
5278
        .description = "Soft limit is expressed as a proportion of hard limit."
5279
                       " Default-soft-limit is the proportion used when the "
5280
                       " user does not supply any soft limit value.",
5281
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
5282
        .tags = {},
5283
    },
5284
    {
5285
        .key = {"soft-timeout"},
5286
        .type = GF_OPTION_TYPE_TIME,
5287
        .min = 0,
5288
        .max = 1800,
5289
        .default_value = "60",
5290
        .description = "quota caches the directory sizes on client. "
5291
                       "soft-timeout indicates the timeout for the validity of"
5292
                       " cache before soft-limit has been crossed.",
5293
        .op_version = {3},
5294
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
5295
        .tags = {},
5296
    },
5297
    {
5298
        .key = {"hard-timeout"},
5299
        .type = GF_OPTION_TYPE_TIME,
5300
        .min = 0,
5301
        .max = 60,
5302
        .default_value = "5",
5303
        .description = "quota caches the directory sizes on client. "
5304
                       "hard-timeout indicates the timeout for the validity of"
5305
                       " cache after soft-limit has been crossed.",
5306
        .op_version = {3},
5307
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
5308
        .tags = {},
5309
    },
5310
    {.key = {"volume-uuid"},
5311
     .type = GF_OPTION_TYPE_STR,
5312
     .default_value = "{{ volume.id }}",
5313
     .description = "uuid of the volume this brick is part of."},
5314
    {
5315
        .key = {"alert-time"},
5316
        .type = GF_OPTION_TYPE_TIME,
5317
        .min = 0,
5318
        .max = 7 * 86400,
5319
        .default_value = "86400",
5320
        .op_version = {3},
5321
        .flags = OPT_FLAG_SETTABLE | OPT_FLAG_DOC,
5322
        .description = "Frequency of limit breach messages in log.",
5323
        .tags = {},
5324
    },
5325
    {.key = {NULL}}};
5326

5327
xlator_api_t xlator_api = {
5328
    .init = init,
5329
    .fini = fini,
5330
    .notify = notify,
5331
    .reconfigure = reconfigure,
5332
    .mem_acct_init = mem_acct_init,
5333
    .op_version = {1}, /* Present from the initial version */
5334
    .fops = &fops,
5335
    .cbks = &cbks,
5336
    .options = options,
5337
    .identifier = "quota",
5338
    .category = GF_MAINTAINED,
5339
};
5340

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.