glusterfs

Форк
0
/
throttle-tbf.c 
291 строка · 6.8 Кб
1
/*
2
   Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
3
   This file is part of GlusterFS.
4

5
   This file is licensed to you under your choice of the GNU Lesser
6
   General Public License, version 3 or any later version (LGPLv3 or
7
   later), or the GNU General Public License, version 2 (GPLv2), in all
8
   cases as published by the Free Software Foundation.
9
*/
10

11
/**
12
 *
13
 * Basic token bucket implementation for rate limiting. As of now interfaces
14
 * to throttle disk read request, directory entry scan and hash calculation
15
 * are available. To throttle a particular request (operation), the call needs
16
 * to be wrapped in-between throttling APIs, for e.g.
17
 *
18
 *  TBF_THROTTLE_BEGIN (...);  <-- induces "delays" if required
19
 *  {
20
 *      call (...);
21
 *  }
22
 *  TBF_THROTTLE_END (...);  <-- not used atm, maybe needed later
23
 *
24
 */
25

26
#include "glusterfs/mem-pool.h"
27
#include <glusterfs/common-utils.h>
28
#include "glusterfs/throttle-tbf.h"
29

30
typedef struct tbf_throttle {
31
    char done;
32

33
    pthread_mutex_t mutex;
34
    pthread_cond_t cond;
35

36
    unsigned long tokens;
37

38
    struct list_head list;
39
} tbf_throttle_t;
40

41
static tbf_throttle_t *
42
tbf_init_throttle(unsigned long tokens_required)
43
{
44
    tbf_throttle_t *throttle = NULL;
45

46
    throttle = GF_MALLOC(sizeof(*throttle), gf_common_mt_tbf_throttle_t);
47
    if (!throttle)
48
        return NULL;
49

50
    throttle->done = 0;
51
    throttle->tokens = tokens_required;
52
    INIT_LIST_HEAD(&throttle->list);
53

54
    (void)pthread_mutex_init(&throttle->mutex, NULL);
55
    (void)pthread_cond_init(&throttle->cond, NULL);
56

57
    return throttle;
58
}
59

60
void
61
_tbf_dispatch_queued(tbf_bucket_t *bucket)
62
{
63
    gf_boolean_t xcont = _gf_false;
64
    tbf_throttle_t *tmp = NULL;
65
    tbf_throttle_t *throttle = NULL;
66

67
    list_for_each_entry_safe(throttle, tmp, &bucket->queued, list)
68
    {
69
        pthread_mutex_lock(&throttle->mutex);
70
        {
71
            if (bucket->tokens < throttle->tokens) {
72
                xcont = _gf_true;
73
                goto unblock;
74
            }
75

76
            /* this request can now be serviced */
77
            throttle->done = 1;
78
            list_del_init(&throttle->list);
79

80
            bucket->tokens -= throttle->tokens;
81
            pthread_cond_signal(&throttle->cond);
82
        }
83
    unblock:
84
        pthread_mutex_unlock(&throttle->mutex);
85
        if (xcont)
86
            break;
87
    }
88
}
89

90
void *
91
tbf_tokengenerator(void *arg)
92
{
93
    unsigned long tokenrate = 0;
94
    unsigned long maxtokens = 0;
95
    unsigned long token_gen_interval = 0;
96
    tbf_bucket_t *bucket = arg;
97

98
    tokenrate = bucket->tokenrate;
99
    maxtokens = bucket->maxtokens;
100
    token_gen_interval = bucket->token_gen_interval;
101

102
    while (1) {
103
        gf_nanosleep(token_gen_interval * GF_US_IN_NS);
104

105
        LOCK(&bucket->lock);
106
        {
107
            bucket->tokens += tokenrate;
108
            if (bucket->tokens > maxtokens)
109
                bucket->tokens = maxtokens;
110

111
            if (!list_empty(&bucket->queued))
112
                _tbf_dispatch_queued(bucket);
113
        }
114
        UNLOCK(&bucket->lock);
115
    }
116

117
    return NULL;
118
}
119

120
/**
121
 * There is lazy synchronization between this routine (when invoked
122
 * under tbf_mod() context) and tbf_throttle(). *bucket is
123
 * updated _after_ all the required variables are initialized.
124
 */
