glusterfs

Форк
0
/
glusterd-brick-ops.c 
2565 строк · 84.3 Кб
1
/*
2
   Copyright (c) 2011-2012 Red Hat, Inc. <http://www.redhat.com>
3
   This file is part of GlusterFS.
4

5
   This file is licensed to you under your choice of the GNU Lesser
6
   General Public License, version 3 or any later version (LGPLv3 or
7
   later), or the GNU General Public License, version 2 (GPLv2), in all
8
   cases as published by the Free Software Foundation.
9
*/
10
#include "glusterd-op-sm.h"
11
#include "glusterd-geo-rep.h"
12
#include "glusterd-store.h"
13
#include "glusterd-mgmt.h"
14
#include "glusterd-utils.h"
15
#include "glusterd-volgen.h"
16
#include "glusterd-svc-helper.h"
17
#include "glusterd-messages.h"
18
#include "glusterd-server-quorum.h"
19
#include <glusterfs/run.h>
20
#include <glusterfs/syscall.h>
21
#include <sys/signal.h>
22

23
/* misc */
24

25
/* In this function, we decide, based on the 'count' of the brick,
26
   where to add it in the current volume. 'count' tells us already
27
   how many of the given bricks are added. other argument are self-
28
   descriptive. */
29
int
30
add_brick_at_right_order(glusterd_brickinfo_t *brickinfo,
31
                         glusterd_volinfo_t *volinfo, int count,
32
                         int32_t replica_cnt)
33
{
34
    int idx = 0;
35
    int i = 0;
36
    int sub_cnt = 0;
37
    glusterd_brickinfo_t *brick = NULL;
38

39
    /* The complexity of the function is in deciding at which index
40
       to add new brick. Even though it can be defined with a complex
41
       single formula for all volume, it is separated out to make it
42
       more readable */
43

44
    /* replica count is set */
45
    /* common formula when 'replica_count' is set */
46
    /* idx = ((count / (replica_cnt - existing_replica_count)) *
47
       existing_replica_count) +
48
       (count + existing_replica_count);
49
    */
50

51
    sub_cnt = volinfo->replica_count;
52
    idx = (count / (replica_cnt - sub_cnt) * sub_cnt) + (count + sub_cnt);
53

54
    i = 0;
55
    cds_list_for_each_entry(brick, &volinfo->bricks, brick_list)
56
    {
57
        i++;
58
        if (i < idx)
59
            continue;
60
        gf_msg_debug(THIS->name, 0, "brick:%s index=%d, count=%d", brick->path,
61
                     idx, count);
62

63
        cds_list_add(&brickinfo->brick_list, &brick->brick_list);
64
        break;
65
    }
66

67
    return 0;
68
}
69

70
static int
71
gd_addbr_validate_replica_count(glusterd_volinfo_t *volinfo, int replica_count,
72
                                int arbiter_count, int total_bricks, int *type,
73
                                char *err_str, int err_len)
74
{
75
    int ret = -1;
76

77
    /* replica count is set */
78
    switch (volinfo->type) {
79
        case GF_CLUSTER_TYPE_NONE:
80
            if ((volinfo->brick_count * replica_count) == total_bricks) {
81
                /* Change the volume type */
82
                *type = GF_CLUSTER_TYPE_REPLICATE;
83
                gf_msg(THIS->name, GF_LOG_INFO, 0,
84
                       GD_MSG_VOL_TYPE_CHANGING_INFO,
85
                       "Changing the type of volume %s from "
86
                       "'distribute' to 'replica'",
87
                       volinfo->volname);
88
                ret = 0;
89
                goto out;
90

91
            } else {
92
                snprintf(err_str, err_len,
93
                         "Incorrect number of "
94
                         "bricks (%d) supplied for replica count (%d).",
95
                         (total_bricks - volinfo->brick_count), replica_count);
96
                gf_msg(THIS->name, GF_LOG_ERROR, EINVAL, GD_MSG_INVALID_ENTRY,
97
                       "%s", err_str);
98
                goto out;
99
            }
100
            break;
101
        case GF_CLUSTER_TYPE_REPLICATE:
102
            if (replica_count < volinfo->replica_count) {
103
                snprintf(err_str, err_len,
104
                         "Incorrect replica count (%d) supplied. "
105
                         "Volume already has (%d)",
106
                         replica_count, volinfo->replica_count);
107
                gf_msg(THIS->name, GF_LOG_ERROR, EINVAL, GD_MSG_INVALID_ENTRY,
108
                       "%s", err_str);
109
                goto out;
110
            }
111
            if (replica_count == volinfo->replica_count) {
112
                if (arbiter_count && !volinfo->arbiter_count) {
113
                    snprintf(err_str, err_len,
114
                             "Cannot convert replica 3 volume "
115
                             "to arbiter volume.");
116
                    gf_msg(THIS->name, GF_LOG_ERROR, EINVAL,
117
                           GD_MSG_INVALID_ENTRY, "%s", err_str);
118
                    goto out;
119
                }
120
                if (!(total_bricks % volinfo->dist_leaf_count)) {
121
                    ret = 1;
122
                    goto out;
123
                }
124
            }
125
            if (replica_count > volinfo->replica_count) {
126
                /* We have to make sure before and after 'add-brick',
127
                   the number or subvolumes for distribute will remain
128
                   same, when replica count is given */
129
                if ((total_bricks * volinfo->dist_leaf_count) ==
130
                    (volinfo->brick_count * replica_count)) {
131
                    /* Change the dist_leaf_count */
132
                    gf_msg(THIS->name, GF_LOG_INFO, 0,
133
                           GD_MSG_REPLICA_COUNT_CHANGE_INFO,
134
                           "Changing the replica count of "
135
                           "volume %s from %d to %d",
136
                           volinfo->volname, volinfo->replica_count,
137
                           replica_count);
138
                    ret = 0;
139
                    goto out;
140
                }
141
            }
142
            break;
143
        case GF_CLUSTER_TYPE_DISPERSE:
144
            snprintf(err_str, err_len,
145
                     "Volume %s cannot be converted "
146
                     "from dispersed to replicated-"
147
                     "dispersed",
148
                     volinfo->volname);
149
            gf_msg(THIS->name, GF_LOG_ERROR, EPERM, GD_MSG_OP_NOT_PERMITTED,
150
                   "%s", err_str);
151
            goto out;
152
    }
153
out:
154
    return ret;
155
}
156

157
static int
158
gd_rmbr_validate_replica_count(glusterd_volinfo_t *volinfo,
159
                               int32_t replica_count, int32_t brick_count,
160
                               char *err_str, size_t err_len)
161
{
162
    int ret = -1;
163
    int replica_nodes = 0;
164

165
    switch (volinfo->type) {
166
        case GF_CLUSTER_TYPE_NONE:
167
        case GF_CLUSTER_TYPE_DISPERSE:
168
            snprintf(err_str, err_len,
169
                     "replica count (%d) option given for non replicate "
170
                     "volume %s",
171
                     replica_count, volinfo->volname);
172
            gf_smsg(THIS->name, GF_LOG_WARNING, EINVAL, GD_MSG_INVALID_ARGUMENT,
173
                    err_str, NULL);
174
            goto out;
175

176
        case GF_CLUSTER_TYPE_REPLICATE:
177
            /* in remove brick, you can only reduce the replica count */
178
            if (replica_count > volinfo->replica_count) {
179
                snprintf(err_str, err_len,
180
                         "given replica count (%d) option is more "
181
                         "than volume %s's replica count (%d)",
182
                         replica_count, volinfo->volname,
183
                         volinfo->replica_count);
184
                gf_smsg(THIS->name, GF_LOG_WARNING, EINVAL,
185
                        GD_MSG_INVALID_ARGUMENT, err_str, NULL);
186
                goto out;
187
            }
188
            if (replica_count == volinfo->replica_count) {
189
                /* This means the 'replica N' option on CLI was
190
                   redundant. Check if the total number of bricks given
191
                   for removal is same as 'dist_leaf_count' */
192
                if (brick_count % volinfo->dist_leaf_count) {
193
                    snprintf(err_str, err_len,
194
                             "number of bricks provided (%d) is "
195
                             "not valid. need at least %d "
196
                             "(or %dxN)",
197
                             brick_count, volinfo->dist_leaf_count,
198
                             volinfo->dist_leaf_count);
199
                    gf_smsg(THIS->name, GF_LOG_WARNING, EINVAL,
200
                            GD_MSG_INVALID_ARGUMENT, err_str, NULL);
201
                    goto out;
202
                }
203
                ret = 1;
204
                goto out;
205
            }
206

207
            replica_nodes = ((volinfo->brick_count / volinfo->replica_count) *
208
                             (volinfo->replica_count - replica_count));
209

210
            if (brick_count % replica_nodes) {
211
                snprintf(err_str, err_len,
212
                         "need %d(xN) bricks for reducing replica "
213
                         "count of the volume from %d to %d",
214
                         replica_nodes, volinfo->replica_count, replica_count);
215
                gf_smsg(THIS->name, GF_LOG_WARNING, EINVAL,
216
                        GD_MSG_INVALID_ARGUMENT, err_str, NULL);
217
                goto out;
218
            }
219
            break;
220
    }
221

222
    ret = 0;
223
out:
224
    return ret;
225
}
226

227
/* Handler functions */
228
int
229
__glusterd_handle_add_brick(rpcsvc_request_t *req)
230
{
231
    int32_t ret = -1;
232
    gf_cli_req cli_req = {{
233
        0,
234
    }};
235
    dict_t *dict = NULL;
236
    char *bricks = NULL;
237
    char *volname = NULL;
238
    int brick_count = 0;
239
    void *cli_rsp = NULL;
240
    char err_str[2048] = "";
241
    gf_cli_rsp rsp = {
242
        0,
243
    };
244
    glusterd_volinfo_t *volinfo = NULL;
245
    xlator_t *this = THIS;
246
    int total_bricks = 0;
247
    int32_t replica_count = 0;
248
    int32_t arbiter_count = 0;
249
    int type = 0;
250

251
    GF_ASSERT(req);
252

253
    ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
254
    if (ret < 0) {
255
        // failed to decode msg;
256
        req->rpc_err = GARBAGE_ARGS;
257
        snprintf(err_str, sizeof(err_str), "Garbage args received");
258
        gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_GARBAGE_ARGS, NULL);
259
        goto out;
260
    }
261

262
    gf_msg(this->name, GF_LOG_INFO, 0, GD_MSG_ADD_BRICK_REQ_RECVD,
263
           "Received add brick req");
264

265
    if (cli_req.dict.dict_len) {
266
        /* Unserialize the dictionary */
267
        dict = dict_new();
268

269
        ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
270
                               &dict);
271
        if (ret < 0) {
272
            gf_msg(this->name, GF_LOG_ERROR, errno,
273
                   GD_MSG_DICT_UNSERIALIZE_FAIL,
274
                   "failed to "
275
                   "unserialize req-buffer to dictionary");
276
            snprintf(err_str, sizeof(err_str),
277
                     "Unable to decode "
278
                     "the command");
279
            goto out;
280
        }
281
    }
282

283
    ret = dict_get_str(dict, "volname", &volname);
284

285
    if (ret) {
286
        snprintf(err_str, sizeof(err_str),
287
                 "Unable to get volume "
288
                 "name");
289
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
290
               err_str);
291
        goto out;
292
    }
293

294
    ret = glusterd_volinfo_find(volname, &volinfo);
295
    if (ret) {
296
        snprintf(err_str, sizeof(err_str),
297
                 "Unable to get volinfo "
298
                 "for volume name %s",
299
                 volname);
300
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLINFO_GET_FAIL, "%s",
301
               err_str);
302
        goto out;
303
    }
304

305
    ret = dict_get_int32(dict, "count", &brick_count);
306
    if (ret) {
307
        snprintf(err_str, sizeof(err_str),
308
                 "Unable to get volume "
309
                 "brick count");
310
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
311
               err_str);
312
        goto out;
313
    }
314

315
    ret = dict_get_int32(dict, "replica-count", &replica_count);
316
    if (!ret) {
317
        gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
318
               "replica-count is %d", replica_count);
319
    }
320

