2
* QEMU aio implementation
4
* Copyright IBM Corp., 2008
5
* Copyright Red Hat Inc., 2012
8
* Anthony Liguori <aliguori@us.ibm.com>
9
* Paolo Bonzini <pbonzini@redhat.com>
11
* This work is licensed under the terms of the GNU GPL, version 2. See
12
* the COPYING file in the top-level directory.
14
* Contributions after 2012-01-13 are licensed under the terms of the
15
* GNU GPL, version 2 or (at your option) any later version.
18
#include "qemu/osdep.h"
19
#include "block/block.h"
20
#include "qemu/main-loop.h"
21
#include "qemu/queue.h"
22
#include "qemu/sockets.h"
23
#include "qapi/error.h"
24
#include "qemu/rcu_queue.h"
25
#include "qemu/error-report.h"
31
EventNotifierHandler *io_notify;
35
QLIST_ENTRY(AioHandler) node;
38
/*
 * Detach @node from @ctx.
 *
 * Visible behaviour: the poll descriptor of the node is removed from the
 * context GSource unless that source is already destroyed; then, when the
 * list_lock count is non-zero, the pending events on the node are cleared
 * (the existing comment says the node is only marked as deleted in that
 * case), otherwise the node is unlinked from the handler list.
 *
 * NOTE(review): this excerpt interleaves bare source-line numbers with the
 * code and skips some original lines (the numbers are not contiguous), so
 * braces and some statements are not visible here.
 */
