glusterfs

Форк
0
/
event-poll.c 
489 строк · 12.2 Кб
1
/*
2
  Copyright (c) 2012 Red Hat, Inc. <http://www.redhat.com>
3
  This file is part of GlusterFS.
4

5
  This file is licensed to you under your choice of the GNU Lesser
6
  General Public License, version 3 or any later version (LGPLv3 or
7
  later), or the GNU General Public License, version 2 (GPLv2), in all
8
  cases as published by the Free Software Foundation.
9
*/
10

11
#include <sys/poll.h>
12
#include <pthread.h>
13
#include <unistd.h>
14
#include <fcntl.h>
15
#include <stdlib.h>
16
#include <errno.h>
17
#include <string.h>
18

19
#include "glusterfs/logging.h"
20
#include "glusterfs/gf-event.h"
21
#include "glusterfs/mem-pool.h"
22
#include "glusterfs/syscall.h"
23
#include "glusterfs/libglusterfs-messages.h"
24

25
/* Per-fd registration record for the poll() based event backend.
 * The pool keeps these in a flat array (event_pool->reg), compacted on
 * unregister. */
struct event_slot_poll {
    int fd;                  /* file descriptor being polled */
    int events;              /* requested poll(2) event mask (POLLIN/POLLOUT/POLLPRI) */
    void *data;              /* opaque cookie handed back to the handler */
    event_handler_t handler; /* callback invoked when the fd reports events */
};
31

32
static int
33
event_register_poll(struct event_pool *event_pool, int fd,
34
                    event_handler_t handler, void *data, int poll_in,
35
                    int poll_out, int notify_poller_death);
36

37
static void
38
__flush_fd(int fd, int idx, int gen, void *data, int poll_in, int poll_out,
39
           int poll_err, int event_thread_died)
40
{
41
    char buf[64];
42
    int ret = -1;
43

44
    if (!poll_in)
45
        return;
46

47
    do {
48
        ret = sys_read(fd, buf, 64);
49
        if (ret == -1 && errno != EAGAIN) {
50
            gf_smsg("poll", GF_LOG_ERROR, errno, LG_MSG_READ_FILE_FAILED,
51
                    "fd=%d", fd, NULL);
52
        }
53
    } while (ret == 64);
54

55
    return;
56
}
57

58
static int
59
__event_getindex(struct event_pool *event_pool, int fd, int idx)
60
{
61
    int ret = -1;
62
    int i = 0;
63

64
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
65

66
    /* lookup in used space based on index provided */
67
    if (idx > -1 && idx < event_pool->used) {
68
        if (event_pool->reg[idx].fd == fd) {
69
            ret = idx;
70
            goto out;
71
        }
72
    }
73

74
    /* search in used space, if lookup fails */
75
    for (i = 0; i < event_pool->used; i++) {
76
        if (event_pool->reg[i].fd == fd) {
77
            ret = i;
78
            break;
79
        }
80
    }
81

82
out:
83
    return ret;
84
}
85

86
static struct event_pool *
87
event_pool_new_poll(int count, int eventthreadcount)
88
{
89
    struct event_pool *event_pool = NULL;
90
    int ret = -1;
91

92
    event_pool = GF_CALLOC(1, sizeof(*event_pool), gf_common_mt_event_pool);
93

94
    if (!event_pool)
95
        return NULL;
96

97
    event_pool->count = count;
98
    event_pool->reg = GF_CALLOC(event_pool->count, sizeof(*event_pool->reg),
99
                                gf_common_mt_reg);
100

101
    if (!event_pool->reg) {
102
        GF_FREE(event_pool);
103
        return NULL;
104
    }
105

106
    pthread_mutex_init(&event_pool->mutex, NULL);
107

108
    /* Both ends are opened non-blocking. */
109
    ret = gf_pipe(event_pool->breaker, O_NONBLOCK);
110

111
    if (ret == -1) {
112
        gf_smsg("poll", GF_LOG_ERROR, errno, LG_MSG_PIPE_CREATE_FAILED, NULL);
113
        event_pool->breaker[0] = event_pool->breaker[1] = -1;
114
        GF_FREE(event_pool->reg);
115
        GF_FREE(event_pool);
116
        return NULL;
117
    }
118

119
    ret = event_register_poll(event_pool, event_pool->breaker[0], __flush_fd,
120
                              NULL, 1, 0, 0);
121
    if (ret == -1) {
122
        gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED, NULL);
123
        sys_close(event_pool->breaker[0]);
124
        sys_close(event_pool->breaker[1]);
125
        event_pool->breaker[0] = event_pool->breaker[1] = -1;
126

127
        GF_FREE(event_pool->reg);
128
        GF_FREE(event_pool);
129
        return NULL;
130
    }