321
    ret = dict_get_int32(dict, "arbiter-count", &arbiter_count);
322
    if (!ret) {
323
        gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
324
               "arbiter-count is %d", arbiter_count);
325
    }
326

327
    if (!dict_get_sizen(dict, "force")) {
328
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
329
               "Failed to get flag");
330
        goto out;
331
    }
332

333
    total_bricks = volinfo->brick_count + brick_count;
334

335
    if (!replica_count) {
336
        if (volinfo->type == GF_CLUSTER_TYPE_NONE)
337
            goto brick_val;
338

339
        if ((volinfo->brick_count < volinfo->dist_leaf_count) &&
340
            (total_bricks <= volinfo->dist_leaf_count))
341
            goto brick_val;
342

343
        if ((brick_count % volinfo->dist_leaf_count) != 0) {
344
            snprintf(err_str, sizeof(err_str),
345
                     "Incorrect number "
346
                     "of bricks supplied %d with count %d",
347
                     brick_count, volinfo->dist_leaf_count);
348
            gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_REPLICA,
349
                   "%s", err_str);
350
            ret = -1;
351
            goto out;
352
        }
353
        goto brick_val;
354
        /* done with validation.. below section is if replica
355
           count is given */
356
    }
357

358
    ret = gd_addbr_validate_replica_count(volinfo, replica_count, arbiter_count,
359
                                          total_bricks, &type, err_str,
360
                                          sizeof(err_str));
361
    if (ret == -1) {
362
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_COUNT_VALIDATE_FAILED, "%s",
363
               err_str);
364
        goto out;
365
    }
366

367
    /* if replica count is same as earlier, set it back to 0 */
368
    if (ret == 1)
369
        replica_count = 0;
370

371
    ret = dict_set_int32_sizen(dict, "replica-count", replica_count);
372
    if (ret) {
373
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
374
               "failed to set the replica-count in dict");
375
        goto out;
376
    }
377

378
brick_val:
379
    ret = dict_get_str(dict, "bricks", &bricks);
380
    if (ret) {
381
        snprintf(err_str, sizeof(err_str),
382
                 "Unable to get volume "
383
                 "bricks");
384
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
385
               err_str);
386
        goto out;
387
    }
388

389
    if (type != volinfo->type) {
390
        ret = dict_set_int32_sizen(dict, "type", type);
391
        if (ret) {
392
            gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
393
                   "failed to set the new type in dict");
394
            goto out;
395
        }
396
    }
397

398
    ret = glusterd_mgmt_v3_initiate_all_phases(req, GD_OP_ADD_BRICK, dict);
399

400
out:
401
    if (ret) {
402
        rsp.op_ret = -1;
403
        rsp.op_errno = 0;
404
        if (err_str[0] == '\0')
405
            snprintf(err_str, sizeof(err_str), "Operation failed");
406
        rsp.op_errstr = err_str;
407
        cli_rsp = &rsp;
408
        glusterd_to_cli(req, cli_rsp, NULL, 0, NULL, (xdrproc_t)xdr_gf_cli_rsp,
409
                        dict);
410
        ret = 0;  // sent error to cli, prevent second reply
411
    }
412

413
    free(cli_req.dict.dict_val);  // its malloced by xdr
414

415
    return ret;
416
}
417

418
int
419
glusterd_handle_add_brick(rpcsvc_request_t *req)
420
{
421
    return glusterd_big_locked_handler(req, __glusterd_handle_add_brick);
422
}
423

424
static int
425
subvol_matcher_init(int **subvols, int count)
426
{
427
    int ret = -1;
428

429
    *subvols = GF_CALLOC(count, sizeof(int), gf_gld_mt_int);
430
    if (*subvols)
431
        ret = 0;
432

433
    return ret;
434
}
435

436
static void
437
subvol_matcher_update(int *subvols, glusterd_volinfo_t *volinfo,
438
                      glusterd_brickinfo_t *brickinfo)
439
{
440
    glusterd_brickinfo_t *tmp = NULL;
441
    int32_t sub_volume = 0;
442
    int pos = 0;
443
    if (subvols) {
444
        cds_list_for_each_entry(tmp, &volinfo->bricks, brick_list)
445
        {
446
            if (strcmp(tmp->hostname, brickinfo->hostname) ||
447
                strcmp(tmp->path, brickinfo->path)) {
448
                pos++;
449
                continue;
450
            }
451
            gf_msg_debug(THIS->name, 0, LOGSTR_FOUND_BRICK, brickinfo->hostname,
452
                         brickinfo->path, volinfo->volname);
453
            sub_volume = (pos / volinfo->dist_leaf_count);
454
            subvols[sub_volume]++;
455
            break;
456
        }
457
    }
458
}
459

460
static int
461
subvol_matcher_verify(int *subvols, glusterd_volinfo_t *volinfo, char *err_str,
462
                      size_t err_len, char *vol_type, int replica_count)
463
{
464
    int i = 0;
465
    int ret = 0;
466
    int count = volinfo->replica_count - replica_count;
467
    xlator_t *this = THIS;
468

469
    if (replica_count && subvols) {
470
        for (i = 0; i < volinfo->subvol_count; i++) {
471
            if (subvols[i] != count) {
472
                ret = -1;
473
                snprintf(err_str, err_len,
474
                         "Remove exactly %d"
475
                         " brick(s) from each subvolume.",
476
                         count);
477
                gf_smsg(this->name, GF_LOG_ERROR, errno,
478
                        GD_MSG_BRICK_SUBVOL_VERIFY_FAIL, err_str, NULL);
479
                break;
480
            }
481
        }
482
        return ret;
483
    }
484

485
    do {
486
        if (subvols && (subvols[i] % volinfo->dist_leaf_count == 0)) {
487
            continue;
488
        } else {
489
            ret = -1;
490
            snprintf(err_str, err_len, "Bricks not from same subvol for %s",
491
                     vol_type);
492
            gf_smsg(this->name, GF_LOG_ERROR, errno,
493
                    GD_MSG_BRICK_SUBVOL_VERIFY_FAIL, err_str, NULL);
494
            break;
495
        }
496
    } while (++i < volinfo->subvol_count);
497

498
    return ret;
499
}
500

501
/* Release the counter array obtained from subvol_matcher_init(). */
static void
subvol_matcher_destroy(int *subvols)
{
    GF_FREE(subvols);
}
506

507
static int
508
glusterd_remove_brick_validate_arbiters(glusterd_volinfo_t *volinfo,
509
                                        int32_t count, int32_t replica_count,
510
                                        glusterd_brickinfo_t **brickinfo_list,
511
                                        char *err_str, size_t err_len)
512
{
513
    int i = 0;
514
    int ret = 0;
515
    glusterd_brickinfo_t *brickinfo = NULL;
516
    glusterd_brickinfo_t *last = NULL;
517
    char *arbiter_array = NULL;
518
    xlator_t *this = THIS;
519

520
    if (volinfo->type != GF_CLUSTER_TYPE_REPLICATE)
521
        goto out;
522

523
    if (!replica_count || !volinfo->arbiter_count)
524
        goto out;
525

526
    if (replica_count == 2) {
527
        /* If it is an arbiter to replica 2 conversion, only permit
528
         *  removal of the arbiter brick.*/
529
        for (i = 0; i < count; i++) {
530
            brickinfo = brickinfo_list[i];
531
            last = get_last_brick_of_brick_group(volinfo, brickinfo);
532
            if (last != brickinfo) {
533
                snprintf(err_str, err_len,
534
                         "Remove arbiter "
535
                         "brick(s) only when converting from "
536
                         "arbiter to replica 2 subvolume.");
537
                gf_smsg(this->name, GF_LOG_ERROR, errno,
538
                        GD_MSG_REMOVE_ARBITER_BRICK, err_str, NULL);
539
                ret = -1;
540
                goto out;
541
            }
542
        }
543
    } else if (replica_count == 1) {
544
        /* If it is an arbiter to plain distribute conversion, in every
545
         * replica subvol, the arbiter has to be one of the bricks that
546
         * are removed. */
547
        arbiter_array = GF_CALLOC(volinfo->subvol_count, sizeof(*arbiter_array),
548
                                  gf_common_mt_char);
549
        if (!arbiter_array)
550
            return -1;
551
        for (i = 0; i < count; i++) {
552
            brickinfo = brickinfo_list[i];
553
            last = get_last_brick_of_brick_group(volinfo, brickinfo);
554
            if (last == brickinfo)
555
                arbiter_array[brickinfo->group] = 1;
556
        }
557
        for (i = 0; i < volinfo->subvol_count; i++)
558
            if (!arbiter_array[i]) {
559
                snprintf(err_str, err_len,
560
                         "Removed bricks "
561
                         "must contain arbiter when converting"
562
                         " to plain distribute.");
563
                gf_smsg(this->name, GF_LOG_ERROR, errno,
564
                        GD_MSG_REMOVE_ARBITER_BRICK, err_str, NULL);
565
                ret = -1;
566
                break;
567
            }
568
        GF_FREE(arbiter_array);
569
    }
570

571
out:
572
    return ret;
573
}
574

575
int
576
__glusterd_handle_remove_brick(rpcsvc_request_t *req)
577
{
578
    int32_t ret = -1;
579
    gf_cli_req cli_req = {{
580
        0,
581
    }};
582
    dict_t *dict = NULL;
583
    int32_t count = 0;
584
    char *brick = NULL;
585
    char key[64] = "";
586
    int keylen;
587
    int i = 1;
588
    glusterd_conf_t *conf = NULL;
589
    glusterd_volinfo_t *volinfo = NULL;
590
    glusterd_brickinfo_t *brickinfo = NULL;
591
    glusterd_brickinfo_t **brickinfo_list = NULL;
592
    int *subvols = NULL;
593
    char err_str[2048] = "";
594
    gf_cli_rsp rsp = {
595
        0,
596
    };
597
    void *cli_rsp = NULL;
598
    char vol_type[256] = "";
599
    int32_t replica_count = 0;
600
    char *volname = 0;
601
    xlator_t *this = THIS;
602
    int cmd = -1;
603

604
    GF_ASSERT(req);
605
    conf = this->private;
606
    GF_ASSERT(conf);
607

608
    ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
609
    if (ret < 0) {
610
        // failed to decode msg;
611
        req->rpc_err = GARBAGE_ARGS;
612
        snprintf(err_str, sizeof(err_str), "Received garbage args");
613
        gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_GARBAGE_ARGS, NULL);
614
        goto out;
615
    }
616

617
    gf_msg(this->name, GF_LOG_INFO, 0, GD_MSG_REM_BRICK_REQ_RECVD,
618
           "Received rem brick req");
619

620
    if (cli_req.dict.dict_len) {
621
        /* Unserialize the dictionary */
622
        dict = dict_new();
623

624
        ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
625
                               &dict);
626
        if (ret < 0) {
627
            gf_msg(this->name, GF_LOG_ERROR, errno,
628
                   GD_MSG_DICT_UNSERIALIZE_FAIL,
629
                   "failed to "
630
                   "unserialize req-buffer to dictionary");
631
            snprintf(err_str, sizeof(err_str),
632
                     "Unable to decode "
633
                     "the command");
634
            goto out;
635
        }
636
    }
637

638
    ret = dict_get_str(dict, "volname", &volname);
639
    if (ret) {
640
        snprintf(err_str, sizeof(err_str),
641
                 "Unable to get volume "
642
                 "name");
643
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
644
               err_str);
645
        goto out;
646
    }
647

648
    ret = dict_get_int32(dict, "count", &count);
649
    if (ret) {
650
        snprintf(err_str, sizeof(err_str),
651
                 "Unable to get brick "
652
                 "count");
653
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
654
               err_str);
655
        goto out;
656
    }
657

658
    ret = glusterd_volinfo_find(volname, &volinfo);
659
    if (ret) {
660
        snprintf(err_str, sizeof(err_str), "Volume %s does not exist", volname);
661
        gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_FOUND, "%s",
662
               err_str);
663
        goto out;
664
    }
665

666
    ret = dict_get_int32(dict, "command", &cmd);
667
    if (ret) {
668
        snprintf(err_str, sizeof(err_str),
669
                 "Unable to get cmd "
670
                 "ccommand");
671
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
672
               err_str);
