/*
  Copyright (c) 2008-2015 Red Hat, Inc. <http://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/
#include <math.h>

#include "glusterfs/mem-types.h"
#include "glusterfs/common-utils.h"

#include "glusterfs/rot-buffs.h"
/**
 * Producer-Consumer based on top of rotational buffers.
 *
 * This favours writers (producer) and keeps the critical section
 * light weight. Buffer switch happens when a consumer wants to
 * consume data. This is the slow path and waits for pending
 * writes to finish.
 *
 * TODO: do away with opaques (use arrays with indexing).
 */
#define ROT_BUFF_DEFAULT_COUNT 2
#define ROT_BUFF_ALLOC_SIZE (1 * 1024 * 1024) /* 1MB per iovec */

#define RLIST_IOV_MELDED_ALLOC_SIZE (RBUF_IOVEC_SIZE + ROT_BUFF_ALLOC_SIZE)

/**
 * iovec list is not shrunk (deallocated) if usage/total count
 * falls in this range. this is the fast path and should satisfy
 * most of the workloads. for the rest shrinking iovec list is
 * generous.
 */
#define RVEC_LOW_WATERMARK_COUNT 1
#define RVEC_HIGH_WATERMARK_COUNT (1 << 4)
/* Return the buffer list currently accepting writes. Both visible
 * callers (rbuf_reserve_write_area, rbuf_get_buffer) invoke this
 * while holding rbuf->lock. */
static inline rbuf_list_t *
rbuf_current_buffer(rbuf_t *rbuf)
{
    return rbuf->current;
}

/* Flag (under c_lock) that a consumer is waiting on this list, so the
 * last completing writer knows to signal it (see rbuf_write_complete). */
static void
rlist_mark_waiting(rbuf_list_t *rlist)
{
    LOCK(&rlist->c_lock);
    {
        rlist->awaiting = _gf_true;
    }
    UNLOCK(&rlist->c_lock);
}

/* Nonzero if a consumer has marked itself waiting on this list.
 * Double-underscore prefix: the visible caller (rbuf_write_complete)
 * invokes this with rlist->c_lock already held. */
static int
__rlist_has_waiter(rbuf_list_t *rlist)
{
    return (rlist->awaiting == _gf_true);
}

/* Allocate one melded chunk: the rbuf_iovec_t header immediately
 * followed by its ROT_BUFF_ALLOC_SIZE data area (zero-initialized). */
static void *
rbuf_alloc_rvec(void)
{
    return GF_CALLOC(1, RLIST_IOV_MELDED_ALLOC_SIZE, gf_common_mt_rvec_t);
}

/* Reset the in-use vector count to 1 (the first vector in the list
 * is always considered in use). */
static void
rlist_reset_vector_usage(rbuf_list_t *rlist)
{
    rlist->used = 1;
}

/* One more vector from the list is now in use. */
static void
rlist_increment_vector_usage(rbuf_list_t *rlist)
{
    rlist->used++;
}

/* One more vector has been allocated onto the list (total, not used). */
static void
rlist_increment_total_usage(rbuf_list_t *rlist)
{
    rlist->total++;
}

/* Nonzero when the total vector count sits within the low/high
 * watermarks -- in that range the list is never shrunk (fast path,
 * see rlist_shrink_vector). */
static int
rvec_in_watermark_range(rbuf_list_t *rlist)
{
    return ((rlist->total >= RVEC_LOW_WATERMARK_COUNT) &&
            (rlist->total <= RVEC_HIGH_WATERMARK_COUNT));
}

/* Mark a vector as empty for reuse by zeroing its length.
 * Tolerates a NULL rvec (validated and skipped). */
static void
rbuf_reset_rvec(rbuf_iovec_t *rvec)
{
    GF_VALIDATE_OR_GOTO("libglusterfs", rvec, err);
    /* iov_base is _never_ modified */
    rvec->iov.iov_len = 0;
err:
    return;
}

/* TODO: alloc multiple rbuf_iovec_t */
/* Allocate a fresh melded vector, append it to the list's veclist,
 * cache it as the current write vector, and bump both the used and
 * total counters. Returns 0 on success, -1 on allocation failure. */
