qemu

Форк
0
/
vhost-user-server.c 
517 строк · 15.2 Кб
1
/*
2
 * Sharing QEMU devices via vhost-user protocol
3
 *
4
 * Copyright (c) Coiby Xu <coiby.xu@gmail.com>.
5
 * Copyright (c) 2020 Red Hat, Inc.
6
 *
7
 * This work is licensed under the terms of the GNU GPL, version 2 or
8
 * later.  See the COPYING file in the top-level directory.
9
 */
10
#include "qemu/osdep.h"
11
#include "qemu/error-report.h"
12
#include "qemu/main-loop.h"
13
#include "qemu/vhost-user-server.h"
14
#include "block/aio-wait.h"
15

16
/*
17
 * Theory of operation:
18
 *
19
 * VuServer is started and stopped by vhost_user_server_start() and
20
 * vhost_user_server_stop() from the main loop thread. Starting the server
21
 * opens a vhost-user UNIX domain socket and listens for incoming connections.
22
 * Only one connection is allowed at a time.
23
 *
24
 * The connection is handled by the vu_client_trip() coroutine in the
25
 * VuServer->ctx AioContext. The coroutine consists of a vu_dispatch() loop
26
 * where libvhost-user calls vu_message_read() to receive the next vhost-user
27
 * protocol messages over the UNIX domain socket.
28
 *
29
 * When virtqueues are set up libvhost-user calls set_watch() to monitor kick
30
 * fds. These fds are also handled in the VuServer->ctx AioContext.
31
 *
32
 * Both vu_client_trip() and kick fd monitoring can be stopped by shutting down
33
 * the socket connection. Shutting down the socket connection causes
34
 * vu_message_read() to fail since no more data can be received from the socket.
35
 * After vu_dispatch() fails, vu_client_trip() calls vu_deinit() to stop
36
 * libvhost-user before terminating the coroutine. vu_deinit() calls
37
 * remove_watch() to stop monitoring kick fds and this stops virtqueue
38
 * processing.
39
 *
40
 * When vu_client_trip() has finished cleaning up it schedules a BH in the main
41
 * loop thread to accept the next client connection.
42
 *
43
 * When libvhost-user detects an error it calls panic_cb() and sets the
44
 * dev->broken flag. Both vu_client_trip() and kick fd processing stop when
45
 * the dev->broken flag is set.
46
 *
47
 * It is possible to switch AioContexts using
48
 * vhost_user_server_detach_aio_context() and
49
 * vhost_user_server_attach_aio_context(). They stop monitoring fds in the old
50
 * AioContext and resume monitoring in the new AioContext. The vu_client_trip()
51
 * coroutine remains in a yielded state during the switch. This is made
52
 * possible by QIOChannel's support for spurious coroutine re-entry in
53
 * qio_channel_yield(). The coroutine will restart I/O when re-entered from the
54
 * new AioContext.
55
 */
56

57
/* Close every file descriptor that was received together with @vmsg. */
static void vmsg_close_fds(VhostUserMsg *vmsg)
{
    int idx = 0;

    while (idx < vmsg->fd_num) {
        close(vmsg->fds[idx]);
        idx++;
    }
}
64

65
/*
 * Switch the fds received with @vmsg back to non-blocking mode.
 *
 * VHOST_USER_ADD_MEM_REG and VHOST_USER_SET_MEM_TABLE are skipped: their fds
 * are used to map memory, not to send/receive messages, so the operation is
 * useless for them, and on some systems it even fails (e.g. on macOS making
 * an fd returned by shm_open() non-blocking fails with errno = ENOTTY).
 */
static void vmsg_unblock_fds(VhostUserMsg *vmsg)
{
    bool carries_mem_fds = vmsg->request == VHOST_USER_ADD_MEM_REG ||
                           vmsg->request == VHOST_USER_SET_MEM_TABLE;
    int idx;

    if (!carries_mem_fds) {
        for (idx = 0; idx < vmsg->fd_num; idx++) {
            qemu_socket_set_nonblock(vmsg->fds[idx]);
        }
    }
}
84

