17
#include "qemu/osdep.h"
18
#include "qemu/defer-call.h"
19
#include "qemu/queue.h"
20
#include "qemu/thread.h"
21
#include "qemu/coroutine.h"
23
#include "block/thread-pool.h"
24
#include "qemu/main-loop.h"
26
static void do_spawn_thread(ThreadPool *pool);
28
typedef struct ThreadPoolElement ThreadPoolElement;
36
struct ThreadPoolElement {
46
enum ThreadState state;
50
QTAILQ_ENTRY(ThreadPoolElement) reqs;
53
QLIST_ENTRY(ThreadPoolElement) all;
58
QEMUBH *completion_bh;
60
QemuCond worker_stopped;
61
QemuCond request_cond;
62
QEMUBH *new_thread_bh;
65
QLIST_HEAD(, ThreadPoolElement) head;
68
QTAILQ_HEAD(, ThreadPoolElement) request_list;
77
static void *worker_thread(void *opaque)
79
ThreadPool *pool = opaque;
81
qemu_mutex_lock(&pool->lock);
82
pool->pending_threads--;
83
do_spawn_thread(pool);
85
while (pool->cur_threads <= pool->max_threads) {
86
ThreadPoolElement *req;
89
if (QTAILQ_EMPTY(&pool->request_list)) {
91
ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
94
QTAILQ_EMPTY(&pool->request_list) &&
95
pool->cur_threads > pool->min_threads) {
106
req = QTAILQ_FIRST(&pool->request_list);
107
QTAILQ_REMOVE(&pool->request_list, req, reqs);
108
req->state = THREAD_ACTIVE;
109
qemu_mutex_unlock(&pool->lock);
111
ret = req->func(req->arg);
116
req->state = THREAD_DONE;
118
qemu_bh_schedule(pool->completion_bh);
119
qemu_mutex_lock(&pool->lock);
123
qemu_cond_signal(&pool->worker_stopped);
129
qemu_cond_signal(&pool->request_cond);
130
qemu_mutex_unlock(&pool->lock);
134
/* Create one detached worker thread if any are queued for creation.
 * Called with pool->lock held.
 */
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    /* Detached: workers clean up after themselves; thread_pool_free()
     * synchronizes on worker_stopped instead of joining.
     */
    qemu_thread_create(&t, "worker", worker_thread, pool, QEMU_THREAD_DETACHED);
}
static void spawn_thread_bh_fn(void *opaque)
151
ThreadPool *pool = opaque;
153
qemu_mutex_lock(&pool->lock);
154
do_spawn_thread(pool);
155
qemu_mutex_unlock(&pool->lock);
158
/* Request one more worker thread.  Called with pool->lock held. */
static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}
static void thread_pool_completion_bh(void *opaque)
176
ThreadPool *pool = opaque;
177
ThreadPoolElement *elem, *next;
182
QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
183
if (elem->state != THREAD_DONE) {
187
trace_thread_pool_complete(pool, elem, elem->common.opaque,
189
QLIST_REMOVE(elem, all);
191
if (elem->common.cb) {
198
qemu_bh_schedule(pool->completion_bh);
200
elem->common.cb(elem->common.opaque, elem->ret);
206
qemu_bh_cancel(pool->completion_bh);
208
qemu_aio_unref(elem);
211
qemu_aio_unref(elem);
218
/*
 * Asynchronous-cancel hook for the AIOCB.  Only requests still sitting in
 * the queue (THREAD_QUEUED) can be cancelled; they complete with
 * -ECANCELED via the completion BH.  Requests already running are left to
 * finish normally.
 */
static void thread_pool_cancel(BlockAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    QEMU_LOCK_GUARD(&pool->lock);
    if (elem->state == THREAD_QUEUED) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        qemu_bh_schedule(pool->completion_bh);

        elem->state = THREAD_DONE;
        elem->ret = -ECANCELED;
    }
}
static const AIOCBInfo thread_pool_aiocb_info = {
237
.aiocb_size = sizeof(ThreadPoolElement),
238
.cancel_async = thread_pool_cancel,
241
BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
242
BlockCompletionFunc *cb, void *opaque)
244
ThreadPoolElement *req;
245
AioContext *ctx = qemu_get_current_aio_context();
246
ThreadPool *pool = aio_get_thread_pool(ctx);
249
assert(pool->ctx == qemu_get_current_aio_context());
251
req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
254
req->state = THREAD_QUEUED;
257
QLIST_INSERT_HEAD(&pool->head, req, all);
259
trace_thread_pool_submit(pool, req, arg);
261
qemu_mutex_lock(&pool->lock);
262
if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
265
QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
266
qemu_mutex_unlock(&pool->lock);
267
qemu_cond_signal(&pool->request_cond);
271
typedef struct ThreadPoolCo {
276
static void thread_pool_co_cb(void *opaque, int ret)
278
ThreadPoolCo *co = opaque;
284
/*
 * Coroutine wrapper around thread_pool_submit_aio(): submit func(arg),
 * yield until it completes, and return its result.  ThreadPoolCo lives on
 * this coroutine's stack, which is safe because we do not return before
 * thread_pool_co_cb() has run.
 */
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}
/* Fire-and-forget submission: no completion callback, result discarded. */
void thread_pool_submit(ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(func, arg, NULL, NULL);
}
/*
 * Apply ctx's thread-pool-min/thread-pool-max settings to the pool:
 * spawn workers up to the new minimum, and nudge surplus workers (their
 * loop condition checks cur_threads > max_threads) so they exit.
 */
void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
{
    qemu_mutex_lock(&pool->lock);

    pool->min_threads = ctx->thread_pool_min;
    pool->max_threads = ctx->thread_pool_max;

    /* Bring the pool up to the warm minimum even with no work queued.  */
    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
        spawn_thread(pool);
    }

    /* One wakeup per thread over the cap; each woken worker re-checks
     * cur_threads <= max_threads and terminates.
     */
    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
        qemu_cond_signal(&pool->request_cond);
    }

    qemu_mutex_unlock(&pool->lock);
}
/* Initialize a zeroed pool bound to ctx (main AioContext if ctx is NULL). */
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    pool->ctx = ctx;
    pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->worker_stopped);
    qemu_cond_init(&pool->request_cond);
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    /* Pick up ctx's min/max settings and pre-spawn the warm minimum. */
    thread_pool_update_params(pool, ctx);
}
/* Allocate and initialize a new thread pool for ctx.  The caller owns the
 * returned pool and must release it with thread_pool_free().
 */
ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}
/*
 * Tear down the pool: stop spawning, force every worker to exit (by
 * setting max_threads to 0 and broadcasting), wait for them, then destroy
 * the synchronization objects and free the pool.  All submitted requests
 * must already have completed (head must be empty).
 */
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->max_threads = 0;
    qemu_cond_broadcast(&pool->request_cond);
    while (pool->cur_threads > 0) {
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    qemu_bh_delete(pool->completion_bh);
    qemu_cond_destroy(&pool->request_cond);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    g_free(pool);
}