qemu

Форк
0
/
thread-pool.c 
381 строка · 10.3 Кб
1
/*
2
 * QEMU block layer thread pool
3
 *
4
 * Copyright IBM, Corp. 2008
5
 * Copyright Red Hat, Inc. 2012
6
 *
7
 * Authors:
8
 *  Anthony Liguori   <aliguori@us.ibm.com>
9
 *  Paolo Bonzini     <pbonzini@redhat.com>
10
 *
11
 * This work is licensed under the terms of the GNU GPL, version 2.  See
12
 * the COPYING file in the top-level directory.
13
 *
14
 * Contributions after 2012-01-13 are licensed under the terms of the
15
 * GNU GPL, version 2 or (at your option) any later version.
16
 */
17
#include "qemu/osdep.h"
18
#include "qemu/defer-call.h"
19
#include "qemu/queue.h"
20
#include "qemu/thread.h"
21
#include "qemu/coroutine.h"
22
#include "trace.h"
23
#include "block/thread-pool.h"
24
#include "qemu/main-loop.h"
25

26
static void do_spawn_thread(ThreadPool *pool);
27

28
typedef struct ThreadPoolElement ThreadPoolElement;
29

30
enum ThreadState {
31
    THREAD_QUEUED,
32
    THREAD_ACTIVE,
33
    THREAD_DONE,
34
};
35

36
struct ThreadPoolElement {
37
    BlockAIOCB common;
38
    ThreadPool *pool;
39
    ThreadPoolFunc *func;
40
    void *arg;
41

42
    /* Moving state out of THREAD_QUEUED is protected by lock.  After
43
     * that, only the worker thread can write to it.  Reads and writes
44
     * of state and ret are ordered with memory barriers.
45
     */
46
    enum ThreadState state;
47
    int ret;
48

49
    /* Access to this list is protected by lock.  */
50
    QTAILQ_ENTRY(ThreadPoolElement) reqs;
51

52
    /* This list is only written by the thread pool's mother thread.  */
53
    QLIST_ENTRY(ThreadPoolElement) all;
54
};
55

56
struct ThreadPool {
57
    AioContext *ctx;
58
    QEMUBH *completion_bh;
59
    QemuMutex lock;
60
    QemuCond worker_stopped;
61
    QemuCond request_cond;
62
    QEMUBH *new_thread_bh;
63

64
    /* The following variables are only accessed from one AioContext. */
65
    QLIST_HEAD(, ThreadPoolElement) head;
66

67
    /* The following variables are protected by lock.  */
68
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
69
    int cur_threads;
70
    int idle_threads;
71
    int new_threads;     /* backlog of threads we need to create */
72
    int pending_threads; /* threads created but not running yet */
73
    int min_threads;
74
    int max_threads;
75
};
76

77
static void *worker_thread(void *opaque)
78
{
79
    ThreadPool *pool = opaque;
80

81
    qemu_mutex_lock(&pool->lock);
82
    pool->pending_threads--;
83
    do_spawn_thread(pool);
84

85
    while (pool->cur_threads <= pool->max_threads) {
86
        ThreadPoolElement *req;
87
        int ret;
88

89
        if (QTAILQ_EMPTY(&pool->request_list)) {
90
            pool->idle_threads++;
91
            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
92
            pool->idle_threads--;
93
            if (ret == 0 &&
94
                QTAILQ_EMPTY(&pool->request_list) &&
95
                pool->cur_threads > pool->min_threads) {
96
                /* Timed out + no work to do + no need for warm threads = exit.  */
97
                break;
98
            }
99
            /*
100
             * Even if there was some work to do, check if there aren't
101
             * too many worker threads before picking it up.
102
             */
103
            continue;
104
        }
105

106
        req = QTAILQ_FIRST(&pool->request_list);
107
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
108
        req->state = THREAD_ACTIVE;
109
        qemu_mutex_unlock(&pool->lock);
110

111
        ret = req->func(req->arg);
112

113
        req->ret = ret;
114
        /* Write ret before state.  */
115
        smp_wmb();
116
        req->state = THREAD_DONE;
117

118
        qemu_bh_schedule(pool->completion_bh);
119
        qemu_mutex_lock(&pool->lock);
120
    }
121

122
    pool->cur_threads--;
123
    qemu_cond_signal(&pool->worker_stopped);
124

125
    /*
126
     * Wake up another thread, in case we got a wakeup but decided
127
     * to exit due to pool->cur_threads > pool->max_threads.
128
     */
129
    qemu_cond_signal(&pool->request_cond);
130
    qemu_mutex_unlock(&pool->lock);
131
    return NULL;
132
}
133

