glusterfs

Форк
0
/
quota-enforcer-client.c 
490 строк · 12.1 Кб
1
/*
   Copyright (c) 2010-2012 Red Hat, Inc. <http://www.redhat.com>
   This file is part of GlusterFS.

   This file is licensed to you under your choice of the GNU Lesser
   General Public License, version 3 or any later version (LGPLv3 or
   later), or the GNU General Public License, version 2 (GPLv2), in all
   cases as published by the Free Software Foundation.
*/
10
#include <stdio.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/file.h>
#include <netdb.h>
#include <signal.h>
#include <libgen.h>

#include <sys/utsname.h>

#include <stdint.h>
#include <pthread.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <time.h>
#include <semaphore.h>
#include <errno.h>

#ifdef HAVE_MALLOC_H
#include <malloc.h>
#endif

#include "quota.h"
#include "quota-messages.h"

static struct rpc_clnt_program quota_enforcer_clnt;
38

39
static int
40
quota_enforcer_submit_request(void *req, call_frame_t *frame,
41
                              rpc_clnt_prog_t *prog, int procnum,
42
                              xlator_t *this, fop_cbk_fn_t cbkfn,
43
                              xdrproc_t xdrproc)
44
{
45
    int ret = -1;
46
    int count = 0;
47
    struct iovec iov = {
48
        0,
49
    };
50
    struct iobuf *iobuf = NULL;
51
    struct iobref *iobref = NULL;
52
    ssize_t xdr_size = 0;
53
    quota_priv_t *priv = NULL;
54

55
    GF_ASSERT(req);
56
    GF_ASSERT(this);
57

58
    priv = this->private;
59

60
    xdr_size = xdr_sizeof(xdrproc, req);
61
    iobuf = iobuf_get2(this->ctx->iobuf_pool, xdr_size);
62
    if (!iobuf) {
63
        goto out;
64
    }
65

66
    iobref = iobref_new();
67
    if (!iobref) {
68
        goto out;
69
    }
70

71
    iobref_add(iobref, iobuf);
72

73
    iov.iov_base = iobuf->ptr;
74
    iov.iov_len = iobuf_size(iobuf);
75

76
    /* Create the xdr payload */
77
    ret = xdr_serialize_generic(iov, req, xdrproc);
78
    if (ret == -1) {
79
        goto out;
80
    }
81
    iov.iov_len = ret;
82
    count = 1;
83

84
    /* Send the msg */
85
    ret = rpc_clnt_submit(priv->rpc_clnt, prog, procnum, cbkfn, &iov, count,
86
                          NULL, 0, iobref, frame, NULL, 0, NULL, 0, NULL);
87
out:
88
    if (iobref)
89
        iobref_unref(iobref);
90
    if (iobuf)
91
        iobuf_unref(iobuf);
92

93
    return ret;
94
}
95

96
int
97
quota_enforcer_lookup_cbk(struct rpc_req *req, struct iovec *iov, int count,
98
                          void *myframe)
99
{
100
    quota_local_t *local = NULL;
101
    call_frame_t *frame = NULL;
102
    int ret = 0;
103
    gfs3_lookup_rsp rsp = {
104
        0,
105
    };
106
    struct iatt stbuf = {
107
        0,
108
    };
109
    struct iatt postparent = {
110
        0,
111
    };
112
    int op_errno = EINVAL;
113
    dict_t *xdata = NULL;
114
    inode_t *inode = NULL;
115
    xlator_t *this = NULL;
116
    quota_priv_t *priv = NULL;
117
    struct timespec retry_delay = {
118
        0,
119
    };
120
    gf_timer_t *timer = NULL;
121

122
    this = THIS;
123

124
    frame = myframe;
125
    local = frame->local;
126
    inode = local->validate_loc.inode;
127
    priv = this->private;
128

129
    if (-1 == req->rpc_status) {
130
        rsp.op_ret = -1;
131
        op_errno = ENOTCONN;
132
        goto out;
133
    }
134

135
    ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gfs3_lookup_rsp);
136
    if (ret < 0) {
137
        gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_XDR_DECODING_FAILED,
138
               "XDR decoding failed");
139
        rsp.op_ret = -1;