static void aio_remove_fd_handler(AioContext *ctx, AioHandler *node)
41
* If the GSource is in the process of being destroyed then
42
* g_source_remove_poll() causes an assertion failure. Skip
43
* removal in that case, because glib cleans up its state during
46
if (!g_source_is_destroyed(&ctx->source)) {
47
g_source_remove_poll(&ctx->source, &node->pfd);
50
/* If aio_poll is in progress, just mark the node as deleted */
51
if (qemu_lockcnt_count(&ctx->list_lock)) {
53
/* Clear any poll results still recorded on the node */
node->pfd.revents = 0;
55
/* Otherwise, delete it for real. We can't just mark it as
56
* deleted because deleted nodes are only cleaned up after
57
* releasing the list_lock.
59
QLIST_REMOVE(node, node);
64
/*
 * Register, replace or remove the read/write callbacks for descriptor fd
 * on @ctx.
 *
 * Visible behaviour:
 *  - a descriptor that is not a socket is reported through error_report();
 *  - the descriptor is converted to an OS handle and searched for among the
 *    non-deleted handlers while list_lock is held;
 *  - when io_read or io_write is set, a zeroed AioHandler is allocated,
 *    bound to ctx->notifier, filled with the callbacks and opaque pointer,
 *    inserted at the head of the handler list, and the socket is tied to
 *    the notifier event handle through qemu_socket_select();
 *  - old_node is passed to aio_remove_fd_handler(); the condition guarding
 *    that call is not visible in this excerpt.
 *
 * NOTE(review): this excerpt interleaves bare source-line numbers with the
 * code and skips some original lines (most of the parameter list, local
 * declarations and braces), so it cannot be read as complete code.
 */
void aio_set_fd_handler(AioContext *ctx,
69
IOHandler *io_poll_ready,
73
AioHandler *node = NULL;
76
if (!fd_is_socket(fd)) {
77
error_report("fd=%d is not a socket, AIO implementation is missing", fd);
81
/* Translate the C runtime descriptor into the underlying OS handle */
s = _get_osfhandle(fd);
83
qemu_lockcnt_lock(&ctx->list_lock);
84
QLIST_FOREACH(old_node, &ctx->aio_handlers, node) {
85
if (old_node->pfd.fd == s && !old_node->deleted) {
90
if (io_read || io_write) {
94
/* Alloc and insert if it's not already there */
95
node = g_new0(AioHandler, 1);
100
node->pfd.events |= G_IO_IN;
102
if (node->io_write) {
103
node->pfd.events |= G_IO_OUT;
106
node->e = &ctx->notifier;
108
/* Update handler with latest information */
109
node->opaque = opaque;
110
node->io_read = io_read;
111
node->io_write = io_write;
114
/*
 * Network events added to the selection mask; presumably guarded by
 * io_read, but the guard is not visible here -- TODO confirm.
 */
bitmask |= FD_READ | FD_ACCEPT | FD_CLOSE;
118
/* Likewise, presumably guarded by io_write -- TODO confirm */
bitmask |= FD_WRITE | FD_CONNECT;
121
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
122
/* Socket events are signalled through the handle of the context notifier */
event = event_notifier_get_handle(&ctx->notifier);
123
qemu_socket_select(fd, event, bitmask, NULL);
126
aio_remove_fd_handler(ctx, old_node);
129
qemu_lockcnt_unlock(&ctx->list_lock);
133
/*
 * Register, update or remove the io_notify callback attached to an event
 * notifier (referred to as e in the body) on @ctx.
 *
 * Visible behaviour: under list_lock the handler list is searched for a
 * non-deleted node whose notifier is e; a node may be handed to
 * aio_remove_fd_handler(); a zeroed AioHandler may be allocated, set up to
 * poll the notifier handle for G_IO_IN, inserted at the head of the handler
 * list and added to the context GSource; io_notify is stored on the node.
 * The conditions choosing between these paths are not visible here.
 *
 * NOTE(review): this excerpt interleaves bare source-line numbers with the
 * code and skips some original lines (including the notifier parameter,
 * local declarations and braces).
 */
void aio_set_event_notifier(AioContext *ctx,
135
EventNotifierHandler *io_notify,
137
EventNotifierHandler *io_poll_ready)
141
qemu_lockcnt_lock(&ctx->list_lock);
142
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
143
if (node->e == e && !node->deleted) {
148
/* Are we deleting the fd handler? */
151
aio_remove_fd_handler(ctx, node);
155
/* Alloc and insert if it's not already there */
156
node = g_new0(AioHandler, 1);
158
/* The notifier handle is stored in the poll descriptor as an integer */
node->pfd.fd = (uintptr_t)event_notifier_get_handle(e);
159
node->pfd.events = G_IO_IN;
160
QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node);
162
/* Have the context GSource poll this descriptor */
g_source_add_poll(&ctx->source, &node->pfd);
164
/* Update handler with latest information */
165
node->io_notify = io_notify;
168
qemu_lockcnt_unlock(&ctx->list_lock);
172
/*
 * Accepts poll-begin and poll-end callbacks for @notifier on @ctx.
 * The only body content visible in this excerpt is the marker comment
 * below, i.e. the callbacks are not used by this implementation.
 */
void aio_set_event_notifier_poll(AioContext *ctx,
173
EventNotifier *notifier,
174
EventNotifierHandler *io_poll_begin,
175
EventNotifierHandler *io_poll_end)
177
/* Not implemented */
180
/*
 * Probe the handlers of @ctx with select() and record the outcome on each
 * node.
 *
 * Visible behaviour: with the list_lock count raised, handler descriptors
 * are added to the read set (and to the write set when io_write is set),
 * select() is called with the tv0 timeout, and when it reports ready
 * descriptors each node gets pfd.revents rebuilt from G_IO_IN / G_IO_OUT.
 *
 * Returns true when at least one node was marked readable or writable.
 *
 * NOTE(review): this excerpt interleaves bare source-line numbers with the
 * code and skips some original lines (set declarations, guards, braces).
 */
bool aio_prepare(AioContext *ctx)
182
/* Static storage, hence zero-initialised; used as the select() timeout */
static struct timeval tv0;
184
bool have_select_revents = false;
188
* We have to walk very carefully in case aio_set_fd_handler is
189
* called while we're walking.
191
qemu_lockcnt_inc(&ctx->list_lock);
196
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
198
FD_SET ((SOCKET)node->pfd.fd, &rfds);
200
if (node->io_write) {
201
FD_SET ((SOCKET)node->pfd.fd, &wfds);
205
/*
 * A zero timeout makes this a readiness check that returns at once.
 * NOTE(review): Winsock documents the first argument of select() as
 * ignored, which would explain the literal 0 -- confirm.
 */
if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
206
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
207
node->pfd.revents = 0;
208
if (FD_ISSET(node->pfd.fd, &rfds)) {
209
node->pfd.revents |= G_IO_IN;
210
have_select_revents = true;
213
if (FD_ISSET(node->pfd.fd, &wfds)) {
214
node->pfd.revents |= G_IO_OUT;
215
have_select_revents = true;
220
qemu_lockcnt_dec(&ctx->list_lock);
221
return have_select_revents;
224
/*
 * Walk the handlers of @ctx, with the list_lock count raised, looking for
 * recorded events that have a matching callback.
 *
 * Visible checks per node:
 *  - any revents together with an io_notify callback;
 *  - G_IO_IN together with an io_read callback;
 *  - G_IO_OUT together with an io_write callback.
 *
 * NOTE(review): the statements executed when a check matches and the
 * return statement are not visible in this excerpt, which interleaves bare
 * source-line numbers with the code and skips some original lines.
 */
bool aio_pending(AioContext *ctx)
230
* We have to walk very carefully in case aio_set_fd_handler is
231
* called while we're walking.
233
qemu_lockcnt_inc(&ctx->list_lock);
234
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
235
if (node->pfd.revents && node->io_notify) {
240
if ((node->pfd.revents & G_IO_IN) && node->io_read) {
244
if ((node->pfd.revents & G_IO_OUT) && node->io_write) {
250
qemu_lockcnt_dec(&ctx->list_lock);
254
/*
 * Run the callbacks of the handlers registered on @ctx.
 *
 * Visible behaviour per node:
 *  - a non-deleted node with recorded revents, or whose notifier handle
 *    equals @event, has its revents cleared and io_notify invoked; the
 *    context notifier itself is treated specially (see the existing
 *    comment about aio_notify());
 *  - a non-deleted node with io_read or io_write has its revents cleared
 *    and the callbacks matching G_IO_IN / G_IO_OUT invoked with the opaque
 *    pointer of the node;
 *  - when @event is the context notifier handle, the network events of the
 *    socket are queried and a non-zero result is examined;
 *  - a node is unlinked from the list when qemu_lockcnt_dec_if_lock()
 *    succeeds, after which the count is restored and the lock released.
 *
 * The function declares a progress flag initialised to false; where it is
 * set and returned is not visible here.
 *
 * NOTE(review): this excerpt interleaves bare source-line numbers with the
 * code and skips some original lines (part of the first condition, local
 * declarations, braces, the return statement).
 */
static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
257
bool progress = false;
261
* We have to walk very carefully in case aio_set_fd_handler is
262
* called while we're walking.
264
QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) {
265
/* Snapshot the recorded events before they are cleared below */
int revents = node->pfd.revents;
267
if (!node->deleted &&
268
(revents || event_notifier_get_handle(node->e) == event) &&
270
node->pfd.revents = 0;
271
node->io_notify(node->e);
273
/* aio_notify() does not count as progress */
274
if (node->e != &ctx->notifier) {
279
if (!node->deleted &&
280
(node->io_read || node->io_write)) {
281
node->pfd.revents = 0;
282
if ((revents & G_IO_IN) && node->io_read) {
283
node->io_read(node->opaque);
286
if ((revents & G_IO_OUT) && node->io_write) {
287
node->io_write(node->opaque);
291
/* if the next select() will return an event, we have progressed */
292
if (event == event_notifier_get_handle(&ctx->notifier)) {
294
/*
 * NOTE(review): the return value of WSAEnumNetworkEvents() is not
 * checked on this line; ev is read right after the call.
 */
WSAEnumNetworkEvents(node->pfd.fd, event, &ev);
295
if (ev.lNetworkEvents) {
302
/*
 * Unlink only when the lock could be taken; which nodes reach this
 * point is decided by a condition that is not visible here.
 */
if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) {
303
QLIST_REMOVE(node, node);
305
qemu_lockcnt_inc_and_unlock(&ctx->list_lock);
313
/*
 * Dispatch pending work for @ctx: run the handler callbacks with the
 * list_lock count raised, then run the timers of the context.
 *
 * NOTE(review): this excerpt interleaves bare source-line numbers with the
 * code and skips some original lines, so further statements may exist
 * between the ones shown.
 */
void aio_dispatch(AioContext *ctx)
315
qemu_lockcnt_inc(&ctx->list_lock);
317
/*
 * INVALID_HANDLE_VALUE is passed as the event, so presumably only
 * handlers with recorded revents are run -- TODO confirm.
 */
aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
318
qemu_lockcnt_dec(&ctx->list_lock);
319
timerlistgroup_run_timers(&ctx->tlg);
322
/*
 * Wait for and dispatch events on @ctx; @blocking selects whether the
 * first wait may block.
 *
 * Visible flow:
 *  1. assert that the caller runs in the home thread of the context (the
 *     iohandler context is checked against qemu_get_aio_context());
 *  2. add 2 to ctx->notify_me (see the existing comment on the aio_notify
 *     optimisation);
 *  3. raise the list_lock count, call aio_prepare() and collect the handle
 *     of every non-deleted handler that has io_notify into events[];
 *  4. wait on those handles; the timeout is derived from
 *     aio_compute_timeout() only when @blocking is set and aio_prepare()
 *     found nothing, otherwise it is 0;
 *  5. subtract 2 from ctx->notify_me, accept the notification, run bottom
 *     halves, and dispatch handlers for the signalled event;
 *  6. drop the list_lock count and run the timers of the context.
 *
 * progress accumulates the results of aio_bh_poll(),
 * aio_dispatch_handlers() and timerlistgroup_run_timers(); the return
 * statement is not visible here.
 *
 * NOTE(review): this excerpt interleaves bare source-line numbers with the
 * code and skips some original lines (loop construct, guards around the
 * notify_me updates, braces, several declarations).
 */
bool aio_poll(AioContext *ctx, bool blocking)
325
/* Wait set, sized to MAXIMUM_WAIT_OBJECTS; filling it is asserted below */
HANDLE events[MAXIMUM_WAIT_OBJECTS];
326
bool progress, have_select_revents, first;
331
* There cannot be two concurrent aio_poll calls for the same AioContext (or
332
* an aio_poll concurrent with a GSource prepare/check/dispatch callback).
333
* We rely on this below to avoid slow locked accesses to ctx->notify_me.
335
* aio_poll() may only be called in the AioContext's thread. iohandler_ctx
336
* is special in that it runs in the main thread, but that thread's context
337
* is qemu_aio_context.
339
assert(in_aio_context_home_thread(ctx == iohandler_get_aio_context() ?
340
qemu_get_aio_context() : ctx));
343
/* aio_notify can avoid the expensive event_notifier_set if
344
* everything (file descriptors, bottom halves, timers) will
345
* be re-evaluated before the next blocking poll(). This is
346
* already true when aio_poll is called with blocking == false;
347
* if blocking == true, it is only true after poll() returns,
348
* so disable the optimization now.
351
qatomic_set(&ctx->notify_me, qatomic_read(&ctx->notify_me) + 2);
353
* Write ctx->notify_me before computing the timeout
354
* (reading bottom half flags, etc.). Pairs with
355
* smp_mb in aio_notify().
360
qemu_lockcnt_inc(&ctx->list_lock);
361
have_select_revents = aio_prepare(ctx);
365
QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) {
366
if (!node->deleted && node->io_notify) {
367
assert(count < MAXIMUM_WAIT_OBJECTS);
368
events[count++] = event_notifier_get_handle(node->e);
374
/* ctx->notifier is always registered. */
377
/* Multiple iterations, all of them non-blocking except the first,
378
* may be necessary to process all pending events. After the first
379
* WaitForMultipleObjects call ctx->notify_me will be decremented.
385
timeout = blocking && !have_select_revents
386
? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0;
387
ret = WaitForMultipleObjects(count, events, FALSE, timeout);
390
qatomic_store_release(&ctx->notify_me,
391
qatomic_read(&ctx->notify_me) - 2);
392
aio_notify_accept(ctx);
396
progress |= aio_bh_poll(ctx);
400
/* if we have any signaled events, dispatch event */
402
/*
 * The unsigned comparison is true only when ret indexes a handle in
 * events[], i.e. WAIT_OBJECT_0 up to WAIT_OBJECT_0 + count - 1.
 */
if ((DWORD) (ret - WAIT_OBJECT_0) < count) {
403
event = events[ret - WAIT_OBJECT_0];
404
/* Drop the signalled handle by moving the last entry into its slot */
events[ret - WAIT_OBJECT_0] = events[--count];
405
} else if (!have_select_revents) {
409
have_select_revents = false;
412
progress |= aio_dispatch_handlers(ctx, event);
415
qemu_lockcnt_dec(&ctx->list_lock);
417
progress |= timerlistgroup_run_timers(&ctx->tlg);
421
/*
 * Takes the AioContext @ctx and returns nothing.
 * NOTE(review): the body is not part of this excerpt, so nothing more can
 * be stated about what it does.
 */
void aio_context_setup(AioContext *ctx)
425
/*
 * Takes the AioContext @ctx and returns nothing.
 * NOTE(review): the body is not part of this excerpt, so nothing more can
 * be stated about what it does.
 */
void aio_context_destroy(AioContext *ctx)
429
/*
 * Takes the AioContext @ctx and returns nothing.
 * NOTE(review): the body is not part of this excerpt, so nothing more can
 * be stated about what it does.
 */
void aio_context_use_g_source(AioContext *ctx)
433
/*
 * Polling parameters (@max_ns, @grow, @shrink) for @ctx.
 *
 * The visible body only sets an error on @errp stating that AioContext
 * polling is not implemented on Windows; none of the numeric parameters is
 * used on the lines shown.
 *
 * NOTE(review): this excerpt interleaves bare source-line numbers with the
 * code and skips some original lines (braces, possibly more statements).
 */
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
434
int64_t grow, int64_t shrink, Error **errp)
437
error_setg(errp, "AioContext polling is not implemented on Windows");
441
void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)