131

132
    if (eventthreadcount > 1) {
133
        gf_smsg("poll", GF_LOG_INFO, 0, LG_MSG_POLL_IGNORE_MULTIPLE_THREADS,
134
                "count=%d", eventthreadcount, NULL);
135
    }
136

137
    /* although, eventhreadcount for poll implementation is always
138
     * going to be 1, eventthreadcount needs to be set to 1 so that
139
     * rpcsvc_request_handler() thread scaling works flawlessly in
140
     * both epoll and poll models
141
     */
142
    event_pool->eventthreadcount = 1;
143

144
    return event_pool;
145
}
146

147
static int
148
event_register_poll(struct event_pool *event_pool, int fd,
149
                    event_handler_t handler, void *data, int poll_in,
150
                    int poll_out, int notify_poller_death)
151
{
152
    int idx = -1;
153

154
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
155

156
    pthread_mutex_lock(&event_pool->mutex);
157
    {
158
        if (event_pool->count == event_pool->used) {
159
            event_pool->count += 256;
160

161
            event_pool->reg = GF_REALLOC(
162
                event_pool->reg, event_pool->count * sizeof(*event_pool->reg));
163
            if (!event_pool->reg)
164
                goto unlock;
165
        }
166

167
        idx = event_pool->used++;
168

169
        event_pool->reg[idx].fd = fd;
170
        event_pool->reg[idx].events = POLLPRI;
171
        event_pool->reg[idx].handler = handler;
172
        event_pool->reg[idx].data = data;
173

174
        switch (poll_in) {
175
            case 1:
176
                event_pool->reg[idx].events |= POLLIN;
177
                break;
178
            case 0:
179
                event_pool->reg[idx].events &= ~POLLIN;
180
                break;
181
            case -1:
182
                /* do nothing */
183
                break;
184
            default:
185
                gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_IN,
186
                        "value=%d", poll_in, NULL);
187
                break;
188
        }
189

190
        switch (poll_out) {
191
            case 1:
192
                event_pool->reg[idx].events |= POLLOUT;
193
                break;
194
            case 0:
195
                event_pool->reg[idx].events &= ~POLLOUT;
196
                break;
197
            case -1:
198
                /* do nothing */
199
                break;
200
            default:
201
                gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INVALID_POLL_OUT,
202
                        "value=%d", poll_out, NULL);
203
                break;
204
        }
205

206
        event_pool->changed = 1;
207
    }
208
unlock:
209
    pthread_mutex_unlock(&event_pool->mutex);
210

211
out:
212
    return idx;
213
}
214

215
static int
216
event_unregister_poll(struct event_pool *event_pool, int fd, int idx_hint)
217
{
218
    int idx = -1;
219

220
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
221

222
    pthread_mutex_lock(&event_pool->mutex);
223
    {
224
        idx = __event_getindex(event_pool, fd, idx_hint);
225

226
        if (idx == -1) {
227
            gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "fd=%d",
228
                    fd, "idx_hint=%d", idx_hint, NULL);
229
            errno = ENOENT;
230
            goto unlock;
231
        }
232

233
        event_pool->reg[idx] = event_pool->reg[--event_pool->used];
234
        event_pool->changed = 1;
235
    }
236
unlock:
237
    pthread_mutex_unlock(&event_pool->mutex);
238

239
out:
240
    return idx;
241
}
242

243
/* Unregister fd and then close it.
 *
 * The fd is closed regardless of whether the unregister succeeded; the
 * return value is that of event_unregister_poll() (slot index or -1).
 */
static int
event_unregister_close_poll(struct event_pool *event_pool, int fd, int idx_hint)
{
    int ret = event_unregister_poll(event_pool, fd, idx_hint);

    sys_close(fd);

    return ret;
}
254

