/*
 * coroutine queues and locks
 *
 * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * The lock-free mutex implementation is based on OSv
 * (core/lfmutex.cc, include/lockfree/mutex.hh).
 * Copyright (C) 2013 Cloudius Systems, Ltd.
 */
28
#include "qemu/osdep.h"
#include "qemu/coroutine_int.h"
#include "qemu/processor.h"
#include "qemu/queue.h"
#include "block/aio.h"
#include "trace.h"
/* Initialize @queue so that no coroutines are waiting on it. */
void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}
40
41void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock,42CoQueueWaitFlags flags)43{
44Coroutine *self = qemu_coroutine_self();45if (flags & CO_QUEUE_WAIT_FRONT) {46QSIMPLEQ_INSERT_HEAD(&queue->entries, self, co_queue_next);47} else {48QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);49}50
51if (lock) {52qemu_lockable_unlock(lock);53}54
55/* There is no race condition here. Other threads will call56* aio_co_schedule on our AioContext, which can reenter this
57* coroutine but only after this yield and after the main loop
58* has gone through the next iteration.
59*/
60qemu_coroutine_yield();61assert(qemu_in_coroutine());62
63/* TODO: OSv implements wait morphing here, where the wakeup64* primitive automatically places the woken coroutine on the
65* mutex's queue. This avoids the thundering herd effect.
66* This could be implemented for CoMutexes, but not really for
67* other cases of QemuLockable.
68*/
69if (lock) {70qemu_lockable_lock(lock);71}72}
73
74bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)75{
76Coroutine *next;77
78next = QSIMPLEQ_FIRST(&queue->entries);79if (!next) {80return false;81}82
83QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);84if (lock) {85qemu_lockable_unlock(lock);86}87aio_co_wake(next);88if (lock) {89qemu_lockable_lock(lock);90}91return true;92}
93
94bool coroutine_fn qemu_co_queue_next(CoQueue *queue)95{
96/* No unlock/lock needed in coroutine context. */97return qemu_co_enter_next_impl(queue, NULL);98}
99
100void qemu_co_enter_all_impl(CoQueue *queue, QemuLockable *lock)101{
102while (qemu_co_enter_next_impl(queue, lock)) {103/* just loop */104}105}
106
107void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)108{
109/* No unlock/lock needed in coroutine context. */110qemu_co_enter_all_impl(queue, NULL);111}
112
113bool qemu_co_queue_empty(CoQueue *queue)114{
115return QSIMPLEQ_FIRST(&queue->entries) == NULL;116}
117
/* The wait records are handled with a multiple-producer, single-consumer
 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 * because pop_waiter() can only be called while mutex->handoff is zero.
 * This can happen in three cases:
 * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
 *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 *   not take part in the handoff.
 * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 *   the cmpxchg (it will see either 0 or the next sequence value) and
 *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 *   woken up someone.
 * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
 *   In this case another iteration starts with mutex->handoff == 0;
 *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
 *   qemu_co_mutex_unlock will go back to case (1).
 *
 * The following functions manage this queue.
 */
typedef struct CoWaitRecord {
    Coroutine *co;                    /* coroutine waiting for the mutex */
    QSLIST_ENTRY(CoWaitRecord) next;  /* link in from_push or to_pop */
} CoWaitRecord;
/* Producer side: publish a wait record for the current coroutine on
 * @mutex.  @w->co is filled in before the atomic insert so that the
 * consumer never sees a partially initialized record.
 */
static void coroutine_fn push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}
147
148static void move_waiters(CoMutex *mutex)149{
150QSLIST_HEAD(, CoWaitRecord) reversed;151QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);152while (!QSLIST_EMPTY(&reversed)) {153CoWaitRecord *w = QSLIST_FIRST(&reversed);154QSLIST_REMOVE_HEAD(&reversed, next);155QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);156}157}
158
159static CoWaitRecord *pop_waiter(CoMutex *mutex)160{
161CoWaitRecord *w;162
163if (QSLIST_EMPTY(&mutex->to_pop)) {164move_waiters(mutex);165if (QSLIST_EMPTY(&mutex->to_pop)) {166return NULL;167}168}169w = QSLIST_FIRST(&mutex->to_pop);170QSLIST_REMOVE_HEAD(&mutex->to_pop, next);171return w;172}
173
174static bool has_waiters(CoMutex *mutex)175{
176return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);177}
178
179void qemu_co_mutex_init(CoMutex *mutex)180{
181memset(mutex, 0, sizeof(*mutex));182}
183
/* Hand @mutex to @co and schedule it to run.  Called by whoever holds
 * the wakeup responsibility (the unlocker, or a locker that stole the
 * hand-off token).
 */
static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Read co before co->ctx; pairs with smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}
193
/* Slow path of qemu_co_mutex_lock: enqueue ourselves as a waiter, take
 * part in the hand-off protocol if one is in progress, and yield until
 * the mutex is handed to us.  @w lives on our stack; that is safe because
 * we do not return until our record has been popped.
 */
static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    push_waiter(mutex, &w);

    /*
     * Add waiter before reading mutex->handoff.  Pairs with qatomic_set_mb
     * in qemu_co_mutex_unlock.
     */
    smp_mb__after_rmw();

    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
     * a concurrent unlock() the responsibility of waking somebody up.
     */
    old_handoff = qatomic_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /* There can be no concurrent pops, because there can be only
         * one active handoff at a time.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves! */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        /* Wake the oldest waiter; we will sleep until our own turn. */
        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}
235
/* Acquire @mutex, yielding if it is contended.  On return the calling
 * coroutine is the holder and locks_held is incremented.
 */
