/*
  Copyright (c) 2021 Red Hat, Inc. <https://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/
10
11#include <signal.h>12#include <sys/mman.h>13#include <sys/syscall.h>14
15#include <urcu/uatomic.h>16
17#include <glusterfs/list.h>18#include <glusterfs/gf-io-common.h>19#include <glusterfs/globals.h>20
21static __thread gf_io_thread_t gf_io_thread = {};22
23/* Initialize a condition variable using a monotonic clock for timeouts. */
24static int32_t25gf_io_cond_init(pthread_cond_t *cond)26{
27pthread_condattr_t attr;28int32_t res;29
30res = gf_res_err(pthread_condattr_init(&attr));31if (caa_unlikely(res < 0)) {32return gf_check("io", GF_LOG_ERROR, "pthread_condattr_init", res);33}34
35res = gf_res_err(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC));36if (caa_likely(res >= 0)) {37res = gf_res_err(pthread_cond_init(cond, &attr));38gf_check("io", GF_LOG_ERROR, "pthread_cond_init", res);39} else {40gf_check("io", GF_LOG_ERROR, "pthread_condaddr_setclock", res);41}42
43gf_check("io", GF_LOG_WARNING, "pthread_condattr_destroy",44gf_res_err(pthread_condattr_destroy(&attr)));45
46return res;47}
48
49/* Initializes a sync object to synchronize 'count' entities with a maximum
50* delay of 'timeout' seconds. */
51int32_t
52gf_io_sync_start(gf_io_sync_t *sync, uint32_t count, uint32_t timeout,53uint32_t retries, void *data)54{
55int32_t res;56
57res = gf_res_errno0(clock_gettime(CLOCK_MONOTONIC, &sync->abs_to));58if (caa_unlikely(res < 0)) {59return gf_check("io", GF_LOG_ERROR, "clock_gettime", res);60}61
62sync->abs_to.tv_sec += timeout;63sync->data = data;64sync->timeout = timeout;65sync->retries = retries;66sync->count = count;67sync->phase = 0;68sync->pending = count;69sync->res = 0;70
71res = gf_res_err(pthread_mutex_init(&sync->mutex, NULL));72if (caa_likely(res >= 0)) {73res = gf_io_cond_init(&sync->cond);74if (caa_likely(res >= 0)) {75return 0;76}77
78gf_check("io", GF_LOG_WARNING, "pthread_mutex_destroy",79gf_res_err(pthread_mutex_destroy(&sync->mutex)));80} else {81gf_check("io", GF_LOG_ERROR, "pthread_mutex_init", res);82}83
84return res;85}
86
87/* Destroys a sync object. */
88static void89gf_io_sync_destroy(gf_io_sync_t *sync)90{
91gf_check("io", GF_LOG_WARNING, "pthread_cond_destroy",92gf_res_err(pthread_cond_destroy(&sync->cond)));93gf_check("io", GF_LOG_WARNING, "pthread_mutex_destroy",94gf_res_err(pthread_mutex_destroy(&sync->mutex)));95}
96
97static int32_t98gf_io_sync_wait_timeout(gf_io_sync_t *sync, int32_t retry, bool check)99{
100int32_t res;101
102if (check) {103if (retry > 0) {104GF_LOG_I("io", LG_MSG_IO_SYNC_COMPLETED(retry));105}106return -1;107}108
109res = gf_res_err(pthread_cond_timedwait(&sync->cond, &sync->mutex,110&sync->abs_to));111if (caa_unlikely(res != 0)) {112if (res != -ETIMEDOUT) {113gf_check("io", GF_LOG_ERROR, "pthread_cond_timedwait", res);114GF_ABORT();115}116
117retry++;118
119GF_LOG_W("io", LG_MSG_IO_SYNC_TIMEOUT(retry));120
121if (sync->retries == 0) {122GF_LOG_E("io", LG_MSG_IO_SYNC_ABORTED(retry));123GF_ABORT();124}125sync->retries--;126
127sync->abs_to.tv_sec += sync->timeout;128}129
130return retry;131}
132
133/* Notifies completion of 'count' entities. Optionally it can wait until
134* all other threads have also notified. Only one thread can wait. */
135int32_t
136gf_io_sync_done(gf_io_sync_t *sync, uint32_t count, int32_t res, bool wait)137{
138int32_t retry;139
140gf_io_lock(&sync->mutex);141
142sync->pending -= count;143if (!wait) {144if (caa_unlikely(res < 0) && (sync->res >= 0)) {145sync->res = res;146}147
148if (sync->pending == 0) {149gf_succeed("io", "pthread_cond_signal",150gf_res_err(pthread_cond_signal(&sync->cond)));151}152
153gf_io_unlock(&sync->mutex);154
155return 0;156}157
158retry = 0;159do {160retry = gf_io_sync_wait_timeout(sync, retry, sync->pending == 0);161} while (retry >= 0);162
163res = sync->res;164
165gf_io_unlock(&sync->mutex);166
167gf_io_sync_destroy(sync);168
169return res;170}
171
172/* Wait for a synchronization point. 'count' represents the number of
173* entities waiting, and 'res' the result of the operation done just
174* before synchronizing. The return value will be 0 only if all entities
175* completed without error (i.e. 'res' was >= 0 in all calls to this
176* function). */
177int32_t
178gf_io_sync_wait(gf_io_sync_t *sync, uint32_t count, int32_t res)179{
180uint32_t phase;181int32_t retry;182
183gf_io_lock(&sync->mutex);184
185if (caa_unlikely(res < 0) && (sync->res >= 0)) {186sync->res = res;187}188
189sync->pending -= count;190if (sync->pending == 0) {191sync->pending = sync->count;192sync->phase++;193
194gf_succeed("io", "pthread_cond_broadcast",195gf_res_err(pthread_cond_broadcast(&sync->cond)));196} else {197phase = sync->phase;198
199retry = 0;200do {201retry = gf_io_sync_wait_timeout(sync, retry, sync->phase != phase);202} while (retry >= 0);203}204
205res = sync->res;206
207gf_io_unlock(&sync->mutex);208
209return res;210}
211
212/* Sets the name of the thread. */
213static int32_t214gf_io_thread_name(pthread_t id, const char *code, uint32_t index)215{
216char name[GF_THREAD_NAME_LIMIT];217int32_t len;218
219len = snprintf(name, sizeof(name), GF_THREAD_NAME_PREFIX "%s/%u", code,220index);221if (caa_unlikely((len < 0) || (len >= sizeof(name)))) {222GF_LOG_E("io", LG_MSG_IO_THREAD_NAME_INVALID());223
224return -EINVAL;225}226
227return __gf_thread_set_name(id, name);228}
229
230/* Sets the signal mask of the thread. */
231static int32_t232gf_io_thread_mask(int32_t *signals)233{
234sigset_t set;235int32_t i, res;236
237res = gf_res_errno0(sigfillset(&set));238gf_check("io", GF_LOG_ERROR, "sigfillset", res);239for (i = 0; caa_likely(res >= 0) && (signals[i] != 0); i++) {240res = gf_res_errno0(sigdelset(&set, signals[i]));241gf_check("io", GF_LOG_ERROR, "sigdelset", res);242}243
244if (caa_likely(res >= 0)) {245res = gf_res_err(pthread_sigmask(SIG_BLOCK, &set, NULL));246gf_check("io", GF_LOG_ERROR, "pthread_sigmask", res);247}248
249return res;250}
251
252#ifdef GF_LINUX_HOST_OS253
254/* Sets the affinity of the thread. */
255static int32_t256gf_io_thread_affinity(pthread_t id, cpu_set_t *cpus, uint32_t index)257{
258cpu_set_t affinity;259uint32_t i, current;260
261if (cpus == NULL) {262return 0;263}264
265current = 0;266for (i = 0; i < CPU_SETSIZE; i++) {267if (CPU_ISSET(i, cpus)) {268if (current == index) {269break;270}271current++;272}273}274if (caa_unlikely(i >= CPU_SETSIZE)) {275GF_LOG_E("io", LG_MSG_IO_THREAD_NO_CPU(index));276return -ENODEV;277}278
279CPU_ZERO(&affinity);280CPU_SET(i, &affinity);281
282return gf_check("io", GF_LOG_ERROR, "pthread_setaffinity_np",283gf_res_err(pthread_setaffinity_np(id, sizeof(affinity),284&affinity)));285}
286
287#endif288
289/* Adds a thread to the thread pool. */
290static void291gf_io_thread_add(gf_io_thread_pool_t *pool, gf_io_thread_t *thread)292{
293thread->pool = pool;294
295gf_io_lock(&pool->mutex);296
297list_add_tail(&thread->list, &pool->threads);298
299gf_io_unlock(&pool->mutex);300}
301
302/* Initialize a thread. */
303static gf_io_thread_main_t304gf_io_thread_init(gf_io_sync_t *sync, gf_io_thread_t *thread)305{
306gf_io_thread_pool_config_t *cfg;307gf_io_thread_pool_t *pool;308gf_io_thread_main_t start;309int32_t res;310
311cfg = sync->data;312pool = cfg->pool;313start = NULL;314
315/* Sync phase 0: Creation of all threads. */316
317gf_io_thread_add(pool, thread);318if (caa_unlikely(gf_io_sync_wait(sync, 1, 0) < 0)) {319goto done;320}321
322/* Sync phase 1: Configuration of each thread. */323
324thread->id = pthread_self();325thread->index = uatomic_add_return(&cfg->index, 1) - 1;326
327res = gf_io_thread_name(thread->id, cfg->name,328thread->index + cfg->first_id);329if (caa_likely(res >= 0)) {330res = gf_io_thread_mask(cfg->signals);331}332
333#ifdef GF_LINUX_HOST_OS334if (caa_likely(res >= 0)) {335res = gf_io_thread_affinity(thread->id, cfg->cpus, thread->index);336}337#endif338
339if (caa_unlikely(gf_io_sync_wait(sync, 1, res) < 0)) {340goto done;341}342
343/* Sync phase 2: Specific initialization. */344
345thread->data = NULL;346
347res = cfg->setup(sync, thread);348if (caa_unlikely(res < 0)) {349goto done;350}351
352start = cfg->main;353
354done:355gf_io_sync_done(sync, 1, 0, false);356
357return start;358}
359
360/* Thread main function. */
361static void *362gf_io_thread_main(void *data)363{
364gf_io_thread_t *thread;365gf_io_thread_main_t start;366int32_t res;367
368thread = &gf_io_thread;369
370start = gf_io_thread_init(data, thread);371if (caa_likely(start != NULL)) {372res = start(thread);373if (caa_unlikely(res < 0)) {374GF_ABORT();375}376}377
378return NULL;379}
380
381/* Add scheduler/priority configuration to a thread attr. */
382static int32_t383gf_io_thread_attr_priority(gf_io_thread_pool_config_t *cfg,384pthread_attr_t *attr)385{
386struct sched_param param;387int32_t policy, priority, min, max, res;388
389priority = cfg->priority;390
391if (priority == 0) {392return 0;393}394
395policy = SCHED_FIFO;396if (priority < 0) {397policy = SCHED_RR;398priority = -priority;399}400if (priority > 100) {401GF_LOG_E("io", LG_MSG_IO_THREAD_BAD_PRIORITY(cfg->priority));402return -EINVAL;403}404
405min = gf_res_errno(sched_get_priority_min(policy));406if (caa_unlikely(min < 0)) {407gf_check("io", GF_LOG_ERROR, "sched_get_priority_min", min);408return min;409}410max = gf_res_errno(sched_get_priority_max(policy));411if (caa_unlikely(max < 0)) {412gf_check("io", GF_LOG_ERROR, "sched_get_priority_max", max);413return max;414}415
416memset(¶m, 0, sizeof(param));417param.sched_priority = min + priority * (max - min) / 100;418
419res = gf_res_err(pthread_attr_setschedpolicy(attr, policy));420gf_check("io", GF_LOG_ERROR, "pthread_attr_setschedpolicy", res);421if (caa_likely(res >= 0)) {422res = gf_res_err(pthread_attr_setschedparam(attr, ¶m));423gf_check("io", GF_LOG_ERROR, "pthread_attr_setschedparam", res);424}425if (caa_likely(res >= 0)) {426res = gf_res_err(pthread_attr_setinheritsched(attr,427PTHREAD_EXPLICIT_SCHED));428gf_check("io", GF_LOG_ERROR, "pthread_attr_setinheritsched", res);429}430
431return res;432}
433
434/* Prepare the attrs for a new thread. */
435static int32_t436gf_io_thread_attr(gf_io_thread_pool_config_t *cfg, pthread_attr_t *attr)437{
438int32_t res;439
440res = gf_res_err(pthread_attr_init(attr));441if (caa_unlikely(res < 0)) {442gf_check("io", GF_LOG_ERROR, "pthread_attr_init", res);443return res;444}445
446res = gf_res_err(pthread_attr_setstacksize(attr, cfg->stack_size));447gf_check("io", GF_LOG_ERROR, "pthread_attr_setstacksize", res);448if (caa_likely(res >= 0)) {449res = gf_io_thread_attr_priority(cfg, attr);450}451
452if (caa_unlikely(res < 0)) {453gf_check("io", GF_LOG_WARNING, "pthread_attr_destroy",454gf_res_err(pthread_attr_destroy(attr)));455}456
457return res;458}
459
460/* Create threads. */
461static int32_t462gf_io_thread_create(gf_io_thread_pool_config_t *cfg, pthread_t *ids,463uint32_t *created, void *(*main)(void *), void *data)464{
465pthread_attr_t attr;466uint32_t i;467int32_t res;468
469i = 0;470
471res = gf_io_thread_attr(cfg, &attr);472if (caa_likely(res >= 0)) {473while (i < cfg->num_threads) {474res = gf_res_err(pthread_create(&ids[i], &attr, main, data));475if (caa_unlikely(res < 0)) {476gf_check("io", GF_LOG_ERROR, "pthread_create", res);477break;478}479
480i++;481}482
483gf_check("io", GF_LOG_WARNING, "pthread_attr_destroy",484gf_res_err(pthread_attr_destroy(&attr)));485}486
487*created = i;488
489return res;490}
491
492/* Join a thread. */
493static void494gf_io_thread_join(pthread_t thread, struct timespec *timeout)495{
496#ifdef GF_LINUX_HOST_OS497if (timeout != NULL) {498gf_succeed("io", "pthread_timedjoin_np",499gf_res_err(pthread_timedjoin_np(thread, NULL, timeout)));500
501return;502}503#endif /* GF_LINUX_HOST_OS */504
505gf_succeed("io", "pthread_join", gf_res_err(pthread_join(thread, NULL)));506}
507
508/* Initializes as thread pool object. */
509static int32_t510gf_io_thread_pool_init(gf_io_thread_pool_t *pool,511gf_io_thread_pool_config_t *cfg)512{
513INIT_LIST_HEAD(&pool->threads);514cfg->pool = pool;515cfg->index = 0;516
517return gf_check("io", GF_LOG_ERROR, "pthread_mutex_init",518gf_res_err(pthread_mutex_init(&pool->mutex, NULL)));519}
520
521/* Destroys a thread pool object. */
522static void523gf_io_thread_pool_destroy(gf_io_thread_pool_t *pool)524{
525gf_check("io", GF_LOG_WARNING, "pthread_mutex_destroy",526gf_res_err(pthread_mutex_destroy(&pool->mutex)));527}
528
529/* Start a thread pool. */
530int32_t
531gf_io_thread_pool_start(gf_io_thread_pool_t *pool,532gf_io_thread_pool_config_t *cfg)533{
534pthread_t ids[cfg->num_threads];535gf_io_sync_t sync;536struct timespec to;537uint32_t created, pending;538int32_t res;539
540res = gf_io_thread_pool_init(pool, cfg);541if (caa_unlikely(res < 0)) {542return res;543}544
545created = 0;546
547res = gf_io_sync_start(&sync, cfg->num_threads + 1, cfg->timeout,548cfg->retries, cfg);549if (caa_unlikely(res < 0)) {550goto done;551}552
553/* Sync phase 0: Creation of all threads. */554
555res = gf_io_thread_create(cfg, ids, &created, gf_io_thread_main, &sync);556pending = cfg->num_threads - created + 1;557if (caa_unlikely(res < 0)) {558goto done_sync;559}560
561res = gf_io_sync_wait(&sync, pending, res);562if (caa_unlikely(res < 0)) {563goto done_sync;564}565
566/* Sync phase 1: Configuration of each thread. */567
568res = gf_io_sync_wait(&sync, 1, 0);569if (caa_unlikely(res < 0)) {570goto done_sync;571}572
573/* Sync phase 2: Specific initialization. */574
575res = cfg->setup(&sync, NULL);576
577done_sync:578gf_io_sync_done(&sync, pending, 0, true);579
580done:581if (caa_unlikely(res < 0)) {582gf_succeed("io", "clock_gettime",583gf_res_err(clock_gettime(CLOCK_REALTIME, &to)));584to.tv_sec += sync.timeout;585
586while (created > 0) {587gf_io_thread_join(ids[--created], &to);588}589
590gf_io_thread_pool_destroy(pool);591}592
593return res;594}
595
596/* Wait for thread pool termination and destroy it. */
597void
598gf_io_thread_pool_wait(gf_io_thread_pool_t *pool, uint32_t timeout)599{
600struct timespec to;601gf_io_thread_t *thread;602
603gf_succeed("io", "clock_gettime",604gf_res_err(clock_gettime(CLOCK_REALTIME, &to)));605to.tv_sec += timeout;606
607/* The list of threads is accessed concurrently only during creation of608* the thread pool. Once created, no one will touch the list, and only
609* a single caller to gf_io_thread_pool_wait() is allowed, so it's safe
610* to modify the list without taking the lock. */
611
612while (!list_empty(&pool->threads)) {613thread = list_first_entry(&pool->threads, gf_io_thread_t, list);614list_del_init(&thread->list);615
616gf_io_thread_join(thread->id, &to);617}618
619gf_io_thread_pool_destroy(pool);620}
621