/*
  Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <math.h>

#include "glusterfs/mem-types.h"
#include "glusterfs/common-utils.h"

#include "glusterfs/rot-buffs.h"

/**
 * Producer-Consumer based on top of rotational buffers.
 *
 * This favours writers (producer) and keeps the critical section
 * light weight. Buffer switch happens when a consumer wants to
 * consume data. This is the slow path and waits for pending
 * writes to finish.
 *
 * TODO: do away with opaques (use arrays with indexing).
 */
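
/**
 * Usage sketch (illustrative only, not upstream documentation):
 * writers reserve a slice of the current buffer, fill it outside the
 * lock, and acknowledge; the consumer rotates the current list out of
 * rotation and drains it once all pending writes finish.
 *
 *   writer:   rbuf_reserve_write_area() -> fill -> rbuf_write_complete()
 *   consumer: rbuf_get_buffer() -> rbuf_wait_for_completion(fn, arg)
 *
 * Concrete snippets appear near the corresponding entry points below.
 */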

#define ROT_BUFF_DEFAULT_COUNT 2
#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */

#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE)
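
/*
 * A "melded" allocation places the rbuf_iovec_t header and its 1MB
 * data area in one calloc'd block; iov_base points just past the
 * header (see rlist_add_new_vec() below):
 *
 *   [ rbuf_iovec_t | ............ 1MB data area ............ ]
 *   ^ rvec          ^ rvec->iov.iov_base
 */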

/**
 * The iovec list is not shrunk (deallocated) while the total vector
 * count stays within this range. This is the fast path and should
 * satisfy most workloads; beyond the range, the list is shrunk
 * gradually (see rlist_shrink_vector() below).
 */
#define RVEC_LOW_WATERMARK_COUNT 1
#define RVEC_HIGH_WATERMARK_COUNT (1 << 4)
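/* with the defaults above, lists holding 1..16 one-MB vectors are never shrunk */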

static inline rbuf_list_t *
rbuf_current_buffer(rbuf_t *rbuf)
{
    return rbuf->current;
}

static void
rlist_mark_waiting(rbuf_list_t *rlist)
{
    LOCK(&rlist->c_lock);
    {
        rlist->awaiting = _gf_true;
    }
    UNLOCK(&rlist->c_lock);
}

static int
__rlist_has_waiter(rbuf_list_t *rlist)
{
    return (rlist->awaiting == _gf_true);
}

static void *
rbuf_alloc_rvec(void)
{
    return GF_CALLOC(1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t);
}

static void
rlist_reset_vector_usage(rbuf_list_t *rlist)
{
    rlist->used = 1;
}

static void
rlist_increment_vector_usage(rbuf_list_t *rlist)
{
    rlist->used++;
}

static void
rlist_increment_total_usage(rbuf_list_t *rlist)
{
    rlist->total++;
}

static int
rvec_in_watermark_range(rbuf_list_t *rlist)
{
    return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) &&
            (rlist->total <= RVEC_HIGH_WATERMARK_COUNT));
}

static void
rbuf_reset_rvec(rbuf_iovec_t *rvec)
{
    GF_VALIDATE_OR_GOTO("libglusterfs", rvec, err);
    /* iov_base is _never_ modified */
    rvec->iov.iov_len = 0;
err:
    return;
}

/* TODO: alloc multiple rbuf_iovec_t */
static int
rlist_add_new_vec(rbuf_list_t *rlist)
{
    rbuf_iovec_t *rvec = NULL;

    rvec = (rbuf_iovec_t *)rbuf_alloc_rvec();
    if (!rvec)
        return -1;
    INIT_LIST_HEAD(&rvec->list);
    rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE;
    rvec->iov.iov_len = 0;

    list_add_tail(&rvec->list, &rlist->veclist);

    rlist->rvec = rvec; /* cache the latest */

    rlist_increment_vector_usage(rlist);
    rlist_increment_total_usage(rlist);

    return 0;
}

static void
rlist_free_rvec(rbuf_iovec_t *rvec)
{
    if (!rvec)
        return;
    list_del(&rvec->list);
    GF_FREE(rvec);
}

static void
rlist_purge_all_rvec(rbuf_list_t *rlist)
{
    rbuf_iovec_t *rvec = NULL;

    if (!rlist)
        return;
    while (!list_empty(&rlist->veclist)) {
        rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
        rlist_free_rvec(rvec);
    }
}

static void
rlist_shrink_rvec(rbuf_list_t *rlist, unsigned long long shrink)
{
    rbuf_iovec_t *rvec = NULL;

    while (!list_empty(&rlist->veclist) && (shrink-- > 0)) {
        rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
        rlist_free_rvec(rvec);
    }
}

static void
rbuf_purge_rlist(rbuf_t *rbuf)
{
    rbuf_list_t *rlist = NULL;

    while (!list_empty(&rbuf->freelist)) {
        rlist = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
        list_del(&rlist->list);

        rlist_purge_all_rvec(rlist);

        LOCK_DESTROY(&rlist->c_lock);

        (void)pthread_mutex_destroy(&rlist->b_lock);
        (void)pthread_cond_destroy(&rlist->b_cond);

        GF_FREE(rlist);
    }
}

rbuf_t *
rbuf_init(int bufcount)
{
    int j = 0;
    int ret = 0;
    rbuf_t *rbuf = NULL;
    rbuf_list_t *rlist = NULL;

    if (bufcount <= 0)
        bufcount = ROT_BUFF_DEFAULT_COUNT;

    rbuf = GF_CALLOC(1, sizeof(rbuf_t), gf_common_mt_rbuf_t);
    if (!rbuf)
        goto error_return;

    LOCK_INIT(&rbuf->lock);
    INIT_LIST_HEAD(&rbuf->freelist);

    /* it could have been one big calloc() but this is just once.. */
    for (j = 0; j < bufcount; j++) {
        rlist = GF_CALLOC(1, sizeof(rbuf_list_t), gf_common_mt_rlist_t);
        if (!rlist) {
            ret = -1;
            break;
        }

        INIT_LIST_HEAD(&rlist->list);
        INIT_LIST_HEAD(&rlist->veclist);

        rlist->pending = rlist->completed = 0;

        ret = rlist_add_new_vec(rlist);
        if (ret)
            break;

        LOCK_INIT(&rlist->c_lock);

        rlist->awaiting = _gf_false;
        ret = pthread_mutex_init(&rlist->b_lock, 0);
        if (ret != 0) {
            GF_FREE(rlist);
            break;
        }

        ret = pthread_cond_init(&rlist->b_cond, 0);
        if (ret != 0) {
            GF_FREE(rlist);
            break;
        }

        list_add_tail(&rlist->list, &rbuf->freelist);
    }

    if (ret != 0)
        goto dealloc_rlist;

    /* cache currently used buffer: first in the list */
    rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
    return rbuf;

dealloc_rlist:
    rbuf_purge_rlist(rbuf);
    LOCK_DESTROY(&rbuf->lock);
    GF_FREE(rbuf);
error_return:
    return NULL;
}

void
rbuf_dtor(rbuf_t *rbuf)
{
    if (!rbuf)
        return;
    rbuf->current = NULL;
    rbuf_purge_rlist(rbuf);
    LOCK_DESTROY(&rbuf->lock);

    GF_FREE(rbuf);
}

static char *
rbuf_adjust_write_area(struct iovec *iov, size_t bytes)
{
    char *wbuf = NULL;

    wbuf = iov->iov_base + iov->iov_len;
    iov->iov_len += bytes;
    return wbuf;
}

static char *
rbuf_alloc_write_area(rbuf_list_t *rlist, size_t bytes)
{
    int ret = 0;
    struct iovec *iov = NULL;

    /* check for available space in _current_ IO buffer */
    iov = &rlist->rvec->iov;
    if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE)
        return rbuf_adjust_write_area(iov, bytes); /* fast path */

    /* not enough bytes, try next available buffers */
    if (list_is_last(&rlist->rvec->list, &rlist->veclist)) {
        /* OH! consumed all vector buffers */
        GF_ASSERT(rlist->used == rlist->total);
        ret = rlist_add_new_vec(rlist);
        if (ret)
            goto error_return;
    } else {
        /* not the end, have available rbuf_iovec's */
        rlist->rvec = list_next_entry(rlist->rvec, list);
        rlist->used++;
        rbuf_reset_rvec(rlist->rvec);
    }

    iov = &rlist->rvec->iov;
    return rbuf_adjust_write_area(iov, bytes);

error_return:
    return NULL;
}

char *
rbuf_reserve_write_area(rbuf_t *rbuf, size_t bytes, void **opaque)
{
    char *wbuf = NULL;
    rbuf_list_t *rlist = NULL;

    if (!rbuf || (bytes <= 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque)
        return NULL;

    LOCK(&rbuf->lock);
    {
        rlist = rbuf_current_buffer(rbuf);
        wbuf = rbuf_alloc_write_area(rlist, bytes);
        if (!wbuf)
            goto unblock;
        rlist->pending++;
    }
unblock:
    UNLOCK(&rbuf->lock);

    if (wbuf)
        *opaque = rlist;
    return wbuf;
}
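
/**
 * Writer-side sketch (illustrative only; `record` and `len` are
 * hypothetical caller data). The opaque handed back by the reserve
 * call must be returned via rbuf_write_complete() once the copy is
 * done; the copy itself happens outside rbuf->lock, which is what
 * keeps the critical section light weight:
 *
 *   void *opaque = NULL;
 *   char *ptr = rbuf_reserve_write_area(rbuf, len, &opaque);
 *   if (ptr) {
 *       memcpy(ptr, record, len);    // fill outside the lock
 *       rbuf_write_complete(opaque); // may wake a waiting consumer
 *   }
 */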

static void
rbuf_notify_waiter(rbuf_list_t *rlist)
{
    pthread_mutex_lock(&rlist->b_lock);
    {
        pthread_cond_signal(&rlist->b_cond);
    }
    pthread_mutex_unlock(&rlist->b_lock);
}

int
rbuf_write_complete(void *opaque)
{
    rbuf_list_t *rlist = NULL;
    gf_boolean_t notify = _gf_false;

    if (!opaque)
        return -1;

    rlist = opaque;

    LOCK(&rlist->c_lock);
    {
        rlist->completed++;
        /**
         * it's safe to test ->pending without rbuf->lock *only* if
         * there's a waiter as there can be no new incoming writes.
         */
        if (__rlist_has_waiter(rlist) && (rlist->completed == rlist->pending))
            notify = _gf_true;
    }
    UNLOCK(&rlist->c_lock);

    if (notify)
        rbuf_notify_waiter(rlist);

    return 0;
}

int
rbuf_get_buffer(rbuf_t *rbuf, void **opaque, sequence_fn *seqfn, void *mydata)
{
    int retval = RBUF_CONSUMABLE;
    rbuf_list_t *rlist = NULL;

    if (!rbuf || !opaque)
        return -1;

    LOCK(&rbuf->lock);
    {
        rlist = rbuf_current_buffer(rbuf);
        if (!rlist->pending) {
            retval = RBUF_EMPTY;
            goto unblock;
        }

        if (list_is_singular(&rbuf->freelist)) {
            /**
             * removal would lead to writer starvation, disallow
             * switching.
             */
            retval = RBUF_WOULD_STARVE;
            goto unblock;
        }

        list_del_init(&rlist->list);
        if (seqfn)
            seqfn(rlist, mydata);
        rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
    }
unblock:
    UNLOCK(&rbuf->lock);

    if (retval == RBUF_CONSUMABLE)
        *opaque = rlist; /* caller _owns_ the buffer */

    return retval;
}

/**
 * Wait for completion of pending writers and invoke dispatcher
 * routine (for buffer consumption).
 */

static void
__rbuf_wait_for_writers(rbuf_list_t *rlist)
{
    while (rlist->completed != rlist->pending)
        pthread_cond_wait(&rlist->b_cond, &rlist->b_lock);
}

#ifndef M_E
/* coarse fallback for Euler's number (~2.71828) when <math.h> lacks M_E */
#define M_E 2.7
#endif

static void
rlist_shrink_vector(rbuf_list_t *rlist)
{
    unsigned long long shrink = 0;

    /**
     * fast path: don't bother to deallocate if vectors are hardly
     * used.
     */
    if (rvec_in_watermark_range(rlist))
        return;

    /**
     * Calculate the shrink count based on total allocated vectors.
     * Note that the calculation sticks to rlist->total irrespective
     * of the actual usage count (rlist->used). Later, ->used could
     * be used to apply slack to the calculation based on how much
     * it lags from ->total. For now, let's stick to slow decay.
     */
    shrink = rlist->total - (rlist->total * pow(M_E, -0.2));
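
    /*
     * e.g. with rlist->total == 32 (hypothetical count):
     * 32 * e^-0.2 is roughly 26.2, so 32 - 26.2 truncates to
     * shrink = 5, releasing about a sixth of the vectors per cycle.
     */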

    rlist_shrink_rvec(rlist, shrink);
    rlist->total -= shrink;
}

int
rbuf_wait_for_completion(rbuf_t *rbuf, void *opaque,
                         void (*fn)(rbuf_list_t *, void *), void *arg)
{
    rbuf_list_t *rlist = NULL;

    if (!rbuf || !opaque)
        return -1;

    rlist = opaque;

    pthread_mutex_lock(&rlist->b_lock);
    {
        rlist_mark_waiting(rlist);
        __rbuf_wait_for_writers(rlist);
    }
    pthread_mutex_unlock(&rlist->b_lock);

    /**
     * from here on, no need of locking until the rlist is put
     * back into rotation.
     */

    fn(rlist, arg); /* invoke dispatcher */

    rlist->awaiting = _gf_false;
    rlist->pending = rlist->completed = 0;

    rlist_shrink_vector(rlist);
    rlist_reset_vector_usage(rlist);

    rlist->rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
    rbuf_reset_rvec(rlist->rvec);

    LOCK(&rbuf->lock);
    {
        list_add_tail(&rlist->list, &rbuf->freelist);
    }
    UNLOCK(&rbuf->lock);

    return 0;
}
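
/**
 * Consumer-side sketch (illustrative only; dispatch() and consume()
 * are hypothetical stand-ins for a real dispatcher). Only the first
 * rlist->used vectors of the owned list carry valid data:
 *
 *   static void
 *   dispatch(rbuf_list_t *rlist, void *arg)
 *   {
 *       int i = 0;
 *       rbuf_iovec_t *rvec = NULL;
 *
 *       list_for_each_entry(rvec, &rlist->veclist, list)
 *       {
 *           if (i++ == rlist->used)
 *               break;
 *           consume(rvec->iov.iov_base, rvec->iov.iov_len);
 *       }
 *   }
 *
 *   void *opaque = NULL;
 *   if (rbuf_get_buffer(rbuf, &opaque, NULL, NULL) == RBUF_CONSUMABLE)
 *       rbuf_wait_for_completion(rbuf, opaque, dispatch, NULL);
 */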