apache-ignite

Форк
0
601 строка · 22.8 Кб
1
# Licensed to the Apache Software Foundation (ASF) under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#    http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
"""
17
This module contains the base class to build services aware of Ignite.
18
"""
19
import os
20
import random
21
import re
22
import signal
23
import sys
24
import time
25
import tempfile
26
from abc import ABCMeta
27
from datetime import datetime, timedelta
28
from enum import IntEnum
29
from pathlib import Path
30
from threading import Thread
31
from filelock import FileLock
32

33
from ducktape.cluster.remoteaccount import RemoteCommandError
34
from ducktape.utils.util import wait_until
35

36
from ignitetest.services.utils import IgniteServiceType
37
from ignitetest.services.utils.background_thread import BackgroundThreadService
38
from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue
39
from ignitetest.services.utils.ignite_spec import resolve_spec, SHARED_PREPARED_FILE
40
from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin, JmxClient
41
from ignitetest.services.utils.jvm_utils import JvmProcessMixin, JvmVersionMixin
42
from ignitetest.services.utils.log_utils import monitor_log
43
from ignitetest.services.utils.path import IgnitePathAware
44
from ignitetest.utils.enum import constructible
45

46

47
class IgniteAwareService(BackgroundThreadService, IgnitePathAware, JvmProcessMixin, JvmVersionMixin, metaclass=ABCMeta):
48
    """
49
    The base class to build services aware of Ignite.
50
    """
51
    @constructible
    class NetPart(IntEnum):
        """
        Network part to emulate failure.
        """
        # Drop only inbound traffic (iptables INPUT chain).
        INPUT = 0
        # Drop only outbound traffic (iptables OUTPUT chain).
        OUTPUT = 1
        # Drop traffic in both directions.
        ALL = 2
59

60
    def __init__(self, context, config, num_nodes, startup_timeout_sec, shutdown_timeout_sec, main_java_class, modules,
                 **kwargs):
        """
        :param context: Test context.
        :param config: Service configuration (carries version, service type, SPI settings).
        :param num_nodes: Number of cluster nodes to run the service on.
        :param startup_timeout_sec: Seconds to wait for the service to start.
        :param shutdown_timeout_sec: Seconds to wait for the service to stop.
        :param main_java_class: Main java class whose PIDs identify the service processes.
        :param modules: Ignite modules to include.
        **kwargs are params that passed to IgniteSpec
        """
        super().__init__(context, num_nodes)

        # Ducktape checks a Service implementation attribute 'logs' to get config for logging.
        # IgniteAwareService uses IgnitePersistenceAware mixin to override default Service 'log' definition.
        self.log_level = "DEBUG"

        self.config = config
        self.main_java_class = main_java_class
        self.startup_timeout_sec = startup_timeout_sec
        self.shutdown_timeout_sec = shutdown_timeout_sec
        self.modules = modules

        # Spec resolves artifacts, config templates and the start command for this service.
        self.spec = resolve_spec(self, **kwargs)
        self.init_logs_attribute()

        # Nodes whose network was dropped via drop_network() and not yet restored.
        self.disconnected_nodes = []
81

82
    @property
    def product(self):
        """
        :return: Product version of this service as a string.
        """
        version = self.config.version
        return str(version)
85

86
    @property
    def globals(self):
        """
        :return: Test-wide globals dictionary taken from the test context.
        """
        ctx = self.context
        return ctx.globals
89

90
    def start_async(self, **kwargs):
        """
        Starts the service without waiting for readiness.
        """
        super().start(**kwargs)
95

96
    def start(self, **kwargs):
        """
        Starts the service and blocks until startup is confirmed in the logs.
        """
        self.start_async(**kwargs)

        self.await_started()
99

100
    def await_started(self, nodes=None):
        """
        Awaits start finished.
        :param nodes: Nodes to await, or None for all service nodes.
        """
        service_type = self.config.service_type

        # Non-server services never print a topology snapshot, so there is nothing to wait for.
        if service_type in (IgniteServiceType.NONE, IgniteServiceType.THIN_CLIENT):
            return

        self.logger.info("Waiting for IgniteAware(s) to start ...")

        self.await_event("Topology snapshot", self.startup_timeout_sec, nodes=nodes, from_the_beginning=True)
