glusterfs

Форк
0
/
quotad-aggregator.c 
494 строки · 13.0 Кб
1
/*
2
   Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
3
   This file is part of GlusterFS.
4

5
   This file is licensed to you under your choice of the GNU Lesser
6
   General Public License, version 3 or any later version (LGPLv3 or
7
   later), or the GNU General Public License, version 2 (GPLv2), in all
8
   cases as published by the Free Software Foundation.
9
*/
10

11
#include "cli1-xdr.h"
12
#include "quota.h"
13
#include "quotad-helpers.h"
14
#include "quotad-aggregator.h"
15

16
static char *qd_ext_xattrs[] = {
17
    QUOTA_SIZE_KEY,
18
    QUOTA_LIMIT_KEY,
19
    QUOTA_LIMIT_OBJECTS_KEY,
20
    NULL,
21
};
22

23
static struct rpcsvc_program quotad_aggregator_prog;
24

25
struct iobuf *
26
quotad_serialize_reply(rpcsvc_request_t *req, void *arg, struct iovec *outmsg,
27
                       xdrproc_t xdrproc)
28
{
29
    struct iobuf *iob = NULL;
30
    ssize_t retlen = 0;
31
    ssize_t xdr_size = 0;
32

33
    GF_VALIDATE_OR_GOTO("server", req, ret);
34

35
    /* First, get the io buffer into which the reply in arg will
36
     * be serialized.
37
     */
38
    if (arg && xdrproc) {
39
        xdr_size = xdr_sizeof(xdrproc, arg);
40
        iob = iobuf_get2(req->svc->ctx->iobuf_pool, xdr_size);
41
        if (!iob) {
42
            gf_log_callingfn(THIS->name, GF_LOG_ERROR, "Failed to get iobuf");
43
            goto ret;
44
        };
45

46
        iobuf_to_iovec(iob, outmsg);
47
        /* Use the given serializer to translate the given C structure
48
         * in arg to XDR format which will be written into the buffer
49
         * in outmsg.
50
         */
51
        /* retlen is used to received the error since size_t is unsigned and we
52
         * need -1 for error notification during encoding.
53
         */
54

55
        retlen = xdr_serialize_generic(*outmsg, arg, xdrproc);
56
        if (retlen == -1) {
57
            /* Failed to Encode 'GlusterFS' msg in RPC is not exactly
58
               failure of RPC return values.. Client should get
59
               notified about this, so there are no missing frames */
60
            gf_log_callingfn("", GF_LOG_ERROR, "Failed to encode message");
61
            req->rpc_err = GARBAGE_ARGS;
62
            retlen = 0;
63
        }
64
    }
65
    outmsg->iov_len = retlen;
66
ret:
67
    return iob;
68
}
69

70
int
71
quotad_aggregator_submit_reply(call_frame_t *frame, rpcsvc_request_t *req,
72
                               void *arg, struct iovec *payload,
73
                               int payloadcount, struct iobref *iobref,
74
                               xdrproc_t xdrproc)
75
{
76
    struct iobuf *iob = NULL;
77
    int ret = -1;
78
    struct iovec rsp = {
79
        0,
80
    };
81
    quotad_aggregator_state_t *state = NULL;
82
    char new_iobref = 0;
83

84
    GF_VALIDATE_OR_GOTO("server", req, ret);
85

86
    if (frame) {
87
        state = frame->root->state;
88
        frame->local = NULL;
89
    }
90

91
    if (!iobref) {
92
        iobref = iobref_new();
93
        if (!iobref) {
94
            goto ret;
95
        }
96

97
        new_iobref = 1;
98
    }
99

100
    iob = quotad_serialize_reply(req, arg, &rsp, xdrproc);
101
    if (!iob) {
102
        gf_msg("", GF_LOG_ERROR, 0, Q_MSG_DICT_SERIALIZE_FAIL,
103
               "Failed to serialize reply");
104
        goto ret;
105
    }
106

107
    iobref_add(iobref, iob);
108

109
    ret = rpcsvc_submit_generic(req, &rsp, 1, payload, payloadcount, iobref);
110

111
    iobuf_unref(iob);
112

113
    ret = 0;
114
ret:
115
    if (state) {
116
        quotad_aggregator_free_state(state);
117
    }
118

119
    if (frame)
120
        STACK_DESTROY(frame->root);
121

122
    if (new_iobref) {
123
        iobref_unref(iobref);
124
    }
125

126
    return ret;
127
}
128

