apache-ignite

Форк
0
181 строка · 6.3 Кб
1
# Licensed to the Apache Software Foundation (ASF) under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#    http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
"""
17
This module contains classes and utilities to start zookeeper cluster for testing ZookeeperDiscovery.
18
"""
19

20
import os.path
21
from distutils.version import LooseVersion
22

23
from ducktape.utils.util import wait_until
24

25
from ignitetest.services.utils.ducktests_service import DucktestsService
26
from ignitetest.services.utils.log_utils import monitor_log
27
from ignitetest.services.utils.path import PathAware
28

29

30
class ZookeeperSettings:
31
    """
32
    Settings for zookeeper quorum nodes.
33
    """
34
    def __init__(self, **kwargs):
35
        self.min_session_timeout = kwargs.get('min_session_timeout', 2000)
36
        self.tick_time = kwargs.get('tick_time', self.min_session_timeout // 3)
37
        self.init_limit = kwargs.get('init_limit', 10)
38
        self.sync_limit = kwargs.get('sync_limit', 5)
39
        self.force_sync = kwargs.get('force_sync', 'no')
40
        self.client_port = kwargs.get('client_port', 2181)
41

42
        version = kwargs.get("version")
43
        if version:
44
            if isinstance(version, str):
45
                version = LooseVersion(version)
46
            self.version = version
47
        else:
48
            self.version = LooseVersion("3.5.8")
49

50
        assert self.tick_time <= self.min_session_timeout // 2, "'tick_time' must be <= 'min_session_timeout' / 2"
51

52

53
class ZookeeperService(DucktestsService, PathAware):
54
    """
55
    Zookeeper service.
56
    """
57
    LOG_FILENAME = "zookeeper.log"
58

59
    def __init__(self, context, num_nodes, settings=ZookeeperSettings(), start_timeout_sec=60):
60
        super().__init__(context, num_nodes)
61
        self.settings = settings
62
        self.start_timeout_sec = start_timeout_sec
63
        self.init_logs_attribute()
64

65
    @property
66
    def product(self):
67
        return "%s-%s" % ("zookeeper", self.settings.version)
68

69
    @property
70
    def globals(self):
71
        return self.context.globals
72

73
    @property
74
    def log_config_file(self):
75
        return os.path.join(self.persistent_root, "log4j.properties")
76

77
    @property
78
    def config_file(self):
79
        return os.path.join(self.persistent_root, "zookeeper.properties")
80

81
    def start(self, **kwargs):
82
        self.start_async(**kwargs)
83
        self.await_started()
84

85
    def start_async(self, **kwargs):
86
        """
87
        Starts in async way.
88
        """
89
        super().start(**kwargs)
90

91
    def await_started(self):
92
        self.logger.info("Waiting for Zookeeper quorum...")
93

94
        for node in self.nodes:
95
            self.await_quorum(node, self.start_timeout_sec)
96

97
        self.logger.info("Zookeeper quorum is formed.")
98

99
    def start_node(self, node, **kwargs):
100
        idx = self.idx(node)
101

102
        self.logger.info("Starting Zookeeper node %d on %s", idx, node.account.hostname)
103

104
        self.init_persistent(node)
105
        node.account.ssh(f"echo {idx} > {self.work_dir}/myid")
106

107
        config_file = self.render('zookeeper.properties.j2', settings=self.settings, data_dir=self.work_dir)
108
        node.account.create_file(self.config_file, config_file)
109
        self.logger.info("ZK config %s", config_file)
110

111
        log_config_file = self.render('log4j.properties.j2', log_dir=self.log_dir)
112
        node.account.create_file(self.log_config_file, log_config_file)
113

114
        start_cmd = f"nohup java -cp {os.path.join(self.home_dir, 'lib')}/*:{self.persistent_root} " \
115
                    f"org.apache.zookeeper.server.quorum.QuorumPeerMain {self.config_file} >/dev/null 2>&1 &"
116

117
        node.account.ssh(start_cmd)
118

119
    def wait_node(self, node, timeout_sec=20):
120
        wait_until(lambda: not self.alive(node), timeout_sec=timeout_sec)
121

122
        return not self.alive(node)
123

124
    def await_quorum(self, node, timeout):
125
        """
126
        Await quorum formed on node (leader election ready).
127
        :param node:  Zookeeper service node.
128
        :param timeout: Wait timeout.
129
        """
130
        with monitor_log(node, self.log_file, from_the_beginning=True) as monitor:
131
            monitor.wait_until(
132
                "LEADER ELECTION TOOK",
133
                timeout_sec=timeout,
134
                err_msg=f"Zookeeper quorum was not formed on {node.account.hostname}"
135
            )
136

137
    @property
138
    def log_file(self):
139
        """
140
        :return: current log file of node.
141
        """
142
        return os.path.join(self.log_dir, self.LOG_FILENAME)
143

144
    @staticmethod
145
    def java_class_name():
146
        """ The class name of the Zookeeper quorum peers. """
147
        return "org.apache.zookeeper.server.quorum.QuorumPeerMain"
148

149
    def pids(self, node):
150
        """
151
        Get pids of zookeeper service node.
152
        :param node: Zookeeper service node.
153
        :return: List of pids.
154
        """
155
        return node.account.java_pids(self.java_class_name())
156

157
    def alive(self, node):
158
        """
159
        Check if zookeeper service node is alive.
160
        :param node: Zookeeper service node.
161
        :return: True if node is alive
162
        """
163
        return len(self.pids(node)) > 0
164

165
    def connection_string(self):
166
        """
167
        Form a connection string to zookeeper cluster.
168
        :return: Connection string.
169
        """
170
        return ','.join([node.account.hostname + ":" + str(2181) for node in self.nodes])
171

172
    def stop_node(self, node, force_stop=False, **kwargs):
173
        idx = self.idx(node)
174
        self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
175
        node.account.kill_process("zookeeper", clean_shutdown=not force_stop, allow_fail=False)
176

177
    def clean_node(self, node, **kwargs):
178
        super().clean_node(node, **kwargs)
179

180
        self.logger.info("Cleaning Zookeeper node %d on %s", self.idx(node), node.account.hostname)
181
        node.account.ssh(f"rm -rf -- {self.persistent_root}", allow_fail=False)
182

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.