glusterfs

Форк
0
290 строк · 7.3 Кб
1
/*
2
  Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
3
  This file is part of GlusterFS.
4

5
  This file is licensed to you under your choice of the GNU Lesser
6
  General Public License, version 3 or any later version (LGPLv3 or
7
  later), or the GNU General Public License, version 2 (GPLv2), in all
8
  cases as published by the Free Software Foundation.
9
*/
10

11
#include <sys/poll.h>
12
#include <pthread.h>
13
#include <unistd.h>
14
#include <fcntl.h>
15
#include <stdlib.h>
16
#include <errno.h>
17
#include <string.h>
18

19
#include "glusterfs/gf-event.h"
20
#include "glusterfs/timespec.h"
21
#include "glusterfs/libglusterfs-messages.h"
22
#include "glusterfs/syscall.h"
23

24
struct event_pool *
25
gf_event_pool_new(int count, int eventthreadcount)
26
{
27
    struct event_pool *event_pool = NULL;
28
    extern struct event_ops event_ops_poll;
29

30
#ifdef HAVE_SYS_EPOLL_H
31
    extern struct event_ops event_ops_epoll;
32

33
    event_pool = event_ops_epoll.new(count, eventthreadcount);
34

35
    if (event_pool) {
36
        event_pool->ops = &event_ops_epoll;
37
    } else {
38
        gf_msg("event", GF_LOG_WARNING, 0, LG_MSG_FALLBACK_TO_POLL,
39
               "falling back to poll based event handling");
40
    }
41
#endif
42

43
    if (!event_pool) {
44
        event_pool = event_ops_poll.new(count, eventthreadcount);
45

46
        if (event_pool)
47
            event_pool->ops = &event_ops_poll;
48
    }
49

50
    return event_pool;
51
}
52

53
int
54
gf_event_register(struct event_pool *event_pool, int fd,
55
                  event_handler_t handler, void *data, int poll_in,
56
                  int poll_out, int notify_poller_death)
57
{
58
    int ret = -1;
59

60
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
61

62
    ret = event_pool->ops->event_register(
63
        event_pool, fd, handler, data, poll_in, poll_out, notify_poller_death);
64
out:
65
    return ret;
66
}
67

68
int
69
gf_event_unregister(struct event_pool *event_pool, int fd, int idx)
70
{
71
    int ret = -1;
72

73
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
74

75
    ret = event_pool->ops->event_unregister(event_pool, fd, idx);
76

77
out:
78
    return ret;
79
}
80

81
int
82
gf_event_unregister_close(struct event_pool *event_pool, int fd, int idx)
83
{
84
    int ret = -1;
85

86
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
87

88
    ret = event_pool->ops->event_unregister_close(event_pool, fd, idx);
89

90
out:
91
    return ret;
92
}
93

94
int
95
gf_event_select_on(struct event_pool *event_pool, int fd, int idx_hint,
96
                   int poll_in, int poll_out)
97
{
98
    int ret = -1;
99

100
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
101

102
    ret = event_pool->ops->event_select_on(event_pool, fd, idx_hint, poll_in,
103
                                           poll_out);
104
out:
105
    return ret;
106
}
107

108
int
109
gf_event_dispatch(struct event_pool *event_pool)
110
{
111
    int ret = -1;
112

113
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
114

115
    ret = event_pool->ops->event_dispatch(event_pool);
116
    if (ret)
117
        goto out;
118

119
out:
120
    return ret;
121
}
122

123
int
124
gf_event_reconfigure_threads(struct event_pool *event_pool, int value)
125
{
126
    int ret = -1;
127

128
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
129

130
    /* call event refresh function */
131
    ret = event_pool->ops->event_reconfigure_threads(event_pool, value);
132

133
out:
134
    return ret;
135
}
136

137
int
138
gf_event_pool_destroy(struct event_pool *event_pool)
139
{
140
    int ret = -1;
141
    int destroy = 0, activethreadcount = 0;
142

143
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
144

145
    pthread_mutex_lock(&event_pool->mutex);
146
    {
147
        destroy = event_pool->destroy;
148
        activethreadcount = event_pool->activethreadcount;
149
    }
150
    pthread_mutex_unlock(&event_pool->mutex);
151

152
    if (!destroy || (activethreadcount > 0)) {
153
        goto out;
154
    }
155

156
    ret = event_pool->ops->event_pool_destroy(event_pool);
157
out:
158
    return ret;
159
}
160