85
/*
 * libvhost-user panic callback: report @buf as an error.  @vu_dev is not
 * used here.
 */
static void panic_cb(VuDev *vu_dev, const char *buf)
{
    const char *reason = buf;

    error_report("vu_panic: %s", reason);
}
89

90
/*
 * Account for one more request in flight.  Must not be called while
 * vu_client_trip() is waiting for in-flight requests to drain
 * (server->wait_idle set).
 */
void vhost_user_server_inc_in_flight(VuServer *server)
{
    bool draining = server->wait_idle;

    assert(!draining);
    qatomic_inc(&server->in_flight);
}
95

96
/*
 * Drop one in-flight request.  If this was the last one and vu_client_trip()
 * is parked waiting for the server to go idle, wake that coroutine.
 */
void vhost_user_server_dec_in_flight(VuServer *server)
{
    bool was_last = qatomic_fetch_dec(&server->in_flight) == 1;

    if (was_last && server->wait_idle) {
        aio_co_wake(server->co_trip);
    }
}
104

105
/* Return true while at least one request is still counted as in flight. */
bool vhost_user_server_has_in_flight(VuServer *server)
{
    bool busy = qatomic_load_acquire(&server->in_flight) > 0;

    return busy;
}
109

110
static bool coroutine_fn
111
vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
112
{
113
    struct iovec iov = {
114
        .iov_base = (char *)vmsg,
115
        .iov_len = VHOST_USER_HDR_SIZE,
116
    };
117
    int rc, read_bytes = 0;
118
    Error *local_err = NULL;
119
    const size_t max_fds = G_N_ELEMENTS(vmsg->fds);
120
    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
121
    QIOChannel *ioc = server->ioc;
122

123
    vmsg->fd_num = 0;
124
    if (!ioc) {
125
        error_report_err(local_err);
126
        goto fail;
127
    }
128

129
    assert(qemu_in_coroutine());
130
    do {
131
        size_t nfds = 0;
132
        int *fds = NULL;
133

134
        /*
135
         * qio_channel_readv_full may have short reads, keeping calling it
136
         * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
137
         */
138
        rc = qio_channel_readv_full(ioc, &iov, 1, &fds, &nfds, 0, &local_err);
139
        if (rc < 0) {
140
            if (rc == QIO_CHANNEL_ERR_BLOCK) {
141
                assert(local_err == NULL);
142
                if (server->ctx) {
143
                    server->in_qio_channel_yield = true;
144
                    qio_channel_yield(ioc, G_IO_IN);
145
                    server->in_qio_channel_yield = false;
146
                } else {
147
                    return false;
148
                }
149
                continue;
150
            } else {
151
                error_report_err(local_err);
152
                goto fail;
153
            }
154
        }
155

156
        if (nfds > 0) {
157
            if (vmsg->fd_num + nfds > max_fds) {
158
                error_report("A maximum of %zu fds are allowed, "
159
                             "however got %zu fds now",
160
                             max_fds, vmsg->fd_num + nfds);
161
                g_free(fds);
162
                goto fail;
163
            }
164
            memcpy(vmsg->fds + vmsg->fd_num, fds, nfds * sizeof(vmsg->fds[0]));
165
            vmsg->fd_num += nfds;
166
            g_free(fds);
167
        }
168

169
        if (rc == 0) { /* socket closed */
170
            goto fail;
171
        }
172

173
        iov.iov_base += rc;
174
        iov.iov_len -= rc;
175
        read_bytes += rc;
176
    } while (read_bytes != VHOST_USER_HDR_SIZE);
177

178
    /* qio_channel_readv_full will make socket fds blocking, unblock them */
179
    vmsg_unblock_fds(vmsg);
180
    if (vmsg->size > sizeof(vmsg->payload)) {
181
        error_report("Error: too big message request: %d, "
182
                     "size: vmsg->size: %u, "
183
                     "while sizeof(vmsg->payload) = %zu",
184
                     vmsg->request, vmsg->size, sizeof(vmsg->payload));
185
        goto fail;
186
    }
187

188
    struct iovec iov_payload = {
189
        .iov_base = (char *)&vmsg->payload,
190
        .iov_len = vmsg->size,
191
    };
192
    if (vmsg->size) {
193
        rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
194
        if (rc != 1) {
195
            if (local_err) {
196
                error_report_err(local_err);
197
            }
198
            goto fail;
199
        }
200
    }
201

202
    return true;
203

204
fail:
205
    vmsg_close_fds(vmsg);
206

207
    return false;
208
}
209