673
        goto out;
674
    }
675

676
    ret = dict_get_int32(dict, "replica-count", &replica_count);
677
    if (!ret) {
678
        gf_msg(this->name, GF_LOG_INFO, -ret, GD_MSG_DICT_GET_FAILED,
679
               "request to change replica-count to %d", replica_count);
680
        ret = gd_rmbr_validate_replica_count(volinfo, replica_count, count,
681
                                             err_str, sizeof(err_str));
682
        if (ret < 0) {
683
            /* logging and error msg are done in above function
684
               itself */
685
            goto out;
686
        }
687
        dict_del_sizen(dict, "replica-count");
688
        if (ret) {
689
            replica_count = 0;
690
        } else {
691
            ret = dict_set_int32_sizen(dict, "replica-count", replica_count);
692
            if (ret) {
693
                gf_msg(this->name, GF_LOG_WARNING, -ret, GD_MSG_DICT_SET_FAILED,
694
                       "failed to set the replica_count "
695
                       "in dict");
696
                goto out;
697
            }
698
        }
699
    }
700

701
    /* 'vol_type' is used for giving the meaning full error msg for user */
702
    if (volinfo->type == GF_CLUSTER_TYPE_REPLICATE) {
703
        strcpy(vol_type, "replica");
704
    } else if (volinfo->type == GF_CLUSTER_TYPE_DISPERSE) {
705
        strcpy(vol_type, "disperse");
706
    } else {
707
        strcpy(vol_type, "distribute");
708
    }
709

710
    if (!replica_count && (volinfo->type == GF_CLUSTER_TYPE_REPLICATE) &&
711
        (volinfo->brick_count == volinfo->dist_leaf_count)) {
712
        snprintf(err_str, sizeof(err_str),
713
                 "Removing bricks from replicate configuration "
714
                 "is not allowed without reducing replica count "
715
                 "explicitly.");
716
        gf_msg(this->name, GF_LOG_ERROR, EPERM, GD_MSG_OP_NOT_PERMITTED_AC_REQD,
717
               "%s", err_str);
718
        ret = -1;
719
        goto out;
720
    }
721

722
    /* Do not allow remove-brick if the bricks given is less than
723
       the replica count */
724
    if (!replica_count && (volinfo->type != GF_CLUSTER_TYPE_NONE)) {
725
        if (volinfo->dist_leaf_count && (count % volinfo->dist_leaf_count)) {
726
            snprintf(err_str, sizeof(err_str),
727
                     "Remove brick "
728
                     "incorrect brick count of %d for %s %d",
729
                     count, vol_type, volinfo->dist_leaf_count);
730
            gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_INVALID_ENTRY, "%s",
731
                   err_str);
732
            ret = -1;
733
            goto out;
734
        }
735
    }
736

737
    if ((volinfo->type != GF_CLUSTER_TYPE_NONE) &&
738
        (volinfo->subvol_count > 1)) {
739
        ret = subvol_matcher_init(&subvols, volinfo->subvol_count);
740
        if (ret)
741
            goto out;
742
    }
743

744
    brickinfo_list = GF_CALLOC(count, sizeof(*brickinfo_list),
745
                               gf_common_mt_pointer);
746
    if (!brickinfo_list) {
747
        ret = -1;
748
        goto out;
749
    }
750

751
    while (i <= count) {
752
        keylen = snprintf(key, sizeof(key), "brick%d", i);
753
        ret = dict_get_strn(dict, key, keylen, &brick);
754
        if (ret) {
755
            snprintf(err_str, sizeof(err_str), "Unable to get %s", key);
756
            gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
757
                   err_str);
758
            goto out;
759
        }
760
        gf_msg_debug(this->name, 0,
761
                     "Remove brick count %d brick:"
762
                     " %s",
763
                     i, brick);
764

765
        ret = glusterd_volume_brickinfo_get_by_brick(brick, volinfo, &brickinfo,
766
                                                     _gf_false);
767

768
        if (ret) {
769
            snprintf(err_str, sizeof(err_str),
770
                     "Incorrect brick "
771
                     "%s for volume %s",
772
                     brick, volname);
773
            gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_BRICK_NOT_FOUND,
774
                   "%s", err_str);
775
            goto out;
776
        }
777
        brickinfo_list[i - 1] = brickinfo;
778

779
        i++;
780
        if ((volinfo->type == GF_CLUSTER_TYPE_NONE) ||
781
            (volinfo->brick_count <= volinfo->dist_leaf_count))
782
            continue;
783

784
        subvol_matcher_update(subvols, volinfo, brickinfo);
785
    }
786

787
    if ((volinfo->type != GF_CLUSTER_TYPE_NONE) &&
788
        (volinfo->subvol_count > 1)) {
789
        ret = subvol_matcher_verify(subvols, volinfo, err_str, sizeof(err_str),
790
                                    vol_type, replica_count);
791
        if (ret)
792
            goto out;
793
    }
794

795
    ret = glusterd_remove_brick_validate_arbiters(volinfo, count, replica_count,
796
                                                  brickinfo_list, err_str,
797
                                                  sizeof(err_str));
798
    if (ret)
799
        goto out;
800

801
    if (conf->op_version < GD_OP_VERSION_8_0) {
802
        gf_msg_debug(this->name, 0,
803
                     "The cluster is operating at "
804
                     "version less than %d. remove-brick operation"
805
                     "falling back to syncop framework.",
806
                     GD_OP_VERSION_8_0);
807
        ret = glusterd_op_begin_synctask(req, GD_OP_REMOVE_BRICK, dict);
808
    } else {
809
        ret = glusterd_mgmt_v3_initiate_all_phases(req, GD_OP_REMOVE_BRICK,
810
                                                   dict);
811
    }
812

813
out:
814
    if (ret) {
815
        rsp.op_ret = -1;
816
        rsp.op_errno = 0;
817
        if (err_str[0] == '\0')
818
            snprintf(err_str, sizeof(err_str), "Operation failed");
819
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_GLUSTERD_OP_FAILED, "%s",
820
               err_str);
821
        rsp.op_errstr = err_str;
822
        cli_rsp = &rsp;
823
        glusterd_to_cli(req, cli_rsp, NULL, 0, NULL, (xdrproc_t)xdr_gf_cli_rsp,
824
                        dict);
825

826
        ret = 0;  // sent error to cli, prevent second reply
827
    }
828

829
    if (brickinfo_list)
830
        GF_FREE(brickinfo_list);
831
    subvol_matcher_destroy(subvols);
832
    free(cli_req.dict.dict_val);  // its malloced by xdr
833

834
    return ret;
835
}
836

837
int
838
glusterd_handle_remove_brick(rpcsvc_request_t *req)
839
{
840
    return glusterd_big_locked_handler(req, __glusterd_handle_remove_brick);
841
}
842

843
static int
844
_glusterd_restart_gsync_session(dict_t *this, char *key, data_t *value,
845
                                void *data)
846
{
847
    char *secondary = NULL;
848
    char *secondary_buf = NULL;
849
    char *path_list = NULL;
850
    char *secondary_vol = NULL;
851
    char *secondary_host = NULL;
852
    char *secondary_url = NULL;
853
    char *conf_path = NULL;
854
    char **errmsg = NULL;
855
    int ret = -1;
856
    glusterd_gsync_status_temp_t *param = NULL;
857
    gf_boolean_t is_running = _gf_false;
858

859
    param = (glusterd_gsync_status_temp_t *)data;
860

861
    GF_ASSERT(param);
862
    GF_ASSERT(param->volinfo);
863

864
    secondary = strchr(value->data, ':');
865
    if (secondary) {
866
        secondary++;
867
        secondary_buf = gf_strdup(secondary);
868
        if (!secondary_buf) {
869
            gf_msg("glusterd", GF_LOG_ERROR, ENOMEM, GD_MSG_NO_MEMORY,
870
                   "Failed to gf_strdup");
871
            ret = -1;
872
            goto out;
873
        }
874
    } else
875
        return 0;
876

877
    ret = dict_set_dynstr_sizen(param->rsp_dict, "secondary", secondary_buf);
878
    if (ret) {
879
        gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
880
               "Unable to store secondary");
881
        if (secondary_buf)
882
            GF_FREE(secondary_buf);
883
        goto out;
884
    }
885

886
    ret = glusterd_get_secondary_details_confpath(
887
        param->volinfo, param->rsp_dict, &secondary_url, &secondary_host,
888
        &secondary_vol, &conf_path, errmsg);
889
    if (ret) {
890
        if (errmsg && *errmsg)
891
            gf_msg("glusterd", GF_LOG_ERROR, 0,
892
                   GD_MSG_SECONDARY_CONFPATH_DETAILS_FETCH_FAIL, "%s", *errmsg);
893
        else
894
            gf_msg("glusterd", GF_LOG_ERROR, 0,
895
                   GD_MSG_SECONDARY_CONFPATH_DETAILS_FETCH_FAIL,
896
                   "Unable to fetch secondary or confpath details.");
897
        goto out;
898
    }
899

900
    /* In cases that gsyncd is not running, we will not invoke it
901
     * because of add-brick. */
902
    ret = glusterd_check_gsync_running_local(param->volinfo->volname, secondary,
903
                                             conf_path, &is_running);
904
    if (ret) {
905
        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_GSYNC_VALIDATION_FAIL,
906
               "gsync running validation failed.");
907
        goto out;
908
    }
909
    if (_gf_false == is_running) {
910
        gf_msg_debug("glusterd", 0,
911
                     "gsync session for %s and %s is"
912
                     " not running on this node. Hence not restarting.",
913
                     param->volinfo->volname, secondary);
914
        ret = 0;
915
        goto out;
916
    }
917

918
    ret = glusterd_get_local_brickpaths(param->volinfo, &path_list);
919
    if (!path_list) {
920
        gf_msg_debug("glusterd", 0,
921
                     "This node not being part of"
922
                     " volume should not be running gsyncd. Hence"
923
                     " no gsyncd process to restart.");
924
        ret = 0;
925
        goto out;
926
    }
927

928
    ret = glusterd_check_restart_gsync_session(
929
        param->volinfo, secondary, param->rsp_dict, path_list, conf_path, 0);
930
    if (ret)
931
        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_GSYNC_RESTART_FAIL,
932
               "Unable to restart gsync session.");
933

934
out:
935
    gf_msg_debug("glusterd", 0, "Returning %d.", ret);
936
    return ret;
937
}
938

939
/* op-sm */
940

941
int
942
glusterd_op_perform_add_bricks(glusterd_volinfo_t *volinfo, int32_t count,
943
                               char *bricks, dict_t *dict)
944
{
945
    char *brick = NULL;
946
    int32_t i = 1;
947
    char *brick_list = NULL;
948
    char *free_ptr1 = NULL;
949
    char *free_ptr2 = NULL;
950
    char *saveptr = NULL;
951
    int32_t ret = -1;
952
    int32_t replica_count = 0;
953
    int32_t arbiter_count = 0;
954
    int32_t type = 0;
955
    glusterd_brickinfo_t *brickinfo = NULL;
956
    glusterd_gsync_status_temp_t param = {
957
        0,
958
    };
959
    gf_boolean_t restart_needed = 0;
960
    int brickid = 0;
961
    char key[64] = "";
962
    char *brick_mount_dir = NULL;
963
    xlator_t *this = THIS;
964
    gf_boolean_t is_valid_add_brick = _gf_false;
965
    gf_boolean_t restart_shd = _gf_false;
966
    struct statvfs brickstat = {
967
        0,
968
    };
969

970
    GF_ASSERT(volinfo);
971

972
    if (bricks) {
973
        brick_list = gf_strdup(bricks);
974
        free_ptr1 = brick_list;
975
    }
976

977
    if (count)
978
        brick = strtok_r(brick_list + 1, " \n", &saveptr);
979

980
    if (dict) {
981
        ret = dict_get_int32(dict, "replica-count", &replica_count);
982
        if (!ret)
983
            gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
984
                   "replica-count is set %d", replica_count);
985
        ret = dict_get_int32(dict, "arbiter-count", &arbiter_count);
986
        if (!ret)
987
            gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
988
                   "arbiter-count is set %d", arbiter_count);