110

111
    def start_node(self, node, **kwargs):
        """
        Prepares directories/configs and starts the service process on a single node.
        :param node: Ignite service node.
        """
        self.init_shared(node)
        self.init_persistent(node)

        # Rotate any log left from a previous run so monitoring starts on a fresh file.
        self.__update_node_log_file(node)

        super().start_node(node, **kwargs)

        # Ensure the JVM process has actually appeared before wiring up JMX.
        wait_until(lambda: self.alive(node), timeout_sec=10)

        ignite_jmx_mixin(node, self)
122

123
    def stop_async(self, force_stop=False, **kwargs):
        """
        Initiates service stop without waiting for completion.
        :param force_stop: If True, kill the processes instead of graceful termination.
        """
        super().stop(force_stop, **kwargs)
128

129
    def stop(self, force_stop=False, **kwargs):
        """
        Stops the service; waits for shutdown unless force-stopped.
        :param force_stop: If True, kill the processes and do not wait.
        """
        self.stop_async(force_stop, **kwargs)

        # Making this async on FORCE_STOP to eliminate waiting on killing services on tear down.
        # Waiting will happen on plain stop() call made by ducktape during same step.
        if force_stop:
            return

        self.await_stopped()
136

137
    def await_stopped(self):
        """
        Awaits stop finished: first the worker threads, then the remote processes.
        """
        self.logger.info("Waiting for IgniteAware(s) to stop ...")

        # Phase 1: wait for each node's background worker thread to finish.
        for node in self.nodes:
            stopped = self.wait_node(node, timeout_sec=self.shutdown_timeout_sec)
            assert stopped, "Node %s's worker thread did not stop in %d seconds" % \
                            (str(node.account), self.shutdown_timeout_sec)

        # Phase 2: wait until no service JVM processes remain on each node.
        for node in self.nodes:
            wait_until(lambda: not self.alive(node), timeout_sec=self.shutdown_timeout_sec,
                       err_msg="Node %s's remote processes failed to stop in %d seconds" %
                               (str(node.account), self.shutdown_timeout_sec))
152

153
    def stop_node(self, node, force_stop=False, **kwargs):
        """
        Signals all service JVM processes on the node to terminate.
        :param node: Ignite service node.
        :param force_stop: Send SIGKILL instead of SIGTERM.
        """
        sig = signal.SIGKILL if force_stop else signal.SIGTERM

        for pid in self.pids(node, self.main_java_class):
            node.account.signal(pid, sig, allow_fail=False)
158

159
    def clean(self, **kwargs):
        """
        Cleans the service, first restoring any iptables rules changed by network emulation.
        """
        self.__restore_iptables()

        super().clean(**kwargs)
163

164
    def clean_node(self, node, **kwargs):
        """
        Cleans a single node and removes the service persistent directory.
        :param node: Ignite service node.
        """
        super().clean_node(node, **kwargs)

        node.account.ssh("rm -rf -- %s" % self.persistent_root, allow_fail=False)
168

169
    def init_persistent(self, node):
        """
        Init persistent directory.
        :param node: Ignite service node.
        """
        super().init_persistent(node)

        # Render and upload node-specific config files after the directories exist.
        self._prepare_configs(node)
177

178
    def init_shared(self, node):
        """
        Init shared directory. Content of shared directory must be equal on all test nodes.
        :param node: Ignite service node.
        """
        src_dir = self._init_local_shared()

        if not os.path.isdir(src_dir):
            self.logger.debug("Local shared dir not exists. Nothing to copy. " + str(src_dir))
            return

        node.account.mkdirs(f"{self.persistent_root} {self.shared_root}")

        # Copy every prepared file from the local shared dir to the node's shared root.
        for entry in os.listdir(src_dir):
            self.logger.debug("Copying shared file to node. " + str(entry))
            node.account.copy_to(os.path.join(src_dir, entry), self.shared_root)
194

