4
* Copyright (c) 2015 Red Hat, Inc.
6
* This library is free software; you can redistribute it and/or
7
* modify it under the terms of the GNU Lesser General Public
8
* License as published by the Free Software Foundation; either
9
* version 2.1 of the License, or (at your option) any later version.
11
* This library is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14
* Lesser General Public License for more details.
16
* You should have received a copy of the GNU Lesser General Public
17
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
21
#include "qemu/osdep.h"
22
#include "block/aio-wait.h"
23
#include "io/channel.h"
24
#include "qapi/error.h"
25
#include "qemu/main-loop.h"
26
#include "qemu/module.h"
29
/*
 * Test whether a feature bit is set on a channel.
 *
 * @ioc: channel whose features mask is inspected
 * @feature: feature identifier, used as a bit position
 *
 * Evaluates ioc->features masked with (1 << feature); the result is
 * converted to bool by the return type.
 */
bool qio_channel_has_feature(QIOChannel *ioc,
30
QIOChannelFeature feature)
32
return ioc->features & (1 << feature);
36
/*
 * Mark a feature as present on a channel.
 *
 * @ioc: channel whose features mask is updated
 * @feature: feature identifier, used as a bit position
 *
 * ORs bit (1 << feature) into ioc->features; bits already set are kept.
 */
void qio_channel_set_feature(QIOChannel *ioc,
37
QIOChannelFeature feature)
39
ioc->features |= (1 << feature);
43
/*
 * Give the channel a name.
 *
 * Stores a g_strdup() copy of name in ioc->name, so the channel holds
 * its own allocation rather than the pointer that was passed in.
 * NOTE(review): the parameter line declaring name and any release of a
 * previous ioc->name are not visible in this view - confirm.
 */
