glusterfs
290 строк · 7.3 Кб
1/*
2Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
3This file is part of GlusterFS.
4
5This file is licensed to you under your choice of the GNU Lesser
6General Public License, version 3 or any later version (LGPLv3 or
7later), or the GNU General Public License, version 2 (GPLv2), in all
8cases as published by the Free Software Foundation.
9*/
10
11#include <sys/poll.h>12#include <pthread.h>13#include <unistd.h>14#include <fcntl.h>15#include <stdlib.h>16#include <errno.h>17#include <string.h>18
19#include "glusterfs/gf-event.h"20#include "glusterfs/timespec.h"21#include "glusterfs/libglusterfs-messages.h"22#include "glusterfs/syscall.h"23
24struct event_pool *25gf_event_pool_new(int count, int eventthreadcount)26{
27struct event_pool *event_pool = NULL;28extern struct event_ops event_ops_poll;29
30#ifdef HAVE_SYS_EPOLL_H31extern struct event_ops event_ops_epoll;32
33event_pool = event_ops_epoll.new(count, eventthreadcount);34
35if (event_pool) {36event_pool->ops = &event_ops_epoll;37} else {38gf_msg("event", GF_LOG_WARNING, 0, LG_MSG_FALLBACK_TO_POLL,39"falling back to poll based event handling");40}41#endif42
43if (!event_pool) {44event_pool = event_ops_poll.new(count, eventthreadcount);45
46if (event_pool)47event_pool->ops = &event_ops_poll;48}49
50return event_pool;51}
52
53int
54gf_event_register(struct event_pool *event_pool, int fd,55event_handler_t handler, void *data, int poll_in,56int poll_out, int notify_poller_death)57{
58int ret = -1;59
60GF_VALIDATE_OR_GOTO("event", event_pool, out);61
62ret = event_pool->ops->event_register(63event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death);64out:65return ret;66}
67
68int
69gf_event_unregister(struct event_pool *event_pool, int fd, int idx)70{
71int ret = -1;72
73GF_VALIDATE_OR_GOTO("event", event_pool, out);74
75ret = event_pool->ops->event_unregister(event_pool, fd, idx);76
77out:78return ret;79}
80
81int
82gf_event_unregister_close(struct event_pool *event_pool, int fd, int idx)83{
84int ret = -1;85
86GF_VALIDATE_OR_GOTO("event", event_pool, out);87
88ret = event_pool->ops->event_unregister_close(event_pool, fd, idx);89
90out:91return ret;92}
93
94int
95gf_event_select_on(struct event_pool *event_pool, int fd, int idx_hint,96int poll_in, int poll_out)97{
98int ret = -1;99
100GF_VALIDATE_OR_GOTO("event", event_pool, out);101
102ret = event_pool->ops->event_select_on(event_pool, fd, idx_hint, poll_in,103poll_out);104out:105return ret;106}
107
108int
109gf_event_dispatch(struct event_pool *event_pool)110{
111int ret = -1;112
113GF_VALIDATE_OR_GOTO("event", event_pool, out);114
115ret = event_pool->ops->event_dispatch(event_pool);116if (ret)117goto out;118
119out:120return ret;121}
122
123int
124gf_event_reconfigure_threads(struct event_pool *event_pool, int value)125{
126int ret = -1;127
128GF_VALIDATE_OR_GOTO("event", event_pool, out);129
130/* call event refresh function */131ret = event_pool->ops->event_reconfigure_threads(event_pool, value);132
133out:134return ret;135}
136
137int
138gf_event_pool_destroy(struct event_pool *event_pool)139{
140int ret = -1;141int destroy = 0, activethreadcount = 0;142
143GF_VALIDATE_OR_GOTO("event", event_pool, out);144
145pthread_mutex_lock(&event_pool->mutex);146{147destroy = event_pool->destroy;148activethreadcount = event_pool->activethreadcount;149}150pthread_mutex_unlock(&event_pool->mutex);151
152if (!destroy || (activethreadcount > 0)) {153goto out;154}155
156ret = event_pool->ops->event_pool_destroy(event_pool);157out:158return ret;159}
160
161static void162poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out,163int poll_in, int poll_err, int event_thread_exit)164{
165struct event_destroy_data *destroy = NULL;166int readfd = -1;167char buf = '\0';168
169destroy = data;170readfd = destroy->readfd;171if (readfd < 0) {172goto out;173}174
175while (sys_read(readfd, &buf, 1) > 0) {176}177
178out:179gf_event_handled(destroy->pool, fd, idx, gen);180
181return;182}
183
184/* This function destroys all the poller threads.
185* Note: to be called before gf_event_pool_destroy is called.
186* The order in which cleaning is performed:
187* - Register a pipe fd(this is for waking threads in poll()/epoll_wait())
188* - Set the destroy mode, which this no new event registration will succeed
189* - Reconfigure the thread count to 0(this will succeed only in destroy mode)
190* - Wake up all the threads in poll() or epoll_wait(), so that they can
191* destroy themselves.
192* - Wait for the thread to join(which will happen only after all the other
193* threads are destroyed)
194*/
195int
196gf_event_dispatch_destroy(struct event_pool *event_pool)197{
198int ret = -1, threadcount = 0;199int fd[2] = {-1};200int idx = -1;201struct timespec sleep_till = {2020,203};204struct event_destroy_data data = {2050,206};207
208GF_VALIDATE_OR_GOTO("event", event_pool, out);209
210/* Both ends are opened non-blocking. */211ret = gf_pipe(fd, O_NONBLOCK);212if (ret < 0)213goto out;214
215data.pool = event_pool;216data.readfd = fd[1];217
218/* From the main thread register an event on the pipe fd[0],219*/
220idx = gf_event_register(event_pool, fd[0], poller_destroy_handler, &data, 1,2210, 0);222if (idx < 0)223goto out;224
225/* Enter the destroy mode first, set this before reconfiguring to 0226* threads, to prevent further reconfigure to thread count > 0.
227*/
228pthread_mutex_lock(&event_pool->mutex);229{230threadcount = event_pool->eventthreadcount;231event_pool->destroy = 1;232}233pthread_mutex_unlock(&event_pool->mutex);234
235ret = gf_event_reconfigure_threads(event_pool, 0);236if (ret < 0)237goto out;238
239/* Write something onto the write end of the pipe(fd[1]) so that240* poll wakes up and calls the handler, poller_destroy_handler()
241*/
242pthread_mutex_lock(&event_pool->mutex);243{244/* Write to pipe(fd[1]) and then wait for 1 second or until245* a poller thread that is dying, broadcasts. Make sure we
246* do not loop forever by limiting to 10 retries
247*/
248int retry = 0;249
250while (event_pool->activethreadcount > 0 &&251(retry++ < (threadcount + 10))) {252if (sys_write(fd[1], "dummy", 6) == -1) {253break;254}255timespec_now_realtime(&sleep_till);256sleep_till.tv_sec += 1;257ret = pthread_cond_timedwait(&event_pool->cond, &event_pool->mutex,258&sleep_till);259if (ret) {260gf_msg_debug("event", 0,261"thread cond-timedwait failed "262"active-thread-count: %d, "263"retry: %d",264event_pool->activethreadcount, retry);265}266}267}268pthread_mutex_unlock(&event_pool->mutex);269
270ret = gf_event_unregister(event_pool, fd[0], idx);271
272out:273if (fd[0] != -1)274sys_close(fd[0]);275if (fd[1] != -1)276sys_close(fd[1]);277
278return ret;279}
280
281int
282gf_event_handled(struct event_pool *event_pool, int fd, int idx, int gen)283{
284int ret = 0;285
286if (event_pool->ops->event_handled)287ret = event_pool->ops->event_handled(event_pool, fd, idx, gen);288
289return ret;290}
291