195
    def _init_local_shared(self):
        """
        :return: path to local share folder. Files should be copied on all nodes in `shared_root` folder.
        """
        # One shared dir per test session; all services of the session reuse it.
        local_dir = os.path.join(tempfile.gettempdir(), str(self.context.session_context.session_id))

        # Fast path: another process already prepared the shared files.
        if not self.spec.is_prepare_shared_files(local_dir):
            return local_dir

        # Double-checked preparation under a file lock: only one process prepares the files,
        # then marks completion by touching SHARED_PREPARED_FILE.
        with FileLock("init_shared.lock", timeout=120):
            if self.spec.is_prepare_shared_files(local_dir):
                self.spec.prepare_shared_files(local_dir)
                Path(os.path.join(local_dir, SHARED_PREPARED_FILE)).touch()

        return local_dir
210

211
    def _prepare_configs(self, node):
        """
        Renders config templates for the node and uploads them to the node's config dir.
        :param node: Ignite service node.
        """
        config = self.spec \
            .extend_config(self.config) \
            .prepare_for_env(self, node)

        for name, template in self.spec.config_templates():
            config_txt = template.render(service=self, config=config)

            node.account.create_file(os.path.join(self.config_dir, name), config_txt)

            self.logger.debug("Config %s for node %s: %s" % (name, node.account.hostname, config_txt))

        # Use the routable IP as the Ignite consistent id so restarts keep node identity.
        setattr(node, "consistent_id", node.account.externally_routable_ip)
224

225
    def worker(self, idx, node, **kwargs):
        """
        Background-thread body: runs the service start command on the node (blocking ssh).
        :param idx: Node index assigned by the background thread service.
        :param node: Ignite service node.
        """
        cmd = self.spec.command(node)

        self.logger.debug("Attempting to start Application Service on %s with command: %s" % (str(node.account), cmd))

        node.account.ssh(cmd)
231

232
    def alive(self, node):
        """
        :param node: Ignite service node.
        :return: True if node is alive.
        """
        # A node is alive iff at least one service JVM process is running on it.
        return bool(self.pids(node, self.main_java_class))
238

239
    def await_event_on_node(self, evt_message, node, timeout_sec, from_the_beginning=False, backoff_sec=.1,
                            log_file=None):
        """
        Await for specific event message in a node's log file.
        :param evt_message: Event message.
        :param node: Ignite service node.
        :param timeout_sec: Number of seconds to check the condition for before failing.
        :param from_the_beginning: If True, search for message from the beginning of log file.
        :param backoff_sec: Number of seconds to back off between each failure to meet the condition
                before checking again.
        :param log_file: Explicit log file.
        """
        # Explicit log_file is resolved relative to the service log dir; otherwise use the
        # node's current log file set by __update_node_log_file.
        with monitor_log(node, os.path.join(self.log_dir, log_file) if log_file else node.log_file,
                         from_the_beginning) as monitor:
            monitor.wait_until(evt_message, timeout_sec=timeout_sec, backoff_sec=backoff_sec,
                               err_msg="Event [%s] was not triggered on '%s' in %d seconds" % (evt_message, node.name,
                                                                                               timeout_sec))
256

257
    def await_event(self, evt_message, timeout_sec, nodes=None, from_the_beginning=False, backoff_sec=.1,
                    log_file=None):
        """
        Await for specific event messages on all nodes.
        :param evt_message: Event message.
        :param timeout_sec: Number of seconds to check the condition for before failing.
        :param nodes: Nodes to await event or None, for all nodes.
        :param from_the_beginning: If True, search for message from the beginning of log file.
        :param backoff_sec: Number of seconds to back off between each failure to meet the condition
                before checking again.
        :param log_file: Explicit log file.
        """
        if nodes is None:
            nodes = self.nodes

        # Nodes are awaited sequentially, so worst-case total wait is len(nodes) * timeout_sec.
        for node in nodes:
            self.await_event_on_node(evt_message, node, timeout_sec, from_the_beginning=from_the_beginning,
                                     backoff_sec=backoff_sec, log_file=log_file)
275

276
    @staticmethod
    def event_time(evt_message, node):
        """
        Gets the time of specific event message in a node's log file.
        :param evt_message: Pattern to search log for.
        :param node: Ducktape node to searching log.
        :return: Time of found log message matched to pattern or None if not found.
        """
        stdout = IgniteAwareService.exec_command(node, "grep '%s' %s" % (evt_message, node.log_file))

        # Log lines are expected to start with a bracketed timestamp, e.g. [2023-01-01T00:00:00,000].
        match = re.match("^\\[[^\\[]+\\]", stdout)

        return datetime.strptime(match.group(), "[%Y-%m-%dT%H:%M:%S,%f]") if match else None
