qemu / qemu-coroutine-lock.c
/*
 * coroutine queues and locks
 *
 * Copyright (c) 2011 Kevin Wolf <kwolf@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 * The lock-free mutex implementation is based on OSv
 * (core/lfmutex.cc, include/lockfree/mutex.hh).
 * Copyright (C) 2013 Cloudius Systems, Ltd.
 */

#include "qemu/osdep.h"
#include "qemu/coroutine_int.h"
#include "qemu/processor.h"
#include "qemu/queue.h"
#include "block/aio.h"
#include "trace.h"

void qemu_co_queue_init(CoQueue *queue)
{
    QSIMPLEQ_INIT(&queue->entries);
}

void coroutine_fn qemu_co_queue_wait_impl(CoQueue *queue, QemuLockable *lock,
                                          CoQueueWaitFlags flags)
{
    Coroutine *self = qemu_coroutine_self();
    if (flags & CO_QUEUE_WAIT_FRONT) {
        QSIMPLEQ_INSERT_HEAD(&queue->entries, self, co_queue_next);
    } else {
        QSIMPLEQ_INSERT_TAIL(&queue->entries, self, co_queue_next);
    }

    if (lock) {
        qemu_lockable_unlock(lock);
    }

    /* There is no race condition here.  Other threads will call
     * aio_co_schedule on our AioContext, which can reenter this
     * coroutine but only after this yield and after the main loop
     * has gone through the next iteration.
     */
    qemu_coroutine_yield();
    assert(qemu_in_coroutine());

    /* TODO: OSv implements wait morphing here, where the wakeup
     * primitive automatically places the woken coroutine on the
     * mutex's queue.  This avoids the thundering herd effect.
     * This could be implemented for CoMutexes, but not really for
     * other cases of QemuLockable.
     */
    if (lock) {
        qemu_lockable_lock(lock);
    }
}
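
/*
 * Illustrative usage sketch (hypothetical names, not part of this file),
 * assuming the qemu_co_queue_wait() convenience macro from
 * "qemu/coroutine.h": a CoQueue behaves like a condition variable whose
 * wait atomically drops and re-takes an associated lock.
 *
 *     // waiting side, in coroutine context, with `mutex` (a CoMutex) held:
 *     while (!ready) {
 *         qemu_co_queue_wait(&waiters, &mutex);   // unlock, yield, relock
 *     }
 *
 *     // waking side, in coroutine context, with `mutex` held:
 *     ready = true;
 *     qemu_co_queue_restart_all(&waiters);
 */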

bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock)
{
    Coroutine *next;

    next = QSIMPLEQ_FIRST(&queue->entries);
    if (!next) {
        return false;
    }

    QSIMPLEQ_REMOVE_HEAD(&queue->entries, co_queue_next);
    if (lock) {
        qemu_lockable_unlock(lock);
    }
    aio_co_wake(next);
    if (lock) {
        qemu_lockable_lock(lock);
    }
    return true;
}

bool coroutine_fn qemu_co_queue_next(CoQueue *queue)
{
    /* No unlock/lock needed in coroutine context.  */
    return qemu_co_enter_next_impl(queue, NULL);
}

void qemu_co_enter_all_impl(CoQueue *queue, QemuLockable *lock)
{
    while (qemu_co_enter_next_impl(queue, lock)) {
        /* just loop */
    }
}

void coroutine_fn qemu_co_queue_restart_all(CoQueue *queue)
{
    /* No unlock/lock needed in coroutine context.  */
    qemu_co_enter_all_impl(queue, NULL);
}

bool qemu_co_queue_empty(CoQueue *queue)
{
    return QSIMPLEQ_FIRST(&queue->entries) == NULL;
}

/* The wait records are handled with a multiple-producer, single-consumer
 * lock-free queue.  There cannot be two concurrent pop_waiter() calls
 * because pop_waiter() can only be called while mutex->handoff is zero.
 * This can happen in three cases:
 * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
 *   In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and
 *   not take part in the handoff.
 * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
 *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
 *   the cmpxchg (it will see either 0 or the next sequence value) and
 *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
 *   woken up someone.
 * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
 *   In this case another iteration starts with mutex->handoff == 0;
 *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
 *   qemu_co_mutex_unlock will go back to case (1).
 *
 * The following functions manage this queue.
 */
typedef struct CoWaitRecord {
    Coroutine *co;
    QSLIST_ENTRY(CoWaitRecord) next;
} CoWaitRecord;

static void coroutine_fn push_waiter(CoMutex *mutex, CoWaitRecord *w)
{
    w->co = qemu_coroutine_self();
    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
}
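
/*
 * Note on ordering: producers push wait records onto mutex->from_push in
 * LIFO order with a single atomic operation.  move_waiters() drains that
 * list and re-pushes every record onto the consumer-private mutex->to_pop
 * list; re-pushing a LIFO list head-first reverses it, so pop_waiter()
 * hands out records in FIFO (arrival) order.
 */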

static void move_waiters(CoMutex *mutex)
{
    QSLIST_HEAD(, CoWaitRecord) reversed;
    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
    while (!QSLIST_EMPTY(&reversed)) {
        CoWaitRecord *w = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, next);
        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
    }
}

static CoWaitRecord *pop_waiter(CoMutex *mutex)
{
    CoWaitRecord *w;

    if (QSLIST_EMPTY(&mutex->to_pop)) {
        move_waiters(mutex);
        if (QSLIST_EMPTY(&mutex->to_pop)) {
            return NULL;
        }
    }
    w = QSLIST_FIRST(&mutex->to_pop);
    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
    return w;
}

static bool has_waiters(CoMutex *mutex)
{
    return !QSLIST_EMPTY(&mutex->to_pop) || !QSLIST_EMPTY(&mutex->from_push);
}

void qemu_co_mutex_init(CoMutex *mutex)
{
    memset(mutex, 0, sizeof(*mutex));
}

static void coroutine_fn qemu_co_mutex_wake(CoMutex *mutex, Coroutine *co)
{
    /* Read co before co->ctx; pairs with smp_wmb() in
     * qemu_coroutine_enter().
     */
    smp_read_barrier_depends();
    mutex->ctx = co->ctx;
    aio_co_wake(co);
}

static void coroutine_fn qemu_co_mutex_lock_slowpath(AioContext *ctx,
                                                     CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();
    CoWaitRecord w;
    unsigned old_handoff;

    trace_qemu_co_mutex_lock_entry(mutex, self);
    push_waiter(mutex, &w);

    /*
     * Add waiter before reading mutex->handoff.  Pairs with qatomic_set_mb
     * in qemu_co_mutex_unlock.
     */
    smp_mb__after_rmw();

    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
     * a concurrent unlock() the responsibility of waking somebody up.
     */
    old_handoff = qatomic_read(&mutex->handoff);
    if (old_handoff &&
        has_waiters(mutex) &&
        qatomic_cmpxchg(&mutex->handoff, old_handoff, 0) == old_handoff) {
        /* There can be no concurrent pops, because there can be only
         * one active handoff at a time.
         */
        CoWaitRecord *to_wake = pop_waiter(mutex);
        Coroutine *co = to_wake->co;
        if (co == self) {
            /* We got the lock ourselves!  */
            assert(to_wake == &w);
            mutex->ctx = ctx;
            return;
        }

        qemu_co_mutex_wake(mutex, co);
    }

    qemu_coroutine_yield();
    trace_qemu_co_mutex_lock_return(mutex, self);
}
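
/* mutex->locked is effectively a counter: 0 means unlocked, 1 means locked
 * with no waiters, and n > 1 means locked with n - 1 concurrent lock
 * attempts pending.  The fast path below moves it from 0 to 1 with a
 * cmpxchg; contended lockers bump it with fetch-and-increment before
 * entering the slow path, and qemu_co_mutex_unlock's fetch-and-decrement
 * seeing a value greater than 1 is what triggers the wakeup/hand-off
 * protocol above.
 */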

void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
{
    AioContext *ctx = qemu_get_current_aio_context();
    Coroutine *self = qemu_coroutine_self();
    int waiters, i;

    /* Running a very small critical section on pthread_mutex_t and CoMutex
     * shows that pthread_mutex_t is much faster because it doesn't actually
     * go to sleep.  What happens is that the critical section is shorter
     * than the latency of entering the kernel and thus FUTEX_WAIT always
     * fails.  With CoMutex there is no such latency but you still want to
     * avoid wait and wakeup.  So introduce it artificially.
     */
    i = 0;
retry_fast_path:
    waiters = qatomic_cmpxchg(&mutex->locked, 0, 1);
    if (waiters != 0) {
        while (waiters == 1 && ++i < 1000) {
            if (qatomic_read(&mutex->ctx) == ctx) {
                break;
            }
            if (qatomic_read(&mutex->locked) == 0) {
                goto retry_fast_path;
            }
            cpu_relax();
        }
        waiters = qatomic_fetch_inc(&mutex->locked);
    }

    if (waiters == 0) {
        /* Uncontended.  */
        trace_qemu_co_mutex_lock_uncontended(mutex, self);
        mutex->ctx = ctx;
    } else {
        qemu_co_mutex_lock_slowpath(ctx, mutex);
    }
    mutex->holder = self;
    self->locks_held++;
}

void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
{
    Coroutine *self = qemu_coroutine_self();

    trace_qemu_co_mutex_unlock_entry(mutex, self);

    assert(mutex->locked);
    assert(mutex->holder == self);
    assert(qemu_in_coroutine());

    mutex->ctx = NULL;
    mutex->holder = NULL;
    self->locks_held--;
    if (qatomic_fetch_dec(&mutex->locked) == 1) {
        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
        return;
    }

    for (;;) {
        CoWaitRecord *to_wake = pop_waiter(mutex);
        unsigned our_handoff;

        if (to_wake) {
            qemu_co_mutex_wake(mutex, to_wake->co);
            break;
        }

        /* Some concurrent lock() is in progress (we know this because
         * mutex->locked was >1) but it hasn't yet put itself on the wait
         * queue.  Pick a sequence number for the handoff protocol (not 0).
         */
        if (++mutex->sequence == 0) {
            mutex->sequence = 1;
        }

        our_handoff = mutex->sequence;
        /* Set handoff before checking for waiters.  */
        qatomic_set_mb(&mutex->handoff, our_handoff);
        if (!has_waiters(mutex)) {
            /* The concurrent lock has not added itself yet, so it
             * will be able to pick our handoff.
             */
            break;
        }

        /* Try to do the handoff protocol ourselves; if somebody else has
         * already taken it, however, we're done and they're responsible.
         */
        if (qatomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) {
            break;
        }
    }

    trace_qemu_co_mutex_unlock_return(mutex, self);
}
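
/*
 * Illustrative usage sketch (hypothetical names, not part of this file):
 * a CoMutex is locked and unlocked only from coroutine context, but may be
 * shared by coroutines running in different AioContexts, and it remains
 * held across yields.
 *
 *     static CoMutex comutex;   // zero-initialized or qemu_co_mutex_init()ed
 *
 *     static void coroutine_fn worker_co(void *opaque)
 *     {
 *         qemu_co_mutex_lock(&comutex);
 *         // ... critical section; the coroutine may yield here ...
 *         qemu_co_mutex_unlock(&comutex);
 *     }
 */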

struct CoRwTicket {
    bool read;
    Coroutine *co;
    QSIMPLEQ_ENTRY(CoRwTicket) next;
};

void qemu_co_rwlock_init(CoRwlock *lock)
{
    qemu_co_mutex_init(&lock->mutex);
    lock->owners = 0;
    QSIMPLEQ_INIT(&lock->tickets);
}
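
/* lock->owners encodes the CoRwlock state: 0 means unlocked, a positive
 * value is the number of readers holding the lock, and -1 means a writer
 * holds it.  Blocked lockers queue up on lock->tickets in arrival order,
 * which is what makes the wakeup policy fair.
 */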

/* Releases the internal CoMutex.  */
static void coroutine_fn qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
{
    CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
    Coroutine *co = NULL;

    /*
     * Setting lock->owners here prevents rdlock and wrlock from
     * sneaking in between unlock and wake.
     */

    if (tkt) {
        if (tkt->read) {
            if (lock->owners >= 0) {
                lock->owners++;
                co = tkt->co;
            }
        } else {
            if (lock->owners == 0) {
                lock->owners = -1;
                co = tkt->co;
            }
        }
    }

    if (co) {
        QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
        qemu_co_mutex_unlock(&lock->mutex);
        aio_co_wake(co);
    } else {
        qemu_co_mutex_unlock(&lock->mutex);
    }
}

void coroutine_fn qemu_co_rwlock_rdlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    /* For fairness, wait if a writer is in line.  */
    if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
        lock->owners++;
        qemu_co_mutex_unlock(&lock->mutex);
    } else {
        CoRwTicket my_ticket = { true, self };

        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
        qemu_co_mutex_unlock(&lock->mutex);
        qemu_coroutine_yield();
        assert(lock->owners >= 1);

        /* Possibly wake another reader, which will wake the next in line.  */
        qemu_co_mutex_lock(&lock->mutex);
        qemu_co_rwlock_maybe_wake_one(lock);
    }

    self->locks_held++;
}

void coroutine_fn qemu_co_rwlock_unlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    assert(qemu_in_coroutine());
    self->locks_held--;

    qemu_co_mutex_lock(&lock->mutex);
    if (lock->owners > 0) {
        lock->owners--;
    } else {
        assert(lock->owners == -1);
        lock->owners = 0;
    }

    qemu_co_rwlock_maybe_wake_one(lock);
}

void coroutine_fn qemu_co_rwlock_downgrade(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->owners == -1);
    lock->owners = 1;

    /* Possibly wake another reader, which will wake the next in line.  */
    qemu_co_rwlock_maybe_wake_one(lock);
}

void coroutine_fn qemu_co_rwlock_wrlock(CoRwlock *lock)
{
    Coroutine *self = qemu_coroutine_self();

    qemu_co_mutex_lock(&lock->mutex);
    if (lock->owners == 0) {
        lock->owners = -1;
        qemu_co_mutex_unlock(&lock->mutex);
    } else {
        CoRwTicket my_ticket = { false, self };

        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
        qemu_co_mutex_unlock(&lock->mutex);
        qemu_coroutine_yield();
        assert(lock->owners == -1);
    }

    self->locks_held++;
}

void coroutine_fn qemu_co_rwlock_upgrade(CoRwlock *lock)
{
    qemu_co_mutex_lock(&lock->mutex);
    assert(lock->owners > 0);
    /* For fairness, wait if a writer is in line.  */
    if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
        lock->owners = -1;
        qemu_co_mutex_unlock(&lock->mutex);
    } else {
        CoRwTicket my_ticket = { false, qemu_coroutine_self() };

        lock->owners--;
        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
        qemu_co_rwlock_maybe_wake_one(lock);
        qemu_coroutine_yield();
        assert(lock->owners == -1);
    }
}
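
/*
 * Illustrative usage sketch (hypothetical names, not part of this file):
 *
 *     static CoRwlock rwlock;   // set up with qemu_co_rwlock_init(&rwlock)
 *
 *     static void coroutine_fn reader_co(void *opaque)
 *     {
 *         qemu_co_rwlock_rdlock(&rwlock);
 *         // ... shared access; other readers may hold the lock too ...
 *         qemu_co_rwlock_unlock(&rwlock);
 *     }
 *
 *     static void coroutine_fn writer_co(void *opaque)
 *     {
 *         qemu_co_rwlock_wrlock(&rwlock);
 *         // ... exclusive access ...
 *         qemu_co_rwlock_downgrade(&rwlock);   // keep it as a read lock
 *         qemu_co_rwlock_unlock(&rwlock);
 *     }
 */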