void qio_channel_set_name(QIOChannel *ioc,
47
ioc->name = g_strdup(name);
51
/*
 * Validate the request against the channel features, then hand the
 * scatter read to the io_readv callback of the channel class.
 *
 * Two rejections are visible, both reported through errp with EINVAL:
 *  - the channel lacks QIO_CHANNEL_FEATURE_FD_PASS
 *  - QIO_CHANNEL_READ_FLAG_MSG_PEEK was requested but the channel lacks
 *    QIO_CHANNEL_FEATURE_READ_MSG_PEEK
 * Otherwise the value returned is whatever klass->io_readv returns.
 */
ssize_t qio_channel_readv_full(QIOChannel *ioc,
52
const struct iovec *iov,
59
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
62
/*
 * NOTE(review): the first half of this condition is on a line that is
 * not visible here; presumably it tests whether the caller asked to
 * receive file descriptors - confirm.
 */
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
63
error_setg_errno(errp, EINVAL,
64
"Channel does not support file descriptor passing");
68
/* Peeking is only allowed when the channel advertises support for it */
if ((flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) &&
69
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
70
error_setg_errno(errp, EINVAL,
71
"Channel does not support peek read");
75
return klass->io_readv(ioc, iov, niov, fds, nfds, flags, errp);
79
/*
 * Validate the request against the channel features, then hand the
 * gather write to the io_writev callback of the channel class.
 *
 * Visible rejections, all reported through errp with EINVAL:
 *  - the channel lacks QIO_CHANNEL_FEATURE_FD_PASS
 *  - QIO_CHANNEL_WRITE_FLAG_ZERO_COPY is combined with fd passing
 *  - QIO_CHANNEL_WRITE_FLAG_ZERO_COPY was requested but the channel
 *    lacks QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY
 * Otherwise the value returned is whatever klass->io_writev returns.
 */
ssize_t qio_channel_writev_full(QIOChannel *ioc,
80
const struct iovec *iov,
87
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
90
/*
 * NOTE(review): the next two checks look like they sit inside an outer
 * condition on fds / nfds whose line is not visible here - confirm.
 */
if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
91
error_setg_errno(errp, EINVAL,
92
"Channel does not support file descriptor passing");
95
if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
96
error_setg_errno(errp, EINVAL,
97
"Zero Copy does not support file descriptor passing");
102
/* Zero copy is only allowed when the channel advertises support for it */
if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
103
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
104
error_setg_errno(errp, EINVAL,
105
"Requested Zero Copy feature is not available");
109
return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
113
/*
 * Convenience wrapper around qio_channel_readv_full_all_eof() for
 * callers that do not receive file descriptors: NULL is passed for both
 * the fds and nfds arguments and the result is returned unchanged.
 */
int coroutine_mixed_fn qio_channel_readv_all_eof(QIOChannel *ioc,
114
const struct iovec *iov,
118
return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
121
/*
 * Convenience wrapper around qio_channel_readv_full_all() for callers
 * that do not receive file descriptors: NULL is passed for both the fds
 * and nfds arguments and the result is returned unchanged.
 */
int coroutine_mixed_fn qio_channel_readv_all(QIOChannel *ioc,
122
const struct iovec *iov,
126
return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
129
/*
 * Keep reading from the channel until the caller iovec is consumed.
 *
 * The function works on a private copy of the iovec array so that the
 * copy can be advanced after each partial read. Each iteration calls
 * qio_channel_readv_full(); when that reports QIO_CHANNEL_ERR_BLOCK the
 * function waits for G_IO_IN, yielding if it runs in a coroutine and
 * blocking in qio_channel_wait() otherwise. The private copy is freed
 * before the function ends.
 *
 * NOTE(review): the return statements and several branch lines are not
 * visible in this view, so the exact return values for EOF, error and
 * success cannot be confirmed from here.
 */
int coroutine_mixed_fn qio_channel_readv_full_all_eof(QIOChannel *ioc,
130
const struct iovec *iov,
132
int **fds, size_t *nfds,
136
/* local_iov is advanced while reading; local_iov_head keeps the
 * address returned by g_new() so that it can be freed at the end. */
struct iovec *local_iov = g_new(struct iovec, niov);
137
struct iovec *local_iov_head = local_iov;
138
unsigned int nlocal_iov = niov;
139
int **local_fds = fds;
140
size_t *local_nfds = nfds;
141
bool partial = false;
151
/* Duplicate the whole caller iovec (offset 0, full byte size) */
nlocal_iov = iov_copy(local_iov, nlocal_iov,
153
0, iov_size(iov, niov));
155
/* Loop while data is still expected or fds are still wanted */
while ((nlocal_iov > 0) || local_fds) {
157
len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
158
local_nfds, 0, errp);
159
/* Nothing available right now: wait for readability, then retry */
if (len == QIO_CHANNEL_ERR_BLOCK) {
160
if (qemu_in_coroutine()) {
161
qio_channel_yield(ioc, G_IO_IN);
163
qio_channel_wait(ioc, G_IO_IN);
169
if (local_nfds && *local_nfds) {
171
* Got some FDs, but no data yet. This isn't an EOF
172
* scenario (yet), so carry on to try to read data
173
* on next loop iteration
176
} else if (!partial) {
177
/* No fds and no data - EOF before any data read */
183
"Unexpected end-of-file before all data were read");
184
/* Fallthrough into len < 0 handling */
189
/* Close any FDs we previously received */
192
for (i = 0; i < (*nfds); i++) {
203
iov_discard_front(&local_iov, &nlocal_iov, len);
215
g_free(local_iov_head);
219
/*
 * Variant of qio_channel_readv_full_all_eof() that reports end-of-file
 * as an error: the visible code calls the _eof function and sets the
 * error message below on errp.
 *
 * NOTE(review): the condition guarding error_setg() and the return
 * statements are not visible in this view - confirm which value of ret
 * triggers the error.
 */
int coroutine_mixed_fn qio_channel_readv_full_all(QIOChannel *ioc,
220
const struct iovec *iov,
222
int **fds, size_t *nfds,
225
int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);
228
error_setg(errp, "Unexpected end-of-file before all data were read");
238
/*
 * Convenience wrapper around qio_channel_writev_full_all() for callers
 * that send no file descriptors and need no flags: it passes NULL for
 * fds and 0 for both nfds and flags, and returns the result unchanged.
 */
int coroutine_mixed_fn qio_channel_writev_all(QIOChannel *ioc,
239
const struct iovec *iov,
243
return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
246
/*
 * Keep writing to the channel until the caller iovec is consumed.
 *
 * The function works on a private copy of the iovec array so that the
 * copy can be advanced after each partial write. Each iteration calls
 * qio_channel_writev_full(); when that reports QIO_CHANNEL_ERR_BLOCK
 * the function waits for G_IO_OUT, yielding if it runs in a coroutine
 * and blocking in qio_channel_wait() otherwise. The private copy is
 * freed before the function ends.
 *
 * NOTE(review): the return statements, the error branch and the tail of
 * the qio_channel_writev_full() call are not visible in this view.
 */
int coroutine_mixed_fn qio_channel_writev_full_all(QIOChannel *ioc,
247
const struct iovec *iov,
249
int *fds, size_t nfds,
250
int flags, Error **errp)
253
/* local_iov is advanced while writing; local_iov_head keeps the
 * address returned by g_new() so that it can be freed at the end. */
struct iovec *local_iov = g_new(struct iovec, niov);
254
struct iovec *local_iov_head = local_iov;
255
unsigned int nlocal_iov = niov;
257
/* Duplicate the whole caller iovec (offset 0, full byte size) */
nlocal_iov = iov_copy(local_iov, nlocal_iov,
259
0, iov_size(iov, niov));
261
while (nlocal_iov > 0) {
264
len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
267
/* Channel cannot accept data right now: wait until writable, retry */
if (len == QIO_CHANNEL_ERR_BLOCK) {
268
if (qemu_in_coroutine()) {
269
qio_channel_yield(ioc, G_IO_OUT);
271
qio_channel_wait(ioc, G_IO_OUT);
279
/* Drop the bytes that were just written from the front of the copy */
iov_discard_front(&local_iov, &nlocal_iov, len);
287
g_free(local_iov_head);
291
/*
 * Single scatter read without fd passing or flags: forwards to
 * qio_channel_readv_full() with NULL fds, NULL nfds and flags 0, and
 * returns its result unchanged.
 */
ssize_t qio_channel_readv(QIOChannel *ioc,
292
const struct iovec *iov,
296
return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, 0, errp);
300
/*
 * Single gather write without fd passing or flags: forwards to
 * qio_channel_writev_full() with NULL fds, nfds 0 and flags 0, and
 * returns its result unchanged.
 */
ssize_t qio_channel_writev(QIOChannel *ioc,
301
const struct iovec *iov,
305
return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
309
/*
 * Single read into one flat buffer: wraps buf / buflen in a one-element
 * iovec and forwards to qio_channel_readv_full() with no fds and
 * flags 0, returning its result unchanged.
 */
ssize_t qio_channel_read(QIOChannel *ioc,
314
struct iovec iov = { .iov_base = buf, .iov_len = buflen };
315
return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, 0, errp);
319
/*
 * Single write from one flat buffer: wraps buf / buflen in a
 * one-element iovec and forwards to qio_channel_writev_full() with no
 * fds and flags 0, returning its result unchanged.
 */
ssize_t qio_channel_write(QIOChannel *ioc,
324
/* The cast only adapts buf to the iov_base member type */
struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
325
return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
329
/*
 * Flat-buffer form of qio_channel_readv_all_eof(): wraps buf / buflen
 * in a one-element iovec and returns the result of the vectored call.
 */
int coroutine_mixed_fn qio_channel_read_all_eof(QIOChannel *ioc,
334
struct iovec iov = { .iov_base = buf, .iov_len = buflen };
335
return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
339
/*
 * Flat-buffer form of qio_channel_readv_all(): wraps buf / buflen in a
 * one-element iovec and returns the result of the vectored call.
 */
int coroutine_mixed_fn qio_channel_read_all(QIOChannel *ioc,
344
struct iovec iov = { .iov_base = buf, .iov_len = buflen };
345
return qio_channel_readv_all(ioc, &iov, 1, errp);
349
/*
 * Flat-buffer form of qio_channel_writev_all(): wraps buf / buflen in a
 * one-element iovec and returns the result of the vectored call.
 */
int coroutine_mixed_fn qio_channel_write_all(QIOChannel *ioc,
354
/* The cast only adapts buf to the iov_base member type */
struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
355
return qio_channel_writev_all(ioc, &iov, 1, errp);
359
/*
 * Switch the channel between blocking and non-blocking mode by calling
 * the io_set_blocking callback of the channel class with enabled and
 * errp; the callback result is returned unchanged. The callback is
 * invoked without a NULL check.
 */
int qio_channel_set_blocking(QIOChannel *ioc,
363
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
364
return klass->io_set_blocking(ioc, enabled, errp);
368
/*
 * Record enabled in ioc->follow_coroutine_ctx. The flag is read by
 * qio_channel_set_fd_handlers() to choose between the AioContext of the
 * current coroutine and the iohandler AioContext.
 */
void qio_channel_set_follow_coroutine_ctx(QIOChannel *ioc, bool enabled)
370
ioc->follow_coroutine_ctx = enabled;
374
/*
 * Close the channel by calling the io_close callback of the channel
 * class; the callback result is returned unchanged. The callback is
 * invoked without a NULL check.
 */
int qio_channel_close(QIOChannel *ioc,
377
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
378
return klass->io_close(ioc, errp);
382
/*
 * Create a GSource that watches the channel for condition, using the
 * io_create_watch callback of the channel class, and label the source
 * with ioc->name.
 *
 * NOTE(review): the line between the creation and g_source_set_name()
 * is not visible; presumably the naming is conditional on ioc->name
 * being set - confirm. The return statement is not visible either.
 */
GSource *qio_channel_create_watch(QIOChannel *ioc,
383
GIOCondition condition)
385
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
386
GSource *ret = klass->io_create_watch(ioc, condition);
389
g_source_set_name(ret, ioc->name);
396
/*
 * Forward the read and write AioContext / handler pairs to the
 * io_set_aio_fd_handler callback of the channel class. No validation is
 * done in the visible lines.
 *
 * NOTE(review): the io_read, io_write and trailing parameter lines, as
 * well as the end of the callback invocation, are not visible here.
 */
void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
397
AioContext *read_ctx,
399
AioContext *write_ctx,
403
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
405
klass->io_set_aio_fd_handler(ioc, read_ctx, io_read, write_ctx, io_write,
409
/*
 * Create a watch for condition on the channel and attach it to the
 * given GMainContext.
 *
 * Steps visible here: build the source with qio_channel_create_watch(),
 * install func / user_data / notify as its callback, attach it to
 * context (keeping the id returned by g_source_attach), then drop the
 * local reference with g_source_unref().
 *
 * NOTE(review): the return statement is not visible; presumably id is
 * returned - confirm.
 */
guint qio_channel_add_watch_full(QIOChannel *ioc,
410
GIOCondition condition,
413
GDestroyNotify notify,
414
GMainContext *context)
419
source = qio_channel_create_watch(ioc, condition);
421
/* func has a channel-specific signature, hence the GSourceFunc cast */
g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
423
id = g_source_attach(source, context);
424
g_source_unref(source);
429
/*
 * Same as qio_channel_add_watch_full() with NULL passed as the
 * GMainContext argument; the id from that call is returned unchanged.
 */
guint qio_channel_add_watch(QIOChannel *ioc,
430
GIOCondition condition,
433
GDestroyNotify notify)
435
return qio_channel_add_watch_full(ioc, condition, func,
436
user_data, notify, NULL);
439
/*
 * Add a watch like qio_channel_add_watch_full() but give the caller the
 * GSource instead of its numeric id.
 *
 * The source is looked up again in context by the id that was just
 * returned, and an extra reference is taken on it with g_source_ref().
 *
 * NOTE(review): the return statement is not visible; presumably source
 * is returned and the caller owns the extra reference - confirm.
 */
GSource *qio_channel_add_watch_source(QIOChannel *ioc,
440
GIOCondition condition,
443
GDestroyNotify notify,
444
GMainContext *context)
449
id = qio_channel_add_watch_full(ioc, condition, func,
450
user_data, notify, context);
451
source = g_main_context_find_source_by_id(context, id);
452
g_source_ref(source);
457
/*
 * Positioned gather write at offset.
 *
 * An error is set on errp when the channel class has no io_pwritev
 * callback, or when the channel lacks QIO_CHANNEL_FEATURE_SEEKABLE
 * (reported with EINVAL). Otherwise the result of klass->io_pwritev is
 * returned unchanged.
 *
 * NOTE(review): the return statements of the two error branches are not
 * visible in this view.
 */
ssize_t qio_channel_pwritev(QIOChannel *ioc, const struct iovec *iov,
458
size_t niov, off_t offset, Error **errp)
460
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
462
if (!klass->io_pwritev) {
463
error_setg(errp, "Channel does not support pwritev");
467
if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_SEEKABLE)) {
468
error_setg_errno(errp, EINVAL, "Requested channel is not seekable");
472
return klass->io_pwritev(ioc, iov, niov, offset, errp);
475
/*
 * Flat-buffer form of qio_channel_pwritev(): passes a single iovec and
 * returns the result of the vectored call.
 *
 * NOTE(review): the declaration of iov is not visible in this view;
 * presumably it is built from buf and buflen - confirm.
 */
ssize_t qio_channel_pwrite(QIOChannel *ioc, char *buf, size_t buflen,
476
off_t offset, Error **errp)
483
return qio_channel_pwritev(ioc, &iov, 1, offset, errp);
486
/*
 * Positioned scatter read at offset.
 *
 * An error is set on errp when the channel class has no io_preadv
 * callback, or when the channel lacks QIO_CHANNEL_FEATURE_SEEKABLE
 * (reported with EINVAL). Otherwise the result of klass->io_preadv is
 * returned unchanged.
 *
 * NOTE(review): the return statements of the two error branches are not
 * visible in this view.
 */
ssize_t qio_channel_preadv(QIOChannel *ioc, const struct iovec *iov,
487
size_t niov, off_t offset, Error **errp)
489
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
491
if (!klass->io_preadv) {
492
error_setg(errp, "Channel does not support preadv");
496
if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_SEEKABLE)) {
497
error_setg_errno(errp, EINVAL, "Requested channel is not seekable");
501
return klass->io_preadv(ioc, iov, niov, offset, errp);
504
/*
 * Flat-buffer form of qio_channel_preadv(): passes a single iovec and
 * returns the result of the vectored call.
 *
 * NOTE(review): the declaration of iov is not visible in this view;
 * presumably it is built from buf and buflen - confirm.
 */
ssize_t qio_channel_pread(QIOChannel *ioc, char *buf, size_t buflen,
505
off_t offset, Error **errp)
512
return qio_channel_preadv(ioc, &iov, 1, offset, errp);
515
/*
 * Shut down the data path of the channel in the direction given by how.
 *
 * io_shutdown is an optional class callback: when it is absent an error
 * is set on errp, otherwise its result is returned unchanged.
 *
 * NOTE(review): the return statement of the error branch is not visible
 * in this view.
 */
int qio_channel_shutdown(QIOChannel *ioc,
516
QIOChannelShutdown how,
519
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
521
if (!klass->io_shutdown) {
522
error_setg(errp, "Data path shutdown not supported");
526
return klass->io_shutdown(ioc, how, errp);
530
/*
 * Pass enabled to the io_set_delay callback of the channel class.
 * The callback is optional: when the class does not provide it, the
 * call does nothing and reports no error.
 */
void qio_channel_set_delay(QIOChannel *ioc,
533
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
535
if (klass->io_set_delay) {
536
klass->io_set_delay(ioc, enabled);
541
/*
 * Pass enabled to the io_set_cork callback of the channel class.
 * The callback is optional: when the class does not provide it, the
 * call does nothing and reports no error.
 */
void qio_channel_set_cork(QIOChannel *ioc,
544
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
546
if (klass->io_set_cork) {
547
klass->io_set_cork(ioc, enabled);
551
/*
 * Ask the channel class for the process id of the peer, to be stored
 * through pid.
 *
 * io_peerpid is an optional class callback: when it is absent an error
 * is set on errp.
 *
 * NOTE(review): on the visible call line the value returned by
 * io_peerpid is not used. The return statements are not visible in this
 * view, so confirm that a failure reported by the callback is
 * propagated to the caller rather than being turned into success.
 */
int qio_channel_get_peerpid(QIOChannel *ioc,
555
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
557
if (!klass->io_peerpid) {
558
error_setg(errp, "Channel does not support peer pid");
561
klass->io_peerpid(ioc, pid, errp);
565
/*
 * Reposition the channel using offset and whence.
 *
 * io_seek is an optional class callback: when it is absent an error is
 * set on errp, otherwise its result is returned unchanged.
 *
 * NOTE(review): the offset / whence parameter lines and the return
 * statement of the error branch are not visible in this view.
 */
off_t qio_channel_io_seek(QIOChannel *ioc,
570
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
572
if (!klass->io_seek) {
573
error_setg(errp, "Channel does not support random access");
577
return klass->io_seek(ioc, offset, whence, errp);
580
/*
 * Flush the channel through the io_flush callback of the channel class.
 *
 * The callback is only reached when it exists and the channel has
 * QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY; its result is then returned
 * unchanged.
 *
 * NOTE(review): the body of the early-exit branch is not visible here;
 * presumably it returns success without doing anything - confirm.
 */
int qio_channel_flush(QIOChannel *ioc,
583
QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
585
if (!klass->io_flush ||
586
!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
590
return klass->io_flush(ioc, errp);
594
/*
 * Read-side fd handler; opaque is the QIOChannel.
 *
 * Atomically takes the coroutine stored in ioc->read_coroutine and
 * leaves NULL in its place, so the coroutine is claimed at most once.
 *
 * NOTE(review): the lines that handle a NULL co and that actually wake
 * the coroutine are not visible in this view.
 */
static void qio_channel_restart_read(void *opaque)
596
QIOChannel *ioc = opaque;
597
Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
603
/* Assert that aio_co_wake() reenters the coroutine directly */
604
assert(qemu_get_current_aio_context() ==
605
qemu_coroutine_get_aio_context(co));
609
/*
 * Write-side fd handler; opaque is the QIOChannel.
 *
 * Atomically takes the coroutine stored in ioc->write_coroutine and
 * leaves NULL in its place, so the coroutine is claimed at most once.
 *
 * NOTE(review): the lines that handle a NULL co and that actually wake
 * the coroutine are not visible in this view.
 */
static void qio_channel_restart_write(void *opaque)
611
QIOChannel *ioc = opaque;
612
Coroutine *co = qatomic_xchg(&ioc->write_coroutine, NULL);
618
/* Assert that aio_co_wake() reenters the coroutine directly */
619
assert(qemu_get_current_aio_context() ==
620
qemu_coroutine_get_aio_context(co));
624
/*
 * Register the current coroutine as the waiter for condition and
 * install the matching fd handler(s) on the channel.
 *
 * For G_IO_IN the current coroutine is stored in ioc->read_coroutine
 * and the read handler is selected; the write handler is selected as
 * well when a writer is already waiting in the same AioContext. The
 * G_IO_OUT branch mirrors this with the roles swapped. The selected
 * contexts and handlers are then applied in a single
 * qio_channel_set_aio_fd_handler() call with ioc as the opaque value.
 *
 * NOTE(review): the assignments to read_ctx / write_ctx and to
 * ioc->read_ctx, and any trailing branch, are not visible in this view.
 */
static void coroutine_fn
625
qio_channel_set_fd_handlers(QIOChannel *ioc, GIOCondition condition)
627
/* Either follow the context of the calling coroutine or use the
 * iohandler context, depending on ioc->follow_coroutine_ctx. */
AioContext *ctx = ioc->follow_coroutine_ctx ?
628
qemu_coroutine_get_aio_context(qemu_coroutine_self()) :
629
iohandler_get_aio_context();
630
AioContext *read_ctx = NULL;
631
IOHandler *io_read = NULL;
632
AioContext *write_ctx = NULL;
633
IOHandler *io_write = NULL;
635
if (condition == G_IO_IN) {
636
ioc->read_coroutine = qemu_coroutine_self();
639
io_read = qio_channel_restart_read;
642
* Thread safety: if the other coroutine is set and its AioContext
643
* matches ours, then there is mutual exclusion between read and write
644
* because they share a single thread and it's safe to set both read
645
* and write fd handlers here. If the AioContext does not match ours,
646
* then both threads may run in parallel but there is no shared state
649
if (ioc->write_coroutine && ioc->write_ctx == ctx) {
651
io_write = qio_channel_restart_write;
653
} else if (condition == G_IO_OUT) {
654
ioc->write_coroutine = qemu_coroutine_self();
655
ioc->write_ctx = ctx;
657
io_write = qio_channel_restart_write;
658
if (ioc->read_coroutine && ioc->read_ctx == ctx) {
660
io_read = qio_channel_restart_read;
666
qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
667
write_ctx, io_write, ioc);
670
/*
 * Counterpart of qio_channel_set_fd_handlers(): recompute the handler
 * set after the waiter for condition has finished.
 *
 * All four locals start as NULL, so the handler for the finished
 * direction stays unset. The handler for the opposite direction is
 * selected again only when a coroutine is still waiting there in the
 * same AioContext. The result is applied in a single
 * qio_channel_set_aio_fd_handler() call with ioc as the opaque value.
 *
 * NOTE(review): the declaration of ctx, its assignment in the G_IO_IN
 * branch and the assignments to read_ctx / write_ctx are not visible in
 * this view.
 */
static void coroutine_fn
671
qio_channel_clear_fd_handlers(QIOChannel *ioc, GIOCondition condition)
673
AioContext *read_ctx = NULL;
674
IOHandler *io_read = NULL;
675
AioContext *write_ctx = NULL;
676
IOHandler *io_write = NULL;
679
if (condition == G_IO_IN) {
683
/* A writer is still waiting in this context: keep its handler */
if (ioc->write_coroutine && ioc->write_ctx == ctx) {
685
io_write = qio_channel_restart_write;
687
} else if (condition == G_IO_OUT) {
688
ctx = ioc->write_ctx;
691
/* A reader is still waiting in this context: keep its handler */
if (ioc->read_coroutine && ioc->read_ctx == ctx) {
693
io_read = qio_channel_restart_read;
699
qio_channel_set_aio_fd_handler(ioc, read_ctx, io_read,
700
write_ctx, io_write, ioc);
703
/*
 * Suspend the calling coroutine until the channel is ready for
 * condition (G_IO_IN or G_IO_OUT).
 *
 * Must be called from a coroutine, and only one coroutine may wait per
 * direction: both are enforced with assertions. The fd handlers are
 * installed, the coroutine yields, and after it is resumed the function
 * asserts that it runs in the home thread of the AioContext captured
 * before yielding, then removes the handlers again.
 *
 * NOTE(review): the bodies of the two branches after the yield are only
 * partly visible; the existing comment says the coroutine may be
 * reentered by something other than the fd handlers - confirm how the
 * coroutine pointer is reset in that case.
 */
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
704
GIOCondition condition)
708
assert(qemu_in_coroutine());
709
/* Remember the context now so it can be checked after resuming */
ioc_ctx = qemu_coroutine_get_aio_context(qemu_coroutine_self());
711
if (condition == G_IO_IN) {
712
assert(!ioc->read_coroutine);
713
} else if (condition == G_IO_OUT) {
714
assert(!ioc->write_coroutine);
718
qio_channel_set_fd_handlers(ioc, condition);
719
qemu_coroutine_yield();
720
assert(in_aio_context_home_thread(ioc_ctx));
722
/* Allow interrupting the operation by reentering the coroutine other than
723
 * through the aio_fd_handlers. */
724
if (condition == G_IO_IN) {
725
assert(ioc->read_coroutine == NULL);
726
} else if (condition == G_IO_OUT) {
727
assert(ioc->write_coroutine == NULL);
729
qio_channel_clear_fd_handlers(ioc, condition);
732
/*
 * Atomically take the coroutine stored in ioc->read_coroutine, leaving
 * NULL in its place.
 *
 * NOTE(review): the rest of the body is not visible in this view;
 * judging by the function name the coroutine taken here is presumably
 * woken afterwards - confirm.
 */
void qio_channel_wake_read(QIOChannel *ioc)
734
Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
740
/*
 * Watch callback installed by qio_channel_wait(): opaque is the
 * GMainLoop to stop, and the callback asks it to quit.
 *
 * NOTE(review): the opaque parameter line and the return statement are
 * not visible in this view.
 */
static gboolean qio_channel_wait_complete(QIOChannel *ioc,
741
GIOCondition condition,
744
GMainLoop *loop = opaque;
746
g_main_loop_quit(loop);
751
/*
 * Block the calling thread until the channel signals condition.
 *
 * A new GMainContext and GMainLoop are created for this one wait. A
 * watch for condition is attached to that context with
 * qio_channel_wait_complete() as its callback, and the loop is run
 * until it is stopped. The source, loop and context are then
 * unreferenced.
 *
 * NOTE(review): the remaining arguments of g_source_set_callback() are
 * not visible here; presumably loop is passed as the user data, since
 * the callback treats its opaque argument as a GMainLoop - confirm.
 */
void qio_channel_wait(QIOChannel *ioc,
752
GIOCondition condition)
754
GMainContext *ctxt = g_main_context_new();
755
GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
758
source = qio_channel_create_watch(ioc, condition);
760
g_source_set_callback(source,
761
(GSourceFunc)qio_channel_wait_complete,
765
g_source_attach(source, ctxt);
767
/* Runs until g_main_loop_quit() is called on loop */
g_main_loop_run(loop);
769
g_source_unref(source);
770
g_main_loop_unref(loop);
771
g_main_context_unref(ctxt);
775
/*
 * Instance finalizer registered in qio_channel_info.
 *
 * Asserts that no coroutine is still recorded as waiting on the channel
 * in either direction, then releases ioc->event with CloseHandle().
 *
 * NOTE(review): CloseHandle() is a Windows API; presumably this call
 * sits inside a platform conditional whose lines are not visible here.
 * Any other cleanup, such as freeing ioc->name, is not visible either -
 * confirm.
 */
static void qio_channel_finalize(Object *obj)
777
QIOChannel *ioc = QIO_CHANNEL(obj);
779
/* Must not have coroutines in qio_channel_yield() */
780
assert(!ioc->read_coroutine);
781
assert(!ioc->write_coroutine);
787
CloseHandle(ioc->event);
792
/*
 * Type description for TYPE_QIO_CHANNEL: a child of TYPE_OBJECT whose
 * instances are QIOChannel structures and whose class structure is
 * QIOChannelClass. qio_channel_finalize() is set as the instance
 * finalizer.
 *
 * NOTE(review): one initializer line between .instance_finalize and
 * .class_size is not visible in this view.
 */
static const TypeInfo qio_channel_info = {
793
.parent = TYPE_OBJECT,
794
.name = TYPE_QIO_CHANNEL,
795
.instance_size = sizeof(QIOChannel),
796
.instance_finalize = qio_channel_finalize,
798
.class_size = sizeof(QIOChannelClass),
802
/*
 * Register the channel type described by qio_channel_info by passing it
 * to type_register_static().
 */
static void qio_channel_register_types(void)
804
type_register_static(&qio_channel_info);
808
/*
 * Hand qio_channel_register_types to the type_init() macro.
 * NOTE(review): presumably this arranges for the function to run during
 * module initialisation - confirm against qemu/module.h.
 */
type_init(qio_channel_register_types);