glusterfs / gf-io-common.c (620 lines, 15.7 KB)
/*
  Copyright (c) 2021 Red Hat, Inc. <https://www.redhat.com>
  This file is part of GlusterFS.

  This file is licensed to you under your choice of the GNU Lesser
  General Public License, version 3 or any later version (LGPLv3 or
  later), or the GNU General Public License, version 2 (GPLv2), in all
  cases as published by the Free Software Foundation.
*/

#include <signal.h>
#include <sys/mman.h>
#include <sys/syscall.h>

#include <urcu/uatomic.h>

#include <glusterfs/list.h>
#include <glusterfs/gf-io-common.h>
#include <glusterfs/globals.h>

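/* Per-thread descriptor of the current I/O thread. Each thread started by
 * gf_io_thread_main() works on its own thread-local copy of this variable. */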
static __thread gf_io_thread_t gf_io_thread = {};

/* Initialize a condition variable using a monotonic clock for timeouts. */
static int32_t
gf_io_cond_init(pthread_cond_t *cond)
{
    pthread_condattr_t attr;
    int32_t res;

    res = gf_res_err(pthread_condattr_init(&attr));
    if (caa_unlikely(res < 0)) {
        return gf_check("io", GF_LOG_ERROR, "pthread_condattr_init", res);
    }

    res = gf_res_err(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC));
    if (caa_likely(res >= 0)) {
        res = gf_res_err(pthread_cond_init(cond, &attr));
        gf_check("io", GF_LOG_ERROR, "pthread_cond_init", res);
    } else {
        gf_check("io", GF_LOG_ERROR, "pthread_condattr_setclock", res);
    }

    gf_check("io", GF_LOG_WARNING, "pthread_condattr_destroy",
             gf_res_err(pthread_condattr_destroy(&attr)));

    return res;
}

/* Initializes a sync object to synchronize 'count' entities with a maximum
 * delay of 'timeout' seconds per attempt, retrying up to 'retries' times
 * before aborting. 'data' is an opaque pointer made available to the
 * synchronized entities through the sync object. */
int32_t
gf_io_sync_start(gf_io_sync_t *sync, uint32_t count, uint32_t timeout,
                 uint32_t retries, void *data)
{
    int32_t res;

    res = gf_res_errno0(clock_gettime(CLOCK_MONOTONIC, &sync->abs_to));
    if (caa_unlikely(res < 0)) {
        return gf_check("io", GF_LOG_ERROR, "clock_gettime", res);
    }

    sync->abs_to.tv_sec += timeout;
    sync->data = data;
    sync->timeout = timeout;
    sync->retries = retries;
    sync->count = count;
    sync->phase = 0;
    sync->pending = count;
    sync->res = 0;

    res = gf_res_err(pthread_mutex_init(&sync->mutex, NULL));
    if (caa_likely(res >= 0)) {
        res = gf_io_cond_init(&sync->cond);
        if (caa_likely(res >= 0)) {
            return 0;
        }

        gf_check("io", GF_LOG_WARNING, "pthread_mutex_destroy",
                 gf_res_err(pthread_mutex_destroy(&sync->mutex)));
    } else {
        gf_check("io", GF_LOG_ERROR, "pthread_mutex_init", res);
    }

    return res;
}

/* Destroys a sync object. */
static void
gf_io_sync_destroy(gf_io_sync_t *sync)
{
    gf_check("io", GF_LOG_WARNING, "pthread_cond_destroy",
             gf_res_err(pthread_cond_destroy(&sync->cond)));
    gf_check("io", GF_LOG_WARNING, "pthread_mutex_destroy",
             gf_res_err(pthread_mutex_destroy(&sync->mutex)));
}

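/* Waits on the sync object's condition variable for one timeout interval.
 * Must be called with the mutex held. When 'check' is true the wait is over
 * and -1 is returned (logging how many retries were needed, if any). If the
 * wait times out, the retry counter is incremented and the deadline is
 * extended; the process is aborted once all retries have been used. */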
static int32_t
gf_io_sync_wait_timeout(gf_io_sync_t *sync, int32_t retry, bool check)
{
    int32_t res;

    if (check) {
        if (retry > 0) {
            GF_LOG_I("io", LG_MSG_IO_SYNC_COMPLETED(retry));
        }
        return -1;
    }

    res = gf_res_err(pthread_cond_timedwait(&sync->cond, &sync->mutex,
                                            &sync->abs_to));
    if (caa_unlikely(res != 0)) {
        if (res != -ETIMEDOUT) {
            gf_check("io", GF_LOG_ERROR, "pthread_cond_timedwait", res);
            GF_ABORT();
        }

        retry++;

        GF_LOG_W("io", LG_MSG_IO_SYNC_TIMEOUT(retry));

        if (sync->retries == 0) {
            GF_LOG_E("io", LG_MSG_IO_SYNC_ABORTED(retry));
            GF_ABORT();
        }
        sync->retries--;

        sync->abs_to.tv_sec += sync->timeout;
    }

    return retry;
}

/* Notifies completion of 'count' entities. Optionally it can wait until
 * all other threads have also notified. Only one thread can wait. */
int32_t
gf_io_sync_done(gf_io_sync_t *sync, uint32_t count, int32_t res, bool wait)
{
    int32_t retry;

    gf_io_lock(&sync->mutex);

    sync->pending -= count;
    if (!wait) {
        if (caa_unlikely(res < 0) && (sync->res >= 0)) {
            sync->res = res;
        }

        if (sync->pending == 0) {
            gf_succeed("io", "pthread_cond_signal",
                       gf_res_err(pthread_cond_signal(&sync->cond)));
        }

        gf_io_unlock(&sync->mutex);

        return 0;
    }

    retry = 0;
    do {
        retry = gf_io_sync_wait_timeout(sync, retry, sync->pending == 0);
    } while (retry >= 0);

    res = sync->res;

    gf_io_unlock(&sync->mutex);

    gf_io_sync_destroy(sync);

    return res;
}

/* Wait for a synchronization point. 'count' represents the number of
 * entities waiting, and 'res' the result of the operation done just
 * before synchronizing. The return value will be 0 only if all entities
 * completed without error (i.e. 'res' was >= 0 in all calls to this
 * function). */
int32_t
gf_io_sync_wait(gf_io_sync_t *sync, uint32_t count, int32_t res)
{
    uint32_t phase;
    int32_t retry;

    gf_io_lock(&sync->mutex);

    if (caa_unlikely(res < 0) && (sync->res >= 0)) {
        sync->res = res;
    }

    sync->pending -= count;
    if (sync->pending == 0) {
        sync->pending = sync->count;
        sync->phase++;

        gf_succeed("io", "pthread_cond_broadcast",
                   gf_res_err(pthread_cond_broadcast(&sync->cond)));
    } else {
        phase = sync->phase;

        retry = 0;
        do {
            retry = gf_io_sync_wait_timeout(sync, retry, sync->phase != phase);
        } while (retry >= 0);
    }

    res = sync->res;

    gf_io_unlock(&sync->mutex);

    return res;
}
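
/* Usage sketch (illustrative only, not taken from this file): a coordinator
 * synchronizing with N worker entities could use the sync object roughly
 * like this, where 'workers' and 'worker_result' are hypothetical names:
 *
 *     gf_io_sync_t sync;
 *
 *     gf_io_sync_start(&sync, workers + 1, 10, 3, NULL);
 *
 *     ... each worker eventually calls:
 *         gf_io_sync_done(&sync, 1, worker_result, false);
 *
 *     ... and the coordinator waits for all of them (and gets the
 *     aggregated result; the sync object is destroyed by this call):
 *         res = gf_io_sync_done(&sync, 1, 0, true);
 */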

/* Sets the name of the thread. */
static int32_t
gf_io_thread_name(pthread_t id, const char *code, uint32_t index)
{
    char name[GF_THREAD_NAME_LIMIT];
    int32_t len;

    len = snprintf(name, sizeof(name), GF_THREAD_NAME_PREFIX "%s/%u", code,
                   index);
    if (caa_unlikely((len < 0) || (len >= sizeof(name)))) {
        GF_LOG_E("io", LG_MSG_IO_THREAD_NAME_INVALID());

        return -EINVAL;
    }

    return __gf_thread_set_name(id, name);
}

/* Sets the signal mask of the thread. */
static int32_t
gf_io_thread_mask(int32_t *signals)
{
    sigset_t set;
    int32_t i, res;

    res = gf_res_errno0(sigfillset(&set));
    gf_check("io", GF_LOG_ERROR, "sigfillset", res);
    for (i = 0; caa_likely(res >= 0) && (signals[i] != 0); i++) {
        res = gf_res_errno0(sigdelset(&set, signals[i]));
        gf_check("io", GF_LOG_ERROR, "sigdelset", res);
    }

    if (caa_likely(res >= 0)) {
        res = gf_res_err(pthread_sigmask(SIG_BLOCK, &set, NULL));
        gf_check("io", GF_LOG_ERROR, "pthread_sigmask", res);
    }

    return res;
}

#ifdef GF_LINUX_HOST_OS

/* Sets the affinity of the thread. */
static int32_t
gf_io_thread_affinity(pthread_t id, cpu_set_t *cpus, uint32_t index)
{
    cpu_set_t affinity;
    uint32_t i, current;

    if (cpus == NULL) {
        return 0;
    }

    current = 0;
    for (i = 0; i < CPU_SETSIZE; i++) {
        if (CPU_ISSET(i, cpus)) {
            if (current == index) {
                break;
            }
            current++;
        }
    }
    if (caa_unlikely(i >= CPU_SETSIZE)) {
        GF_LOG_E("io", LG_MSG_IO_THREAD_NO_CPU(index));
        return -ENODEV;
    }

    CPU_ZERO(&affinity);
    CPU_SET(i, &affinity);

    return gf_check("io", GF_LOG_ERROR, "pthread_setaffinity_np",
                    gf_res_err(pthread_setaffinity_np(id, sizeof(affinity),
                                                      &affinity)));
}

#endif

/* Adds a thread to the thread pool. */
static void
gf_io_thread_add(gf_io_thread_pool_t *pool, gf_io_thread_t *thread)
{
    thread->pool = pool;

    gf_io_lock(&pool->mutex);

    list_add_tail(&thread->list, &pool->threads);

    gf_io_unlock(&pool->mutex);
}

/* Initializes a thread. The initialization is synchronized with the pool
 * creator in three phases: thread creation, per-thread configuration (name,
 * signal mask and CPU affinity) and pool-specific setup. Returns the main
 * function the thread must run, or NULL if initialization failed. */
static gf_io_thread_main_t
gf_io_thread_init(gf_io_sync_t *sync, gf_io_thread_t *thread)
{
    gf_io_thread_pool_config_t *cfg;
    gf_io_thread_pool_t *pool;
    gf_io_thread_main_t start;
    int32_t res;

    cfg = sync->data;
    pool = cfg->pool;
    start = NULL;

    /* Sync phase 0: Creation of all threads. */

    gf_io_thread_add(pool, thread);
    if (caa_unlikely(gf_io_sync_wait(sync, 1, 0) < 0)) {
        goto done;
    }

    /* Sync phase 1: Configuration of each thread. */

    thread->id = pthread_self();
    thread->index = uatomic_add_return(&cfg->index, 1) - 1;

    res = gf_io_thread_name(thread->id, cfg->name,
                            thread->index + cfg->first_id);
    if (caa_likely(res >= 0)) {
        res = gf_io_thread_mask(cfg->signals);
    }

#ifdef GF_LINUX_HOST_OS
    if (caa_likely(res >= 0)) {
        res = gf_io_thread_affinity(thread->id, cfg->cpus, thread->index);
    }
#endif

    if (caa_unlikely(gf_io_sync_wait(sync, 1, res) < 0)) {
        goto done;
    }

    /* Sync phase 2: Specific initialization. */

    thread->data = NULL;

    res = cfg->setup(sync, thread);
    if (caa_unlikely(res < 0)) {
        goto done;
    }

    start = cfg->main;

done:
    gf_io_sync_done(sync, 1, 0, false);

    return start;
}

/* Thread main function. */
static void *
gf_io_thread_main(void *data)
{
    gf_io_thread_t *thread;
    gf_io_thread_main_t start;
    int32_t res;

    thread = &gf_io_thread;

    start = gf_io_thread_init(data, thread);
    if (caa_likely(start != NULL)) {
        res = start(thread);
        if (caa_unlikely(res < 0)) {
            GF_ABORT();
        }
    }

    return NULL;
}

/* Add scheduler/priority configuration to a thread attr. */
static int32_t
gf_io_thread_attr_priority(gf_io_thread_pool_config_t *cfg,
                           pthread_attr_t *attr)
{
    struct sched_param param;
    int32_t policy, priority, min, max, res;

    priority = cfg->priority;

    if (priority == 0) {
        return 0;
    }

    policy = SCHED_FIFO;
    if (priority < 0) {
        policy = SCHED_RR;
        priority = -priority;
    }
    if (priority > 100) {
        GF_LOG_E("io", LG_MSG_IO_THREAD_BAD_PRIORITY(cfg->priority));
        return -EINVAL;
    }

    min = gf_res_errno(sched_get_priority_min(policy));
    if (caa_unlikely(min < 0)) {
        gf_check("io", GF_LOG_ERROR, "sched_get_priority_min", min);
        return min;
    }
    max = gf_res_errno(sched_get_priority_max(policy));
    if (caa_unlikely(max < 0)) {
        gf_check("io", GF_LOG_ERROR, "sched_get_priority_max", max);
        return max;
    }

    memset(&param, 0, sizeof(param));
    param.sched_priority = min + priority * (max - min) / 100;
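    /* Illustrative example: with Linux's typical real-time range (min = 1,
     * max = 99), a cfg->priority of 50 maps to 1 + 50 * 98 / 100, i.e. a
     * sched_priority of 50. */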

    res = gf_res_err(pthread_attr_setschedpolicy(attr, policy));
    gf_check("io", GF_LOG_ERROR, "pthread_attr_setschedpolicy", res);
    if (caa_likely(res >= 0)) {
        res = gf_res_err(pthread_attr_setschedparam(attr, &param));
        gf_check("io", GF_LOG_ERROR, "pthread_attr_setschedparam", res);
    }
    if (caa_likely(res >= 0)) {
        res = gf_res_err(pthread_attr_setinheritsched(attr,
                                                      PTHREAD_EXPLICIT_SCHED));
        gf_check("io", GF_LOG_ERROR, "pthread_attr_setinheritsched", res);
    }

    return res;
}

/* Prepare the attrs for a new thread. */
static int32_t
gf_io_thread_attr(gf_io_thread_pool_config_t *cfg, pthread_attr_t *attr)
{
    int32_t res;

    res = gf_res_err(pthread_attr_init(attr));
    if (caa_unlikely(res < 0)) {
        gf_check("io", GF_LOG_ERROR, "pthread_attr_init", res);
        return res;
    }

    res = gf_res_err(pthread_attr_setstacksize(attr, cfg->stack_size));
    gf_check("io", GF_LOG_ERROR, "pthread_attr_setstacksize", res);
    if (caa_likely(res >= 0)) {
        res = gf_io_thread_attr_priority(cfg, attr);
    }

    if (caa_unlikely(res < 0)) {
        gf_check("io", GF_LOG_WARNING, "pthread_attr_destroy",
                 gf_res_err(pthread_attr_destroy(attr)));
    }

    return res;
}

/* Creates the threads of the pool. The number of threads successfully
 * created is returned in '*created', even when some creation fails. */
static int32_t
gf_io_thread_create(gf_io_thread_pool_config_t *cfg, pthread_t *ids,
                    uint32_t *created, void *(*main)(void *), void *data)
{
    pthread_attr_t attr;
    uint32_t i;
    int32_t res;

    i = 0;

    res = gf_io_thread_attr(cfg, &attr);
    if (caa_likely(res >= 0)) {
        while (i < cfg->num_threads) {
            res = gf_res_err(pthread_create(&ids[i], &attr, main, data));
            if (caa_unlikely(res < 0)) {
                gf_check("io", GF_LOG_ERROR, "pthread_create", res);
                break;
            }

            i++;
        }

        gf_check("io", GF_LOG_WARNING, "pthread_attr_destroy",
                 gf_res_err(pthread_attr_destroy(&attr)));
    }

    *created = i;

    return res;
}

/* Joins a thread. On Linux the join uses pthread_timedjoin_np() when a
 * timeout is given; otherwise (or on other systems) it blocks until the
 * thread terminates. */
static void
gf_io_thread_join(pthread_t thread, struct timespec *timeout)
{
#ifdef GF_LINUX_HOST_OS
    if (timeout != NULL) {
        gf_succeed("io", "pthread_timedjoin_np",
                   gf_res_err(pthread_timedjoin_np(thread, NULL, timeout)));

        return;
    }
#endif /* GF_LINUX_HOST_OS */

    gf_succeed("io", "pthread_join", gf_res_err(pthread_join(thread, NULL)));
}

/* Initializes a thread pool object. */
static int32_t
gf_io_thread_pool_init(gf_io_thread_pool_t *pool,
                       gf_io_thread_pool_config_t *cfg)
{
    INIT_LIST_HEAD(&pool->threads);
    cfg->pool = pool;
    cfg->index = 0;

    return gf_check("io", GF_LOG_ERROR, "pthread_mutex_init",
                    gf_res_err(pthread_mutex_init(&pool->mutex, NULL)));
}

/* Destroys a thread pool object. */
static void
gf_io_thread_pool_destroy(gf_io_thread_pool_t *pool)
{
    gf_check("io", GF_LOG_WARNING, "pthread_mutex_destroy",
             gf_res_err(pthread_mutex_destroy(&pool->mutex)));
}

/* Start a thread pool. */
int32_t
gf_io_thread_pool_start(gf_io_thread_pool_t *pool,
                        gf_io_thread_pool_config_t *cfg)
{
    pthread_t ids[cfg->num_threads];
    gf_io_sync_t sync;
    struct timespec to;
    uint32_t created, pending;
    int32_t res;

    res = gf_io_thread_pool_init(pool, cfg);
    if (caa_unlikely(res < 0)) {
        return res;
    }

    created = 0;

    res = gf_io_sync_start(&sync, cfg->num_threads + 1, cfg->timeout,
                           cfg->retries, cfg);
    if (caa_unlikely(res < 0)) {
        goto done;
    }

    /* Sync phase 0: Creation of all threads. */

    res = gf_io_thread_create(cfg, ids, &created, gf_io_thread_main, &sync);
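    /* Threads that could not be created will never reach the sync point, so
     * account for them here, plus one for the current thread. */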
    pending = cfg->num_threads - created + 1;
    if (caa_unlikely(res < 0)) {
        goto done_sync;
    }

    res = gf_io_sync_wait(&sync, pending, res);
    if (caa_unlikely(res < 0)) {
        goto done_sync;
    }

    /* Sync phase 1: Configuration of each thread. */

    res = gf_io_sync_wait(&sync, 1, 0);
    if (caa_unlikely(res < 0)) {
        goto done_sync;
    }

    /* Sync phase 2: Specific initialization. */

    res = cfg->setup(&sync, NULL);

done_sync:
    gf_io_sync_done(&sync, pending, 0, true);

done:
    if (caa_unlikely(res < 0)) {
        gf_succeed("io", "clock_gettime",
                   gf_res_err(clock_gettime(CLOCK_REALTIME, &to)));
        to.tv_sec += sync.timeout;

        while (created > 0) {
            gf_io_thread_join(ids[--created], &to);
        }

        gf_io_thread_pool_destroy(pool);
    }

    return res;
}

/* Wait for thread pool termination and destroy it. */
void
gf_io_thread_pool_wait(gf_io_thread_pool_t *pool, uint32_t timeout)
{
    struct timespec to;
    gf_io_thread_t *thread;

    gf_succeed("io", "clock_gettime",
               gf_res_err(clock_gettime(CLOCK_REALTIME, &to)));
    to.tv_sec += timeout;

    /* The list of threads is accessed concurrently only during creation of
     * the thread pool. Once created, no one will touch the list, and only
     * a single caller to gf_io_thread_pool_wait() is allowed, so it's safe
     * to modify the list without taking the lock. */

    while (!list_empty(&pool->threads)) {
        thread = list_first_entry(&pool->threads, gf_io_thread_t, list);
        list_del_init(&thread->list);

        gf_io_thread_join(thread->id, &to);
    }

    gf_io_thread_pool_destroy(pool);
}
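
/* Usage sketch (hypothetical, for illustration only): a caller could start
 * and later tear down a pool along these lines, where my_setup() and
 * my_main() are hypothetical callbacks with the signatures expected by
 * 'cfg->setup' and 'cfg->main':
 *
 *     gf_io_thread_pool_t pool;
 *     gf_io_thread_pool_config_t cfg = { 0 };
 *
 *     cfg.name = "example";
 *     cfg.num_threads = 4;
 *     cfg.stack_size = 1 << 20;
 *     cfg.priority = 0;
 *     cfg.first_id = 0;
 *     cfg.timeout = 10;
 *     cfg.retries = 3;
 *     cfg.signals = (int32_t[]){ 0 };
 *     cfg.setup = my_setup;
 *     cfg.main = my_main;
 *
 *     if (gf_io_thread_pool_start(&pool, &cfg) >= 0) {
 *         ... tell the threads' main functions to finish, then ...
 *         gf_io_thread_pool_wait(&pool, 10);
 *     }
 */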