989
        ret = dict_get_int32(dict, "type", &type);
990
        if (!ret)
991
            gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
992
                   "type is set %d, need to change it", type);
993
    }
994

995
    brickid = glusterd_get_next_available_brickid(volinfo);
996
    if (brickid < 0)
997
        goto out;
998
    while (i <= count) {
999
        ret = glusterd_brickinfo_new_from_brick(brick, &brickinfo, _gf_true,
1000
                                                NULL);
1001
        if (ret)
1002
            goto out;
1003

1004
        GLUSTERD_ASSIGN_BRICKID_TO_BRICKINFO(brickinfo, volinfo, brickid++);
1005

1006
        brick_mount_dir = NULL;
1007

1008
        snprintf(key, sizeof(key), "brick%d.mount_dir", i);
1009
        ret = dict_get_str(dict, key, &brick_mount_dir);
1010
        if (ret) {
1011
            gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1012
                   "%s not present", key);
1013
            goto out;
1014
        }
1015
        strncpy(brickinfo->mount_dir, brick_mount_dir,
1016
                sizeof(brickinfo->mount_dir) - 1);
1017

1018
        ret = glusterd_resolve_brick(brickinfo);
1019
        if (ret)
1020
            goto out;
1021

1022
        if (!gf_uuid_compare(brickinfo->uuid, MY_UUID)) {
1023
            ret = sys_statvfs(brickinfo->path, &brickstat);
1024
            if (ret) {
1025
                gf_msg(this->name, GF_LOG_ERROR, errno, GD_MSG_STATVFS_FAILED,
1026
                       "Failed to fetch disk utilization "
1027
                       "from the brick (%s:%s). Please check the health of "
1028
                       "the brick. Error code was %s",
1029
                       brickinfo->hostname, brickinfo->path, strerror(errno));
1030

1031
                goto out;
1032
            }
1033
            brickinfo->statfs_fsid = brickstat.f_fsid;
1034
        }
1035
        if (replica_count) {
1036
            add_brick_at_right_order(brickinfo, volinfo, (i - 1),
1037
                                     replica_count);
1038
        } else {
1039
            cds_list_add_tail(&brickinfo->brick_list, &volinfo->bricks);
1040
        }
1041
        brick = strtok_r(NULL, " \n", &saveptr);
1042
        i++;
1043
        volinfo->brick_count++;
1044
    }
1045

1046
    /* Gets changed only if the options are given in add-brick cli */
1047
    if (type)
1048
        volinfo->type = type;
1049
    /* performance.client-io-threads is turned on by default,
1050
     * however this has adverse effects on replicate volumes due to
1051
     * replication design issues, till that get addressed
1052
     * performance.client-io-threads option is turned off for all
1053
     * replicate volumes if not already explicitly enabled.
1054
     */
1055
    if (type && glusterd_is_volume_replicate(volinfo)) {
1056
        ret = dict_set_sizen_str_sizen(volinfo->dict,
1057
                                       "performance.client-io-threads", "off");
1058
        if (ret) {
1059
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
1060
                   "Failed to set "
1061
                   "performance.client-io-threads to off");
1062
            goto out;
1063
        }
1064
    }
1065

1066
    if (replica_count) {
1067
        volinfo->replica_count = replica_count;
1068
    }
1069
    if (arbiter_count) {
1070
        volinfo->arbiter_count = arbiter_count;
1071
    }
1072
    volinfo->dist_leaf_count = glusterd_get_dist_leaf_count(volinfo);
1073

1074
    /* backward compatibility */
1075
    volinfo->sub_count = ((volinfo->dist_leaf_count == 1)
1076
                              ? 0
1077
                              : volinfo->dist_leaf_count);
1078

1079
    volinfo->subvol_count = (volinfo->brick_count / volinfo->dist_leaf_count);
1080

1081
    ret = 0;
1082
    if (GLUSTERD_STATUS_STARTED != volinfo->status)
1083
        goto generate_volfiles;
1084

1085
    ret = generate_brick_volfiles(volinfo);
1086
    if (ret)
1087
        goto out;
1088

1089
    brick_list = gf_strdup(bricks);
1090
    free_ptr2 = brick_list;
1091
    i = 1;
1092

1093
    if (count)
1094
        brick = strtok_r(brick_list + 1, " \n", &saveptr);
1095

1096
    if (glusterd_is_volume_replicate(volinfo)) {
1097
        if (replica_count) {
1098
            is_valid_add_brick = _gf_true;
1099
            if (volinfo->status == GLUSTERD_STATUS_STARTED) {
1100
                ret = volinfo->shd.svc.stop(&(volinfo->shd.svc), SIGTERM);
1101
                if (ret) {
1102
                    gf_msg(this->name, GF_LOG_ERROR, 0,
1103
                           GD_MSG_GLUSTER_SERVICES_STOP_FAIL,
1104
                           "Failed to stop shd for %s.", volinfo->volname);
1105
                }
1106
                restart_shd = _gf_true;
1107
            }
1108
            ret = generate_dummy_client_volfiles(volinfo);
1109
            if (ret) {
1110
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLFILE_CREATE_FAIL,
1111
                       "Failed to create volfile.");
1112
                goto out;
1113
            }
1114
        }
1115
    }
1116

1117
    while (i <= count) {
1118
        ret = glusterd_volume_brickinfo_get_by_brick(brick, volinfo, &brickinfo,
1119
                                                     _gf_true);
1120
        if (ret)
1121
            goto out;
1122

1123
        if (gf_uuid_is_null(brickinfo->uuid)) {
1124
            ret = glusterd_resolve_brick(brickinfo);
1125
            if (ret) {
1126
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESOLVE_BRICK_FAIL,
1127
                       FMTSTR_RESOLVE_BRICK, brickinfo->hostname,
1128
                       brickinfo->path);
1129
                goto out;
1130
            }
1131
        }
1132

1133
        /* if the volume is a replicate volume, do: */
1134
        if (is_valid_add_brick) {
1135
            if (!gf_uuid_compare(brickinfo->uuid, MY_UUID)) {
1136
                ret = glusterd_handle_replicate_brick_ops(volinfo, brickinfo,
1137
                                                          GD_OP_ADD_BRICK);
1138
                if (ret < 0)
1139
                    goto out;
1140
            }
1141
        }
1142
        ret = glusterd_brick_start(volinfo, brickinfo, _gf_true, _gf_false);
1143
        if (ret)
1144
            goto out;
1145
        i++;
1146
        brick = strtok_r(NULL, " \n", &saveptr);
1147

1148
        /* Check if the brick is added in this node, and set
1149
         * the restart_needed flag. */
1150
        if ((!gf_uuid_compare(brickinfo->uuid, MY_UUID)) && !restart_needed) {
1151
            restart_needed = 1;
1152
            gf_msg_debug(this->name, 0,
1153
                         "Restart gsyncd session, if it's already "
1154
                         "running.");
1155
        }
1156
    }
1157

1158
    /* If the restart_needed flag is set, restart gsyncd sessions for that
1159
     * particular primary with all the secondaries. */
1160
    if (restart_needed) {
1161
        param.rsp_dict = dict;
1162
        param.volinfo = volinfo;
1163
        dict_foreach(volinfo->gsync_secondaries,
1164
                     _glusterd_restart_gsync_session, &param);
1165
    }
1166

1167
generate_volfiles:
1168
    /*
1169
     * The cluster is operating at version greater than
1170
     * gluster-3.7.5. So no need to sent volfile fetch
1171
     * request in commit phase, the same will be done
1172
     * in post validate phase with v3 framework.
1173
     */
1174

1175
out:
1176
    GF_FREE(free_ptr1);
1177
    GF_FREE(free_ptr2);
1178
    if (restart_shd) {
1179
        if (volinfo->shd.svc.manager(&(volinfo->shd.svc), volinfo,
1180
                                     PROC_START_NO_WAIT)) {
1181
            gf_msg(this->name, GF_LOG_CRITICAL, 0,
1182
                   GD_MSG_GLUSTER_SERVICE_START_FAIL,
1183
                   "Failed to start shd for %s.", volinfo->volname);
1184
        }
1185
    }
1186

1187
    gf_msg_debug(this->name, 0, "Returning %d", ret);
1188
    return ret;
1189
}
1190

1191
int
1192
glusterd_op_perform_remove_brick(glusterd_volinfo_t *volinfo, char *brick,
1193
                                 int force, int *need_migrate)
1194
{
1195
    glusterd_brickinfo_t *brickinfo = NULL;
1196
    int32_t ret = -1;
1197

1198
    GF_ASSERT(volinfo);
1199
    GF_ASSERT(brick);
1200

1201
    ret = glusterd_volume_brickinfo_get_by_brick(brick, volinfo, &brickinfo,
1202
                                                 _gf_false);
1203
    if (ret)
1204
        goto out;
1205

1206
    ret = glusterd_resolve_brick(brickinfo);
1207
    if (ret)
1208
        goto out;
1209

1210
    glusterd_volinfo_reset_defrag_stats(volinfo);
1211

1212
    if (!gf_uuid_compare(brickinfo->uuid, MY_UUID)) {
1213
        /* Only if the brick is in this glusterd, do the rebalance */
1214
        if (need_migrate)
1215
            *need_migrate = 1;
1216
    }
1217

1218
    if (force) {
1219
        ret = glusterd_brick_stop(volinfo, brickinfo, _gf_true);
1220
        if (ret) {
1221
            gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_STOP_FAIL,
1222
                   "Unable to stop "
1223
                   "glusterfs, ret: %d",
1224
                   ret);
1225
        }
1226
        goto out;
1227
    }
1228

1229
    brickinfo->decommissioned = 1;
1230
    ret = 0;
1231
out:
1232
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
1233
    return ret;
1234
}
1235

1236
int
1237
glusterd_op_stage_add_brick(dict_t *dict, char **op_errstr, dict_t *rsp_dict)
1238
{
1239
    int ret = 0;
1240
    char *volname = NULL;
1241
    int count = 0;
1242
    int replica_count = 0;
1243
    int i = 0;
1244
    int32_t local_brick_count = 0;
1245
    char *bricks = NULL;
1246
    char *brick_list = NULL;
1247
    char *saveptr = NULL;
1248
    char *free_ptr = NULL;
1249
    char *brick = NULL;
1250
    glusterd_brickinfo_t *brickinfo = NULL;
1251
    glusterd_volinfo_t *volinfo = NULL;
1252
    xlator_t *this = THIS;
1253
    char msg[4096] = "";
1254
    char key[64] = "";
1255
    gf_boolean_t brick_alloc = _gf_false;
1256
    char *all_bricks = NULL;
1257
    char *str_ret = NULL;
1258
    gf_boolean_t is_force = _gf_false;
1259
    int32_t len = 0;
1260

1261
    ret = dict_get_str(dict, "volname", &volname);
1262
    if (ret) {
1263
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1264
               "Unable to get volume name");
1265
        goto out;
1266
    }
1267

1268
    ret = glusterd_volinfo_find(volname, &volinfo);
1269
    if (ret) {
1270
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND,
1271
               "Unable to find volume: %s", volname);
1272
        goto out;
1273
    }
1274

1275
    ret = glusterd_validate_volume_id(dict, volinfo);
1276
    if (ret)
1277
        goto out;
1278

1279
    ret = dict_get_int32(dict, "replica-count", &replica_count);
1280
    if (ret) {
1281
        gf_msg_debug(this->name, 0, "Unable to get replica count");
1282
    }
1283

1284
    glusterd_add_peers_to_auth_list(volname);
1285

1286
    if (replica_count && glusterd_is_volume_replicate(volinfo)) {
1287
        /* Do not allow add-brick for stopped volumes when replica-count
1288
         * is being increased.
1289
         */
1290
        if (GLUSTERD_STATUS_STOPPED == volinfo->status) {
1291
            ret = -1;
1292
            snprintf(msg, sizeof(msg),
1293
                     " Volume must not be in"
1294
                     " stopped state when replica-count needs to "
1295
                     " be increased.");
1296
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL, "%s",
1297
                   msg);
1298
            *op_errstr = gf_strdup(msg);
1299
            goto out;
1300
        }