125
static int32_t
126
tbf_init_bucket(tbf_t *tbf, tbf_opspec_t *spec)
127
{
128
    int ret = 0;
129
    tbf_bucket_t *curr = NULL;
130
    tbf_bucket_t **bucket = NULL;
131

132
    GF_ASSERT(spec->op >= TBF_OP_MIN);
133
    GF_ASSERT(spec->op <= TBF_OP_MAX);
134

135
    /* no rate? no throttling. */
136
    if (!spec->rate)
137
        return 0;
138

139
    bucket = tbf->bucket + spec->op;
140

141
    curr = GF_CALLOC(1, sizeof(*curr), gf_common_mt_tbf_bucket_t);
142
    if (!curr)
143
        goto error_return;
144

145
    LOCK_INIT(&curr->lock);
146
    INIT_LIST_HEAD(&curr->queued);
147

148
    curr->tokens = 0;
149
    curr->tokenrate = spec->rate;
150
    curr->maxtokens = spec->maxlimit;
151
    curr->token_gen_interval = spec->token_gen_interval;
152

153
    ret = gf_thread_create(&curr->tokener, NULL, tbf_tokengenerator, curr,
154
                           "tbfclock");
155
    if (ret != 0)
156
        goto freemem;
157

158
    *bucket = curr;
159
    return 0;
160

161
freemem:
162
    LOCK_DESTROY(&curr->lock);
163
    GF_FREE(curr);
164
error_return:
165
    return -1;
166
}
167

168
#define TBF_ALLOC_SIZE (sizeof(tbf_t) + (TBF_OP_MAX * sizeof(tbf_bucket_t)))
169

170
tbf_t *
171
tbf_init(tbf_opspec_t *tbfspec, unsigned int count)
172
{
173
    int32_t i = 0;
174
    int32_t ret = 0;
175
    tbf_t *tbf = NULL;
176
    tbf_opspec_t *opspec = NULL;
177

178
    tbf = GF_MALLOC(TBF_ALLOC_SIZE, gf_common_mt_tbf_t);
179
    if (!tbf)
180
        goto error_return;
181

182
    tbf->bucket = (tbf_bucket_t **)((char *)tbf + sizeof(*tbf));
183
    for (i = 0; i < TBF_OP_MAX; i++) {
184
        *(tbf->bucket + i) = NULL;
185
    }
186

187
    for (i = 0; i < count; i++) {
188
        opspec = tbfspec + i;
189

190
        ret = tbf_init_bucket(tbf, opspec);
191
        if (ret)
192
            break;
193
    }
194

195
    if (ret)
196
        goto error_return;
197

198
    return tbf;
199

200
error_return:
201
    return NULL;
202
}
203

204
static void
205
tbf_mod_bucket(tbf_bucket_t *bucket, tbf_opspec_t *spec)
206
{
207
    LOCK(&bucket->lock);
208
    {
209
        bucket->tokens = 0;
210
        bucket->tokenrate = spec->rate;
211
        bucket->maxtokens = spec->maxlimit;
212
    }
213
    UNLOCK(&bucket->lock);
214

215
    /* next token tick would unqueue pending operations */
216
}
217

218
int
219
tbf_mod(tbf_t *tbf, tbf_opspec_t *tbfspec)
220
{
221
    int ret = 0;
222
    tbf_bucket_t *bucket = NULL;
223
    tbf_ops_t op = TBF_OP_MIN;
224

225
    if (!tbf || !tbfspec)
226
        return -1;
227

228
    op = tbfspec->op;
229

230
    GF_ASSERT(op >= TBF_OP_MIN);
231
    GF_ASSERT(op <= TBF_OP_MAX);
232

233
    bucket = *(tbf->bucket + op);
234
    if (bucket) {
235
        tbf_mod_bucket(bucket, tbfspec);
236
    } else {
237
        ret = tbf_init_bucket(tbf, tbfspec);
238
    }
239

240
    return ret;
241
}
242

243
void
244
tbf_throttle(tbf_t *tbf, tbf_ops_t op, unsigned long tokens_requested)
245
{
246
    char waitq = 0;
247
    tbf_bucket_t *bucket = NULL;
248
    tbf_throttle_t *throttle = NULL;
249

250
    GF_ASSERT(op >= TBF_OP_MIN);
251
    GF_ASSERT(op <= TBF_OP_MAX);
252

253
    bucket = *(tbf->bucket + op);
254
    if (!bucket)
255
        return;
256

257
    LOCK(&bucket->lock);
258
    {
259
        /**
260
         * if there are enough tokens in the bucket there is no need
261
         * to throttle the request: therefore, consume the required
262
         * number of tokens and continue.
263
         */
264
        if (tokens_requested <= bucket->tokens) {
265
            bucket->tokens -= tokens_requested;
266
        } else {
267
            throttle = tbf_init_throttle(tokens_requested);
268
            if (!throttle) /* let it slip through for now.. */
269
                goto unblock;
270

271
            waitq = 1;
272
            pthread_mutex_lock(&throttle->mutex);
273
            list_add_tail(&throttle->list, &bucket->queued);
274
        }
275
    }
276
unblock:
277
    UNLOCK(&bucket->lock);
278

279
    if (waitq) {
280
        while (!throttle->done) {
281
            pthread_cond_wait(&throttle->cond, &throttle->mutex);
282
        }
283

284
        pthread_mutex_unlock(&throttle->mutex);
285

286
        pthread_mutex_destroy(&throttle->mutex);
287
        pthread_cond_destroy(&throttle->cond);
288

289
        GF_FREE(throttle);
290
    }
291
}
292

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.