140
        op_errno = EINVAL;
141
        goto out;
142
    }
143

144
    op_errno = gf_error_to_errno(rsp.op_errno);
145
    gf_stat_to_iatt(&rsp.postparent, &postparent);
146

147
    if (rsp.op_ret == -1)
148
        goto out;
149

150
    rsp.op_ret = -1;
151
    gf_stat_to_iatt(&rsp.stat, &stbuf);
152

153
    GF_PROTOCOL_DICT_UNSERIALIZE(frame->this, xdata, (rsp.xdata.xdata_val),
154
                                 (rsp.xdata.xdata_len), rsp.op_ret, op_errno,
155
                                 out);
156

157
    if ((!gf_uuid_is_null(inode->gfid)) &&
158
        (gf_uuid_compare(stbuf.ia_gfid, inode->gfid) != 0)) {
159
        gf_msg_debug(frame->this->name, ESTALE, "gfid changed for %s",
160
                     local->validate_loc.path);
161
        rsp.op_ret = -1;
162
        op_errno = ESTALE;
163
        goto out;
164
    }
165

166
    rsp.op_ret = 0;
167

168
out:
169
    rsp.op_errno = op_errno;
170

171
    /* We need to retry connecting to quotad on ENOTCONN error.
172
     * Suppose if there are two volumes vol1 and vol2,
173
     * and quota is enabled and limit is set on vol1.
174
     * Now if IO is happening on vol1 and quota is enabled/disabled
175
     * on vol2, quotad gets restarted and client will receive
176
     * ENOTCONN in the IO path of vol1
177
     */
178
    if (rsp.op_ret == -1 && rsp.op_errno == ENOTCONN) {
179
        if (local->quotad_conn_retry >= 12) {
180
            priv->quotad_conn_status = 1;
181
            gf_log(this->name, GF_LOG_WARNING,
182
                   "failed to connect "
183
                   "to quotad after retry count %d)",
184
                   local->quotad_conn_retry);
185
        } else {
186
            local->quotad_conn_retry++;
187
        }
188

189
        if (priv->quotad_conn_status == 0) {
190
            /* retry connecting after 5secs for 12 retries
191
             * (up to 60sec).
192
             */
193
            gf_log(this->name, GF_LOG_DEBUG,
194
                   "retry connecting to "
195
                   "quotad (retry count %d)",
196
                   local->quotad_conn_retry);
197

198
            retry_delay.tv_sec = 5;
199
            retry_delay.tv_nsec = 0;
200
            timer = gf_timer_call_after(this->ctx, retry_delay,
201
                                        _quota_enforcer_lookup, (void *)frame);
202
            if (timer == NULL) {
203
                gf_log(this->name, GF_LOG_WARNING,
204
                       "failed to "
205
                       "set quota_enforcer_lookup with timer");
206
            } else {
207
                goto clean;
208
            }
209
        }
210
    } else {
211
        priv->quotad_conn_status = 0;
212
    }
213

214
    if (rsp.op_ret == -1) {
215
        /* any error other than ENOENT */
216
        if (rsp.op_errno != ENOENT)
217
            gf_msg(
218
                this->name, GF_LOG_WARNING, rsp.op_errno, Q_MSG_LOOKUP_FAILED,
219
                "Getting cluster-wide size of directory failed "
220
                "(path: %s gfid:%s)",
221
                local->validate_loc.path, loc_gfid_utoa(&local->validate_loc));
222
        else
223
            gf_msg_trace(this->name, ENOENT, "not found on remote node");
224

225
    } else if (local->quotad_conn_retry) {
226
        gf_log(this->name, GF_LOG_DEBUG,
227
               "connected to quotad after "
228
               "retry count %d",
229
               local->quotad_conn_retry);
230
    }
231

232
    local->validate_cbk(frame, NULL, this, rsp.op_ret, rsp.op_errno, inode,
233
                        &stbuf, xdata, &postparent);
234

235
clean:
236
    if (xdata)
237
        dict_unref(xdata);
238

239
    free(rsp.xdata.xdata_val);
240

241
    return 0;
242
}
243