255
static int
256
event_select_on_poll(struct event_pool *event_pool, int fd, int idx_hint,
257
                     int poll_in, int poll_out)
258
{
259
    int idx = -1;
260

261
    GF_VALIDATE_OR_GOTO("event", event_pool, out);
262

263
    pthread_mutex_lock(&event_pool->mutex);
264
    {
265
        idx = __event_getindex(event_pool, fd, idx_hint);
266

267
        if (idx == -1) {
268
            gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "fd=%d",
269
                    fd, "idx_hint=%d", idx_hint, NULL);
270
            errno = ENOENT;
271
            goto unlock;
272
        }
273

274
        switch (poll_in) {
275
            case 1:
276
                event_pool->reg[idx].events |= POLLIN;
277
                break;
278
            case 0:
279
                event_pool->reg[idx].events &= ~POLLIN;
280
                break;
281
            case -1:
282
                /* do nothing */
283
                break;
284
            default:
285
                /* TODO: log error */
286
                break;
287
        }
288

289
        switch (poll_out) {
290
            case 1:
291
                event_pool->reg[idx].events |= POLLOUT;
292
                break;
293
            case 0:
294
                event_pool->reg[idx].events &= ~POLLOUT;
295
                break;
296
            case -1:
297
                /* do nothing */
298
                break;
299
            default:
300
                /* TODO: log error */
301
                break;
302
        }
303

304
        if (poll_in + poll_out > -2)
305
            event_pool->changed = 1;
306
    }
307
unlock:
308
    pthread_mutex_unlock(&event_pool->mutex);
309

310
out:
311
    return idx;
312
}
313

314
static int
315
event_dispatch_poll_handler(struct event_pool *event_pool, struct pollfd *ufds,
316
                            int i)
317
{
318
    event_handler_t handler = NULL;
319
    void *data = NULL;
320
    int idx = -1;
321
    int ret = 0;
322

323
    handler = NULL;
324
    data = NULL;
325

326
    pthread_mutex_lock(&event_pool->mutex);
327
    {
328
        idx = __event_getindex(event_pool, ufds[i].fd, i);
329

330
        if (idx == -1) {
331
            gf_smsg("poll", GF_LOG_ERROR, 0, LG_MSG_INDEX_NOT_FOUND, "fd=%d",
332
                    ufds[i].fd, "idx_hint=%d", i, NULL);
333
            goto unlock;
334
        }
335

336
        handler = event_pool->reg[idx].handler;
337
        data = event_pool->reg[idx].data;
338
    }
339
unlock:
340
    pthread_mutex_unlock(&event_pool->mutex);
341

342
    if (handler)
343
        handler(ufds[i].fd, idx, 0, data,
344
                (ufds[i].revents & (POLLIN | POLLPRI)),
345
                (ufds[i].revents & (POLLOUT)),
346
                (ufds[i].revents & (POLLERR | POLLHUP | POLLNVAL)), 0);
347

348
    return ret;
349
}
350

351
/* Refresh the pollfd cache (event_pool->evcache) from the registration
 * table when registrations have changed.
 *
 * `ufds` is the caller's current cache pointer and `size` its current entry
 * count; the (possibly updated) count is returned and the caller must
 * re-read event_pool->evcache afterwards, since the buffer may have been
 * reallocated here.
 *
 * NOTE(review): event_pool->changed is tested here but never cleared, so
 * once set, the copy loop below runs on every dispatch iteration --
 * presumably harmless (it only re-snapshots fd/events), but worth
 * confirming whether a `changed = 0` was intended.
 */
