apache-ignite
181 строка · 6.3 Кб
1# Licensed to the Apache Software Foundation (ASF) under one or more
2# contributor license agreements. See the NOTICE file distributed with
3# this work for additional information regarding copyright ownership.
4# The ASF licenses this file to You under the Apache License, Version 2.0
5# (the "License"); you may not use this file except in compliance with
6# the License. You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16"""
17This module contains classes and utilities to start zookeeper cluster for testing ZookeeperDiscovery.
18"""
19
20import os.path21from distutils.version import LooseVersion22
23from ducktape.utils.util import wait_until24
25from ignitetest.services.utils.ducktests_service import DucktestsService26from ignitetest.services.utils.log_utils import monitor_log27from ignitetest.services.utils.path import PathAware28
29
30class ZookeeperSettings:31"""32Settings for zookeeper quorum nodes.
33"""
34def __init__(self, **kwargs):35self.min_session_timeout = kwargs.get('min_session_timeout', 2000)36self.tick_time = kwargs.get('tick_time', self.min_session_timeout // 3)37self.init_limit = kwargs.get('init_limit', 10)38self.sync_limit = kwargs.get('sync_limit', 5)39self.force_sync = kwargs.get('force_sync', 'no')40self.client_port = kwargs.get('client_port', 2181)41
42version = kwargs.get("version")43if version:44if isinstance(version, str):45version = LooseVersion(version)46self.version = version47else:48self.version = LooseVersion("3.5.8")49
50assert self.tick_time <= self.min_session_timeout // 2, "'tick_time' must be <= 'min_session_timeout' / 2"51
52
53class ZookeeperService(DucktestsService, PathAware):54"""55Zookeeper service.
56"""
57LOG_FILENAME = "zookeeper.log"58
59def __init__(self, context, num_nodes, settings=ZookeeperSettings(), start_timeout_sec=60):60super().__init__(context, num_nodes)61self.settings = settings62self.start_timeout_sec = start_timeout_sec63self.init_logs_attribute()64
65@property66def product(self):67return "%s-%s" % ("zookeeper", self.settings.version)68
69@property70def globals(self):71return self.context.globals72
73@property74def log_config_file(self):75return os.path.join(self.persistent_root, "log4j.properties")76
77@property78def config_file(self):79return os.path.join(self.persistent_root, "zookeeper.properties")80
81def start(self, **kwargs):82self.start_async(**kwargs)83self.await_started()84
85def start_async(self, **kwargs):86"""87Starts in async way.
88"""
89super().start(**kwargs)90
91def await_started(self):92self.logger.info("Waiting for Zookeeper quorum...")93
94for node in self.nodes:95self.await_quorum(node, self.start_timeout_sec)96
97self.logger.info("Zookeeper quorum is formed.")98
99def start_node(self, node, **kwargs):100idx = self.idx(node)101
102self.logger.info("Starting Zookeeper node %d on %s", idx, node.account.hostname)103
104self.init_persistent(node)105node.account.ssh(f"echo {idx} > {self.work_dir}/myid")106
107config_file = self.render('zookeeper.properties.j2', settings=self.settings, data_dir=self.work_dir)108node.account.create_file(self.config_file, config_file)109self.logger.info("ZK config %s", config_file)110
111log_config_file = self.render('log4j.properties.j2', log_dir=self.log_dir)112node.account.create_file(self.log_config_file, log_config_file)113
114start_cmd = f"nohup java -cp {os.path.join(self.home_dir, 'lib')}/*:{self.persistent_root} " \115f"org.apache.zookeeper.server.quorum.QuorumPeerMain {self.config_file} >/dev/null 2>&1 &"116
117node.account.ssh(start_cmd)118
119def wait_node(self, node, timeout_sec=20):120wait_until(lambda: not self.alive(node), timeout_sec=timeout_sec)121
122return not self.alive(node)123
124def await_quorum(self, node, timeout):125"""126Await quorum formed on node (leader election ready).
127:param node: Zookeeper service node.
128:param timeout: Wait timeout.
129"""
130with monitor_log(node, self.log_file, from_the_beginning=True) as monitor:131monitor.wait_until(132"LEADER ELECTION TOOK",133timeout_sec=timeout,134err_msg=f"Zookeeper quorum was not formed on {node.account.hostname}"135)136
137@property138def log_file(self):139"""140:return: current log file of node.
141"""
142return os.path.join(self.log_dir, self.LOG_FILENAME)143
144@staticmethod145def java_class_name():146""" The class name of the Zookeeper quorum peers. """147return "org.apache.zookeeper.server.quorum.QuorumPeerMain"148
149def pids(self, node):150"""151Get pids of zookeeper service node.
152:param node: Zookeeper service node.
153:return: List of pids.
154"""
155return node.account.java_pids(self.java_class_name())156
157def alive(self, node):158"""159Check if zookeeper service node is alive.
160:param node: Zookeeper service node.
161:return: True if node is alive
162"""
163return len(self.pids(node)) > 0164
165def connection_string(self):166"""167Form a connection string to zookeeper cluster.
168:return: Connection string.
169"""
170return ','.join([node.account.hostname + ":" + str(2181) for node in self.nodes])171
172def stop_node(self, node, force_stop=False, **kwargs):173idx = self.idx(node)174self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))175node.account.kill_process("zookeeper", clean_shutdown=not force_stop, allow_fail=False)176
177def clean_node(self, node, **kwargs):178super().clean_node(node, **kwargs)179
180self.logger.info("Cleaning Zookeeper node %d on %s", self.idx(node), node.account.hostname)181node.account.ssh(f"rm -rf -- {self.persistent_root}", allow_fail=False)182