kafka

Форк
0
/
docker_sanity_test.py 
238 строк · 11.0 Кб
1
#!/usr/bin/env python
2

3
# Licensed to the Apache Software Foundation (ASF) under one or more
4
# contributor license agreements.  See the NOTICE file distributed with
5
# this work for additional information regarding copyright ownership.
6
# The ASF licenses this file to You under the Apache License, Version 2.0
7
# (the "License"); you may not use this file except in compliance with
8
# the License.  You may obtain a copy of the License at
9
#
10
#    http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
# See the License for the specific language governing permissions and
16
# limitations under the License.
17

18
import unittest
19
import subprocess
20
from HTMLTestRunner import HTMLTestRunner
21
import test.constants as constants
22
import os
23

24
class DockerSanityTest(unittest.TestCase):
25
    IMAGE="apache/kafka"
26
    FIXTURES_DIR="."
27
    
28
    def resume_container(self):
29
        subprocess.run(["docker", "start", constants.BROKER_CONTAINER])
30

31
    def stop_container(self) -> None:
32
        subprocess.run(["docker", "stop", constants.BROKER_CONTAINER])
33

34
    def update_file(self, filename, old_string, new_string):
35
        with open(filename) as f:
36
            s = f.read()
37
        with open(filename, 'w') as f:
38
            s = s.replace(old_string, new_string)
39
            f.write(s)
40

41
    def start_compose(self, filename) -> None:
42
        self.update_file(filename, "image: {$IMAGE}", f"image: {self.IMAGE}")
43
        self.update_file(f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", "{$DIR}", self.FIXTURES_DIR)
44
        subprocess.run(["docker-compose", "-f", filename, "up", "-d"])
45
    
46
    def destroy_compose(self, filename) -> None:
47
        subprocess.run(["docker-compose", "-f", filename, "down"])
48
        self.update_file(filename, f"image: {self.IMAGE}", "image: {$IMAGE}")
49
        self.update_file(f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", self.FIXTURES_DIR, "{$DIR}")
50

51
    def create_topic(self, topic, topic_config):
52
        command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_TOPICS}", "--create", "--topic", topic]
53
        command.extend(topic_config)
54
        subprocess.run(command)
55
        check_command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_TOPICS}", "--list"]
56
        check_command.extend(topic_config)
57
        output = subprocess.check_output(check_command)
58
        if topic in output.decode("utf-8"):
59
            return True
60
        return False
61
        
62
    def produce_message(self, topic, producer_config, key, value):
63
        command = ["echo", f'"{key}:{value}"', "|", f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_PRODUCER}", "--topic", topic, "--property", "'parse.key=true'", "--property", "'key.separator=:'", "--timeout", f"{constants.CLIENT_TIMEOUT}"]
64
        command.extend(producer_config)
65
        subprocess.run(["bash", "-c", " ".join(command)])
66
    
67
    def consume_message(self, topic, consumer_config):
68
        command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", "--topic", topic, "--property", "'print.key=true'", "--property", "'key.separator=:'", "--from-beginning", "--max-messages", "1", "--timeout-ms", f"{constants.CLIENT_TIMEOUT}"]
69
        command.extend(consumer_config)
70
        message = subprocess.check_output(["bash", "-c", " ".join(command)])
71
        return message.decode("utf-8").strip()
72
    
73
    def get_metrics(self, jmx_tool_config):
74
        command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_RUN_CLASS}", constants.JMX_TOOL]
75
        command.extend(jmx_tool_config)
76
        message = subprocess.check_output(["bash", "-c", " ".join(command)])
77
        return message.decode("utf-8").strip().split()
78
    
79
    def broker_metrics_flow(self):
80
        print(f"Running {constants.BROKER_METRICS_TESTS}")
81
        errors = []
82
        try:
83
            self.assertTrue(self.create_topic(constants.BROKER_METRICS_TEST_TOPIC, ["--bootstrap-server", "localhost:9092"]))
84
        except AssertionError as e:
85
            errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
86
            return errors
87
        jmx_tool_config = ["--one-time", "--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", "--jmx-url", "service:jmx:rmi:///jndi/rmi://:9101/jmxrmi"]
88
        metrics_before_message = self.get_metrics(jmx_tool_config)
89
        try:
90
            self.assertEqual(len(metrics_before_message), 2)
91
            self.assertEqual(metrics_before_message[0], constants.BROKER_METRICS_HEADING)
92
        except AssertionError as e:
93
            errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
94
            return errors
95

96
        producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"]
97
        self.produce_message(constants.BROKER_METRICS_TEST_TOPIC, producer_config, "key", "message")
98
        consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"]
99
        message = self.consume_message(constants.BROKER_METRICS_TEST_TOPIC, consumer_config)
100
        try:
101
            self.assertEqual(message, "key:message")
102
        except AssertionError as e:
103
            errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
104
            return errors
105
        
106
        metrics_after_message = self.get_metrics(jmx_tool_config)
107
        try:
108
            self.assertEqual(len(metrics_before_message), 2)
109
            self.assertEqual(metrics_after_message[0], constants.BROKER_METRICS_HEADING)
110
            before_metrics_data, after_metrics_data = metrics_before_message[1].split(","), metrics_after_message[1].split(",")
111
            self.assertEqual(len(before_metrics_data), len(after_metrics_data))
112
            for i in range(len(before_metrics_data)):
113
                if after_metrics_data[i].replace(".", "").isnumeric():
114
                    self.assertGreaterEqual(float(after_metrics_data[i]), float(before_metrics_data[i]))