210
static coroutine_fn void vu_client_trip(void *opaque)
211
{
212
    VuServer *server = opaque;
213
    VuDev *vu_dev = &server->vu_dev;
214

215
    while (!vu_dev->broken) {
216
        if (server->quiescing) {
217
            server->co_trip = NULL;
218
            aio_wait_kick();
219
            return;
220
        }
221
        /* vu_dispatch() returns false if server->ctx went away */
222
        if (!vu_dispatch(vu_dev) && server->ctx) {
223
            break;
224
        }
225
    }
226

227
    if (vhost_user_server_has_in_flight(server)) {
228
        /* Wait for requests to complete before we can unmap the memory */
229
        server->wait_idle = true;
230
        qemu_coroutine_yield();
231
        server->wait_idle = false;
232
    }
233
    assert(!vhost_user_server_has_in_flight(server));
234

235
    vu_deinit(vu_dev);
236

237
    /* vu_deinit() should have called remove_watch() */
238
    assert(QTAILQ_EMPTY(&server->vu_fd_watches));
239

240
    object_unref(OBJECT(server->sioc));
241
    server->sioc = NULL;
242

243
    object_unref(OBJECT(server->ioc));
244
    server->ioc = NULL;
245

246
    server->co_trip = NULL;
247
    if (server->restart_listener_bh) {
248
        qemu_bh_schedule(server->restart_listener_bh);
249
    }
250
    aio_wait_kick();
251
}
252

253
/*
254
 * a wrapper for vu_kick_cb
255
 *
256
 * since aio_dispatch can only pass one user data pointer to the
257
 * callback function, pack VuDev and pvt into a struct. Then unpack it
258
 * and pass them to vu_kick_cb
259
 */
260
static void kick_handler(void *opaque)
261
{
262
    VuFdWatch *vu_fd_watch = opaque;
263
    VuDev *vu_dev = vu_fd_watch->vu_dev;
264

265
    vu_fd_watch->cb(vu_dev, 0, vu_fd_watch->pvt);
266

267
    /* Stop vu_client_trip() if an error occurred in vu_fd_watch->cb() */
268
    if (vu_dev->broken) {
269
        VuServer *server = container_of(vu_dev, VuServer, vu_dev);
270

271
        qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
272
    }
273
}
274

275
/* Return the watch registered for @fd on @server, or NULL if there is none. */
static VuFdWatch *find_vu_fd_watch(VuServer *server, int fd)
{
    VuFdWatch *vu_fd_watch;

    /*
     * Read-only lookup: nothing is removed while iterating, so the plain
     * iterator suffices (the _SAFE variant and its extra cursor are not
     * needed).
     */
    QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
        if (vu_fd_watch->fd == fd) {
            return vu_fd_watch;
        }
    }
    return NULL;
}
286

287
static void
288
set_watch(VuDev *vu_dev, int fd, int vu_evt,
289
          vu_watch_cb cb, void *pvt)