1301

1302
        /* Do not allow increasing replica count for arbiter volumes. */
1303
        if (volinfo->arbiter_count) {
1304
            ret = -1;
1305
            snprintf(msg, sizeof(msg),
1306
                     "Increasing replica count "
1307
                     "for arbiter volumes is not supported.");
1308
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL, "%s",
1309
                   msg);
1310
            *op_errstr = gf_strdup(msg);
1311
            goto out;
1312
        }
1313
    }
1314

1315
    is_force = dict_get_str_boolean(dict, "force", _gf_false);
1316

1317
    /* Check brick order if the volume type is replicate or disperse. If
1318
     * force at the end of command not given then check brick order.
1319
     * doing this check at the originator node is sufficient.
1320
     */
1321

1322
    if (!is_force && is_origin_glusterd(dict)) {
1323
        ret = 0;
1324
        if (volinfo->type == GF_CLUSTER_TYPE_REPLICATE) {
1325
            gf_msg_debug(this->name, 0,
1326
                         "Replicate cluster type "
1327
                         "found. Checking brick order.");
1328
            if (replica_count && (replica_count != volinfo->replica_count))
1329
                ret = glusterd_check_brick_order(dict, msg, volinfo->type,
1330
                                                 &volname, &bricks, &count,
1331
                                                 replica_count, 1);
1332
            else
1333
                ret = glusterd_check_brick_order(dict, msg, volinfo->type,
1334
                                                 &volname, &bricks, &count,
1335
                                                 volinfo->replica_count, 0);
1336
        } else if (volinfo->type == GF_CLUSTER_TYPE_DISPERSE) {
1337
            gf_msg_debug(this->name, 0,
1338
                         "Disperse cluster type"
1339
                         " found. Checking brick order.");
1340
            ret = glusterd_check_brick_order(dict, msg, volinfo->type, &volname,
1341
                                             &bricks, &count,
1342
                                             volinfo->disperse_count, 0);
1343
        }
1344
        if (ret) {
1345
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BAD_BRKORDER,
1346
                   "Not adding brick because of "
1347
                   "bad brick order. %s",
1348
                   msg);
1349
            *op_errstr = gf_strdup(msg);
1350
            goto out;
1351
        }
1352
    }
1353

1354
    if (volinfo->replica_count < replica_count && !is_force) {
1355
        cds_list_for_each_entry(brickinfo, &volinfo->bricks, brick_list)
1356
        {
1357
            if (gf_uuid_compare(brickinfo->uuid, MY_UUID))
1358
                continue;
1359
            if (brickinfo->status == GF_BRICK_STOPPED) {
1360
                ret = -1;
1361
                len = snprintf(msg, sizeof(msg),
1362
                               "Brick %s "
1363
                               "is down, changing replica "
1364
                               "count needs all the bricks "
1365
                               "to be up to avoid data loss",
1366
                               brickinfo->path);
1367
                if (len < 0) {
1368
                    strcpy(msg, "<error>");
1369
                }
1370
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL, "%s",
1371
                       msg);
1372
                *op_errstr = gf_strdup(msg);
1373
                goto out;
1374
            }
1375
        }
1376
    }
1377

1378
    if (is_origin_glusterd(dict)) {
1379
        ret = glusterd_validate_quorum(this, GD_OP_ADD_BRICK, dict, op_errstr);
1380
        if (ret) {
1381
            gf_msg(this->name, GF_LOG_CRITICAL, 0, GD_MSG_SERVER_QUORUM_NOT_MET,
1382
                   "Server quorum not met. Rejecting operation.");
1383
            goto out;
1384
        }
1385
    } else {
1386
        /* Case 1: conf->op_version <= GD_OP_VERSION_3_7_5
1387
         *         in this case the add-brick is running
1388
         *         syncop framework that will do a quorum
1389
         *         check by default
1390
         * Case 2: We don't need to do quorum check on every
1391
         *         node, only originator glusterd need to
1392
         *         check for quorum
1393
         * So nothing need to be done in else
1394
         */
1395
    }
1396

1397
    if (glusterd_is_defrag_on(volinfo)) {
1398
        snprintf(msg, sizeof(msg),
1399
                 "Volume name %s rebalance is in "
1400
                 "progress. Please retry after completion",
1401
                 volname);
1402
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_OIP_RETRY_LATER, "%s", msg);
1403
        *op_errstr = gf_strdup(msg);
1404
        ret = -1;
1405
        goto out;
1406
    }
1407

1408
    if (volinfo->snap_count > 0 || !cds_list_empty(&volinfo->snap_volumes)) {
1409
        snprintf(msg, sizeof(msg),
1410
                 "Volume %s  has %" PRIu64
1411
                 " snapshots. "
1412
                 "Changing the volume configuration will not effect snapshots."
1413
                 "But the snapshot brick mount should be intact to "
1414
                 "make them function.",
1415
                 volname, volinfo->snap_count);
1416
        gf_msg("glusterd", GF_LOG_WARNING, 0, GD_MSG_SNAP_WARN, "%s", msg);
1417
        msg[0] = '\0';
1418
    }
1419

1420
    if (!count) {
1421
        ret = dict_get_int32(dict, "count", &count);
1422
        if (ret) {
1423
            gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1424
                   "Unable to get count");
1425
            goto out;
1426
        }
1427
    }
1428

1429
    if (!bricks) {
1430
        ret = dict_get_str(dict, "bricks", &bricks);
1431
        if (ret) {
1432
            gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1433
                   "Unable to get bricks");
1434
            goto out;
1435
        }
1436
    }
1437

1438
    if (bricks) {
1439
        brick_list = gf_strdup(bricks);
1440
        all_bricks = gf_strdup(bricks);
1441
        free_ptr = brick_list;
1442
    }
1443

1444
    if (count)
1445
        brick = strtok_r(brick_list + 1, " \n", &saveptr);
1446

1447
    while (i < count) {
1448
        if (!glusterd_store_is_valid_brickpath(volname, brick) ||
1449
            !glusterd_is_valid_volfpath(volname, brick)) {
1450
            snprintf(msg, sizeof(msg),
1451
                     "brick path %s is "
1452
                     "too long",
1453
                     brick);
1454
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRKPATH_TOO_LONG, "%s",
1455
                   msg);
1456
            *op_errstr = gf_strdup(msg);
1457

1458
            ret = -1;
1459
            goto out;
1460
        }
1461

1462
        ret = glusterd_brickinfo_new_from_brick(brick, &brickinfo, _gf_true,
1463
                                                NULL);
1464
        if (ret) {
1465
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_NOT_FOUND,
1466
                   "Add-brick: Unable"
1467
                   " to get brickinfo");
1468
            goto out;
1469
        }
1470
        brick_alloc = _gf_true;
1471

1472
        ret = glusterd_new_brick_validate(brick, brickinfo, msg, sizeof(msg),
1473
                                          NULL);
1474
        if (ret) {
1475
            *op_errstr = gf_strdup(msg);
1476
            ret = -1;
1477
            goto out;
1478
        }
1479

1480
        if (!gf_uuid_compare(brickinfo->uuid, MY_UUID)) {
1481
            ret = glusterd_validate_and_create_brickpath(
1482
                brickinfo, volinfo->volume_id, volinfo->volname, op_errstr,
1483
                is_force, _gf_false);
1484
            if (ret)
1485
                goto out;
1486

1487
            ret = glusterd_get_brick_mount_dir(
1488
                brickinfo->path, brickinfo->hostname, brickinfo->mount_dir);
1489
            if (ret) {
1490
                gf_msg(this->name, GF_LOG_ERROR, 0,
1491
                       GD_MSG_BRICK_MOUNTDIR_GET_FAIL,
1492
                       "Failed to get brick mount_dir");
1493
                goto out;
1494
            }
1495

1496
            snprintf(key, sizeof(key), "brick%d.mount_dir", i + 1);
1497
            ret = dict_set_dynstr_with_alloc(rsp_dict, key,
1498
                                             brickinfo->mount_dir);
1499
            if (ret) {
1500
                gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
1501
                       "Failed to set %s", key);
1502
                goto out;
1503
            }
1504

1505
            local_brick_count = i + 1;
1506
        }
1507

1508
        glusterd_brickinfo_delete(brickinfo);
1509
        brick_alloc = _gf_false;
1510
        brickinfo = NULL;
1511
        brick = strtok_r(NULL, " \n", &saveptr);
1512
        i++;
1513
    }
1514

1515
    ret = dict_set_int32_sizen(rsp_dict, "brick_count", local_brick_count);
1516
    if (ret) {
1517
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
1518
               "Failed to set local_brick_count");
1519
        goto out;
1520
    }
1521

1522
out:
1523
    GF_FREE(free_ptr);
1524
    if (brick_alloc && brickinfo)
1525
        glusterd_brickinfo_delete(brickinfo);
1526
    GF_FREE(str_ret);
1527
    GF_FREE(all_bricks);
1528

1529
    gf_msg_debug(this->name, 0, "Returning %d", ret);
1530

1531
    return ret;
1532
}
1533

1534
int
1535
glusterd_remove_brick_validate_bricks(gf1_op_commands cmd, int32_t brick_count,
1536
                                      dict_t *dict, glusterd_volinfo_t *volinfo,
1537
                                      char **errstr,
1538
                                      gf_defrag_type_t cmd_defrag)
1539
{
1540
    char *brick = NULL;
1541
    char msg[2048] = "";
1542
    char key[64] = "";
1543
    int keylen;
1544
    glusterd_brickinfo_t *brickinfo = NULL;
1545
    glusterd_peerinfo_t *peerinfo = NULL;
1546
    int i = 0;
1547
    int ret = -1;
1548
    char pidfile[PATH_MAX + 1] = {
1549
        0,
1550
    };
1551
    xlator_t *this = THIS;
1552
    glusterd_conf_t *priv = this->private;
1553
    int pid = -1;
1554

1555
    /* Check whether all the nodes of the bricks to be removed are
1556
     * up, if not fail the operation */
1557
    for (i = 1; i <= brick_count; i++) {
1558
        keylen = snprintf(key, sizeof(key), "brick%d", i);
1559
        ret = dict_get_strn(dict, key, keylen, &brick);
1560
        if (ret) {
1561
            snprintf(msg, sizeof(msg), "Unable to get %s", key);
1562
            gf_smsg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1563
                    "key=%s", key, NULL);
1564
            *errstr = gf_strdup(msg);
1565
            goto out;
1566
        }
1567

1568
        ret = glusterd_volume_brickinfo_get_by_brick(brick, volinfo, &brickinfo,
1569
                                                     _gf_false);
1570
        if (ret) {
1571
            snprintf(msg, sizeof(msg),
1572
                     "Incorrect brick "
1573
                     "%s for volume %s",
1574
                     brick, volinfo->volname);
1575
            gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_INCORRECT_BRICK,
1576
                    "Brick=%s, Volume=%s", brick, volinfo->volname, NULL);
1577
            *errstr = gf_strdup(msg);
1578
            goto out;
1579
        }
1580
        /* Do not allow commit if the bricks are not decommissioned
1581
         * if its a remove brick commit
1582
         */
1583
        if (!brickinfo->decommissioned && cmd == GF_OP_CMD_COMMIT) {
1584
            snprintf(msg, sizeof(msg),
1585
                     "Brick %s "
1586
                     "is not decommissioned. "
1587
                     "Use start or force option",
1588
                     brick);
1589
            gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_BRICK_NOT_DECOM,
1590
                    "Use 'start' or 'force' option, Brick=%s", brick, NULL);
1591
            *errstr = gf_strdup(msg);
1592
            ret = -1;
1593
            goto out;
1594
        }
1595