115
                else:
116
                    self.assertEqual(after_metrics_data[i], before_metrics_data[i])
117
        except AssertionError as e:
118
            errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
119

120
        return errors
121

122
    def ssl_flow(self, ssl_broker_port, test_name, test_error_prefix, topic):
123
        print(f"Running {test_name}")
124
        errors = []
125
        try:
126
            self.assertTrue(self.create_topic(topic, ["--bootstrap-server", ssl_broker_port, "--command-config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"]))
127
        except AssertionError as e:
128
            errors.append(test_error_prefix + str(e))
129
            return errors
130

131
        producer_config = ["--bootstrap-server", ssl_broker_port,
132
                           "--producer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"]
133
        self.produce_message(topic, producer_config, "key", "message")
134

135
        consumer_config = [
136
            "--bootstrap-server", ssl_broker_port,
137
            "--property", "auto.offset.reset=earliest",
138
            "--consumer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}",
139
        ]
140
        message = self.consume_message(topic, consumer_config)
141
        try:
142
            self.assertEqual(message, "key:message")
143
        except AssertionError as e:
144
            errors.append(test_error_prefix + str(e))
145
        
146
        return errors
147
    
148
    def broker_restart_flow(self):
149
        print(f"Running {constants.BROKER_RESTART_TESTS}")
150
        errors = []
151
        
152
        try:
153
            self.assertTrue(self.create_topic(constants.BROKER_RESTART_TEST_TOPIC, ["--bootstrap-server", "localhost:9092"]))
154
        except AssertionError as e:
155
            errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e))
156
            return errors
157
        
158
        producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"]
159
        self.produce_message(constants.BROKER_RESTART_TEST_TOPIC, producer_config, "key", "message")
160

161
        print("Stopping Container")
162
        self.stop_container()
163
        print("Resuming Container")
164
        self.resume_container()
165

166
        consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"]
167
        message = self.consume_message(constants.BROKER_RESTART_TEST_TOPIC, consumer_config)
168
        try:
169
            self.assertEqual(message, "key:message")
170
        except AssertionError as e:
171
            errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e))
172
        
173
        return errors
174

175
    def execute(self):
176
        total_errors = []
177
        try:
178
            total_errors.extend(self.broker_metrics_flow())
179
        except Exception as e:
180
            print(constants.BROKER_METRICS_ERROR_PREFIX, str(e))
181
            total_errors.append(str(e))
182
        try:
183
            total_errors.extend(self.ssl_flow('localhost:9093', constants.SSL_FLOW_TESTS, constants.SSL_ERROR_PREFIX, constants.SSL_TOPIC))
184
        except Exception as e:
185
            print(constants.SSL_ERROR_PREFIX, str(e))
186
            total_errors.append(str(e))
187
        try:
188
            total_errors.extend(self.ssl_flow('localhost:9094', constants.FILE_INPUT_FLOW_TESTS, constants.FILE_INPUT_ERROR_PREFIX, constants.FILE_INPUT_TOPIC))
189
        except Exception as e:
190
            print(constants.FILE_INPUT_ERROR_PREFIX, str(e))
191
            total_errors.append(str(e))
192
        try:
193
            total_errors.extend(self.broker_restart_flow())
194
        except Exception as e:
195
            print(constants.BROKER_RESTART_ERROR_PREFIX, str(e))
196
            total_errors.append(str(e))
197
        
198
        self.assertEqual(total_errors, [])
199

200
class DockerSanityTestCombinedMode(DockerSanityTest):
201
    def setUp(self) -> None:
202
        self.start_compose(f"{self.FIXTURES_DIR}/{constants.COMBINED_MODE_COMPOSE}")
203
    def tearDown(self) -> None:
204
        self.destroy_compose(f"{self.FIXTURES_DIR}/{constants.COMBINED_MODE_COMPOSE}")
205
    def test_bed(self):
206
        self.execute()
207

208
class DockerSanityTestIsolatedMode(DockerSanityTest):
209
    def setUp(self) -> None:
210
        self.start_compose(f"{self.FIXTURES_DIR}/{constants.ISOLATED_MODE_COMPOSE}")
211
    def tearDown(self) -> None:
212
        self.destroy_compose(f"{self.FIXTURES_DIR}/{constants.ISOLATED_MODE_COMPOSE}")
213
    def test_bed(self):
214
        self.execute()
215

216
def run_tests(image, mode, fixtures_dir):
217
    DockerSanityTest.IMAGE = image
218
    DockerSanityTest.FIXTURES_DIR = fixtures_dir
219

220
    test_classes_to_run = []
221
    if mode == "jvm" or mode == "native":
222
        test_classes_to_run = [DockerSanityTestCombinedMode, DockerSanityTestIsolatedMode]
223
    
224
    loader = unittest.TestLoader()
225
    suites_list = []
226
    for test_class in test_classes_to_run:
227
        suite = loader.loadTestsFromTestCase(test_class)
228
        suites_list.append(suite)
229
    combined_suite = unittest.TestSuite(suites_list)
230
    cur_directory = os.path.dirname(os.path.realpath(__file__))
231
    outfile = open(f"{cur_directory}/report_{mode}.html", "w")
232
    runner = HTMLTestRunner.HTMLTestRunner(
233
                stream=outfile,
234
                title=f'Test Report: Apache Kafka {mode.capitalize()} Docker Image',
235
                description='This demonstrates the report output.'
236
                )
237
    result = runner.run(combined_suite)
238
    return result.failure_count
239

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.