290
{
291

292
    VuServer *server = container_of(vu_dev, VuServer, vu_dev);
293
    g_assert(vu_dev);
294
    g_assert(fd >= 0);
295
    g_assert(cb);
296

297
    VuFdWatch *vu_fd_watch = find_vu_fd_watch(server, fd);
298

299
    if (!vu_fd_watch) {
300
        vu_fd_watch = g_new0(VuFdWatch, 1);
301

302
        QTAILQ_INSERT_TAIL(&server->vu_fd_watches, vu_fd_watch, next);
303

304
        vu_fd_watch->fd = fd;
305
        vu_fd_watch->cb = cb;
306
        qemu_socket_set_nonblock(fd);
307
        aio_set_fd_handler(server->ctx, fd, kick_handler,
308
                           NULL, NULL, NULL, vu_fd_watch);
309
        vu_fd_watch->vu_dev = vu_dev;
310
        vu_fd_watch->pvt = pvt;
311
    }
312
}
313

314

315
/*
 * libvhost-user callback: stop monitoring @fd, then unlink and free its
 * VuFdWatch.  An fd with no registered watch is silently ignored.
 */
static void remove_watch(VuDev *vu_dev, int fd)
{
    VuServer *server;
    VuFdWatch *watch;

    g_assert(vu_dev);
    g_assert(fd >= 0);

    server = container_of(vu_dev, VuServer, vu_dev);
    watch = find_vu_fd_watch(server, fd);
    if (watch != NULL) {
        aio_set_fd_handler(server->ctx, fd, NULL, NULL, NULL, NULL, NULL);
        QTAILQ_REMOVE(&server->vu_fd_watches, watch, next);
        g_free(watch);
    }
}
333

334

335
static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
336
                      gpointer opaque)
337
{
338
    VuServer *server = opaque;
339

340
    if (server->sioc) {
341
        warn_report("Only one vhost-user client is allowed to "
342
                    "connect the server one time");
343
        return;
344
    }
345

346
    if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
347
                 vu_message_read, set_watch, remove_watch, server->vu_iface)) {
348
        error_report("Failed to initialize libvhost-user");
349
        return;
350
    }
351

352
    /*
353
     * Unset the callback function for network listener to make another
354
     * vhost-user client keeping waiting until this client disconnects
355
     */
356
    qio_net_listener_set_client_func(server->listener,
357
                                     NULL,
358
                                     NULL,
359
                                     NULL);
360
    server->sioc = sioc;
361
    /*
362
     * Increase the object reference, so sioc will not freed by
363
     * qio_net_listener_channel_func which will call object_unref(OBJECT(sioc))
364
     */
365
    object_ref(OBJECT(server->sioc));
366
    qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
367
    server->ioc = QIO_CHANNEL(sioc);
368
    object_ref(OBJECT(server->ioc));
369

370
    /* TODO vu_message_write() spins if non-blocking! */
371
    qio_channel_set_blocking(server->ioc, false, NULL);
372

373
    qio_channel_set_follow_coroutine_ctx(server->ioc, true);
374

375
    vhost_user_server_attach_aio_context(server, server->ctx);
376
}
377

378
/* server->ctx acquired by caller */
379
void vhost_user_server_stop(VuServer *server)
380
{
381
    qemu_bh_delete(server->restart_listener_bh);
382
    server->restart_listener_bh = NULL;
383

384
    if (server->sioc) {
385
        VuFdWatch *vu_fd_watch;
386

387
        QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
388
            aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
389
                               NULL, NULL, NULL, NULL, vu_fd_watch);
390
        }
391

392
        qio_channel_shutdown(server->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
393

394
        AIO_WAIT_WHILE(server->ctx, server->co_trip);
395
    }
396

397
    if (server->listener) {
398
        qio_net_listener_disconnect(server->listener);
399
        object_unref(OBJECT(server->listener));
400
    }
401
}
402

403
/*
404
 * Allow the next client to connect to the server. Called from a BH in the main
405
 * loop.
406
 */
