apache-ignite
113 строк · 4.1 Кб
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Background thread service.
"""

import threading
import traceback
from abc import ABCMeta, abstractmethod

from ignitetest.services.utils.ducktests_service import DucktestsService


class BackgroundThreadService(DucktestsService, metaclass=ABCMeta):
    """Service that runs the work of every node in its own background thread.

    Subclasses implement :meth:`worker`. A traceback of any exception raised by a
    worker is recorded and later re-raised in the calling thread by :meth:`wait`
    and :meth:`stop`.
    """

    def __init__(self, context, num_nodes=None, cluster_spec=None, **kwargs):
        """
        :param context: Forwarded unchanged to the parent constructor
        :param num_nodes: Forwarded unchanged to the parent constructor
        :param cluster_spec: Forwarded unchanged to the parent constructor
        :param kwargs: Extra keyword arguments forwarded to the parent constructor
        """
        super().__init__(context, num_nodes, cluster_spec, **kwargs)
        # Node index -> thread most recently started for that node.
        self.worker_threads = {}
        # Worker thread name -> formatted traceback of the exception it raised.
        self.worker_errors = {}
        # All recorded tracebacks, newline separated, each prefixed with the thread name.
        self.errors = ''
        # Guards worker_errors and errors, which are written from worker threads.
        self.lock = threading.RLock()

    def _protected_worker(self, idx, node, **kwargs):
        """Run :meth:`worker`, recording any exception so the main thread can see it.

        The formatted traceback is logged, stored in ``worker_errors`` under the name
        of the current thread and appended to ``errors``; the exception is then
        re-raised in the worker thread.

        :param idx: Node index
        :param node: Cluster node instance
        :param kwargs: Extra keyword arguments forwarded to :meth:`worker`
        """
        try:
            self.worker(idx, node, **kwargs)
        except BaseException:
            # current_thread() is the supported spelling; the camelCase
            # currentThread() alias is deprecated since Python 3.10.
            thread_name = threading.current_thread().name
            with self.lock:
                self.logger.info("BackgroundThreadService threw exception: ")
                trace_fmt = traceback.format_exc()
                self.logger.info(trace_fmt)
                self.worker_errors[thread_name] = trace_fmt
                if self.errors:
                    self.errors += "\n"
                self.errors += "%s: %s" % (thread_name, trace_fmt)

            # Re-raise so the failure is not silently swallowed in the worker thread.
            raise

    @abstractmethod
    def worker(self, idx, node, **kwargs):
        """Work executed for a single node inside its background thread.

        :param idx: Node index
        :param node: Cluster node instance
        :param kwargs: Extra keyword arguments passed to :meth:`start_node`
        """

    def start_node(self, node, **kwargs):
        """Start a daemon thread that runs :meth:`worker` for ``node``.

        :param node: Cluster node instance
        :param kwargs: Extra keyword arguments forwarded to :meth:`worker`
        :raises RuntimeError: if the thread previously started for this node is still alive
        """
        # self.idx comes from the parent class; presumably it maps a node to its index.
        idx = self.idx(node)

        previous = self.worker_threads.get(idx)
        if previous is not None and previous.is_alive():
            raise RuntimeError("Cannot restart node since previous thread is still alive")

        self.logger.info("Running %s node %d on %s", self.service_id, idx, node.account.hostname)
        worker = threading.Thread(
            name=self.service_id + "-worker-" + str(idx),
            target=self._protected_worker,
            args=(idx, node),
            kwargs=kwargs,
            daemon=True,
        )
        worker.start()
        self.worker_threads[idx] = worker

    def wait(self, timeout_sec=600):
        """Wait via the parent ``wait`` and then propagate recorded worker errors.

        NOTE(review): the original documentation states that a TimeoutException is
        raised when the worker threads do not finish within ``timeout_sec``; that
        behaviour lives in the parent class and is not visible here.

        :param timeout_sec: Timeout in seconds forwarded to the parent ``wait``
        :raises Exception: with all recorded tracebacks if any worker failed
        """
        super().wait(timeout_sec)

        self._propagate_exceptions()

    def stop(self, force_stop=False, **kwargs):
        """Stop the service via the parent ``stop`` and propagate recorded worker errors.

        The number of workers still alive, if any, and the thread map are written to
        the debug log before delegating.

        :param force_stop: Forwarded unchanged to the parent ``stop``
        :param kwargs: Extra keyword arguments forwarded to the parent ``stop``
        :raises Exception: with all recorded tracebacks if any worker failed
        """
        alive_workers = sum(1 for worker in self.worker_threads.values() if worker.is_alive())
        if alive_workers > 0:
            # Lazy logger arguments: the message is only built when debug is enabled.
            self.logger.debug(
                "Called stop with at least one worker thread is still running: %s", alive_workers)

            self.logger.debug("%s", self.worker_threads)

        super().stop(force_stop, **kwargs)

        self._propagate_exceptions()

    def wait_node(self, node, timeout_sec=600):
        """Join the worker thread of ``node`` for at most ``timeout_sec`` seconds.

        :param node: Cluster node instance
        :param timeout_sec: Maximum time in seconds to block in ``join``
        :return: True if the thread is no longer alive after the join, False otherwise
        :raises KeyError: if no thread has been started for this node
        """
        idx = self.idx(node)
        worker_thread = self.worker_threads[idx]
        worker_thread.join(timeout_sec)
        return not worker_thread.is_alive()

    def _propagate_exceptions(self):
        """Raise in the calling thread if any background worker recorded an error.

        :raises Exception: whose message is the accumulated ``errors`` text
        """
        with self.lock:
            if self.worker_errors:
                raise Exception(self.errors)