static int
rlist_add_new_vec(rbuf_list_t *rlist)
{
    rbuf_iovec_t *rvec = NULL;

    rvec = (rbuf_iovec_t *)rbuf_alloc_rvec();
    if (!rvec)
        return -1;
    INIT_LIST_HEAD(&rvec->list);
    /* data area lives directly after the header in the same chunk */
    rvec->iov.iov_base = ((char *)rvec) + RBUF_IOVEC_SIZE;
    rvec->iov.iov_len = 0;

    list_add_tail(&rvec->list, &rlist->veclist);

    rlist->rvec = rvec; /* cache the latest */

    rlist_increment_vector_usage(rlist);
    rlist_increment_total_usage(rlist);

    return 0;
}

/* Unlink a vector from its list and free it (header and data area are
 * one allocation). NULL is a no-op. */
static void
rlist_free_rvec(rbuf_iovec_t *rvec)
{
    if (!rvec)
        return;
    list_del(&rvec->list);
    GF_FREE(rvec);
}

138static void139rlist_purge_all_rvec(rbuf_list_t *rlist)140{
141rbuf_iovec_t *rvec = NULL;142
143if (!rlist)144return;145while (!list_empty(&rlist->veclist)) {146rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);147rlist_free_rvec(rvec);148}149}
150
151static void152rlist_shrink_rvec(rbuf_list_t *rlist, unsigned long long shrink)153{
154rbuf_iovec_t *rvec = NULL;155
156while (!list_empty(&rlist->veclist) && (shrink-- > 0)) {157rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);158rlist_free_rvec(rvec);159}160}
161
/* Tear down every rbuf_list_t on the freelist: free its vectors,
 * destroy its spinlock, mutex and condvar, then free the list itself.
 * Used by rbuf_init's error path and by rbuf_dtor. */
static void
rbuf_purge_rlist(rbuf_t *rbuf)
{
    rbuf_list_t *rlist = NULL;

    while (!list_empty(&rbuf->freelist)) {
        rlist = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
        list_del(&rlist->list);

        rlist_purge_all_rvec(rlist);

        LOCK_DESTROY(&rlist->c_lock);

        (void)pthread_mutex_destroy(&rlist->b_lock);
        (void)pthread_cond_destroy(&rlist->b_cond);

        GF_FREE(rlist);
    }
}

182rbuf_t *183rbuf_init(int bufcount)184{
185int j = 0;186int ret = 0;187rbuf_t *rbuf = NULL;188rbuf_list_t *rlist = NULL;189
190if (bufcount <= 0)191bufcount = ROT_BUFF_DEFAULT_COUNT;192
193rbuf = GF_CALLOC(1, sizeof(rbuf_t), gf_common_mt_rbuf_t);194if (!rbuf)195goto error_return;196
197LOCK_INIT(&rbuf->lock);198INIT_LIST_HEAD(&rbuf->freelist);199
200/* it could have been one big calloc() but this is just once.. */201for (j = 0; j < bufcount; j++) {202rlist = GF_CALLOC(1, sizeof(rbuf_list_t), gf_common_mt_rlist_t);203if (!rlist) {204ret = -1;205break;206}207
208INIT_LIST_HEAD(&rlist->list);209INIT_LIST_HEAD(&rlist->veclist);210
211rlist->pending = rlist->completed = 0;212
213ret = rlist_add_new_vec(rlist);214if (ret)215break;216
217LOCK_INIT(&rlist->c_lock);218
219rlist->awaiting = _gf_false;220ret = pthread_mutex_init(&rlist->b_lock, 0);221if (ret != 0) {222GF_FREE(rlist);223break;224}225
226ret = pthread_cond_init(&rlist->b_cond, 0);227if (ret != 0) {228GF_FREE(rlist);229break;230}231
232list_add_tail(&rlist->list, &rbuf->freelist);233}234
235if (ret != 0)236goto dealloc_rlist;237
238/* cache currently used buffer: first in the list */239rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list);240return rbuf;241
242dealloc_rlist:243rbuf_purge_rlist(rbuf);244LOCK_DESTROY(&rbuf->lock);245GF_FREE(rbuf);246error_return:247return NULL;248}
249
/* Destroy a rotational buffer: purge every list on the freelist,
 * destroy the top-level lock and free the rbuf. NULL is a no-op.
 * NOTE(review): assumes all lists are back on the freelist (no
 * consumer still owns one via rbuf_get_buffer) -- confirm at callers. */