void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /* Running a very small critical section on pthread_mutex_t and CoMutex
     * shows that pthread_mutex_t is much faster because it doesn't actually
     * go to sleep.  What happens is that the critical section is shorter
     * than the latency of entering the kernel and thus FUTEX_WAIT always
     * fails.  With CoMutex there is no such latency but you still want to
     * avoid wait and wakeup.  So introduce it artificially.
     */
    i = 0;
retry_fast_path:
    waiters = qatomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        /* Contended: spin briefly while exactly one other locker holds it,
         * hoping it releases before we fall to the slow path.
         */
        while (waiters == 1 && ++i < 1000) {
            if (qatomic_read(&mutex->ctx) == ctx) {
                /* NOTE(review): holder appears to run in our own AioContext,
                 * so spinning presumably cannot help — confirm intent.
                 */
                break;
            }
            if (qatomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        /* Register ourselves as one more waiter/holder. */
        waiters = qatomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended. */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}
275
/* Release @mutex.  If other lockers are pending, either wake the oldest
 * queued waiter directly or run the hand-off protocol so that a locker
 * which has not yet queued itself can take over.
 */
void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (qatomic_fetch_dec(&mutex->locked) == 1) {
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy! */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some concurrent lock() is in progress (we know this because
         * mutex->locked was >1) but it hasn't yet put itself on the wait
         * queue.  Pick a sequence number for the handoff protocol (not 0).
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        /* Set handoff before checking for waiters. */
        qatomic_set_mb(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The concurrent lock has not added itself yet, so it
             * will be able to pick our handoff.
             */
            break;
        }

        /* Try to do the handoff protocol ourselves; if somebody else has
         * already taken it, however, we're done and they're responsible.
         */
        if (qatomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}
331
/* A coroutine queued on a CoRwlock, waiting for read or write access. */
struct CoRwTicket {
    bool read;                       /* true: wants shared (read) access */
    Coroutine *co;                   /* the waiting coroutine */
    QSIMPLEQ_ENTRY(CoRwTicket) next; /* link in CoRwlock.tickets */
};
338void qemu_co_rwlock_init(CoRwlock *lock)339{
340qemu_co_mutex_init(&lock->mutex);341lock->owners = 0;342QSIMPLEQ_INIT(&lock->tickets);343}
344
345/* Releases the internal CoMutex. */
346static void coroutine_fn qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)347{
348CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);349Coroutine *co = NULL;350
351/*352* Setting lock->owners here prevents rdlock and wrlock from
353* sneaking in between unlock and wake.
354*/
355
356if (tkt) {357if (tkt->read) {358if (lock->owners >= 0) {359lock->owners++;360co = tkt->co;361}362} else {363if (lock->owners == 0) {364lock->owners = -1;365co = tkt->co;366}367}368}369
370if (co) {371QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);372qemu_co_mutex_unlock(&lock->mutex);373aio_co_wake(co);374} else {375qemu_co_mutex_unlock(&lock->mutex);376}377}
378
379void coroutine_fn qemu_co_rwlock_rdlock(CoRwlock *lock)380{
381Coroutine *self = qemu_coroutine_self();382
383qemu_co_mutex_lock(&lock->mutex);384/* For fairness, wait if a writer is in line. */385if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {386lock->owners++;387qemu_co_mutex_unlock(&lock->mutex);388} else {389CoRwTicket my_ticket = { true, self };390
391QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);392qemu_co_mutex_unlock(&lock->mutex);393qemu_coroutine_yield();394assert(lock->owners >= 1);395
396/* Possibly wake another reader, which will wake the next in line. */397qemu_co_mutex_lock(&lock->mutex);398qemu_co_rwlock_maybe_wake_one(lock);399}400
401self->locks_held++;402}
403
404void coroutine_fn qemu_co_rwlock_unlock(CoRwlock *lock)405{
406Coroutine *self = qemu_coroutine_self();407
408assert(qemu_in_coroutine());409self->locks_held--;410
411qemu_co_mutex_lock(&lock->mutex);412if (lock->owners > 0) {413lock->owners--;414} else {415assert(lock->owners == -1);416lock->owners = 0;417}418
419qemu_co_rwlock_maybe_wake_one(lock);420}
421
422void coroutine_fn qemu_co_rwlock_downgrade(CoRwlock *lock)423{
424qemu_co_mutex_lock(&lock->mutex);425assert(lock->owners == -1);426lock->owners = 1;427
428/* Possibly wake another reader, which will wake the next in line. */429qemu_co_rwlock_maybe_wake_one(lock);430}
431
432void coroutine_fn qemu_co_rwlock_wrlock(CoRwlock *lock)433{
434Coroutine *self = qemu_coroutine_self();435
436qemu_co_mutex_lock(&lock->mutex);437if (lock->owners == 0) {438lock->owners = -1;439qemu_co_mutex_unlock(&lock->mutex);440} else {441CoRwTicket my_ticket = { false, qemu_coroutine_self() };442
443QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);444qemu_co_mutex_unlock(&lock->mutex);445qemu_coroutine_yield();446assert(lock->owners == -1);447}448
449self->locks_held++;450}
451
452void coroutine_fn qemu_co_rwlock_upgrade(CoRwlock *lock)453{
454qemu_co_mutex_lock(&lock->mutex);455assert(lock->owners > 0);456/* For fairness, wait if a writer is in line. */457if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {458lock->owners = -1;459qemu_co_mutex_unlock(&lock->mutex);460} else {461CoRwTicket my_ticket = { false, qemu_coroutine_self() };462
463lock->owners--;464QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);465qemu_co_rwlock_maybe_wake_one(lock);466qemu_coroutine_yield();467assert(lock->owners == -1);468}469}
470