/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Linux io_uring file descriptor monitoring
 *
 * The Linux io_uring API supports file descriptor monitoring with a few
 * advantages over existing APIs like poll(2) and epoll(7):
 *
 * 1. Userspace polling of events is possible because the completion queue (cq
 *    ring) is shared between the kernel and userspace. This allows
 *    applications that rely on userspace polling to also monitor file
 *    descriptors in the same userspace polling loop.
 *
 * 2. Submission and completion are batched and done together in a single
 *    system call. This minimizes the number of system calls.
 *
 * 3. File descriptor monitoring is O(1) like epoll(7), so it scales better
 *    than poll(2).
 *
 * 4. Nanosecond timeouts are supported, so fewer syscalls are needed than
 *    with epoll(7).
 *
 * This code only monitors file descriptors and does not do asynchronous disk
 * I/O. Implementing disk I/O efficiently has other requirements and should
 * use a separate io_uring, so it does not make sense to unify the code.
 *
 * File descriptor monitoring is implemented using the following operations:
 *
 * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored.
 * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored. When
 *    the poll mask changes for a file descriptor it is first removed and then
 *    re-added with the new poll mask, so this operation is also used as part
 *    of modifying an existing monitored file descriptor.
 * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait
 *    for events. This operation self-cancels if another event completes
 *    before the timeout.
 *
 * io_uring calls the submission queue the "sq ring" and the completion queue
 * the "cq ring". Ring entries are called "sqe" and "cqe", respectively.
 *
 * The code is structured so that the sq/cq rings are only modified within
 * fdmon_io_uring_wait(). Changes to AioHandlers are made by enqueuing them on
 * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD
 * and/or IORING_OP_POLL_REMOVE sqes for them.
 */
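
/*
 * A sketch of the typical lifecycle: aio_set_fd_handler() invokes the
 * ->update() callback with a new AioHandler, which is enqueued with
 * FDMON_IO_URING_ADD. The next fdmon_io_uring_wait() call drains
 * ctx->submit_list and submits an IORING_OP_POLL_ADD sqe for the handler.
 * When the file descriptor becomes ready, process_cqe() places the handler
 * on the ready list and re-arms the one-shot poll request.
 */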

#include "qemu/osdep.h"
#include <poll.h>
#include "qemu/rcu_queue.h"
#include "aio-posix.h"

enum {
    FDMON_IO_URING_ENTRIES = 128, /* sq/cq ring size */

    /* AioHandler::flags */
    FDMON_IO_URING_PENDING = (1 << 0),
    FDMON_IO_URING_ADD = (1 << 1),
    FDMON_IO_URING_REMOVE = (1 << 2),
};

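/*
 * Convert GLib G_IO_* event bits to poll(2) POLL* event bits.
 * pfd_events_from_poll() below performs the reverse mapping for completed
 * cqes.
 */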
static inline int poll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? POLLIN : 0) |
           (pfd_events & G_IO_OUT ? POLLOUT : 0) |
           (pfd_events & G_IO_HUP ? POLLHUP : 0) |
           (pfd_events & G_IO_ERR ? POLLERR : 0);
}

static inline int pfd_events_from_poll(int poll_events)
{
    return (poll_events & POLLIN ? G_IO_IN : 0) |
           (poll_events & POLLOUT ? G_IO_OUT : 0) |
           (poll_events & POLLHUP ? G_IO_HUP : 0) |
           (poll_events & POLLERR ? G_IO_ERR : 0);
}

/*
 * Returns an sqe for submitting a request. Only called within
 * fdmon_io_uring_wait().
 */
static struct io_uring_sqe *get_sqe(AioContext *ctx)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    int ret;

    if (likely(sqe)) {
        return sqe;
    }

    /* No free sqes left, submit pending sqes first */
    do {
        ret = io_uring_submit(ring);
    } while (ret == -EINTR);

    assert(ret > 1);
    sqe = io_uring_get_sqe(ring);
    assert(sqe);
    return sqe;
}

/* Atomically enqueue an AioHandler for sq ring submission */
static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
{
    unsigned old_flags;

    old_flags = qatomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
    if (!(old_flags & FDMON_IO_URING_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
    }
}

/* Dequeue an AioHandler for sq ring submission. Called by fill_sq_ring(). */
static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
{
    AioHandler *node = QSLIST_FIRST(head);

    if (!node) {
        return NULL;
    }

    /* Doesn't need to be atomic since fill_sq_ring() moves the list */
    QSLIST_REMOVE_HEAD(head, node_submitted);

    /*
     * Don't clear FDMON_IO_URING_REMOVE. It's sticky so it can serve two
     * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
     * telling process_cqe() to delete the AioHandler when its
     * IORING_OP_POLL_ADD completes.
     */
    *flags = qatomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
                                               FDMON_IO_URING_ADD));
    return node;
}

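/*
 * The ->update() callback. Changes are only recorded here; the corresponding
 * sqes are submitted later by fill_sq_ring() in fdmon_io_uring_wait().
 */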
static void fdmon_io_uring_update(AioContext *ctx,
                                  AioHandler *old_node,
                                  AioHandler *new_node)
{
    if (new_node) {
        enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
    }

    if (old_node) {
        /*
         * Deletion is tricky because IORING_OP_POLL_ADD and
         * IORING_OP_POLL_REMOVE are async. We need to wait for the original
         * IORING_OP_POLL_ADD to complete before this handler can be freed
         * safely.
         *
         * It's possible that the file descriptor becomes ready and the
         * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is
         * submitted, too.
         *
         * Mark this handler deleted right now but don't place it on
         * ctx->deleted_aio_handlers yet. Instead, manually fudge the list
         * entry to make QLIST_IS_INSERTED() think this handler has been
         * inserted and other code recognizes this AioHandler as deleted.
         *
         * Once the original IORING_OP_POLL_ADD completes we enqueue the
         * handler on the real ctx->deleted_aio_handlers list to be freed.
         */
        assert(!QLIST_IS_INSERTED(old_node, node_deleted));
        old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;

        enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
    }
}

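/* Queue an IORING_OP_POLL_ADD sqe so the file descriptor is monitored */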
static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);
    int events = poll_events_from_pfd(node->pfd.events);

    io_uring_prep_poll_add(sqe, node->pfd.fd, events);
    io_uring_sqe_set_data(sqe, node);
}

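/*
 * Queue an IORING_OP_POLL_REMOVE sqe to cancel the in-flight
 * IORING_OP_POLL_ADD for this handler. Newer liburing versions
 * (LIBURING_HAVE_DATA64) identify the request by a 64-bit user_data value
 * rather than a pointer.
 */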
static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);

#ifdef LIBURING_HAVE_DATA64
    io_uring_prep_poll_remove(sqe, (uintptr_t)node);
#else
    io_uring_prep_poll_remove(sqe, node);
#endif
    io_uring_sqe_set_data(sqe, NULL);
}

/* Add a timeout that self-cancels when another cqe becomes ready */
static void add_timeout_sqe(AioContext *ctx, int64_t ns)
{
    struct io_uring_sqe *sqe;
    struct __kernel_timespec ts = {
        .tv_sec = ns / NANOSECONDS_PER_SECOND,
        .tv_nsec = ns % NANOSECONDS_PER_SECOND,
    };

    sqe = get_sqe(ctx);
    io_uring_prep_timeout(sqe, &ts, 1, 0);
    io_uring_sqe_set_data(sqe, NULL);
}

/* Add sqes from ctx->submit_list for submission */
static void fill_sq_ring(AioContext *ctx)
{
    AioHandlerSList submit_list;
    AioHandler *node;
    unsigned flags;

    QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);

    while ((node = dequeue(&submit_list, &flags))) {
        /* Order matters, just in case both flags were set */
        if (flags & FDMON_IO_URING_ADD) {
            add_poll_add_sqe(ctx, node);
        }
        if (flags & FDMON_IO_URING_REMOVE) {
            add_poll_remove_sqe(ctx, node);
        }
    }
}

/* Returns true if a handler became ready */
static bool process_cqe(AioContext *ctx,
                        AioHandlerList *ready_list,
                        struct io_uring_cqe *cqe)
{
    AioHandler *node = io_uring_cqe_get_data(cqe);
    unsigned flags;

    /* poll_timeout and poll_remove have a zero user_data field */
    if (!node) {
        return false;
    }

    /*
     * Deletion can only happen when IORING_OP_POLL_ADD completes. If we race
     * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
     * bit before IORING_OP_POLL_REMOVE is submitted.
     */
    flags = qatomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
    if (flags & FDMON_IO_URING_REMOVE) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }

    aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));

    /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
    add_poll_add_sqe(ctx, node);
    return true;
}

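/* Process all available cqes; returns the number of handlers made ready */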
static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_cqe *cqe;
    unsigned num_cqes = 0;
    unsigned num_ready = 0;
    unsigned head;

    io_uring_for_each_cqe(ring, head, cqe) {
        if (process_cqe(ctx, ready_list, cqe)) {
            num_ready++;
        }

        num_cqes++;
    }

    io_uring_cq_advance(ring, num_cqes);
    return num_ready;
}

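/*
 * The ->wait() callback: submit queued sqes, wait for cqes, and fill
 * @ready_list. A @timeout of 0 is non-blocking, a negative @timeout blocks
 * indefinitely, and a positive @timeout is bounded by an IORING_OP_TIMEOUT
 * sqe. Returns the number of ready handlers.
 */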
static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
                               int64_t timeout)
{
    unsigned wait_nr = 1; /* block until at least one cqe is ready */
    int ret;

    if (timeout == 0) {
        wait_nr = 0; /* non-blocking */
    } else if (timeout > 0) {
        add_timeout_sqe(ctx, timeout);
    }

    fill_sq_ring(ctx);

    do {
        ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
    } while (ret == -EINTR);

    assert(ret >= 0);

    return process_cq_ring(ctx, ready_list);
}

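/*
 * The ->need_wait() callback: returns true if there is io_uring work that
 * only ->wait() can make progress on, so callers must not skip it.
 */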
static bool fdmon_io_uring_need_wait(AioContext *ctx)
{
    /* Have io_uring events completed? */
    if (io_uring_cq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Are there pending sqes to submit? */
    if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Do we need to process AioHandlers for io_uring changes? */
    if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) {
        return true;
    }

    return false;
}

static const FDMonOps fdmon_io_uring_ops = {
    .update = fdmon_io_uring_update,
    .wait = fdmon_io_uring_wait,
    .need_wait = fdmon_io_uring_need_wait,
};

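/*
 * Switch ctx to io_uring monitoring. Returns false if the io_uring instance
 * could not be created (for example, on kernels without io_uring support),
 * in which case the caller keeps the default fdmon-poll implementation.
 */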
bool fdmon_io_uring_setup(AioContext *ctx)
{
    int ret;

    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
    if (ret != 0) {
        return false;
    }

    QSLIST_INIT(&ctx->submit_list);
    ctx->fdmon_ops = &fdmon_io_uring_ops;
    return true;
}

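/*
 * Tear down io_uring monitoring and fall back to fdmon-poll. Handlers still
 * queued on ctx->submit_list are drained so that pending removals end up on
 * ctx->deleted_aio_handlers and are eventually freed.
 */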
void fdmon_io_uring_destroy(AioContext *ctx)
{
    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
        AioHandler *node;

        io_uring_queue_exit(&ctx->fdmon_io_uring);

        /* Move handlers due to be removed onto the deleted list */
        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
            unsigned flags = qatomic_fetch_and(&node->flags,
                                               ~(FDMON_IO_URING_PENDING |
                                                 FDMON_IO_URING_ADD |
                                                 FDMON_IO_URING_REMOVE));

            if (flags & FDMON_IO_URING_REMOVE) {
                QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
                                      node, node_deleted);
            }

            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
        }

        ctx->fdmon_ops = &fdmon_poll_ops;
    }
}