void
rbuf_dtor(rbuf_t *rbuf)
{
    if (!rbuf)
        return;
    rbuf->current = NULL;
    rbuf_purge_rlist(rbuf);
    LOCK_DESTROY(&rbuf->lock);

    GF_FREE(rbuf);
}

/* Hand out the next `bytes` of this iovec's data area: returns a
 * pointer just past the currently used length and bumps iov_len.
 * Caller must have verified that `bytes` fits (see
 * rbuf_alloc_write_area).
 *
 * Fix: iov_base is void*, and arithmetic on void* is a GNU extension
 * rather than ISO C -- cast to char* first. */
static char *
rbuf_adjust_write_area(struct iovec *iov, size_t bytes)
{
    char *wbuf = NULL;

    wbuf = (char *)iov->iov_base + iov->iov_len;
    iov->iov_len += bytes;
    return wbuf;
}

272static char *273rbuf_alloc_write_area(rbuf_list_t *rlist, size_t bytes)274{
275int ret = 0;276struct iovec *iov = NULL;277
278/* check for available space in _current_ IO buffer */279iov = &rlist->rvec->iov;280if (iov->iov_len + bytes <= ROT_BUFF_ALLOC_SIZE)281return rbuf_adjust_write_area(iov, bytes); /* fast path */282
283/* not enough bytes, try next available buffers */284if (list_is_last(&rlist->rvec->list, &rlist->veclist)) {285/* OH! consumed all vector buffers */286GF_ASSERT(rlist->used == rlist->total);287ret = rlist_add_new_vec(rlist);288if (ret)289goto error_return;290} else {291/* not the end, have available rbuf_iovec's */292rlist->rvec = list_next_entry(rlist->rvec, list);293rlist->used++;294rbuf_reset_rvec(rlist->rvec);295}296
297iov = &rlist->rvec->iov;298return rbuf_adjust_write_area(iov, bytes);299
300error_return:301return NULL;302}
303
304char *305rbuf_reserve_write_area(rbuf_t *rbuf, size_t bytes, void **opaque)306{
307char *wbuf = NULL;308rbuf_list_t *rlist = NULL;309
310if (!rbuf || (bytes <= 0) || (bytes > ROT_BUFF_ALLOC_SIZE) || !opaque)311return NULL;312
313LOCK(&rbuf->lock);314{315rlist = rbuf_current_buffer(rbuf);316wbuf = rbuf_alloc_write_area(rlist, bytes);317if (!wbuf)318goto unblock;319rlist->pending++;320}321unblock:322UNLOCK(&rbuf->lock);323
324if (wbuf)325*opaque = rlist;326return wbuf;327}
328
/* Wake the consumer blocked in rbuf_wait_for_completion on this
 * list's condvar. Taking b_lock around the signal pairs with the
 * waiter's mutex/condvar protocol. */
static void
rbuf_notify_waiter(rbuf_list_t *rlist)
{
    pthread_mutex_lock(&rlist->b_lock);
    {
        pthread_cond_signal(&rlist->b_cond);
    }
    pthread_mutex_unlock(&rlist->b_lock);
}

/* Writer-side completion: mark one reserved write (from
 * rbuf_reserve_write_area) as done. `opaque` is the rlist handed back
 * by the reservation. If a consumer is waiting and this was the last
 * pending write, signal it. Returns 0, or -1 on NULL opaque. */
int
rbuf_write_complete(void *opaque)
{
    rbuf_list_t *rlist = NULL;
    gf_boolean_t notify = _gf_false;

    if (!opaque)
        return -1;

    rlist = opaque;

    LOCK(&rlist->c_lock);
    {
        rlist->completed++;
        /**
         * it's safe to test ->pending without rbuf->lock *only* if
         * there's a waiter as there can be no new incoming writes.
         */
        if (__rlist_has_waiter(rlist) && (rlist->completed == rlist->pending))
            notify = _gf_true;
    }
    UNLOCK(&rlist->c_lock);

    /* signal outside c_lock to keep the critical section minimal */
    if (notify)
        rbuf_notify_waiter(rlist);

    return 0;
}

