/*
   Copyright (c) 2011-2012 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/
10
#include "glusterd-op-sm.h"
11
#include "glusterd-geo-rep.h"
12
#include "glusterd-store.h"
13
#include "glusterd-mgmt.h"
14
#include "glusterd-utils.h"
15
#include "glusterd-volgen.h"
16
#include "glusterd-svc-helper.h"
17
#include "glusterd-messages.h"
18
#include "glusterd-server-quorum.h"
19
#include <glusterfs/run.h>
20
#include <glusterfs/syscall.h>
21
#include <sys/signal.h>
25
/* In this function, we decide, based on the 'count' of the brick,
26
where to add it in the current volume. 'count' tells us already
27
how many of the given bricks are added. other argument are self-
30
add_brick_at_right_order(glusterd_brickinfo_t *brickinfo,
31
glusterd_volinfo_t *volinfo, int count,
37
glusterd_brickinfo_t *brick = NULL;
39
/* The complexity of the function is in deciding at which index
40
to add new brick. Even though it can be defined with a complex
41
single formula for all volume, it is separated out to make it
44
/* replica count is set */
45
/* common formula when 'replica_count' is set */
46
/* idx = ((count / (replica_cnt - existing_replica_count)) *
47
existing_replica_count) +
48
(count + existing_replica_count);
51
sub_cnt = volinfo->replica_count;
52
idx = (count / (replica_cnt - sub_cnt) * sub_cnt) + (count + sub_cnt);
55
cds_list_for_each_entry(brick, &volinfo->bricks, brick_list)
60
gf_msg_debug(THIS->name, 0, "brick:%s index=%d, count=%d", brick->path,
63
cds_list_add(&brickinfo->brick_list, &brick->brick_list);
71
gd_addbr_validate_replica_count(glusterd_volinfo_t *volinfo, int replica_count,
72
int arbiter_count, int total_bricks, int *type,
73
char *err_str, int err_len)
77
/* replica count is set */
78
switch (volinfo->type) {
79
case GF_CLUSTER_TYPE_NONE:
80
if ((volinfo->brick_count * replica_count) == total_bricks) {
81
/* Change the volume type */
82
*type = GF_CLUSTER_TYPE_REPLICATE;
83
gf_msg(THIS->name, GF_LOG_INFO, 0,
84
GD_MSG_VOL_TYPE_CHANGING_INFO,
85
"Changing the type of volume %s from "
86
"'distribute' to 'replica'",
92
snprintf(err_str, err_len,
93
"Incorrect number of "
94
"bricks (%d) supplied for replica count (%d).",
95
(total_bricks - volinfo->brick_count), replica_count);
96
gf_msg(THIS->name, GF_LOG_ERROR, EINVAL, GD_MSG_INVALID_ENTRY,
101
case GF_CLUSTER_TYPE_REPLICATE:
102
if (replica_count < volinfo->replica_count) {
103
snprintf(err_str, err_len,
104
"Incorrect replica count (%d) supplied. "
105
"Volume already has (%d)",
106
replica_count, volinfo->replica_count);
107
gf_msg(THIS->name, GF_LOG_ERROR, EINVAL, GD_MSG_INVALID_ENTRY,
111
if (replica_count == volinfo->replica_count) {
112
if (arbiter_count && !volinfo->arbiter_count) {
113
snprintf(err_str, err_len,
114
"Cannot convert replica 3 volume "
115
"to arbiter volume.");
116
gf_msg(THIS->name, GF_LOG_ERROR, EINVAL,
117
GD_MSG_INVALID_ENTRY, "%s", err_str);
120
if (!(total_bricks % volinfo->dist_leaf_count)) {
125
if (replica_count > volinfo->replica_count) {
126
/* We have to make sure before and after 'add-brick',
127
the number or subvolumes for distribute will remain
128
same, when replica count is given */
129
if ((total_bricks * volinfo->dist_leaf_count) ==
130
(volinfo->brick_count * replica_count)) {
131
/* Change the dist_leaf_count */
132
gf_msg(THIS->name, GF_LOG_INFO, 0,
133
GD_MSG_REPLICA_COUNT_CHANGE_INFO,
134
"Changing the replica count of "
135
"volume %s from %d to %d",
136
volinfo->volname, volinfo->replica_count,
143
case GF_CLUSTER_TYPE_DISPERSE:
144
snprintf(err_str, err_len,
145
"Volume %s cannot be converted "
146
"from dispersed to replicated-"
149
gf_msg(THIS->name, GF_LOG_ERROR, EPERM, GD_MSG_OP_NOT_PERMITTED,
158
gd_rmbr_validate_replica_count(glusterd_volinfo_t *volinfo,
159
int32_t replica_count, int32_t brick_count,
160
char *err_str, size_t err_len)
163
int replica_nodes = 0;
165
switch (volinfo->type) {
166
case GF_CLUSTER_TYPE_NONE:
167
case GF_CLUSTER_TYPE_DISPERSE:
168
snprintf(err_str, err_len,
169
"replica count (%d) option given for non replicate "
171
replica_count, volinfo->volname);
172
gf_smsg(THIS->name, GF_LOG_WARNING, EINVAL, GD_MSG_INVALID_ARGUMENT,
176
case GF_CLUSTER_TYPE_REPLICATE:
177
/* in remove brick, you can only reduce the replica count */
178
if (replica_count > volinfo->replica_count) {
179
snprintf(err_str, err_len,
180
"given replica count (%d) option is more "
181
"than volume %s's replica count (%d)",
182
replica_count, volinfo->volname,
183
volinfo->replica_count);
184
gf_smsg(THIS->name, GF_LOG_WARNING, EINVAL,
185
GD_MSG_INVALID_ARGUMENT, err_str, NULL);
188
if (replica_count == volinfo->replica_count) {
189
/* This means the 'replica N' option on CLI was
190
redundant. Check if the total number of bricks given
191
for removal is same as 'dist_leaf_count' */
192
if (brick_count % volinfo->dist_leaf_count) {
193
snprintf(err_str, err_len,
194
"number of bricks provided (%d) is "
195
"not valid. need at least %d "
197
brick_count, volinfo->dist_leaf_count,
198
volinfo->dist_leaf_count);
199
gf_smsg(THIS->name, GF_LOG_WARNING, EINVAL,
200
GD_MSG_INVALID_ARGUMENT, err_str, NULL);
207
replica_nodes = ((volinfo->brick_count / volinfo->replica_count) *
208
(volinfo->replica_count - replica_count));
210
if (brick_count % replica_nodes) {
211
snprintf(err_str, err_len,
212
"need %d(xN) bricks for reducing replica "
213
"count of the volume from %d to %d",
214
replica_nodes, volinfo->replica_count, replica_count);
215
gf_smsg(THIS->name, GF_LOG_WARNING, EINVAL,
216
GD_MSG_INVALID_ARGUMENT, err_str, NULL);
227
/* Handler functions */
229
__glusterd_handle_add_brick(rpcsvc_request_t *req)
232
gf_cli_req cli_req = {{
237
char *volname = NULL;
239
void *cli_rsp = NULL;
240
char err_str[2048] = "";
244
glusterd_volinfo_t *volinfo = NULL;
245
xlator_t *this = THIS;
246
int total_bricks = 0;
247
int32_t replica_count = 0;
248
int32_t arbiter_count = 0;
253
ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
255
// failed to decode msg;
256
req->rpc_err = GARBAGE_ARGS;
257
snprintf(err_str, sizeof(err_str), "Garbage args received");
258
gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_GARBAGE_ARGS, NULL);
262
gf_msg(this->name, GF_LOG_INFO, 0, GD_MSG_ADD_BRICK_REQ_RECVD,
263
"Received add brick req");
265
if (cli_req.dict.dict_len) {
266
/* Unserialize the dictionary */
269
ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
272
gf_msg(this->name, GF_LOG_ERROR, errno,
273
GD_MSG_DICT_UNSERIALIZE_FAIL,
275
"unserialize req-buffer to dictionary");
276
snprintf(err_str, sizeof(err_str),
283
ret = dict_get_str(dict, "volname", &volname);
286
snprintf(err_str, sizeof(err_str),
287
"Unable to get volume "
289
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
294
ret = glusterd_volinfo_find(volname, &volinfo);
296
snprintf(err_str, sizeof(err_str),
297
"Unable to get volinfo "
298
"for volume name %s",
300
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLINFO_GET_FAIL, "%s",
305
ret = dict_get_int32(dict, "count", &brick_count);
307
snprintf(err_str, sizeof(err_str),
308
"Unable to get volume "
310
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
315
ret = dict_get_int32(dict, "replica-count", &replica_count);
317
gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
318
"replica-count is %d", replica_count);
321
ret = dict_get_int32(dict, "arbiter-count", &arbiter_count);
323
gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
324
"arbiter-count is %d", arbiter_count);
327
if (!dict_get_sizen(dict, "force")) {
328
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
329
"Failed to get flag");
333
total_bricks = volinfo->brick_count + brick_count;
335
if (!replica_count) {
336
if (volinfo->type == GF_CLUSTER_TYPE_NONE)
339
if ((volinfo->brick_count < volinfo->dist_leaf_count) &&
340
(total_bricks <= volinfo->dist_leaf_count))
343
if ((brick_count % volinfo->dist_leaf_count) != 0) {
344
snprintf(err_str, sizeof(err_str),
346
"of bricks supplied %d with count %d",
347
brick_count, volinfo->dist_leaf_count);
348
gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_REPLICA,
354
/* done with validation.. below section is if replica
358
ret = gd_addbr_validate_replica_count(volinfo, replica_count, arbiter_count,
359
total_bricks, &type, err_str,
362
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_COUNT_VALIDATE_FAILED, "%s",
367
/* if replica count is same as earlier, set it back to 0 */
371
ret = dict_set_int32_sizen(dict, "replica-count", replica_count);
373
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
374
"failed to set the replica-count in dict");
379
ret = dict_get_str(dict, "bricks", &bricks);
381
snprintf(err_str, sizeof(err_str),
382
"Unable to get volume "
384
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
389
if (type != volinfo->type) {
390
ret = dict_set_int32_sizen(dict, "type", type);
392
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
393
"failed to set the new type in dict");
398
ret = glusterd_mgmt_v3_initiate_all_phases(req, GD_OP_ADD_BRICK, dict);
404
if (err_str[0] == '\0')
405
snprintf(err_str, sizeof(err_str), "Operation failed");
406
rsp.op_errstr = err_str;
408
glusterd_to_cli(req, cli_rsp, NULL, 0, NULL, (xdrproc_t)xdr_gf_cli_rsp,
410
ret = 0; // sent error to cli, prevent second reply
413
free(cli_req.dict.dict_val); // its malloced by xdr
419
glusterd_handle_add_brick(rpcsvc_request_t *req)
421
return glusterd_big_locked_handler(req, __glusterd_handle_add_brick);
425
subvol_matcher_init(int **subvols, int count)
429
*subvols = GF_CALLOC(count, sizeof(int), gf_gld_mt_int);
437
subvol_matcher_update(int *subvols, glusterd_volinfo_t *volinfo,
438
glusterd_brickinfo_t *brickinfo)
440
glusterd_brickinfo_t *tmp = NULL;
441
int32_t sub_volume = 0;
444
cds_list_for_each_entry(tmp, &volinfo->bricks, brick_list)
446
if (strcmp(tmp->hostname, brickinfo->hostname) ||
447
strcmp(tmp->path, brickinfo->path)) {
451
gf_msg_debug(THIS->name, 0, LOGSTR_FOUND_BRICK, brickinfo->hostname,
452
brickinfo->path, volinfo->volname);
453
sub_volume = (pos / volinfo->dist_leaf_count);
454
subvols[sub_volume]++;
461
subvol_matcher_verify(int *subvols, glusterd_volinfo_t *volinfo, char *err_str,
462
size_t err_len, char *vol_type, int replica_count)
466
int count = volinfo->replica_count - replica_count;
467
xlator_t *this = THIS;
469
if (replica_count && subvols) {
470
for (i = 0; i < volinfo->subvol_count; i++) {
471
if (subvols[i] != count) {
473
snprintf(err_str, err_len,
475
" brick(s) from each subvolume.",
477
gf_smsg(this->name, GF_LOG_ERROR, errno,
478
GD_MSG_BRICK_SUBVOL_VERIFY_FAIL, err_str, NULL);
486
if (subvols && (subvols[i] % volinfo->dist_leaf_count == 0)) {
490
snprintf(err_str, err_len, "Bricks not from same subvol for %s",
492
gf_smsg(this->name, GF_LOG_ERROR, errno,
493
GD_MSG_BRICK_SUBVOL_VERIFY_FAIL, err_str, NULL);
496
} while (++i < volinfo->subvol_count);
502
subvol_matcher_destroy(int *subvols)
508
glusterd_remove_brick_validate_arbiters(glusterd_volinfo_t *volinfo,
509
int32_t count, int32_t replica_count,
510
glusterd_brickinfo_t **brickinfo_list,
511
char *err_str, size_t err_len)
515
glusterd_brickinfo_t *brickinfo = NULL;
516
glusterd_brickinfo_t *last = NULL;
517
char *arbiter_array = NULL;
518
xlator_t *this = THIS;
520
if (volinfo->type != GF_CLUSTER_TYPE_REPLICATE)
523
if (!replica_count || !volinfo->arbiter_count)
526
if (replica_count == 2) {
527
/* If it is an arbiter to replica 2 conversion, only permit
528
* removal of the arbiter brick.*/
529
for (i = 0; i < count; i++) {
530
brickinfo = brickinfo_list[i];
531
last = get_last_brick_of_brick_group(volinfo, brickinfo);
532
if (last != brickinfo) {
533
snprintf(err_str, err_len,
535
"brick(s) only when converting from "
536
"arbiter to replica 2 subvolume.");
537
gf_smsg(this->name, GF_LOG_ERROR, errno,
538
GD_MSG_REMOVE_ARBITER_BRICK, err_str, NULL);
543
} else if (replica_count == 1) {
544
/* If it is an arbiter to plain distribute conversion, in every
545
* replica subvol, the arbiter has to be one of the bricks that
547
arbiter_array = GF_CALLOC(volinfo->subvol_count, sizeof(*arbiter_array),
551
for (i = 0; i < count; i++) {
552
brickinfo = brickinfo_list[i];
553
last = get_last_brick_of_brick_group(volinfo, brickinfo);
554
if (last == brickinfo)
555
arbiter_array[brickinfo->group] = 1;
557
for (i = 0; i < volinfo->subvol_count; i++)
558
if (!arbiter_array[i]) {
559
snprintf(err_str, err_len,
561
"must contain arbiter when converting"
562
" to plain distribute.");
563
gf_smsg(this->name, GF_LOG_ERROR, errno,
564
GD_MSG_REMOVE_ARBITER_BRICK, err_str, NULL);
568
GF_FREE(arbiter_array);
576
__glusterd_handle_remove_brick(rpcsvc_request_t *req)
579
gf_cli_req cli_req = {{
588
glusterd_conf_t *conf = NULL;
589
glusterd_volinfo_t *volinfo = NULL;
590
glusterd_brickinfo_t *brickinfo = NULL;
591
glusterd_brickinfo_t **brickinfo_list = NULL;
593
char err_str[2048] = "";
597
void *cli_rsp = NULL;
598
char vol_type[256] = "";
599
int32_t replica_count = 0;
601
xlator_t *this = THIS;
605
conf = this->private;
608
ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
610
// failed to decode msg;
611
req->rpc_err = GARBAGE_ARGS;
612
snprintf(err_str, sizeof(err_str), "Received garbage args");
613
gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_GARBAGE_ARGS, NULL);
617
gf_msg(this->name, GF_LOG_INFO, 0, GD_MSG_REM_BRICK_REQ_RECVD,
618
"Received rem brick req");
620
if (cli_req.dict.dict_len) {
621
/* Unserialize the dictionary */
624
ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
627
gf_msg(this->name, GF_LOG_ERROR, errno,
628
GD_MSG_DICT_UNSERIALIZE_FAIL,
630
"unserialize req-buffer to dictionary");
631
snprintf(err_str, sizeof(err_str),
638
ret = dict_get_str(dict, "volname", &volname);
640
snprintf(err_str, sizeof(err_str),
641
"Unable to get volume "
643
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
648
ret = dict_get_int32(dict, "count", &count);
650
snprintf(err_str, sizeof(err_str),
651
"Unable to get brick "
653
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
658
ret = glusterd_volinfo_find(volname, &volinfo);
660
snprintf(err_str, sizeof(err_str), "Volume %s does not exist", volname);
661
gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_FOUND, "%s",
666
ret = dict_get_int32(dict, "command", &cmd);
668
snprintf(err_str, sizeof(err_str),
671
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
676
ret = dict_get_int32(dict, "replica-count", &replica_count);
678
gf_msg(this->name, GF_LOG_INFO, -ret, GD_MSG_DICT_GET_FAILED,
679
"request to change replica-count to %d", replica_count);
680
ret = gd_rmbr_validate_replica_count(volinfo, replica_count, count,
681
err_str, sizeof(err_str));
683
/* logging and error msg are done in above function
687
dict_del_sizen(dict, "replica-count");
691
ret = dict_set_int32_sizen(dict, "replica-count", replica_count);
693
gf_msg(this->name, GF_LOG_WARNING, -ret, GD_MSG_DICT_SET_FAILED,
694
"failed to set the replica_count "
701
/* 'vol_type' is used for giving the meaning full error msg for user */
702
if (volinfo->type == GF_CLUSTER_TYPE_REPLICATE) {
703
strcpy(vol_type, "replica");
704
} else if (volinfo->type == GF_CLUSTER_TYPE_DISPERSE) {
705
strcpy(vol_type, "disperse");
707
strcpy(vol_type, "distribute");
710
if (!replica_count && (volinfo->type == GF_CLUSTER_TYPE_REPLICATE) &&
711
(volinfo->brick_count == volinfo->dist_leaf_count)) {
712
snprintf(err_str, sizeof(err_str),
713
"Removing bricks from replicate configuration "
714
"is not allowed without reducing replica count "
716
gf_msg(this->name, GF_LOG_ERROR, EPERM, GD_MSG_OP_NOT_PERMITTED_AC_REQD,
722
/* Do not allow remove-brick if the bricks given is less than
724
if (!replica_count && (volinfo->type != GF_CLUSTER_TYPE_NONE)) {
725
if (volinfo->dist_leaf_count && (count % volinfo->dist_leaf_count)) {
726
snprintf(err_str, sizeof(err_str),
728
"incorrect brick count of %d for %s %d",
729
count, vol_type, volinfo->dist_leaf_count);
730
gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_INVALID_ENTRY, "%s",
737
if ((volinfo->type != GF_CLUSTER_TYPE_NONE) &&
738
(volinfo->subvol_count > 1)) {
739
ret = subvol_matcher_init(&subvols, volinfo->subvol_count);
744
brickinfo_list = GF_CALLOC(count, sizeof(*brickinfo_list),
745
gf_common_mt_pointer);
746
if (!brickinfo_list) {
752
keylen = snprintf(key, sizeof(key), "brick%d", i);
753
ret = dict_get_strn(dict, key, keylen, &brick);
755
snprintf(err_str, sizeof(err_str), "Unable to get %s", key);
756
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
760
gf_msg_debug(this->name, 0,
761
"Remove brick count %d brick:"
765
ret = glusterd_volume_brickinfo_get_by_brick(brick, volinfo, &brickinfo,
769
snprintf(err_str, sizeof(err_str),
773
gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_BRICK_NOT_FOUND,
777
brickinfo_list[i - 1] = brickinfo;
780
if ((volinfo->type == GF_CLUSTER_TYPE_NONE) ||
781
(volinfo->brick_count <= volinfo->dist_leaf_count))
784
subvol_matcher_update(subvols, volinfo, brickinfo);
787
if ((volinfo->type != GF_CLUSTER_TYPE_NONE) &&
788
(volinfo->subvol_count > 1)) {
789
ret = subvol_matcher_verify(subvols, volinfo, err_str, sizeof(err_str),
790
vol_type, replica_count);
795
ret = glusterd_remove_brick_validate_arbiters(volinfo, count, replica_count,
796
brickinfo_list, err_str,
801
if (conf->op_version < GD_OP_VERSION_8_0) {
802
gf_msg_debug(this->name, 0,
803
"The cluster is operating at "
804
"version less than %d. remove-brick operation"
805
"falling back to syncop framework.",
807
ret = glusterd_op_begin_synctask(req, GD_OP_REMOVE_BRICK, dict);
809
ret = glusterd_mgmt_v3_initiate_all_phases(req, GD_OP_REMOVE_BRICK,
817
if (err_str[0] == '\0')
818
snprintf(err_str, sizeof(err_str), "Operation failed");
819
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_GLUSTERD_OP_FAILED, "%s",
821
rsp.op_errstr = err_str;
823
glusterd_to_cli(req, cli_rsp, NULL, 0, NULL, (xdrproc_t)xdr_gf_cli_rsp,
826
ret = 0; // sent error to cli, prevent second reply
830
GF_FREE(brickinfo_list);
831
subvol_matcher_destroy(subvols);
832
free(cli_req.dict.dict_val); // its malloced by xdr
838
glusterd_handle_remove_brick(rpcsvc_request_t *req)
840
return glusterd_big_locked_handler(req, __glusterd_handle_remove_brick);
844
_glusterd_restart_gsync_session(dict_t *this, char *key, data_t *value,
847
char *secondary = NULL;
848
char *secondary_buf = NULL;
849
char *path_list = NULL;
850
char *secondary_vol = NULL;
851
char *secondary_host = NULL;
852
char *secondary_url = NULL;
853
char *conf_path = NULL;
854
char **errmsg = NULL;
856
glusterd_gsync_status_temp_t *param = NULL;
857
gf_boolean_t is_running = _gf_false;
859
param = (glusterd_gsync_status_temp_t *)data;
862
GF_ASSERT(param->volinfo);
864
secondary = strchr(value->data, ':');
867
secondary_buf = gf_strdup(secondary);
868
if (!secondary_buf) {
869
gf_msg("glusterd", GF_LOG_ERROR, ENOMEM, GD_MSG_NO_MEMORY,
870
"Failed to gf_strdup");
877
ret = dict_set_dynstr_sizen(param->rsp_dict, "secondary", secondary_buf);
879
gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
880
"Unable to store secondary");
882
GF_FREE(secondary_buf);
886
ret = glusterd_get_secondary_details_confpath(
887
param->volinfo, param->rsp_dict, &secondary_url, &secondary_host,
888
&secondary_vol, &conf_path, errmsg);
890
if (errmsg && *errmsg)
891
gf_msg("glusterd", GF_LOG_ERROR, 0,
892
GD_MSG_SECONDARY_CONFPATH_DETAILS_FETCH_FAIL, "%s", *errmsg);
894
gf_msg("glusterd", GF_LOG_ERROR, 0,
895
GD_MSG_SECONDARY_CONFPATH_DETAILS_FETCH_FAIL,
896
"Unable to fetch secondary or confpath details.");
900
/* In cases that gsyncd is not running, we will not invoke it
901
* because of add-brick. */
902
ret = glusterd_check_gsync_running_local(param->volinfo->volname, secondary,
903
conf_path, &is_running);
905
gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_GSYNC_VALIDATION_FAIL,
906
"gsync running validation failed.");
909
if (_gf_false == is_running) {
910
gf_msg_debug("glusterd", 0,
911
"gsync session for %s and %s is"
912
" not running on this node. Hence not restarting.",
913
param->volinfo->volname, secondary);
918
ret = glusterd_get_local_brickpaths(param->volinfo, &path_list);
920
gf_msg_debug("glusterd", 0,
921
"This node not being part of"
922
" volume should not be running gsyncd. Hence"
923
" no gsyncd process to restart.");
928
ret = glusterd_check_restart_gsync_session(
929
param->volinfo, secondary, param->rsp_dict, path_list, conf_path, 0);
931
gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_GSYNC_RESTART_FAIL,
932
"Unable to restart gsync session.");
935
gf_msg_debug("glusterd", 0, "Returning %d.", ret);
942
glusterd_op_perform_add_bricks(glusterd_volinfo_t *volinfo, int32_t count,
943
char *bricks, dict_t *dict)
947
char *brick_list = NULL;
948
char *free_ptr1 = NULL;
949
char *free_ptr2 = NULL;
950
char *saveptr = NULL;
952
int32_t replica_count = 0;
953
int32_t arbiter_count = 0;
955
glusterd_brickinfo_t *brickinfo = NULL;
956
glusterd_gsync_status_temp_t param = {
959
gf_boolean_t restart_needed = 0;
962
char *brick_mount_dir = NULL;
963
xlator_t *this = THIS;
964
gf_boolean_t is_valid_add_brick = _gf_false;
965
gf_boolean_t restart_shd = _gf_false;
966
struct statvfs brickstat = {
973
brick_list = gf_strdup(bricks);
974
free_ptr1 = brick_list;
978
brick = strtok_r(brick_list + 1, " \n", &saveptr);
981
ret = dict_get_int32(dict, "replica-count", &replica_count);
983
gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
984
"replica-count is set %d", replica_count);
985
ret = dict_get_int32(dict, "arbiter-count", &arbiter_count);
987
gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
988
"arbiter-count is set %d", arbiter_count);
989
ret = dict_get_int32(dict, "type", &type);
991
gf_msg(this->name, GF_LOG_INFO, errno, GD_MSG_DICT_GET_SUCCESS,
992
"type is set %d, need to change it", type);
995
brickid = glusterd_get_next_available_brickid(volinfo);
999
ret = glusterd_brickinfo_new_from_brick(brick, &brickinfo, _gf_true,
1004
GLUSTERD_ASSIGN_BRICKID_TO_BRICKINFO(brickinfo, volinfo, brickid++);
1006
brick_mount_dir = NULL;
1008
snprintf(key, sizeof(key), "brick%d.mount_dir", i);
1009
ret = dict_get_str(dict, key, &brick_mount_dir);
1011
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1012
"%s not present", key);
1015
strncpy(brickinfo->mount_dir, brick_mount_dir,
1016
sizeof(brickinfo->mount_dir) - 1);
1018
ret = glusterd_resolve_brick(brickinfo);
1022
if (!gf_uuid_compare(brickinfo->uuid, MY_UUID)) {
1023
ret = sys_statvfs(brickinfo->path, &brickstat);
1025
gf_msg(this->name, GF_LOG_ERROR, errno, GD_MSG_STATVFS_FAILED,
1026
"Failed to fetch disk utilization "
1027
"from the brick (%s:%s). Please check the health of "
1028
"the brick. Error code was %s",
1029
brickinfo->hostname, brickinfo->path, strerror(errno));
1033
brickinfo->statfs_fsid = brickstat.f_fsid;
1035
if (replica_count) {
1036
add_brick_at_right_order(brickinfo, volinfo, (i - 1),
1039
cds_list_add_tail(&brickinfo->brick_list, &volinfo->bricks);
1041
brick = strtok_r(NULL, " \n", &saveptr);
1043
volinfo->brick_count++;
1046
/* Gets changed only if the options are given in add-brick cli */
1048
volinfo->type = type;
1049
/* performance.client-io-threads is turned on by default,
1050
* however this has adverse effects on replicate volumes due to
1051
* replication design issues, till that get addressed
1052
* performance.client-io-threads option is turned off for all
1053
* replicate volumes if not already explicitly enabled.
1055
if (type && glusterd_is_volume_replicate(volinfo)) {
1056
ret = dict_set_sizen_str_sizen(volinfo->dict,
1057
"performance.client-io-threads", "off");
1059
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
1061
"performance.client-io-threads to off");
1066
if (replica_count) {
1067
volinfo->replica_count = replica_count;
1069
if (arbiter_count) {
1070
volinfo->arbiter_count = arbiter_count;
1072
volinfo->dist_leaf_count = glusterd_get_dist_leaf_count(volinfo);
1074
/* backward compatibility */
1075
volinfo->sub_count = ((volinfo->dist_leaf_count == 1)
1077
: volinfo->dist_leaf_count);
1079
volinfo->subvol_count = (volinfo->brick_count / volinfo->dist_leaf_count);
1082
if (GLUSTERD_STATUS_STARTED != volinfo->status)
1083
goto generate_volfiles;
1085
ret = generate_brick_volfiles(volinfo);
1089
brick_list = gf_strdup(bricks);
1090
free_ptr2 = brick_list;
1094
brick = strtok_r(brick_list + 1, " \n", &saveptr);
1096
if (glusterd_is_volume_replicate(volinfo)) {
1097
if (replica_count) {
1098
is_valid_add_brick = _gf_true;
1099
if (volinfo->status == GLUSTERD_STATUS_STARTED) {
1100
ret = volinfo->shd.svc.stop(&(volinfo->shd.svc), SIGTERM);
1102
gf_msg(this->name, GF_LOG_ERROR, 0,
1103
GD_MSG_GLUSTER_SERVICES_STOP_FAIL,
1104
"Failed to stop shd for %s.", volinfo->volname);
1106
restart_shd = _gf_true;
1108
ret = generate_dummy_client_volfiles(volinfo);
1110
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLFILE_CREATE_FAIL,
1111
"Failed to create volfile.");
1117
while (i <= count) {
1118
ret = glusterd_volume_brickinfo_get_by_brick(brick, volinfo, &brickinfo,
1123
if (gf_uuid_is_null(brickinfo->uuid)) {
1124
ret = glusterd_resolve_brick(brickinfo);
1126
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESOLVE_BRICK_FAIL,
1127
FMTSTR_RESOLVE_BRICK, brickinfo->hostname,
1133
/* if the volume is a replicate volume, do: */
1134
if (is_valid_add_brick) {
1135
if (!gf_uuid_compare(brickinfo->uuid, MY_UUID)) {
1136
ret = glusterd_handle_replicate_brick_ops(volinfo, brickinfo,
1142
ret = glusterd_brick_start(volinfo, brickinfo, _gf_true, _gf_false);
1146
brick = strtok_r(NULL, " \n", &saveptr);
1148
/* Check if the brick is added in this node, and set
1149
* the restart_needed flag. */
1150
if ((!gf_uuid_compare(brickinfo->uuid, MY_UUID)) && !restart_needed) {
1152
gf_msg_debug(this->name, 0,
1153
"Restart gsyncd session, if it's already "
1158
/* If the restart_needed flag is set, restart gsyncd sessions for that
1159
* particular primary with all the secondaries. */
1160
if (restart_needed) {
1161
param.rsp_dict = dict;
1162
param.volinfo = volinfo;
1163
dict_foreach(volinfo->gsync_secondaries,
1164
_glusterd_restart_gsync_session, ¶m);
1169
* The cluster is operating at version greater than
1170
* gluster-3.7.5. So no need to sent volfile fetch
1171
* request in commit phase, the same will be done
1172
* in post validate phase with v3 framework.
1179
if (volinfo->shd.svc.manager(&(volinfo->shd.svc), volinfo,
1180
PROC_START_NO_WAIT)) {
1181
gf_msg(this->name, GF_LOG_CRITICAL, 0,
1182
GD_MSG_GLUSTER_SERVICE_START_FAIL,
1183
"Failed to start shd for %s.", volinfo->volname);
1187
gf_msg_debug(this->name, 0, "Returning %d", ret);
1192
glusterd_op_perform_remove_brick(glusterd_volinfo_t *volinfo, char *brick,
1193
int force, int *need_migrate)
1195
glusterd_brickinfo_t *brickinfo = NULL;
1201
ret = glusterd_volume_brickinfo_get_by_brick(brick, volinfo, &brickinfo,
1206
ret = glusterd_resolve_brick(brickinfo);
1210
glusterd_volinfo_reset_defrag_stats(volinfo);
1212
if (!gf_uuid_compare(brickinfo->uuid, MY_UUID)) {
1213
/* Only if the brick is in this glusterd, do the rebalance */
1219
ret = glusterd_brick_stop(volinfo, brickinfo, _gf_true);
1221
gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_STOP_FAIL,
1223
"glusterfs, ret: %d",
1229
brickinfo->decommissioned = 1;
1232
gf_msg_debug("glusterd", 0, "Returning %d", ret);
1237
glusterd_op_stage_add_brick(dict_t *dict, char **op_errstr, dict_t *rsp_dict)
1240
char *volname = NULL;
1242
int replica_count = 0;
1244
int32_t local_brick_count = 0;
1245
char *bricks = NULL;
1246
char *brick_list = NULL;
1247
char *saveptr = NULL;
1248
char *free_ptr = NULL;
1250
glusterd_brickinfo_t *brickinfo = NULL;
1251
glusterd_volinfo_t *volinfo = NULL;
1252
xlator_t *this = THIS;
1253
char msg[4096] = "";
1255
gf_boolean_t brick_alloc = _gf_false;
1256
char *all_bricks = NULL;
1257
char *str_ret = NULL;
1258
gf_boolean_t is_force = _gf_false;
1261
ret = dict_get_str(dict, "volname", &volname);
1263
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1264
"Unable to get volume name");
1268
ret = glusterd_volinfo_find(volname, &volinfo);
1270
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND,
1271
"Unable to find volume: %s", volname);
1275
ret = glusterd_validate_volume_id(dict, volinfo);
1279
ret = dict_get_int32(dict, "replica-count", &replica_count);
1281
gf_msg_debug(this->name, 0, "Unable to get replica count");
1284
glusterd_add_peers_to_auth_list(volname);
1286
if (replica_count && glusterd_is_volume_replicate(volinfo)) {
1287
/* Do not allow add-brick for stopped volumes when replica-count
1288
* is being increased.
1290
if (GLUSTERD_STATUS_STOPPED == volinfo->status) {
1292
snprintf(msg, sizeof(msg),
1293
" Volume must not be in"
1294
" stopped state when replica-count needs to "
1296
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL, "%s",
1298
*op_errstr = gf_strdup(msg);
1302
/* Do not allow increasing replica count for arbiter volumes. */
1303
if (volinfo->arbiter_count) {
1305
snprintf(msg, sizeof(msg),
1306
"Increasing replica count "
1307
"for arbiter volumes is not supported.");
1308
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL, "%s",
1310
*op_errstr = gf_strdup(msg);
1315
is_force = dict_get_str_boolean(dict, "force", _gf_false);
1317
/* Check brick order if the volume type is replicate or disperse. If
1318
* force at the end of command not given then check brick order.
1319
* doing this check at the originator node is sufficient.
1322
if (!is_force && is_origin_glusterd(dict)) {
1324
if (volinfo->type == GF_CLUSTER_TYPE_REPLICATE) {
1325
gf_msg_debug(this->name, 0,
1326
"Replicate cluster type "
1327
"found. Checking brick order.");
1328
if (replica_count && (replica_count != volinfo->replica_count))
1329
ret = glusterd_check_brick_order(dict, msg, volinfo->type,
1330
&volname, &bricks, &count,
1333
ret = glusterd_check_brick_order(dict, msg, volinfo->type,
1334
&volname, &bricks, &count,
1335
volinfo->replica_count, 0);
1336
} else if (volinfo->type == GF_CLUSTER_TYPE_DISPERSE) {
1337
gf_msg_debug(this->name, 0,
1338
"Disperse cluster type"
1339
" found. Checking brick order.");
1340
ret = glusterd_check_brick_order(dict, msg, volinfo->type, &volname,
1342
volinfo->disperse_count, 0);
1345
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BAD_BRKORDER,
1346
"Not adding brick because of "
1347
"bad brick order. %s",
1349
*op_errstr = gf_strdup(msg);
1354
if (volinfo->replica_count < replica_count && !is_force) {
1355
cds_list_for_each_entry(brickinfo, &volinfo->bricks, brick_list)
1357
if (gf_uuid_compare(brickinfo->uuid, MY_UUID))
1359
if (brickinfo->status == GF_BRICK_STOPPED) {
1361
len = snprintf(msg, sizeof(msg),
1363
"is down, changing replica "
1364
"count needs all the bricks "
1365
"to be up to avoid data loss",
1368
strcpy(msg, "<error>");
1370
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL, "%s",
1372
*op_errstr = gf_strdup(msg);
1378
if (is_origin_glusterd(dict)) {
1379
ret = glusterd_validate_quorum(this, GD_OP_ADD_BRICK, dict, op_errstr);
1381
gf_msg(this->name, GF_LOG_CRITICAL, 0, GD_MSG_SERVER_QUORUM_NOT_MET,
1382
"Server quorum not met. Rejecting operation.");
1386
/* Case 1: conf->op_version <= GD_OP_VERSION_3_7_5
1387
* in this case the add-brick is running
1388
* syncop framework that will do a quorum
1390
* Case 2: We don't need to do quorum check on every
1391
* node, only originator glusterd need to
1393
* So nothing need to be done in else
1397
if (glusterd_is_defrag_on(volinfo)) {
1398
snprintf(msg, sizeof(msg),
1399
"Volume name %s rebalance is in "
1400
"progress. Please retry after completion",
1402
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_OIP_RETRY_LATER, "%s", msg);
1403
*op_errstr = gf_strdup(msg);
1408
if (volinfo->snap_count > 0 || !cds_list_empty(&volinfo->snap_volumes)) {
1409
snprintf(msg, sizeof(msg),
1410
"Volume %s has %" PRIu64
1412
"Changing the volume configuration will not effect snapshots."
1413
"But the snapshot brick mount should be intact to "
1414
"make them function.",
1415
volname, volinfo->snap_count);
1416
gf_msg("glusterd", GF_LOG_WARNING, 0, GD_MSG_SNAP_WARN, "%s", msg);
1421
ret = dict_get_int32(dict, "count", &count);
1423
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1424
"Unable to get count");
1430
ret = dict_get_str(dict, "bricks", &bricks);
1432
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1433
"Unable to get bricks");
1439
brick_list = gf_strdup(bricks);
1440
all_bricks = gf_strdup(bricks);
1441
free_ptr = brick_list;
1445
brick = strtok_r(brick_list + 1, " \n", &saveptr);
1448
if (!glusterd_store_is_valid_brickpath(volname, brick) ||
1449
!glusterd_is_valid_volfpath(volname, brick)) {
1450
snprintf(msg, sizeof(msg),
1454
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRKPATH_TOO_LONG, "%s",
1456
*op_errstr = gf_strdup(msg);
1462
ret = glusterd_brickinfo_new_from_brick(brick, &brickinfo, _gf_true,
1465
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_NOT_FOUND,
1467
" to get brickinfo");
1470
brick_alloc = _gf_true;
1472
ret = glusterd_new_brick_validate(brick, brickinfo, msg, sizeof(msg),
1475
*op_errstr = gf_strdup(msg);
1480
if (!gf_uuid_compare(brickinfo->uuid, MY_UUID)) {
1481
ret = glusterd_validate_and_create_brickpath(
1482
brickinfo, volinfo->volume_id, volinfo->volname, op_errstr,
1483
is_force, _gf_false);
1487
ret = glusterd_get_brick_mount_dir(
1488
brickinfo->path, brickinfo->hostname, brickinfo->mount_dir);
1490
gf_msg(this->name, GF_LOG_ERROR, 0,
1491
GD_MSG_BRICK_MOUNTDIR_GET_FAIL,
1492
"Failed to get brick mount_dir");
1496
snprintf(key, sizeof(key), "brick%d.mount_dir", i + 1);
1497
ret = dict_set_dynstr_with_alloc(rsp_dict, key,
1498
brickinfo->mount_dir);
1500
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
1501
"Failed to set %s", key);
1505
local_brick_count = i + 1;
1508
glusterd_brickinfo_delete(brickinfo);
1509
brick_alloc = _gf_false;
1511
brick = strtok_r(NULL, " \n", &saveptr);
1515
ret = dict_set_int32_sizen(rsp_dict, "brick_count", local_brick_count);
1517
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
1518
"Failed to set local_brick_count");
1524
if (brick_alloc && brickinfo)
1525
glusterd_brickinfo_delete(brickinfo);
1527
GF_FREE(all_bricks);
1529
gf_msg_debug(this->name, 0, "Returning %d", ret);
1535
glusterd_remove_brick_validate_bricks(gf1_op_commands cmd, int32_t brick_count,
1536
dict_t *dict, glusterd_volinfo_t *volinfo,
1538
gf_defrag_type_t cmd_defrag)
1541
char msg[2048] = "";
1544
glusterd_brickinfo_t *brickinfo = NULL;
1545
glusterd_peerinfo_t *peerinfo = NULL;
1548
char pidfile[PATH_MAX + 1] = {
1551
xlator_t *this = THIS;
1552
glusterd_conf_t *priv = this->private;
1555
/* Check whether all the nodes of the bricks to be removed are
1556
* up, if not fail the operation */
1557
for (i = 1; i <= brick_count; i++) {
1558
keylen = snprintf(key, sizeof(key), "brick%d", i);
1559
ret = dict_get_strn(dict, key, keylen, &brick);
1561
snprintf(msg, sizeof(msg), "Unable to get %s", key);
1562
gf_smsg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1563
"key=%s", key, NULL);
1564
*errstr = gf_strdup(msg);
1568
ret = glusterd_volume_brickinfo_get_by_brick(brick, volinfo, &brickinfo,
1571
snprintf(msg, sizeof(msg),
1574
brick, volinfo->volname);
1575
gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_INCORRECT_BRICK,
1576
"Brick=%s, Volume=%s", brick, volinfo->volname, NULL);
1577
*errstr = gf_strdup(msg);
1580
/* Do not allow commit if the bricks are not decommissioned
1581
* if its a remove brick commit
1583
if (!brickinfo->decommissioned && cmd == GF_OP_CMD_COMMIT) {
1584
snprintf(msg, sizeof(msg),
1586
"is not decommissioned. "
1587
"Use start or force option",
1589
gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_BRICK_NOT_DECOM,
1590
"Use 'start' or 'force' option, Brick=%s", brick, NULL);
1591
*errstr = gf_strdup(msg);
1596
if (glusterd_is_local_brick(volinfo, brickinfo)) {
1598
case GF_OP_CMD_START:
1600
case GF_OP_CMD_NONE:
1605
switch (cmd_defrag) {
1606
case GF_DEFRAG_CMD_NONE:
1611
if (brickinfo->status != GF_BRICK_STARTED) {
1612
snprintf(msg, sizeof(msg),
1614
"brick %s. Use force option to "
1615
"remove the offline brick",
1618
this->name, GF_LOG_ERROR, errno, GD_MSG_BRICK_STOPPED,
1619
"Use 'force' option to remove the offline brick, Brick=%s",
1621
*errstr = gf_strdup(msg);
1625
GLUSTERD_GET_BRICK_PIDFILE(pidfile, volinfo, brickinfo, priv);
1626
if (!gf_is_service_running(pidfile, &pid)) {
1627
snprintf(msg, sizeof(msg),
1631
gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_BRICK_DEAD,
1632
"Brick=%s", brick, NULL);
1633
*errstr = gf_strdup(msg);
1643
peerinfo = glusterd_peerinfo_find_by_uuid(brickinfo->uuid);
1646
snprintf(msg, sizeof(msg),
1648
"brick %s is not in cluster",
1650
gf_smsg(this->name, GF_LOG_ERROR, errno,
1651
GD_MSG_BRICK_HOST_NOT_FOUND, "Brick=%s", brick, NULL);
1652
*errstr = gf_strdup(msg);
1656
if (!peerinfo->connected) {
1658
snprintf(msg, sizeof(msg),
1662
gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_HOST_DOWN,
1663
"Brick=%s", brick, NULL);
1664
*errstr = gf_strdup(msg);
1676
glusterd_op_stage_remove_brick(dict_t *dict, char **op_errstr)
1679
char *volname = NULL;
1680
glusterd_volinfo_t *volinfo = NULL;
1681
char *errstr = NULL;
1682
int32_t brick_count = 0;
1683
char msg[2048] = "";
1685
gf1_op_commands cmd = GF_OP_CMD_NONE;
1686
char *task_id_str = NULL;
1687
xlator_t *this = THIS;
1688
gsync_status_param_t param = {
1692
ret = dict_get_str(dict, "volname", &volname);
1694
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
1695
"Unable to get volume name");
1699
ret = glusterd_volinfo_find(volname, &volinfo);
1702
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND,
1703
"Volume %s does not exist", volname);
1707
ret = glusterd_validate_volume_id(dict, volinfo);
1711
ret = dict_get_int32(dict, "command", &flag);
1713
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1714
"Unable to get brick command");
1719
ret = dict_get_int32(dict, "count", &brick_count);
1721
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
1722
"Unable to get brick count");
1727
if (volinfo->brick_count == brick_count) {
1729
"Deleting all the bricks of the "
1730
"volume is not allowed");
1731
gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_DELETE, NULL);
1738
case GF_OP_CMD_NONE:
1739
errstr = gf_strdup("no remove-brick command issued");
1740
gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_NO_REMOVE_CMD,
1744
case GF_OP_CMD_STATUS:
1747
case GF_OP_CMD_START: {
1748
if ((volinfo->type == GF_CLUSTER_TYPE_REPLICATE) &&
1749
dict_get_sizen(dict, "replica-count")) {
1750
snprintf(msg, sizeof(msg),
1751
"Migration of data is not "
1752
"needed when reducing replica count. Use the"
1754
errstr = gf_strdup(msg);
1755
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_USE_THE_FORCE, "%s",
1760
if (GLUSTERD_STATUS_STARTED != volinfo->status) {
1761
snprintf(msg, sizeof(msg),
1763
"to be started before remove-brick "
1764
"(you can use 'force' or 'commit' "
1765
"to override this behavior)",
1767
errstr = gf_strdup(msg);
1768
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_STARTED,
1772
if (!gd_is_remove_brick_committed(volinfo)) {
1773
snprintf(msg, sizeof(msg),
1774
"An earlier remove-brick "
1775
"task exists for volume %s. Either commit it"
1776
" or stop it before starting a new task.",
1778
errstr = gf_strdup(msg);
1779
gf_msg(this->name, GF_LOG_ERROR, 0,
1780
GD_MSG_OLD_REMOVE_BRICK_EXISTS,
1781
"Earlier remove-brick"
1782
" task exists for volume %s.",
1786
if (glusterd_is_defrag_on(volinfo)) {
1788
"Rebalance is in progress. Please "
1789
"retry after completion");
1790
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_OIP_RETRY_LATER,
1795
if (volinfo->snap_count > 0 ||
1796
!cds_list_empty(&volinfo->snap_volumes)) {
1797
snprintf(msg, sizeof(msg),
1798
"Volume %s has %" PRIu64
1800
"Changing the volume configuration will not effect "
1802
"But the snapshot brick mount should be intact to "
1803
"make them function.",
1804
volname, volinfo->snap_count);
1805
gf_msg("glusterd", GF_LOG_WARNING, 0, GD_MSG_SNAP_WARN, "%s",
1810
ret = glusterd_remove_brick_validate_bricks(
1811
cmd, brick_count, dict, volinfo, &errstr, GF_DEFRAG_CMD_NONE);
1815
if (is_origin_glusterd(dict)) {
1816
ret = glusterd_generate_and_set_task_id(
1817
dict, GF_REMOVE_BRICK_TID_KEY,
1818
SLEN(GF_REMOVE_BRICK_TID_KEY));
1820
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_TASKID_GEN_FAIL,
1821
"Failed to generate task-id");
1825
ret = dict_get_str(dict, GF_REMOVE_BRICK_TID_KEY, &task_id_str);
1827
gf_msg(this->name, GF_LOG_WARNING, -ret,
1828
GD_MSG_DICT_GET_FAILED, "Missing remove-brick-id");
1835
case GF_OP_CMD_STOP:
1839
case GF_OP_CMD_COMMIT:
1840
if (volinfo->decommission_in_progress) {
1842
"use 'force' option as migration "
1844
gf_smsg(this->name, GF_LOG_WARNING, 0, GD_MSG_MIGRATION_PROG,
1845
"Use 'force' option", NULL);
1849
if (volinfo->rebal.defrag_status == GF_DEFRAG_STATUS_FAILED) {
1851
"use 'force' option as migration "
1853
gf_smsg(this->name, GF_LOG_WARNING, 0, GD_MSG_MIGRATION_FAIL,
1854
"Use 'force' option", NULL);
1858
if (volinfo->rebal.defrag_status == GF_DEFRAG_STATUS_COMPLETE) {
1859
if (volinfo->rebal.rebalance_failures > 0 ||
1860
volinfo->rebal.skipped_files > 0) {
1862
"use 'force' option as migration "
1863
"of some files might have been skipped or "
1865
gf_smsg(this->name, GF_LOG_WARNING, 0,
1866
GD_MSG_MIGRATION_FAIL,
1867
"Use 'force' option, some files might have been "
1874
ret = glusterd_remove_brick_validate_bricks(
1875
cmd, brick_count, dict, volinfo, &errstr, GF_DEFRAG_CMD_NONE);
1879
/* If geo-rep is configured, for this volume, it should be
1882
param.volinfo = volinfo;
1883
ret = glusterd_check_geo_rep_running(¶m, op_errstr);
1884
if (ret || param.is_active) {
1891
case GF_OP_CMD_COMMIT_FORCE:
1892
case GF_OP_CMD_DETACH_START:
1893
case GF_OP_CMD_DETACH_COMMIT:
1894
case GF_OP_CMD_DETACH_COMMIT_FORCE:
1895
case GF_OP_CMD_STOP_DETACH_TIER:
1901
gf_msg_debug(this->name, 0, "Returning %d", ret);
1902
if (ret && errstr) {
1904
*op_errstr = errstr;
1906
if (!op_errstr && errstr)
1912
glusterd_remove_brick_migrate_cbk(glusterd_volinfo_t *volinfo,
1913
gf_defrag_status_t status)
1917
#if 0 /* TODO: enable this behavior once cluster-wide awareness comes for \
1918
defrag cbk function */
1919
glusterd_brickinfo_t *brickinfo = NULL;
1920
glusterd_brickinfo_t *tmp = NULL;
1923
case GF_DEFRAG_STATUS_PAUSED:
1924
case GF_DEFRAG_STATUS_FAILED:
1925
/* No changes required in the volume file.
1926
everything should remain as is */
1928
case GF_DEFRAG_STATUS_STOPPED:
1929
/* Fall back to the old volume file */
1930
cds_list_for_each_entry_safe (brickinfo, tmp, &volinfo->bricks,
1932
if (!brickinfo->decommissioned)
1934
brickinfo->decommissioned = 0;
1938
case GF_DEFRAG_STATUS_COMPLETE:
1939
/* Done with the task, you can remove the brick from the
1941
cds_list_for_each_entry_safe (brickinfo, tmp, &volinfo->bricks,
1943
if (!brickinfo->decommissioned)
1945
gf_log (THIS->name, GF_LOG_INFO, "removing the brick %s",
1947
brickinfo->decommissioned = 0;
1948
if (GLUSTERD_STATUS_STARTED == volinfo->status) {
1949
/*TODO: use the 'atomic' flavour of brick_stop*/
1950
ret = glusterd_brick_stop (volinfo, brickinfo);
1952
gf_log (THIS->name, GF_LOG_ERROR,
1953
"Unable to stop glusterfs (%d)", ret);
1956
glusterd_delete_brick (volinfo, brickinfo);
1961
GF_ASSERT (!"cbk function called with wrong status");
1965
ret = glusterd_create_volfiles_and_notify_services (volinfo);
1967
gf_log (THIS->name, GF_LOG_ERROR,
1968
"Unable to write volume files (%d)", ret);
1970
ret = glusterd_store_volinfo (volinfo, GLUSTERD_VOLINFO_VER_AC_INCREMENT);
1972
gf_log (THIS->name, GF_LOG_ERROR,
1973
"Unable to store volume info (%d)", ret);
1976
if (GLUSTERD_STATUS_STARTED == volinfo->status) {
1977
ret = glusterd_check_generate_start_nfs ();
1979
gf_log (THIS->name, GF_LOG_ERROR,
1980
"Unable to start nfs process (%d)", ret);
1985
volinfo->decommission_in_progress = 0;
1990
glusterd_op_add_brick(dict_t *dict, char **op_errstr)
1993
char *volname = NULL;
1994
glusterd_volinfo_t *volinfo = NULL;
1995
char *bricks = NULL;
1998
ret = dict_get_str(dict, "volname", &volname);
2001
gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2002
"Unable to get volume name");
2006
ret = glusterd_volinfo_find(volname, &volinfo);
2009
gf_msg("glusterd", GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_FOUND,
2010
"Unable to allocate memory");
2014
ret = dict_get_int32(dict, "count", &count);
2016
gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2017
"Unable to get count");
2021
ret = dict_get_str(dict, "bricks", &bricks);
2023
gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2024
"Unable to get bricks");
2028
ret = glusterd_op_perform_add_bricks(volinfo, count, bricks, dict);
2030
gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL,
2031
"Unable to add bricks");
2035
if (GLUSTERD_STATUS_STARTED == volinfo->status)
2036
ret = glusterd_svcs_manager(volinfo);
2043
glusterd_post_commit_brick_operation(dict_t *dict, char **op_errstr)
2046
char *volname = NULL;
2048
ret = dict_get_str(dict, "volname", &volname);
2051
gf_msg(THIS->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2052
"Unable to get volume name");
2055
ret = glusterd_replace_old_auth_allow_list(volname);
2061
glusterd_set_rebalance_id_for_remove_brick(dict_t *req_dict, dict_t *rsp_dict)
2064
char *volname = NULL;
2065
glusterd_volinfo_t *volinfo = NULL;
2066
char msg[2048] = {0};
2067
char *task_id_str = NULL;
2068
xlator_t *this = THIS;
2071
GF_ASSERT(rsp_dict);
2072
GF_ASSERT(req_dict);
2074
ret = dict_get_str(rsp_dict, "volname", &volname);
2076
gf_msg_debug(this->name, 0, "volname not found");
2080
ret = glusterd_volinfo_find(volname, &volinfo);
2082
gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_FOUND,
2083
"Unable to allocate memory");
2087
ret = dict_get_int32(rsp_dict, "command", &cmd);
2089
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2090
"Unable to get command");
2094
/* remove brick task id is generted in glusterd_op_stage_remove_brick(),
2095
* but rsp_dict is unavailable there. So copying it to rsp_dict from
2098
if (is_origin_glusterd(rsp_dict)) {
2099
ret = dict_get_str(req_dict, GF_REMOVE_BRICK_TID_KEY, &task_id_str);
2101
snprintf(msg, sizeof(msg), "Missing rebalance id for remove-brick");
2102
gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_REBALANCE_ID_MISSING,
2106
gf_uuid_parse(task_id_str, volinfo->rebal.rebalance_id);
2108
ret = glusterd_copy_uuid_to_dict(volinfo->rebal.rebalance_id,
2109
rsp_dict, GF_REMOVE_BRICK_TID_KEY,
2110
SLEN(GF_REMOVE_BRICK_TID_KEY));
2112
gf_msg(this->name, GF_LOG_ERROR, 0,
2113
GD_MSG_REMOVE_BRICK_ID_SET_FAIL,
2114
"Failed to set remove-brick-id");
2119
if (!gf_uuid_is_null(volinfo->rebal.rebalance_id) &&
2120
GD_OP_REMOVE_BRICK == volinfo->rebal.op) {
2121
ret = glusterd_copy_uuid_to_dict(volinfo->rebal.rebalance_id, rsp_dict,
2122
GF_REMOVE_BRICK_TID_KEY,
2123
SLEN(GF_REMOVE_BRICK_TID_KEY));
2125
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
2126
"Failed to set task-id for volume %s", volname);
2134
glusterd_op_remove_brick(dict_t *dict, char **op_errstr)
2137
char *volname = NULL;
2138
glusterd_volinfo_t *volinfo = NULL;
2145
int need_rebalance = 0;
2147
gf1_op_commands cmd = 0;
2148
int32_t replica_count = 0;
2149
char *task_id_str = NULL;
2150
xlator_t *this = THIS;
2151
dict_t *bricks_dict = NULL;
2152
char *brick_tmpstr = NULL;
2153
int start_remove = 0;
2154
uint32_t commit_hash = 0;
2156
glusterd_conf_t *conf = NULL;
2158
conf = this->private;
2159
GF_VALIDATE_OR_GOTO(this->name, conf, out);
2161
ret = dict_get_str(dict, "volname", &volname);
2164
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_ADD_FAIL,
2165
"Unable to get volume name");
2169
ret = glusterd_volinfo_find(volname, &volinfo);
2171
gf_msg(this->name, GF_LOG_ERROR, EINVAL, GD_MSG_VOL_NOT_FOUND,
2172
"Unable to allocate memory");
2176
ret = dict_get_int32(dict, "command", &flag);
2178
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2179
"Unable to get command");
2184
if (GF_OP_CMD_START == cmd)
2187
/* Set task-id, if available, in ctx dict for operations other than
2191
if (is_origin_glusterd(dict) && (!start_remove)) {
2192
if (!gf_uuid_is_null(volinfo->rebal.rebalance_id)) {
2193
ret = glusterd_copy_uuid_to_dict(volinfo->rebal.rebalance_id, dict,
2194
GF_REMOVE_BRICK_TID_KEY,
2195
SLEN(GF_REMOVE_BRICK_TID_KEY));
2197
gf_msg(this->name, GF_LOG_ERROR, 0,
2198
GD_MSG_REMOVE_BRICK_ID_SET_FAIL,
2199
"Failed to set remove-brick-id");
2205
/* Clear task-id, rebal.op and stored bricks on commmitting/stopping
2207
if ((!start_remove) && (cmd != GF_OP_CMD_STATUS)) {
2208
gf_uuid_clear(volinfo->rebal.rebalance_id);
2209
volinfo->rebal.op = GD_OP_NONE;
2210
dict_unref(volinfo->rebal.dict);
2211
volinfo->rebal.dict = NULL;
2216
case GF_OP_CMD_NONE:
2219
case GF_OP_CMD_STATUS:
2223
case GF_OP_CMD_STOP:
2224
case GF_OP_CMD_START:
2225
/* Reset defrag status to 'NOT STARTED' whenever a
2226
* remove-brick/rebalance command is issued to remove
2227
* stale information from previous run.
2228
* Update defrag_cmd as well or it will only be done
2229
* for nodes on which the brick to be removed exists.
2231
/* coverity[MIXED_ENUMS] */
2232
volinfo->rebal.defrag_cmd = cmd;
2233
volinfo->rebal.defrag_status = GF_DEFRAG_STATUS_NOT_STARTED;
2234
ret = dict_get_str(dict, GF_REMOVE_BRICK_TID_KEY, &task_id_str);
2236
gf_msg_debug(this->name, errno, "Missing remove-brick-id");
2239
gf_uuid_parse(task_id_str, volinfo->rebal.rebalance_id);
2240
volinfo->rebal.op = GD_OP_REMOVE_BRICK;
2245
case GF_OP_CMD_COMMIT:
2249
case GF_OP_CMD_COMMIT_FORCE:
2251
if (volinfo->decommission_in_progress) {
2252
if (volinfo->rebal.defrag) {
2253
LOCK(&volinfo->rebal.defrag->lock);
2254
/* Fake 'rebalance-complete' so the graph change
2255
happens right away */
2256
volinfo->rebal.defrag_status = GF_DEFRAG_STATUS_COMPLETE;
2258
UNLOCK(&volinfo->rebal.defrag->lock);
2260
/* Graph change happens in rebalance _cbk function,
2261
no need to do anything here */
2262
/* TODO: '_cbk' function is not doing anything for now */
2268
case GF_OP_CMD_DETACH_START:
2269
case GF_OP_CMD_DETACH_COMMIT_FORCE:
2270
case GF_OP_CMD_DETACH_COMMIT:
2271
case GF_OP_CMD_STOP_DETACH_TIER:
2275
ret = dict_get_int32(dict, "count", &count);
2277
gf_msg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2278
"Unable to get count");
2281
/* Save the list of bricks for later usage only on starting a
2282
* remove-brick. Right now this is required for displaying the task
2283
* parameters with task status in volume status.
2287
bricks_dict = dict_new();
2292
ret = dict_set_int32_sizen(bricks_dict, "count", count);
2294
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
2295
"Failed to save remove-brick count");
2300
while (i <= count) {
2301
keylen = snprintf(key, sizeof(key), "brick%d", i);
2302
ret = dict_get_strn(dict, key, keylen, &brick);
2304
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2305
"Unable to get %s", key);
2310
brick_tmpstr = gf_strdup(brick);
2311
if (!brick_tmpstr) {
2313
gf_msg(this->name, GF_LOG_ERROR, ENOMEM, GD_MSG_NO_MEMORY,
2314
"Failed to duplicate brick name");
2317
ret = dict_set_dynstrn(bricks_dict, key, keylen, brick_tmpstr);
2319
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
2320
"Failed to add brick to dict");
2323
brick_tmpstr = NULL;
2326
ret = glusterd_op_perform_remove_brick(volinfo, brick, force,
2334
volinfo->rebal.dict = dict_ref(bricks_dict);
2336
ret = dict_get_int32(dict, "replica-count", &replica_count);
2338
gf_msg(this->name, GF_LOG_INFO, -ret, GD_MSG_DICT_GET_FAILED,
2339
"changing replica count %d to %d on volume %s",
2340
volinfo->replica_count, replica_count, volinfo->volname);
2341
volinfo->replica_count = replica_count;
2342
/* A reduction in replica count implies an arbiter volume
2343
* earlier is now no longer one. */
2344
if (volinfo->arbiter_count)
2345
volinfo->arbiter_count = 0;
2346
volinfo->sub_count = replica_count;
2347
volinfo->dist_leaf_count = glusterd_get_dist_leaf_count(volinfo);
2350
* volinfo->type and sub_count have already been set for
2351
* volumes undergoing a detach operation, they should not
2354
if (replica_count == 1) {
2355
if (volinfo->type == GF_CLUSTER_TYPE_REPLICATE) {
2356
volinfo->type = GF_CLUSTER_TYPE_NONE;
2357
/* backward compatibility */
2358
volinfo->sub_count = 0;
2362
volinfo->subvol_count = (volinfo->brick_count / volinfo->dist_leaf_count);
2364
if (!glusterd_is_volume_replicate(volinfo) &&
2365
conf->op_version >= GD_OP_VERSION_3_12_2) {
2366
ret = dict_set_sizen_str_sizen(volinfo->dict,
2367
"performance.client-io-threads", "on");
2369
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
2371
"performance.client-io-threads to on");
2376
ret = glusterd_create_volfiles_and_notify_services(volinfo);
2378
gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_VOLFILE_CREATE_FAIL,
2379
"failed to create volfiles");
2383
ret = glusterd_store_volinfo(volinfo, GLUSTERD_VOLINFO_VER_AC_INCREMENT);
2385
gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_VOLINFO_STORE_FAIL,
2386
"failed to store volinfo");
2390
if (start_remove && volinfo->status == GLUSTERD_STATUS_STARTED) {
2391
ret = glusterd_svcs_reconfigure(volinfo);
2393
gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_NFS_RECONF_FAIL,
2394
"Unable to reconfigure NFS-Server");
2399
/* Need to reset the defrag/rebalance status accordingly */
2400
switch (volinfo->rebal.defrag_status) {
2401
case GF_DEFRAG_STATUS_FAILED:
2402
case GF_DEFRAG_STATUS_COMPLETE:
2403
volinfo->rebal.defrag_status = 0;
2408
if (!force && need_rebalance) {
2409
if (dict_get_uint32(dict, "commit-hash", &commit_hash) == 0) {
2410
volinfo->rebal.commit_hash = commit_hash;
2412
/* perform the rebalance operations */
2413
defrag_cmd = GF_DEFRAG_CMD_START_FORCE;
2415
* We need to set this *before* we issue commands to the
2416
* bricks, or else we might end up setting it after the bricks
2417
* have responded. If we fail to send the request(s) we'll
2418
* clear it ourselves because nobody else will.
2420
volinfo->decommission_in_progress = 1;
2421
char err_str[4096] = "";
2422
ret = glusterd_handle_defrag_start(
2423
volinfo, err_str, sizeof(err_str), defrag_cmd,
2424
glusterd_remove_brick_migrate_cbk, GD_OP_REMOVE_BRICK);
2427
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_REBALANCE_START_FAIL,
2428
"failed to start the rebalance");
2429
/* TBD: shouldn't we do more than print a message? */
2430
volinfo->decommission_in_progress = 0;
2432
*op_errstr = gf_strdup(err_str);
2435
if (GLUSTERD_STATUS_STARTED == volinfo->status)
2436
ret = glusterd_svcs_manager(volinfo);
2439
GF_FREE(brick_tmpstr);
2441
dict_unref(bricks_dict);
2442
gf_msg_debug(this->name, 0, "returning %d ", ret);
2447
glusterd_op_stage_barrier(dict_t *dict, char **op_errstr)
2450
xlator_t *this = THIS;
2451
char *volname = NULL;
2452
glusterd_volinfo_t *vol = NULL;
2453
char *barrier_op = NULL;
2457
ret = dict_get_str(dict, "volname", &volname);
2459
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2460
"Volname not present in "
2465
ret = glusterd_volinfo_find(volname, &vol);
2467
gf_asprintf(op_errstr, "Volume %s does not exist", volname);
2468
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND, "%s",
2473
if (!glusterd_is_volume_started(vol)) {
2474
gf_asprintf(op_errstr, "Volume %s is not started", volname);
2479
ret = dict_get_str(dict, "barrier", &barrier_op);
2481
gf_asprintf(op_errstr,
2482
"Barrier op for volume %s not present "
2485
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
2491
gf_msg_debug(this->name, 0, "Returning %d", ret);
2496
glusterd_op_barrier(dict_t *dict, char **op_errstr)
2499
xlator_t *this = THIS;
2500
char *volname = NULL;
2501
glusterd_volinfo_t *vol = NULL;
2502
char *barrier_op = NULL;
2506
ret = dict_get_str(dict, "volname", &volname);
2508
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
2509
"Volname not present in "
2514
ret = glusterd_volinfo_find(volname, &vol);
2516
gf_asprintf(op_errstr, "Volume %s does not exist", volname);
2517
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND, "%s",
2522
ret = dict_get_str(dict, "barrier", &barrier_op);
2524
gf_asprintf(op_errstr,
2525
"Barrier op for volume %s not present "
2528
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED, "%s",
2533
ret = dict_set_dynstr_with_alloc(vol->dict, "features.barrier", barrier_op);
2535
gf_msg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
2536
"Failed to set barrier op in"
2537
" volume option dict");
2541
gd_update_volume_op_versions(vol);
2542
ret = glusterd_create_volfiles(vol);
2544
gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLFILE_CREATE_FAIL,
2545
"Failed to create volfiles");
2548
ret = glusterd_store_volinfo(vol, GLUSTERD_VOLINFO_VER_AC_INCREMENT);
2551
gf_msg_debug(this->name, 0, "Returning %d", ret);
2556
glusterd_handle_add_tier_brick(rpcsvc_request_t *req)
2562
glusterd_handle_attach_tier(rpcsvc_request_t *req)