1596
        if (glusterd_is_local_brick(volinfo, brickinfo)) {
1597
            switch (cmd) {
1598
                case GF_OP_CMD_START:
1599
                    goto check;
1600
                case GF_OP_CMD_NONE:
1601
                default:
1602
                    break;
1603
            }
1604

1605
            switch (cmd_defrag) {
1606
                case GF_DEFRAG_CMD_NONE:
1607
                default:
1608
                    continue;
1609
            }
1610
        check:
1611
            if (brickinfo->status != GF_BRICK_STARTED) {
1612
                snprintf(msg, sizeof(msg),
1613
                         "Found stopped "
1614
                         "brick %s. Use force option to "
1615
                         "remove the offline brick",
1616
                         brick);
1617
                gf_smsg(
1618
                    this->name, GF_LOG_ERROR, errno, GD_MSG_BRICK_STOPPED,
1619
                    "Use 'force' option to remove the offline brick, Brick=%s",
1620
                    brick, NULL);
1621
                *errstr = gf_strdup(msg);
1622
                ret = -1;
1623
                goto out;
1624
            }
1625
            GLUSTERD_GET_BRICK_PIDFILE(pidfile, volinfo, brickinfo, priv);
1626
            if (!gf_is_service_running(pidfile, &pid)) {
1627
                snprintf(msg, sizeof(msg),
1628
                         "Found dead "
1629
                         "brick %s",
1630
                         brick);
1631
                gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_BRICK_DEAD,
1632
                        "Brick=%s", brick, NULL);
1633
                *errstr = gf_strdup(msg);
1634
                ret = -1;
1635
                goto out;
1636
            } else {
1637
                ret = 0;
1638
            }
1639
            continue;
1640
        }
1641

1642
        RCU_READ_LOCK;
1643
        peerinfo = glusterd_peerinfo_find_by_uuid(brickinfo->uuid);
1644
        if (!peerinfo) {
1645
            RCU_READ_UNLOCK;
1646
            snprintf(msg, sizeof(msg),
1647
                     "Host node of the "
1648
                     "brick %s is not in cluster",
1649
                     brick);
1650
            gf_smsg(this->name, GF_LOG_ERROR, errno,
1651
                    GD_MSG_BRICK_HOST_NOT_FOUND, "Brick=%s", brick, NULL);
1652
            *errstr = gf_strdup(msg);
1653
            ret = -1;
1654
            goto out;
1655
        }
1656
        if (!peerinfo->connected) {
1657
            RCU_READ_UNLOCK;
1658
            snprintf(msg, sizeof(msg),
1659
                     "Host node of the "
1660
                     "brick %s is down",
1661
                     brick);
1662
            gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_HOST_DOWN,
1663
                    "Brick=%s", brick, NULL);
1664
            *errstr = gf_strdup(msg);
1665
            ret = -1;
1666
            goto out;
1667
        }
1668
        RCU_READ_UNLOCK;
1669
    }
1670

1671
out:
1672
    return ret;
1673
}
1674

1675
int
1676
glusterd_op_stage_remove_brick(dict_t *dict, char **op_errstr)
1677
{
1678
    int ret = -1;
1679
    char *volname = NULL;
1680
    glusterd_volinfo_t *volinfo = NULL;
1681
    char *errstr = NULL;
1682
    int32_t brick_count = 0;
1683
    char msg[2048] = "";
1684
    int32_t flag = 0;
1685
    gf1_op_commands cmd = GF_OP_CMD_NONE;
1686
    char *task_id_str = NULL;
1687
    xlator_t *this = THIS;
1688
    gsync_status_param_t param = {
1689
        0,
1690
    };
1691

1692
    ret = dict_get_str(dict, "volname", &volname);
1693
    if (ret) {
1694
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
1695
               "Unable to get volume name");
1696
        goto out;
1697
    }
1698

1699
    ret = glusterd_volinfo_find(volname, &volinfo);
1700

1701
    if (ret) {
1702
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND,
1703
               "Volume %s does not exist", volname);
1704
        goto out;
1705
    }
1706

1707
    ret = glusterd_validate_volume_id(dict, volinfo);
1708
    if (ret)
1709
        goto out;
1710

1711
    ret = dict_get_int32(dict, "command", &flag);
1712
    if (ret) {
1713
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1714
               "Unable to get brick command");
1715
        goto out;
1716
    }
1717
    cmd = flag;
1718

1719
    ret = dict_get_int32(dict, "count", &brick_count);
1720
    if (ret) {
1721
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1722
               "Unable to get brick count");
1723
        goto out;
1724
    }
1725

1726
    ret = 0;
1727
    if (volinfo->brick_count == brick_count) {
1728
        errstr = gf_strdup(
1729
            "Deleting all the bricks of the "
1730
            "volume is not allowed");
1731
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_DELETE, NULL);
1732
        ret = -1;
1733
        goto out;
1734
    }
1735

1736
    ret = -1;
1737
    switch (cmd) {
1738
        case GF_OP_CMD_NONE:
1739
            errstr = gf_strdup("no remove-brick command issued");
1740
            gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_NO_REMOVE_CMD,
1741
                    NULL);
1742
            goto out;
1743

1744
        case GF_OP_CMD_STATUS:
1745
            ret = 0;
1746
            goto out;
1747
        case GF_OP_CMD_START: {
1748
            if ((volinfo->type == GF_CLUSTER_TYPE_REPLICATE) &&
1749
                dict_get_sizen(dict, "replica-count")) {
1750
                snprintf(msg, sizeof(msg),
1751
                         "Migration of data is not "
1752
                         "needed when reducing replica count. Use the"
1753
                         " 'force' option");
1754
                errstr = gf_strdup(msg);
1755
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_USE_THE_FORCE, "%s",
1756
                       errstr);
1757
                goto out;
1758
            }
1759

1760
            if (GLUSTERD_STATUS_STARTED != volinfo->status) {
1761
                snprintf(msg, sizeof(msg),
1762
                         "Volume %s needs "
1763
                         "to be started before remove-brick "
1764
                         "(you can use 'force' or 'commit' "
1765
                         "to override this behavior)",
1766
                         volinfo->volname);
1767
                errstr = gf_strdup(msg);
1768
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_STARTED,
1769
                       "%s", errstr);
1770
                goto out;
1771
            }
1772
            if (!gd_is_remove_brick_committed(volinfo)) {
1773
                snprintf(msg, sizeof(msg),
1774
                         "An earlier remove-brick "
1775
                         "task exists for volume %s. Either commit it"
1776
                         " or stop it before starting a new task.",
1777
                         volinfo->volname);
1778
                errstr = gf_strdup(msg);
1779
                gf_msg(this->name, GF_LOG_ERROR, 0,
1780
                       GD_MSG_OLD_REMOVE_BRICK_EXISTS,
1781
                       "Earlier remove-brick"
1782
                       " task exists for volume %s.",
1783
                       volinfo->volname);
1784
                goto out;
1785
            }
1786
            if (glusterd_is_defrag_on(volinfo)) {
1787
                errstr = gf_strdup(
1788
                    "Rebalance is in progress. Please "
1789
                    "retry after completion");
1790
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_OIP_RETRY_LATER,
1791
                       "%s", errstr);
1792
                goto out;
1793
            }
1794

1795
            if (volinfo->snap_count > 0 ||
1796
                !cds_list_empty(&volinfo->snap_volumes)) {
1797
                snprintf(msg, sizeof(msg),
1798
                         "Volume %s  has %" PRIu64
1799
                         " snapshots. "
1800
                         "Changing the volume configuration will not effect "
1801
                         "snapshots."
1802
                         "But the snapshot brick mount should be intact to "
1803
                         "make them function.",
1804
                         volname, volinfo->snap_count);
1805
                gf_msg("glusterd", GF_LOG_WARNING, 0, GD_MSG_SNAP_WARN, "%s",
1806
                       msg);
1807
                msg[0] = '\0';
1808
            }
1809

1810
            ret = glusterd_remove_brick_validate_bricks(
1811
                cmd, brick_count, dict, volinfo, &errstr, GF_DEFRAG_CMD_NONE);
1812
            if (ret)
1813
                goto out;
1814

1815
            if (is_origin_glusterd(dict)) {
1816
                ret = glusterd_generate_and_set_task_id(
1817
                    dict, GF_REMOVE_BRICK_TID_KEY,
1818
                    SLEN(GF_REMOVE_BRICK_TID_KEY));
1819
                if (ret) {
1820
                    gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_TASKID_GEN_FAIL,
1821
                           "Failed to generate task-id");
1822
                    goto out;
1823
                }
1824
            } else {
1825
                ret = dict_get_str(dict, GF_REMOVE_BRICK_TID_KEY, &task_id_str);
1826
                if (ret) {
1827
                    gf_msg(this->name, GF_LOG_WARNING, -ret,
1828
                           GD_MSG_DICT_GET_FAILED, "Missing remove-brick-id");
1829
                    ret = 0;
1830
                }
1831
            }
1832
            break;
1833
        }
1834

1835
        case GF_OP_CMD_STOP:
1836
            ret = 0;
1837
            break;
1838

1839
        case GF_OP_CMD_COMMIT:
1840
            if (volinfo->decommission_in_progress) {
1841
                errstr = gf_strdup(
1842
                    "use 'force' option as migration "
1843
                    "is in progress");
1844
                gf_smsg(this->name, GF_LOG_WARNING, 0, GD_MSG_MIGRATION_PROG,
1845
                        "Use 'force' option", NULL);
1846
                goto out;
1847
            }
1848

1849
            if (volinfo->rebal.defrag_status == GF_DEFRAG_STATUS_FAILED) {
1850
                errstr = gf_strdup(
1851
                    "use 'force' option as migration "
1852
                    "has failed");
1853
                gf_smsg(this->name, GF_LOG_WARNING, 0, GD_MSG_MIGRATION_FAIL,
1854
                        "Use 'force' option", NULL);
1855
                goto out;
1856
            }
1857

1858
            if (volinfo->rebal.defrag_status == GF_DEFRAG_STATUS_COMPLETE) {
1859
                if (volinfo->rebal.rebalance_failures > 0 ||
1860
                    volinfo->rebal.skipped_files > 0) {
1861
                    errstr = gf_strdup(
1862
                        "use 'force' option as migration "
1863
                        "of some files might have been skipped or "
1864
                        "has failed");
1865
                    gf_smsg(this->name, GF_LOG_WARNING, 0,
1866
                            GD_MSG_MIGRATION_FAIL,
1867
                            "Use 'force' option, some files might have been "
1868
                            "skipped",
1869
                            NULL);
1870
                    goto out;
1871
                }
1872
            }
1873

1874
            ret = glusterd_remove_brick_validate_bricks(
1875
                cmd, brick_count, dict, volinfo, &errstr, GF_DEFRAG_CMD_NONE);
1876
            if (ret)
1877
                goto out;
1878

1879
            /* If geo-rep is configured, for this volume, it should be
1880
             * stopped.
1881
             */
1882
            param.volinfo = volinfo;
1883
            ret = glusterd_check_geo_rep_running(&param, op_errstr);
1884
            if (ret || param.is_active) {
1885
                ret = -1;
1886
                goto out;
1887
            }
1888

1889
            break;
1890

1891
        case GF_OP_CMD_COMMIT_FORCE:
1892
        case GF_OP_CMD_DETACH_START:
1893
        case GF_OP_CMD_DETACH_COMMIT:
1894
        case GF_OP_CMD_DETACH_COMMIT_FORCE:
1895
        case GF_OP_CMD_STOP_DETACH_TIER:
1896
            break;
1897
    }
1898
    ret = 0;
1899

1900
out:
1901
    gf_msg_debug(this->name, 0, "Returning %d", ret);
1902
    if (ret && errstr) {
1903
        if (op_errstr)
1904
            *op_errstr = errstr;
1905
    }
1906
    if (!op_errstr && errstr)
1907
        GF_FREE(errstr);
1908
    return ret;
1909
}
1910

1911
int
1912
glusterd_remove_brick_migrate_cbk(glusterd_volinfo_t *volinfo,
1913
                                  gf_defrag_status_t status)