/* Consumer-side buffer switch: detach the current list for consumption
 * and rotate to the next one on the freelist. Returns RBUF_CONSUMABLE
 * with *opaque set to the detached list (caller owns it until
 * rbuf_wait_for_completion puts it back), RBUF_EMPTY when nothing was
 * written, RBUF_WOULD_STARVE when only one list remains (removing it
 * would leave writers with no buffer), or -1 on bad arguments.
 * `seqfn`, if given, is invoked on the list while still under
 * rbuf->lock. */
int
rbuf_get_buffer(rbuf_t *rbuf, void **opaque, sequence_fn *seqfn, void *mydata)
{
    int retval = RBUF_CONSUMABLE;
    rbuf_list_t *rlist = NULL;

    if (!rbuf || !opaque)
        return -1;

    LOCK(&rbuf->lock);
    {
        rlist = rbuf_current_buffer(rbuf);
        if (!rlist->pending) {
            retval = RBUF_EMPTY;
            goto unblock;
        }

        if (list_is_singular(&rbuf->freelist)) {
            /**
             * removal would lead to writer starvation, disallow
             * switching.
             */
            retval = RBUF_WOULD_STARVE;
            goto unblock;
        }

        list_del_init(&rlist->list);
        if (seqfn)
            seqfn(rlist, mydata);
        rbuf->current = list_first_entry(&rbuf->freelist, rbuf_list_t, list);
    }
unblock:
    UNLOCK(&rbuf->lock);

    if (retval == RBUF_CONSUMABLE)
        *opaque = rlist; /* caller _owns_ the buffer */

    return retval;
}

408/**
409* Wait for completion of pending writers and invoke dispatcher
410* routine (for buffer consumption).
411*/
412
/* Block until every pending write on this list has completed.
 * Double-underscore prefix: caller (rbuf_wait_for_completion) holds
 * rlist->b_lock, as pthread_cond_wait requires. */
static void
__rbuf_wait_for_writers(rbuf_list_t *rlist)
{
    while (rlist->completed != rlist->pending)
        pthread_cond_wait(&rlist->b_cond, &rlist->b_lock);
}

420#ifndef M_E421#define M_E 2.7422#endif423
424static void425rlist_shrink_vector(rbuf_list_t *rlist)426{
427unsigned long long shrink = 0;428
429/**430* fast path: don't bother to deallocate if vectors are hardly
431* used.
432*/
433if (rvec_in_watermark_range(rlist))434return;435
436/**437* Calculate the shrink count based on total allocated vectors.
438* Note that the calculation sticks to rlist->total irrespective
439* of the actual usage count (rlist->used). Later, ->used could
440* be used to apply slack to the calculation based on how much
441* it lags from ->total. For now, let's stick to slow decay.
442*/
443shrink = rlist->total - (rlist->total * pow(M_E, -0.2));444
445rlist_shrink_rvec(rlist, shrink);446rlist->total -= shrink;447}
448
/* Consumer entry point after rbuf_get_buffer: wait for all pending
 * writers on the detached list, dispatch its contents via fn(rlist,
 * arg), then reset the list (counters, vector usage, current vector),
 * shrink the vector list if over the watermark, and put it back on the
 * freelist. Returns 0, or -1 on bad arguments. */
int
rbuf_wait_for_completion(rbuf_t *rbuf, void *opaque,
                         void (*fn)(rbuf_list_t *, void *), void *arg)
{
    rbuf_list_t *rlist = NULL;

    if (!rbuf || !opaque)
        return -1;

    rlist = opaque;

    pthread_mutex_lock(&rlist->b_lock);
    {
        rlist_mark_waiting(rlist);
        __rbuf_wait_for_writers(rlist);
    }
    pthread_mutex_unlock(&rlist->b_lock);

    /**
     * from here on, no need of locking until the rlist is put
     * back into rotation.
     */

    fn(rlist, arg); /* invoke dispatcher */

    rlist->awaiting = _gf_false;
    rlist->pending = rlist->completed = 0;

    rlist_shrink_vector(rlist);
    rlist_reset_vector_usage(rlist);

    /* restart writes from the first vector in the list */
    rlist->rvec = list_first_entry(&rlist->veclist, rbuf_iovec_t, list);
    rbuf_reset_rvec(rlist->rvec);

    LOCK(&rbuf->lock);
    {
        list_add_tail(&rlist->list, &rbuf->freelist);
    }
    UNLOCK(&rbuf->lock);

    return 0;
}