161
static void
162
poller_destroy_handler(int fd, int idx, int gen, void *data, int poll_out,
163
                       int poll_in, int poll_err, int event_thread_exit)
164
{
165
    struct event_destroy_data *destroy = NULL;
166
    int readfd = -1;
167
    char buf = '\0';
168

169
    destroy = data;
170
    readfd = destroy->readfd;
171
    if (readfd < 0) {
172
        goto out;
173
    }
174

175
    while (sys_read(readfd, &buf, 1) > 0) {
176
    }
177

178
out:
179
    gf_event_handled(destroy->pool, fd, idx, gen);
180

181
    return;
182
}
183

184
/* This function destroys all the poller threads.
185
 * Note: to be called before gf_event_pool_destroy is called.
186
 * The order in which cleaning is performed:
187
 * - Register a pipe fd(this is for waking threads in poll()/epoll_wait())
188
 * - Set the destroy mode, which this no new event registration will succeed
189
 * - Reconfigure the thread count to 0(this will succeed only in destroy mode)
190
 * - Wake up all the threads in poll() or epoll_wait(), so that they can
191
 *   destroy themselves.
192
 * - Wait for the thread to join(which will happen only after all the other
193
 *   threads are destroyed)
194
 */
195
int
196
gf_event_dispatch_destroy(struct event_pool *event_pool)
197
{
198
    int ret = -1, threadcount = 0;
199
    int fd[2] = {-1};
200
    int idx = -1;
201
    struct timespec sleep_till = {
202
        0,
203
    };
204
    struct event_destroy_data data = {
205
        0,
206
    };
207

208
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
209

210
    /* Both ends are opened non-blocking. */
211
    ret = gf_pipe(fd, O_NONBLOCK);
212
    if (ret < 0)
213
        goto out;
214

215
    data.pool = event_pool;
216
    data.readfd = fd[1];
217

218
    /* From the main thread register an event on the pipe fd[0],
219
     */
220
    idx = gf_event_register(event_pool, fd[0], poller_destroy_handler, &data, 1,
221
                            0, 0);
222
    if (idx < 0)
223
        goto out;
224

225
    /* Enter the destroy mode first, set this before reconfiguring to 0
226
     * threads, to prevent further reconfigure to thread count > 0.
227
     */
228
    pthread_mutex_lock(&event_pool->mutex);
229
    {
230
        threadcount = event_pool->eventthreadcount;
231
        event_pool->destroy = 1;
232
    }
233
    pthread_mutex_unlock(&event_pool->mutex);
234

235
    ret = gf_event_reconfigure_threads(event_pool, 0);
236
    if (ret < 0)
237
        goto out;
238

239
    /* Write something onto the write end of the pipe(fd[1]) so that
240
     * poll wakes up and calls the handler, poller_destroy_handler()
241
     */
242
    pthread_mutex_lock(&event_pool->mutex);
243
    {
244
        /* Write to pipe(fd[1]) and then wait for 1 second or until
245
         * a poller thread that is dying, broadcasts. Make sure we
246
         * do not loop forever by limiting to 10 retries
247
         */
248
        int retry = 0;
249

250
        while (event_pool->activethreadcount > 0 &&
251
               (retry++ < (threadcount + 10))) {
252
            if (sys_write(fd[1], "dummy", 6) == -1) {
253
                break;
254
            }
255
            timespec_now_realtime(&sleep_till);
256
            sleep_till.tv_sec += 1;
257
            ret = pthread_cond_timedwait(&event_pool->cond, &event_pool->mutex,
258
                                         &sleep_till);
259
            if (ret) {
260
                gf_msg_debug("event", 0,
261
                             "thread cond-timedwait failed "
262
                             "active-thread-count: %d, "
263
                             "retry: %d",
264
                             event_pool->activethreadcount, retry);
265
            }
266
        }
267
    }
268
    pthread_mutex_unlock(&event_pool->mutex);
269

270
    ret = gf_event_unregister(event_pool, fd[0], idx);
271

272
out:
273
    if (fd[0] != -1)
274
        sys_close(fd[0]);
275
    if (fd[1] != -1)
276
        sys_close(fd[1]);
277

278
    return ret;
279
}
280

281
int
282
gf_event_handled(struct event_pool *event_pool, int fd, int idx, int gen)
283
{
284
    int ret = 0;
285

286
    if (event_pool->ops->event_handled)
287
        ret = event_pool->ops->event_handled(event_pool, fd, idx, gen);
288

289
    return ret;
290
}
291

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.