apache-ignite

Форк
0
113 строк · 4.1 Кб
1
# Licensed to the Apache Software Foundation (ASF) under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#    http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
"""
17
Background thread service.
18
"""
19

20
import threading
21
import traceback
22
from abc import ABCMeta, abstractmethod
23

24
from ignitetest.services.utils.ducktests_service import DucktestsService
25

26

27
class BackgroundThreadService(DucktestsService, metaclass=ABCMeta):
    """BackgroundThreadService allows starting nodes simultaneously, each on its own worker thread.

    Subclasses implement :meth:`worker`. Every call to :meth:`start_node` runs it on a dedicated
    daemon thread. Exceptions raised inside a worker are recorded and re-raised in the calling
    thread by :meth:`wait` and :meth:`stop`.
    """

    def __init__(self, context, num_nodes=None, cluster_spec=None, **kwargs):
        super().__init__(context, num_nodes, cluster_spec, **kwargs)
        # Node index -> most recently started worker thread for that node.
        self.worker_threads = {}
        # Worker thread name -> formatted traceback of the exception that thread raised.
        self.worker_errors = {}
        # All captured tracebacks, each prefixed with its thread name, newline-separated.
        self.errors = ''
        # Guards worker_errors and errors, which are written from worker threads.
        self.lock = threading.RLock()

    def _protected_worker(self, idx, node, **kwargs):
        """Run :meth:`worker`, capturing any exception so the main thread can see it.

        This gives us the ability to propagate exceptions thrown in background threads, if desired.
        The exception is re-raised afterwards, so the thread still terminates with it.

        :param idx: Node index.
        :param node: Cluster node instance.
        :param kwargs: Extra keyword arguments forwarded to :meth:`worker`.
        """
        try:
            self.worker(idx, node, **kwargs)
        except BaseException:
            # threading.currentThread() is a deprecated alias (DeprecationWarning since Python 3.10);
            # look the name up once and reuse it.
            thread_name = threading.current_thread().name
            trace_fmt = traceback.format_exc()
            with self.lock:
                self.logger.info("BackgroundThreadService threw exception: ")
                self.logger.info(trace_fmt)
                self.worker_errors[thread_name] = trace_fmt
                if self.errors:
                    self.errors += "\n"
                self.errors += "%s: %s" % (thread_name, trace_fmt)

            raise

    @abstractmethod
    def worker(self, idx, node, **kwargs):
        """Background work for a single node; executed on a dedicated thread by :meth:`start_node`.

        :param idx: Node index
        :param node: Cluster node instance
        :param kwargs: Keyword arguments that were passed to :meth:`start_node`.
        """

    def start_node(self, node, **kwargs):
        """Start :meth:`worker` for ``node`` on a new daemon thread.

        :param node: Cluster node instance.
        :param kwargs: Extra keyword arguments forwarded to :meth:`worker`.
        :raises RuntimeError: if the previous worker thread for this node is still alive.
        """
        idx = self.idx(node)

        previous = self.worker_threads.get(idx)
        if previous is not None and previous.is_alive():
            raise RuntimeError("Cannot restart node since previous thread is still alive")

        self.logger.info("Running %s node %d on %s", self.service_id, idx, node.account.hostname)
        worker = threading.Thread(
            name=self.service_id + "-worker-" + str(idx),
            target=self._protected_worker,
            args=(idx, node),
            kwargs=kwargs
        )
        # A daemon thread does not keep the interpreter alive if the worker hangs.
        worker.daemon = True
        worker.start()
        self.worker_threads[idx] = worker

    def wait(self, timeout_sec=600):
        """Wait no more than timeout_sec for all worker threads to finish.

        Whatever the parent ``wait`` raises on timeout is propagated unchanged.
        NOTE(review): presumably a timeout error raised by DucktestsService; confirm.

        :param timeout_sec: Maximum time to wait, in seconds.
        :raises Exception: if any worker thread raised; the message is :attr:`errors`.
        """
        super().wait(timeout_sec)

        self._propagate_exceptions()

    def stop(self, force_stop=False, **kwargs):
        """Stop the service, then re-raise any exception captured from a worker thread.

        :param force_stop: Passed through to the parent ``stop``.
        :param kwargs: Passed through to the parent ``stop``.
        :raises Exception: if any worker thread raised; the message is :attr:`errors`.
        """
        alive_workers = sum(1 for worker in self.worker_threads.values() if worker.is_alive())
        if alive_workers > 0:
            self.logger.debug(
                "Called stop with at least one worker thread is still running: " + str(alive_workers))

            # Lazy formatting: the dict is only stringified if DEBUG is enabled.
            self.logger.debug("%s", self.worker_threads)

        super().stop(force_stop, **kwargs)

        self._propagate_exceptions()

    def wait_node(self, node, timeout_sec=600):
        """Wait up to timeout_sec seconds for the worker thread of ``node`` to finish.

        :param node: Cluster node instance. It must have been started via :meth:`start_node`;
            otherwise the lookup raises KeyError.
        :param timeout_sec: Maximum time to wait, in seconds.
        :return: True if the worker thread has finished, False if it is still running.
        """
        idx = self.idx(node)
        worker_thread = self.worker_threads[idx]
        worker_thread.join(timeout_sec)
        return not worker_thread.is_alive()

    def _propagate_exceptions(self):
        """Propagate exceptions thrown in background threads.

        Nothing in this class clears the captured errors, so every later call raises again.

        :raises Exception: if any worker thread raised; the message is :attr:`errors`.
        """
        with self.lock:
            if self.worker_errors:
                raise Exception(self.errors)
114

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.