# apache-ignite
# 601 lines · 22.8 KB
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the base class to build services aware of Ignite.
"""
19import os20import random21import re22import signal23import sys24import time25import tempfile26from abc import ABCMeta27from datetime import datetime, timedelta28from enum import IntEnum29from pathlib import Path30from threading import Thread31from filelock import FileLock32
33from ducktape.cluster.remoteaccount import RemoteCommandError34from ducktape.utils.util import wait_until35
36from ignitetest.services.utils import IgniteServiceType37from ignitetest.services.utils.background_thread import BackgroundThreadService38from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue39from ignitetest.services.utils.ignite_spec import resolve_spec, SHARED_PREPARED_FILE40from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin, JmxClient41from ignitetest.services.utils.jvm_utils import JvmProcessMixin, JvmVersionMixin42from ignitetest.services.utils.log_utils import monitor_log43from ignitetest.services.utils.path import IgnitePathAware44from ignitetest.utils.enum import constructible45
46
class IgniteAwareService(BackgroundThreadService, IgnitePathAware, JvmProcessMixin, JvmVersionMixin, metaclass=ABCMeta):
    """
    The base class to build services aware of Ignite.

    Starts/stops the JVM processes of ``main_java_class`` on ducktape nodes, awaits events in the node logs
    and can emulate network failures on nodes by inserting iptables DROP rules (restored on clean()).
    """

    @constructible
    class NetPart(IntEnum):
        """
        Network part to emulate failure of.
        """
        INPUT = 0
        OUTPUT = 1
        ALL = 2

    def __init__(self, context, config, num_nodes, startup_timeout_sec, shutdown_timeout_sec, main_java_class, modules,
                 **kwargs):
        """
        :param context: Ducktape test context.
        :param config: Service configuration (provides version, service_type and the communication/discovery SPIs).
        :param num_nodes: Number of nodes to run the service on.
        :param startup_timeout_sec: Timeout to await the "Topology snapshot" event on start.
        :param shutdown_timeout_sec: Timeout to await the service processes to stop.
        :param main_java_class: Main Java class; also used to look up the service processes on a node.
        :param modules: Stored as ``self.modules`` (presumably Ignite modules to use — verify against IgniteSpec).
        :param kwargs: Parameters that are passed to IgniteSpec (via resolve_spec).
        """
        super().__init__(context, num_nodes)

        # Ducktape checks a Service implementation attribute 'logs' to get config for logging.
        # IgniteAwareService does not use the default Service 'logs' definition: it is set up by
        # init_logs_attribute() below (presumably provided by the IgnitePathAware mixin — verify).
        self.log_level = "DEBUG"

        self.config = config
        self.main_java_class = main_java_class
        self.startup_timeout_sec = startup_timeout_sec
        self.shutdown_timeout_sec = shutdown_timeout_sec
        self.modules = modules

        self.spec = resolve_spec(self, **kwargs)
        self.init_logs_attribute()

        # Nodes whose iptables rules were backed up by drop_network() and must be restored on clean().
        self.disconnected_nodes = []

    @property
    def product(self):
        """
        :return: Version from the service config, as a string.
        """
        return str(self.config.version)

    @property
    def globals(self):
        """
        :return: Globals of the ducktape test context.
        """
        return self.context.globals

    def start_async(self, **kwargs):
        """
        Starts in async way: launches the service without awaiting the "Topology snapshot" event.
        """
        super().start(**kwargs)

    def start(self, **kwargs):
        """
        Starts the service and awaits its start (see await_started()).
        """
        self.start_async(**kwargs)
        self.await_started()

    def await_started(self, nodes=None):
        """
        Awaits start finished, i.e. the "Topology snapshot" record in the node logs.
        Returns immediately for services of type NONE or THIN_CLIENT.
        :param nodes: Nodes to await start on, or None for all nodes.
        """
        if self.config.service_type in (IgniteServiceType.NONE, IgniteServiceType.THIN_CLIENT):
            return

        self.logger.info("Waiting for IgniteAware(s) to start ...")

        self.await_event("Topology snapshot", self.startup_timeout_sec, nodes=nodes, from_the_beginning=True)

    def start_node(self, node, **kwargs):
        """
        Prepares shared/persistent directories, configs and the log file on the node, starts the node via the
        base class and waits until a process of the main java class shows up on it.
        :param node: Ignite service node.
        """
        self.init_shared(node)
        self.init_persistent(node)

        self.__update_node_log_file(node)

        super().start_node(node, **kwargs)

        alive_timeout_sec = 10

        wait_until(lambda: self.alive(node), timeout_sec=alive_timeout_sec,
                   err_msg="Process of %s was not found on '%s' in %d seconds" %
                   (self.main_java_class, node.name, alive_timeout_sec))

        ignite_jmx_mixin(node, self)

    def stop_async(self, force_stop=False, **kwargs):
        """
        Stop in async way: asks the base class to stop the service without awaiting the processes to exit.
        :param force_stop: If True, processes are expected to be killed with SIGKILL instead of SIGTERM
                           (see stop_node()).
        """
        super().stop(force_stop, **kwargs)

    def stop(self, force_stop=False, **kwargs):
        """
        Stops the service. Awaits the stop unless force_stop is set.
        :param force_stop: Kill the processes instead of terminating them gracefully.
        """
        self.stop_async(force_stop, **kwargs)

        # Making this async on FORCE_STOP to eliminate waiting on killing services on tear down.
        # Waiting will happen on plain stop() call made by ducktape during same step.
        if not force_stop:
            self.await_stopped()

    def await_stopped(self):
        """
        Awaits stop finished: first the worker threads of all nodes, then the remote processes.
        :raises AssertionError: If a worker thread did not stop in shutdown_timeout_sec.
        """
        self.logger.info("Waiting for IgniteAware(s) to stop ...")

        for node in self.nodes:
            stopped = self.wait_node(node, timeout_sec=self.shutdown_timeout_sec)
            assert stopped, "Node %s's worker thread did not stop in %d seconds" % \
                            (str(node.account), self.shutdown_timeout_sec)

        for node in self.nodes:
            wait_until(lambda: not self.alive(node), timeout_sec=self.shutdown_timeout_sec,
                       err_msg="Node %s's remote processes failed to stop in %d seconds" %
                               (str(node.account), self.shutdown_timeout_sec))

    def stop_node(self, node, force_stop=False, **kwargs):
        """
        Signals every process of the main java class on the node: SIGKILL if force_stop, SIGTERM otherwise.
        :param node: Ignite service node.
        :param force_stop: Use SIGKILL instead of SIGTERM.
        """
        pids = self.pids(node, self.main_java_class)

        sig = signal.SIGKILL if force_stop else signal.SIGTERM

        for pid in pids:
            node.account.signal(pid, sig, allow_fail=False)

    def clean(self, **kwargs):
        """
        Restores iptables rules changed by drop_network(), then cleans the service.
        The base class cleanup runs even if restoring iptables fails; the restore error is then re-raised.
        """
        try:
            self.__restore_iptables()
        finally:
            super().clean(**kwargs)

    def clean_node(self, node, **kwargs):
        """
        Cleans the node via the base class and removes the persistent root directory.
        :param node: Ignite service node.
        """
        super().clean_node(node, **kwargs)

        node.account.ssh("rm -rf -- %s" % self.persistent_root, allow_fail=False)

    def init_persistent(self, node):
        """
        Init persistent directory and render the service configs into it.
        :param node: Ignite service node.
        """
        super().init_persistent(node)

        self._prepare_configs(node)

    def init_shared(self, node):
        """
        Init shared directory. Content of shared directory must be equal on all test nodes.
        Copies every file of the local shared directory into ``shared_root`` on the node.
        :param node: Ignite service node.
        """
        local_shared_dir = self._init_local_shared()

        if not os.path.isdir(local_shared_dir):
            self.logger.debug("Local shared dir not exists. Nothing to copy. " + str(local_shared_dir))
            return

        # Both paths go in one call: mkdirs() is given them as a single space-separated argument.
        node.account.mkdirs(f"{self.persistent_root} {self.shared_root}")

        for file in os.listdir(local_shared_dir):
            self.logger.debug("Copying shared file to node. " + str(file))
            node.account.copy_to(os.path.join(local_shared_dir, file), self.shared_root)

    def _init_local_shared(self):
        """
        Prepares (once per test session) the local shared files via the spec.
        :return: path to local share folder. Files should be copied on all nodes in `shared_root` folder.
        """
        local_dir = os.path.join(tempfile.gettempdir(), str(self.context.session_context.session_id))

        if not self.spec.is_prepare_shared_files(local_dir):
            return local_dir

        # Re-check under the file lock, so concurrent callers prepare the files only once.
        with FileLock("init_shared.lock", timeout=120):
            if self.spec.is_prepare_shared_files(local_dir):
                self.spec.prepare_shared_files(local_dir)
                Path(os.path.join(local_dir, SHARED_PREPARED_FILE)).touch()

        return local_dir

    def _prepare_configs(self, node):
        """
        Renders every config template of the spec and writes the result into ``config_dir`` on the node.
        Also sets ``node.consistent_id`` to the node's externally routable IP.
        :param node: Ignite service node.
        """
        config = self.spec \
            .extend_config(self.config) \
            .prepare_for_env(self, node)

        for name, template in self.spec.config_templates():
            config_txt = template.render(service=self, config=config)

            node.account.create_file(os.path.join(self.config_dir, name), config_txt)

            self.logger.debug("Config %s for node %s: %s" % (name, node.account.hostname, config_txt))

        node.consistent_id = node.account.externally_routable_ip

    def worker(self, idx, node, **kwargs):
        """
        Worker body: runs the spec command of the node over ssh.
        """
        cmd = self.spec.command(node)

        self.logger.debug("Attempting to start Application Service on %s with command: %s" % (str(node.account), cmd))

        node.account.ssh(cmd)

    def alive(self, node):
        """
        :param node: Ignite service node.
        :return: True if node is alive, i.e. at least one process of the main java class runs on it.
        """
        return len(self.pids(node, self.main_java_class)) > 0

    def await_event_on_node(self, evt_message, node, timeout_sec, from_the_beginning=False, backoff_sec=.1,
                            log_file=None):
        """
        Await for specific event message in a node's log file.
        :param evt_message: Event message.
        :param node: Ignite service node.
        :param timeout_sec: Number of seconds to check the condition for before failing.
        :param from_the_beginning: If True, search for message from the beginning of log file.
        :param backoff_sec: Number of seconds to back off between each failure to meet the condition
        before checking again.
        :param log_file: Explicit log file name, relative to ``log_dir``; node.log_file is used if not set.
        """
        log_path = os.path.join(self.log_dir, log_file) if log_file else node.log_file

        with monitor_log(node, log_path, from_the_beginning) as monitor:
            monitor.wait_until(evt_message, timeout_sec=timeout_sec, backoff_sec=backoff_sec,
                               err_msg="Event [%s] was not triggered on '%s' in %d seconds" % (evt_message, node.name,
                                                                                               timeout_sec))

    def await_event(self, evt_message, timeout_sec, nodes=None, from_the_beginning=False, backoff_sec=.1,
                    log_file=None):
        """
        Await for specific event messages on all nodes, one node after another.
        :param evt_message: Event message.
        :param timeout_sec: Number of seconds to check the condition for before failing (per node).
        :param nodes: Nodes to await event or None, for all nodes.
        :param from_the_beginning: If True, search for message from the beginning of log file.
        :param backoff_sec: Number of seconds to back off between each failure to meet the condition
        before checking again.
        :param log_file: Explicit log file.
        """
        if nodes is None:
            nodes = self.nodes

        for node in nodes:
            self.await_event_on_node(evt_message, node, timeout_sec, from_the_beginning=from_the_beginning,
                                     backoff_sec=backoff_sec, log_file=log_file)

    @staticmethod
    def event_time(evt_message, node):
        """
        Gets the time of specific event message in a node's log file.
        Only the first matching line is considered: its leading "[%Y-%m-%dT%H:%M:%S,%f]" timestamp is parsed.
        :param evt_message: Pattern to search log for.
        :param node: Ducktape node to searching log.
        :return: Time of found log message matched to pattern or None if not found.
        """
        stdout = IgniteAwareService.exec_command(node, "grep '%s' %s" % (evt_message, node.log_file))

        match = re.match("^\\[[^\\[]+\\]", stdout)

        return datetime.strptime(match.group(), "[%Y-%m-%dT%H:%M:%S,%f]") if match else None

    def get_event_time_on_node(self, node, log_pattern, from_the_beginning=True, timeout=15):
        """
        Extracts event time from ignite log by pattern.
        :param node: ducktape node to search ignite log on.
        :param log_pattern: pattern to search ignite log for.
        :param from_the_beginning: switches searching log from its beginning.
        :param timeout: timeout to wait for the pattern in the log.
        :return: Time of the event (see event_time()).
        """
        self.await_event_on_node(log_pattern, node, timeout, from_the_beginning=from_the_beginning, backoff_sec=0)

        return self.event_time(log_pattern, node)

    def get_event_time(self, evt_message, selector=max):
        """
        Gets the time of the specific event from all nodes, using selector.
        :param evt_message: Event message.
        :param selector: Selector function accepting an iterable and a ``default`` keyword, default is max.
        :return: Event time chosen by selector among the nodes where the event was found (the latest one
                 by default), or None if it was found on no node.
        """
        times = (self.event_time(evt_message, node) for node in self.nodes)

        return selector((t for t in times if t is not None), default=None)

    def exec_on_nodes_async(self, nodes, task, simultaneously=True, delay_ms=0, timeout_sec=20):
        """
        Executes given task on the nodes, each node in its own thread.
        :param nodes: Nodes to execute the task on.
        :param task: a 'lambda: node'.
        :param simultaneously: Enables or disables simultaneous start of the task on each node.
        :param delay_ms: delay before task run. Begins with 0, grows by delay_ms for each next node in nodes.
        :param timeout_sec: timeout to wait each task thread.
        :return: The (time.monotonic(), datetime.now()) pair recorded by the first thread that reached its task.
        """
        sem = CountDownLatch(len(nodes)) if simultaneously else None
        time_holder = AtomicValue()

        threads = []

        for idx, node in enumerate(nodes):
            thread = Thread(target=self.__exec_on_node, args=(node, task, sem, idx * delay_ms, time_holder))

            threads.append(thread)

            thread.start()

        for thread in threads:
            thread.join(timeout_sec)

            # join() returns silently on timeout; make a hanging task visible.
            if thread.is_alive():
                self.logger.warning("Async task thread %s is still running after %s seconds" %
                                    (thread.name, timeout_sec))

        return time_holder.get()

    def __exec_on_node(self, node, task, start_waiter=None, delay_ms=0, time_holder=None):
        """
        Thread body of exec_on_nodes_async(): optionally waits for all threads to be ready, sleeps delay_ms,
        records the start time and runs the task on the node. Exceptions are logged and re-raised.
        """
        try:
            if start_waiter is not None:
                start_waiter.count_down()
                start_waiter.wait()

            if delay_ms > 0:
                time.sleep(delay_ms / 1000.0)

            if time_holder is not None:
                mono = time.monotonic()
                timestamp = datetime.now()

                # Presumably only the first thread to get here stores its time (CAS against the unset value).
                time_holder.compare_and_set(None, (mono, timestamp))

            task(node)
        except BaseException:
            self.logger.error("async task threw exception:", exc_info=True)
            raise

    @property
    def netfilter_store_path(self):
        """
        :return: path to store backup of iptables filter
        """
        return os.path.join(self.temp_dir, "iptables.bak")

    def drop_network(self, nodes=None, net_part: NetPart = NetPart.ALL):
        """
        Disconnects node from cluster by dropping discovery and communication traffic with iptables.
        Current iptables rules are backed up first and restored on clean().
        :param nodes: Nodes to emulate network failure on; None is allowed only for a single-node service.
        :param net_part: Part of network to emulate failure of.
        :return: Start time returned by exec_on_nodes_async().
        """
        if nodes is None:
            assert self.num_nodes == 1
            nodes = self.nodes

        for node in nodes:
            self.logger.info("Dropping " + str(net_part) + " Ignite connections on '" + node.account.hostname + "' ...")

        self.__backup_iptables(nodes)

        return self.exec_on_nodes_async(nodes, lambda n: self.__enable_netfilter(n, net_part))

    def __enable_netfilter(self, node, net_part: NetPart):
        """
        Inserts iptables DROP rules for the discovery and communication ports on the node.
        """
        cm_spi = self.config.communication_spi
        dsc_spi = self.config.discovery_spi

        # A single port, or 'first:last' port range as accepted by iptables multiport.
        cm_ports = str(cm_spi.local_port) if cm_spi.local_port_range < 1 else str(cm_spi.local_port) + ':' + str(
            cm_spi.local_port + cm_spi.local_port_range)

        dsc_ports = str(dsc_spi.port) if not hasattr(dsc_spi, 'port_range') or dsc_spi.port_range < 1 else str(
            dsc_spi.port) + ':' + str(dsc_spi.port + dsc_spi.port_range)

        settings = ""

        if net_part in (IgniteAwareService.NetPart.ALL, IgniteAwareService.NetPart.INPUT):
            settings = self.__apply_iptables_settings(
                node,
                f"sudo iptables -I INPUT 1 -p tcp -m multiport --dport {dsc_ports},{cm_ports} -j DROP")

        if net_part in (IgniteAwareService.NetPart.ALL, IgniteAwareService.NetPart.OUTPUT):
            settings = self.__apply_iptables_settings(
                node,
                f"sudo iptables -I OUTPUT 1 -p tcp -m multiport --dport {dsc_ports},{cm_ports} -j DROP")

        self.logger.debug("Activated netfilter on '%s':\n%s" % (node.name, settings))

    def __apply_iptables_settings(self, node, cmd):
        """
        Sets given iptables settings and ensures they were applied.
        :return: iptables settings dump taken after the command.
        :raises AssertionError: If the command printed anything or the settings did not change.
        """
        settings_before = self.__dump_netfilter_settings(node)

        out, err = IgniteAwareService.exec_command_ex(node, cmd)

        assert len(out) == 0, \
            "Unexpected iptables output on '" + node.name + "': '" + out + "'\n Command: '" + cmd + "'."
        assert len(err) == 0, \
            "Unexpected iptables output on '" + node.name + "': '" + err + "'.\n Command: '" + cmd + "'."

        settings = self.__dump_netfilter_settings(node)

        assert settings_before != settings, \
            "iptables settings not set on '" + node.name + "'\n Command: '" + cmd + "'\n ---iptables before---\n" \
            + settings_before + "\n ---iptables after---\n" + settings

        return settings

    def __backup_iptables(self, nodes):
        """
        Stores current network filter settings of each node to netfilter_store_path and remembers the node
        as disconnected. Falls back to iptables-legacy-save when iptables warns about legacy tables.
        """
        for node in nodes:
            cmd = f"sudo iptables-save | tee {self.netfilter_store_path}"

            _, err = IgniteAwareService.exec_command_ex(node, cmd)

            if "Warning: iptables-legacy tables present" in err:
                cmd = f"sudo iptables-legacy-save | tee {self.netfilter_store_path}"

                _, err = IgniteAwareService.exec_command_ex(node, cmd)

            assert len(err) == 0, "Failed to store iptables rules on '%s': %s" % (node.name, err)

            self.logger.debug("Netfilter before launch on '%s': %s" % (node.name, self.__dump_netfilter_settings(node)))

            assert self.disconnected_nodes.count(node) == 0

            self.disconnected_nodes.append(node)

    def __restore_iptables(self):
        """
        Restores previous network filter settings on every disconnected node.
        Successfully restored nodes are forgotten, so a repeated call does not try to restore them again.
        :raises RuntimeError: If the settings could not be restored on some node.
        """
        # NOTE(review): the backup may be made by iptables-legacy-save (see __backup_iptables) while the
        # restore always uses iptables-restore — confirm this is intended on hosts with legacy tables.
        cmd = f"sudo iptables-restore < {self.netfilter_store_path}"

        errors = []
        failed_nodes = []

        for node in self.disconnected_nodes:
            settings_before = self.__dump_netfilter_settings(node)

            _, err = IgniteAwareService.exec_command_ex(node, cmd)

            settings_after = self.__dump_netfilter_settings(node)

            if len(err) > 0:
                errors.append("Failed to restore iptables rules on '%s': %s" % (node.name, err))
                failed_nodes.append(node)
            elif settings_before == settings_after:
                errors.append("iptables settings not restored on '" + node.name + "':\n" + settings_after)
                failed_nodes.append(node)
            else:
                # settings_after is the fresh dump already; no need for another remote call.
                self.logger.debug("Netfilter after launch on '%s':\n%s" % (node.name, settings_after))

        # Keep only the nodes which still need restoring.
        self.disconnected_nodes[:] = failed_nodes

        if len(errors) > 0:
            self.logger.error("Failed restoring actions:" + os.linesep + os.linesep.join(errors))

            raise RuntimeError("Unable to restore node states. See the log above.")

    @staticmethod
    def __dump_netfilter_settings(node):
        """
        Reads current netfilter settings on the node for debugging purposes.
        Falls back to iptables-legacy when iptables warns about legacy tables.
        """
        out, err = IgniteAwareService.exec_command_ex(node, "sudo iptables -L -n")

        if "Warning: iptables-legacy tables present" in err:
            out, err = IgniteAwareService.exec_command_ex(node, "sudo iptables-legacy -L -n")

        assert len(err) == 0, "Failed to dump iptables rules on '%s': %s" % (node.name, err)

        return out

    def __update_node_log_file(self, node):
        """
        Update the node log file: sets node.log_file on first use and, if 'ignite.log' or its rotated
        copies 'ignite.log.N' exist in the log directory, moves the file(s) matching node.log_file
        to 'ignite.log.<count of such files>'.
        """
        if not hasattr(node, 'log_file'):
            # '*' here is to support LoggerNodeIdAndApplicationAware loggers generates logs like 'ignite-367efed9.log'
            # default Ignite configuration uses o.a.i.l.l.Log4jRollingFileAppender generates such files.
            node.log_file = os.path.join(self.log_dir, "ignite*.log")

        # Dots are escaped so that only 'ignite.log' and 'ignite.log.N' are counted.
        cnt = list(node.account.ssh_capture(f'ls {self.log_dir} | '
                                            'grep -E "^ignite\\.log(\\.[0-9]+)?$" | '
                                            'wc -l', callback=int))[0]
        if cnt > 0:
            rotated_log = os.path.join(self.log_dir, f"ignite.log.{cnt}")
            self.logger.debug(f"rotating {node.log_file} to {rotated_log} on {node.name}")
            node.account.ssh(f"mv {node.log_file} {rotated_log}")

    @staticmethod
    def exec_command_ex(node, cmd):
        """
        Executes the command passed on the given node and returns out and error results as string.
        The exit status of the command is not checked.
        """
        _, out, err = node.account.ssh_client.exec_command(cmd)

        return str(out.read(), sys.getdefaultencoding()), str(err.read(), sys.getdefaultencoding())

    @staticmethod
    def exec_command(node, cmd, check_error=True):
        """
        Executes the command passed on the given node and returns out result as string.
        :param check_error: If True, assert that the command printed nothing to stderr.
        """
        out, err = IgniteAwareService.exec_command_ex(node, cmd)

        if check_error:
            assert len(err) == 0, f"Command failed: '{cmd}'.\nError: '{err}'"

        return out

    def thread_dump(self, node):
        """
        Generate thread dump on node by sending SIGQUIT to the processes of the main java class.
        :param node: Ignite service node.
        """
        for pid in self.pids(node, self.main_java_class):
            try:
                node.account.signal(pid, signal.SIGQUIT, allow_fail=True)
            except RemoteCommandError:
                self.logger.warning("Could not dump threads on node %s" % node.name)

    @staticmethod
    def node_id(node):
        """
        Returns node id from its log if started.
        This is a remote call. Reuse its results if possible.
        :return: Lower-cased id from the '>>> Local node [ID=...' log line, or an empty string if not found.
        """
        regexp = "^>>> Local node \\[ID=([^,]+),.+$"
        cmd = "grep -E '%s' %s | sed -r 's/%s/\\1/'" % (regexp, node.log_file, regexp)

        return IgniteAwareService.exec_command(node, cmd).strip().lower()

    def restore_from_snapshot(self, snapshot_name: str):
        """
        Restore from snapshot: replaces the database directory of every (stopped) node with the 'db'
        directory of the snapshot.
        :param snapshot_name: Name of Snapshot.
        """
        snapshot_db = os.path.join(self.snapshots_dir, snapshot_name, "db")

        for node in self.nodes:
            assert len(self.pids(node, self.main_java_class)) == 0

            node.account.ssh(f'rm -rf {self.database_dir}', allow_fail=False)
            node.account.ssh(f'cp -r {snapshot_db} {self.work_dir}', allow_fail=False)

    def await_rebalance(self, timeout_sec=600, backoff_sec=1):
        """
        Waiting for the rebalance to complete.
        For the method, you need to set the
        metric_exporters={'org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi'}
        to the config.

        The 'Rebalanced' attribute of the cluster MBean is polled on a random alive node; it is checked
        at least once, even when timeout_sec is zero.

        :param timeout_sec: Timeout to wait the rebalance to complete.
        :param backoff_sec: Pause between two consecutive checks.
        :raises TimeoutError: If the rebalance was not completed within timeout_sec.
        """
        deadline = datetime.now() + timedelta(seconds=timeout_sec)

        alive_nodes = self.alive_nodes

        assert alive_nodes, "No alive nodes to await rebalance on."

        node = random.choice(alive_nodes)

        mbean = JmxClient(node).find_mbean('.*name=cluster')

        while next(mbean.Rebalanced) != 'true':
            remaining_sec = (deadline - datetime.now()).total_seconds()

            if remaining_sec <= 0:
                raise TimeoutError(f'Rebalancing was not completed within the time: {timeout_sec} seconds.')

            # Do not hammer JMX; never sleep past the deadline.
            time.sleep(min(backoff_sec, remaining_sec))

    @property
    def alive_nodes(self) -> list:
        """
        Alive nodes.

        :return: List of alive nodes.
        """
        return [node for node in self.nodes if self.alive(node)]

    @staticmethod
    def get_file_size(node, file):
        """
        :return: Size of the file (or directory) in bytes, as reported by 'du -s --block-size=1'.
        """
        out = IgniteAwareService.exec_command(node, f'du -s --block-size=1 {file}')

        data = out.split("\t")

        return int(data[0])
597
def node_failed_event_pattern(failed_node_id=None):
    """
    Failed node pattern in log.

    Matches the "Node FAILED" record of a node whose isClient/client flag is false. The pattern uses
    BRE-style escaping (e.g. ``\\{1,\\}``, ``\\|``), presumably for use with grep.

    :param failed_node_id: Id of the failed node; when falsy, the record of any node id matches.
    :return: Pattern string.
    """
    id_part = failed_node_id or ""

    head = "Node FAILED: .\\{1,\\}Node \\[id="
    tail = ".\\{1,\\}\\(isClient\\|client\\)=false"

    return head + id_part + tail