1914
{
1915
    int ret = 0;
1916

1917
#if 0 /* TODO: enable this behavior once cluster-wide awareness comes for      \
1918
         defrag cbk function */
1919
        glusterd_brickinfo_t *brickinfo = NULL;
1920
        glusterd_brickinfo_t *tmp = NULL;
1921

1922
        switch (status) {
1923
        case GF_DEFRAG_STATUS_PAUSED:
1924
        case GF_DEFRAG_STATUS_FAILED:
1925
                /* No changes required in the volume file.
1926
                   everything should remain as is */
1927
                break;
1928
        case GF_DEFRAG_STATUS_STOPPED:
1929
                /* Fall back to the old volume file */
1930
                cds_list_for_each_entry_safe (brickinfo, tmp, &volinfo->bricks,
1931
                                              brick_list) {
1932
                        if (!brickinfo->decommissioned)
1933
                                continue;
1934
                        brickinfo->decommissioned = 0;
1935
                }
1936
                break;
1937

1938
        case GF_DEFRAG_STATUS_COMPLETE:
1939
                /* Done with the task, you can remove the brick from the
1940
                   volume file */
1941
                cds_list_for_each_entry_safe (brickinfo, tmp, &volinfo->bricks,
1942
                                              brick_list) {
1943
                        if (!brickinfo->decommissioned)
1944
                                continue;
1945
                        gf_log (THIS->name, GF_LOG_INFO, "removing the brick %s",
1946
                                brickinfo->path);
1947
                        brickinfo->decommissioned = 0;
1948
                        if (GLUSTERD_STATUS_STARTED == volinfo->status) {
1949
                            /*TODO: use the 'atomic' flavour of brick_stop*/
1950
                                ret = glusterd_brick_stop (volinfo, brickinfo);
1951
                                if (ret) {
1952
                                        gf_log (THIS->name, GF_LOG_ERROR,
1953
                                                "Unable to stop glusterfs (%d)", ret);
1954
                                }
1955
                        }
1956
                        glusterd_delete_brick (volinfo, brickinfo);
1957
                }
1958
                break;
1959

1960
        default:
1961
                GF_ASSERT (!"cbk function called with wrong status");
1962
                break;
1963
        }
1964

1965
        ret = glusterd_create_volfiles_and_notify_services (volinfo);
1966
        if (ret)
1967
                gf_log (THIS->name, GF_LOG_ERROR,
1968
                        "Unable to write volume files (%d)", ret);
1969

1970
        ret = glusterd_store_volinfo (volinfo, GLUSTERD_VOLINFO_VER_AC_INCREMENT);
1971
        if (ret)
1972
                gf_log (THIS->name, GF_LOG_ERROR,
1973
                        "Unable to store volume info (%d)", ret);
1974

1975

1976
        if (GLUSTERD_STATUS_STARTED == volinfo->status) {
1977
                ret = glusterd_check_generate_start_nfs ();
1978
                if (ret)
1979
                        gf_log (THIS->name, GF_LOG_ERROR,
1980
                                "Unable to start nfs process (%d)", ret);
1981
        }
1982

1983
#endif
1984

1985
    volinfo->decommission_in_progress = 0;
1986
    return ret;
1987
}
1988

1989
int
1990
glusterd_op_add_brick(dict_t *dict, char **op_errstr)
1991
{
1992
    int ret = 0;
1993
    char *volname = NULL;
1994
    glusterd_volinfo_t *volinfo = NULL;
1995
    char *bricks = NULL;
1996
    int32_t count = 0;
1997

1998
    ret = dict_get_str(dict, "volname", &volname);
1999

2000
    if (ret) {
2001
        gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2002
               "Unable to get volume name");
2003
        goto out;
2004
    }
2005

2006
    ret = glusterd_volinfo_find(volname, &volinfo);
2007

2008
    if (ret) {
2009
        gf_msg("glusterd", GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_FOUND,
2010
               "Unable to allocate memory");
2011
        goto out;
2012
    }
2013

2014
    ret = dict_get_int32(dict, "count", &count);
2015
    if (ret) {
2016
        gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2017
               "Unable to get count");
2018
        goto out;
2019
    }
2020

2021
    ret = dict_get_str(dict, "bricks", &bricks);
2022
    if (ret) {
2023
        gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2024
               "Unable to get bricks");
2025
        goto out;
2026
    }
2027

2028
    ret = glusterd_op_perform_add_bricks(volinfo, count, bricks, dict);
2029
    if (ret) {
2030
        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL,
2031
               "Unable to add bricks");
2032
        goto out;
2033
    }
2034

2035
    if (GLUSTERD_STATUS_STARTED == volinfo->status)
2036
        ret = glusterd_svcs_manager(volinfo);
2037

2038
out:
2039
    return ret;
2040
}
2041

2042
int
2043
glusterd_post_commit_brick_operation(dict_t *dict, char **op_errstr)
2044
{
2045
    int ret = 0;
2046
    char *volname = NULL;
2047

2048
    ret = dict_get_str(dict, "volname", &volname);
2049

2050
    if (ret) {
2051
        gf_msg(THIS->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2052
               "Unable to get volume name");
2053
        goto out;
2054
    }
2055
    ret = glusterd_replace_old_auth_allow_list(volname);
2056
out:
2057
    return ret;
2058
}
2059

2060
int
2061
glusterd_set_rebalance_id_for_remove_brick(dict_t *req_dict, dict_t *rsp_dict)
2062
{
2063
    int ret = -1;
2064
    char *volname = NULL;
2065
    glusterd_volinfo_t *volinfo = NULL;
2066
    char msg[2048] = {0};
2067
    char *task_id_str = NULL;
2068
    xlator_t *this = THIS;
2069
    int32_t cmd = 0;
2070

2071
    GF_ASSERT(rsp_dict);
2072
    GF_ASSERT(req_dict);
2073

2074
    ret = dict_get_str(rsp_dict, "volname", &volname);
2075
    if (ret) {
2076
        gf_msg_debug(this->name, 0, "volname not found");
2077
        goto out;
2078
    }
2079

2080
    ret = glusterd_volinfo_find(volname, &volinfo);
2081
    if (ret) {
2082
        gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_FOUND,
2083
               "Unable to allocate memory");
2084
        goto out;
2085
    }
2086

2087
    ret = dict_get_int32(rsp_dict, "command", &cmd);
2088
    if (ret) {
2089
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2090
               "Unable to get command");
2091
        goto out;
2092
    }
2093

2094
    /* remove brick task id is generted in glusterd_op_stage_remove_brick(),
2095
     * but rsp_dict is unavailable there. So copying it to rsp_dict from
2096
     * req_dict here. */
2097

2098
    if (is_origin_glusterd(rsp_dict)) {
2099
        ret = dict_get_str(req_dict, GF_REMOVE_BRICK_TID_KEY, &task_id_str);
2100
        if (ret) {
2101
            snprintf(msg, sizeof(msg), "Missing rebalance id for remove-brick");
2102
            gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_REBALANCE_ID_MISSING,
2103
                   "%s", msg);
2104
            ret = 0;
2105
        } else {
2106
            gf_uuid_parse(task_id_str, volinfo->rebal.rebalance_id);
2107

2108
            ret = glusterd_copy_uuid_to_dict(volinfo->rebal.rebalance_id,
2109
                                             rsp_dict, GF_REMOVE_BRICK_TID_KEY,
2110
                                             SLEN(GF_REMOVE_BRICK_TID_KEY));
2111
            if (ret) {
2112
                gf_msg(this->name, GF_LOG_ERROR, 0,
2113
                       GD_MSG_REMOVE_BRICK_ID_SET_FAIL,
2114
                       "Failed to set remove-brick-id");
2115
                goto out;
2116
            }
2117
        }
2118
    }
2119
    if (!gf_uuid_is_null(volinfo->rebal.rebalance_id) &&
2120
        GD_OP_REMOVE_BRICK == volinfo->rebal.op) {
2121
        ret = glusterd_copy_uuid_to_dict(volinfo->rebal.rebalance_id, rsp_dict,
2122
                                         GF_REMOVE_BRICK_TID_KEY,
2123
                                         SLEN(GF_REMOVE_BRICK_TID_KEY));
2124
        if (ret) {
2125
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
2126
                   "Failed to set task-id for volume %s", volname);
2127
            goto out;
2128
        }
2129
    }
2130
out:
2131
    return ret;
2132
}
2133
int
2134
glusterd_op_remove_brick(dict_t *dict, char **op_errstr)
2135
{
2136
    int ret = -1;
2137
    char *volname = NULL;
2138
    glusterd_volinfo_t *volinfo = NULL;
2139
    char *brick = NULL;
2140
    int32_t count = 0;
2141
    int32_t i = 1;
2142
    char key[64] = "";
2143
    int keylen;
2144
    int32_t flag = 0;
2145
    int need_rebalance = 0;
2146
    int force = 0;
2147
    gf1_op_commands cmd = 0;
2148
    int32_t replica_count = 0;
2149
    char *task_id_str = NULL;
2150
    xlator_t *this = THIS;
2151
    dict_t *bricks_dict = NULL;
2152
    char *brick_tmpstr = NULL;
2153
    int start_remove = 0;
2154
    uint32_t commit_hash = 0;
2155
    int defrag_cmd = 0;
2156
    glusterd_conf_t *conf = NULL;
2157

2158
    conf = this->private;
2159
    GF_VALIDATE_OR_GOTO(this->name, conf, out);
2160

2161
    ret = dict_get_str(dict, "volname", &volname);
2162

2163
    if (ret) {
2164
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL,
2165
               "Unable to get volume name");
2166
        goto out;
2167
    }
2168

2169
    ret = glusterd_volinfo_find(volname, &volinfo);
2170
    if (ret) {
2171
        gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_FOUND,
2172
               "Unable to allocate memory");
2173
        goto out;
2174
    }
2175

2176
    ret = dict_get_int32(dict, "command", &flag);
2177
    if (ret) {
2178
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2179
               "Unable to get command");
2180
        goto out;
2181
    }
2182
    cmd = flag;
2183

2184
    if (GF_OP_CMD_START == cmd)
2185
        start_remove = 1;
2186

2187
    /* Set task-id, if available, in ctx dict for operations other than
2188
     * start
2189
     */
2190

2191
    if (is_origin_glusterd(dict) && (!start_remove)) {
2192
        if (!gf_uuid_is_null(volinfo->rebal.rebalance_id)) {
2193
            ret = glusterd_copy_uuid_to_dict(volinfo->rebal.rebalance_id, dict,
2194
                                             GF_REMOVE_BRICK_TID_KEY,
2195
                                             SLEN(GF_REMOVE_BRICK_TID_KEY));
2196
            if (ret) {
2197
                gf_msg(this->name, GF_LOG_ERROR, 0,
2198
                       GD_MSG_REMOVE_BRICK_ID_SET_FAIL,
2199
                       "Failed to set remove-brick-id");
2200
                goto out;
2201
            }
2202
        }
2203
    }
2204

2205
    /* Clear task-id, rebal.op and stored bricks on commmitting/stopping
2206
     * remove-brick */
2207
    if ((!start_remove) && (cmd != GF_OP_CMD_STATUS)) {
2208
        gf_uuid_clear(volinfo->rebal.rebalance_id);
2209
        volinfo->rebal.op = GD_OP_NONE;
2210
        dict_unref(volinfo->rebal.dict);
2211
        volinfo->rebal.dict = NULL;
2212
    }
2213

2214
    ret = -1;