static int
event_dispatch_poll_resize(struct event_pool *event_pool, struct pollfd *ufds,
                           int size)
{
    int i = 0;

    pthread_mutex_lock(&event_pool->mutex);
    {
        if (event_pool->changed == 0) {
            goto unlock;
        }

        /* the cache only ever grows: reallocate when more slots are in
         * use than it can hold */
        if (event_pool->used > event_pool->evcache_size) {
            GF_FREE(event_pool->evcache);

            event_pool->evcache = ufds = NULL;

            event_pool->evcache_size = event_pool->used;

            ufds = GF_CALLOC(sizeof(struct pollfd), event_pool->evcache_size,
                             gf_common_mt_pollfd);
            if (!ufds)
                goto unlock;
            event_pool->evcache = ufds;
        }

        /* still NULL: allocation failed, or the caller had no cache yet */
        if (ufds == NULL) {
            goto unlock;
        }

        /* snapshot fd/events for every in-use registration */
        for (i = 0; i < event_pool->used; i++) {
            ufds[i].fd = event_pool->reg[i].fd;
            ufds[i].events = event_pool->reg[i].events;
            ufds[i].revents = 0;
        }

        size = i;
    }
unlock:
    pthread_mutex_unlock(&event_pool->mutex);

    return size;
}
394

395
/* Main loop of the (single) poller thread for the poll() backend.
 *
 * Each iteration: honour a pending destroy request, refresh the pollfd
 * cache via event_dispatch_poll_resize(), poll with a 1 ms timeout (kept
 * short so registration changes and destroy requests are noticed
 * promptly), then invoke the handler of every fd that reported events.
 *
 * Returns 0 on requested shutdown; the -1 at `out` is reached only when
 * event_pool is NULL.
 */
static int
event_dispatch_poll(struct event_pool *event_pool)
{
    struct pollfd *ufds = NULL;
    int size = 0;
    int i = 0;
    int ret = -1;

    GF_VALIDATE_OR_GOTO("event", event_pool, out);

    pthread_mutex_lock(&event_pool->mutex);
    {
        event_pool->activethreadcount = 1;
    }
    pthread_mutex_unlock(&event_pool->mutex);

    while (1) {
        pthread_mutex_lock(&event_pool->mutex);
        {
            if (event_pool->destroy == 1) {
                /* mark this poller gone and wake any waiter on the
                 * pool's condition variable before exiting */
                event_pool->activethreadcount = 0;
                pthread_cond_broadcast(&event_pool->cond);
                pthread_mutex_unlock(&event_pool->mutex);
                return 0;
            }
        }
        pthread_mutex_unlock(&event_pool->mutex);

        /* resize may reallocate the cache; always re-read evcache after */
        size = event_dispatch_poll_resize(event_pool, ufds, size);
        ufds = event_pool->evcache;

        ret = poll(ufds, size, 1);

        if (ret == 0)
            /* timeout */
            continue;

        if (ret == -1 && errno == EINTR)
            /* sys call */
            continue;

        for (i = 0; i < size; i++) {
            if (!ufds[i].revents)
                continue;

            event_dispatch_poll_handler(event_pool, ufds, i);
        }
    }

out:
    return -1;
}
447

448
/* Thread-count reconfiguration hook for the poll() backend.
 *
 * The poll implementation always runs exactly one poller thread (see
 * event_pool_new_poll), so this is intentionally a no-op that reports
 * success.
 */
int
event_reconfigure_threads_poll(struct event_pool *event_pool, int value)
{
    return 0;
}
455

456
/* This function is the destructor for the event_pool data structure
457
 * Should be called only after poller_threads_destroy() is called,
458
 * else will lead to crashes.
459
 */
460
static int
461
event_pool_destroy_poll(struct event_pool *event_pool)
462
{
463
    int ret = 0;
464

465
    ret = sys_close(event_pool->breaker[0]);
466
    if (ret)
467
        return ret;
468

469
    ret = sys_close(event_pool->breaker[1]);
470
    if (ret)
471
        return ret;
472

473
    event_pool->breaker[0] = event_pool->breaker[1] = -1;
474

475
    GF_FREE(event_pool->reg);
476
    GF_FREE(event_pool);
477

478
    return ret;
479
}
480

481
/* vtable wiring the poll() implementation into the generic event
 * framework (struct event_ops); the functions above supply each slot. */
struct event_ops event_ops_poll = {
    .new = event_pool_new_poll,
    .event_register = event_register_poll,
    .event_select_on = event_select_on_poll,
    .event_unregister = event_unregister_poll,
    .event_unregister_close = event_unregister_close_poll,
    .event_dispatch = event_dispatch_poll,
    .event_reconfigure_threads = event_reconfigure_threads_poll,
    .event_pool_destroy = event_pool_destroy_poll};
490

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.