129
int
130
quotad_aggregator_getlimit_cbk(xlator_t *this, call_frame_t *frame,
131
                               void *lookup_rsp)
132
{
133
    gfs3_lookup_rsp *rsp = lookup_rsp;
134
    gf_cli_rsp cli_rsp = {
135
        0,
136
    };
137
    dict_t *xdata = NULL;
138
    quotad_aggregator_state_t *state = NULL;
139
    int ret = -1;
140
    int type = 0;
141

142
    if (!rsp || (rsp->op_ret == -1))
143
        goto reply;
144

145
    GF_PROTOCOL_DICT_UNSERIALIZE(frame->this, xdata, (rsp->xdata.xdata_val),
146
                                 (rsp->xdata.xdata_len), rsp->op_ret,
147
                                 rsp->op_errno, out);
148

149
    if (xdata) {
150
        state = frame->root->state;
151
        ret = dict_get_int32n(state->req_xdata, "type", SLEN("type"), &type);
152
        if (ret < 0)
153
            goto out;
154

155
        ret = dict_set_int32_sizen(xdata, "type", type);
156
        if (ret < 0)
157
            goto out;
158
    }
159

160
    ret = 0;
161
out:
162
    rsp->op_ret = ret;
163
    if (ret) {
164
        gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_DICT_UNSERIALIZE_FAIL,
165
               "failed to unserialize "
166
               "nameless lookup rsp");
167
        goto reply;
168
    }
169
    cli_rsp.op_ret = rsp->op_ret;
170
    cli_rsp.op_errno = rsp->op_errno;
171
    cli_rsp.op_errstr = "";
172
    if (xdata) {
173
        GF_PROTOCOL_DICT_SERIALIZE(frame->this, xdata, (&cli_rsp.dict.dict_val),
174
                                   (cli_rsp.dict.dict_len), cli_rsp.op_errno,
175
                                   reply);
176
    }
177

178
reply:
179
    quotad_aggregator_submit_reply(frame, (frame) ? frame->local : NULL,
180
                                   (void *)&cli_rsp, NULL, 0, NULL,
181
                                   (xdrproc_t)xdr_gf_cli_rsp);
182

183
    dict_unref(xdata);
184
    GF_FREE(cli_rsp.dict.dict_val);
185
    return 0;
186
}
187

188
int
189
quotad_aggregator_getlimit(rpcsvc_request_t *req)
190
{
191
    call_frame_t *frame = NULL;
192
    gf_cli_req cli_req = {
193
        {0},
194
    };
195
    gf_cli_rsp cli_rsp = {0};
196
    quotad_aggregator_state_t *state = NULL;
197
    xlator_t *this = NULL;
198
    dict_t *dict = NULL;
199
    int ret = -1, op_errno = 0;
200
    char *gfid_str = NULL;
201
    uuid_t gfid = {0};
202
    char *volume_uuid = NULL;
203

204
    GF_VALIDATE_OR_GOTO("quotad-aggregator", req, err);
205

206
    this = THIS;
207

208
    cli_req.dict.dict_val = alloca(req->msg[0].iov_len);
209

210
    ret = xdr_to_generic(req->msg[0], &cli_req, (xdrproc_t)xdr_gf_cli_req);
211
    if (ret < 0) {
212
        // failed to decode msg;
213
        gf_msg("this->name", GF_LOG_ERROR, 0, Q_MSG_XDR_DECODE_ERROR,
214
               "xdr decoding error");
215
        req->rpc_err = GARBAGE_ARGS;
216
        goto err;
217
    }
218

219
    if (cli_req.dict.dict_len) {
220
        dict = dict_new();
221
        ret = dict_unserialize(cli_req.dict.dict_val, cli_req.dict.dict_len,
222
                               &dict);
223
        if (ret < 0) {
224
            gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_DICT_UNSERIALIZE_FAIL,
225
                   "Failed to unserialize req-buffer to "
226
                   "dictionary");
227
            goto err;
228
        }
229
    }
230

231
    ret = dict_get_strn(dict, "gfid", SLEN("gfid"), &gfid_str);
232
    if (ret) {
233
        goto err;
234
    }
235

236
    ret = dict_get_strn(dict, "volume-uuid", SLEN("volume-uuid"), &volume_uuid);
237
    if (ret) {
238
        goto err;
239
    }
240

241
    gf_uuid_parse((const char *)gfid_str, gfid);
242

243
    frame = quotad_aggregator_get_frame_from_req(req);
244
    if (frame == NULL) {
245
        cli_rsp.op_errno = ENOMEM;
246
        goto errx;
247
    }