134
static void do_spawn_thread(ThreadPool *pool)
135
{
136
    QemuThread t;
137

138
    /* Runs with lock taken.  */
139
    if (!pool->new_threads) {
140
        return;
141
    }
142

143
    pool->new_threads--;
144
    pool->pending_threads++;
145

146
    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
147
}
148

149
static void spawn_thread_bh_fn(void *opaque)
150
{
151
    ThreadPool *pool = opaque;
152

153
    qemu_mutex_lock(&pool->lock);
154
    do_spawn_thread(pool);
155
    qemu_mutex_unlock(&pool->lock);
156
}
157

158
static void spawn_thread(ThreadPool *pool)
159
{
160
    pool->cur_threads++;
161
    pool->new_threads++;
162
    /* If there are threads being created, they will spawn new workers, so
163
     * we don't spend time creating many threads in a loop holding a mutex or
164
     * starving the current vcpu.
165
     *
166
     * If there are no idle threads, ask the main thread to create one, so we
167
     * inherit the correct affinity instead of the vcpu affinity.
168
     */
169
    if (!pool->pending_threads) {
170
        qemu_bh_schedule(pool->new_thread_bh);
171
    }
172
}
173

174
static void thread_pool_completion_bh(void *opaque)
175
{
176
    ThreadPool *pool = opaque;
177
    ThreadPoolElement *elem, *next;
178

179
    defer_call_begin(); /* cb() may use defer_call() to coalesce work */
180

181
restart:
182
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
183
        if (elem->state != THREAD_DONE) {
184
            continue;
185
        }
186

187
        trace_thread_pool_complete(pool, elem, elem->common.opaque,
188
                                   elem->ret);
189
        QLIST_REMOVE(elem, all);
190

191
        if (elem->common.cb) {
192
            /* Read state before ret.  */
193
            smp_rmb();
194

195
            /* Schedule ourselves in case elem->common.cb() calls aio_poll() to
196
             * wait for another request that completed at the same time.
197
             */
198
            qemu_bh_schedule(pool->completion_bh);
199

200
            elem->common.cb(elem->common.opaque, elem->ret);
201

202
            /* We can safely cancel the completion_bh here regardless of someone
203
             * else having scheduled it meanwhile because we reenter the
204
             * completion function anyway (goto restart).
205
             */
206
            qemu_bh_cancel(pool->completion_bh);
207

208
            qemu_aio_unref(elem);
209
            goto restart;
210
        } else {
211
            qemu_aio_unref(elem);
212
        }
213
    }
214

215
    defer_call_end();
216
}
217

218
static void thread_pool_cancel(BlockAIOCB *acb)
219
{
220
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
221
    ThreadPool *pool = elem->pool;
222

223
    trace_thread_pool_cancel(elem, elem->common.opaque);
224

225
    QEMU_LOCK_GUARD(&pool->lock);
226
    if (elem->state == THREAD_QUEUED) {
227
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
228
        qemu_bh_schedule(pool->completion_bh);
229

230
        elem->state = THREAD_DONE;
231
        elem->ret = -ECANCELED;
232
    }
233

234
}
235

236
static const AIOCBInfo thread_pool_aiocb_info = {
237
    .aiocb_size         = sizeof(ThreadPoolElement),
238
    .cancel_async       = thread_pool_cancel,
239
};
240

241
BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
242
                                   BlockCompletionFunc *cb, void *opaque)
