12
#include <netinet/in.h>
13
#include <sys/socket.h>
20
#include <sys/utsname.h>
35
#include "quota-messages.h"
37
static struct rpc_clnt_program quota_enforcer_clnt;
40
quota_enforcer_submit_request(void *req, call_frame_t *frame,
41
rpc_clnt_prog_t *prog, int procnum,
42
xlator_t *this, fop_cbk_fn_t cbkfn,
50
struct iobuf *iobuf = NULL;
51
struct iobref *iobref = NULL;
53
quota_priv_t *priv = NULL;
60
xdr_size = xdr_sizeof(xdrproc, req);
61
iobuf = iobuf_get2(this->ctx->iobuf_pool, xdr_size);
66
iobref = iobref_new();
71
iobref_add(iobref, iobuf);
73
iov.iov_base = iobuf->ptr;
74
iov.iov_len = iobuf_size(iobuf);
77
ret = xdr_serialize_generic(iov, req, xdrproc);
85
ret = rpc_clnt_submit(priv->rpc_clnt, prog, procnum, cbkfn, &iov, count,
86
NULL, 0, iobref, frame, NULL, 0, NULL, 0, NULL);
97
quota_enforcer_lookup_cbk(struct rpc_req *req, struct iovec *iov, int count,
100
quota_local_t *local = NULL;
101
call_frame_t *frame = NULL;
103
gfs3_lookup_rsp rsp = {
106
struct iatt stbuf = {
109
struct iatt postparent = {
112
int op_errno = EINVAL;
113
dict_t *xdata = NULL;
114
inode_t *inode = NULL;
115
xlator_t *this = NULL;
116
quota_priv_t *priv = NULL;
117
struct timespec retry_delay = {
120
gf_timer_t *timer = NULL;
125
local = frame->local;
126
inode = local->validate_loc.inode;
127
priv = this->private;
129
if (-1 == req->rpc_status) {
135
ret = xdr_to_generic(*iov, &rsp, (xdrproc_t)xdr_gfs3_lookup_rsp);
137
gf_msg(this->name, GF_LOG_ERROR, 0, Q_MSG_XDR_DECODING_FAILED,
138
"XDR decoding failed");
144
op_errno = gf_error_to_errno(rsp.op_errno);
145
gf_stat_to_iatt(&rsp.postparent, &postparent);
147
if (rsp.op_ret == -1)
151
gf_stat_to_iatt(&rsp.stat, &stbuf);
153
GF_PROTOCOL_DICT_UNSERIALIZE(frame->this, xdata, (rsp.xdata.xdata_val),
154
(rsp.xdata.xdata_len), rsp.op_ret, op_errno,
157
if ((!gf_uuid_is_null(inode->gfid)) &&
158
(gf_uuid_compare(stbuf.ia_gfid, inode->gfid) != 0)) {
159
gf_msg_debug(frame->this->name, ESTALE, "gfid changed for %s",
160
local->validate_loc.path);
169
rsp.op_errno = op_errno;
178
if (rsp.op_ret == -1 && rsp.op_errno == ENOTCONN) {
179
if (local->quotad_conn_retry >= 12) {
180
priv->quotad_conn_status = 1;
181
gf_log(this->name, GF_LOG_WARNING,
183
"to quotad after retry count %d)",
184
local->quotad_conn_retry);
186
local->quotad_conn_retry++;
189
if (priv->quotad_conn_status == 0) {
193
gf_log(this->name, GF_LOG_DEBUG,
194
"retry connecting to "
195
"quotad (retry count %d)",
196
local->quotad_conn_retry);
198
retry_delay.tv_sec = 5;
199
retry_delay.tv_nsec = 0;
200
timer = gf_timer_call_after(this->ctx, retry_delay,
201
_quota_enforcer_lookup, (void *)frame);
203
gf_log(this->name, GF_LOG_WARNING,
205
"set quota_enforcer_lookup with timer");
211
priv->quotad_conn_status = 0;
214
if (rsp.op_ret == -1) {
216
if (rsp.op_errno != ENOENT)
218
this->name, GF_LOG_WARNING, rsp.op_errno, Q_MSG_LOOKUP_FAILED,
219
"Getting cluster-wide size of directory failed "
220
"(path: %s gfid:%s)",
221
local->validate_loc.path, loc_gfid_utoa(&local->validate_loc));
223
gf_msg_trace(this->name, ENOENT, "not found on remote node");
225
} else if (local->quotad_conn_retry) {
226
gf_log(this->name, GF_LOG_DEBUG,
227
"connected to quotad after "
229
local->quotad_conn_retry);
232
local->validate_cbk(frame, NULL, this, rsp.op_ret, rsp.op_errno, inode,
233
&stbuf, xdata, &postparent);
239
free(rsp.xdata.xdata_val);
245
_quota_enforcer_lookup(void *data)
247
quota_local_t *local = NULL;
248
gfs3_lookup_req req = {
254
int op_errno = ESTALE;
255
quota_priv_t *priv = NULL;
256
call_frame_t *frame = NULL;
258
xlator_t *this = NULL;
259
char *dir_path = NULL;
262
local = frame->local;
264
loc = &local->validate_loc;
266
priv = this->private;
268
if (!(loc && loc->inode))
271
if (!gf_uuid_is_null(loc->inode->gfid))
272
memcpy(req.gfid, loc->inode->gfid, 16);
274
memcpy(req.gfid, loc->gfid, 16);
276
if (local->validate_xdata) {
277
GF_PROTOCOL_DICT_SERIALIZE(this, local->validate_xdata,
278
(&req.xdata.xdata_val), req.xdata.xdata_len,
283
req.bname = (char *)loc->name;
288
dir_path = (char *)loc->path;
292
ret = quota_enforcer_submit_request(
293
&req, frame, priv->quota_enforcer, GF_AGGREGATOR_LOOKUP, this,
294
quota_enforcer_lookup_cbk, (xdrproc_t)xdr_gfs3_lookup_req);
297
gf_msg(this->name, GF_LOG_WARNING, 0, Q_MSG_RPC_SUBMIT_FAILED,
298
"Couldn't send the request to "
299
"fetch cluster wide size of directory (path:%s gfid:%s)",
303
GF_FREE(req.xdata.xdata_val);
308
local->validate_cbk(frame, NULL, this, -1, op_errno, NULL, NULL, NULL,
311
GF_FREE(req.xdata.xdata_val);
317
quota_enforcer_lookup(call_frame_t *frame, xlator_t *this, dict_t *xdata,
318
fop_lookup_cbk_t validate_cbk)
320
quota_local_t *local = NULL;
325
local = frame->local;
327
local->validate_cbk = validate_cbk;
328
local->validate_xdata = dict_ref(xdata);
330
_quota_enforcer_lookup(frame);
335
validate_cbk(frame, NULL, this, -1, ESTALE, NULL, NULL, NULL, NULL);
341
quota_enforcer_notify(struct rpc_clnt *rpc, void *mydata,
342
rpc_clnt_event_t event, void *data)
344
xlator_t *this = NULL;
346
quota_priv_t *priv = NULL;
349
priv = this->private;
351
case RPC_CLNT_CONNECT: {
352
pthread_mutex_lock(&priv->conn_mutex);
354
priv->conn_status = _gf_true;
356
pthread_mutex_unlock(&priv->conn_mutex);
357
gf_msg_trace(this->name, 0, "got RPC_CLNT_CONNECT");
361
case RPC_CLNT_DISCONNECT: {
362
pthread_mutex_lock(&priv->conn_mutex);
364
priv->conn_status = _gf_false;
365
pthread_cond_signal(&priv->conn_cond);
367
pthread_mutex_unlock(&priv->conn_mutex);
368
gf_msg_trace(this->name, 0, "got RPC_CLNT_DISCONNECT");
373
gf_msg_trace(this->name, 0, "got some other RPC event %d", event);
382
quota_enforcer_blocking_connect(rpc_clnt_t *rpc)
384
dict_t *options = NULL;
387
options = dict_new();
391
ret = dict_set_sizen_str_sizen(options, "non-blocking-io", "no");
395
rpc->conn.trans->reconfigure(rpc->conn.trans, options);
399
ret = dict_set_sizen_str_sizen(options, "non-blocking-io", "yes");
403
rpc->conn.trans->reconfigure(rpc->conn.trans, options);
416
quota_enforcer_init(xlator_t *this, dict_t *options)
418
struct rpc_clnt *rpc = NULL;
419
quota_priv_t *priv = NULL;
422
priv = this->private;
426
if (priv->rpc_clnt) {
428
rpc = priv->rpc_clnt;
436
priv->quota_enforcer = "a_enforcer_clnt;
438
ret = dict_set_sizen_str_sizen(options, "transport.address-family", "unix");
442
ret = dict_set_sizen_str_sizen(options, "transport-type", "socket");
446
ret = dict_set_sizen_str_sizen(options, "transport.socket.connect-path",
447
"/var/run/gluster/quotad.socket");
451
rpc = rpc_clnt_new(options, this, this->name, 16);
457
ret = rpc_clnt_register_notify(rpc, quota_enforcer_notify, this);
459
gf_msg("quota", GF_LOG_ERROR, 0, Q_MSG_RPCCLNT_REGISTER_NOTIFY_FAILED,
460
"failed to register notify");
464
ret = quota_enforcer_blocking_connect(rpc);
479
static struct rpc_clnt_procedure quota_enforcer_actors[GF_AGGREGATOR_MAXVALUE] = {
480
[GF_AGGREGATOR_NULL] = {"NULL", NULL},
481
[GF_AGGREGATOR_LOOKUP] = {"LOOKUP", NULL},
484
static struct rpc_clnt_program quota_enforcer_clnt = {
485
.progname = "Quota enforcer",
486
.prognum = GLUSTER_AGGREGATOR_PROGRAM,
487
.progver = GLUSTER_AGGREGATOR_VERSION,
488
.numproc = GF_AGGREGATOR_MAXVALUE,
489
.proctable = quota_enforcer_actors,