289

290
    def get_event_time_on_node(self, node, log_pattern, from_the_beginning=True, timeout=15):
        """
        Extracts event time from ignite log by pattern.
        :param node: ducktape node to search ignite log on.
        :param log_pattern: pattern to search ignite log for.
        :param from_the_beginning: switches searching log from its beginning.
        :param timeout: timeout to wait for the patters in the log.
        :return: Timestamp of the matched log line, or None if the line has no parsable timestamp.
        """
        # First wait until the pattern appears, then parse its timestamp out of the log.
        self.await_event_on_node(log_pattern, node, timeout, from_the_beginning=from_the_beginning, backoff_sec=0)

        return self.event_time(log_pattern, node)
301

302
    def get_event_time(self, evt_message, selector=max):
        """
        Gets the time of the specific event from all nodes, using selector.
        :param evt_message: Event message.
        :param selector: Selector function, default is max.
        :return: Event time chosen by the selector (the latest one by default),
                or None if the event was found on no node.
        """
        return selector(filter(lambda t: t is not None,
                               map(lambda node: self.event_time(evt_message, node), self.nodes)), default=None)
311

312
    def exec_on_nodes_async(self, nodes, task, simultaneously=True, delay_ms=0, timeout_sec=20):
        """
        Executes given task on the nodes.
        :param nodes: Nodes to execute the task on.
        :param task: a 'lambda: node'.
        :param simultaneously: Enables or disables simultaneous start of the task on each node.
        :param delay_ms: delay before task run. Begins with 0, grows by delay_ms for each next node in nodes.
        :param timeout_sec: timeout to wait the task.
        :return: (monotonic, datetime) pair captured when the first task started, or None.
        """
        # Latch aligns the start of all tasks when simultaneous execution is requested.
        sem = CountDownLatch(len(nodes)) if simultaneously else None
        time_holder = AtomicValue()

        delay = 0
        threads = []

        for node in nodes:
            thread = Thread(target=self.__exec_on_node, args=(node, task, sem, delay, time_holder))

            threads.append(thread)

            thread.start()

            # Each next node gets an additional delay_ms of start delay.
            delay += delay_ms

        # NOTE(review): joins are sequential, so the worst-case total wait is len(nodes) * timeout_sec.
        for thread in threads:
            thread.join(timeout_sec)

        return time_holder.get()
339

340
    def __exec_on_node(self, node, task, start_waiter=None, delay_ms=0, time_holder=None):
        """
        Runs the task on the node; optionally synchronizes start with sibling threads and delays.
        :param node: Node to run the task on.
        :param task: Callable accepting the node.
        :param start_waiter: CountDownLatch used to align simultaneous start, or None.
        :param delay_ms: Delay before running the task, in milliseconds.
        :param time_holder: AtomicValue receiving (monotonic, datetime) of the first task start.
        """
        try:
            if start_waiter:
                start_waiter.count_down()
                start_waiter.wait()

            if delay_ms > 0:
                time.sleep(delay_ms / 1000.0)

            if time_holder:
                mono = time.monotonic()
                timestamp = datetime.now()

                # Only the first thread records the start time; later threads keep the set value.
                time_holder.compare_and_set(None, (mono, timestamp))
            task(node)
        except BaseException:
            # Log and re-raise: exceptions in daemon threads would otherwise be lost silently.
            self.logger.error("async task threw exception:", exc_info=1)
            raise
358

359
    @property
    def netfilter_store_path(self):
        """
        :return: path to store backup of iptables filter
        """
        return os.path.join(self.temp_dir, "iptables.bak")
365

366
    def drop_network(self, nodes=None, net_part: NetPart = NetPart.ALL):
        """
        Disconnects node from cluster.
        :param nodes: Nodes to emulate network failure on. If None, the service must be single-node.
        :param net_part: Part of network to emulate failure of.
        :return: Result of exec_on_nodes_async (start time holder of the async tasks).
        """
        if nodes is None:
            # Without an explicit node list this operation is only safe on a single-node service.
            assert self.num_nodes == 1
            nodes = self.nodes

        for node in nodes:
            self.logger.info("Dropping " + str(net_part) + " Ignite connections on '" + node.account.hostname + "' ...")

        # Save current iptables state first so __restore_iptables (via clean) can undo the change.
        self.__backup_iptables(nodes)

        return self.exec_on_nodes_async(nodes, lambda n: self.__enable_netfilter(n, net_part))