243
{
244
    ThreadPoolElement *req;
245
    AioContext *ctx = qemu_get_current_aio_context();
246
    ThreadPool *pool = aio_get_thread_pool(ctx);
247

248
    /* Assert that the thread submitting work is the same running the pool */
249
    assert(pool->ctx == qemu_get_current_aio_context());
250

251
    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
252
    req->func = func;
253
    req->arg = arg;
254
    req->state = THREAD_QUEUED;
255
    req->pool = pool;
256

257
    QLIST_INSERT_HEAD(&pool->head, req, all);
258

259
    trace_thread_pool_submit(pool, req, arg);
260

261
    qemu_mutex_lock(&pool->lock);
262
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
263
        spawn_thread(pool);
264
    }
265
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
266
    qemu_mutex_unlock(&pool->lock);
267
    qemu_cond_signal(&pool->request_cond);
268
    return &req->common;
269
}
270

271
typedef struct ThreadPoolCo {
272
    Coroutine *co;
273
    int ret;
274
} ThreadPoolCo;
275

276
static void thread_pool_co_cb(void *opaque, int ret)
277
{
278
    ThreadPoolCo *co = opaque;
279

280
    co->ret = ret;
281
    aio_co_wake(co->co);
282
}
283

284
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
285
{
286
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
287
    assert(qemu_in_coroutine());
288
    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
289
    qemu_coroutine_yield();
290
    return tpc.ret;
291
}
292

293
void thread_pool_submit(ThreadPoolFunc *func, void *arg)
294
{
295
    thread_pool_submit_aio(func, arg, NULL, NULL);
296
}
297

298
void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
299
{
300
    qemu_mutex_lock(&pool->lock);
301

302
    pool->min_threads = ctx->thread_pool_min;
303
    pool->max_threads = ctx->thread_pool_max;
304

305
    /*
306
     * We either have to:
307
     *  - Increase the number available of threads until over the min_threads
308
     *    threshold.
309
     *  - Bump the worker threads so that they exit, until under the max_threads
310
     *    threshold.
311
     *  - Do nothing. The current number of threads fall in between the min and
312
     *    max thresholds. We'll let the pool manage itself.
313
     */
314
    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
315
        spawn_thread(pool);
316
    }
317

318
    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
319
        qemu_cond_signal(&pool->request_cond);
320
    }
321

322
    qemu_mutex_unlock(&pool->lock);
323
}
324

325
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
326
{
327
    if (!ctx) {
328
        ctx = qemu_get_aio_context();
329
    }
330

331
    memset(pool, 0, sizeof(*pool));
332
    pool->ctx = ctx;
333
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
334
    qemu_mutex_init(&pool->lock);
335
    qemu_cond_init(&pool->worker_stopped);
336
    qemu_cond_init(&pool->request_cond);
337
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
338

339
    QLIST_INIT(&pool->head);
340
    QTAILQ_INIT(&pool->request_list);
341

342
    thread_pool_update_params(pool, ctx);
343
}
344

345
ThreadPool *thread_pool_new(AioContext *ctx)
346
{
347
    ThreadPool *pool = g_new(ThreadPool, 1);
348
    thread_pool_init_one(pool, ctx);
349
    return pool;
350
}
351

352
void thread_pool_free(ThreadPool *pool)
353
{
354
    if (!pool) {
355
        return;
356
    }
357

358
    assert(QLIST_EMPTY(&pool->head));
359

360
    qemu_mutex_lock(&pool->lock);
361

362
    /* Stop new threads from spawning */
363
    qemu_bh_delete(pool->new_thread_bh);
364
    pool->cur_threads -= pool->new_threads;
365
    pool->new_threads = 0;
366

367
    /* Wait for worker threads to terminate */
368
    pool->max_threads = 0;
369
    qemu_cond_broadcast(&pool->request_cond);
370
    while (pool->cur_threads > 0) {
371
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
372
    }
373

374
    qemu_mutex_unlock(&pool->lock);
375

376
    qemu_bh_delete(pool->completion_bh);
377
    qemu_cond_destroy(&pool->request_cond);
378
    qemu_cond_destroy(&pool->worker_stopped);
379
    qemu_mutex_destroy(&pool->lock);
380
    g_free(pool);
381
}
382

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.