glusterfs
489 строк · 12.2 Кб
1/*
2Copyright (c) 2012 Red Hat, Inc. <http://www.redhat.com>
3This file is part of GlusterFS.
4
5This file is licensed to you under your choice of the GNU Lesser
6General Public License, version 3 or any later version (LGPLv3 or
7later), or the GNU General Public License, version 2 (GPLv2), in all
8cases as published by the Free Software Foundation.
9*/
10
11#include <sys/poll.h>
12#include <pthread.h>
13#include <unistd.h>
14#include <fcntl.h>
15#include <stdlib.h>
16#include <errno.h>
17#include <string.h>
18
19#include "glusterfs/logging.h"
20#include "glusterfs/gf-event.h"
21#include "glusterfs/mem-pool.h"
22#include "glusterfs/syscall.h"
23#include "glusterfs/libglusterfs-messages.h"
24
25struct event_slot_poll {
26int fd;
27int events;
28void *data;
29event_handler_t handler;
30};
31
32static int
33event_register_poll(struct event_pool *event_pool, int fd,
34event_handler_t handler, void *data, int poll_in,
35int poll_out, int notify_poller_death);
36
37static void
38__flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out,
39int poll_err, int event_thread_died)
40{
41char buf[64];
42int ret = -1;
43
44if (!poll_in)
45return;
46
47do {
48ret = sys_read(fd, buf, 64);
49if (ret == -1 && errno != EAGAIN) {
50gf_smsg("poll", GF_LOG_ERROR, errno, LG_MSG_READ_FILE_FAILED,
51"fd=%d", fd, NULL);
52}
53} while (ret == 64);
54
55return;
56}
57
58static int
59__event_getindex(struct event_pool *event_pool, int fd, int idx)
60{
61int ret = -1;
62int i = 0;
63
64GF_VALIDATE_OR_GOTO("event", event_pool, out);
65
66/* lookup in used space based on index provided */
67if (idx > -1 && idx < event_pool->used) {
68if (event_pool->reg[idx].fd == fd) {
69ret = idx;
70goto out;
71}
72}
73
74/* search in used space, if lookup fails */
75for (i = 0; i < event_pool->used; i++) {
76if (event_pool->reg[i].fd == fd) {
77ret = i;
78break;
79}
80}
81
82out:
83return ret;
84}
85
86static struct event_pool *
87event_pool_new_poll(int count, int eventthreadcount)
88{
89struct event_pool *event_pool = NULL;
90int ret = -1;
91
92event_pool = GF_CALLOC(1, sizeof(*event_pool), gf_common_mt_event_pool);
93
94if (!event_pool)
95return NULL;
96
97event_pool->count = count;
98event_pool->reg = GF_CALLOC(event_pool->count, sizeof(*event_pool->reg),
99gf_common_mt_reg);
100
101if (!event_pool->reg) {
102GF_FREE(event_pool);
103return NULL;
104}
105
106pthread_mutex_init(&event_pool->mutex, NULL);
107
108/* Both ends are opened non-blocking. */
109ret = gf_pipe(event_pool->breaker, O_NONBLOCK);
110
111if (ret == -1) {
112gf_smsg("poll", GF_LOG_ERROR, errno, LG_MSG_PIPE_CREATE_FAILED, NULL);
113event_pool->breaker[0] = event_pool->breaker[1] = -1;
114GF_FREE(event_pool->reg);
115GF_FREE(event_pool);
116return NULL;
117}
118
119ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd,
120NULL, 1, 0, 0);
121if (ret == -1) {
122gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED, NULL);
123sys_close(event_pool->breaker[0]);
124sys_close(event_pool->breaker[1]);
125event_pool->breaker[0] = event_pool->breaker[1] = -1;
126
127GF_FREE(event_pool->reg);
128GF_FREE(event_pool);
129return NULL;
130}
131
132if (eventthreadcount > 1) {
133gf_smsg("poll", GF_LOG_INFO, 0, LG_MSG_POLL_IGNORE_MULTIPLE_THREADS,
134"count=%d", eventthreadcount, NULL);
135}
136
137/* although, eventhreadcount for poll implementation is always
138* going to be 1, eventthreadcount needs to be set to 1 so that
139* rpcsvc_request_handler() thread scaling works flawlessly in
140* both epoll and poll models
141*/
142event_pool->eventthreadcount = 1;
143
144return event_pool;
145}
146
147static int
148event_register_poll(struct event_pool *event_pool, int fd,
149event_handler_t handler, void *data, int poll_in,
150int poll_out, int notify_poller_death)
151{
152int idx = -1;
153
154GF_VALIDATE_OR_GOTO("event", event_pool, out);
155
156pthread_mutex_lock(&event_pool->mutex);
157{
158if (event_pool->count == event_pool->used) {
159event_pool->count += 256;
160
161event_pool->reg = GF_REALLOC(
162event_pool->reg, event_pool->count * sizeof(*event_pool->reg));
163if (!event_pool->reg)
164goto unlock;
165}
166
167idx = event_pool->used++;
168
169event_pool->reg[idx].fd = fd;
170event_pool->reg[idx].events = POLLPRI;
171event_pool->reg[idx].handler = handler;
172event_pool->reg[idx].data = data;
173
174switch (poll_in) {
175case 1:
176event_pool->reg[idx].events |= POLLIN;
177break;
178case 0:
179event_pool->reg[idx].events &= ~POLLIN;
180break;
181case -1:
182/* do nothing */
183break;
184default:
185gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_IN,
186"value=%d", poll_in, NULL);
187break;
188}
189
190switch (poll_out) {
191case 1:
192event_pool->reg[idx].events |= POLLOUT;
193break;
194case 0:
195event_pool->reg[idx].events &= ~POLLOUT;
196break;
197case -1:
198/* do nothing */
199break;
200default:
201gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_OUT,
202"value=%d", poll_out, NULL);
203break;
204}
205
206event_pool->changed = 1;
207}
208unlock:
209pthread_mutex_unlock(&event_pool->mutex);
210
211out:
212return idx;
213}
214
215static int
216event_unregister_poll(struct event_pool *event_pool, int fd, int idx_hint)
217{
218int idx = -1;
219
220GF_VALIDATE_OR_GOTO("event", event_pool, out);
221
222pthread_mutex_lock(&event_pool->mutex);
223{
224idx = __event_getindex(event_pool, fd, idx_hint);
225
226if (idx == -1) {
227gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "fd=%d",
228fd, "idx_hint=%d", idx_hint, NULL);
229errno = ENOENT;
230goto unlock;
231}
232
233event_pool->reg[idx] = event_pool->reg[--event_pool->used];
234event_pool->changed = 1;
235}
236unlock:
237pthread_mutex_unlock(&event_pool->mutex);
238
239out:
240return idx;
241}
242
/*
 * Unregister @fd from the pool and then close it.
 *
 * The descriptor is closed regardless of whether the unregistration
 * succeeded; the result of the close is not reported.
 *
 * Returns whatever event_unregister_poll() returned.
 */