382

383
    def __enable_netfilter(self, node, net_part: NetPart):
        """
        Inserts iptables DROP rules for Ignite discovery and communication ports on the node.
        :param node: Ignite service node.
        :param net_part: Which traffic direction(s) to drop.
        """
        cm_spi = self.config.communication_spi
        dsc_spi = self.config.discovery_spi

        # Communication ports: single port, or 'low:high' range for iptables multiport matching.
        cm_ports = str(cm_spi.local_port) if cm_spi.local_port_range < 1 else str(cm_spi.local_port) + ':' + str(
            cm_spi.local_port + cm_spi.local_port_range)

        # Discovery ports: some discovery SPIs have no 'port_range' attribute at all.
        dsc_ports = str(dsc_spi.port) if not hasattr(dsc_spi, 'port_range') or dsc_spi.port_range < 1 else str(
            dsc_spi.port) + ':' + str(dsc_spi.port + dsc_spi.port_range)

        settings = ""

        if net_part in (IgniteAwareService.NetPart.ALL, IgniteAwareService.NetPart.INPUT):
            settings = self.__apply_iptables_settings(
                node,
                f"sudo iptables -I INPUT 1 -p tcp -m multiport --dport {dsc_ports},{cm_ports} -j DROP")

        if net_part in (IgniteAwareService.NetPart.ALL, IgniteAwareService.NetPart.OUTPUT):
            settings = self.__apply_iptables_settings(
                node,
                f"sudo iptables -I OUTPUT 1 -p tcp -m multiport --dport {dsc_ports},{cm_ports} -j DROP")

        self.logger.debug("Activated netfilter on '%s':\n%s" % (node.name, settings))
406

407
    def __apply_iptables_settings(self, node, cmd):
        """
        Sets given iptables settings and ensures they were applied.
        :param node: Node to apply the settings on.
        :param cmd: Full iptables command to execute.
        :return: Dump of the netfilter settings after the command was applied.
        """
        # Snapshot before, so we can verify the rules actually changed.
        settings_before = self.__dump_netfilter_settings(node)

        out, err = IgniteAwareService.exec_command_ex(node, cmd)

        # iptables is silent on success; any output signals a problem.
        assert len(out) == 0, \
            "Unexpected iptables output on '" + node.name + "': '" + out + "'\n   Command: '" + cmd + "'."
        assert len(err) == 0, \
            "Unexpected iptables output on '" + node.name + "': '" + err + "'.\n   Command: '" + cmd + "'."

        settings = self.__dump_netfilter_settings(node)

        assert settings_before != settings, \
            "iptables settings not set on '" + node.name + "'\n   Command: '" + cmd + "'\n   ---iptables before---\n" \
            + settings_before + "\n   ---iptables after---\n" + settings

        return settings
425

426
    def __backup_iptables(self, nodes):
        """
        Stores current network filter settings of the nodes for later restore.
        :param nodes: Nodes to back iptables rules up on.
        """
        for node in nodes:
            cmd = f"sudo iptables-save | tee {self.netfilter_store_path}"

            _, err = IgniteAwareService.exec_command_ex(node, cmd)

            # Fall back to the legacy backend when the nft-based tool reports mixed tables.
            if "Warning: iptables-legacy tables present" in err:
                cmd = f"sudo iptables-legacy-save | tee {self.netfilter_store_path}"

                _, err = IgniteAwareService.exec_command_ex(node, cmd)

            assert len(err) == 0, "Failed to store iptables rules on '%s': %s" % (node.name, err)

            self.logger.debug("Netfilter before launch on '%s': %s" % (node.name, self.__dump_netfilter_settings(node)))

            # Guard against double backup: a second backup would overwrite the pristine state.
            assert self.disconnected_nodes.count(node) == 0

            self.disconnected_nodes.append(node)
445

