apache-ignite

Форк
0
105 строк · 3.8 Кб
1
# Licensed to the Apache Software Foundation (ASF) under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#    http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License
15

16

17
"""
18
This module contains Ignite CDC utility (ignite-cdc.sh) wrapper.
19
"""
20
import os
21
import re
22
import signal
23
from copy import deepcopy
24

25
from ignitetest.services.utils.ignite_spec import envs_to_exports
26
from ignitetest.services.utils.jvm_utils import JvmProcessMixin
27

28

29
class IgniteCdcUtility(JvmProcessMixin):
30
    """
31
    Ignite CDC utility (ignite-cdc.sh) wrapper.
32
    """
33
    BASE_COMMAND = "ignite-cdc.sh"
34
    APP_SERVICE_CLASS = "org.apache.ignite.startup.cmdline.CdcCommandLineStartup"
35

36
    def __init__(self, cluster):
37
        self.cluster = cluster
38
        self.logger = cluster.context.logger
39

40
    def start(self):
41
        """
42
        Start ignite-cdc.sh on cluster nodes.
43
        """
44
        def __start(node):
45
            self.logger.info(f"{self.__service_node_id(node)}: starting {self.BASE_COMMAND}")
46

47
            raw_output = node.account.ssh_capture(
48
                self.__form_cmd(f"{self.BASE_COMMAND} -v {self.cluster.config_file}"))
49

50
            code, output = self.__parse_output(raw_output)
51

52
            self.logger.debug(f"{self.__service_node_id(node)}: {self.BASE_COMMAND} finished with exit code: {code}"
53
                              f"; output: {output}" if code != 0 else "")
54

55
        self.logger.info(f"{self.cluster.service_id}: starting {self.BASE_COMMAND} ...")
56

57
        self.cluster.exec_on_nodes_async(self.cluster.nodes, __start, timeout_sec=1)
58

59
    def stop(self, force_stop=False):
60
        """
61
        Stop ignite-cdc.sh on cluster nodes.
62
        """
63
        def __stop(node):
64
            self.logger.info(f"{self.__service_node_id(node)}: stopping {self.BASE_COMMAND}")
65

66
            pids = self.pids(node, self.APP_SERVICE_CLASS)
67

68
            for pid in pids:
69
                node.account.signal(pid, signal.SIGKILL if force_stop else signal.SIGTERM, allow_fail=False)
70

71
        self.logger.info(f"{self.cluster.service_id}: stopping {self.BASE_COMMAND} ...")
72

73
        self.cluster.exec_on_nodes_async(self.cluster.nodes, __stop)
74

75
    @property
76
    def nodes(self):
77
        return self.cluster.nodes
78

79
    def __service_node_id(self, node):
80
        return f"{self.cluster.service_id} node {self.cluster.idx(node)} on {node.account.hostname}"
81

82
    def __form_cmd(self, cmd):
83
        envs = self.cluster.spec.envs()
84

85
        jvm_opts = deepcopy(self.cluster.spec.jvm_opts)
86

87
        def replace_ports(opt):
88
            for port in ["opencensus.metrics.port", "com.sun.management.jmxremote.port"]:
89
                opt = re.sub(f"-D{port}=(\\d+)", lambda x: f"-D{port}={int(x.group(1)) + 1}", opt)
90

91
            return opt
92

93
        cdc_jvm_opts = list(map(lambda opt: replace_ports(opt), jvm_opts))
94

95
        envs["CDC_JVM_OPTS"] = f"\"{' '.join(cdc_jvm_opts)}\""
96

97
        return (f"{envs_to_exports(envs)} bash " + self.cluster.script(cmd) +
98
                f" 2>&1 | tee -a {os.path.join(self.cluster.log_dir, 'ignite-cdc-console.log')} &")
99

100
    @staticmethod
101
    def __parse_output(raw_output):
102
        exit_code = raw_output.channel_file.channel.recv_exit_status()
103
        output = "".join(raw_output)
104

105
        return exit_code, output
106

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.