407
static void restart_listener_bh(void *opaque)
408
{
409
    VuServer *server = opaque;
410

411
    qio_net_listener_set_client_func(server->listener, vu_accept, server,
412
                                     NULL);
413
}
414

415
/* Called with ctx acquired */
416
void vhost_user_server_attach_aio_context(VuServer *server, AioContext *ctx)
417
{
418
    VuFdWatch *vu_fd_watch;
419

420
    server->ctx = ctx;
421

422
    if (!server->sioc) {
423
        return;
424
    }
425

426
    QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
427
        aio_set_fd_handler(ctx, vu_fd_watch->fd, kick_handler, NULL,
428
                           NULL, NULL, vu_fd_watch);
429
    }
430

431
    if (server->co_trip) {
432
        /*
433
         * The caller didn't fully shut down co_trip (this can happen on
434
         * non-polling drains like in bdrv_graph_wrlock()). This is okay as long
435
         * as it no longer tries to shut it down and we're guaranteed to still
436
         * be in the same AioContext as before.
437
         *
438
         * co_ctx can still be NULL if we get multiple calls and only just
439
         * scheduled a new coroutine in the else branch.
440
         */
441
        AioContext *co_ctx = qemu_coroutine_get_aio_context(server->co_trip);
442

443
        assert(!server->quiescing);
444
        assert(!co_ctx || co_ctx == ctx);
445
    } else {
446
        server->co_trip = qemu_coroutine_create(vu_client_trip, server);
447
        assert(!server->in_qio_channel_yield);
448
        aio_co_schedule(ctx, server->co_trip);
449
    }
450
}
451

452
/* Called with server->ctx acquired */
453
void vhost_user_server_detach_aio_context(VuServer *server)
454
{
455
    if (server->sioc) {
456
        VuFdWatch *vu_fd_watch;
457

458
        QTAILQ_FOREACH(vu_fd_watch, &server->vu_fd_watches, next) {
459
            aio_set_fd_handler(server->ctx, vu_fd_watch->fd,
460
                               NULL, NULL, NULL, NULL, vu_fd_watch);
461
        }
462
    }
463

464
    server->ctx = NULL;
465

466
    if (server->ioc) {
467
        if (server->in_qio_channel_yield) {
468
            /* Stop receiving the next vhost-user message */
469
            qio_channel_wake_read(server->ioc);
470
        }
471
    }
472
}
473

474
bool vhost_user_server_start(VuServer *server,
475
                             SocketAddress *socket_addr,
476
                             AioContext *ctx,
477
                             uint16_t max_queues,
478
                             const VuDevIface *vu_iface,
479
                             Error **errp)
480
{
481
    QEMUBH *bh;
482
    QIONetListener *listener;
483

484
    if (socket_addr->type != SOCKET_ADDRESS_TYPE_UNIX &&
485
        socket_addr->type != SOCKET_ADDRESS_TYPE_FD) {
486
        error_setg(errp, "Only socket address types 'unix' and 'fd' are supported");
487
        return false;
488
    }
489

490
    listener = qio_net_listener_new();
491
    if (qio_net_listener_open_sync(listener, socket_addr, 1,
492
                                   errp) < 0) {
493
        object_unref(OBJECT(listener));
494
        return false;
495
    }
496

497
    bh = qemu_bh_new(restart_listener_bh, server);
498

499
    /* zero out unspecified fields */
500
    *server = (VuServer) {
501
        .listener              = listener,
502
        .restart_listener_bh   = bh,
503
        .vu_iface              = vu_iface,
504
        .max_queues            = max_queues,
505
        .ctx                   = ctx,
506
    };
507

508
    qio_net_listener_set_name(server->listener, "vhost-user-backend-listener");
509

510
    qio_net_listener_set_client_func(server->listener,
511
                                     vu_accept,
512
                                     server,
513
                                     NULL);
514

515
    QTAILQ_INIT(&server->vu_fd_watches);
516
    return true;
517
}
518

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.