446
    def __restore_iptables(self):
        """
        Restores the network filter settings saved by __backup_iptables on all disconnected
        nodes and forgets successfully restored nodes, so that further drop/restore cycles
        (and repeated clean() calls) work.
        :raises RuntimeError: If restore failed on at least one node.
        """
        cmd = f"sudo iptables-restore < {self.netfilter_store_path}"

        errors = []
        restored = []

        for node in self.disconnected_nodes:
            settings_before = self.__dump_netfilter_settings(node)

            _, err = IgniteAwareService.exec_command_ex(node, cmd)

            settings_after = self.__dump_netfilter_settings(node)

            if len(err) > 0:
                errors.append("Failed to restore iptables rules on '%s': %s" % (node.name, err))
            elif settings_before == settings_after:
                # The DROP rules should disappear after restore; identical dumps mean nothing changed.
                errors.append("iptables settings not restored on '" + node.name + "':\n" + settings_after)
            else:
                restored.append(node)
                self.logger.debug(
                    "Netfilter after launch on '%s':\n%s" % (node.name, self.__dump_netfilter_settings(node)))

        # Forget restored nodes: otherwise a later drop_network() would trip the
        # double-backup assert, and a repeated restore would falsely report failure.
        self.disconnected_nodes = [node for node in self.disconnected_nodes if node not in restored]

        if len(errors) > 0:
            self.logger.error("Failed restoring actions:" + os.linesep + os.linesep.join(errors))

            raise RuntimeError("Unable to restore node states. See the log above.")
471

472
    @staticmethod
    def __dump_netfilter_settings(node):
        """
        Reads current netfilter settings on the node for debugging purposes.
        :param node: Node to read iptables rules from.
        :return: Text dump of the current filter table.
        """
        out, err = IgniteAwareService.exec_command_ex(node, "sudo iptables -L -n")

        # Fall back to the legacy backend when the nft-based tool reports mixed tables.
        if "Warning: iptables-legacy tables present" in err:
            out, err = IgniteAwareService.exec_command_ex(node, "sudo iptables-legacy -L -n")

        assert len(err) == 0, "Failed to dump iptables rules on '%s': %s" % (node.name, err)

        return out
485

486
    def __update_node_log_file(self, node):
        """
        Update the node log file: ensure node.log_file is set and rotate any existing
        'ignite.log' to 'ignite.log.N' so the new run starts with a fresh log.
        """
        if not hasattr(node, 'log_file'):
            # '*' here is to support LoggerNodeIdAndApplicationAware loggers generates logs like 'ignite-367efed9.log'
            # default Ignite configuration uses o.a.i.l.l.Log4jRollingFileAppender generates such files.
            node.log_file = os.path.join(self.log_dir, "ignite*.log")

        # Count existing 'ignite.log' plus rotated 'ignite.log.N' files. Dots are escaped:
        # the previous unescaped pattern '^ignite.log(.[0-9]+)?$' treated '.' as "any char"
        # and would also count unrelated names such as 'ignite0log1'.
        cnt = list(node.account.ssh_capture(f'ls {self.log_dir} | '
                                            f'grep -E "^ignite\\.log(\\.[0-9]+)?$" | '
                                            f'wc -l', callback=int))[0]
        if cnt > 0:
            rotated_log = os.path.join(self.log_dir, f"ignite.log.{cnt}")
            self.logger.debug(f"rotating {node.log_file} to {rotated_log} on {node.name}")
            node.account.ssh(f"mv {node.log_file} {rotated_log}")
502

503
    @staticmethod
    def exec_command_ex(node, cmd):
        """Executes the command passed on the given node and returns out and error results as string."""
        # Raw paramiko channel: unlike node.account.ssh, a non-zero exit status does not raise.
        _, out, err = node.account.ssh_client.exec_command(cmd)

        return str(out.read(), sys.getdefaultencoding()), str(err.read(), sys.getdefaultencoding())
509

510
    @staticmethod
    def exec_command(node, cmd, check_error=True):
        """
        Executes the command passed on the given node and returns out result as string.
        :param node: Node to execute the command on.
        :param cmd: Shell command to execute.
        :param check_error: If True, assert that stderr is empty.
        :return: Stdout of the command as a string.
        """
        out, err = IgniteAwareService.exec_command_ex(node, cmd)

        assert not check_error or len(err) == 0, f"Command failed: '{cmd}'.\nError: '{err}'"

        return out
