20
from HTMLTestRunner import HTMLTestRunner
21
import test.constants as constants
24
class DockerSanityTest(unittest.TestCase):
28
def resume_container(self):
29
subprocess.run(["docker", "start", constants.BROKER_CONTAINER])
31
def stop_container(self) -> None:
32
subprocess.run(["docker", "stop", constants.BROKER_CONTAINER])
34
def update_file(self, filename, old_string, new_string):
35
with open(filename) as f:
37
with open(filename, 'w') as f:
38
s = s.replace(old_string, new_string)
41
def start_compose(self, filename) -> None:
42
self.update_file(filename, "image: {$IMAGE}", f"image: {self.IMAGE}")
43
self.update_file(f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", "{$DIR}", self.FIXTURES_DIR)
44
subprocess.run(["docker-compose", "-f", filename, "up", "-d"])
46
def destroy_compose(self, filename) -> None:
47
subprocess.run(["docker-compose", "-f", filename, "down"])
48
self.update_file(filename, f"image: {self.IMAGE}", "image: {$IMAGE}")
49
self.update_file(f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", self.FIXTURES_DIR, "{$DIR}")
51
def create_topic(self, topic, topic_config):
52
command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_TOPICS}", "--create", "--topic", topic]
53
command.extend(topic_config)
54
subprocess.run(command)
55
check_command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_TOPICS}", "--list"]
56
check_command.extend(topic_config)
57
output = subprocess.check_output(check_command)
58
if topic in output.decode("utf-8"):
62
def produce_message(self, topic, producer_config, key, value):
63
command = ["echo", f'"{key}:{value}"', "|", f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_PRODUCER}", "--topic", topic, "--property", "'parse.key=true'", "--property", "'key.separator=:'", "--timeout", f"{constants.CLIENT_TIMEOUT}"]
64
command.extend(producer_config)
65
subprocess.run(["bash", "-c", " ".join(command)])
67
def consume_message(self, topic, consumer_config):
68
command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", "--topic", topic, "--property", "'print.key=true'", "--property", "'key.separator=:'", "--from-beginning", "--max-messages", "1", "--timeout-ms", f"{constants.CLIENT_TIMEOUT}"]
69
command.extend(consumer_config)
70
message = subprocess.check_output(["bash", "-c", " ".join(command)])
71
return message.decode("utf-8").strip()
73
def get_metrics(self, jmx_tool_config):
74
command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_RUN_CLASS}", constants.JMX_TOOL]
75
command.extend(jmx_tool_config)
76
message = subprocess.check_output(["bash", "-c", " ".join(command)])
77
return message.decode("utf-8").strip().split()
79
def broker_metrics_flow(self):
80
print(f"Running {constants.BROKER_METRICS_TESTS}")
83
self.assertTrue(self.create_topic(constants.BROKER_METRICS_TEST_TOPIC, ["--bootstrap-server", "localhost:9092"]))
84
except AssertionError as e:
85
errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
87
jmx_tool_config = ["--one-time", "--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", "--jmx-url", "service:jmx:rmi:///jndi/rmi://:9101/jmxrmi"]
88
metrics_before_message = self.get_metrics(jmx_tool_config)
90
self.assertEqual(len(metrics_before_message), 2)
91
self.assertEqual(metrics_before_message[0], constants.BROKER_METRICS_HEADING)
92
except AssertionError as e:
93
errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
96
producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"]
97
self.produce_message(constants.BROKER_METRICS_TEST_TOPIC, producer_config, "key", "message")
98
consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"]
99
message = self.consume_message(constants.BROKER_METRICS_TEST_TOPIC, consumer_config)
101
self.assertEqual(message, "key:message")
102
except AssertionError as e:
103
errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
106
metrics_after_message = self.get_metrics(jmx_tool_config)
108
self.assertEqual(len(metrics_before_message), 2)
109
self.assertEqual(metrics_after_message[0], constants.BROKER_METRICS_HEADING)
110
before_metrics_data, after_metrics_data = metrics_before_message[1].split(","), metrics_after_message[1].split(",")
111
self.assertEqual(len(before_metrics_data), len(after_metrics_data))
112
for i in range(len(before_metrics_data)):
113
if after_metrics_data[i].replace(".", "").isnumeric():
114
self.assertGreaterEqual(float(after_metrics_data[i]), float(before_metrics_data[i]))
116
self.assertEqual(after_metrics_data[i], before_metrics_data[i])
117
except AssertionError as e:
118
errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
122
def ssl_flow(self, ssl_broker_port, test_name, test_error_prefix, topic):
123
print(f"Running {test_name}")
126
self.assertTrue(self.create_topic(topic, ["--bootstrap-server", ssl_broker_port, "--command-config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"]))
127
except AssertionError as e:
128
errors.append(test_error_prefix + str(e))
131
producer_config = ["--bootstrap-server", ssl_broker_port,
132
"--producer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"]
133
self.produce_message(topic, producer_config, "key", "message")
136
"--bootstrap-server", ssl_broker_port,
137
"--property", "auto.offset.reset=earliest",
138
"--consumer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}",
140
message = self.consume_message(topic, consumer_config)
142
self.assertEqual(message, "key:message")
143
except AssertionError as e:
144
errors.append(test_error_prefix + str(e))
148
def broker_restart_flow(self):
149
print(f"Running {constants.BROKER_RESTART_TESTS}")
153
self.assertTrue(self.create_topic(constants.BROKER_RESTART_TEST_TOPIC, ["--bootstrap-server", "localhost:9092"]))
154
except AssertionError as e:
155
errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e))
158
producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"]
159
self.produce_message(constants.BROKER_RESTART_TEST_TOPIC, producer_config, "key", "message")
161
print("Stopping Container")
162
self.stop_container()
163
print("Resuming Container")
164
self.resume_container()
166
consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"]
167
message = self.consume_message(constants.BROKER_RESTART_TEST_TOPIC, consumer_config)
169
self.assertEqual(message, "key:message")
170
except AssertionError as e:
171
errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e))
178
total_errors.extend(self.broker_metrics_flow())
179
except Exception as e:
180
print(constants.BROKER_METRICS_ERROR_PREFIX, str(e))
181
total_errors.append(str(e))
183
total_errors.extend(self.ssl_flow('localhost:9093', constants.SSL_FLOW_TESTS, constants.SSL_ERROR_PREFIX, constants.SSL_TOPIC))
184
except Exception as e:
185
print(constants.SSL_ERROR_PREFIX, str(e))
186
total_errors.append(str(e))
188
total_errors.extend(self.ssl_flow('localhost:9094', constants.FILE_INPUT_FLOW_TESTS, constants.FILE_INPUT_ERROR_PREFIX, constants.FILE_INPUT_TOPIC))
189
except Exception as e:
190
print(constants.FILE_INPUT_ERROR_PREFIX, str(e))
191
total_errors.append(str(e))
193
total_errors.extend(self.broker_restart_flow())
194
except Exception as e:
195
print(constants.BROKER_RESTART_ERROR_PREFIX, str(e))
196
total_errors.append(str(e))
198
self.assertEqual(total_errors, [])
200
class DockerSanityTestCombinedMode(DockerSanityTest):
201
def setUp(self) -> None:
202
self.start_compose(f"{self.FIXTURES_DIR}/{constants.COMBINED_MODE_COMPOSE}")
203
def tearDown(self) -> None:
204
self.destroy_compose(f"{self.FIXTURES_DIR}/{constants.COMBINED_MODE_COMPOSE}")
208
class DockerSanityTestIsolatedMode(DockerSanityTest):
209
def setUp(self) -> None:
210
self.start_compose(f"{self.FIXTURES_DIR}/{constants.ISOLATED_MODE_COMPOSE}")
211
def tearDown(self) -> None:
212
self.destroy_compose(f"{self.FIXTURES_DIR}/{constants.ISOLATED_MODE_COMPOSE}")
216
def run_tests(image, mode, fixtures_dir):
217
DockerSanityTest.IMAGE = image
218
DockerSanityTest.FIXTURES_DIR = fixtures_dir
220
test_classes_to_run = []
221
if mode == "jvm" or mode == "native":
222
test_classes_to_run = [DockerSanityTestCombinedMode, DockerSanityTestIsolatedMode]
224
loader = unittest.TestLoader()
226
for test_class in test_classes_to_run:
227
suite = loader.loadTestsFromTestCase(test_class)
228
suites_list.append(suite)
229
combined_suite = unittest.TestSuite(suites_list)
230
cur_directory = os.path.dirname(os.path.realpath(__file__))
231
outfile = open(f"{cur_directory}/report_{mode}.html", "w")
232
runner = HTMLTestRunner.HTMLTestRunner(
234
title=f'Test Report: Apache Kafka {mode.capitalize()} Docker Image',
235
description='This demonstrates the report output.'
237
result = runner.run(combined_suite)
238
return result.failure_count