244
void
245
_quota_enforcer_lookup(void *data)
246
{
247
    quota_local_t *local = NULL;
248
    gfs3_lookup_req req = {
249
        {
250
            0,
251
        },
252
    };
253
    int ret = 0;
254
    int op_errno = ESTALE;
255
    quota_priv_t *priv = NULL;
256
    call_frame_t *frame = NULL;
257
    loc_t *loc = NULL;
258
    xlator_t *this = NULL;
259
    char *dir_path = NULL;
260

261
    frame = data;
262
    local = frame->local;
263
    this = local->this;
264
    loc = &local->validate_loc;
265

266
    priv = this->private;
267

268
    if (!(loc && loc->inode))
269
        goto unwind;
270

271
    if (!gf_uuid_is_null(loc->inode->gfid))
272
        memcpy(req.gfid, loc->inode->gfid, 16);
273
    else
274
        memcpy(req.gfid, loc->gfid, 16);
275

276
    if (local->validate_xdata) {
277
        GF_PROTOCOL_DICT_SERIALIZE(this, local->validate_xdata,
278
                                   (&req.xdata.xdata_val), req.xdata.xdata_len,
279
                                   op_errno, unwind);
280
    }
281

282
    if (loc->name)
283
        req.bname = (char *)loc->name;
284
    else
285
        req.bname = "";
286

287
    if (loc->path)
288
        dir_path = (char *)loc->path;
289
    else
290
        dir_path = "";
291

292
    ret = quota_enforcer_submit_request(
293
        &req, frame, priv->quota_enforcer, GF_AGGREGATOR_LOOKUP, this,
294
        quota_enforcer_lookup_cbk, (xdrproc_t)xdr_gfs3_lookup_req);
295

296
    if (ret) {
297
        gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_RPC_SUBMIT_FAILED,
298
               "Couldn't send the request to "
299
               "fetch cluster wide size of directory (path:%s gfid:%s)",
300
               dir_path, req.gfid);
301
    }
302

303
    GF_FREE(req.xdata.xdata_val);
304

305
    return;
306

307
unwind:
308
    local->validate_cbk(frame, NULL, this, -1, op_errno, NULL, NULL, NULL,
309
                        NULL);
310

311
    GF_FREE(req.xdata.xdata_val);
312

313
    return;
314
}
315

316
int
317
quota_enforcer_lookup(call_frame_t *frame, xlator_t *this, dict_t *xdata,
318
                      fop_lookup_cbk_t validate_cbk)
319
{
320
    quota_local_t *local = NULL;
321

322
    if (!frame || !this)
323
        goto unwind;
324

325
    local = frame->local;
326
    local->this = this;
327
    local->validate_cbk = validate_cbk;
328
    local->validate_xdata = dict_ref(xdata);
329

330
    _quota_enforcer_lookup(frame);
331

332
    return 0;
333

334
unwind:
335
    validate_cbk(frame, NULL, this, -1, ESTALE, NULL, NULL, NULL, NULL);
336

337
    return 0;
338
}
339

340
int
341
quota_enforcer_notify(struct rpc_clnt *rpc, void *mydata,
342
                      rpc_clnt_event_t event, void *data)
343
{
344
    xlator_t *this = NULL;
345
    int ret = 0;
346
    quota_priv_t *priv = NULL;
347

348
    this = mydata;
349
    priv = this->private;
350
    switch (event) {
351
        case RPC_CLNT_CONNECT: {
352
            pthread_mutex_lock(&priv->conn_mutex);
353
            {
354
                priv->conn_status = _gf_true;
355
            }
356
            pthread_mutex_unlock(&priv->conn_mutex);
357
            gf_msg_trace(this->name, 0, "got RPC_CLNT_CONNECT");
358
            break;
359
        }
360

361
        case RPC_CLNT_DISCONNECT: {
362
            pthread_mutex_lock(&priv->conn_mutex);
363
            {
364
                priv->conn_status = _gf_false;
365
                pthread_cond_signal(&priv->conn_cond);
366
            }
367
            pthread_mutex_unlock(&priv->conn_mutex);
368
            gf_msg_trace(this->name, 0, "got RPC_CLNT_DISCONNECT");
369
            break;
370
        }
371

372
        default:
373
            gf_msg_trace(this->name, 0, "got some other RPC event %d", event);
374
            ret = 0;
375
            break;
376
    }
377

378
    return ret;
379
}
380

