glusterfs

glusterd-volume-ops.c · 2989 lines · 89.1 KB
/*
   Copyright (c) 2011-2012 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/
#include <glusterfs/common-utils.h>
#include <glusterfs/syscall.h>
#include "glusterd.h"
#include "glusterd-op-sm.h"
#include "glusterd-geo-rep.h"
#include "glusterd-store.h"
#include "glusterd-utils.h"
#include "glusterd-volgen.h"
#include "glusterd-messages.h"
#include <glusterfs/run.h>
#include "glusterd-snapshot-utils.h"
#include "glusterd-svc-mgmt.h"
#include "glusterd-svc-helper.h"
#include "glusterd-shd-svc.h"
#include "glusterd-snapd-svc.h"
#include "glusterd-mgmt.h"
#include "glusterd-server-quorum.h"

#include <stdint.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <stdlib.h>

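/* 'volume start' and 'volume stop' requests carry the same two dict keys
 * ("volname" and "flags"), so the start-args getter is simply an alias for
 * glusterd_op_stop_volume_args_get(), defined later in this file. */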
#define glusterd_op_start_volume_args_get(dict, volname, flags)                \
    glusterd_op_stop_volume_args_get(dict, volname, flags)

int
__glusterd_handle_create_volume(rpcsvc_request_t *req)
{
    int32_t ret = -1;
    gf_cli_req cli_req = {{
        0,
    }};
    dict_t *dict = NULL;
    char *bricks = NULL;
    char *volname = NULL;
    int brick_count = 0;
    int thin_arbiter_count = 0;
    void *cli_rsp = NULL;
    char err_str[2048] = {
        0,
    };
    gf_cli_rsp rsp = {
        0,
    };
    xlator_t *this = THIS;
    glusterd_conf_t *conf = NULL;
    char *free_ptr = NULL;
    char *trans_type = NULL;
    char *address_family_str = NULL;
    uuid_t volume_id = {
        0,
    };
    uuid_t tmp_uuid = {0};
    int32_t type = 0;
    char *username = NULL;
    char *password = NULL;
#ifdef IPV6_DEFAULT
    char *addr_family = "inet6";
#else
    char *addr_family = "inet";
#endif
    glusterd_volinfo_t *volinfo = NULL;

    GF_ASSERT(req);

    conf = this->private;
    GF_VALIDATE_OR_GOTO(this->name, conf, out);

    ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
    if (ret < 0) {
        req->rpc_err = GARBAGE_ARGS;
        snprintf(err_str, sizeof(err_str),
                 "Failed to decode request "
                 "received from cli");
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_REQ_DECODE_FAIL, "%s",
               err_str);
        goto out;
    }

    gf_msg_debug(this->name, 0, "Received create volume req");

    if (cli_req.dict.dict_len) {
        /* Unserialize the dictionary */
        dict = dict_new();

        ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
                               &dict);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_UNSERIALIZE_FAIL,
                   "failed to "
                   "unserialize req-buffer to dictionary");
            snprintf(err_str, sizeof(err_str),
                     "Unable to decode "
                     "the command");
            goto out;
        } else {
            dict->extra_stdfree = cli_req.dict.dict_val;
        }
    }

    ret = dict_get_str(dict, "volname", &volname);
    if (ret) {
        snprintf(err_str, sizeof(err_str),
                 "Unable to get volume "
                 "name");
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        goto out;
    }

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (!ret) {
        ret = -1;
        snprintf(err_str, sizeof(err_str), "Volume %s already exists", volname);
        gf_msg(this->name, GF_LOG_ERROR, EEXIST, GD_MSG_VOL_ALREADY_EXIST, "%s",
               err_str);
        goto out;
    }

    ret = dict_get_int32(dict, "count", &brick_count);
    if (ret) {
        snprintf(err_str, sizeof(err_str),
                 "Unable to get brick count"
                 " for volume %s",
                 volname);
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        goto out;
    }

    ret = dict_get_int32(dict, "type", &type);
    if (ret) {
        snprintf(err_str, sizeof(err_str),
                 "Unable to get type of "
                 "volume %s",
                 volname);
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        goto out;
    }

    ret = dict_get_str(dict, "transport", &trans_type);
    if (ret) {
        snprintf(err_str, sizeof(err_str),
                 "Unable to get "
                 "transport-type of volume %s",
                 volname);
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        goto out;
    }

    ret = dict_get_str(this->options, "transport.address-family",
                       &address_family_str);

    if (!ret) {
        ret = dict_set_dynstr_with_alloc(dict, "transport.address-family",
                                         address_family_str);
        if (ret) {
            gf_log(this->name, GF_LOG_ERROR,
                   "failed to set transport.address-family");
            goto out;
        }
    } else if (!strcmp(trans_type, "tcp")) {
        /* Set the default address family to inet (or inet6 when built
         * with IPV6_DEFAULT) for trans_type tcp if the op-version is
         * >= 3.8.0
         */
        if (conf->op_version >= GD_OP_VERSION_3_8_0) {
            ret = dict_set_dynstr_with_alloc(dict, "transport.address-family",
                                             addr_family);
            if (ret) {
                gf_log(this->name, GF_LOG_ERROR,
                       "failed to set "
                       "transport.address-family "
                       "to %s",
                       addr_family);
                goto out;
            }
        }
    }
    ret = dict_get_str(dict, "bricks", &bricks);
    if (ret) {
        snprintf(err_str, sizeof(err_str),
                 "Unable to get bricks for "
                 "volume %s",
                 volname);
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        goto out;
    }

    ret = dict_get_int32(dict, "thin-arbiter-count", &thin_arbiter_count);
    if (thin_arbiter_count && conf->op_version < GD_OP_VERSION_7_0) {
        snprintf(err_str, sizeof(err_str),
                 "Cannot execute command. "
                 "The cluster is operating at version %d. "
                 "Thin-arbiter volume creation is unavailable in "
                 "this version",
                 conf->op_version);
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_GLUSTERD_OP_FAILED, "%s",
               err_str);
        ret = -1;
        goto out;
    }

    if (!dict_get_sizen(dict, "force")) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Failed to get 'force' flag");
        goto out;
    }

    gf_uuid_generate(volume_id);
    free_ptr = gf_strdup(uuid_utoa(volume_id));
    ret = dict_set_dynstr_sizen(dict, "volume-id", free_ptr);
    if (ret) {
        snprintf(err_str, sizeof(err_str),
                 "Unable to set volume "
                 "id of volume %s",
                 volname);
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED, "%s",
               err_str);
        goto out;
    }
    free_ptr = NULL;

    /* generate internal username and password */

    gf_uuid_generate(tmp_uuid);
    username = gf_strdup(uuid_utoa(tmp_uuid));
    ret = dict_set_dynstr_sizen(dict, "internal-username", username);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
               "Failed to set username for "
               "volume %s",
               volname);
        goto out;
    }

    gf_uuid_generate(tmp_uuid);
    password = gf_strdup(uuid_utoa(tmp_uuid));
    ret = dict_set_dynstr_sizen(dict, "internal-password", password);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
               "Failed to set password for "
               "volume %s",
               volname);
        goto out;
    }

    ret = glusterd_op_begin_synctask(req, GD_OP_CREATE_VOLUME, dict);

out:
    if (ret) {
        rsp.op_ret = -1;
        rsp.op_errno = 0;
        if (err_str[0] == '\0')
            snprintf(err_str, sizeof(err_str), "Operation failed");
        rsp.op_errstr = err_str;
        cli_rsp = &rsp;
        glusterd_to_cli(req, cli_rsp, NULL, 0, NULL, (xdrproc_t)xdr_gf_cli_rsp,
                        dict);
        ret = 0; /* client response sent, prevent a second response */
    }

    GF_FREE(free_ptr);

    return ret;
}
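
/* Illustrative sketch (hypothetical helper, not part of this file): the
 * dict a create request is expected to carry, with key names mirroring the
 * dict_get_* calls in __glusterd_handle_create_volume() above. A replicate
 * create also carries "replica-count"; the "volume-id", "internal-username"
 * and "internal-password" keys are generated by the handler itself, not by
 * the caller. */
static dict_t *
example_build_create_volume_dict(void)
{
    dict_t *dict = dict_new();
    if (!dict)
        return NULL;

    if (dict_set_str(dict, "volname", "testvol") ||
        dict_set_int32(dict, "type", GF_CLUSTER_TYPE_REPLICATE) ||
        dict_set_int32(dict, "count", 2) /* brick count */ ||
        dict_set_str(dict, "transport", "tcp") ||
        dict_set_str(dict, "bricks", "host1:/data/b1 host2:/data/b2") ||
        dict_set_int32(dict, "force", 0)) {
        dict_unref(dict);
        return NULL;
    }
    return dict;
}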

int
glusterd_handle_create_volume(rpcsvc_request_t *req)
{
    return glusterd_big_locked_handler(req, __glusterd_handle_create_volume);
}
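
/* Every CLI entry point in this file follows the same shape: a public
 * handler that only wraps its __-prefixed worker in
 * glusterd_big_locked_handler() (defined in glusterd-handler.c), which runs
 * the actor while holding glusterd's global big_lock, serializing CLI
 * request handling. */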

int
__glusterd_handle_cli_start_volume(rpcsvc_request_t *req)
{
    int32_t ret = -1;
    gf_cli_req cli_req = {{
        0,
    }};
    char *volname = NULL;
    dict_t *dict = NULL;
    glusterd_op_t cli_op = GD_OP_START_VOLUME;
    char errstr[2048] = {
        0,
    };
    xlator_t *this = THIS;

    GF_ASSERT(req);

    ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
    if (ret < 0) {
        snprintf(errstr, sizeof(errstr),
                 "Failed to decode message "
                 "received from cli");
        req->rpc_err = GARBAGE_ARGS;
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_REQ_DECODE_FAIL, "%s",
               errstr);
        goto out;
    }

    if (cli_req.dict.dict_len) {
        /* Unserialize the dictionary */
        dict = dict_new();

        ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
                               &dict);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_UNSERIALIZE_FAIL,
                   "failed to "
                   "unserialize req-buffer to dictionary");
            snprintf(errstr, sizeof(errstr),
                     "Unable to decode "
                     "the command");
            goto out;
        }
    }

    ret = dict_get_str(dict, "volname", &volname);
    if (ret) {
        snprintf(errstr, sizeof(errstr), "Unable to get volume name");
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               errstr);
        goto out;
    }

    gf_msg_debug(this->name, 0,
                 "Received start vol req"
                 " for volume %s",
                 volname);

    ret = glusterd_mgmt_v3_initiate_all_phases(req, GD_OP_START_VOLUME, dict);
out:
    free(cli_req.dict.dict_val); /* allocated by xdr */

    if (ret) {
        if (errstr[0] == '\0')
            snprintf(errstr, sizeof(errstr), "Operation failed");
        ret = glusterd_op_send_cli_response(cli_op, ret, 0, req, dict, errstr);
    }

    return ret;
}

int
glusterd_handle_cli_start_volume(rpcsvc_request_t *req)
{
    return glusterd_big_locked_handler(req, __glusterd_handle_cli_start_volume);
}

int
__glusterd_handle_cli_stop_volume(rpcsvc_request_t *req)
{
    int32_t ret = -1;
    gf_cli_req cli_req = {{
        0,
    }};
    char *dup_volname = NULL;
    dict_t *dict = NULL;
    glusterd_op_t cli_op = GD_OP_STOP_VOLUME;
    xlator_t *this = THIS;
    char err_str[64] = {
        0,
    };
    glusterd_conf_t *conf = NULL;

    GF_ASSERT(req);
    conf = this->private;
    GF_ASSERT(conf);

    ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
    if (ret < 0) {
        snprintf(err_str, sizeof(err_str),
                 "Failed to decode message "
                 "received from cli");
        req->rpc_err = GARBAGE_ARGS;
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_REQ_DECODE_FAIL, "%s",
               err_str);
        goto out;
    }
    if (cli_req.dict.dict_len) {
        /* Unserialize the dictionary */
        dict = dict_new();

        ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
                               &dict);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_UNSERIALIZE_FAIL,
                   "failed to "
                   "unserialize req-buffer to dictionary");
            snprintf(err_str, sizeof(err_str),
                     "Unable to decode "
                     "the command");
            goto out;
        }
    }

    ret = dict_get_str(dict, "volname", &dup_volname);

    if (ret) {
        snprintf(err_str, sizeof(err_str),
                 "Failed to get volume "
                 "name");
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        goto out;
    }

    gf_msg_debug(this->name, 0,
                 "Received stop vol req "
                 "for volume %s",
                 dup_volname);

    if (conf->op_version < GD_OP_VERSION_4_1_0) {
        gf_msg_debug(this->name, 0,
                     "The cluster is operating at "
                     "version less than %d. Volume stop "
                     "falling back to syncop framework.",
                     GD_OP_VERSION_4_1_0);
        ret = glusterd_op_begin_synctask(req, GD_OP_STOP_VOLUME, dict);
    } else {
        ret = glusterd_mgmt_v3_initiate_all_phases(req, GD_OP_STOP_VOLUME,
                                                   dict);
    }

out:
    free(cli_req.dict.dict_val); /* allocated by xdr */

    if (ret) {
        if (err_str[0] == '\0')
            snprintf(err_str, sizeof(err_str), "Operation failed");
        ret = glusterd_op_send_cli_response(cli_op, ret, 0, req, dict, err_str);
    }

    return ret;
}
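
/* Dispatch note: like 'volume start' above, 'volume stop' goes through the
 * newer mgmt_v3 transaction framework when the cluster op-version is at
 * least GD_OP_VERSION_4_1_0, and falls back to the synctask-driven op-sm
 * path (glusterd_op_begin_synctask) on older clusters. */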

int
glusterd_handle_cli_stop_volume(rpcsvc_request_t *req)
{
    return glusterd_big_locked_handler(req, __glusterd_handle_cli_stop_volume);
}

int
__glusterd_handle_cli_delete_volume(rpcsvc_request_t *req)
{
    int32_t ret = -1;
    gf_cli_req cli_req = {
        {
            0,
        },
    };
    glusterd_op_t cli_op = GD_OP_DELETE_VOLUME;
    dict_t *dict = NULL;
    char *volname = NULL;
    char err_str[64] = {
        0,
    };

    GF_ASSERT(req);

    ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
    if (ret < 0) {
        snprintf(err_str, sizeof(err_str),
                 "Failed to decode request "
                 "received from cli");
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_REQ_DECODE_FAIL, "%s",
               err_str);
        req->rpc_err = GARBAGE_ARGS;
        goto out;
    }

    if (cli_req.dict.dict_len) {
        /* Unserialize the dictionary */
        dict = dict_new();

        ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
                               &dict);
        if (ret < 0) {
            gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_UNSERIALIZE_FAIL,
                   "failed to "
                   "unserialize req-buffer to dictionary");
            snprintf(err_str, sizeof(err_str),
                     "Unable to decode "
                     "the command");
            goto out;
        }
    }

    ret = dict_get_str(dict, "volname", &volname);
    if (ret) {
        snprintf(err_str, sizeof(err_str),
                 "Failed to get volume "
                 "name");
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        req->rpc_err = GARBAGE_ARGS;
        goto out;
    }

    gf_msg_debug(THIS->name, 0,
                 "Received delete vol req "
                 "for volume %s",
                 volname);

    ret = glusterd_op_begin_synctask(req, GD_OP_DELETE_VOLUME, dict);

out:
    free(cli_req.dict.dict_val); /* allocated by xdr */

    if (ret) {
        if (err_str[0] == '\0')
            snprintf(err_str, sizeof(err_str), "Operation failed");
        ret = glusterd_op_send_cli_response(cli_op, ret, 0, req, dict, err_str);
    }

    return ret;
}

int
glusterd_handle_cli_delete_volume(rpcsvc_request_t *req)
{
    return glusterd_big_locked_handler(req,
                                       __glusterd_handle_cli_delete_volume);
}

static int
glusterd_handle_heal_options_enable_disable(rpcsvc_request_t *req, dict_t *dict,
                                            glusterd_volinfo_t *volinfo)
{
    gf_xl_afr_op_t heal_op = GF_SHD_OP_INVALID;
    int ret = 0;
    char *key = NULL;
    char *value = NULL;

    ret = dict_get_int32(dict, "heal-op", (int32_t *)&heal_op);
    if (ret || (heal_op == GF_SHD_OP_INVALID)) {
        gf_smsg(THIS->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_GET_FAILED,
                "Key=heal-op", NULL);
        ret = -1;
        goto out;
    }

    if ((heal_op != GF_SHD_OP_HEAL_ENABLE) &&
        (heal_op != GF_SHD_OP_HEAL_DISABLE) &&
        (heal_op != GF_SHD_OP_GRANULAR_ENTRY_HEAL_ENABLE) &&
        (heal_op != GF_SHD_OP_GRANULAR_ENTRY_HEAL_DISABLE)) {
        ret = -EINVAL;
        goto out;
    }

    if (((heal_op == GF_SHD_OP_GRANULAR_ENTRY_HEAL_ENABLE) ||
         (heal_op == GF_SHD_OP_GRANULAR_ENTRY_HEAL_DISABLE)) &&
        (volinfo->type != GF_CLUSTER_TYPE_REPLICATE)) {
        ret = -1;
        goto out;
    }

    if ((heal_op == GF_SHD_OP_HEAL_ENABLE) ||
        (heal_op == GF_SHD_OP_GRANULAR_ENTRY_HEAL_ENABLE)) {
        value = "enable";
    } else if ((heal_op == GF_SHD_OP_HEAL_DISABLE) ||
               (heal_op == GF_SHD_OP_GRANULAR_ENTRY_HEAL_DISABLE)) {
        value = "disable";
    }

    if ((heal_op == GF_SHD_OP_HEAL_ENABLE) ||
        (heal_op == GF_SHD_OP_HEAL_DISABLE)) {
        key = volgen_get_shd_key(volinfo->type);
        if (!key) {
            ret = -1;
            goto out;
        }
    } else {
        key = "cluster.granular-entry-heal";
        ret = dict_set_int8(dict, "is-special-key", 1);
        if (ret) {
            gf_smsg(THIS->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
                    "Key=is-special-key", NULL);
            goto out;
        }
    }

    ret = dict_set_str_sizen(dict, "key1", key);
    if (ret) {
        gf_smsg(THIS->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
                "Key=key1", NULL);
        goto out;
    }

    ret = dict_set_str_sizen(dict, "value1", value);
    if (ret) {
        gf_smsg(THIS->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
                "Key=value1", NULL);
        goto out;
    }

    ret = dict_set_int32_sizen(dict, "count", 1);
    if (ret) {
        gf_smsg(THIS->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
                "Key=count", NULL);
        goto out;
    }

    ret = glusterd_op_begin_synctask(req, GD_OP_SET_VOLUME, dict);

out:
    return ret;
}
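
/* In effect, "gluster volume heal <vol> [granular-entry-heal] enable|disable"
 * is rewritten above into a one-option GD_OP_SET_VOLUME transaction: the
 * option name goes into "key1", "enable"/"disable" into "value1", and
 * "count" is set to 1, the same dict shape the volume-set path expects. */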

static int32_t
glusterd_add_bricks_hname_path_to_dict(dict_t *dict,
                                       glusterd_volinfo_t *volinfo)
{
    glusterd_brickinfo_t *brickinfo = NULL;
    int ret = 0;
    char key[64] = "";
    int index = 0;

    cds_list_for_each_entry(brickinfo, &volinfo->bricks, brick_list)
    {
        ret = snprintf(key, sizeof(key), "%d-hostname", index);
        ret = dict_set_strn(dict, key, ret, brickinfo->hostname);
        if (ret) {
            gf_smsg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
                    "Key=%s", key, NULL);
            goto out;
        }

        ret = snprintf(key, sizeof(key), "%d-path", index);
        ret = dict_set_strn(dict, key, ret, brickinfo->path);
        if (ret) {
            gf_smsg("glusterd", GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
                    "Key=%s", key, NULL);
            goto out;
        }

        index++;
    }
out:
    return ret;
}
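
/* Resulting dict layout for a two-brick volume (hostnames illustrative):
 *   "0-hostname" -> "host1"    "0-path" -> "/data/b1"
 *   "1-hostname" -> "host2"    "1-path" -> "/data/b2"
 * The heal handler below pairs these entries with "count" = brick_count. */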

int
__glusterd_handle_cli_heal_volume(rpcsvc_request_t *req)
{
    int32_t ret = -1;
    gf_cli_req cli_req = {{
        0,
    }};
    dict_t *dict = NULL;
    glusterd_op_t cli_op = GD_OP_HEAL_VOLUME;
    char *volname = NULL;
    glusterd_volinfo_t *volinfo = NULL;
    xlator_t *this = THIS;
    char op_errstr[2048] = {
        0,
    };

    GF_ASSERT(req);

    ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
    if (ret < 0) {
        /* failed to decode msg */
        req->rpc_err = GARBAGE_ARGS;
        gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_GARBAGE_ARGS, NULL);
        goto out;
    }

    if (cli_req.dict.dict_len) {
        /* Unserialize the dictionary */
        dict = dict_new();

        ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
                               &dict);
        if (ret < 0) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_UNSERIALIZE_FAIL,
                   "failed to "
                   "unserialize req-buffer to dictionary");
            snprintf(op_errstr, sizeof(op_errstr),
                     "Unable to decode the command");
            goto out;
        } else {
            dict->extra_stdfree = cli_req.dict.dict_val;
        }
    }

    ret = dict_get_str(dict, "volname", &volname);
    if (ret) {
        snprintf(op_errstr, sizeof(op_errstr),
                 "Unable to find "
                 "volume name");
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               op_errstr);
        goto out;
    }

    gf_msg(this->name, GF_LOG_INFO, 0, GD_MSG_HEAL_VOL_REQ_RCVD,
           "Received heal vol req "
           "for volume %s",
           volname);

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret) {
        snprintf(op_errstr, sizeof(op_errstr), "Volume %s does not exist",
                 volname);
        goto out;
    }

    ret = glusterd_handle_heal_options_enable_disable(req, dict, volinfo);
    if (ret == -EINVAL) {
        ret = 0;
    } else {
        /*
         * If the return value is -ve but not -EINVAL then the command
         * failed. If the return value is 0 then the synctask for the
         * op has begun, so in both cases just 'goto out'. If there was
         * a failure it will respond with an error, otherwise the
         * synctask will take the responsibility of sending the
         * response.
         */
        goto out;
    }

    ret = glusterd_add_bricks_hname_path_to_dict(dict, volinfo);
    if (ret)
        goto out;

    ret = dict_set_int32_sizen(dict, "count", volinfo->brick_count);
    if (ret) {
        gf_smsg(this->name, GF_LOG_ERROR, -ret, GD_MSG_DICT_SET_FAILED,
                "Key=count", NULL);
        goto out;
    }

    ret = glusterd_op_begin_synctask(req, GD_OP_HEAL_VOLUME, dict);

out:
    if (ret) {
        if (op_errstr[0] == '\0')
            snprintf(op_errstr, sizeof(op_errstr), "operation failed");
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_GLUSTERD_OP_FAILED, "%s",
               op_errstr);
        ret = glusterd_op_send_cli_response(cli_op, ret, 0, req, dict,
                                            op_errstr);
    }

    return ret;
}

int
glusterd_handle_cli_heal_volume(rpcsvc_request_t *req)
{
    return glusterd_big_locked_handler(req, __glusterd_handle_cli_heal_volume);
}

int
__glusterd_handle_cli_statedump_volume(rpcsvc_request_t *req)
{
    int32_t ret = -1;
    gf_cli_req cli_req = {{
        0,
    }};
    char *volname = NULL;
    char *options = NULL;
    dict_t *dict = NULL;
    int32_t option_cnt = 0;
    glusterd_op_t cli_op = GD_OP_STATEDUMP_VOLUME;
    char err_str[128] = {
        0,
    };
    glusterd_conf_t *priv = NULL;

    priv = THIS->private;
    GF_ASSERT(priv);

    GF_ASSERT(req);

    ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
    if (ret < 0) {
        req->rpc_err = GARBAGE_ARGS;
        gf_smsg(THIS->name, GF_LOG_ERROR, errno, GD_MSG_GARBAGE_ARGS, NULL);
        goto out;
    }
    if (cli_req.dict.dict_len) {
        /* Unserialize the dictionary */
        dict = dict_new();

        ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
                               &dict);
        if (ret < 0) {
            gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_UNSERIALIZE_FAIL,
                   "failed to "
                   "unserialize req-buffer to dictionary");
            snprintf(err_str, sizeof(err_str),
                     "Unable to "
                     "decode the command");
            goto out;
        }
    }
    ret = dict_get_str(dict, "volname", &volname);
    if (ret) {
        snprintf(err_str, sizeof(err_str), "Unable to get the volume name");
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        goto out;
    }

    ret = dict_get_str(dict, "options", &options);
    if (ret) {
        snprintf(err_str, sizeof(err_str), "Unable to get options");
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        goto out;
    }

    ret = dict_get_int32(dict, "option_cnt", &option_cnt);
    if (ret) {
        snprintf(err_str, sizeof(err_str),
                 "Unable to get option "
                 "count");
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s",
               err_str);
        goto out;
    }

    if (priv->op_version == GD_OP_VERSION_MIN && strstr(options, "quotad")) {
        snprintf(err_str, sizeof(err_str),
                 "The cluster is operating "
                 "at op-version 1. Taking quotad's statedump is "
                 "disallowed in this state");
        ret = -1;
        goto out;
    }

    gf_msg(THIS->name, GF_LOG_INFO, 0, GD_MSG_STATEDUMP_VOL_REQ_RCVD,
           "Received statedump request for "
           "volume %s with options %s",
           volname, options);

    ret = glusterd_op_begin_synctask(req, GD_OP_STATEDUMP_VOLUME, dict);

out:
    if (ret) {
        if (err_str[0] == '\0')
            snprintf(err_str, sizeof(err_str), "Operation failed");
        ret = glusterd_op_send_cli_response(cli_op, ret, 0, req, dict, err_str);
    }
    free(cli_req.dict.dict_val);

    return ret;
}

int
glusterd_handle_cli_statedump_volume(rpcsvc_request_t *req)
{
    return glusterd_big_locked_handler(req,
                                       __glusterd_handle_cli_statedump_volume);
}

/* op-sm */
int
glusterd_op_stage_create_volume(dict_t *dict, char **op_errstr,
                                dict_t *rsp_dict)
{
    int ret = 0;
    char *volname = NULL;
    char *bricks = NULL;
    char *brick_list = NULL;
    char *free_ptr = NULL;
    char key[64] = "";
    glusterd_brickinfo_t *brick_info = NULL;
    int32_t brick_count = 0;
    int32_t local_brick_count = 0;
    int32_t i = 0;
    int32_t type = 0;
    int32_t replica_count = 0;
    int32_t disperse_count = 0;
    char *brick = NULL;
    char *tmpptr = NULL;
    xlator_t *this = THIS;
    glusterd_conf_t *priv = NULL;
    char msg[2048] = {0};
    uuid_t volume_uuid;
    char *volume_uuid_str;
    gf_boolean_t is_force = _gf_false;
    glusterd_volinfo_t *volinfo = NULL;

    priv = this->private;
    GF_ASSERT(priv);
    GF_ASSERT(rsp_dict);

    ret = dict_get_str(dict, "volname", &volname);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Unable to get volume name");
        goto out;
    }

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (!ret) {
        snprintf(msg, sizeof(msg), "Volume %s already exists", volname);
        ret = -1;
        goto out;
    }

    ret = dict_get_int32(dict, "count", &brick_count);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Unable to get brick count "
               "for volume %s",
               volname);
        goto out;
    }

    ret = dict_get_str(dict, "volume-id", &volume_uuid_str);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Unable to get volume id of "
               "volume %s",
               volname);
        goto out;
    }

    ret = gf_uuid_parse(volume_uuid_str, volume_uuid);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_UUID_PARSE_FAIL,
               "Unable to parse volume id of"
               " volume %s",
               volname);
        goto out;
    }

    ret = dict_get_str(dict, "bricks", &bricks);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Unable to get bricks for "
               "volume %s",
               volname);
        goto out;
    }

    is_force = dict_get_str_boolean(dict, "force", _gf_false);

    if (bricks) {
        brick_list = gf_strdup(bricks);
        if (!brick_list) {
            ret = -1;
            goto out;
        } else {
            free_ptr = brick_list;
        }
    }

    /* Check brick order if the volume type is replicate or disperse,
     * unless 'force' was given at the end of the command.
     */
    if (is_origin_glusterd(dict)) {
        ret = dict_get_int32(dict, "type", &type);
        if (ret) {
            snprintf(msg, sizeof(msg),
                     "Unable to get type of "
                     "volume %s",
                     volname);
            gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_DICT_GET_FAILED, "%s",
                   msg);
            goto out;
        }

        if (!is_force) {
            if (type == GF_CLUSTER_TYPE_REPLICATE) {
                ret = dict_get_int32(dict, "replica-count", &replica_count);
                if (ret) {
                    gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                           "Bricks check : Could"
                           " not retrieve replica count");
                    goto out;
                }
                gf_msg_debug(this->name, 0,
                             "Replicate cluster type "
                             "found. Checking brick order.");
                ret = glusterd_check_brick_order(dict, msg, type, &volname,
                                                 &bricks, &brick_count,
                                                 replica_count, 0);
            } else if (type == GF_CLUSTER_TYPE_DISPERSE) {
                ret = dict_get_int32(dict, "disperse-count", &disperse_count);
                if (ret) {
                    gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                           "Bricks check : Could"
                           " not retrieve disperse count");
                    goto out;
                }
                gf_msg_debug(this->name, 0,
                             "Disperse cluster type"
                             " found. Checking brick order.");
                ret = glusterd_check_brick_order(dict, msg, type, &volname,
                                                 &bricks, &brick_count,
                                                 disperse_count, 0);
            }
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BAD_BRKORDER,
                       "Not creating the volume because of "
                       "bad brick order. %s",
                       msg);
                *op_errstr = gf_strdup(msg);
                goto out;
            }
        }
    }

    while (i < brick_count) {
        i++;
        brick = strtok_r(brick_list, " \n", &tmpptr);
        brick_list = tmpptr;

        if (!glusterd_store_is_valid_brickpath(volname, brick)) {
            snprintf(msg, sizeof(msg),
                     "brick path %s is too "
                     "long.",
                     brick);
            ret = -1;
            goto out;
        }

        if (!glusterd_is_valid_volfpath(volname, brick)) {
            snprintf(msg, sizeof(msg),
                     "Volume file path for "
                     "volume %s and brick path %s is too long.",
                     volname, brick);
            ret = -1;
            goto out;
        }

        ret = glusterd_brickinfo_new_from_brick(brick, &brick_info, _gf_true,
                                                op_errstr);
        if (ret)
            goto out;

        ret = glusterd_new_brick_validate(brick, brick_info, msg, sizeof(msg),
                                          NULL);
        if (ret)
            goto out;

        ret = glusterd_resolve_brick(brick_info);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESOLVE_BRICK_FAIL,
                   FMTSTR_RESOLVE_BRICK, brick_info->hostname,
                   brick_info->path);
            goto out;
        }

        if (!gf_uuid_compare(brick_info->uuid, MY_UUID)) {
            ret = glusterd_validate_and_create_brickpath(
                brick_info, volume_uuid, volname, op_errstr, is_force,
                _gf_false);
            if (ret)
                goto out;

            ret = glusterd_get_brick_mount_dir(
                brick_info->path, brick_info->hostname, brick_info->mount_dir);
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       GD_MSG_BRICK_MOUNTDIR_GET_FAIL,
                       "Failed to get brick mount_dir");
                goto out;
            }

            snprintf(key, sizeof(key), "brick%d.mount_dir", i);
            ret = dict_set_dynstr_with_alloc(rsp_dict, key,
                                             brick_info->mount_dir);
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
                       "Failed to set %s", key);
                goto out;
            }
            local_brick_count = i;

            brick_list = tmpptr;
        }
        glusterd_brickinfo_delete(brick_info);
        brick_info = NULL;
    }

    ret = dict_set_int32_sizen(rsp_dict, "brick_count", local_brick_count);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
               "Failed to set local_brick_count");
        goto out;
    }
out:
    GF_FREE(free_ptr);
    if (brick_info)
        glusterd_brickinfo_delete(brick_info);

    if (msg[0] != '\0') {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_OP_STAGE_CREATE_VOL_FAIL,
               "%s", msg);
        *op_errstr = gf_strdup(msg);
    }
    gf_msg_debug(this->name, 0, "Returning %d", ret);

    return ret;
}
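
/* A minimal standalone sketch (hypothetical helper, not called anywhere in
 * this file) of the strtok_r() walk the staging loop above performs over
 * the space/newline-separated brick string; assumes <string.h> is pulled
 * in via the headers at the top. */
static int
example_count_bricks(char *bricks_mutable)
{
    char *saveptr = NULL;
    char *tok = NULL;
    int n = 0;

    /* strtok_r() mutates its input, which is why the loop above operates
     * on a gf_strdup()'d copy of the "bricks" string. The loop above feeds
     * the remainder pointer back as the first argument each iteration;
     * this sketch passes NULL instead, and both styles resume from the
     * saved position. */
    for (tok = strtok_r(bricks_mutable, " \n", &saveptr); tok;
         tok = strtok_r(NULL, " \n", &saveptr))
        n++;
    return n;
}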

int
glusterd_op_stop_volume_args_get(dict_t *dict, char **volname, int *flags)
{
    int ret = -1;

    if (!dict) {
        gf_smsg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_INVALID_ARGUMENT, NULL);
        goto out;
    }

    if (!volname) {
        gf_smsg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_INVALID_ARGUMENT, NULL);
        goto out;
    }

    if (!flags) {
        gf_smsg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_INVALID_ARGUMENT, NULL);
        goto out;
    }

    ret = dict_get_str(dict, "volname", volname);
    if (ret) {
        gf_smsg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                "Key=volname", NULL);
        goto out;
    }

    ret = dict_get_int32(dict, "flags", flags);
    if (ret) {
        gf_smsg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                "Key=flags", NULL);
        goto out;
    }
out:
    return ret;
}

int
glusterd_op_statedump_volume_args_get(dict_t *dict, char **volname,
                                      char **options, int *option_cnt)
{
    int ret = -1;

    if (!dict || !volname || !options || !option_cnt) {
        gf_smsg("glusterd", GF_LOG_ERROR, errno, GD_MSG_INVALID_ARGUMENT, NULL);
        goto out;
    }

    ret = dict_get_str(dict, "volname", volname);
    if (ret) {
        gf_smsg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                "Key=volname", NULL);
        goto out;
    }

    ret = dict_get_str(dict, "options", options);
    if (ret) {
        gf_smsg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                "Key=options", NULL);
        goto out;
    }

    ret = dict_get_int32(dict, "option_cnt", option_cnt);
    if (ret) {
        gf_smsg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                "Key=option_cnt", NULL);
        goto out;
    }

out:
    return ret;
}

int
glusterd_op_stage_start_volume(dict_t *dict, char **op_errstr, dict_t *rsp_dict)
{
    int ret = 0;
    char *volname = NULL;
    char key[64] = "";
    int flags = 0;
    int32_t brick_count = 0;
    int32_t local_brick_count = 0;
    glusterd_volinfo_t *volinfo = NULL;
    glusterd_brickinfo_t *brickinfo = NULL;
    char msg[2048] = {
        0,
    };
    xlator_t *this = THIS;
    uuid_t volume_id = {
        0,
    };
    char volid[50] = {
        0,
    };
    char xattr_volid[50] = {
        0,
    };
    int32_t len = 0;

    GF_ASSERT(rsp_dict);

    ret = glusterd_op_start_volume_args_get(dict, &volname, &flags);
    if (ret)
        goto out;

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret) {
        snprintf(msg, sizeof(msg), FMTSTR_CHECK_VOL_EXISTS, volname);
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLINFO_GET_FAIL,
               FMTSTR_CHECK_VOL_EXISTS, volname);
        goto out;
    }

    /* This is an incremental approach to have all the volinfo objects ref
     * count. The first attempt is made in volume start transaction to
     * ensure it doesn't race with import volume where stale volume is
     * deleted. There are multiple instances of GlusterD crashing in
     * bug-948686.t because of this. Once this approach is foolproof, all
     * other volinfo objects will be refcounted.
     */
    glusterd_volinfo_ref(volinfo);

    ret = glusterd_validate_quorum(this, GD_OP_START_VOLUME, dict, op_errstr);
    if (ret) {
        gf_msg(this->name, GF_LOG_CRITICAL, 0, GD_MSG_SERVER_QUORUM_NOT_MET,
               "Server quorum not met. Rejecting operation.");
        goto out;
    }

    ret = glusterd_validate_volume_id(dict, volinfo);
    if (ret)
        goto out;

    if (!(flags & GF_CLI_FLAG_OP_FORCE)) {
        if (glusterd_is_volume_started(volinfo)) {
            snprintf(msg, sizeof(msg),
                     "Volume %s already "
                     "started",
                     volname);
            ret = -1;
            goto out;
        }
    }

    cds_list_for_each_entry(brickinfo, &volinfo->bricks, brick_list)
    {
        brick_count++;
        ret = glusterd_resolve_brick(brickinfo);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESOLVE_BRICK_FAIL,
                   FMTSTR_RESOLVE_BRICK, brickinfo->hostname, brickinfo->path);
            goto out;
        }

        if ((gf_uuid_compare(brickinfo->uuid, MY_UUID)) ||
            (brickinfo->snap_status == -1))
            continue;

        ret = gf_lstat_dir(brickinfo->path, NULL);
        if (ret && (flags & GF_CLI_FLAG_OP_FORCE)) {
            continue;
        } else if (ret) {
            len = snprintf(msg, sizeof(msg),
                           "Failed to find "
                           "brick directory %s for volume %s. "
                           "Reason : %s",
                           brickinfo->path, volname, strerror(errno));
            if (len < 0) {
                strcpy(msg, "<error>");
            }
            goto out;
        }
        ret = sys_lgetxattr(brickinfo->path, GF_XATTR_VOL_ID_KEY, volume_id,
                            16);
        if (ret < 0 && (!(flags & GF_CLI_FLAG_OP_FORCE))) {
            len = snprintf(msg, sizeof(msg),
                           "Failed to get "
                           "extended attribute %s for brick dir "
                           "%s. Reason : %s",
                           GF_XATTR_VOL_ID_KEY, brickinfo->path,
                           strerror(errno));
            if (len < 0) {
                strcpy(msg, "<error>");
            }
            ret = -1;
            goto out;
        } else if (ret < 0) {
            ret = sys_lsetxattr(brickinfo->path, GF_XATTR_VOL_ID_KEY,
                                volinfo->volume_id, 16, XATTR_CREATE);
            if (ret == -1) {
                len = snprintf(msg, sizeof(msg),
                               "Failed to "
                               "set extended attribute %s on "
                               "%s. Reason: %s",
                               GF_XATTR_VOL_ID_KEY, brickinfo->path,
                               strerror(errno));
                if (len < 0) {
                    strcpy(msg, "<error>");
                }
                goto out;
            } else {
                continue;
            }
        }
        if (gf_uuid_compare(volinfo->volume_id, volume_id)) {
            len = snprintf(msg, sizeof(msg),
                           "Volume id "
                           "mismatch for brick %s:%s. Expected "
                           "volume id %s, volume id %s found",
                           brickinfo->hostname, brickinfo->path,
                           uuid_utoa_r(volinfo->volume_id, volid),
                           uuid_utoa_r(volume_id, xattr_volid));
            if (len < 0) {
                strcpy(msg, "<error>");
            }
            ret = -1;
            goto out;
        }

        if (strlen(brickinfo->mount_dir) < 1) {
            ret = glusterd_get_brick_mount_dir(
                brickinfo->path, brickinfo->hostname, brickinfo->mount_dir);
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0,
                       GD_MSG_BRICK_MOUNTDIR_GET_FAIL,
                       "Failed to get brick mount_dir");
                goto out;
            }

            snprintf(key, sizeof(key), "brick%d.mount_dir", brick_count);
            ret = dict_set_dynstr_with_alloc(rsp_dict, key,
                                             brickinfo->mount_dir);
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
                       "Failed to set %s", key);
                goto out;
            }
            local_brick_count = brick_count;
        }
    }

    ret = dict_set_int32_sizen(rsp_dict, "brick_count", local_brick_count);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
               "Failed to set local_brick_count");
        goto out;
    }

    ret = 0;
out:
    if (volinfo)
        glusterd_volinfo_unref(volinfo);

    if (ret && (msg[0] != '\0')) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_OP_STAGE_START_VOL_FAIL,
               "%s", msg);
        *op_errstr = gf_strdup(msg);
    }
    return ret;
}
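
/* GF_XATTR_VOL_ID_KEY ("trusted.glusterfs.volume-id") stamps each brick
 * root with the UUID of the owning volume. The staging code above refuses
 * to start a brick whose on-disk UUID differs from volinfo->volume_id,
 * guarding against pointing a volume at a backend directory that was
 * provisioned for a different volume; with 'force', a missing xattr is
 * (re)created instead. */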

int
glusterd_op_stage_stop_volume(dict_t *dict, char **op_errstr)
{
    int ret = -1;
    char *volname = NULL;
    int flags = 0;
    glusterd_volinfo_t *volinfo = NULL;
    char msg[2048] = {0};
    xlator_t *this = THIS;
    gsync_status_param_t param = {
        0,
    };

    ret = glusterd_op_stop_volume_args_get(dict, &volname, &flags);
    if (ret) {
        snprintf(msg, sizeof(msg), "Failed to get details of volume %s",
                 volname);
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_STOP_ARGS_GET_FAILED,
                "Volume name=%s", volname, NULL);
        goto out;
    }

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret) {
        snprintf(msg, sizeof(msg), FMTSTR_CHECK_VOL_EXISTS, volname);
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLINFO_GET_FAIL, "%s", msg);
        goto out;
    }

    ret = glusterd_validate_volume_id(dict, volinfo);
    if (ret)
        goto out;

    /* If the 'force' flag is given, no check is required */
    if (flags & GF_CLI_FLAG_OP_FORCE)
        goto out;

    if (_gf_false == glusterd_is_volume_started(volinfo)) {
        snprintf(msg, sizeof(msg),
                 "Volume %s "
                 "is not in the started state",
                 volname);
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_STARTED, "%s", msg);
        ret = -1;
        goto out;
    }

    /* If geo-rep is configured for this volume, it should be stopped. */
    param.volinfo = volinfo;
    ret = glusterd_check_geo_rep_running(&param, op_errstr);
    if (ret || param.is_active) {
        ret = -1;
        goto out;
    }

    ret = glusterd_check_ganesha_export(volinfo);
    if (ret) {
        ret = ganesha_manage_export(dict, "off", _gf_false, op_errstr);
        if (ret) {
            gf_msg(this->name, GF_LOG_WARNING, 0,
                   GD_MSG_NFS_GNS_UNEXPRT_VOL_FAIL,
                   "Could not "
                   "unexport volume via NFS-Ganesha");
            ret = 0;
        }
    }

    if (glusterd_is_defrag_on(volinfo)) {
        snprintf(msg, sizeof(msg),
                 "rebalance session is "
                 "in progress for the volume '%s'",
                 volname);
        gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_OIP, "%s", msg);
        ret = -1;
        goto out;
    }

out:
    if (msg[0] != 0)
        *op_errstr = gf_strdup(msg);
    gf_msg_debug(this->name, 0, "Returning %d", ret);

    return ret;
}

int
glusterd_op_stage_delete_volume(dict_t *dict, char **op_errstr)
{
    int ret = 0;
    char *volname = NULL;
    glusterd_volinfo_t *volinfo = NULL;
    char msg[2048] = {0};
    xlator_t *this = THIS;

    ret = dict_get_str(dict, "volname", &volname);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Unable to get volume name");
        goto out;
    }

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret) {
        snprintf(msg, sizeof(msg), FMTSTR_CHECK_VOL_EXISTS, volname);
        goto out;
    }

    ret = glusterd_validate_volume_id(dict, volinfo);
    if (ret)
        goto out;

    if (glusterd_is_volume_started(volinfo)) {
        snprintf(msg, sizeof(msg),
                 "Volume %s has been started. "
                 "Volume needs to be stopped before deletion.",
                 volname);
        ret = -1;
        goto out;
    }

    if (volinfo->snap_count > 0 || !cds_list_empty(&volinfo->snap_volumes)) {
        snprintf(msg, sizeof(msg),
                 "Cannot delete volume %s, "
                 "as it has %" PRIu64
                 " snapshots. "
                 "To delete the volume, "
                 "first delete all the snapshots under it.",
                 volname, volinfo->snap_count);
        ret = -1;
        goto out;
    }

    if (!glusterd_are_all_peers_up()) {
        ret = -1;
        snprintf(msg, sizeof(msg), "Some of the peers are down");
        goto out;
    }
    volinfo->stage_deleted = _gf_true;
    gf_log(this->name, GF_LOG_INFO,
           "Setting stage deleted flag to true for "
           "volume %s",
           volinfo->volname);
    ret = 0;

out:
    if (msg[0] != '\0') {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_OP_STAGE_DELETE_VOL_FAIL,
               "%s", msg);
        *op_errstr = gf_strdup(msg);
    }
    gf_msg_debug(this->name, 0, "Returning %d", ret);

    return ret;
}

static int
glusterd_handle_heal_cmd(xlator_t *this, glusterd_volinfo_t *volinfo,
                         dict_t *dict, char **op_errstr)
{
    glusterd_svc_t *svc = NULL;
    gf_xl_afr_op_t heal_op = GF_SHD_OP_INVALID;
    int ret = 0;
    char msg[2408] = {
        0,
    };
    char *offline_msg =
        "Self-heal daemon is not running. "
        "Check self-heal daemon log file.";

    ret = dict_get_int32(dict, "heal-op", (int32_t *)&heal_op);
    if (ret) {
        ret = -1;
        *op_errstr = gf_strdup("Heal operation not specified");
        goto out;
    }

    svc = &(volinfo->shd.svc);
    switch (heal_op) {
        case GF_SHD_OP_INVALID:
        case GF_SHD_OP_HEAL_ENABLE:  /* This op should be handled in
                                        volume-set */
        case GF_SHD_OP_HEAL_DISABLE: /* This op should be handled in
                                        volume-set */
        case GF_SHD_OP_GRANULAR_ENTRY_HEAL_ENABLE:  /* This op should be handled
                                                       in volume-set */
        case GF_SHD_OP_GRANULAR_ENTRY_HEAL_DISABLE: /* This op should be handled
                                                       in volume-set */
        case GF_SHD_OP_HEAL_SUMMARY:                  /* glfsheal cmd */
        case GF_SHD_OP_SBRAIN_HEAL_FROM_BIGGER_FILE:  /* glfsheal cmd */
        case GF_SHD_OP_SBRAIN_HEAL_FROM_LATEST_MTIME: /* glfsheal cmd */
        case GF_SHD_OP_SBRAIN_HEAL_FROM_BRICK:        /* glfsheal cmd */
            ret = -1;
            *op_errstr = gf_strdup("Invalid heal-op");
            goto out;

        case GF_SHD_OP_HEAL_INDEX:
        case GF_SHD_OP_HEAL_FULL:
            if (!glusterd_is_shd_compatible_volume(volinfo)) {
                ret = -1;
                snprintf(msg, sizeof(msg),
                         "Volume %s is not of type "
                         "replicate or disperse",
                         volinfo->volname);
                *op_errstr = gf_strdup(msg);
                goto out;
            }

            if (!svc->online) {
                ret = -1;
                *op_errstr = gf_strdup(offline_msg);
                goto out;
            }
            break;
        case GF_SHD_OP_INDEX_SUMMARY:
        case GF_SHD_OP_SPLIT_BRAIN_FILES:
        case GF_SHD_OP_STATISTICS:
        case GF_SHD_OP_STATISTICS_HEAL_COUNT:
        case GF_SHD_OP_STATISTICS_HEAL_COUNT_PER_REPLICA:
            if (!glusterd_is_volume_replicate(volinfo)) {
                ret = -1;
                snprintf(msg, sizeof(msg),
                         "This command is supported "
                         "only for volumes of replicated "
                         "type. Volume %s is not of type "
                         "replicate",
                         volinfo->volname);
                *op_errstr = gf_strdup(msg);
                goto out;
            }

            if (!svc->online) {
                ret = -1;
                *op_errstr = gf_strdup(offline_msg);
                goto out;
            }
            break;
        case GF_SHD_OP_HEALED_FILES:
        case GF_SHD_OP_HEAL_FAILED_FILES:
            ret = -1;
            snprintf(msg, sizeof(msg),
                     "Command not supported. "
                     "Please use \"gluster volume heal %s info\" "
                     "and logs to find the heal information.",
                     volinfo->volname);
            *op_errstr = gf_strdup(msg);
            goto out;
    }
out:
    if (ret)
        gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_HANDLE_HEAL_CMD_FAIL, "%s",
               *op_errstr);
    return ret;
}
1632

1633
int
1634
glusterd_op_stage_heal_volume(dict_t *dict, char **op_errstr)
1635
{
1636
    int ret = 0;
1637
    char *volname = NULL;
1638
    gf_boolean_t enabled = _gf_false;
1639
    glusterd_volinfo_t *volinfo = NULL;
1640
    char msg[2048];
1641
    glusterd_conf_t *priv = NULL;
1642
    dict_t *opt_dict = NULL;
1643
    xlator_t *this = THIS;
1644

1645
    priv = this->private;
1646
    if (!priv) {
1647
        ret = -1;
1648
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_PRIV_NULL, "priv is NULL");
1649
        goto out;
1650
    }
1651

1652
    ret = dict_get_str(dict, "volname", &volname);
1653
    if (ret) {
1654
        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1655
               "Unable to get volume name");
1656
        goto out;
1657
    }
1658

1659
    ret = glusterd_volinfo_find(volname, &volinfo);
1660
    if (ret) {
1661
        ret = -1;
1662
        snprintf(msg, sizeof(msg), "Volume %s does not exist", volname);
1663
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND, "%s", msg);
1664
        *op_errstr = gf_strdup(msg);
1665
        goto out;
1666
    }
1667

1668
    ret = glusterd_validate_volume_id(dict, volinfo);
1669
    if (ret)
1670
        goto out;
1671

1672
    if (!glusterd_is_volume_started(volinfo)) {
1673
        ret = -1;
1674
        snprintf(msg, sizeof(msg), "Volume %s is not started.", volname);
1675
        gf_smsg(this->name, GF_LOG_WARNING, 0, GD_MSG_VOL_NOT_STARTED,
1676
                "Volume=%s", volname, NULL);
1677
        *op_errstr = gf_strdup(msg);
1678
        goto out;
1679
    }
1680

1681
    opt_dict = volinfo->dict;
1682
    if (!opt_dict) {
1683
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, NULL);
1684
        ret = 0;
1685
        goto out;
1686
    }
1687
    enabled = gd_is_self_heal_enabled(volinfo, opt_dict);
1688
    if (!enabled) {
1689
        ret = -1;
1690
        snprintf(msg, sizeof(msg),
1691
                 "Self-heal-daemon is "
1692
                 "disabled. Heal will not be triggered on volume %s",
1693
                 volname);
1694
        gf_msg(this->name, GF_LOG_WARNING, 0, GD_MSG_SELF_HEALD_DISABLED, "%s",
1695
               msg);
1696
        *op_errstr = gf_strdup(msg);
1697
        goto out;
1698
    }
1699

1700
    ret = glusterd_handle_heal_cmd(this, volinfo, dict, op_errstr);
1701
    if (ret)
1702
        goto out;
1703

1704
    ret = 0;
1705
out:
1706
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
1707

1708
    return ret;
1709
}
1710

1711
int
1712
glusterd_op_stage_statedump_volume(dict_t *dict, char **op_errstr)
1713
{
1714
    int ret = -1;
1715
    char *volname = NULL;
1716
    char *options = NULL;
1717
    int option_cnt = 0;
1718
    gf_boolean_t is_running = _gf_false;
1719
    glusterd_volinfo_t *volinfo = NULL;
1720
    char msg[2408] = {
1721
        0,
1722
    };
1723
    xlator_t *this = THIS;
1724
    glusterd_conf_t *priv = NULL;
1725

1726
    priv = this->private;
1727
    GF_ASSERT(priv);
1728

1729
    ret = glusterd_op_statedump_volume_args_get(dict, &volname, &options,
1730
                                                &option_cnt);
1731
    if (ret)
1732
        goto out;
1733

1734
    ret = glusterd_volinfo_find(volname, &volinfo);
1735
    if (ret) {
1736
        snprintf(msg, sizeof(msg), FMTSTR_CHECK_VOL_EXISTS, volname);
1737
        gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_VOLINFO_GET_FAIL,
1738
                "Volume=%s", volname, NULL);
1739
        goto out;
1740
    }
1741

1742
    ret = glusterd_validate_volume_id(dict, volinfo);
1743
    if (ret)
1744
        goto out;
1745

1746
    is_running = glusterd_is_volume_started(volinfo);
1747
    if (!is_running) {
1748
        snprintf(msg, sizeof(msg),
1749
                 "Volume %s is not in the started"
1750
                 " state",
1751
                 volname);
1752
        ret = -1;
1753
        goto out;
1754
    }
1755

1756
    if (priv->op_version == GD_OP_VERSION_MIN && strstr(options, "quotad")) {
1757
        snprintf(msg, sizeof(msg),
1758
                 "The cluster is operating "
1759
                 "at op-version 1. Taking quotad's statedump is "
1760
                 "disallowed in this state");
1761
        ret = -1;
1762
        goto out;
1763
    }
1764
    if ((strstr(options, "quotad")) &&
1765
        (!glusterd_is_volume_quota_enabled(volinfo))) {
1766
        snprintf(msg, sizeof(msg),
1767
                 "Quota is not enabled on "
1768
                 "volume %s",
1769
                 volname);
1770
        ret = -1;
1771
        goto out;
1772
    }
1773
out:
1774
    if (ret && msg[0] != '\0')
1775
        *op_errstr = gf_strdup(msg);
1776
    gf_msg_debug(this->name, 0, "Returning %d", ret);
1777
    return ret;
1778
}
1779

1780
int
1781
glusterd_op_stage_clearlocks_volume(dict_t *dict, char **op_errstr)
1782
{
1783
    int ret = -1;
1784
    char *volname = NULL;
1785
    char *path = NULL;
1786
    char *type = NULL;
1787
    char *kind = NULL;
1788
    glusterd_volinfo_t *volinfo = NULL;
1789
    char msg[2048] = {
1790
        0,
1791
    };
1792

1793
    ret = dict_get_str(dict, "volname", &volname);
1794
    if (ret) {
1795
        snprintf(msg, sizeof(msg), "Failed to get volume name");
1796
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s", msg);
1797
        *op_errstr = gf_strdup(msg);
1798
        goto out;
1799
    }
1800

1801
    ret = dict_get_str(dict, "path", &path);
1802
    if (ret) {
1803
        snprintf(msg, sizeof(msg), "Failed to get path");
1804
        gf_msg(THIS->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s", msg);
1805
        *op_errstr = gf_strdup(msg);
1806
        goto out;
1807
    }
1808

1809
    ret = dict_get_str(dict, "kind", &kind);
1810
    if (ret) {
1811
        snprintf(msg, sizeof(msg), "Failed to get kind");
1812
        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s", msg);
1813
        *op_errstr = gf_strdup(msg);
1814
        goto out;
1815
    }
1816

1817
    ret = dict_get_str(dict, "type", &type);
1818
    if (ret) {
1819
        snprintf(msg, sizeof(msg), "Failed to get type");
1820
        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "%s", msg);
1821
        *op_errstr = gf_strdup(msg);
1822
        goto out;
1823
    }
1824

1825
    ret = glusterd_volinfo_find(volname, &volinfo);
1826
    if (ret) {
1827
        snprintf(msg, sizeof(msg), "Volume %s does not exist", volname);
1828
        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND, "%s", msg);
1829
        *op_errstr = gf_strdup(msg);
1830
        goto out;
1831
    }
1832

1833
    ret = glusterd_validate_volume_id(dict, volinfo);
1834
    if (ret)
1835
        goto out;
1836

1837
    if (!glusterd_is_volume_started(volinfo)) {
1838
        snprintf(msg, sizeof(msg), "Volume %s is not started", volname);
1839
        gf_msg("glusterd", GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_STARTED, "%s", msg);
1840
        *op_errstr = gf_strdup(msg);
1841
        goto out;
1842
    }
1843

1844
    ret = 0;
1845
out:
1846
    gf_msg_debug("glusterd", 0, "Returning %d", ret);
1847
    return ret;
1848
}
1849

1850
int
1851
glusterd_op_create_volume(dict_t *dict, char **op_errstr)
1852
{
1853
    int ret = 0;
1854
    char *volname = NULL;
1855
    glusterd_conf_t *priv = NULL;
1856
    glusterd_volinfo_t *volinfo = NULL;
1857
    gf_boolean_t vol_added = _gf_false;
1858
    glusterd_brickinfo_t *brickinfo = NULL;
1859
    glusterd_brickinfo_t *ta_brickinfo = NULL;
1860
    xlator_t *this = THIS;
1861
    char *brick = NULL;
1862
    char *ta_brick = NULL;
1863
    int32_t count = 0;
1864
    int32_t i = 1;
1865
    char *bricks = NULL;
1866
    char *ta_bricks = NULL;
1867
    char *brick_list = NULL;
1868
    char *ta_brick_list = NULL;
1869
    char *free_ptr = NULL;
1870
    char *ta_free_ptr = NULL;
1871
    char *saveptr = NULL;
1872
    char *ta_saveptr = NULL;
1873
    char *trans_type = NULL;
1874
    char *str = NULL;
1875
    char *username = NULL;
1876
    char *password = NULL;
1877
    int brickid = 0;
1878
    char msg[1024] __attribute__((unused)) = {
1879
        0,
1880
    };
1881
    char *brick_mount_dir = NULL;
1882
    char key[64] = "";
1883
    char *address_family_str = NULL;
1884
    struct statvfs brickstat = {
1885
        0,
1886
    };
1887

1888
    priv = this->private;
1889
    GF_ASSERT(priv);
1890

1891
    ret = glusterd_volinfo_new(&volinfo);
1892

1893
    if (ret) {
1894
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, GD_MSG_NO_MEMORY,
1895
               "Unable to allocate memory for volinfo");
1896
        goto out;
1897
    }
1898

1899
    ret = dict_get_str(dict, "volname", &volname);
1900

1901
    if (ret) {
1902
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1903
               "Unable to get volume name");
1904
        goto out;
1905
    }
1906

1907
    if (snprintf(volinfo->volname, sizeof(volinfo->volname), "%s", volname) >=
1908
        sizeof(volinfo->volname)) {
1909
        ret = -1;
1910
        goto out;
1911
    }
1912

1913
    GF_ASSERT(volinfo->volname);
1914

1915
    ret = dict_get_int32(dict, "type", &volinfo->type);
1916
    if (ret) {
1917
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1918
               "Unable to get type of volume"
1919
               " %s",
1920
               volname);
1921
        goto out;
1922
    }
1923

1924
    ret = dict_get_int32(dict, "count", &volinfo->brick_count);
1925
    if (ret) {
1926
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1927
               "Unable to get brick count of"
1928
               " volume %s",
1929
               volname);
1930
        goto out;
1931
    }
1932

1933
    ret = dict_get_int32(dict, "port", &volinfo->port);
1934
    if (ret) {
1935
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1936
               "Unable to get port");
1937
        goto out;
1938
    }
1939

1940
    ret = dict_get_str(dict, "bricks", &bricks);
1941
    if (ret) {
1942
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1943
               "Unable to get bricks for "
1944
               "volume %s",
1945
               volname);
1946
        goto out;
1947
    }
1948

1949
    /* replica-count 1 means, no replication, file is in one brick only */
1950
    volinfo->replica_count = 1;
1951

1952
    if (GF_CLUSTER_TYPE_REPLICATE == volinfo->type) {
1953
        /* performance.client-io-threads is turned on to default,
1954
         * however this has adverse effects on replicate volumes due to
1955
         * replication design issues, till that get addressed
1956
         * performance.client-io-threads option is turned off for all
1957
         * replicate volumes
1958
         */
1959
        if (priv->op_version >= GD_OP_VERSION_3_12_2) {
1960
            ret = dict_set_sizen_str_sizen(
1961
                volinfo->dict, "performance.client-io-threads", "off");
1962
            if (ret) {
1963
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
1964
                       "Failed to set "
1965
                       "performance.client-io-threads to off");
1966
                goto out;
1967
            }
1968
        }
1969
        ret = dict_get_int32(dict, "replica-count", &volinfo->replica_count);
1970
        if (ret) {
1971
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1972
                   "Failed to get "
1973
                   "replica count for volume %s",
1974
                   volname);
1975
            goto out;
1976
        }
1977

1978
        /* coverity[unused_value] arbiter count is optional */
1979
        ret = dict_get_int32(dict, "arbiter-count", &volinfo->arbiter_count);
1980
        ret = dict_get_int32(dict, "thin-arbiter-count",
1981
                             &volinfo->thin_arbiter_count);
1982
        if (volinfo->thin_arbiter_count) {
1983
            ret = dict_get_str(dict, "ta-brick", &ta_bricks);
1984
            if (ret) {
1985
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1986
                       "Unable to get thin arbiter brick for "
1987
                       "volume %s",
1988
                       volname);
1989
                goto out;
1990
            }
1991
        }
1992

1993
    } else if (GF_CLUSTER_TYPE_DISPERSE == volinfo->type) {
1994
        ret = dict_get_int32(dict, "disperse-count", &volinfo->disperse_count);
1995
        if (ret) {
1996
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
1997
                   "Failed to get "
1998
                   "disperse count for volume %s",
1999
                   volname);
2000
            goto out;
2001
        }
2002
        ret = dict_get_int32(dict, "redundancy-count",
2003
                             &volinfo->redundancy_count);
2004
        if (ret) {
2005
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
2006
                   "Failed to get "
2007
                   "redundancy count for volume %s",
2008
                   volname);
2009
            goto out;
2010
        }
2011
    }
2012

2013
    /* dist-leaf-count is the count of brick nodes for a given
2014
       subvolume of distribute */
2015
    volinfo->dist_leaf_count = glusterd_get_dist_leaf_count(volinfo);
2016

2017
    /* subvol_count is the count of number of subvolumes present
2018
       for a given distribute volume */
2019
    volinfo->subvol_count = (volinfo->brick_count / volinfo->dist_leaf_count);
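    /* Worked example: a 2 x 3 distributed-replicate volume has
     * brick_count 6 and dist_leaf_count 3 (one replica set), so
     * subvol_count works out to 2 distribute subvolumes. */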

    /* Keep sub-count same as earlier, for the sake of backward
       compatibility */
    if (volinfo->dist_leaf_count > 1)
        volinfo->sub_count = volinfo->dist_leaf_count;

    ret = dict_get_str(dict, "transport", &trans_type);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Unable to get transport type of volume %s", volname);
        goto out;
    }

    ret = dict_get_str(dict, "volume-id", &str);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Unable to get volume-id of volume %s", volname);
        goto out;
    }
    ret = gf_uuid_parse(str, volinfo->volume_id);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_UUID_PARSE_FAIL,
               "unable to parse uuid %s of volume %s", str, volname);
        goto out;
    }

    ret = dict_get_str(dict, "internal-username", &username);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "unable to get internal username of volume %s", volname);
        goto out;
    }
    glusterd_auth_set_username(volinfo, username);

    ret = dict_get_str(dict, "internal-password", &password);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "unable to get internal password of volume %s", volname);
        goto out;
    }
    glusterd_auth_set_password(volinfo, password);

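    /* Anything other than an explicit "rdma" or "tcp" (in practice the
     * "tcp,rdma" transport string) selects both transports. */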
    if (strcasecmp(trans_type, "rdma") == 0) {
        volinfo->transport_type = GF_TRANSPORT_RDMA;
    } else if (strcasecmp(trans_type, "tcp") == 0) {
        volinfo->transport_type = GF_TRANSPORT_TCP;
    } else {
        volinfo->transport_type = GF_TRANSPORT_BOTH_TCP_RDMA;
    }

    if (ta_bricks) {
        ta_brick_list = gf_strdup(ta_bricks);
        ta_free_ptr = ta_brick_list;
    }

    if (volinfo->thin_arbiter_count) {
        ta_brick = strtok_r(ta_brick_list + 1, " \n", &ta_saveptr);

        count = 1;
        brickid = volinfo->replica_count;
        /* Assign brickids to ta_bricks.
         * The following loop runs once per subvolume. Although there
         * is only one ta-brick for a volume, the volume fuse volfile
         * requires an entry of the ta-brick for each subvolume, and
         * the ta-brick id needs to be adjusted according to the
         * subvol count.
         * E.g. for the first subvolume the ta-brick id is volname-ta-2,
         * for the second subvol the ta-brick id is volname-ta-5.
         */
        while (count <= volinfo->subvol_count) {
            ret = glusterd_brickinfo_new_from_brick(ta_brick, &ta_brickinfo,
                                                    _gf_false, op_errstr);
            if (ret)
                goto out;

            GLUSTERD_ASSIGN_BRICKID_TO_TA_BRICKINFO(ta_brickinfo, volinfo,
                                                    brickid);
            cds_list_add_tail(&ta_brickinfo->brick_list, &volinfo->ta_bricks);
            count++;
            brickid += volinfo->replica_count + 1;
        }
    }

    if (bricks) {
        brick_list = gf_strdup(bricks);
        free_ptr = brick_list;
    }

    count = volinfo->brick_count;

    if (count)
        brick = strtok_r(brick_list + 1, " \n", &saveptr);

    brickid = glusterd_get_next_available_brickid(volinfo);
    if (brickid < 0)
        goto out;
    while (i <= count) {
        ret = glusterd_brickinfo_new_from_brick(brick, &brickinfo, _gf_true,
                                                op_errstr);
        if (ret)
            goto out;
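        /* In a thin-arbiter volume every (replica_count + 1)-th brick
         * id is reserved for the ta-brick of that subvolume (assigned
         * in the loop above), so skip over it when numbering the data
         * bricks. */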
        if (volinfo->thin_arbiter_count == 1 &&
            (brickid + 1) % (volinfo->replica_count + 1) == 0) {
            brickid = brickid + 1;
        }
        GLUSTERD_ASSIGN_BRICKID_TO_BRICKINFO(brickinfo, volinfo, brickid++);

        ret = glusterd_resolve_brick(brickinfo);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_RESOLVE_BRICK_FAIL,
                   FMTSTR_RESOLVE_BRICK, brickinfo->hostname, brickinfo->path);
            goto out;
        }

        brick_mount_dir = NULL;
        ret = snprintf(key, sizeof(key), "brick%d.mount_dir", i);
        ret = dict_get_strn(dict, key, ret, &brick_mount_dir);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                   "%s not present", key);
            goto out;
        }
        snprintf(brickinfo->mount_dir, sizeof(brickinfo->mount_dir), "%s",
                 brick_mount_dir);

        if (!gf_uuid_compare(brickinfo->uuid, MY_UUID)) {
            ret = sys_statvfs(brickinfo->path, &brickstat);
            if (ret) {
                gf_log("brick-op", GF_LOG_ERROR,
                       "Failed to fetch disk"
                       " utilization from the brick (%s:%s). Please "
                       "check health of the brick. Error: %s",
                       brickinfo->hostname, brickinfo->path, strerror(errno));
                goto out;
            }
            brickinfo->statfs_fsid = brickstat.f_fsid;
        }

        cds_list_add_tail(&brickinfo->brick_list, &volinfo->bricks);
        brick = strtok_r(NULL, " \n", &saveptr);
        i++;
    }

    ret = glusterd_enable_default_options(volinfo, NULL);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_FAIL_DEFAULT_OPT_SET,
               "Failed to set default "
               "options on create for volume %s",
               volinfo->volname);
        goto out;
    }

    ret = dict_get_str(dict, "transport.address-family", &address_family_str);
    if (!ret) {
        ret = dict_set_dynstr_with_alloc(
            volinfo->dict, "transport.address-family", address_family_str);
        if (ret) {
            gf_log(this->name, GF_LOG_ERROR,
                   "Failed to set transport.address-family for %s",
                   volinfo->volname);
            goto out;
        }
    }

    gd_update_volume_op_versions(volinfo);

    ret = glusterd_store_volinfo(volinfo, GLUSTERD_VOLINFO_VER_AC_INCREMENT);
    if (ret) {
        glusterd_store_delete_volume(volinfo);
        *op_errstr = gf_strdup(
            "Failed to store the "
            "Volume information");
        goto out;
    }

    ret = glusterd_create_volfiles_and_notify_services(volinfo);
    if (ret) {
        *op_errstr = gf_strdup("Failed to create volume files");
        goto out;
    }

    volinfo->rebal.defrag_status = 0;
    glusterd_list_add_order(&volinfo->vol_list, &priv->volumes,
                            glusterd_compare_volume_name);
    vol_added = _gf_true;

out:
    GF_FREE(free_ptr);
    GF_FREE(ta_free_ptr);
    if (!vol_added && volinfo)
        glusterd_volinfo_unref(volinfo);
    return ret;
}

int
glusterd_start_volume(glusterd_volinfo_t *volinfo, int flags, gf_boolean_t wait)

{
    int ret = 0;
    glusterd_brickinfo_t *brickinfo = NULL;
    xlator_t *this = THIS;
    glusterd_volinfo_ver_ac_t verincrement = 0;

    GF_ASSERT(volinfo);

    cds_list_for_each_entry(brickinfo, &volinfo->bricks, brick_list)
    {
        /* On a force start, reset start_triggered to _gf_false: if
         * this brick was brought down through the gf_attach utility,
         * brickinfo->start_triggered would not have been updated to
         * _gf_false, and the stale value would prevent a restart.
         */
        if (flags & GF_CLI_FLAG_OP_FORCE) {
            brickinfo->start_triggered = _gf_false;
        }
        ret = glusterd_brick_start(volinfo, brickinfo, wait, _gf_false);
        /* If 'force', try to start all bricks regardless of success or
         * failure
         */
        if (!(flags & GF_CLI_FLAG_OP_FORCE) && ret)
            goto out;
    }

    /* Increment the volinfo version only if there is a
     * change in status. The force option can be used to start
     * dead bricks even if the volume is in the started state.
     * In such a case the volume status will be GLUSTERD_STATUS_STARTED,
     * so we should not increment the volinfo version. */
    if (GLUSTERD_STATUS_STARTED != volinfo->status) {
        verincrement = GLUSTERD_VOLINFO_VER_AC_INCREMENT;
    } else {
        verincrement = GLUSTERD_VOLINFO_VER_AC_NONE;
    }

    glusterd_set_volume_status(volinfo, GLUSTERD_STATUS_STARTED);
    /* Update volinfo on disk in a critical section because
       attach_brick_callback can also call store_volinfo for the same
       volume to update volinfo on disk
    */
    /* coverity[ORDER_REVERSAL] */
    LOCK(&volinfo->lock);
    ret = glusterd_store_volinfo(volinfo, verincrement);
    UNLOCK(&volinfo->lock);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLINFO_SET_FAIL,
               "Failed to store volinfo of "
               "%s volume",
               volinfo->volname);
        goto out;
    }
out:
    gf_msg_trace(this->name, 0, "returning %d ", ret);
    return ret;
}

int
glusterd_op_start_volume(dict_t *dict, char **op_errstr)
{
    int ret = 0;
    int32_t brick_count = 0;
    char *brick_mount_dir = NULL;
    char key[64] = "";
    char *volname = NULL;
    int flags = 0;
    glusterd_volinfo_t *volinfo = NULL;
    glusterd_brickinfo_t *brickinfo = NULL;
    xlator_t *this = THIS;
    glusterd_conf_t *conf = NULL;
    glusterd_svc_t *svc = NULL;
    char *str = NULL;
    gf_boolean_t option = _gf_false;

    conf = this->private;
    GF_ASSERT(conf);

    ret = glusterd_op_start_volume_args_get(dict, &volname, &flags);
    if (ret)
        goto out;

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND,
               FMTSTR_CHECK_VOL_EXISTS, volname);
        goto out;
    }

    /* This is an incremental approach to have all the volinfo objects ref
     * counted. The first attempt is made in the volume start transaction to
     * ensure it doesn't race with import volume, where a stale volume is
     * deleted. There are multiple instances of GlusterD crashing in
     * bug-948686.t because of this. Once this approach is foolproof, all
     * other volinfo objects will be refcounted.
     */
    glusterd_volinfo_ref(volinfo);

    cds_list_for_each_entry(brickinfo, &volinfo->bricks, brick_list)
    {
        brick_count++;
        /* Don't check bricks that are not owned by this node
         */
        if (gf_uuid_compare(brickinfo->uuid, MY_UUID))
            continue;
        if (strlen(brickinfo->mount_dir) < 1) {
            brick_mount_dir = NULL;
            ret = snprintf(key, sizeof(key), "brick%d.mount_dir", brick_count);
            ret = dict_get_strn(dict, key, ret, &brick_mount_dir);
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                       "%s not present", key);
                goto out;
            }
            if (snprintf(brickinfo->mount_dir, sizeof(brickinfo->mount_dir),
                         "%s",
                         brick_mount_dir) >= sizeof(brickinfo->mount_dir)) {
                ret = -1;
                goto out;
            }
        }
    }

    ret = dict_get_str(conf->opts, GLUSTERD_STORE_KEY_GANESHA_GLOBAL, &str);
    if (ret != 0) {
        gf_msg(this->name, GF_LOG_INFO, 0, GD_MSG_DICT_GET_FAILED,
               "Global dict not present.");
        ret = 0;

    } else {
        ret = gf_string2boolean(str, &option);
        /* Check if the feature is enabled and set nfs-disable to true */
        if (option) {
            gf_msg_debug(this->name, 0, "NFS-Ganesha is enabled");
            /* Gluster-nfs should not start when NFS-Ganesha is enabled*/
            ret = dict_set_str(volinfo->dict, NFS_DISABLE_MAP_KEY, "on");
            if (ret) {
                gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
                       "Failed to set nfs.disable for "
                       "volume %s",
                       volname);
                goto out;
            }
        }
    }

    ret = glusterd_start_volume(volinfo, flags, _gf_true);
    if (ret)
        goto out;

    if (!volinfo->is_snap_volume) {
        svc = &(volinfo->snapd.svc);
        ret = svc->manager(svc, volinfo, PROC_START_NO_WAIT);
        if (ret)
            goto out;
    }

    svc = &(volinfo->gfproxyd.svc);
    ret = svc->manager(svc, volinfo, PROC_START_NO_WAIT);
    ret = glusterd_svcs_manager(volinfo);

out:
    if (volinfo)
        glusterd_volinfo_unref(volinfo);

    gf_msg_trace(this->name, 0, "returning %d ", ret);
    return ret;
}

int
glusterd_stop_volume(glusterd_volinfo_t *volinfo)
{
    int ret = -1;
    glusterd_brickinfo_t *brickinfo = NULL;
    xlator_t *this = THIS;
    glusterd_svc_t *svc = NULL;

    GF_VALIDATE_OR_GOTO(this->name, volinfo, out);

    cds_list_for_each_entry(brickinfo, &volinfo->bricks, brick_list)
    {
        ret = glusterd_brick_stop(volinfo, brickinfo, _gf_false);
        if (ret) {
            gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRICK_STOP_FAIL,
                   "Failed to stop "
                   "brick (%s)",
                   brickinfo->path);
            goto out;
        }
    }

    glusterd_set_volume_status(volinfo, GLUSTERD_STATUS_STOPPED);

    ret = glusterd_store_volinfo(volinfo, GLUSTERD_VOLINFO_VER_AC_INCREMENT);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOLINFO_SET_FAIL,
               "Failed to store volinfo of "
               "%s volume",
               volinfo->volname);
        goto out;
    }

    if (!volinfo->is_snap_volume) {
        svc = &(volinfo->snapd.svc);
        ret = svc->manager(svc, volinfo, PROC_START_NO_WAIT);
        if (ret)
            goto out;
    }

    ret = glusterd_svcs_manager(volinfo);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_GRAPH_CHANGE_NOTIFY_FAIL,
               "Failed to notify graph "
               "change for %s volume",
               volinfo->volname);

        goto out;
    }

out:
    return ret;
}

int
glusterd_op_stop_volume(dict_t *dict)
{
    int ret = 0;
    int flags = 0;
    char *volname = NULL;
    glusterd_volinfo_t *volinfo = NULL;
    xlator_t *this = THIS;

    ret = glusterd_op_stop_volume_args_get(dict, &volname, &flags);
    if (ret)
        goto out;

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND,
               FMTSTR_CHECK_VOL_EXISTS, volname);
        goto out;
    }

    ret = glusterd_stop_volume(volinfo);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_STOP_FAILED,
               "Failed to stop %s volume", volname);
        goto out;
    }
out:
    gf_msg_trace(this->name, 0, "returning %d ", ret);
    return ret;
}

int
glusterd_op_delete_volume(dict_t *dict)
{
    int ret = 0;
    char *volname = NULL;
    glusterd_volinfo_t *volinfo = NULL;
    xlator_t *this = THIS;

    ret = dict_get_str(dict, "volname", &volname);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
               "Unable to get volume name");
        goto out;
    }

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret) {
        gf_msg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND,
               FMTSTR_CHECK_VOL_EXISTS, volname);
        goto out;
    }

    if (glusterd_check_ganesha_export(volinfo) && is_origin_glusterd(dict)) {
        ret = manage_export_config(volname, "off", NULL);
        if (ret)
            gf_msg(this->name, GF_LOG_WARNING, 0, 0,
                   "Could not delete ganesha export conf file "
                   "for %s",
                   volname);
    }

    ret = glusterd_delete_volume(volinfo);
out:
    gf_msg_debug(this->name, 0, "returning %d", ret);
    return ret;
}

int
glusterd_op_heal_volume(dict_t *dict, char **op_errstr)
{
    int ret = 0;
    /* Necessary subtasks of heal are completed in brick op */

    return ret;
}

static int
glusterd_client_statedump(char *volname, char *options, int option_cnt,
                          char **op_errstr)
{
    int ret = 0;
    char *dup_options = NULL;
    char *option = NULL;
    char *tmpptr = NULL;
    char msg[256] = "";
    char *target_ip = NULL;
    char *pid = NULL;

    dup_options = gf_strdup(options);
    if (!dup_options) {
        gf_smsg("glusterd", GF_LOG_ERROR, errno, GD_MSG_STRDUP_FAILED,
                "options=%s", options, NULL);
        ret = -1; /* fail instead of silently reporting success */
        goto out;
    }
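    /* The options string is expected to look like
     * "client <target-ip> <pid>": the leading keyword is verified
     * first, then the ip address and pid are picked off below. */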
    option = strtok_r(dup_options, " ", &tmpptr);
    if (!option || strcmp(option, "client")) {
        snprintf(msg, sizeof(msg),
                 "for gluster client statedump, options "
                 "should be after the key 'client'");
        gf_smsg("glusterd", GF_LOG_ERROR, errno, GD_MSG_INVALID_ENTRY,
                "Options misplaced", NULL);
        *op_errstr = gf_strdup(msg);
        ret = -1;
        goto out;
    }
    target_ip = strtok_r(NULL, " ", &tmpptr);
    if (target_ip == NULL) {
        snprintf(msg, sizeof(msg), "ip address not specified");
        gf_smsg("glusterd", GF_LOG_ERROR, errno, GD_MSG_INVALID_ENTRY, msg,
                NULL);
        *op_errstr = gf_strdup(msg);
        ret = -1;
        goto out;
    }

    pid = strtok_r(NULL, " ", &tmpptr);
    if (pid == NULL) {
        snprintf(msg, sizeof(msg), "pid not specified");
        gf_smsg("glusterd", GF_LOG_ERROR, errno, GD_MSG_INVALID_ENTRY, msg,
                NULL);
        *op_errstr = gf_strdup(msg);
        ret = -1;
        goto out;
    }

    ret = glusterd_client_statedump_submit_req(volname, target_ip, pid);
out:
    GF_FREE(dup_options);
    return ret;
}

int
glusterd_op_statedump_volume(dict_t *dict, char **op_errstr)
{
    int ret = 0;
    char *volname = NULL;
    char *options = NULL;
    int option_cnt = 0;
    glusterd_volinfo_t *volinfo = NULL;
    glusterd_brickinfo_t *brickinfo = NULL;

    ret = glusterd_op_statedump_volume_args_get(dict, &volname, &options,
                                                &option_cnt);
    if (ret)
        goto out;

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret)
        goto out;
    gf_msg_debug("glusterd", 0, "Performing statedump on volume %s", volname);
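    /* Dispatch on the options string: "quotad" dumps the quota daemon,
     * "nfs" the gluster-nfs server (when built with gnfs), "client" a
     * remote client identified by ip and pid, and the default case
     * walks every brick of the volume. */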
    if (strstr(options, "quotad")) {
        ret = glusterd_quotad_statedump(options, option_cnt, op_errstr);
        if (ret)
            goto out;
#ifdef BUILD_GNFS
    } else if (strstr(options, "nfs") != NULL) {
        ret = glusterd_nfs_statedump(options, option_cnt, op_errstr);
        if (ret)
            goto out;
#endif
    } else if (strstr(options, "client")) {
        ret = glusterd_client_statedump(volname, options, option_cnt,
                                        op_errstr);
        if (ret)
            goto out;

    } else {
        cds_list_for_each_entry(brickinfo, &volinfo->bricks, brick_list)
        {
            ret = glusterd_brick_statedump(volinfo, brickinfo, options,
                                           option_cnt, op_errstr);
            /* Let us take the statedump of other bricks instead of
             * exiting, if statedump of this brick fails.
             */
            if (ret)
                gf_msg(THIS->name, GF_LOG_WARNING, 0, GD_MSG_BRK_STATEDUMP_FAIL,
                       "could not "
                       "take the statedump of the brick %s:%s."
                       " Proceeding to other bricks",
                       brickinfo->hostname, brickinfo->path);
        }
    }

out:
    return ret;
}

int
glusterd_clearlocks_send_cmd(glusterd_volinfo_t *volinfo, char *cmd, char *path,
                             char *result, char *errstr, int err_len,
                             char *mntpt)
{
    int ret = -1;
    char abspath[PATH_MAX] = {
        0,
    };

    snprintf(abspath, sizeof(abspath), "%s/%s", mntpt, path);
    ret = sys_lgetxattr(abspath, cmd, result, PATH_MAX);
    if (ret < 0) {
        snprintf(errstr, err_len,
                 "clear-locks getxattr command "
                 "failed. Reason: %s",
                 strerror(errno));
        gf_msg_debug(THIS->name, 0, "%s", errstr);
        goto out;
    }

    ret = 0;
out:
    return ret;
}

int
glusterd_clearlocks_rmdir_mount(glusterd_volinfo_t *volinfo, char *mntpt)
{
    int ret = -1;

    ret = sys_rmdir(mntpt);
    if (ret) {
        gf_msg_debug(THIS->name, 0, "rmdir failed");
        goto out;
    }

    ret = 0;
out:
    return ret;
}

void
glusterd_clearlocks_unmount(glusterd_volinfo_t *volinfo, char *mntpt)
{
    glusterd_conf_t *priv = NULL;
    runner_t runner = {
        0,
    };
    int ret = 0;

    priv = THIS->private;

    /* umount failures are ignored. Using stat we could have avoided
     * attempting to unmount a non-existent filesystem. But a failure of
     * stat() on mount can be due to network failures. */

    runinit(&runner);
    runner_add_args(&runner, _PATH_UMOUNT, "-f", NULL);
    runner_argprintf(&runner, "%s", mntpt);

    synclock_unlock(&priv->big_lock);
    ret = runner_run(&runner);
    synclock_lock(&priv->big_lock);
    if (ret) {
        ret = 0;
        gf_msg_debug("glusterd", 0, "umount failed on maintenance client");
    }

    return;
}

int
glusterd_clearlocks_create_mount(glusterd_volinfo_t *volinfo, char **mntpt)
{
    int ret = -1;
    char template[PATH_MAX] = {
        0,
    };
    char *tmpl = NULL;

    snprintf(template, sizeof(template), "/tmp/%s.XXXXXX", volinfo->volname);
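    /* mkdtemp() replaces the trailing XXXXXX in place with a unique
     * suffix and creates the directory, returning NULL (with errno
     * set) on failure. */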
    tmpl = mkdtemp(template);
    if (!tmpl) {
        gf_msg_debug(THIS->name, errno,
                     "Couldn't create temporary mount directory.");
        goto out;
    }

    *mntpt = gf_strdup(tmpl);
    ret = 0;
out:
    return ret;
}

int
glusterd_clearlocks_mount(glusterd_volinfo_t *volinfo, char **xl_opts,
                          char *mntpt)
{
    int ret = -1;
    int i = 0;
    glusterd_conf_t *priv = NULL;
    runner_t runner = {
        0,
    };
    char client_volfpath[PATH_MAX] = {
        0,
    };
    char self_heal_opts[3][1024] = {"*replicate*.data-self-heal=off",
                                    "*replicate*.metadata-self-heal=off",
                                    "*replicate*.entry-self-heal=off"};

    priv = THIS->private;

    runinit(&runner);
    glusterd_get_trusted_client_filepath(client_volfpath, volinfo,
                                         volinfo->transport_type);
    runner_add_args(&runner, SBIN_DIR "/glusterfs", "-f", NULL);
    runner_argprintf(&runner, "%s", client_volfpath);
    runner_add_arg(&runner, "-l");
    runner_argprintf(&runner, "%s/%s-clearlocks-mnt.log", priv->logdir,
                     volinfo->volname);
    if (volinfo->memory_accounting)
        runner_add_arg(&runner, "--mem-accounting");

    for (i = 0; i < volinfo->brick_count && xl_opts[i]; i++) {
        runner_add_arg(&runner, "--xlator-option");
        runner_argprintf(&runner, "%s", xl_opts[i]);
    }

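    /* Turn client-side self-heal off on this short-lived maintenance
     * mount so that mounting the volume does not itself kick off heals
     * while locks are being cleared. */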
    for (i = 0; i < 3; i++) {
        runner_add_args(&runner, "--xlator-option", self_heal_opts[i], NULL);
    }

    runner_argprintf(&runner, "%s", mntpt);
    synclock_unlock(&priv->big_lock);
    ret = runner_run(&runner);
    synclock_lock(&priv->big_lock);
    if (ret) {
        gf_msg_debug(THIS->name, 0, "Could not start glusterfs");
        goto out;
    }
    gf_msg_debug(THIS->name, 0, "Started glusterfs successfully");

out:
    return ret;
}

int
glusterd_clearlocks_get_local_client_ports(glusterd_volinfo_t *volinfo,
                                           char **xl_opts)
{
    glusterd_brickinfo_t *brickinfo = NULL;
    char brickname[PATH_MAX] = {
        0,
    };
    int index = 0;
    int ret = -1;
    int i = 0;
    int32_t len = 0;
    int port = 0;

    GF_ASSERT(xl_opts);
    if (!xl_opts) {
        gf_msg_debug(THIS->name, 0,
                     "Should pass non-NULL "
                     "xl_opts");
        goto out;
    }

    index = -1;
    cds_list_for_each_entry(brickinfo, &volinfo->bricks, brick_list)
    {
        index++;
        if (gf_uuid_compare(brickinfo->uuid, MY_UUID))
            continue;

        if (volinfo->transport_type == GF_TRANSPORT_RDMA) {
            len = snprintf(brickname, sizeof(brickname), "%s.rdma",
                           brickinfo->path);
        } else
            len = snprintf(brickname, sizeof(brickname), "%s", brickinfo->path);
        if ((len < 0) || (len >= sizeof(brickname))) {
            ret = -1;
            goto out;
        }
        port = pmap_registry_search(THIS, brickname, _gf_false);
        if (!port) {
            ret = -1;
            gf_msg_debug(THIS->name, 0,
                         "Couldn't get port "
                         "for brick %s:%s",
                         brickinfo->hostname, brickinfo->path);
            goto out;
        }

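        /* index counts every brick in the volume while i counts only
         * the local ones, so the option is keyed by the brick's global
         * position, e.g. "myvol-client-2.remote-port=49154" for the
         * third brick (volume name and port number illustrative). */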
        ret = gf_asprintf(&xl_opts[i], "%s-client-%d.remote-port=%d",
                          volinfo->volname, index, port);
        if (ret == -1) {
            xl_opts[i] = NULL;
            goto out;
        }
        i++;
    }

    ret = 0;
out:
    return ret;
}

int
glusterd_op_clearlocks_volume(dict_t *dict, char **op_errstr, dict_t *rsp_dict)
{
    int32_t ret = -1;
    int i = 0;
    char *volname = NULL;
    char *path = NULL;
    char *kind = NULL;
    char *type = NULL;
    char *opts = NULL;
    char *cmd_str = NULL;
    char *free_ptr = NULL;
    char msg[PATH_MAX] = {
        0,
    };
    char result[PATH_MAX] = {
        0,
    };
    char *mntpt = NULL;
    char **xl_opts = NULL;
    glusterd_volinfo_t *volinfo = NULL;
    xlator_t *this = THIS;

    ret = dict_get_str(dict, "volname", &volname);
    if (ret) {
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED,
                "Key=volname", NULL);
        goto out;
    }
    gf_msg_debug("glusterd", 0, "Performing clearlocks on volume %s", volname);

    ret = dict_get_str(dict, "path", &path);
    if (ret) {
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "Key=path",
                NULL);
        goto out;
    }

    ret = dict_get_str(dict, "kind", &kind);
    if (ret) {
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "Key=kind",
                NULL);
        goto out;
    }

    ret = dict_get_str(dict, "type", &type);
    if (ret) {
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_GET_FAILED, "Key=type",
                NULL);
        goto out;
    }

    ret = dict_get_str(dict, "opts", &opts);
    if (ret)
        ret = 0;

    gf_smsg(this->name, GF_LOG_INFO, 0, GD_MSG_CLRCLK_VOL_REQ_RCVD,
            "Volume=%s, Kind=%s, Type=%s, Options=%s", volname, kind, type,
            opts, NULL);

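    /* The command is encoded as a virtual xattr key of the form
     * GF_XATTR_CLRLK_CMD ".t<type>.k<kind>[.<opts>]", e.g. something
     * like "glusterfs.clrlk.tinode.kblocked" for blocked inode locks;
     * it is then read back via getxattr on the maintenance mount in
     * glusterd_clearlocks_send_cmd(). */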
    if (opts)
        ret = gf_asprintf(&cmd_str, GF_XATTR_CLRLK_CMD ".t%s.k%s.%s", type,
                          kind, opts);
    else
        ret = gf_asprintf(&cmd_str, GF_XATTR_CLRLK_CMD ".t%s.k%s", type, kind);
    if (ret == -1)
        goto out;

    ret = glusterd_volinfo_find(volname, &volinfo);
    if (ret) {
        snprintf(msg, sizeof(msg), "Volume %s doesn't exist.", volname);
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_VOL_NOT_FOUND, "Volume=%s",
                volname, NULL);
        goto out;
    }

    xl_opts = GF_CALLOC(volinfo->brick_count + 1, sizeof(char *),
                        gf_gld_mt_charptr);
    if (!xl_opts) {
        gf_smsg(this->name, GF_LOG_ERROR, errno, GD_MSG_NO_MEMORY, NULL);
        goto out;
    }

    ret = glusterd_clearlocks_get_local_client_ports(volinfo, xl_opts);
    if (ret) {
        snprintf(msg, sizeof(msg),
                 "Couldn't get port numbers of "
                 "local bricks");
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_BRK_PORT_NUM_GET_FAIL,
                NULL);
        goto out;
    }

    ret = glusterd_clearlocks_create_mount(volinfo, &mntpt);
    if (ret) {
        snprintf(msg, sizeof(msg),
                 "Creating mount directory "
                 "for clear-locks failed.");
        gf_smsg(this->name, GF_LOG_ERROR, 0,
                GD_MSG_CLRLOCKS_MOUNTDIR_CREATE_FAIL, NULL);
        goto out;
    }

    ret = glusterd_clearlocks_mount(volinfo, xl_opts, mntpt);
    if (ret) {
        snprintf(msg, sizeof(msg),
                 "Failed to mount clear-locks "
                 "maintenance client.");
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_CLRLOCKS_CLNT_MOUNT_FAIL,
                NULL);
        goto out;
    }

    ret = glusterd_clearlocks_send_cmd(volinfo, cmd_str, path, result, msg,
                                       sizeof(msg), mntpt);
    if (ret) {
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_CLRCLK_SND_CMD_FAIL, NULL);
        goto umount;
    }

    free_ptr = gf_strdup(result);
    if (dict_set_dynstr_sizen(rsp_dict, "lk-summary", free_ptr)) {
        GF_FREE(free_ptr);
        snprintf(msg, sizeof(msg),
                 "Failed to set clear-locks "
                 "result");
        gf_smsg(this->name, GF_LOG_ERROR, 0, GD_MSG_DICT_SET_FAILED,
                "Key=lk-summary", NULL);
    }

umount:
    glusterd_clearlocks_unmount(volinfo, mntpt);

    if (glusterd_clearlocks_rmdir_mount(volinfo, mntpt))
        gf_smsg(this->name, GF_LOG_WARNING, 0, GD_MSG_CLRLOCKS_CLNT_UMOUNT_FAIL,
                NULL);

out:
    if (ret)
        *op_errstr = gf_strdup(msg);

    if (xl_opts) {
        for (i = 0; i < volinfo->brick_count && xl_opts[i]; i++)
            GF_FREE(xl_opts[i]);
        GF_FREE(xl_opts);
    }

    GF_FREE(cmd_str);

    GF_FREE(mntpt);

    return ret;
}