2215
    switch (cmd) {
2216
        case GF_OP_CMD_NONE:
2217
            goto out;
2218

2219
        case GF_OP_CMD_STATUS:
2220
            ret = 0;
2221
            goto out;
2222

2223
        case GF_OP_CMD_STOP:
2224
        case GF_OP_CMD_START:
2225
            /* Reset defrag status to 'NOT STARTED' whenever a
2226
             * remove-brick/rebalance command is issued to remove
2227
             * stale information from previous run.
2228
             * Update defrag_cmd as well or it will only be done
2229
             * for nodes on which the brick to be removed exists.
2230
             */
2231
            /* coverity[MIXED_ENUMS] */
2232
            volinfo->rebal.defrag_cmd = cmd;
2233
            volinfo->rebal.defrag_status = GF_DEFRAG_STATUS_NOT_STARTED;
2234
            ret = dict_get_str(dict, GF_REMOVE_BRICK_TID_KEY, &task_id_str);
2235
            if (ret) {
2236
                gf_msg_debug(this->name, errno, "Missing remove-brick-id");
2237
                ret = 0;
2238
            } else {
2239
                gf_uuid_parse(task_id_str, volinfo->rebal.rebalance_id);
2240
                volinfo->rebal.op = GD_OP_REMOVE_BRICK;
2241
            }
2242
            force = 0;
2243
            break;
2244

2245
        case GF_OP_CMD_COMMIT:
2246
            force = 1;
2247
            break;
2248

2249
        case GF_OP_CMD_COMMIT_FORCE:
2250

2251
            if (volinfo->decommission_in_progress) {
2252
                if (volinfo->rebal.defrag) {
2253
                    LOCK(&volinfo->rebal.defrag->lock);
2254
                    /* Fake 'rebalance-complete' so the graph change
2255
                       happens right away */
2256
                    volinfo->rebal.defrag_status = GF_DEFRAG_STATUS_COMPLETE;
2257

2258
                    UNLOCK(&volinfo->rebal.defrag->lock);
2259
                }
2260
                /* Graph change happens in rebalance _cbk function,
2261
                   no need to do anything here */
2262
                /* TODO: '_cbk' function is not doing anything for now */
2263
            }
2264

2265
            ret = 0;
2266
            force = 1;
2267
            break;
2268
        case GF_OP_CMD_DETACH_START:
2269
        case GF_OP_CMD_DETACH_COMMIT_FORCE:
2270
        case GF_OP_CMD_DETACH_COMMIT:
2271
        case GF_OP_CMD_STOP_DETACH_TIER:
2272
            break;
2273
    }
2274

2275
    ret = dict_get_int32(dict, "count", &count);
2276
    if (ret) {
2277
        gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2278
               "Unable to get count");
2279
        goto out;
2280
    }
2281
    /* Save the list of bricks for later usage only on starting a
2282
     * remove-brick. Right now this is required for displaying the task
2283
     * parameters with task status in volume status.
2284
     */
2285

2286
    if (start_remove) {
2287
        bricks_dict = dict_new();
2288
        if (!bricks_dict) {
2289
            ret = -1;
2290
            goto out;
2291
        }
2292
        ret = dict_set_int32_sizen(bricks_dict, "count", count);
2293
        if (ret) {
2294
            gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
2295
                   "Failed to save remove-brick count");
2296
            goto out;
2297
        }
2298
    }
2299

2300
    while (i <= count) {
2301
        keylen = snprintf(key, sizeof(key), "brick%d", i);
2302
        ret = dict_get_strn(dict, key, keylen, &brick);
2303
        if (ret) {
2304
            gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2305
                   "Unable to get %s", key);
2306
            goto out;
2307
        }
2308

2309
        if (start_remove) {
2310
            brick_tmpstr = gf_strdup(brick);
2311
            if (!brick_tmpstr) {
2312
                ret = -1;
2313
                gf_msg(this->name, GF_LOG_ERROR, ENOMEM, GD_MSG_NO_MEMORY,
2314
                       "Failed to duplicate brick name");
2315
                goto out;
2316
            }
2317
            ret = dict_set_dynstrn(bricks_dict, key, keylen, brick_tmpstr);
2318
            if (ret) {
2319
                gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
2320
                       "Failed to add brick to dict");
2321
                goto out;
2322
            }
2323
            brick_tmpstr = NULL;
2324
        }
2325

2326
        ret = glusterd_op_perform_remove_brick(volinfo, brick, force,
2327
                                               &need_rebalance);
2328
        if (ret)
2329
            goto out;
2330
        i++;
2331
    }
2332

2333
    if (start_remove)
2334
        volinfo->rebal.dict = dict_ref(bricks_dict);
2335

2336
    ret = dict_get_int32(dict, "replica-count", &replica_count);
2337
    if (!ret) {
2338
        gf_msg(this->name, GF_LOG_INFO, -ret, GD_MSG_DICT_GET_FAILED,
2339
               "changing replica count %d to %d on volume %s",
2340
               volinfo->replica_count, replica_count, volinfo->volname);
2341
        volinfo->replica_count = replica_count;
2342
        /* A reduction in replica count implies an arbiter volume
2343
         * earlier is now no longer one. */
2344
        if (volinfo->arbiter_count)
2345
            volinfo->arbiter_count = 0;
2346
        volinfo->sub_count = replica_count;
2347
        volinfo->dist_leaf_count = glusterd_get_dist_leaf_count(volinfo);
2348

2349
        /*
2350
         * volinfo->type and sub_count have already been set for
2351
         * volumes undergoing a detach operation, they should not
2352
         * be modified here.
2353
         */
2354
        if (replica_count == 1) {
2355
            if (volinfo->type == GF_CLUSTER_TYPE_REPLICATE) {
2356
                volinfo->type = GF_CLUSTER_TYPE_NONE;
2357
                /* backward compatibility */
2358
                volinfo->sub_count = 0;
2359
            }
2360
        }
2361
    }
2362
    volinfo->subvol_count = (volinfo->brick_count / volinfo->dist_leaf_count);
2363

2364
    if (!glusterd_is_volume_replicate(volinfo) &&
2365
        conf->op_version >= GD_OP_VERSION_3_12_2) {
2366
        ret = dict_set_sizen_str_sizen(volinfo->dict,
2367
                                       "performance.client-io-threads", "on");
2368
        if (ret) {
2369
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
2370
                   "Failed to set "
2371
                   "performance.client-io-threads to on");
2372
            goto out;
2373
        }
2374
    }
2375

2376
    ret = glusterd_create_volfiles_and_notify_services(volinfo);
2377
    if (ret) {
2378
        gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_VOLFILE_CREATE_FAIL,
2379
               "failed to create volfiles");
2380
        goto out;
2381
    }
2382

2383
    ret = glusterd_store_volinfo(volinfo, GLUSTERD_VOLINFO_VER_AC_INCREMENT);
2384
    if (ret) {
2385
        gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_VOLINFO_STORE_FAIL,
2386
               "failed to store volinfo");
2387
        goto out;
2388
    }
2389

2390
    if (start_remove && volinfo->status == GLUSTERD_STATUS_STARTED) {
2391
        ret = glusterd_svcs_reconfigure(volinfo);
2392
        if (ret) {
2393
            gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_NFS_RECONF_FAIL,
2394
                   "Unable to reconfigure NFS-Server");
2395
            goto out;
2396
        }
2397
    }
2398

2399
    /* Need to reset the defrag/rebalance status accordingly */
2400
    switch (volinfo->rebal.defrag_status) {
2401
        case GF_DEFRAG_STATUS_FAILED:
2402
        case GF_DEFRAG_STATUS_COMPLETE:
2403
            volinfo->rebal.defrag_status = 0;
2404
        /* FALLTHROUGH */
2405
        default:
2406
            break;
2407
    }
2408
    if (!force && need_rebalance) {
2409
        if (dict_get_uint32(dict, "commit-hash", &commit_hash) == 0) {
2410
            volinfo->rebal.commit_hash = commit_hash;
2411
        }
2412
        /* perform the rebalance operations */
2413
        defrag_cmd = GF_DEFRAG_CMD_START_FORCE;
2414
        /*
2415
         * We need to set this *before* we issue commands to the
2416
         * bricks, or else we might end up setting it after the bricks
2417
         * have responded.  If we fail to send the request(s) we'll
2418
         * clear it ourselves because nobody else will.
2419
         */
2420
        volinfo->decommission_in_progress = 1;
2421
        char err_str[4096] = "";
2422
        ret = glusterd_handle_defrag_start(
2423
            volinfo, err_str, sizeof(err_str), defrag_cmd,
2424
            glusterd_remove_brick_migrate_cbk, GD_OP_REMOVE_BRICK);
2425

2426
        if (ret) {
2427
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_REBALANCE_START_FAIL,
2428
                   "failed to start the rebalance");
2429
            /* TBD: shouldn't we do more than print a message? */
2430
            volinfo->decommission_in_progress = 0;
2431
            if (op_errstr)
2432
                *op_errstr = gf_strdup(err_str);
2433
        }
2434
    } else {
2435
        if (GLUSTERD_STATUS_STARTED == volinfo->status)
2436
            ret = glusterd_svcs_manager(volinfo);
2437
    }
2438
out:
2439
    GF_FREE(brick_tmpstr);
2440
    if (bricks_dict)
2441
        dict_unref(bricks_dict);
2442
    gf_msg_debug(this->name, 0, "returning %d ", ret);
2443
    return ret;
2444
}
2445

2446
int
2447
glusterd_op_stage_barrier(dict_t *dict, char **op_errstr)
2448
{
2449
    int ret = -1;
2450
    xlator_t *this = THIS;
2451
    char *volname = NULL;
2452
    glusterd_volinfo_t *vol = NULL;
2453
    char *barrier_op = NULL;
2454

2455
    GF_ASSERT(dict);
2456

2457
    ret = dict_get_str(dict, "volname", &volname);
2458
    if (ret) {
2459
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2460
               "Volname not present in "
2461
               "dict");
2462
        goto out;
2463
    }
2464

2465
    ret = glusterd_volinfo_find(volname, &vol);
2466
    if (ret) {
2467
        gf_asprintf(op_errstr, "Volume %s does not exist", volname);
2468
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND, "%s",
2469
               *op_errstr);
2470
        goto out;
2471
    }
2472

2473
    if (!glusterd_is_volume_started(vol)) {
2474
        gf_asprintf(op_errstr, "Volume %s is not started", volname);
2475
        ret = -1;
2476
        goto out;
2477
    }
2478

2479
    ret = dict_get_str(dict, "barrier", &barrier_op);
2480
    if (ret == -1) {
2481
        gf_asprintf(op_errstr,
2482
                    "Barrier op for volume %s not present "
2483
                    "in dict",
2484
                    volname);
2485
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
2486
               *op_errstr);
2487
        goto out;
2488
    }
2489
    ret = 0;
2490
out:
2491
    gf_msg_debug(this->name, 0, "Returning %d", ret);
2492
    return ret;
2493
}
2494

2495
int
2496
glusterd_op_barrier(dict_t *dict, char **op_errstr)
2497
{
2498
    int ret = -1;
2499
    xlator_t *this = THIS;
2500
    char *volname = NULL;
2501
    glusterd_volinfo_t *vol = NULL;
2502
    char *barrier_op = NULL;
2503

2504
    GF_ASSERT(dict);
2505

2506
    ret = dict_get_str(dict, "volname", &volname);
2507
    if (ret) {
2508
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2509
               "Volname not present in "
2510
               "dict");
2511
        goto out;
2512
    }
2513

2514
    ret = glusterd_volinfo_find(volname, &vol);
2515
    if (ret) {
2516
        gf_asprintf(op_errstr, "Volume %s does not exist", volname);
2517
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND, "%s",
2518
               *op_errstr);
2519
        goto out;
2520
    }
2521

2522
    ret = dict_get_str(dict, "barrier", &barrier_op);
2523
    if (ret) {
2524
        gf_asprintf(op_errstr,
2525
                    "Barrier op for volume %s not present "
2526
                    "in dict",
2527
                    volname);
2528
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
2529
               *op_errstr);
2530
        goto out;
2531
    }
2532

2533
    ret = dict_set_dynstr_with_alloc(vol->dict, "features.barrier", barrier_op);
2534
    if (ret) {
2535
        gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
2536
               "Failed to set barrier op in"
2537
               " volume option dict");
2538
        goto out;
2539
    }
2540

2541
    gd_update_volume_op_versions(vol);
2542
    ret = glusterd_create_volfiles(vol);
2543
    if (ret) {
2544
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLFILE_CREATE_FAIL,
2545
               "Failed to create volfiles");
2546
        goto out;
2547
    }
2548
    ret = glusterd_store_volinfo(vol, GLUSTERD_VOLINFO_VER_AC_INCREMENT);
2549

2550
out:
2551
    gf_msg_debug(this->name, 0, "Returning %d", ret);
2552
    return ret;
2553
}
2554

2555
int
2556
glusterd_handle_add_tier_brick(rpcsvc_request_t *req)
2557
{
2558
    return 0;
2559
}
2560

2561
int
2562
glusterd_handle_attach_tier(rpcsvc_request_t *req)
2563
{
2564
    return 0;
2565
}
2566

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.