glusterfs / glusterd-syncop.c (2745 lines, 77.0 KB)
/*
   Copyright (c) 2012-2012 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/
/* rpc related syncops */
#include "glusterd-syncop.h"
#include "glusterd-mgmt.h"

#include "glusterd.h"
#include "glusterd-op-sm.h"
#include "glusterd-utils.h"
#include "glusterd-server-quorum.h"
#include "glusterd-locks.h"
#include "glusterd-snapshot-utils.h"
#include "glusterd-messages.h"
#include "glusterd-errno.h"

extern glusterd_op_info_t opinfo;

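/* Wait for 'count' outstanding callbacks to complete. The big lock is
 * released across the wait so that the callbacks themselves can take it.
 */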
void
gd_synctask_barrier_wait(struct syncargs *args, int count)
{
    glusterd_conf_t *conf = THIS->private;

    synclock_unlock(&conf->big_lock);
    synctask_barrier_wait(args, count);
    synclock_lock(&conf->big_lock);
}

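/* On failure, record the peer's result in 'args' and append a per-peer,
 * per-phase (lock/unlock/stage/commit) error message to args->errstr.
 */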
static void
gd_collate_errors(struct syncargs *args, int op_ret, int op_errno,
                  char *op_errstr, int op_code, uuid_t peerid, u_char *uuid)
{
    char err_str[PATH_MAX] = "Please check log file for details.";
    char op_err[PATH_MAX] = "";
    int len = -1;
    char *peer_str = NULL;
    glusterd_peerinfo_t *peerinfo = NULL;

    if (op_ret) {
        args->op_ret = op_ret;
        args->op_errno = op_errno;

        RCU_READ_LOCK;
        peerinfo = glusterd_peerinfo_find(peerid, NULL);
        if (peerinfo)
            peer_str = gf_strdup(peerinfo->hostname);
        else
            peer_str = gf_strdup(uuid_utoa(uuid));
        RCU_READ_UNLOCK;

        if (op_errstr && strcmp(op_errstr, "")) {
            len = snprintf(err_str, sizeof(err_str) - 1, "Error: %s",
                           op_errstr);
            err_str[len] = '\0';
        }

        switch (op_code) {
            case GLUSTERD_MGMT_CLUSTER_LOCK: {
                len = snprintf(op_err, sizeof(op_err) - 1,
                               "Locking failed on %s. %s", peer_str, err_str);
                break;
            }
            case GLUSTERD_MGMT_CLUSTER_UNLOCK: {
                len = snprintf(op_err, sizeof(op_err) - 1,
                               "Unlocking failed on %s. %s", peer_str, err_str);
                break;
            }
            case GLUSTERD_MGMT_STAGE_OP: {
                len = snprintf(op_err, sizeof(op_err) - 1,
                               "Staging failed on %s. %s", peer_str, err_str);
                break;
            }
            case GLUSTERD_MGMT_COMMIT_OP: {
                len = snprintf(op_err, sizeof(op_err) - 1,
                               "Commit failed on %s. %s", peer_str, err_str);
                break;
            }
        }

        if (len > 0)
            op_err[len] = '\0';

        if (args->errstr) {
            len = snprintf(err_str, sizeof(err_str) - 1, "%s\n%s", args->errstr,
                           op_err);
            GF_FREE(args->errstr);
            args->errstr = NULL;
        } else
            len = snprintf(err_str, sizeof(err_str) - 1, "%s", op_err);
        err_str[len] = '\0';

        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_MGMT_OP_FAIL, "%s", op_err);
        args->errstr = gf_strdup(err_str);
    }

    GF_FREE(peer_str);

    return;
}

int
gd_syncargs_init(struct syncargs *args, dict_t *op_ctx)
{
    int ret = 0;

    ret = pthread_mutex_init(&args->lock_dict, NULL);
    if (ret)
        return ret;

    ret = synctask_barrier_init(args);
    if (ret) {
        pthread_mutex_destroy(&args->lock_dict);
        return ret;
    }

    args->dict = op_ctx;
    return 0;
}

void
gd_syncargs_fini(struct syncargs *args)
{
    if (args->barrier.initialized) {
        pthread_mutex_destroy(&args->lock_dict);
        syncbarrier_destroy(&args->barrier);
    }
}

static void
gd_stage_op_req_free(gd1_mgmt_stage_op_req *req)
{
    if (!req)
        return;

    GF_FREE(req->buf.buf_val);
    GF_FREE(req);
}

static void
gd_commit_op_req_free(gd1_mgmt_commit_op_req *req)
{
    if (!req)
        return;

    GF_FREE(req->buf.buf_val);
    GF_FREE(req);
}

static void
gd_brick_op_req_free(gd1_mgmt_brick_op_req *req)
{
    if (!req)
        return;

    if (req->dict.dict_val)
        GF_FREE(req->dict.dict_val);
    GF_FREE(req->input.input_val);
    GF_FREE(req);
}

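/* Serialize 'req' with 'xdrproc' into an iobuf and submit it on 'rpc';
 * the reply is handed to 'cbkfn' with 'local' and 'cookie' attached to
 * the call frame.
 */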
int
gd_syncop_submit_request(struct rpc_clnt *rpc, void *req, void *local,
                         void *cookie, rpc_clnt_prog_t *prog, int procnum,
                         fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
{
    int ret = -1;
    struct iobuf *iobuf = NULL;
    struct iobref *iobref = NULL;
    int count = 0;
    struct iovec iov = {
        0,
    };
    ssize_t req_size = 0;
    call_frame_t *frame = NULL;

    GF_ASSERT(rpc);
    if (!req)
        goto out;

    req_size = xdr_sizeof(xdrproc, req);
    iobuf = iobuf_get2(rpc->ctx->iobuf_pool, req_size);
    if (!iobuf)
        goto out;

    iobref = iobref_new();
    if (!iobref)
        goto out;

    frame = create_frame(THIS, THIS->ctx->pool);
    if (!frame)
        goto out;

    iobref_add(iobref, iobuf);

    iov.iov_base = iobuf->ptr;
    iov.iov_len = iobuf_pagesize(iobuf);

    /* Create the xdr payload */
    ret = xdr_serialize_generic(iov, req, xdrproc);
    if (ret == -1)
        goto out;

    iov.iov_len = ret;
    count = 1;

    frame->local = local;
    frame->cookie = cookie;

    /* Send the msg */
    ret = rpc_clnt_submit(rpc, prog, procnum, cbkfn, &iov, count, NULL, 0,
                          iobref, frame, NULL, 0, NULL, 0, NULL);

    /* TODO: do we need to start ping also? */
    /* In case of error the frame will be destroyed by rpc_clnt_submit */
    frame = NULL;

out:
    iobref_unref(iobref);
    iobuf_unref(iobuf);

    if (ret && frame)
        STACK_DESTROY(frame->root);
    return ret;
}

/* Defined in glusterd-rpc-ops.c */
extern struct rpc_clnt_program gd_mgmt_prog;
extern struct rpc_clnt_program gd_brick_prog;
extern struct rpc_clnt_program gd_mgmt_v3_prog;

static int32_t
glusterd_append_gsync_status(dict_t *dst, dict_t *src)
{
    int ret = 0;
    char *stop_msg = NULL;

    ret = dict_get_str(src, "gsync-status", &stop_msg);
    if (ret) {
        gf_smsg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
                "Key=gsync-status", NULL);
        ret = 0;
        goto out;
    }

    ret = dict_set_dynstr_with_alloc(dst, "gsync-status", stop_msg);
    if (ret) {
        gf_msg("glusterd", GF_LOG_WARNING, 0, GD_MSG_DICT_SET_FAILED,
               "Unable to set the stop "
               "message in the ctx dictionary");
        goto out;
    }

    ret = 0;
out:
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
    return ret;
}

static int32_t
glusterd_append_status_dicts(dict_t *dst, dict_t *src)
{
    char sts_val_name[PATH_MAX] = "";
    int dst_count = 0;
    int src_count = 0;
    int i = 0;
    int ret = 0;
    gf_gsync_status_t *sts_val = NULL;
    gf_gsync_status_t *dst_sts_val = NULL;

    GF_ASSERT(dst);

    if (src == NULL)
        goto out;

    ret = dict_get_int32(dst, "gsync-count", &dst_count);
    if (ret)
        dst_count = 0;

    ret = dict_get_int32(src, "gsync-count", &src_count);
    if (ret || !src_count) {
        gf_msg_debug("glusterd", 0, "Source brick empty");
        ret = 0;
        goto out;
    }

    for (i = 0; i < src_count; i++) {
        snprintf(sts_val_name, sizeof(sts_val_name), "status_value%d", i);

        ret = dict_get_bin(src, sts_val_name, (void **)&sts_val);
        if (ret)
            goto out;

        dst_sts_val = GF_MALLOC(sizeof(gf_gsync_status_t),
                                gf_common_mt_gsync_status_t);
        if (!dst_sts_val) {
            gf_msg("glusterd", GF_LOG_ERROR, ENOMEM, GD_MSG_NO_MEMORY,
                   "Out Of Memory");
            goto out;
        }

        memcpy(dst_sts_val, sts_val, sizeof(gf_gsync_status_t));

        snprintf(sts_val_name, sizeof(sts_val_name), "status_value%d",
                 i + dst_count);

        ret = dict_set_bin(dst, sts_val_name, dst_sts_val,
                           sizeof(gf_gsync_status_t));
        if (ret) {
            GF_FREE(dst_sts_val);
            goto out;
        }
    }

    ret = dict_set_int32_sizen(dst, "gsync-count", dst_count + src_count);

out:
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
    return ret;
}

static int32_t
glusterd_gsync_use_rsp_dict(dict_t *aggr, dict_t *rsp_dict, char *op_errstr)
{
    dict_t *ctx = NULL;
    int ret = 0;
    char *conf_path = NULL;

    if (aggr) {
        ctx = aggr;

    } else {
        ctx = glusterd_op_get_ctx();
        if (!ctx) {
            gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_OPCTX_GET_FAIL,
                   "Operation Context is not present");
            GF_ASSERT(0);
        }
    }

    if (rsp_dict) {
        ret = glusterd_append_status_dicts(ctx, rsp_dict);
        if (ret)
            goto out;

        ret = glusterd_append_gsync_status(ctx, rsp_dict);
        if (ret)
            goto out;

        ret = dict_get_str(rsp_dict, "conf_path", &conf_path);
        if (!ret && conf_path) {
            ret = dict_set_dynstr_with_alloc(ctx, "conf_path", conf_path);
            if (ret) {
                gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
                       "Unable to store conf path.");
                goto out;
            }
        }
    }
    if ((op_errstr) && (strcmp("", op_errstr))) {
        ret = dict_set_dynstr_with_alloc(ctx, "errstr", op_errstr);
        if (ret)
            goto out;
    }

    ret = 0;
out:
    gf_msg_debug("glusterd", 0, "Returning %d ", ret);
    return ret;
}

static int
glusterd_max_opversion_use_rsp_dict(dict_t *dst, dict_t *src)
{
    int ret = -1;
    int src_max_opversion = -1;
    int max_opversion = -1;

    GF_VALIDATE_OR_GOTO(THIS->name, dst, out);
    GF_VALIDATE_OR_GOTO(THIS->name, src, out);

    ret = dict_get_int32(dst, "max-opversion", &max_opversion);
    if (ret)
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Maximum supported op-version not set in destination "
               "dictionary");

    ret = dict_get_int32(src, "max-opversion", &src_max_opversion);
    if (ret) {
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Failed to get maximum supported op-version from source");
        goto out;
    }

    if (max_opversion == -1 || src_max_opversion < max_opversion)
        max_opversion = src_max_opversion;

    ret = dict_set_int32_sizen(dst, "max-opversion", max_opversion);
    if (ret) {
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
               "Failed to set max op-version");
        goto out;
    }
out:
    return ret;
}

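/* Merge one peer's bitrot scrub status (node uuid, counters, timestamps,
 * quarantined files and scrubber settings) from rsp_dict into the
 * aggregated dictionary, renumbering the per-node keys.
 */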
static int
glusterd_volume_bitrot_scrub_use_rsp_dict(dict_t *aggr, dict_t *rsp_dict)
{
    int ret = -1;
    int j = 0;
    uint64_t value = 0;
    char key[64] = "";
    int keylen;
    char *last_scrub_time = NULL;
    char *scrub_time = NULL;
    char *volname = NULL;
    char *node_uuid = NULL;
    char *node_uuid_str = NULL;
    char *bitd_log = NULL;
    char *scrub_log = NULL;
    char *scrub_freq = NULL;
    char *scrub_state = NULL;
    char *scrub_impact = NULL;
    char *bad_gfid_str = NULL;
    xlator_t *this = THIS;
    glusterd_conf_t *priv = NULL;
    glusterd_volinfo_t *volinfo = NULL;
    int src_count = 0;
    int dst_count = 0;
    int8_t scrub_running = 0;

    priv = this->private;
    GF_ASSERT(priv);

    ret = dict_get_str(aggr, "volname", &volname);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Unable to get volume name");
        goto out;
    }

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret) {
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND,
               "Unable to find volinfo for volume: %s", volname);
        goto out;
    }

    ret = dict_get_int32(aggr, "count", &dst_count);

    ret = dict_get_int32(rsp_dict, "count", &src_count);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "failed to get count value");
        ret = 0;
        goto out;
    }

    ret = dict_set_int32_sizen(aggr, "count", src_count + dst_count);
    if (ret)
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
               "Failed to set count in dictionary");

    keylen = snprintf(key, sizeof(key), "node-uuid-%d", src_count);
    ret = dict_get_strn(rsp_dict, key, keylen, &node_uuid);
    if (!ret) {
        node_uuid_str = gf_strdup(node_uuid);
        keylen = snprintf(key, sizeof(key), "node-uuid-%d",
                          src_count + dst_count);
        ret = dict_set_dynstrn(aggr, key, keylen, node_uuid_str);
        if (ret) {
            gf_msg_debug(this->name, 0, "failed to set node-uuid");
        }
    }

    snprintf(key, sizeof(key), "scrub-running-%d", src_count);
    ret = dict_get_int8(rsp_dict, key, &scrub_running);
    if (!ret) {
        snprintf(key, sizeof(key), "scrub-running-%d", src_count + dst_count);
        ret = dict_set_int8(aggr, key, scrub_running);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "scrub-running value");
        }
    }

    snprintf(key, sizeof(key), "scrubbed-files-%d", src_count);
    ret = dict_get_uint64(rsp_dict, key, &value);
    if (!ret) {
        snprintf(key, sizeof(key), "scrubbed-files-%d", src_count + dst_count);
        ret = dict_set_uint64(aggr, key, value);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "scrubbed-file value");
        }
    }

    snprintf(key, sizeof(key), "unsigned-files-%d", src_count);
    ret = dict_get_uint64(rsp_dict, key, &value);
    if (!ret) {
        snprintf(key, sizeof(key), "unsigned-files-%d", src_count + dst_count);
        ret = dict_set_uint64(aggr, key, value);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "unsigned-file value");
        }
    }

    keylen = snprintf(key, sizeof(key), "last-scrub-time-%d", src_count);
    ret = dict_get_strn(rsp_dict, key, keylen, &last_scrub_time);
    if (!ret) {
        scrub_time = gf_strdup(last_scrub_time);
        keylen = snprintf(key, sizeof(key), "last-scrub-time-%d",
                          src_count + dst_count);
        ret = dict_set_dynstrn(aggr, key, keylen, scrub_time);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "last scrub time value");
        }
    }

    snprintf(key, sizeof(key), "scrub-duration-%d", src_count);
    ret = dict_get_uint64(rsp_dict, key, &value);
    if (!ret) {
        snprintf(key, sizeof(key), "scrub-duration-%d", src_count + dst_count);
        ret = dict_set_uint64(aggr, key, value);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "scrubbed-duration value");
        }
    }

    snprintf(key, sizeof(key), "error-count-%d", src_count);
    ret = dict_get_uint64(rsp_dict, key, &value);
    if (!ret) {
        snprintf(key, sizeof(key), "error-count-%d", src_count + dst_count);
        ret = dict_set_uint64(aggr, key, value);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set error "
                         "count value");
        }

        /* Storing all the bad files in the dictionary */
        for (j = 0; j < value; j++) {
            keylen = snprintf(key, sizeof(key), "quarantine-%d-%d", j,
                              src_count);
            ret = dict_get_strn(rsp_dict, key, keylen, &bad_gfid_str);
            if (!ret) {
                snprintf(key, sizeof(key), "quarantine-%d-%d", j,
                         src_count + dst_count);
                ret = dict_set_dynstr_with_alloc(aggr, key, bad_gfid_str);
                if (ret) {
                    gf_msg_debug(this->name, 0,
                                 "Failed to set "
                                 "bad file gfid");
                }
            }
        }
    }

    ret = dict_get_str(rsp_dict, "bitrot_log_file", &bitd_log);
    if (!ret) {
        ret = dict_set_dynstr_with_alloc(aggr, "bitrot_log_file", bitd_log);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "bitrot log file location");
            goto out;
        }
    }

    ret = dict_get_str(rsp_dict, "scrub_log_file", &scrub_log);
    if (!ret) {
        ret = dict_set_dynstr_with_alloc(aggr, "scrub_log_file", scrub_log);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "scrubber log file location");
            goto out;
        }
    }

    ret = dict_get_str(rsp_dict, "features.scrub-freq", &scrub_freq);
    if (!ret) {
        ret = dict_set_dynstr_with_alloc(aggr, "features.scrub-freq",
                                         scrub_freq);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "scrub-frequency value to dictionary");
            goto out;
        }
    }

    ret = dict_get_str(rsp_dict, "features.scrub-throttle", &scrub_impact);
    if (!ret) {
        ret = dict_set_dynstr_with_alloc(aggr, "features.scrub-throttle",
                                         scrub_impact);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "scrub-throttle value to dictionary");
            goto out;
        }
    }

    ret = dict_get_str(rsp_dict, "features.scrub", &scrub_state);
    if (!ret) {
        ret = dict_set_dynstr_with_alloc(aggr, "features.scrub", scrub_state);
        if (ret) {
            gf_msg_debug(this->name, 0,
                         "Failed to set "
                         "scrub state value to dictionary");
            goto out;
        }
    }

    ret = 0;
out:
    return ret;
}

static int
glusterd_sys_exec_output_rsp_dict(dict_t *dst, dict_t *src)
{
    char output_name[64] = "";
    char *output = NULL;
    int ret = 0;
    int i = 0;
    int keylen;
    int src_output_count = 0;
    int dst_output_count = 0;

    if (!dst || !src) {
        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_EMPTY,
               "Source or Destination "
               "dict is empty.");
        goto out;
    }

    ret = dict_get_int32(dst, "output_count", &dst_output_count);

    ret = dict_get_int32(src, "output_count", &src_output_count);
    if (ret) {
        gf_msg_debug("glusterd", 0, "No output from source");
        ret = 0;
        goto out;
    }

    for (i = 1; i <= src_output_count; i++) {
        keylen = snprintf(output_name, sizeof(output_name), "output_%d", i);
        if (keylen <= 0 || keylen >= sizeof(output_name)) {
            ret = -1;
            goto out;
        }
        ret = dict_get_strn(src, output_name, keylen, &output);
        if (ret) {
            gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                   "Unable to fetch %s", output_name);
            goto out;
        }

        keylen = snprintf(output_name, sizeof(output_name), "output_%d",
                          i + dst_output_count);
        if (keylen <= 0 || keylen >= sizeof(output_name)) {
            ret = -1;
            goto out;
        }

        ret = dict_set_dynstrn(dst, output_name, keylen, gf_strdup(output));
        if (ret) {
            gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
                   "Unable to set %s", output_name);
            goto out;
        }
    }

    ret = dict_set_int32_sizen(dst, "output_count",
                               dst_output_count + src_output_count);
out:
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
    return ret;
}

static int
glusterd_use_rsp_dict(dict_t *aggr, dict_t *rsp_dict)
{
    int ret = 0;

    GF_ASSERT(aggr);
    GF_ASSERT(rsp_dict);

    if (aggr)
        dict_copy(rsp_dict, aggr);

    return ret;
}

static int
glusterd_volume_heal_use_rsp_dict(dict_t *aggr, dict_t *rsp_dict)
{
    int ret = 0;
    dict_t *ctx_dict = NULL;
    uuid_t *txn_id = NULL;
    glusterd_op_info_t txn_op_info = {
        GD_OP_STATE_DEFAULT,
    };
    glusterd_op_t op = GD_OP_NONE;

    GF_ASSERT(rsp_dict);

    ret = dict_get_bin(aggr, "transaction_id", (void **)&txn_id);
    if (ret)
        goto out;
    gf_msg_debug(THIS->name, 0, "transaction ID = %s", uuid_utoa(*txn_id));

    ret = glusterd_get_txn_opinfo(txn_id, &txn_op_info);
    if (ret) {
        gf_msg_callingfn(THIS->name, GF_LOG_ERROR, 0,
                         GD_MSG_TRANS_OPINFO_GET_FAIL,
                         "Unable to get transaction opinfo "
                         "for transaction ID : %s",
                         uuid_utoa(*txn_id));
        goto out;
    }

    op = txn_op_info.op;
    GF_ASSERT(GD_OP_HEAL_VOLUME == op);

    if (aggr) {
        ctx_dict = aggr;

    } else {
        ctx_dict = txn_op_info.op_ctx;
    }

    if (!ctx_dict)
        goto out;
    dict_copy(rsp_dict, ctx_dict);
out:
    return ret;
}

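/* For quota limit/remove operations, append the gfids returned in
 * rsp_dict to the op context dictionary and update the aggregated count.
 */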
static int
glusterd_volume_quota_copy_to_op_ctx_dict(dict_t *dict, dict_t *rsp_dict)
{
    int ret = -1;
    int i = 0;
    int count = 0;
    int rsp_dict_count = 0;
    char *uuid_str = NULL;
    char *uuid_str_dup = NULL;
    char key[64] = "";
    int keylen;
    xlator_t *this = THIS;
    int type = GF_QUOTA_OPTION_TYPE_NONE;

    ret = dict_get_int32(dict, "type", &type);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Failed to get quota opcode");
        goto out;
    }

    if ((type != GF_QUOTA_OPTION_TYPE_LIMIT_USAGE) &&
        (type != GF_QUOTA_OPTION_TYPE_LIMIT_OBJECTS) &&
        (type != GF_QUOTA_OPTION_TYPE_REMOVE) &&
        (type != GF_QUOTA_OPTION_TYPE_REMOVE_OBJECTS)) {
        dict_copy(rsp_dict, dict);
        ret = 0;
        goto out;
    }

    ret = dict_get_int32(rsp_dict, "count", &rsp_dict_count);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Failed to get the count of "
               "gfids from the rsp dict");
        goto out;
    }

    ret = dict_get_int32(dict, "count", &count);
    if (ret)
        /* The key "count" is absent in op_ctx when this function is
         * called after self-staging on the originator. This must not
         * be treated as error.
         */
        gf_msg_debug(this->name, 0,
                     "Failed to get count of gfids"
                     " from req dict. This could be because count is not yet"
                     " copied from rsp_dict into op_ctx");

    for (i = 0; i < rsp_dict_count; i++) {
        keylen = snprintf(key, sizeof(key), "gfid%d", i);
        ret = dict_get_strn(rsp_dict, key, keylen, &uuid_str);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                   "Failed to get gfid "
                   "from rsp dict");
            goto out;
        }

        uuid_str_dup = gf_strdup(uuid_str);
        if (!uuid_str_dup) {
            ret = -1;
            goto out;
        }

        keylen = snprintf(key, sizeof(key), "gfid%d", i + count);
        ret = dict_set_dynstrn(dict, key, keylen, uuid_str_dup);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
                   "Failed to set gfid "
                   "from rsp dict into req dict");
            GF_FREE(uuid_str_dup);
            goto out;
        }
    }

    ret = dict_set_int32_sizen(dict, "count", rsp_dict_count + count);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
               "Failed to set aggregated "
               "count in req dict");
        goto out;
    }

out:
    return ret;
}

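/* Fold a peer's response dictionary into the aggregated op context,
 * dispatching to the op-specific aggregation helper.
 */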
int
glusterd_syncop_aggr_rsp_dict(glusterd_op_t op, dict_t *aggr, dict_t *rsp)
{
    int ret = 0;

    switch (op) {
        case GD_OP_CREATE_VOLUME:
        case GD_OP_ADD_BRICK:
        case GD_OP_START_VOLUME:
            ret = glusterd_aggr_brick_mount_dirs(aggr, rsp);
            if (ret) {
                gf_msg(THIS->name, GF_LOG_ERROR, 0,
                       GD_MSG_BRICK_MOUNDIRS_AGGR_FAIL,
                       "Failed to "
                       "aggregate brick mount dirs");
                goto out;
            }
            break;

        case GD_OP_REPLACE_BRICK:
        case GD_OP_RESET_BRICK:
            ret = glusterd_rb_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_SYNC_VOLUME:
            ret = glusterd_sync_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_GSYNC_CREATE:
            break;

        case GD_OP_GSYNC_SET:
            ret = glusterd_gsync_use_rsp_dict(aggr, rsp, NULL);
            if (ret)
                goto out;
            break;

        case GD_OP_STATUS_VOLUME:
            ret = glusterd_volume_status_copy_to_op_ctx_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_HEAL_VOLUME:
            ret = glusterd_volume_heal_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;

            break;

        case GD_OP_CLEARLOCKS_VOLUME:
            ret = glusterd_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_QUOTA:
            ret = glusterd_volume_quota_copy_to_op_ctx_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_SYS_EXEC:
            ret = glusterd_sys_exec_output_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_SNAP:
            ret = glusterd_snap_use_rsp_dict(aggr, rsp);
            if (ret)
                goto out;
            break;

        case GD_OP_SCRUB_STATUS:
            ret = glusterd_volume_bitrot_scrub_use_rsp_dict(aggr, rsp);
            break;

        case GD_OP_SCRUB_ONDEMAND:
            break;

        case GD_OP_MAX_OPVERSION:
            ret = glusterd_max_opversion_use_rsp_dict(aggr, rsp);
            break;

        case GD_OP_PROFILE_VOLUME:
            ret = glusterd_profile_volume_use_rsp_dict(aggr, rsp);
            break;

        case GD_OP_REBALANCE:
        case GD_OP_DEFRAG_BRICK_VOLUME:
            ret = glusterd_volume_rebalance_use_rsp_dict(aggr, rsp);
            break;

        default:
            break;
    }
out:
    return ret;
}

int32_t
gd_syncop_mgmt_v3_lock_cbk_fn(struct rpc_req *req, struct iovec *iov, int count,
                              void *myframe)
{
    int ret = -1;
    struct syncargs *args = NULL;
    gd1_mgmt_v3_lock_rsp rsp = {
        {0},
    };
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    uuid_t *peerid = NULL;

    GF_ASSERT(req);
    GF_ASSERT(myframe);

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(THIS->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_v3_lock_rsp);
    if (ret < 0)
        goto out;

    gf_uuid_copy(args->uuid, rsp.uuid);

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;
out:
    gd_mgmt_v3_collate_errors(args, op_ret, op_errno, NULL,
                              GLUSTERD_MGMT_V3_LOCK, *peerid, rsp.uuid);

    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_mgmt_v3_lock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                           void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   gd_syncop_mgmt_v3_lock_cbk_fn);
}

int
gd_syncop_mgmt_v3_lock(glusterd_op_t op, dict_t *op_ctx,
                       glusterd_peerinfo_t *peerinfo, struct syncargs *args,
                       uuid_t my_uuid, uuid_t recv_uuid, uuid_t txn_id)
{
    int ret = -1;
    gd1_mgmt_v3_lock_req req = {
        {0},
    };
    uuid_t *peerid = NULL;

    GF_ASSERT(op_ctx);
    GF_ASSERT(peerinfo);
    GF_ASSERT(args);

    ret = dict_allocate_and_serialize(op_ctx, &req.dict.dict_val,
                                      &req.dict.dict_len);
    if (ret) {
        gf_smsg("glusterd", GF_LOG_ERROR, errno,
                GD_MSG_DICT_ALLOC_AND_SERL_LENGTH_GET_FAIL, NULL);
        goto out;
    }

    gf_uuid_copy(req.uuid, my_uuid);
    gf_uuid_copy(req.txn_id, txn_id);
    req.op = op;

    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(peerinfo->rpc, &req, args, peerid,
                                   &gd_mgmt_v3_prog, GLUSTERD_MGMT_V3_LOCK,
                                   gd_syncop_mgmt_v3_lock_cbk,
                                   (xdrproc_t)xdr_gd1_mgmt_v3_lock_req);
out:
    GF_FREE(req.dict.dict_val);
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
    return ret;
}

int32_t
gd_syncop_mgmt_v3_unlock_cbk_fn(struct rpc_req *req, struct iovec *iov,
                                int count, void *myframe)
{
    int ret = -1;
    struct syncargs *args = NULL;
    gd1_mgmt_v3_unlock_rsp rsp = {
        {0},
    };
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    uuid_t *peerid = NULL;

    GF_ASSERT(req);
    GF_ASSERT(myframe);

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(THIS->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_v3_unlock_rsp);
    if (ret < 0)
        goto out;

    gf_uuid_copy(args->uuid, rsp.uuid);

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;
out:
    gd_mgmt_v3_collate_errors(args, op_ret, op_errno, NULL,
                              GLUSTERD_MGMT_V3_UNLOCK, *peerid, rsp.uuid);

    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_mgmt_v3_unlock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                             void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   gd_syncop_mgmt_v3_unlock_cbk_fn);
}

int
gd_syncop_mgmt_v3_unlock(dict_t *op_ctx, glusterd_peerinfo_t *peerinfo,
                         struct syncargs *args, uuid_t my_uuid,
                         uuid_t recv_uuid, uuid_t txn_id)
{
    int ret = -1;
    gd1_mgmt_v3_unlock_req req = {
        {0},
    };
    uuid_t *peerid = NULL;

    GF_ASSERT(op_ctx);
    GF_ASSERT(peerinfo);
    GF_ASSERT(args);

    ret = dict_allocate_and_serialize(op_ctx, &req.dict.dict_val,
                                      &req.dict.dict_len);
    if (ret) {
        gf_smsg("glusterd", GF_LOG_ERROR, errno,
                GD_MSG_DICT_ALLOC_AND_SERL_LENGTH_GET_FAIL, NULL);
        goto out;
    }

    gf_uuid_copy(req.uuid, my_uuid);
    gf_uuid_copy(req.txn_id, txn_id);

    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(peerinfo->rpc, &req, args, peerid,
                                   &gd_mgmt_v3_prog, GLUSTERD_MGMT_V3_UNLOCK,
                                   gd_syncop_mgmt_v3_unlock_cbk,
                                   (xdrproc_t)xdr_gd1_mgmt_v3_unlock_req);
out:
    GF_FREE(req.dict.dict_val);
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
    return ret;
}

int32_t
_gd_syncop_mgmt_lock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                         void *myframe)
{
    int ret = -1;
    struct syncargs *args = NULL;
    glusterd_peerinfo_t *peerinfo = NULL;
    gd1_mgmt_cluster_lock_rsp rsp = {
        {0},
    };
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    xlator_t *this = THIS;
    uuid_t *peerid = NULL;

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_cluster_lock_rsp);
    if (ret < 0)
        goto out;

    gf_uuid_copy(args->uuid, rsp.uuid);

    RCU_READ_LOCK;
    peerinfo = glusterd_peerinfo_find(*peerid, NULL);
    if (peerinfo) {
        /* Set peer as locked, so we unlock only the locked peers */
        if (rsp.op_ret == 0)
            peerinfo->locked = _gf_true;
        RCU_READ_UNLOCK;
    } else {
        RCU_READ_UNLOCK;
        rsp.op_ret = -1;
        gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_PEER_NOT_FOUND,
               "Could not find peer with "
               "ID %s",
               uuid_utoa(*peerid));
    }

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;
out:
    gd_collate_errors(args, op_ret, op_errno, NULL, GLUSTERD_MGMT_CLUSTER_LOCK,
                      *peerid, rsp.uuid);

    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_mgmt_lock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   _gd_syncop_mgmt_lock_cbk);
}

int
gd_syncop_mgmt_lock(glusterd_peerinfo_t *peerinfo, struct syncargs *args,
                    uuid_t my_uuid, uuid_t recv_uuid)
{
    int ret = -1;
    gd1_mgmt_cluster_lock_req req = {
        {0},
    };
    uuid_t *peerid = NULL;

    gf_uuid_copy(req.uuid, my_uuid);
    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(peerinfo->rpc, &req, args, peerid,
                                   &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_LOCK,
                                   gd_syncop_mgmt_lock_cbk,
                                   (xdrproc_t)xdr_gd1_mgmt_cluster_lock_req);
out:
    return ret;
}

int32_t
_gd_syncop_mgmt_unlock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                           void *myframe)
{
    int ret = -1;
    struct syncargs *args = NULL;
    glusterd_peerinfo_t *peerinfo = NULL;
    gd1_mgmt_cluster_unlock_rsp rsp = {
        {0},
    };
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    xlator_t *this = THIS;
    uuid_t *peerid = NULL;

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp,
                         (xdrproc_t)xdr_gd1_mgmt_cluster_unlock_rsp);
    if (ret < 0)
        goto out;

    gf_uuid_copy(args->uuid, rsp.uuid);

    RCU_READ_LOCK;
    peerinfo = glusterd_peerinfo_find(*peerid, NULL);
    if (peerinfo) {
        peerinfo->locked = _gf_false;
        RCU_READ_UNLOCK;
    } else {
        RCU_READ_UNLOCK;
        rsp.op_ret = -1;
        gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_PEER_NOT_FOUND,
               "Could not find peer with "
               "ID %s",
               uuid_utoa(*peerid));
    }

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;
out:
    gd_collate_errors(args, op_ret, op_errno, NULL,
                      GLUSTERD_MGMT_CLUSTER_UNLOCK, *peerid, rsp.uuid);

    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_mgmt_unlock_cbk(struct rpc_req *req, struct iovec *iov, int count,
                          void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   _gd_syncop_mgmt_unlock_cbk);
}

int
gd_syncop_mgmt_unlock(glusterd_peerinfo_t *peerinfo, struct syncargs *args,
                      uuid_t my_uuid, uuid_t recv_uuid)
{
    int ret = -1;
    gd1_mgmt_cluster_unlock_req req = {
        {0},
    };
    uuid_t *peerid = NULL;

    gf_uuid_copy(req.uuid, my_uuid);
    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(peerinfo->rpc, &req, args, peerid,
                                   &gd_mgmt_prog, GLUSTERD_MGMT_CLUSTER_UNLOCK,
                                   gd_syncop_mgmt_unlock_cbk,
                                   (xdrproc_t)xdr_gd1_mgmt_cluster_lock_req);
out:
    return ret;
}

int32_t
_gd_syncop_stage_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    int ret = -1;
    gd1_mgmt_stage_op_rsp rsp = {
        {0},
    };
    struct syncargs *args = NULL;
    xlator_t *this = THIS;
    dict_t *rsp_dict = NULL;
    call_frame_t *frame = NULL;
    int op_ret = -1;
    int op_errno = -1;
    uuid_t *peerid = NULL;

    frame = myframe;
    args = frame->local;
    peerid = frame->cookie;
    frame->local = NULL;
    frame->cookie = NULL;

    if (-1 == req->rpc_status) {
        op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_stage_op_rsp);
    if (ret < 0)
        goto out;

    if (rsp.dict.dict_len) {
        /* Unserialize the dictionary */
        rsp_dict = dict_new();

        ret = dict_unserialize(rsp.dict.dict_val, rsp.dict.dict_len, &rsp_dict);
        if (ret < 0) {
            GF_FREE(rsp.dict.dict_val);
            goto out;
        } else {
            rsp_dict->extra_stdfree = rsp.dict.dict_val;
        }
    }

    RCU_READ_LOCK;
    ret = (glusterd_peerinfo_find(rsp.uuid, NULL) == NULL);
    RCU_READ_UNLOCK;
    if (ret) {
        ret = -1;
        gf_msg(this->name, GF_LOG_CRITICAL, 0, GD_MSG_RESP_FROM_UNKNOWN_PEER,
               "Staging response "
               "for 'Volume %s' received from unknown "
               "peer: %s",
               gd_op_list[rsp.op], uuid_utoa(rsp.uuid));
        goto out;
    }

    gf_uuid_copy(args->uuid, rsp.uuid);
    if (rsp.op == GD_OP_REPLACE_BRICK || rsp.op == GD_OP_QUOTA ||
        rsp.op == GD_OP_CREATE_VOLUME || rsp.op == GD_OP_ADD_BRICK ||
        rsp.op == GD_OP_START_VOLUME) {
        pthread_mutex_lock(&args->lock_dict);
        {
            ret = glusterd_syncop_aggr_rsp_dict(rsp.op, args->dict, rsp_dict);
            if (ret)
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESP_AGGR_FAIL, "%s",
                       "Failed to aggregate response from "
                       " node/brick");
        }
        pthread_mutex_unlock(&args->lock_dict);
    }

    op_ret = rsp.op_ret;
    op_errno = rsp.op_errno;

out:
    gd_collate_errors(args, op_ret, op_errno, rsp.op_errstr,
                      GLUSTERD_MGMT_STAGE_OP, *peerid, rsp.uuid);

    if (rsp_dict)
        dict_unref(rsp_dict);
    GF_FREE(peerid);
    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    synctask_barrier_wake(args);
    return 0;
}

int32_t
gd_syncop_stage_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                       void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   _gd_syncop_stage_op_cbk);
}

int
gd_syncop_mgmt_stage_op(glusterd_peerinfo_t *peerinfo, struct syncargs *args,
                        uuid_t my_uuid, uuid_t recv_uuid, int op,
                        dict_t *dict_out, dict_t *op_ctx)
{
    gd1_mgmt_stage_op_req *req = NULL;
    int ret = -1;
    uuid_t *peerid = NULL;

    req = GF_CALLOC(1, sizeof(*req), gf_gld_mt_mop_stage_req_t);
    if (!req) {
        gf_smsg("glusterd", GF_LOG_ERROR, errno, GD_MSG_NO_MEMORY, NULL);
        goto out;
    }

    gf_uuid_copy(req->uuid, my_uuid);
    req->op = op;

    ret = dict_allocate_and_serialize(dict_out, &req->buf.buf_val,
                                      &req->buf.buf_len);
    if (ret) {
        gf_smsg("glusterd", GF_LOG_ERROR, errno,
                GD_MSG_DICT_ALLOC_AND_SERL_LENGTH_GET_FAIL, NULL);
        goto out;
    }

    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
    if (ret)
        goto out;

    ret = gd_syncop_submit_request(
        peerinfo->rpc, req, args, peerid, &gd_mgmt_prog, GLUSTERD_MGMT_STAGE_OP,
        gd_syncop_stage_op_cbk, (xdrproc_t)xdr_gd1_mgmt_stage_op_req);
out:
    gd_stage_op_req_free(req);
    return ret;
}

int32_t
_gd_syncop_brick_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
{
    struct syncargs *args = NULL;
    gd1_mgmt_brick_op_rsp rsp = {
        0,
    };
    int ret = -1;
    call_frame_t *frame = NULL;
    xlator_t *this = THIS;

    frame = myframe;
    args = frame->local;
    frame->local = NULL;

    /* initialize */
    args->op_ret = -1;
    args->op_errno = EINVAL;

    if (-1 == req->rpc_status) {
        args->op_errno = ENOTCONN;
        goto out;
    }

    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, args->op_errno,
                                   EINVAL);

    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_brick_op_rsp);
    if (ret < 0)
        goto out;

    if (rsp.output.output_len) {
        args->dict = dict_new();
        if (!args->dict) {
            gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_DICT_CREATE_FAIL,
                    NULL);
            ret = -1;
            args->op_errno = ENOMEM;
            goto out;
        }

        ret = dict_unserialize(rsp.output.output_val, rsp.output.output_len,
                               &args->dict);
        if (ret < 0) {
            gf_smsg(this->name, GF_LOG_ERROR, errno,
                    GD_MSG_DICT_UNSERIALIZE_FAIL, NULL);
            goto out;
        }
    }

    args->op_ret = rsp.op_ret;
    args->op_errno = rsp.op_errno;
    args->errstr = gf_strdup(rsp.op_errstr);

out:
    if ((rsp.op_errstr) && (strcmp(rsp.op_errstr, "") != 0))
        free(rsp.op_errstr);
    free(rsp.output.output_val);

    /* req->rpc_status set to -1 means, STACK_DESTROY will be called from
     * the caller function.
     */
    if (req->rpc_status != -1)
        STACK_DESTROY(frame->root);
    __wake(args);

    return 0;
}

int32_t
gd_syncop_brick_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
                       void *myframe)
{
    return glusterd_big_locked_cbk(req, iov, count, myframe,
                                   _gd_syncop_brick_op_cbk);
}

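/* Build the brick/node op payload for 'pnode', send it synchronously
 * over 'rpc' and wait for the reply; on success the response dictionary
 * is handed to glusterd_handle_node_rsp().
 */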
int
gd_syncop_mgmt_brick_op(struct rpc_clnt *rpc, glusterd_pending_node_t *pnode,
                        int op, dict_t *dict_out, dict_t *op_ctx, char **errstr)
{
    struct syncargs args = {
        0,
    };
    gd1_mgmt_brick_op_req *req = NULL;
    int ret = 0;

    args.op_ret = -1;
    args.op_errno = ENOTCONN;

    if ((pnode->type == GD_NODE_NFS) || (pnode->type == GD_NODE_QUOTAD) ||
        (pnode->type == GD_NODE_SCRUB) ||
        ((pnode->type == GD_NODE_SHD) && (op == GD_OP_STATUS_VOLUME))) {
        ret = glusterd_node_op_build_payload(op, &req, dict_out);

    } else {
        ret = glusterd_brick_op_build_payload(op, pnode->node, &req, dict_out);
    }

    if (ret)
        goto out;

    GD_SYNCOP(rpc, (&args), NULL, gd_syncop_brick_op_cbk, req, &gd_brick_prog,
              req->op, xdr_gd1_mgmt_brick_op_req);

    if (args.errstr) {
        if ((strlen(args.errstr) > 0) && errstr)
            *errstr = args.errstr;
        else
            GF_FREE(args.errstr);
    }

    if (GD_OP_STATUS_VOLUME == op) {
        ret = dict_set_int32(args.dict, "index", pnode->index);
        if (ret) {
            gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
                   "Error setting index on brick status"
                   " rsp dict");
            args.op_ret = -1;
            goto out;
        }
    }

    if (req->op == GLUSTERD_BRICK_TERMINATE) {
        if (args.op_ret && (args.op_errno == ENOTCONN)) {
            /*
             * This is actually OK.  It happens when the target
             * brick process exits and we saw the closed connection
             * before we read the response.  If we didn't read the
             * response quickly enough that's kind of our own
             * fault, and the fact that the process exited means
             * that our goal of terminating the brick was achieved.
             */
            args.op_ret = 0;
        }
    }

    if (args.op_ret == 0)
        glusterd_handle_node_rsp(dict_out, pnode->node, op, args.dict, op_ctx,
                                 errstr, pnode->type);

out:
    errno = args.op_errno;
    if (args.dict)
        dict_unref(args.dict);
    if (args.op_ret && errstr && (*errstr == NULL)) {
        if (op == GD_OP_HEAL_VOLUME) {
            gf_asprintf(errstr,
                        "Glusterd Syncop Mgmt brick op '%s' failed."
                        " Please check glustershd log file for details.",
                        gd_op_list[op]);
        } else {
            gf_asprintf(errstr,
                        "Glusterd Syncop Mgmt brick op '%s' failed."
                        " Please check brick log file for details.",
                        gd_op_list[op]);
        }
    }
    gd_brick_op_req_free(req);
    return args.op_ret;
}

int32_t
1649
_gd_syncop_commit_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
1650
                         void *myframe)
1651
{
1652
    int ret = -1;
1653
    gd1_mgmt_commit_op_rsp rsp = {
1654
        {0},
1655
    };
1656
    struct syncargs *args = NULL;
1657
    xlator_t *this = THIS;
1658
    dict_t *rsp_dict = NULL;
1659
    call_frame_t *frame = NULL;
1660
    int op_ret = -1;
1661
    int op_errno = -1;
1662
    int type = GF_QUOTA_OPTION_TYPE_NONE;
1663
    uuid_t *peerid = NULL;
1664

1665
    frame = myframe;
1666
    args = frame->local;
1667
    peerid = frame->cookie;
1668
    frame->local = NULL;
1669
    frame->cookie = NULL;
1670

1671
    if (-1 == req->rpc_status) {
1672
        op_errno = ENOTCONN;
1673
        goto out;
1674
    }
1675

1676
    GF_VALIDATE_OR_GOTO_WITH_ERROR(this->name, iov, out, op_errno, EINVAL);
1677

1678
    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gd1_mgmt_commit_op_rsp);
1679
    if (ret < 0) {
1680
        goto out;
1681
    }
1682

1683
    if (rsp.dict.dict_len) {
1684
        /* Unserialize the dictionary */
1685
        rsp_dict = dict_new();
1686

1687
        ret = dict_unserialize(rsp.dict.dict_val, rsp.dict.dict_len, &rsp_dict);
1688
        if (ret < 0) {
1689
            GF_FREE(rsp.dict.dict_val);
1690
            goto out;
1691
        } else {
1692
            rsp_dict->extra_stdfree = rsp.dict.dict_val;
1693
        }
1694
    }
1695

1696
    RCU_READ_LOCK;
1697
    ret = (glusterd_peerinfo_find(rsp.uuid, NULL) == 0);
1698
    RCU_READ_UNLOCK;
1699
    if (ret) {
1700
        ret = -1;
1701
        gf_msg(this->name, GF_LOG_CRITICAL, 0, GD_MSG_RESP_FROM_UNKNOWN_PEER,
1702
               "Commit response "
1703
               "for 'Volume %s' received from unknown "
1704
               "peer: %s",
1705
               gd_op_list[rsp.op], uuid_utoa(rsp.uuid));
1706
        goto out;
1707
    }
1708

1709
    gf_uuid_copy(args->uuid, rsp.uuid);
1710
    if (rsp.op == GD_OP_QUOTA) {
1711
        ret = dict_get_int32(args->dict, "type", &type);
1712
        if (ret) {
1713
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1714
                   "Failed to get "
1715
                   "opcode");
1716
            goto out;
1717
        }
1718
    }
1719

1720
    if ((rsp.op != GD_OP_QUOTA) || (type == GF_QUOTA_OPTION_TYPE_LIST)) {
1721
        pthread_mutex_lock(&args->lock_dict);
1722
        {
1723
            ret = glusterd_syncop_aggr_rsp_dict(rsp.op, args->dict, rsp_dict);
1724
            if (ret)
1725
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESP_AGGR_FAIL, "%s",
1726
                       "Failed to aggregate response from "
1727
                       " node/brick");
1728
        }
1729
        pthread_mutex_unlock(&args->lock_dict);
1730
    }
1731

1732
    op_ret = rsp.op_ret;
1733
    op_errno = rsp.op_errno;
1734

1735
out:
1736
    gd_collate_errors(args, op_ret, op_errno, rsp.op_errstr,
1737
                      GLUSTERD_MGMT_COMMIT_OP, *peerid, rsp.uuid);
1738
    if (rsp_dict)
1739
        dict_unref(rsp_dict);
1740
    GF_FREE(peerid);
1741
    /* If req->rpc_status is set to -1, STACK_DESTROY will be called from
1742
     * the caller function.
1743
     */
1744
    if (req->rpc_status != -1)
1745
        STACK_DESTROY(frame->root);
1746
    synctask_barrier_wake(args);
1747

1748
    return 0;
1749
}
1750

1751
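/*
 * gd_syncop_commit_op_cbk() below wraps the real callback so that it runs
 * while holding glusterd's single "big lock".  A minimal sketch of what the
 * wrapper (glusterd_big_locked_cbk, defined elsewhere in glusterd) is
 * assumed to do with the function pointer it is handed:
 *
 *     synclock_lock(&conf->big_lock);
 *     ret = _gd_syncop_commit_op_cbk(req, iov, count, myframe);
 *     synclock_unlock(&conf->big_lock);
 *
 * This keeps RPC callbacks serialized against the syncop paths, which drop
 * the big lock only around blocking waits (see gd_synctask_barrier_wait).
 */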
int32_t
1752
gd_syncop_commit_op_cbk(struct rpc_req *req, struct iovec *iov, int count,
1753
                        void *myframe)
1754
{
1755
    return glusterd_big_locked_cbk(req, iov, count, myframe,
1756
                                   _gd_syncop_commit_op_cbk);
1757
}
1758

1759
int
1760
gd_syncop_mgmt_commit_op(glusterd_peerinfo_t *peerinfo, struct syncargs *args,
1761
                         uuid_t my_uuid, uuid_t recv_uuid, int op,
1762
                         dict_t *dict_out, dict_t *op_ctx)
1763
{
1764
    gd1_mgmt_commit_op_req *req = NULL;
1765
    int ret = -1;
1766
    uuid_t *peerid = NULL;
1767

1768
    req = GF_CALLOC(1, sizeof(*req), gf_gld_mt_mop_commit_req_t);
1769
    if (!req) {
1770
        gf_smsg("glusterd", GF_LOG_ERROR, errno, GD_MSG_NO_MEMORY, NULL);
1771
        goto out;
1772
    }
1773

1774
    gf_uuid_copy(req->uuid, my_uuid);
1775
    req->op = op;
1776

1777
    ret = dict_allocate_and_serialize(dict_out, &req->buf.buf_val,
1778
                                      &req->buf.buf_len);
1779
    if (ret) {
1780
        gf_smsg("glusterd", GF_LOG_ERROR, errno,
1781
                GD_MSG_DICT_ALLOC_AND_SERL_LENGTH_GET_FAIL, NULL);
1782
        goto out;
1783
    }
1784

1785
    GD_ALLOC_COPY_UUID(peerid, peerinfo->uuid, ret);
1786
    if (ret)
1787
        goto out;
1788

1789
    ret = gd_syncop_submit_request(peerinfo->rpc, req, args, peerid,
1790
                                   &gd_mgmt_prog, GLUSTERD_MGMT_COMMIT_OP,
1791
                                   gd_syncop_commit_op_cbk,
1792
                                   (xdrproc_t)xdr_gd1_mgmt_commit_op_req);
1793
out:
1794
    gd_commit_op_req_free(req);
1795
    return ret;
1796
}
1797

1798
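/*
 * gd_lock_op_phase() fans the lock request (cluster lock or mgmt_v3 lock)
 * out to every eligible peer and then blocks on a syncbarrier.  The counting
 * contract, sketched under the assumption that every submitted request
 * eventually invokes its callback:
 *
 *     synctask_barrier_init(&args);
 *     for each eligible peer:            // connected, befriended, old enough
 *         send lock request;  peer_cnt++;
 *     gd_synctask_barrier_wait(&args, peer_cnt);  // one wake per callback
 *
 * Per-peer failures are collated into args.errstr by gd_collate_errors()
 * and surfaced to the caller through *op_errstr.
 */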
int
1799
gd_lock_op_phase(glusterd_conf_t *conf, glusterd_op_t op, dict_t *op_ctx,
1800
                 char **op_errstr, uuid_t txn_id,
1801
                 glusterd_op_info_t *txn_opinfo, gf_boolean_t cluster_lock)
1802
{
1803
    int ret = -1;
1804
    int peer_cnt = 0;
1805
    uuid_t peer_uuid = {0};
1806
    xlator_t *this = THIS;
1807
    glusterd_peerinfo_t *peerinfo = NULL;
1808
    struct syncargs args = {0};
1809

1810
    ret = synctask_barrier_init((&args));
1811
    if (ret)
1812
        goto out;
1813

1814
    peer_cnt = 0;
1815

1816
    RCU_READ_LOCK;
1817
    cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
1818
    {
1819
        /* Only send requests to peers who were available before the
1820
         * transaction started
1821
         */
1822
        if (peerinfo->generation > txn_opinfo->txn_generation)
1823
            continue;
1824

1825
        if (!peerinfo->connected)
1826
            continue;
1827
        if (op != GD_OP_SYNC_VOLUME &&
1828
            peerinfo->state != GD_FRIEND_STATE_BEFRIENDED)
1829
            continue;
1830

1831
        if (cluster_lock) {
1832
            /* Reset lock status */
1833
            peerinfo->locked = _gf_false;
1834
            gd_syncop_mgmt_lock(peerinfo, &args, MY_UUID, peer_uuid);
1835
        } else
1836
            gd_syncop_mgmt_v3_lock(op, op_ctx, peerinfo, &args, MY_UUID,
1837
                                   peer_uuid, txn_id);
1838
        peer_cnt++;
1839
    }
1840
    RCU_READ_UNLOCK;
1841

1842
    if (0 == peer_cnt) {
1843
        ret = 0;
1844
        goto out;
1845
    }
1846

1847
    gd_synctask_barrier_wait((&args), peer_cnt);
1848

1849
    if (args.op_ret) {
1850
        if (args.errstr)
1851
            *op_errstr = gf_strdup(args.errstr);
1852
        else {
1853
            ret = gf_asprintf(op_errstr,
1854
                              "Another transaction "
1855
                              "could be in progress. Please try "
1856
                              "again after some time.");
1857
            if (ret == -1)
1858
                *op_errstr = NULL;
1859

1860
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_PEER_LOCK_FAIL,
1861
                   "Failed to acquire lock");
1862
        }
1863
    }
1864

1865
    ret = args.op_ret;
1866

1867
    gf_msg_debug(this->name, 0,
1868
                 "Sent lock op req for 'Volume %s' "
1869
                 "to %d peers. Returning %d",
1870
                 gd_op_list[op], peer_cnt, ret);
1871
out:
1872
    return ret;
1873
}
1874

1875
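/*
 * glusterd_validate_and_set_gfid() only applies to quota limit/remove
 * operations.  After staging, op_ctx is expected to carry one gfid per
 * responding node, e.g. (illustrative layout only):
 *
 *     count = 3
 *     gfid0 = gfid1 = gfid2 = <same uuid>
 *
 * All of them must refer to the same object; the common value is then copied
 * into req_dict under the key "gfid" so that the commit phase operates on a
 * single, agreed-upon gfid.  Any mismatch aborts the command.
 */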
static int
1876
glusterd_validate_and_set_gfid(dict_t *op_ctx, dict_t *req_dict,
1877
                               char **op_errstr)
1878
{
1879
    int ret = -1;
1880
    int count = 0;
1881
    int i = 0;
1882
    int op_code = GF_QUOTA_OPTION_TYPE_NONE;
1883
    uuid_t uuid1 = {0};
1884
    uuid_t uuid2 = {
1885
        0,
1886
    };
1887
    char *path = NULL;
1888
    char key[64] = "";
1889
    int keylen;
1890
    char *uuid1_str = NULL;
1891
    char *uuid1_str_dup = NULL;
1892
    char *uuid2_str = NULL;
1893

1894
    ret = dict_get_int32(op_ctx, "type", &op_code);
1895
    if (ret) {
1896
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1897
               "Failed to get quota opcode");
1898
        goto out;
1899
    }
1900

1901
    if ((op_code != GF_QUOTA_OPTION_TYPE_LIMIT_USAGE) &&
1902
        (op_code != GF_QUOTA_OPTION_TYPE_LIMIT_OBJECTS) &&
1903
        (op_code != GF_QUOTA_OPTION_TYPE_REMOVE) &&
1904
        (op_code != GF_QUOTA_OPTION_TYPE_REMOVE_OBJECTS)) {
1905
        ret = 0;
1906
        goto out;
1907
    }
1908

1909
    ret = dict_get_str(op_ctx, "path", &path);
1910
    if (ret) {
1911
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1912
               "Failed to get path");
1913
        goto out;
1914
    }
1915

1916
    ret = dict_get_int32(op_ctx, "count", &count);
1917
    if (ret) {
1918
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1919
               "Failed to get count");
1920
        goto out;
1921
    }
1922

1923
    /* If count is 0, fail the command with ENOENT.
1924
     *
1925
     * If count is 1, treat gfid0 as the gfid on which the operation
1926
     * is to be performed and resume the command.
1927
     *
1928
     * If count > 1, get the 0th gfid from the op_ctx and
1929
     * compare it with the remaining 'count - 1' gfids.
1930
     * If they are all found to be the same, set that gfid in the req_dict and
1931
     * resume the operation, else error out.
1932
     */
1933

1934
    if (count == 0) {
1935
        gf_asprintf(op_errstr,
1936
                    "Failed to get trusted.gfid attribute "
1937
                    "on path %s. Reason : %s",
1938
                    path, strerror(ENOENT));
1939
        ret = -ENOENT;
1940
        goto out;
1941
    }
1942

1943
    keylen = snprintf(key, sizeof(key), "gfid%d", 0);
1944

1945
    ret = dict_get_strn(op_ctx, key, keylen, &uuid1_str);
1946
    if (ret) {
1947
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1948
               "Failed to get key '%s'", key);
1949
        goto out;
1950
    }
1951

1952
    gf_uuid_parse(uuid1_str, uuid1);
1953

1954
    for (i = 1; i < count; i++) {
1955
        keylen = snprintf(key, sizeof(key), "gfid%d", i);
1956

1957
        ret = dict_get_strn(op_ctx, key, keylen, &uuid2_str);
1958
        if (ret) {
1959
            gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1960
                   "Failed to get key "
1961
                   "'%s'",
1962
                   key);
1963
            goto out;
1964
        }
1965

1966
        gf_uuid_parse(uuid2_str, uuid2);
1967

1968
        if (gf_uuid_compare(uuid1, uuid2)) {
1969
            gf_asprintf(op_errstr,
1970
                        "gfid mismatch between %s and "
1971
                        "%s for path %s",
1972
                        uuid1_str, uuid2_str, path);
1973
            ret = -1;
1974
            goto out;
1975
        }
1976
    }
1977

1978
    if (i == count) {
1979
        uuid1_str_dup = gf_strdup(uuid1_str);
1980
        if (!uuid1_str_dup) {
1981
            ret = -1;
1982
            goto out;
1983
        }
1984

1985
        ret = dict_set_dynstr_sizen(req_dict, "gfid", uuid1_str_dup);
1986
        if (ret) {
1987
            gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
1988
                   "Failed to set gfid");
1989
            GF_FREE(uuid1_str_dup);
1990
            goto out;
1991
        }
1992
    } else {
1993
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_ITER_FAIL,
1994
               "Failed to iterate through %d"
1995
               " entries in the req dict",
1996
               count);
1997
        ret = -1;
1998
        goto out;
1999
    }
2000

2001
    ret = 0;
2002
out:
2003
    return ret;
2004
}
2005

2006
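/*
 * gd_stage_op_phase() validates the operation locally first
 * (glusterd_op_stage_validate) and only then broadcasts
 * GLUSTERD_MGMT_STAGE_OP to the peers, reusing the same barrier pattern as
 * the lock phase.  Responses are folded into req_dict for
 * create/add-brick/start-volume and into op_ctx otherwise; for quota ops the
 * gfids collected during staging are validated afterwards via
 * glusterd_validate_and_set_gfid().
 */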
int
2007
gd_stage_op_phase(glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict,
2008
                  char **op_errstr, glusterd_op_info_t *txn_opinfo)
2009
{
2010
    int ret = -1;
2011
    int peer_cnt = 0;
2012
    dict_t *rsp_dict = NULL;
2013
    char *hostname = NULL;
2014
    xlator_t *this = THIS;
2015
    glusterd_conf_t *conf = NULL;
2016
    glusterd_peerinfo_t *peerinfo = NULL;
2017
    uuid_t tmp_uuid = {0};
2018
    char *errstr = NULL;
2019
    struct syncargs args = {0};
2020
    dict_t *aggr_dict = NULL;
2021

2022
    conf = this->private;
2023
    GF_ASSERT(conf);
2024

2025
    rsp_dict = dict_new();
2026
    if (!rsp_dict) {
2027
        gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_DICT_CREATE_FAIL, NULL);
2028
        goto out;
2029
    }
2030

2031
    if ((op == GD_OP_CREATE_VOLUME) || (op == GD_OP_ADD_BRICK) ||
2032
        (op == GD_OP_START_VOLUME))
2033
        aggr_dict = req_dict;
2034
    else
2035
        aggr_dict = op_ctx;
2036

2037
    ret = glusterd_validate_quorum(this, op, req_dict, op_errstr);
2038
    if (ret) {
2039
        gf_msg(this->name, GF_LOG_CRITICAL, 0, GD_MSG_SERVER_QUORUM_NOT_MET,
2040
               "Server quorum not met. Rejecting operation.");
2041
        goto out;
2042
    }
2043

2044
    ret = glusterd_op_stage_validate(op, req_dict, op_errstr, rsp_dict);
2045
    if (ret) {
2046
        hostname = "localhost";
2047
        goto stage_done;
2048
    }
2049

2050
    if ((op == GD_OP_REPLACE_BRICK || op == GD_OP_QUOTA ||
2051
         op == GD_OP_CREATE_VOLUME || op == GD_OP_ADD_BRICK ||
2052
         op == GD_OP_START_VOLUME)) {
2053
        ret = glusterd_syncop_aggr_rsp_dict(op, aggr_dict, rsp_dict);
2054
        if (ret) {
2055
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESP_AGGR_FAIL, "%s",
2056
                   "Failed to aggregate response from node/brick");
2057
            goto out;
2058
        }
2059
    }
2060
    dict_unref(rsp_dict);
2061
    rsp_dict = NULL;
2062

2063
stage_done:
2064
    if (ret) {
2065
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VALIDATE_FAILED,
2066
               LOGSTR_STAGE_FAIL, gd_op_list[op], hostname,
2067
               (*op_errstr) ? ":" : " ", (*op_errstr) ? *op_errstr : " ");
2068
        if (*op_errstr == NULL)
2069
            gf_asprintf(op_errstr, OPERRSTR_STAGE_FAIL, hostname);
2070
        goto out;
2071
    }
2072

2073
    ret = gd_syncargs_init(&args, aggr_dict);
2074
    if (ret)
2075
        goto out;
2076

2077
    peer_cnt = 0;
2078

2079
    RCU_READ_LOCK;
2080
    cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
2081
    {
2082
        /* Only send requests to peers who were available before the
2083
         * transaction started
2084
         */
2085
        if (peerinfo->generation > txn_opinfo->txn_generation)
2086
            continue;
2087

2088
        if (!peerinfo->connected)
2089
            continue;
2090
        if (op != GD_OP_SYNC_VOLUME &&
2091
            peerinfo->state != GD_FRIEND_STATE_BEFRIENDED)
2092
            continue;
2093

2094
        (void)gd_syncop_mgmt_stage_op(peerinfo, &args, MY_UUID, tmp_uuid, op,
2095
                                      req_dict, op_ctx);
2096
        peer_cnt++;
2097
    }
2098
    RCU_READ_UNLOCK;
2099

2100
    if (0 == peer_cnt) {
2101
        ret = 0;
2102
        goto out;
2103
    }
2104

2105
    gf_msg_debug(this->name, 0,
2106
                 "Sent stage op req for 'Volume %s' "
2107
                 "to %d peers",
2108
                 gd_op_list[op], peer_cnt);
2109

2110
    gd_synctask_barrier_wait((&args), peer_cnt);
2111

2112
    if (args.errstr)
2113
        *op_errstr = gf_strdup(args.errstr);
2114
    else if (dict_get_str(aggr_dict, "errstr", &errstr) == 0)
2115
        *op_errstr = gf_strdup(errstr);
2116

2117
    ret = args.op_ret;
2118

2119
out:
2120
    if ((ret == 0) && (op == GD_OP_QUOTA)) {
2121
        ret = glusterd_validate_and_set_gfid(op_ctx, req_dict, op_errstr);
2122
        if (ret)
2123
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_GFID_VALIDATE_SET_FAIL,
2124
                   "Failed to validate and set gfid");
2125
    }
2126

2127
    if (rsp_dict)
2128
        dict_unref(rsp_dict);
2129

2130
    gd_syncargs_fini(&args);
2131
    return ret;
2132
}
2133

2134
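/*
 * gd_commit_op_phase() mirrors the stage phase: commit locally via
 * glusterd_op_commit_perform(), then send GLUSTERD_MGMT_COMMIT_OP to the
 * peers and wait on the barrier.  Two special cases visible below: quota
 * list operations aggregate the per-peer responses into op_ctx, and a
 * GF_CLI_STATUS_ALL status command issued on the originating glusterd skips
 * the peer broadcast entirely.
 */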
int
2135
gd_commit_op_phase(glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict,
2136
                   char **op_errstr, glusterd_op_info_t *txn_opinfo)
2137
{
2138
    dict_t *rsp_dict = NULL;
2139
    int peer_cnt = -1;
2140
    int ret = -1;
2141
    char *hostname = NULL;
2142
    glusterd_peerinfo_t *peerinfo = NULL;
2143
    xlator_t *this = THIS;
2144
    glusterd_conf_t *conf = NULL;
2145
    uuid_t tmp_uuid = {0};
2146
    char *errstr = NULL;
2147
    struct syncargs args = {0};
2148
    int type = GF_QUOTA_OPTION_TYPE_NONE;
2149
    uint32_t cmd = 0;
2150
    gf_boolean_t origin_glusterd = _gf_false;
2151

2152
    conf = this->private;
2153
    GF_ASSERT(conf);
2154

2155
    rsp_dict = dict_new();
2156
    if (!rsp_dict) {
2157
        gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_DICT_CREATE_FAIL, NULL);
2158
        ret = -1;
2159
        goto out;
2160
    }
2161

2162
    ret = glusterd_op_commit_perform(op, req_dict, op_errstr, rsp_dict);
2163
    if (ret) {
2164
        hostname = "localhost";
2165
        goto commit_done;
2166
    }
2167

2168
    if (op == GD_OP_QUOTA) {
2169
        ret = dict_get_int32(op_ctx, "type", &type);
2170
        if (ret) {
2171
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
2172
                   "Failed to get "
2173
                   "opcode");
2174
            goto out;
2175
        }
2176
    }
2177

2178
    if (((op == GD_OP_QUOTA) &&
2179
         ((type == GF_QUOTA_OPTION_TYPE_LIST) ||
2180
          (type == GF_QUOTA_OPTION_TYPE_LIST_OBJECTS))) ||
2181
        ((op != GD_OP_SYNC_VOLUME) && (op != GD_OP_QUOTA))) {
2182
        ret = glusterd_syncop_aggr_rsp_dict(op, op_ctx, rsp_dict);
2183
        if (ret) {
2184
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESP_AGGR_FAIL, "%s",
2185
                   "Failed to aggregate "
2186
                   "response from node/brick");
2187
            goto out;
2188
        }
2189
    }
2190

2191
    dict_unref(rsp_dict);
2192
    rsp_dict = NULL;
2193

2194
commit_done:
2195
    if (ret) {
2196
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_COMMIT_OP_FAIL,
2197
               LOGSTR_COMMIT_FAIL, gd_op_list[op], hostname,
2198
               (*op_errstr) ? ":" : " ", (*op_errstr) ? *op_errstr : " ");
2199
        if (*op_errstr == NULL)
2200
            gf_asprintf(op_errstr, OPERRSTR_COMMIT_FAIL, hostname);
2201
        goto out;
2202
    }
2203

2204
    ret = gd_syncargs_init(&args, op_ctx);
2205
    if (ret)
2206
        goto out;
2207

2208
    peer_cnt = 0;
2209
    origin_glusterd = is_origin_glusterd(req_dict);
2210

2211
    if (op == GD_OP_STATUS_VOLUME) {
2212
        ret = dict_get_uint32(req_dict, "cmd", &cmd);
2213
        if (ret) {
2214
            gf_smsg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2215
                    "Key=cmd", NULL);
2216
            goto out;
2217
        }
2218

2219
        if (origin_glusterd) {
2220
            if ((cmd & GF_CLI_STATUS_ALL)) {
2221
                ret = 0;
2222
                goto out;
2223
            }
2224
        }
2225
    }
2226

2227
    RCU_READ_LOCK;
2228
    cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
2229
    {
2230
        /* Only send requests to peers who were available before the
2231
         * transaction started
2232
         */
2233
        if (peerinfo->generation > txn_opinfo->txn_generation)
2234
            continue;
2235

2236
        if (!peerinfo->connected)
2237
            continue;
2238
        if (op != GD_OP_SYNC_VOLUME &&
2239
            peerinfo->state != GD_FRIEND_STATE_BEFRIENDED)
2240
            continue;
2241

2242
        (void)gd_syncop_mgmt_commit_op(peerinfo, &args, MY_UUID, tmp_uuid, op,
2243
                                       req_dict, op_ctx);
2244
        peer_cnt++;
2245
    }
2246
    RCU_READ_UNLOCK;
2247

2248
    if (0 == peer_cnt) {
2249
        ret = 0;
2250
        goto out;
2251
    }
2252

2253
    gd_synctask_barrier_wait((&args), peer_cnt);
2254
    ret = args.op_ret;
2255
    if (args.errstr)
2256
        *op_errstr = gf_strdup(args.errstr);
2257
    else if (dict_get_str(op_ctx, "errstr", &errstr) == 0)
2258
        *op_errstr = gf_strdup(errstr);
2259

2260
    gf_msg_debug(this->name, 0,
2261
                 "Sent commit op req for 'Volume %s' "
2262
                 "to %d peers",
2263
                 gd_op_list[op], peer_cnt);
2264
out:
2265
    if (!ret)
2266
        glusterd_op_modify_op_ctx(op, op_ctx);
2267

2268
    if (rsp_dict)
2269
        dict_unref(rsp_dict);
2270

2271
    GF_FREE(args.errstr);
2272
    args.errstr = NULL;
2273

2274
    gd_syncargs_fini(&args);
2275
    return ret;
2276
}
2277

2278
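/*
 * gd_unlock_op_phase() releases whatever was taken in gd_lock_op_phase():
 * with cluster_lock set it unlocks only the peers whose 'locked' flag was
 * set, otherwise it sends mgmt_v3 unlocks keyed by the volume name or, when
 * "hold_global_locks" is present in op_ctx, by the global lock type.  The
 * local lock is released at the end, and a pending quorum action, if any,
 * is processed before returning.
 */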
int
2279
gd_unlock_op_phase(glusterd_conf_t *conf, glusterd_op_t op, int *op_ret,
2280
                   rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr,
2281
                   char *volname, gf_boolean_t is_acquired, uuid_t txn_id,
2282
                   glusterd_op_info_t *txn_opinfo, gf_boolean_t cluster_lock)
2283
{
2284
    glusterd_peerinfo_t *peerinfo = NULL;
2285
    uuid_t tmp_uuid = {0};
2286
    int peer_cnt = 0;
2287
    int ret = -1;
2288
    xlator_t *this = THIS;
2289
    struct syncargs args = {0};
2290
    int32_t global = 0;
2291
    char *type = NULL;
2292

2293
    /* If the lock has not been held during this
2294
     * transaction, do not send unlock requests */
2295
    if (!is_acquired) {
2296
        ret = 0;
2297
        goto out;
2298
    }
2299

2300
    ret = synctask_barrier_init((&args));
2301
    if (ret)
2302
        goto out;
2303

2304
    peer_cnt = 0;
2305

2306
    if (cluster_lock) {
2307
        RCU_READ_LOCK;
2308
        cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
2309
        {
2310
            /* Only send requests to peers who were available before
2311
             * the transaction started
2312
             */
2313
            if (peerinfo->generation > txn_opinfo->txn_generation)
2314
                continue;
2315

2316
            if (!peerinfo->connected)
2317
                continue;
2318
            if (op != GD_OP_SYNC_VOLUME &&
2319
                peerinfo->state != GD_FRIEND_STATE_BEFRIENDED)
2320
                continue;
2321

2322
            /* Only unlock peers that were locked */
2323
            if (peerinfo->locked) {
2324
                gd_syncop_mgmt_unlock(peerinfo, &args, MY_UUID, tmp_uuid);
2325
                peer_cnt++;
2326
            }
2327
        }
2328
        RCU_READ_UNLOCK;
2329
    } else {
2330
        ret = dict_get_int32(op_ctx, "hold_global_locks", &global);
2331
        if (!ret && global)
2332
            type = "global";
2333
        else
2334
            type = "vol";
2335
        if (volname || global) {
2336
            RCU_READ_LOCK;
2337
            cds_list_for_each_entry_rcu(peerinfo, &conf->peers, uuid_list)
2338
            {
2339
                /* Only send requests to peers who were
2340
                 * available before the transaction started
2341
                 */
2342
                if (peerinfo->generation > txn_opinfo->txn_generation)
2343
                    continue;
2344

2345
                if (!peerinfo->connected)
2346
                    continue;
2347
                if (op != GD_OP_SYNC_VOLUME &&
2348
                    peerinfo->state != GD_FRIEND_STATE_BEFRIENDED)
2349
                    continue;
2350

2351
                gd_syncop_mgmt_v3_unlock(op_ctx, peerinfo, &args, MY_UUID,
2352
                                         tmp_uuid, txn_id);
2353
                peer_cnt++;
2354
            }
2355
            RCU_READ_UNLOCK;
2356
        }
2357
    }
2358

2359
    if (0 == peer_cnt) {
2360
        ret = 0;
2361
        goto out;
2362
    }
2363

2364
    gd_synctask_barrier_wait((&args), peer_cnt);
2365

2366
    ret = args.op_ret;
2367

2368
    gf_msg_debug(this->name, 0,
2369
                 "Sent unlock op req for 'Volume %s' "
2370
                 "to %d peers. Returning %d",
2371
                 gd_op_list[op], peer_cnt, ret);
2372
    if (ret) {
2373
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_PEER_UNLOCK_FAIL,
2374
               "Failed to unlock "
2375
               "on some peer(s)");
2376
    }
2377

2378
out:
2379
    /* If unlock failed and op_ret was previously set,
2380
     * priority is given to that op_ret. If op_ret was
2381
     * not set and unlock failed, then set op_ret. */
2382
    if (!*op_ret)
2383
        *op_ret = ret;
2384

2385
    if (is_acquired) {
2386
        /* Based on the op-version,
2387
         * we release the cluster or mgmt_v3 lock
2388
         * and clear the op */
2389

2390
        glusterd_op_clear_op();
2391
        if (cluster_lock)
2392
            glusterd_unlock(MY_UUID);
2393
        else {
2394
            if (type) {
2395
                ret = glusterd_mgmt_v3_unlock(volname, MY_UUID, type);
2396
                if (ret)
2397
                    gf_msg(this->name, GF_LOG_ERROR, 0,
2398
                           GD_MSG_MGMTV3_UNLOCK_FAIL,
2399
                           "Unable to release lock for %s", volname);
2400
            }
2401
        }
2402
    }
2403

2404
    if (!*op_ret)
2405
        *op_ret = ret;
2406

2407
    /*
2408
     * If there are any quorum events while the OP is in progress, process
2409
     * them.
2410
     */
2411
    if (conf->pending_quorum_action)
2412
        glusterd_do_quorum_action();
2413

2414
    return 0;
2415
}
2416

2417
int
2418
gd_get_brick_count(struct cds_list_head *bricks)
2419
{
2420
    glusterd_pending_node_t *pending_node = NULL;
2421
    int npeers = 0;
2422
    cds_list_for_each_entry(pending_node, bricks, list)
2423
    {
2424
        npeers++;
2425
    }
2426
    return npeers;
2427
}
2428

2429
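/*
 * gd_brick_op_phase() runs the operation against local daemons/bricks:
 * glusterd_op_bricks_select() builds the 'selected' list of pending nodes,
 * and each entry is driven synchronously through gd_syncop_mgmt_brick_op().
 * For rebalance nodes the rpc connection is created lazily; for
 * 'status ... client-list' the loop stops after the first brick that
 * reports a client-count.
 */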
int
2430
gd_brick_op_phase(glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict,
2431
                  char **op_errstr)
2432
{
2433
    glusterd_pending_node_t *pending_node = NULL;
2434
    glusterd_pending_node_t *tmp = NULL;
2435
    struct cds_list_head selected = {
2436
        0,
2437
    };
2438
    xlator_t *this = THIS;
2439
    int brick_count = 0;
2440
    int ret = -1;
2441
    rpc_clnt_t *rpc = NULL;
2442
    dict_t *rsp_dict = NULL;
2443
    int32_t cmd = GF_OP_CMD_NONE;
2444
    glusterd_volinfo_t *volinfo = NULL;
2445

2446
    rsp_dict = dict_new();
2447
    if (!rsp_dict) {
2448
        gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_DICT_CREATE_FAIL, NULL);
2449
        ret = -1;
2450
        goto out;
2451
    }
2452

2453
    CDS_INIT_LIST_HEAD(&selected);
2454
    ret = glusterd_op_bricks_select(op, req_dict, op_errstr, &selected,
2455
                                    rsp_dict);
2456
    if (ret) {
2457
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_OP_FAIL, "%s",
2458
               (*op_errstr) ? *op_errstr
2459
                            : "Brick op failed. Check "
2460
                              "glusterd log file for more details.");
2461
        goto out;
2462
    }
2463

2464
    if (op == GD_OP_HEAL_VOLUME) {
2465
        ret = glusterd_syncop_aggr_rsp_dict(op, op_ctx, rsp_dict);
2466
        if (ret)
2467
            goto out;
2468
    }
2469
    dict_unref(rsp_dict);
2470
    rsp_dict = NULL;
2471

2472
    brick_count = 0;
2473
    cds_list_for_each_entry_safe(pending_node, tmp, &selected, list)
2474
    {
2475
        rpc = glusterd_pending_node_get_rpc(pending_node);
2476
        /* In the case of rebalance, if the rpc object is null, we try to
2477
         * create it. If the rebalance daemon is down, the creation returns
2478
         * -1; otherwise, the rpc object is created and referenced.
2479
         */
2480
        if (!rpc) {
2481
            if (pending_node->type == GD_NODE_REBALANCE && pending_node->node) {
2482
                volinfo = pending_node->node;
2483
                glusterd_defrag_ref(volinfo->rebal.defrag);
2484
                ret = glusterd_rebalance_rpc_create(volinfo);
2485
                if (ret) {
2486
                    ret = 0;
2487
                    glusterd_defrag_volume_node_rsp(req_dict, NULL, op_ctx);
2488
                    goto out;
2489
                } else {
2490
                    rpc = glusterd_defrag_rpc_get(volinfo->rebal.defrag);
2491
                }
2492
            } else {
2493
                ret = -1;
2494
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RPC_FAILURE,
2495
                       "Brick Op failed "
2496
                       "due to rpc failure.");
2497
                goto out;
2498
            }
2499
        }
2500

2501
        ret = gd_syncop_mgmt_brick_op(rpc, pending_node, op, req_dict, op_ctx,
2502
                                      op_errstr);
2503
        if (op == GD_OP_STATUS_VOLUME) {
2504
            /* For client-list it is enough to quit the loop
2505
             * once we get the value from one brick.
2506
             */
2507
            ret = dict_get_int32(req_dict, "cmd", &cmd);
2508
            if (!ret && (cmd & GF_CLI_STATUS_CLIENT_LIST)) {
2509
                if (dict_get(op_ctx, "client-count"))
2510
                    break;
2511
            }
2512
        }
2513
        if (ret)
2514
            goto out;
2515

2516
        brick_count++;
2517
        glusterd_pending_node_put_rpc(pending_node);
2518
        GF_FREE(pending_node);
2519
    }
2520

2521
    pending_node = NULL;
2522
    ret = 0;
2523
out:
2524
    if (pending_node && pending_node->node)
2525
        glusterd_pending_node_put_rpc(pending_node);
2526

2527
    if (rsp_dict)
2528
        dict_unref(rsp_dict);
2529
    gf_msg_debug(this->name, 0, "Sent op req to %d bricks", brick_count);
2530
    return ret;
2531
}
2532

2533
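/*
 * gd_sync_task_begin() is the synctask driver for the whole transaction.
 * The phases below run in order, with any failure falling through to the
 * unlock/cleanup path:
 *
 *     local lock (mgmt_v3, per volume or global)
 *       -> gd_lock_op_phase()      lock the peers
 *       -> glusterd_op_build_payload()
 *       -> gd_stage_op_phase()     stage locally and on peers
 *       -> gd_brick_op_phase()     brick/daemon ops
 *       -> gd_commit_op_phase()    commit locally and on peers
 *       -> gd_unlock_op_phase()    unlock peers and the local lock
 *       -> glusterd_op_send_cli_response()
 */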
void
2534
gd_sync_task_begin(dict_t *op_ctx, rpcsvc_request_t *req)
2535
{
2536
    int ret = -1;
2537
    int op_ret = -1;
2538
    dict_t *req_dict = NULL;
2539
    glusterd_conf_t *conf = NULL;
2540
    glusterd_op_t op = GD_OP_NONE;
2541
    int32_t tmp_op = 0;
2542
    char *op_errstr = NULL;
2543
    char *tmp = NULL;
2544
    char *global = NULL;
2545
    char *volname = NULL;
2546
    xlator_t *this = THIS;
2547
    gf_boolean_t is_acquired = _gf_false;
2548
    gf_boolean_t is_global = _gf_false;
2549
    uuid_t *txn_id = NULL;
2550
    glusterd_op_info_t txn_opinfo = {
2551
        GD_OP_STATE_DEFAULT,
2552
    };
2553
    uint32_t op_errno = 0;
2554
    time_t timeout = 0;
2555

2556
    conf = this->private;
2557
    GF_ASSERT(conf);
2558

2559
    ret = dict_get_int32(op_ctx, GD_SYNC_OPCODE_KEY, &tmp_op);
2560
    if (ret) {
2561
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
2562
               "Failed to get volume "
2563
               "operation");
2564
        goto out;
2565
    }
2566
    op = tmp_op;
2567

2568
    /* Generate a transaction-id for this operation and
2569
     * save it in the dict */
2570
    ret = glusterd_generate_txn_id(op_ctx, &txn_id);
2571
    if (ret) {
2572
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_TRANS_IDGEN_FAIL,
2573
               "Failed to generate transaction id");
2574
        goto out;
2575
    }
2576

2577
    /* Save opinfo for this transaction with the transaction id. */
2578
    glusterd_txn_opinfo_init(&txn_opinfo, 0, (int *)&op, NULL, NULL);
2579
    ret = glusterd_set_txn_opinfo(txn_id, &txn_opinfo);
2580
    if (ret)
2581
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_TRANS_OPINFO_SET_FAIL,
2582
               "Unable to set transaction's opinfo");
2583

2584
    gf_msg_debug(this->name, 0, "Transaction ID : %s", uuid_utoa(*txn_id));
2585

2586
    /* Save the MY_UUID as the originator_uuid */
2587
    ret = glusterd_set_originator_uuid(op_ctx);
2588
    if (ret) {
2589
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_UUID_SET_FAIL,
2590
               "Failed to set originator_uuid.");
2591
        goto out;
2592
    }
2593

2594
    /* The CLI will add a timeout key to the dict if the default timeout is
2595
     * other than 2 minutes. Here we use this value to check whether
2596
     * mgmt_v3_lock_timeout should be set to the default value or
2597
     * changed according to the timeout value,
2598
     * i.e., timeout + 120 seconds. */
2599
    ret = dict_get_time(op_ctx, "timeout", &timeout);
2600
    if (!ret)
2601
        conf->mgmt_v3_lock_timeout = timeout + 120;
2602

2603
    ret = dict_get_str(op_ctx, "globalname", &global);
2604
    if (!ret) {
2605
        is_global = _gf_true;
2606
        goto global;
2607
    }
2608

2609
    /* If no volname is given as a part of the command, locks will
2610
     * not be held */
2611
    ret = dict_get_str(op_ctx, "volname", &tmp);
2612
    if (ret) {
2613
        gf_msg_debug("glusterd", 0, "Failed to get volume name");
2614
        goto local_locking_done;
2615
    } else {
2616
        /* Use a copy of volname, as the cli response will be
2617
         * sent before the unlock, and the volname in the
2618
         * dict might be removed. */
2619
        volname = gf_strdup(tmp);
2620
        if (!volname)
2621
            goto out;
2622
    }
2623

2624
    ret = glusterd_mgmt_v3_lock(volname, MY_UUID, &op_errno, "vol");
2625
    if (ret) {
2626
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_MGMTV3_LOCK_GET_FAIL,
2627
               "Unable to acquire lock for %s", volname);
2628
        gf_asprintf(&op_errstr,
2629
                    "Another transaction is in progress "
2630
                    "for %s. Please try again after some time.",
2631
                    volname);
2632
        goto out;
2633
    }
2634

2635
global:
2636
    if (is_global) {
2637
        ret = glusterd_mgmt_v3_lock(global, MY_UUID, &op_errno, "global");
2638
        if (ret) {
2639
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_MGMTV3_LOCK_GET_FAIL,
2640
                   "Unable to acquire lock for %s", global);
2641
            gf_asprintf(&op_errstr,
2642
                        "Another transaction is in progress "
2643
                        "for %s. Please try again after some time.",
2644
                        global);
2645
            is_global = _gf_false;
2646
            goto out;
2647
        }
2648
    }
2649

2650
    is_acquired = _gf_true;
2651

2652
local_locking_done:
2653

2654
    /* If no volname is given as a part of the command, locks will
2655
     * not be held */
2656
    if (volname || is_global) {
2657
        ret = gd_lock_op_phase(conf, op, op_ctx, &op_errstr, *txn_id,
2658
                               &txn_opinfo, _gf_false);
2659
        if (ret) {
2660
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_PEER_LOCK_FAIL,
2661
                   "Locking Peers Failed.");
2662
            goto out;
2663
        }
2664
    }
2665

2666
    ret = glusterd_op_build_payload(&req_dict, &op_errstr, op_ctx);
2667
    if (ret) {
2668
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_OP_PAYLOAD_BUILD_FAIL,
2669
               LOGSTR_BUILD_PAYLOAD, gd_op_list[op]);
2670
        if (op_errstr == NULL)
2671
            gf_asprintf(&op_errstr, OPERRSTR_BUILD_PAYLOAD);
2672
        goto out;
2673
    }
2674

2675
    ret = gd_stage_op_phase(op, op_ctx, req_dict, &op_errstr, &txn_opinfo);
2676
    if (ret)
2677
        goto out;
2678

2679
    ret = gd_brick_op_phase(op, op_ctx, req_dict, &op_errstr);
2680
    if (ret)
2681
        goto out;
2682

2683
    ret = gd_commit_op_phase(op, op_ctx, req_dict, &op_errstr, &txn_opinfo);
2684
    if (ret)
2685
        goto out;
2686

2687
    ret = 0;
2688
out:
2689
    op_ret = ret;
2690
    if (txn_id) {
2691
        if (global)
2692
            (void)gd_unlock_op_phase(conf, op, &op_ret, req, op_ctx, op_errstr,
2693
                                     global, is_acquired, *txn_id, &txn_opinfo,
2694
                                     _gf_false);
2695
        else
2696
            (void)gd_unlock_op_phase(conf, op, &op_ret, req, op_ctx, op_errstr,
2697
                                     volname, is_acquired, *txn_id, &txn_opinfo,
2698
                                     _gf_false);
2699

2700
        /* Clearing the transaction opinfo */
2701
        ret = glusterd_clear_txn_opinfo(txn_id);
2702
        if (ret)
2703
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_TRANS_OPINFO_CLEAR_FAIL,
2704
                   "Unable to clear transaction's "
2705
                   "opinfo for transaction ID : %s",
2706
                   uuid_utoa(*txn_id));
2707
    }
2708

2709
    if (op_ret && (op_errno == 0))
2710
        op_errno = EG_INTRNL;
2711

2712
    glusterd_op_send_cli_response(op, op_ret, op_errno, req, op_ctx, op_errstr);
2713

2714
    if (volname)
2715
        GF_FREE(volname);
2716

2717
    if (req_dict)
2718
        dict_unref(req_dict);
2719

2720
    if (op_errstr) {
2721
        GF_FREE(op_errstr);
2722
        op_errstr = NULL;
2723
    }
2724

2725
    return;
2726
}
2727

2728
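/*
 * glusterd_op_begin_synctask() is the entry point used by the op handlers:
 * it records the opcode in the request dict and hands control to
 * gd_sync_task_begin().  A hypothetical caller (illustrative only, not a
 * specific handler from this tree) would look like:
 *
 *     ret = glusterd_op_begin_synctask(req, GD_OP_STATUS_VOLUME, dict);
 *     if (ret)
 *         goto out;
 */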
int32_t
2729
glusterd_op_begin_synctask(rpcsvc_request_t *req, glusterd_op_t op, void *dict)
2730
{
2731
    int ret = 0;
2732

2733
    ret = dict_set_int32(dict, GD_SYNC_OPCODE_KEY, op);
2734
    if (ret) {
2735
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
2736
               "dict set failed for setting operations");
2737
        goto out;
2738
    }
2739

2740
    gd_sync_task_begin(dict, req);
2741
    ret = 0;
2742
out:
2743

2744
    return ret;
2745
}
2746