519

520
    def thread_dump(self, node):
        """
        Generate thread dump on node.
        :param node: Ignite service node.
        """
        # SIGQUIT makes the JVM print a thread dump to its stdout/log without terminating.
        for pid in self.pids(node, self.main_java_class):
            try:
                node.account.signal(pid, signal.SIGQUIT, allow_fail=True)
            except RemoteCommandError:
                # Best-effort: a dump failure should not fail the test.
                self.logger.warn("Could not dump threads on node")
530

531
    @staticmethod
    def node_id(node):
        """
        Returns node id from its log if started.
        This is a remote call. Reuse its results if possible.
        :param node: Ignite service node.
        :return: Lower-case node id string (empty if the start line is not in the log yet).
        """
        # Extract the ID field from the '>>> Local node [ID=...' startup line via grep+sed.
        regexp = "^>>> Local node \\[ID=([^,]+),.+$"
        cmd = "grep -E '%s' %s | sed -r 's/%s/\\1/'" % (regexp, node.log_file, regexp)

        return IgniteAwareService.exec_command(node, cmd).strip().lower()
541

542
    def restore_from_snapshot(self, snapshot_name: str):
        """
        Restore from snapshot.
        :param snapshot_name: Name of Snapshot.
        """
        snapshot_db = os.path.join(self.snapshots_dir, snapshot_name, "db")

        for node in self.nodes:
            # Restoring over a running node would corrupt data; the service must be stopped.
            assert len(self.pids(node, self.main_java_class)) == 0

            # Replace the current database directory with the snapshot's 'db' content.
            node.account.ssh(f'rm -rf {self.database_dir}', allow_fail=False)
            node.account.ssh(f'cp -r {snapshot_db} {self.work_dir}', allow_fail=False)
554

555
    def await_rebalance(self, timeout_sec=600):
        """
        Waiting for the rebalance to complete.
        For the method, you need to set the
        metric_exporters={'org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi'}
        to the config.

        :param timeout_sec: Timeout to wait the rebalance to complete.
        :raises TimeoutError: If the rebalance did not complete within timeout_sec.
        """

        delta_time = datetime.now() + timedelta(seconds=timeout_sec)

        # NOTE(review): raises IndexError when no node is alive — presumably callers
        # only invoke this on a running cluster; verify against call sites.
        node = random.choice(self.alive_nodes)

        rebalanced = False
        mbean = JmxClient(node).find_mbean('.*name=cluster')

        # Poll the cluster 'Rebalanced' JMX attribute until true or the deadline passes.
        # There is no explicit sleep; each iteration performs a remote JMX read.
        while datetime.now() < delta_time and not rebalanced:
            rebalanced = next(mbean.Rebalanced) == 'true'

        if rebalanced:
            return

        raise TimeoutError(f'Rebalancing was not completed within the time: {timeout_sec} seconds.')
579

580
    @property
    def alive_nodes(self) -> list:
        """
        Alive nodes.

        :return List of alives nodes.
        """
        return list(filter(self.alive, self.nodes))
588

589
    @staticmethod
    def get_file_size(node, file):
        """
        Returns the size in bytes of the given file (or directory, recursively) on the node.
        :param node: Node to measure on.
        :param file: Remote path to measure.
        :return: Size in bytes.
        """
        # 'du -s --block-size=1' prints '<bytes>\t<path>'; take the first field.
        out = IgniteAwareService.exec_command(node, f'du -s --block-size=1 {file}')

        size_field = out.split("\t")[0]

        return int(size_field)
596

597

598
def node_failed_event_pattern(failed_node_id=None):
    """
    Builds the grep pattern matching a 'Node FAILED' log line for a (server) node.
    :param failed_node_id: Node id to match, or None to match any node id.
    :return: Basic-regex pattern string suitable for grep over Ignite logs.
    """
    node_id_part = failed_node_id if failed_node_id else ""

    return ("Node FAILED: .\\{1,\\}Node \\[id=" + node_id_part
            + ".\\{1,\\}\\(isClient\\|client\\)=false")
602

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.