qemu / fdmon-io_uring.c

/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Linux io_uring file descriptor monitoring
 *
 * The Linux io_uring API supports file descriptor monitoring with a few
 * advantages over existing APIs like poll(2) and epoll(7):
 *
 * 1. Userspace polling of events is possible because the completion queue (cq
 *    ring) is shared between the kernel and userspace.  This allows
 *    applications that rely on userspace polling to also monitor file
 *    descriptors in the same userspace polling loop.
 *
 * 2. Submission and completion are batched and done together in a single
 *    system call.  This minimizes the number of system calls.
 *
 * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than
 *    poll(2).
 *
 * 4. Nanosecond timeouts are supported, so fewer syscalls are required than
 *    with epoll(7).
 *
 * This code only monitors file descriptors and does not do asynchronous disk
 * I/O.  Implementing disk I/O efficiently has other requirements and should
 * use a separate io_uring, so it does not make sense to unify the code.
 *
 * File descriptor monitoring is implemented using the following operations:
 *
 * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored.
 * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored.  When
 *    the poll mask changes for a file descriptor it is first removed and then
 *    re-added with the new poll mask, so this operation is also used as part
 *    of modifying an existing monitored file descriptor.
 * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait
 *    for events.  This operation self-cancels if another event completes
 *    before the timeout.
 *
 * io_uring calls the submission queue the "sq ring" and the completion queue
 * the "cq ring".  Ring entries are called "sqe" and "cqe", respectively.
 *
 * The code is structured so that sq/cq rings are only modified within
 * fdmon_io_uring_wait().  Changes to AioHandlers are made by enqueuing them on
 * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD
 * and/or IORING_OP_POLL_REMOVE sqes for them.
 */
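
/*
 * Editorial sketch (not part of the original file): the bare liburing flow
 * that the operations above build on, reduced to a single one-shot poll of
 * stdin.  All calls are standard liburing API; error handling is omitted
 * for brevity.
 *
 *     #include <liburing.h>
 *     #include <poll.h>
 *     #include <stdio.h>
 *     #include <unistd.h>
 *
 *     int main(void)
 *     {
 *         struct io_uring ring;
 *         struct io_uring_sqe *sqe;
 *         struct io_uring_cqe *cqe;
 *
 *         io_uring_queue_init(8, &ring, 0);
 *
 *         sqe = io_uring_get_sqe(&ring);
 *         io_uring_prep_poll_add(sqe, STDIN_FILENO, POLLIN); // one-shot
 *         io_uring_sqe_set_data(sqe, NULL);
 *
 *         io_uring_submit_and_wait(&ring, 1); // submit sqe, wait for 1 cqe
 *
 *         io_uring_wait_cqe(&ring, &cqe);
 *         printf("revents=0x%x\n", cqe->res); // res holds the poll revents
 *         io_uring_cqe_seen(&ring, cqe);
 *
 *         io_uring_queue_exit(&ring);
 *         return 0;
 *     }
 */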

#include "qemu/osdep.h"
#include <poll.h>
#include "qemu/rcu_queue.h"
#include "aio-posix.h"

enum {
    FDMON_IO_URING_ENTRIES  = 128, /* sq/cq ring size */

    /* AioHandler::flags */
    FDMON_IO_URING_PENDING  = (1 << 0),
    FDMON_IO_URING_ADD      = (1 << 1),
    FDMON_IO_URING_REMOVE   = (1 << 2),
};

static inline int poll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? POLLIN : 0) |
           (pfd_events & G_IO_OUT ? POLLOUT : 0) |
           (pfd_events & G_IO_HUP ? POLLHUP : 0) |
           (pfd_events & G_IO_ERR ? POLLERR : 0);
}

static inline int pfd_events_from_poll(int poll_events)
{
    return (poll_events & POLLIN ? G_IO_IN : 0) |
           (poll_events & POLLOUT ? G_IO_OUT : 0) |
           (poll_events & POLLHUP ? G_IO_HUP : 0) |
           (poll_events & POLLERR ? G_IO_ERR : 0);
}

/*
 * Returns an sqe for submitting a request.  Must only be called within
 * fdmon_io_uring_wait().
 */
static struct io_uring_sqe *get_sqe(AioContext *ctx)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    int ret;

    if (likely(sqe)) {
        return sqe;
    }

    /* No free sqes left, submit pending sqes first */
    do {
        ret = io_uring_submit(ring);
    } while (ret == -EINTR);

    assert(ret > 1);
    sqe = io_uring_get_sqe(ring);
    assert(sqe);
    return sqe;
}

/* Atomically enqueue an AioHandler for sq ring submission */
static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
{
    unsigned old_flags;

    old_flags = qatomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
    if (!(old_flags & FDMON_IO_URING_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
    }
}
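
/*
 * Editorial note: the FDMON_IO_URING_PENDING bit makes enqueue() safe for
 * concurrent callers without a lock.  Only the thread that transitions the
 * bit from 0 to 1 inserts the node, so an AioHandler can never appear on the
 * list twice; later enqueue() calls for the same node merely OR in more
 * flags, which fill_sq_ring() picks up when it dequeues the node.
 */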

/* Dequeue an AioHandler for sq ring submission.  Called by fill_sq_ring(). */
static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
{
    AioHandler *node = QSLIST_FIRST(head);

    if (!node) {
        return NULL;
    }

    /* Doesn't need to be atomic since fill_sq_ring() moves the list */
    QSLIST_REMOVE_HEAD(head, node_submitted);

    /*
     * Don't clear FDMON_IO_URING_REMOVE.  It's sticky so it can serve two
     * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
     * telling process_cqe() to delete the AioHandler when its
     * IORING_OP_POLL_ADD completes.
     */
    *flags = qatomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
                                               FDMON_IO_URING_ADD));
    return node;
}

static void fdmon_io_uring_update(AioContext *ctx,
                                  AioHandler *old_node,
                                  AioHandler *new_node)
{
    if (new_node) {
        enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
    }

    if (old_node) {
        /*
         * Deletion is tricky because IORING_OP_POLL_ADD and
         * IORING_OP_POLL_REMOVE are async.  We need to wait for the original
         * IORING_OP_POLL_ADD to complete before this handler can be freed
         * safely.
         *
         * It's possible that the file descriptor becomes ready and the
         * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is
         * submitted, too.
         *
         * Mark this handler deleted right now but don't place it on
         * ctx->deleted_aio_handlers yet.  Instead, manually fudge the list
         * entry to make QLIST_IS_INSERTED() think this handler has been
         * inserted, so that other code recognizes this AioHandler as deleted.
         *
         * Once the original IORING_OP_POLL_ADD completes we enqueue the
         * handler on the real ctx->deleted_aio_handlers list to be freed.
         */
        assert(!QLIST_IS_INSERTED(old_node, node_deleted));
        old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;

        enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
    }
}
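
/*
 * Editorial sketch of one interleaving the comment above guards against:
 *
 *     user thread                              kernel
 *     -----------                              ------
 *                                              fd becomes ready; the
 *                                              IORING_OP_POLL_ADD cqe is
 *                                              posted to the cq ring
 *     aio_set_fd_handler() deletes the fd
 *       -> fdmon_io_uring_update(old_node)
 *       -> enqueue(old_node, REMOVE)
 *     fdmon_io_uring_wait()
 *       fill_sq_ring() submits POLL_REMOVE     nothing left to cancel
 *       process_cqe() sees the sticky
 *       FDMON_IO_URING_REMOVE flag
 *         -> old_node moves to
 *            ctx->deleted_aio_handlers
 *
 * Whatever the interleaving, old_node is only freed after its POLL_ADD
 * completion has been consumed, so no cqe can point at freed memory.
 */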

static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);
    int events = poll_events_from_pfd(node->pfd.events);

    io_uring_prep_poll_add(sqe, node->pfd.fd, events);
    io_uring_sqe_set_data(sqe, node);
}

static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);

#ifdef LIBURING_HAVE_DATA64
    io_uring_prep_poll_remove(sqe, (uintptr_t)node);
#else
    io_uring_prep_poll_remove(sqe, node);
#endif
    io_uring_sqe_set_data(sqe, NULL);
}

/* Add a timeout that self-cancels when another cqe becomes ready */
static void add_timeout_sqe(AioContext *ctx, int64_t ns)
{
    struct io_uring_sqe *sqe;
    struct __kernel_timespec ts = {
        .tv_sec = ns / NANOSECONDS_PER_SECOND,
        .tv_nsec = ns % NANOSECONDS_PER_SECOND,
    };

    sqe = get_sqe(ctx);
    io_uring_prep_timeout(sqe, &ts, 1, 0);
    io_uring_sqe_set_data(sqe, NULL);
}
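
/*
 * Editorial note: the third argument to io_uring_prep_timeout() is a
 * completion count.  Passing 1 asks the kernel to complete the timeout as
 * soon as one other cqe is posted, which is what makes it self-cancelling:
 * if a monitored fd becomes ready first, the timeout completes immediately
 * (res == 0) instead of keeping the wait alive; on actual expiry its cqe
 * carries -ETIME.
 */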

/* Add sqes from ctx->submit_list for submission */
static void fill_sq_ring(AioContext *ctx)
{
    AioHandlerSList submit_list;
    AioHandler *node;
    unsigned flags;

    QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);

    while ((node = dequeue(&submit_list, &flags))) {
        /* Order matters, just in case both flags were set */
        if (flags & FDMON_IO_URING_ADD) {
            add_poll_add_sqe(ctx, node);
        }
        if (flags & FDMON_IO_URING_REMOVE) {
            add_poll_remove_sqe(ctx, node);
        }
    }
}
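
/*
 * Editorial note on "Order matters" above: both flags can be set when a
 * handler was added and then deleted before fill_sq_ring() ran.  Submitting
 * IORING_OP_POLL_ADD first lets the following IORING_OP_POLL_REMOVE cancel
 * it promptly; the cancelled poll completes and process_cqe(), seeing the
 * sticky FDMON_IO_URING_REMOVE flag, retires the AioHandler.  In the reverse
 * order the remove would find no request to cancel and the fresh poll would
 * stay armed until the file descriptor happened to become ready.
 */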

/* Returns true if a handler became ready */
static bool process_cqe(AioContext *ctx,
                        AioHandlerList *ready_list,
                        struct io_uring_cqe *cqe)
{
    AioHandler *node = io_uring_cqe_get_data(cqe);
    unsigned flags;

    /* poll_timeout and poll_remove have a zero user_data field */
    if (!node) {
        return false;
    }

    /*
     * Deletion can only happen when IORING_OP_POLL_ADD completes.  If we race
     * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
     * bit before IORING_OP_POLL_REMOVE is submitted.
     */
    flags = qatomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
    if (flags & FDMON_IO_URING_REMOVE) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }

    aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));

    /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
    add_poll_add_sqe(ctx, node);
    return true;
}
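
/*
 * Editorial note: the re-arm above is needed because IORING_OP_POLL_ADD is
 * one-shot.  Kernels since 5.13 offer multishot poll (IORING_POLL_ADD_MULTI),
 * which keeps the request armed across completions; this file sticks to the
 * one-shot form, which works on every io_uring-capable kernel.
 */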

static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_cqe *cqe;
    unsigned num_cqes = 0;
    unsigned num_ready = 0;
    unsigned head;

    io_uring_for_each_cqe(ring, head, cqe) {
        if (process_cqe(ctx, ready_list, cqe)) {
            num_ready++;
        }

        num_cqes++;
    }

    io_uring_cq_advance(ring, num_cqes);
    return num_ready;
}

static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
                               int64_t timeout)
{
    unsigned wait_nr = 1; /* block until at least one cqe is ready */
    int ret;

    if (timeout == 0) {
        wait_nr = 0; /* non-blocking */
    } else if (timeout > 0) {
        add_timeout_sqe(ctx, timeout);
    }

    fill_sq_ring(ctx);

    do {
        ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
    } while (ret == -EINTR);

    assert(ret >= 0);

    return process_cq_ring(ctx, ready_list);
}
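
/*
 * Editorial note: 'timeout' is in nanoseconds and follows the aio_poll()
 * convention:
 *
 *     timeout < 0   block until at least one cqe is ready (no timeout sqe)
 *     timeout == 0  reap completions without blocking (wait_nr == 0)
 *     timeout > 0   block, with a self-cancelling IORING_OP_TIMEOUT queued
 *                   so the wait returns by the deadline at the latest
 */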

static bool fdmon_io_uring_need_wait(AioContext *ctx)
{
    /* Have io_uring events completed? */
    if (io_uring_cq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Are there pending sqes to submit? */
    if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Do we need to process AioHandlers for io_uring changes? */
    if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) {
        return true;
    }

    return false;
}

static const FDMonOps fdmon_io_uring_ops = {
    .update = fdmon_io_uring_update,
    .wait = fdmon_io_uring_wait,
    .need_wait = fdmon_io_uring_need_wait,
};

bool fdmon_io_uring_setup(AioContext *ctx)
{
    int ret;

    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
    if (ret != 0) {
        return false;
    }

    QSLIST_INIT(&ctx->submit_list);
    ctx->fdmon_ops = &fdmon_io_uring_ops;
    return true;
}
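
/*
 * Editorial sketch (hypothetical caller, not from this file): an AioContext
 * would opt in at creation time and fall back to the poll(2)-based
 * implementation when io_uring is unavailable, along these lines:
 *
 *     AioContext *ctx = ...;
 *
 *     #ifdef CONFIG_LINUX_IO_URING
 *     if (!fdmon_io_uring_setup(ctx)) {
 *         // e.g. kernel too old or locked-memory limit too low:
 *         // ctx->fdmon_ops simply remains &fdmon_poll_ops
 *     }
 *     #endif
 */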

void fdmon_io_uring_destroy(AioContext *ctx)
{
    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
        AioHandler *node;

        io_uring_queue_exit(&ctx->fdmon_io_uring);

        /* Move handlers due to be removed onto the deleted list */
        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
            unsigned flags = qatomic_fetch_and(&node->flags,
                    ~(FDMON_IO_URING_PENDING |
                      FDMON_IO_URING_ADD |
                      FDMON_IO_URING_REMOVE));

            if (flags & FDMON_IO_URING_REMOVE) {
                QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
                                      node, node_deleted);
            }

            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
        }

        ctx->fdmon_ops = &fdmon_poll_ops;
    }
}