glusterfs
291 строка · 6.8 Кб
1/*
2Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
3This file is part of GlusterFS.
4
5This file is licensed to you under your choice of the GNU Lesser
6General Public License, version 3 or any later version (LGPLv3 or
7later), or the GNU General Public License, version 2 (GPLv2), in all
8cases as published by the Free Software Foundation.
9*/
10
/**
 *
 * Basic token bucket implementation for rate limiting. Interfaces are
 * currently available to throttle disk read requests, directory entry scans
 * and hash calculation. To throttle a particular request (operation), wrap
 * the call between the throttling APIs, e.g.
 *
 * TBF_THROTTLE_BEGIN (...); <-- induces "delays" if required
 * {
 *     call (...);
 * }
 * TBF_THROTTLE_END (...); <-- not used atm, maybe needed later
 *
 */
25
26#include "glusterfs/mem-pool.h"27#include <glusterfs/common-utils.h>28#include "glusterfs/throttle-tbf.h"29
30typedef struct tbf_throttle {31char done;32
33pthread_mutex_t mutex;34pthread_cond_t cond;35
36unsigned long tokens;37
38struct list_head list;39} tbf_throttle_t;40
41static tbf_throttle_t *42tbf_init_throttle(unsigned long tokens_required)43{
44tbf_throttle_t *throttle = NULL;45
46throttle = GF_MALLOC(sizeof(*throttle), gf_common_mt_tbf_throttle_t);47if (!throttle)48return NULL;49
50throttle->done = 0;51throttle->tokens = tokens_required;52INIT_LIST_HEAD(&throttle->list);53
54(void)pthread_mutex_init(&throttle->mutex, NULL);55(void)pthread_cond_init(&throttle->cond, NULL);56
57return throttle;58}
59
60void
61_tbf_dispatch_queued(tbf_bucket_t *bucket)62{
63gf_boolean_t xcont = _gf_false;64tbf_throttle_t *tmp = NULL;65tbf_throttle_t *throttle = NULL;66
67list_for_each_entry_safe(throttle, tmp, &bucket->queued, list)68{69pthread_mutex_lock(&throttle->mutex);70{71if (bucket->tokens < throttle->tokens) {72xcont = _gf_true;73goto unblock;74}75
76/* this request can now be serviced */77throttle->done = 1;78list_del_init(&throttle->list);79
80bucket->tokens -= throttle->tokens;81pthread_cond_signal(&throttle->cond);82}83unblock:84pthread_mutex_unlock(&throttle->mutex);85if (xcont)86break;87}88}
89
90void *91tbf_tokengenerator(void *arg)92{
93unsigned long tokenrate = 0;94unsigned long maxtokens = 0;95unsigned long token_gen_interval = 0;96tbf_bucket_t *bucket = arg;97
98tokenrate = bucket->tokenrate;99maxtokens = bucket->maxtokens;100token_gen_interval = bucket->token_gen_interval;101
102while (1) {103gf_nanosleep(token_gen_interval * GF_US_IN_NS);104
105LOCK(&bucket->lock);106{107bucket->tokens += tokenrate;108if (bucket->tokens > maxtokens)109bucket->tokens = maxtokens;110
111if (!list_empty(&bucket->queued))112_tbf_dispatch_queued(bucket);113}114UNLOCK(&bucket->lock);115}116
117return NULL;118}
119
120/**
121* There is lazy synchronization between this routine (when invoked
122* under tbf_mod() context) and tbf_throttle(). *bucket is
123* updated _after_ all the required variables are initialized.
124*/
125static int32_t126tbf_init_bucket(tbf_t *tbf, tbf_opspec_t *spec)127{
128int ret = 0;129tbf_bucket_t *curr = NULL;130tbf_bucket_t **bucket = NULL;131
132GF_ASSERT(spec->op >= TBF_OP_MIN);133GF_ASSERT(spec->op <= TBF_OP_MAX);134
135/* no rate? no throttling. */136if (!spec->rate)137return 0;138
139bucket = tbf->bucket + spec->op;140
141curr = GF_CALLOC(1, sizeof(*curr), gf_common_mt_tbf_bucket_t);142if (!curr)143goto error_return;144
145LOCK_INIT(&curr->lock);146INIT_LIST_HEAD(&curr->queued);147
148curr->tokens = 0;149curr->tokenrate = spec->rate;150curr->maxtokens = spec->maxlimit;151curr->token_gen_interval = spec->token_gen_interval;152
153ret = gf_thread_create(&curr->tokener, NULL, tbf_tokengenerator, curr,154"tbfclock");155if (ret != 0)156goto freemem;157
158*bucket = curr;159return 0;160
161freemem:162LOCK_DESTROY(&curr->lock);163GF_FREE(curr);164error_return:165return -1;166}
167
168#define TBF_ALLOC_SIZE (sizeof(tbf_t) + (TBF_OP_MAX * sizeof(tbf_bucket_t)))169
170tbf_t *171tbf_init(tbf_opspec_t *tbfspec, unsigned int count)172{
173int32_t i = 0;174int32_t ret = 0;175tbf_t *tbf = NULL;176tbf_opspec_t *opspec = NULL;177
178tbf = GF_MALLOC(TBF_ALLOC_SIZE, gf_common_mt_tbf_t);179if (!tbf)180goto error_return;181
182tbf->bucket = (tbf_bucket_t **)((char *)tbf + sizeof(*tbf));183for (i = 0; i < TBF_OP_MAX; i++) {184*(tbf->bucket + i) = NULL;185}186
187for (i = 0; i < count; i++) {188opspec = tbfspec + i;189
190ret = tbf_init_bucket(tbf, opspec);191if (ret)192break;193}194
195if (ret)196goto error_return;197
198return tbf;199
200error_return:201return NULL;202}
203
204static void205tbf_mod_bucket(tbf_bucket_t *bucket, tbf_opspec_t *spec)206{
207LOCK(&bucket->lock);208{209bucket->tokens = 0;210bucket->tokenrate = spec->rate;211bucket->maxtokens = spec->maxlimit;212}213UNLOCK(&bucket->lock);214
215/* next token tick would unqueue pending operations */216}
217
218int
219tbf_mod(tbf_t *tbf, tbf_opspec_t *tbfspec)220{
221int ret = 0;222tbf_bucket_t *bucket = NULL;223tbf_ops_t op = TBF_OP_MIN;224
225if (!tbf || !tbfspec)226return -1;227
228op = tbfspec->op;229
230GF_ASSERT(op >= TBF_OP_MIN);231GF_ASSERT(op <= TBF_OP_MAX);232
233bucket = *(tbf->bucket + op);234if (bucket) {235tbf_mod_bucket(bucket, tbfspec);236} else {237ret = tbf_init_bucket(tbf, tbfspec);238}239
240return ret;241}
242
243void
244tbf_throttle(tbf_t *tbf, tbf_ops_t op, unsigned long tokens_requested)245{
246char waitq = 0;247tbf_bucket_t *bucket = NULL;248tbf_throttle_t *throttle = NULL;249
250GF_ASSERT(op >= TBF_OP_MIN);251GF_ASSERT(op <= TBF_OP_MAX);252
253bucket = *(tbf->bucket + op);254if (!bucket)255return;256
257LOCK(&bucket->lock);258{259/**260* if there are enough tokens in the bucket there is no need
261* to throttle the request: therefore, consume the required
262* number of tokens and continue.
263*/
264if (tokens_requested <= bucket->tokens) {265bucket->tokens -= tokens_requested;266} else {267throttle = tbf_init_throttle(tokens_requested);268if (!throttle) /* let it slip through for now.. */269goto unblock;270
271waitq = 1;272pthread_mutex_lock(&throttle->mutex);273list_add_tail(&throttle->list, &bucket->queued);274}275}276unblock:277UNLOCK(&bucket->lock);278
279if (waitq) {280while (!throttle->done) {281pthread_cond_wait(&throttle->cond, &throttle->mutex);282}283
284pthread_mutex_unlock(&throttle->mutex);285
286pthread_mutex_destroy(&throttle->mutex);287pthread_cond_destroy(&throttle->cond);288
289GF_FREE(throttle);290}291}
292