qemu / aio-win32.c · 443 lines · 12.4 KB
/*
 * QEMU aio implementation
 *
 * Copyright IBM Corp., 2008
 * Copyright Red Hat Inc., 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "qemu/main-loop.h"
#include "qemu/queue.h"
#include "qemu/sockets.h"
#include "qapi/error.h"
#include "qemu/rcu_queue.h"
#include "qemu/error-report.h"

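/*
 * Note: an AioHandler plays one of two roles.  A socket handler carries
 * io_read/io_write callbacks and is polled with select() in aio_prepare();
 * an event-notifier handler carries an io_notify callback and is waited
 * on with WaitForMultipleObjects() in aio_poll().
 */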
struct AioHandler {
    EventNotifier *e;
    IOHandler *io_read;
    IOHandler *io_write;
    EventNotifierHandler *io_notify;
    GPollFD pfd;
    int deleted;
    void *opaque;
    QLIST_ENTRY(AioHandler) node;
};

static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
{
    /*
     * If the GSource is in the process of being destroyed then
     * g_source_remove_poll() causes an assertion failure.  Skip
     * removal in that case, because glib cleans up its state during
     * destruction anyway.
     */
    if (!g_source_is_destroyed(&ctx->source)) {
        g_source_remove_poll(&ctx->source, &node->pfd);
    }

    /* If aio_poll is in progress, just mark the node as deleted */
    if (qemu_lockcnt_count(&ctx->list_lock)) {
        node->deleted = 1;
        node->pfd.revents = 0;
    } else {
        /* Otherwise, delete it for real.  We can't just mark it as
         * deleted because deleted nodes are only cleaned up after
         * releasing the list_lock.
         */
        QLIST_REMOVE(node, node);
        g_free(node);
    }
}

void aio_set_fd_handler(AioContext *ctx,
                        int fd,
                        IOHandler *io_read,
                        IOHandler *io_write,
                        AioPollFn *io_poll,
                        IOHandler *io_poll_ready,
                        void *opaque)
{
    AioHandler *old_node;
    AioHandler *node = NULL;
    SOCKET s;

    if (!fd_is_socket(fd)) {
        error_report("fd=%d is not a socket, AIO implementation is missing", fd);
        return;
    }

    s = _get_osfhandle(fd);

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
        if (old_node->pfd.fd == s && !old_node->deleted) {
            break;
        }
    }

    if (io_read || io_write) {
        HANDLE event;
        long bitmask = 0;

        /* Alloc and insert if it's not already there */
        node = g_new0(AioHandler, 1);
        node->pfd.fd = s;

        /* node was just zero-allocated, so test the requested callbacks
         * rather than the still-NULL node fields when computing the
         * poll events.
         */
        node->pfd.events = 0;
        if (io_read) {
            node->pfd.events |= G_IO_IN;
        }
        if (io_write) {
            node->pfd.events |= G_IO_OUT;
        }

        node->e = &ctx->notifier;

        /* Update handler with latest information */
        node->opaque = opaque;
        node->io_read = io_read;
        node->io_write = io_write;

        if (io_read) {
            bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
        }

        if (io_write) {
            bitmask |= FD_WRITE | FD_CONNECT;
        }

        QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
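        /*
         * Route the selected socket events to the AioContext's own
         * event notifier: when the socket becomes ready, the notifier
         * handle is signaled and wakes WaitForMultipleObjects() in
         * aio_poll().
         */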
        event = event_notifier_get_handle(&ctx->notifier);
        qemu_socket_select(fd, event, bitmask, NULL);
    }
    if (old_node) {
        aio_remove_fd_handler(ctx, old_node);
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

void aio_set_event_notifier(AioContext *ctx,
                            EventNotifier *e,
                            EventNotifierHandler *io_notify,
                            AioPollFn *io_poll,
                            EventNotifierHandler *io_poll_ready)
{
    AioHandler *node;

    qemu_lockcnt_lock(&ctx->list_lock);
    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
        if (node->e == e && !node->deleted) {
            break;
        }
    }

    /* Are we deleting the notifier handler? */
    if (!io_notify) {
        if (node) {
            aio_remove_fd_handler(ctx, node);
        }
    } else {
        if (node == NULL) {
            /* Alloc and insert if it's not already there */
            node = g_new0(AioHandler, 1);
            node->e = e;
            node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
            node->pfd.events = G_IO_IN;
            QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);

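            /* Also expose the handle to the glib main loop, so the
             * AioContext's GSource can poll it.
             */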
            g_source_add_poll(&ctx->source, &node->pfd);
        }
        /* Update handler with latest information */
        node->io_notify = io_notify;
    }

    qemu_lockcnt_unlock(&ctx->list_lock);
    aio_notify(ctx);
}

void aio_set_event_notifier_poll(AioContext *ctx,
                                 EventNotifier *notifier,
                                 EventNotifierHandler *io_poll_begin,
                                 EventNotifierHandler *io_poll_end)
{
    /* Not implemented */
}
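/*
 * Latch socket readiness into pfd.revents.  Called from aio_poll() and,
 * in QEMU's main-loop integration, from the GSource prepare hook before
 * a blocking wait.
 */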
bool aio_prepare(AioContext *ctx)
{
    static struct timeval tv0;
    AioHandler *node;
    bool have_select_revents = false;
    fd_set rfds, wfds;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);

    /* fill fd sets */
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->io_read) {
            FD_SET((SOCKET)node->pfd.fd, &rfds);
        }
        if (node->io_write) {
            FD_SET((SOCKET)node->pfd.fd, &wfds);
        }
    }

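    /*
     * Zero-timeout select() probe: WaitForMultipleObjects() in aio_poll()
     * only reports that the shared notifier was signaled, not which
     * sockets are ready, so collect per-socket readiness here.
     */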
    if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
        QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
            node->pfd.revents = 0;
            if (FD_ISSET(node->pfd.fd, &rfds)) {
                node->pfd.revents |= G_IO_IN;
                have_select_revents = true;
            }

            if (FD_ISSET(node->pfd.fd, &wfds)) {
                node->pfd.revents |= G_IO_OUT;
                have_select_revents = true;
            }
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return have_select_revents;
}
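/*
 * Report whether a call to aio_dispatch() would run at least one handler,
 * judging by the revents collected so far.
 */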
bool aio_pending(AioContext *ctx)
{
    AioHandler *node;
    bool result = false;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    qemu_lockcnt_inc(&ctx->list_lock);
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (node->pfd.revents && node->io_notify) {
            result = true;
            break;
        }

        if ((node->pfd.revents & G_IO_IN) && node->io_read) {
            result = true;
            break;
        }
        if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
            result = true;
            break;
        }
    }

    qemu_lockcnt_dec(&ctx->list_lock);
    return result;
}
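/*
 * Run the handlers whose events fired: io_notify for the signaled event
 * handle, io_read/io_write for sockets whose revents were collected by
 * aio_prepare().  Nodes marked deleted are freed once the list_lock can
 * be taken exclusively.
 */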
static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
{
    AioHandler *node;
    bool progress = false;
    AioHandler *tmp;

    /*
     * We have to walk very carefully in case aio_set_fd_handler is
     * called while we're walking.
     */
    QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
        int revents = node->pfd.revents;

        if (!node->deleted &&
            (revents || event_notifier_get_handle(node->e) == event) &&
            node->io_notify) {
            node->pfd.revents = 0;
            node->io_notify(node->e);

            /* aio_notify() does not count as progress */
            if (node->e != &ctx->notifier) {
                progress = true;
            }
        }

        if (!node->deleted &&
            (node->io_read || node->io_write)) {
            node->pfd.revents = 0;
            if ((revents & G_IO_IN) && node->io_read) {
                node->io_read(node->opaque);
                progress = true;
            }
            if ((revents & G_IO_OUT) && node->io_write) {
                node->io_write(node->opaque);
                progress = true;
            }

            /* if the next select() will return an event, we have progressed */
            if (event == event_notifier_get_handle(&ctx->notifier)) {
                WSANETWORKEVENTS ev;
                WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
                if (ev.lNetworkEvents) {
                    progress = true;
                }
            }
        }

        if (node->deleted) {
            if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
                QLIST_REMOVE(node, node);
                g_free(node);
                qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
            }
        }
    }

    return progress;
}
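/*
 * GSource dispatch path.  INVALID_HANDLE_VALUE matches no event notifier,
 * so io_notify callbacks run here only when their GPollFD revents were
 * already set (e.g. by the glib main loop's poll); socket handlers run
 * off the revents latched by aio_prepare().
 */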
void aio_dispatch(AioContext *ctx)
{
    qemu_lockcnt_inc(&ctx->list_lock);
    aio_bh_poll(ctx);
    aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
    qemu_lockcnt_dec(&ctx->list_lock);
    timerlistgroup_run_timers(&ctx->tlg);
}

bool aio_poll(AioContext *ctx, bool blocking)
{
    AioHandler *node;
    HANDLE events[MAXIMUM_WAIT_OBJECTS];
    bool progress, have_select_revents, first;
    unsigned count;
    int timeout;

    /*
     * There cannot be two concurrent aio_poll calls for the same AioContext (or
     * an aio_poll concurrent with a GSource prepare/check/dispatch callback).
     * We rely on this below to avoid slow locked accesses to ctx->notify_me.
     *
     * aio_poll() may only be called in the AioContext's thread. iohandler_ctx
     * is special in that it runs in the main thread, but that thread's context
     * is qemu_aio_context.
     */
    assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
                                      qemu_get_aio_context() : ctx));
    progress = false;

    /* aio_notify can avoid the expensive event_notifier_set if
     * everything (file descriptors, bottom halves, timers) will
     * be re-evaluated before the next blocking poll().  This is
     * already true when aio_poll is called with blocking == false;
     * if blocking == true, it is only true after poll() returns,
     * so disable the optimization now.
     */
    if (blocking) {
        qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
        /*
         * Write ctx->notify_me before computing the timeout
         * (reading bottom half flags, etc.).  Pairs with
         * smp_mb in aio_notify().
         */
        smp_mb();
    }

    qemu_lockcnt_inc(&ctx->list_lock);
    have_select_revents = aio_prepare(ctx);

    /* fill the array of event handles to wait on */
    count = 0;
    QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
        if (!node->deleted && node->io_notify) {
            assert(count < MAXIMUM_WAIT_OBJECTS);
            events[count++] = event_notifier_get_handle(node->e);
        }
    }

    first = true;

    /* ctx->notifier is always registered.  */
    assert(count > 0);

    /* Multiple iterations, all of them non-blocking except the first,
     * may be necessary to process all pending events.  After the first
     * WaitForMultipleObjects call ctx->notify_me will be decremented.
     */
    do {
        HANDLE event;
        int ret;

        timeout = blocking && !have_select_revents
            ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
        ret = WaitForMultipleObjects(count, events, FALSE, timeout);
        if (blocking) {
            assert(first);
            qatomic_store_release(&ctx->notify_me,
                                  qatomic_read(&ctx->notify_me) - 2);
            aio_notify_accept(ctx);
        }

        if (first) {
            progress |= aio_bh_poll(ctx);
            first = false;
        }

        /* If an event was signaled, dispatch it and drop it from the wait
         * array by swapping in the last element, so each handle fires at
         * most once per aio_poll() call.
         */
        event = NULL;
        if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
            event = events[ret - WAIT_OBJECT_0];
            events[ret - WAIT_OBJECT_0] = events[--count];
        } else if (!have_select_revents) {
            break;
        }

        have_select_revents = false;
        blocking = false;

        progress |= aio_dispatch_handlers(ctx, event);
    } while (count > 0);

    qemu_lockcnt_dec(&ctx->list_lock);

    progress |= timerlistgroup_run_timers(&ctx->tlg);
    return progress;
}

void aio_context_setup(AioContext *ctx)
{
}

void aio_context_destroy(AioContext *ctx)
{
}

void aio_context_use_g_source(AioContext *ctx)
{
}

void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
                                 int64_t grow, int64_t shrink, Error **errp)
{
    if (max_ns) {
        error_setg(errp, "AioContext polling is not implemented on Windows");
    }
}

void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
{
}