248
    state = frame->root->state;
249
    state->req_xdata = dict;
250
    state->xdata = dict_new();
251
    dict = NULL;
252

253
    ret = dict_set_int32_sizen(state->xdata, QUOTA_LIMIT_KEY, 42);
254
    if (ret)
255
        goto err;
256

257
    ret = dict_set_int32_sizen(state->xdata, QUOTA_LIMIT_OBJECTS_KEY, 42);
258
    if (ret) {
259
        gf_msg(this->name, GF_LOG_ERROR, ENOMEM, Q_MSG_ENOMEM,
260
               "Failed to set QUOTA_LIMIT_OBJECTS_KEY");
261
        goto err;
262
    }
263

264
    ret = dict_set_int32_sizen(state->xdata, QUOTA_SIZE_KEY, 42);
265
    if (ret)
266
        goto err;
267

268
    ret = dict_set_int32_sizen(state->xdata, GET_ANCESTRY_PATH_KEY, 42);
269
    if (ret)
270
        goto err;
271

272
    ret = qd_nameless_lookup(this, frame, (char *)gfid, state->xdata,
273
                             volume_uuid, quotad_aggregator_getlimit_cbk);
274
    if (ret) {
275
        cli_rsp.op_errno = ret;
276
        goto errx;
277
    }
278

279
    return ret;
280

281
err:
282
    cli_rsp.op_errno = op_errno;
283
errx:
284
    cli_rsp.op_ret = -1;
285
    cli_rsp.op_errstr = "";
286

287
    quotad_aggregator_getlimit_cbk(this, frame, &cli_rsp);
288
    if (dict)
289
        dict_unref(dict);
290
    return ret;
291
}
292

293
int
294
quotad_aggregator_lookup_cbk(xlator_t *this, call_frame_t *frame, void *rsp)
295
{
296
    quotad_aggregator_submit_reply(frame, frame ? frame->local : NULL, rsp,
297
                                   NULL, 0, NULL,
298
                                   (xdrproc_t)xdr_gfs3_lookup_rsp);
299

300
    return 0;
301
}
302

303
int
304
quotad_aggregator_lookup(rpcsvc_request_t *req)
305
{
306
    call_frame_t *frame = NULL;
307
    gfs3_lookup_req args = {
308
        {
309
            0,
310
        },
311
    };
312
    int i = 0, ret = -1, op_errno = 0;
313
    gfs3_lookup_rsp rsp = {
314
        0,
315
    };
316
    quotad_aggregator_state_t *state = NULL;
317
    xlator_t *this = NULL;
318
    dict_t *dict = NULL;
319
    char *volume_uuid = NULL;
320

321
    GF_VALIDATE_OR_GOTO("quotad-aggregator", req, err);
322

323
    this = THIS;
324

325
    args.bname = alloca(req->msg[0].iov_len);
326
    args.xdata.xdata_val = alloca(req->msg[0].iov_len);
327

328
    ret = xdr_to_generic(req->msg[0], &args, (xdrproc_t)xdr_gfs3_lookup_req);
329
    if (ret < 0) {
330
        rsp.op_errno = EINVAL;
331
        goto err;
332
    }
333

334
    frame = quotad_aggregator_get_frame_from_req(req);
335
    if (frame == NULL) {
336
        rsp.op_errno = ENOMEM;
337
        goto err;
338
    }
339

340
    state = frame->root->state;
341

342
    GF_PROTOCOL_DICT_UNSERIALIZE(this, dict, (args.xdata.xdata_val),
343
                                 (args.xdata.xdata_len), ret, op_errno, err);
344

345
    ret = dict_get_str(dict, "volume-uuid", &volume_uuid);
346
    if (ret) {
347
        goto err;
348
    }
349

350
    state->xdata = dict_new();
351

352
    for (i = 0; qd_ext_xattrs[i]; i++) {
353
        if (dict_get(dict, qd_ext_xattrs[i])) {
354
            ret = dict_set_uint32(state->xdata, qd_ext_xattrs[i], 1);
355
            if (ret < 0)
356
                goto err;
357
        }
358
    }
359

360
    ret = qd_nameless_lookup(this, frame, args.gfid, state->xdata, volume_uuid,
361
                             quotad_aggregator_lookup_cbk);
362
    if (ret) {
363
        rsp.op_errno = ret;
364
        goto err;
365
    }
366

367
    if (dict)
368
        dict_unref(dict);
369

370
    return ret;
371

372
err:
373
    rsp.op_ret = -1;
374
    rsp.op_errno = op_errno;
375

376
    quotad_aggregator_lookup_cbk(this, frame, &rsp);
377
    if (dict)
378
        dict_unref(dict);
379

380
    return ret;
381
}
382