381
int
382
quota_enforcer_blocking_connect(rpc_clnt_t *rpc)
383
{
384
    dict_t *options = NULL;
385
    int ret = -1;
386

387
    options = dict_new();
388
    if (options == NULL)
389
        goto out;
390

391
    ret = dict_set_sizen_str_sizen(options, "non-blocking-io", "no");
392
    if (ret)
393
        goto out;
394

395
    rpc->conn.trans->reconfigure(rpc->conn.trans, options);
396

397
    rpc_clnt_start(rpc);
398

399
    ret = dict_set_sizen_str_sizen(options, "non-blocking-io", "yes");
400
    if (ret)
401
        goto out;
402

403
    rpc->conn.trans->reconfigure(rpc->conn.trans, options);
404

405
    ret = 0;
406
out:
407
    if (options)
408
        dict_unref(options);
409

410
    return ret;
411
}
412

413
// Returns a started rpc_clnt. Creates a new rpc_clnt if quota_priv doesn't have
414
// one already
415
struct rpc_clnt *
416
quota_enforcer_init(xlator_t *this, dict_t *options)
417
{
418
    struct rpc_clnt *rpc = NULL;
419
    quota_priv_t *priv = NULL;
420
    int ret = -1;
421

422
    priv = this->private;
423

424
    LOCK(&priv->lock);
425
    {
426
        if (priv->rpc_clnt) {
427
            ret = 0;
428
            rpc = priv->rpc_clnt;
429
        }
430
    }
431
    UNLOCK(&priv->lock);
432

433
    if (rpc)
434
        goto out;
435

436
    priv->quota_enforcer = &quota_enforcer_clnt;
437

438
    ret = dict_set_sizen_str_sizen(options, "transport.address-family", "unix");
439
    if (ret)
440
        goto out;
441

442
    ret = dict_set_sizen_str_sizen(options, "transport-type", "socket");
443
    if (ret)
444
        goto out;
445

446
    ret = dict_set_sizen_str_sizen(options, "transport.socket.connect-path",
447
                                   "/var/run/gluster/quotad.socket");
448
    if (ret)
449
        goto out;
450

451
    rpc = rpc_clnt_new(options, this, this->name, 16);
452
    if (!rpc) {
453
        ret = -1;
454
        goto out;
455
    }
456

457
    ret = rpc_clnt_register_notify(rpc, quota_enforcer_notify, this);
458
    if (ret) {
459
        gf_msg("quota", GF_LOG_ERROR, 0, Q_MSG_RPCCLNT_REGISTER_NOTIFY_FAILED,
460
               "failed to register notify");
461
        goto out;
462
    }
463

464
    ret = quota_enforcer_blocking_connect(rpc);
465
    if (ret)
466
        goto out;
467

468
    ret = 0;
469
out:
470
    if (ret) {
471
        if (rpc)
472
            rpc_clnt_unref(rpc);
473
        rpc = NULL;
474
    }
475

476
    return rpc;
477
}
478

479
/* Procedure table for the aggregator program.  Only LOOKUP is issued
 * from this file, and its reply is handled by the callback passed to
 * rpc_clnt_submit() — hence the NULL actor functions here. */
static struct rpc_clnt_procedure quota_enforcer_actors[GF_AGGREGATOR_MAXVALUE] = {
    [GF_AGGREGATOR_NULL] = {"NULL", NULL},
    [GF_AGGREGATOR_LOOKUP] = {"LOOKUP", NULL},
};
483

484
/* RPC program descriptor used when submitting requests to quotad.
 * Forward-declared near the top of this file so quota_enforcer_init()
 * can take its address before this definition. */
static struct rpc_clnt_program quota_enforcer_clnt = {
    .progname = "Quota enforcer",
    .prognum = GLUSTER_AGGREGATOR_PROGRAM,
    .progver = GLUSTER_AGGREGATOR_VERSION,
    .numproc = GF_AGGREGATOR_MAXVALUE,
    .proctable = quota_enforcer_actors,
};
491

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.