static int
event_unregister_close_poll(struct event_pool *event_pool, int fd, int idx_hint)
{
    int unregistered = event_unregister_poll(event_pool, fd, idx_hint);

    sys_close(fd);

    return unregistered;
}
254
255static int
256event_select_on_poll(struct event_pool *event_pool, int fd, int idx_hint,
257int poll_in, int poll_out)
258{
259int idx = -1;
260
261GF_VALIDATE_OR_GOTO("event", event_pool, out);
262
263pthread_mutex_lock(&event_pool->mutex);
264{
265idx = __event_getindex(event_pool, fd, idx_hint);
266
267if (idx == -1) {
268gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "fd=%d",
269fd, "idx_hint=%d", idx_hint, NULL);
270errno = ENOENT;
271goto unlock;
272}
273
274switch (poll_in) {
275case 1:
276event_pool->reg[idx].events |= POLLIN;
277break;
278case 0:
279event_pool->reg[idx].events &= ~POLLIN;
280break;
281case -1:
282/* do nothing */
283break;
284default:
285/* TODO: log error */
286break;
287}
288
289switch (poll_out) {
290case 1:
291event_pool->reg[idx].events |= POLLOUT;
292break;
293case 0:
294event_pool->reg[idx].events &= ~POLLOUT;
295break;
296case -1:
297/* do nothing */
298break;
299default:
300/* TODO: log error */
301break;
302}
303
304if (poll_in + poll_out > -2)
305event_pool->changed = 1;
306}
307unlock:
308pthread_mutex_unlock(&event_pool->mutex);
309
310out:
311return idx;
312}
313
314static int
315event_dispatch_poll_handler(struct event_pool *event_pool, struct pollfd *ufds,
316int i)
317{
318event_handler_t handler = NULL;
319void *data = NULL;
320int idx = -1;
321int ret = 0;
322
323handler = NULL;
324data = NULL;
325
326pthread_mutex_lock(&event_pool->mutex);
327{
328idx = __event_getindex(event_pool, ufds[i].fd, i);
329
330if (idx == -1) {
331gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "fd=%d",
332ufds[i].fd, "idx_hint=%d", i, NULL);
333goto unlock;
334}
335
336handler = event_pool->reg[idx].handler;
337data = event_pool->reg[idx].data;
338}
339unlock:
340pthread_mutex_unlock(&event_pool->mutex);
341
342if (handler)
343handler(ufds[i].fd, idx, 0, data,
344(ufds[i].revents & (POLLIN | POLLPRI)),
345(ufds[i].revents & (POLLOUT)),
346(ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0);
347
348return ret;
349}
350
351static int
352event_dispatch_poll_resize(struct event_pool *event_pool, struct pollfd *ufds,
353int size)
354{
355int i = 0;
356
357pthread_mutex_lock(&event_pool->mutex);
358{
359if (event_pool->changed == 0) {
360goto unlock;
361}
362
363if (event_pool->used > event_pool->evcache_size) {
364GF_FREE(event_pool->evcache);
365
366event_pool->evcache = ufds = NULL;
367
368event_pool->evcache_size = event_pool->used;
369
370ufds = GF_CALLOC(sizeof(struct pollfd), event_pool->evcache_size,
371gf_common_mt_pollfd);
372if (!ufds)
373goto unlock;
374event_pool->evcache = ufds;
375}
376
377if (ufds == NULL) {
378goto unlock;
379}
380
381for (i = 0; i < event_pool->used; i++) {
382ufds[i].fd = event_pool->reg[i].fd;
383ufds[i].events = event_pool->reg[i].events;
384ufds[i].revents = 0;
385}
386
387size = i;
388}
389unlock:
390pthread_mutex_unlock(&event_pool->mutex);
391
392return size;
393}
394
395static int
396event_dispatch_poll(struct event_pool *event_pool)
397{
398struct pollfd *ufds = NULL;
399int size = 0;
400int i = 0;
401int ret = -1;
402
403GF_VALIDATE_OR_GOTO("event", event_pool, out);
404
405pthread_mutex_lock(&event_pool->mutex);
406{
407event_pool->activethreadcount = 1;
408}
409pthread_mutex_unlock(&event_pool->mutex);
410
411while (1) {
412pthread_mutex_lock(&event_pool->mutex);
413{
414if (event_pool->destroy == 1) {
415event_pool->activethreadcount = 0;
416pthread_cond_broadcast(&event_pool->cond);
417pthread_mutex_unlock(&event_pool->mutex);
418return 0;
419}
420}
421pthread_mutex_unlock(&event_pool->mutex);
422
423size = event_dispatch_poll_resize(event_pool, ufds, size);
424ufds = event_pool->evcache;
425
426ret = poll(ufds, size, 1);
427
428if (ret == 0)
429/* timeout */
430continue;
431
432if (ret == -1 && errno == EINTR)
433/* sys call */
434continue;
435
436for (i = 0; i < size; i++) {
437if (!ufds[i].revents)
438continue;
439
440event_dispatch_poll_handler(event_pool, ufds, i);
441}
442}
443
444out:
445return -1;
446}
447
/*
 * Thread-count reconfiguration hook for the poll() backend.
 *
 * Does nothing: both arguments are ignored and 0 is always returned.
 */
int
event_reconfigure_threads_poll(struct event_pool *event_pool, int value)
{
    /* No-op for poll */
    (void)event_pool;
    (void)value;

    return 0;
}
455
456/* This function is the destructor for the event_pool data structure
457* Should be called only after poller_threads_destroy() is called,
458* else will lead to crashes.
459*/
460static int
461event_pool_destroy_poll(struct event_pool *event_pool)
462{
463int ret = 0;
464
465ret = sys_close(event_pool->breaker[0]);
466if (ret)
467return ret;
468
469ret = sys_close(event_pool->breaker[1]);
470if (ret)
471return ret;
472
473event_pool->breaker[0] = event_pool->breaker[1] = -1;
474
475GF_FREE(event_pool->reg);
476GF_FREE(event_pool);
477
478return ret;
479}
480
481struct event_ops event_ops_poll = {
482.new = event_pool_new_poll,
483.event_register = event_register_poll,
484.event_select_on = event_select_on_poll,
485.event_unregister = event_unregister_poll,
486.event_unregister_close = event_unregister_close_poll,
487.event_dispatch = event_dispatch_poll,
488.event_reconfigure_threads = event_reconfigure_threads_poll,
489.event_pool_destroy = event_pool_destroy_poll};
490