apache-ignite
105 строк · 3.8 Кб
1# Licensed to the Apache Software Foundation (ASF) under one or more
2# contributor license agreements. See the NOTICE file distributed with
3# this work for additional information regarding copyright ownership.
4# The ASF licenses this file to You under the Apache License, Version 2.0
5# (the "License"); you may not use this file except in compliance with
6# the License. You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License
15
16
17"""
18This module contains Ignite CDC utility (ignite-cdc.sh) wrapper.
19"""
20import os
21import re
22import signal
23from copy import deepcopy
24
25from ignitetest.services.utils.ignite_spec import envs_to_exports
26from ignitetest.services.utils.jvm_utils import JvmProcessMixin
27
28
29class IgniteCdcUtility(JvmProcessMixin):
30"""
31Ignite CDC utility (ignite-cdc.sh) wrapper.
32"""
33BASE_COMMAND = "ignite-cdc.sh"
34APP_SERVICE_CLASS = "org.apache.ignite.startup.cmdline.CdcCommandLineStartup"
35
36def __init__(self, cluster):
37self.cluster = cluster
38self.logger = cluster.context.logger
39
40def start(self):
41"""
42Start ignite-cdc.sh on cluster nodes.
43"""
44def __start(node):
45self.logger.info(f"{self.__service_node_id(node)}: starting {self.BASE_COMMAND}")
46
47raw_output = node.account.ssh_capture(
48self.__form_cmd(f"{self.BASE_COMMAND} -v {self.cluster.config_file}"))
49
50code, output = self.__parse_output(raw_output)
51
52self.logger.debug(f"{self.__service_node_id(node)}: {self.BASE_COMMAND} finished with exit code: {code}"
53f"; output: {output}" if code != 0 else "")
54
55self.logger.info(f"{self.cluster.service_id}: starting {self.BASE_COMMAND} ...")
56
57self.cluster.exec_on_nodes_async(self.cluster.nodes, __start, timeout_sec=1)
58
59def stop(self, force_stop=False):
60"""
61Stop ignite-cdc.sh on cluster nodes.
62"""
63def __stop(node):
64self.logger.info(f"{self.__service_node_id(node)}: stopping {self.BASE_COMMAND}")
65
66pids = self.pids(node, self.APP_SERVICE_CLASS)
67
68for pid in pids:
69node.account.signal(pid, signal.SIGKILL if force_stop else signal.SIGTERM, allow_fail=False)
70
71self.logger.info(f"{self.cluster.service_id}: stopping {self.BASE_COMMAND} ...")
72
73self.cluster.exec_on_nodes_async(self.cluster.nodes, __stop)
74
75@property
76def nodes(self):
77return self.cluster.nodes
78
79def __service_node_id(self, node):
80return f"{self.cluster.service_id} node {self.cluster.idx(node)} on {node.account.hostname}"
81
82def __form_cmd(self, cmd):
83envs = self.cluster.spec.envs()
84
85jvm_opts = deepcopy(self.cluster.spec.jvm_opts)
86
87def replace_ports(opt):
88for port in ["opencensus.metrics.port", "com.sun.management.jmxremote.port"]:
89opt = re.sub(f"-D{port}=(\\d+)", lambda x: f"-D{port}={int(x.group(1)) + 1}", opt)
90
91return opt
92
93cdc_jvm_opts = list(map(lambda opt: replace_ports(opt), jvm_opts))
94
95envs["CDC_JVM_OPTS"] = f"\"{' '.join(cdc_jvm_opts)}\""
96
97return (f"{envs_to_exports(envs)} bash " + self.cluster.script(cmd) +
98f" 2>&1 | tee -a {os.path.join(self.cluster.log_dir, 'ignite-cdc-console.log')} &")
99
100@staticmethod
101def __parse_output(raw_output):
102exit_code = raw_output.channel_file.channel.recv_exit_status()
103output = "".join(raw_output)
104
105return exit_code, output
106