383
int
384
quotad_aggregator_rpc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,
385
                             void *data)
386
{
387
    if (!xl || !data) {
388
        gf_log_callingfn("server", GF_LOG_WARNING,
389
                         "Calling rpc_notify without initializing");
390
        goto out;
391
    }
392

393
    switch (event) {
394
        case RPCSVC_EVENT_ACCEPT:
395
            break;
396

397
        case RPCSVC_EVENT_DISCONNECT:
398
            break;
399

400
        default:
401
            break;
402
    }
403

404
out:
405
    return 0;
406
}
407

408
int
409
quotad_aggregator_init(xlator_t *this)
410
{
411
    quota_priv_t *priv = NULL;
412
    int ret = -1;
413

414
    priv = this->private;
415

416
    if (priv->rpcsvc) {
417
        /* Listener already created */
418
        return 0;
419
    }
420

421
    ret = dict_set_nstrn(this->options, "transport.address-family",
422
                         SLEN("transport.address-family"), "unix",
423
                         SLEN("unix"));
424
    if (ret)
425
        goto out;
426

427
    ret = dict_set_nstrn(this->options, "transport-type",
428
                         SLEN("transport-type"), "socket", SLEN("socket"));
429
    if (ret)
430
        goto out;
431

432
    ret = dict_set_nstrn(this->options, "transport.socket.listen-path",
433
                         SLEN("transport.socket.listen-path"),
434
                         "/var/run/gluster/quotad.socket",
435
                         SLEN("/var/run/gluster/quotad.socket"));
436
    if (ret)
437
        goto out;
438

439
    /* RPC related */
440
    priv->rpcsvc = rpcsvc_init(this, this->ctx, this->options, 0);
441
    if (priv->rpcsvc == NULL) {
442
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_RPCSVC_INIT_FAILED,
443
               "creation of rpcsvc failed");
444
        ret = -1;
445
        goto out;
446
    }
447

448
    ret = rpcsvc_create_listeners(priv->rpcsvc, this->options, this->name);
449
    if (ret < 1) {
450
        gf_msg(this->name, GF_LOG_WARNING, 0,
451
               Q_MSG_RPCSVC_LISTENER_CREATION_FAILED,
452
               "creation of listener failed");
453
        ret = -1;
454
        goto out;
455
    }
456

457
    priv->quotad_aggregator = &quotad_aggregator_prog;
458
    quotad_aggregator_prog.options = this->options;
459

460
    ret = rpcsvc_program_register(priv->rpcsvc, &quotad_aggregator_prog,
461
                                  _gf_false);
462
    if (ret) {
463
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_RPCSVC_REGISTER_FAILED,
464
               "registration of program (name:%s, prognum:%d, "
465
               "progver:%d) failed",
466
               quotad_aggregator_prog.progname, quotad_aggregator_prog.prognum,
467
               quotad_aggregator_prog.progver);
468
        goto out;
469
    }
470

471
    ret = 0;
472
out:
473
    if (ret && priv->rpcsvc) {
474
        GF_FREE(priv->rpcsvc);
475
        priv->rpcsvc = NULL;
476
    }
477

478
    return ret;
479
}
480

481
static rpcsvc_actor_t quotad_aggregator_actors[GF_AGGREGATOR_MAXVALUE] = {
482
    [GF_AGGREGATOR_NULL] = {"NULL", NULL, NULL, GF_AGGREGATOR_NULL, DRC_NA, 0},
483
    [GF_AGGREGATOR_LOOKUP] = {"LOOKUP", quotad_aggregator_lookup, NULL,
484
                              GF_AGGREGATOR_NULL, DRC_NA, 0},
485
    [GF_AGGREGATOR_GETLIMIT] = {"GETLIMIT", quotad_aggregator_getlimit, NULL,
486
                                GF_AGGREGATOR_GETLIMIT, DRC_NA, 0},
487
};
488

489
static struct rpcsvc_program quotad_aggregator_prog = {
490
    .progname = "GlusterFS 3.3",
491
    .prognum = GLUSTER_AGGREGATOR_PROGRAM,
492
    .progver = GLUSTER_AGGREGATOR_VERSION,
493
    .numactors = GF_AGGREGATOR_MAXVALUE,
494
    .actors = quotad_aggregator_actors};
495

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.