loom

Форк
0
/
ThreadPool.cpp 
109 строк · 2.6 Кб
1
/**
2
 * @file ThreadPool.cpp
3
 * @author Michael Fetisov (fetisov.michael@bmstu.ru)
4
 * @brief 
5
 * @version 0.1
6
 * @date 2022-08-26
7
 * 
8
 * @copyright Copyright (c) 2022
9
 * 
10
 * Модифицированная версия пула потоков из книги 
11
 * Anthony Williams. C++ Concurrency in Action. Second Edition
12
 * 
13
 * Оптимизация работы (предотвращение нагрузки процессора) пока очередь пуста
14
 * с использованием std::condition_variable.
15
 * 
16
 */
17

18
#include "simodo/tp/ThreadPool.h"
19
#include "simodo/tp/WorkFunction.h"
20

21
#include <cassert>
22

23
namespace simodo::tp
24
{
25
    ThreadPool::ThreadPool(int number_of_threads, bool start_immediately)
26
        : _number_of_threads(number_of_threads)
27
        , _necessary_to_stop(false)
28
    {
29
        if (_number_of_threads < 0)
30
            _number_of_threads = std::thread::hardware_concurrency();
31

32
        if (start_immediately)
33
            start();
34
    }
35

36
    ThreadPool::~ThreadPool()
37
    {
38
        stop();
39
    }
40

41
    void ThreadPool::submit(Task_interface * task)
42
    {
43
        if (_number_of_threads == 0) {
44
            task->work();
45
            delete task;
46
        }
47
        else {
48
            _task_queue.push(task);    
49
            _waiting_condition.notify_one();
50
        }
51
    }
52

53
    void ThreadPool::submit(std::function<void()> function)
54
    {
55
        submit( new WorkFunction(function) );
56
    }
57

58
    void ThreadPool::start() 
59
    try
60
    {
61
        if (_number_of_threads == 0) 
62
            return;
63

64
        if (!_threads.empty())
65
            return;
66

67
        _threads.reserve(_number_of_threads);
68

69
        for(int i=0; i < _number_of_threads; ++i) 
70
            _threads.push_back(std::thread(&ThreadPool::worker,this));
71
        
72
        _waiting_condition.notify_all();
73
    }
74
    catch(...) {
75
        _necessary_to_stop = true;
76
        _waiting_condition.notify_all();
77
        throw;
78
    }
79

80
    void ThreadPool::stop()
81
    {
82
        _necessary_to_stop = true;
83
        _waiting_condition.notify_all();
84

85
        for(size_t i=0; i < _threads.size(); ++i) 
86
            _threads[i].join();
87

88
        _threads.clear();
89
    }
90

91
    void ThreadPool::worker()
92
    {
93
        while(!_necessary_to_stop || !_task_queue.empty()) {
94
            if (_task_queue.empty()) {
95
                std::unique_lock<std::mutex> locker(_waiting_mutex);
96
                _waiting_condition.wait(locker, [this]{ return !_task_queue.empty() || _necessary_to_stop; });
97
            }
98

99
            Task_interface * task = nullptr;
100

101
            if(_task_queue.try_pop(task)) {
102
                assert(task);
103
                task->work();
104
                delete task;
105
            }
106
        }
107
    }
108

109
}
110

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.