# Project: apache-ignite
# Size: 548 lines, 18.1 KB
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module contains the control utility (control.sh) wrapper and the data types its commands return.
"""
import random
import re
import socket
import time
from datetime import datetime, timedelta
from typing import NamedTuple, Optional

from ducktape.cluster.remoteaccount import RemoteCommandError

from ignitetest.services.utils.auth import get_credentials, is_auth_enabled
from ignitetest.services.utils.ignite_spec import envs_to_exports
from ignitetest.services.utils.jmx_utils import JmxClient
from ignitetest.services.utils.ssl.ssl_params import get_ssl_params, is_ssl_enabled, IGNITE_ADMIN_ALIAS
from ignitetest.utils.version import V_2_11_0

class ControlUtility:
    """
    Control utility (control.sh) wrapper.

    Commands are run over SSH on a cluster node (a random alive node unless one is passed explicitly);
    SSL and authentication arguments are appended when they were configured in the constructor.
    """
    BASE_COMMAND = "control.sh"

    def __init__(self, cluster, ssl_params=None, username=None, password=None):
        """
        :param cluster: Cluster to run control.sh against.
        :param ssl_params: SSL parameters for control.sh; when omitted they are taken from the test globals
                           if SSL is enabled there.
        :param username: User name; used only if ``password`` is given too. Otherwise credentials are taken
                         from the test globals if authentication is enabled there.
        :param password: Password matching ``username``.
        """
        self._cluster = cluster
        self.logger = cluster.context.logger

        # ssl_params / username / password are assigned only when configured: __form_cmd checks their
        # presence with hasattr() to decide whether to add the SSL / auth arguments.
        if ssl_params:
            self.ssl_params = ssl_params
        elif is_ssl_enabled(cluster.context.globals):
            self.ssl_params = get_ssl_params(cluster.context.globals, cluster.shared_root, IGNITE_ADMIN_ALIAS)

        if username and password:
            self.username, self.password = username, password
        elif is_auth_enabled(cluster.context.globals):
            self.username, self.password = get_credentials(cluster.context.globals)

    def baseline(self):
        """
        :return: Baseline nodes.
        """
        return self.cluster_state().baseline

    def cluster_state(self):
        """
        :return: Cluster state parsed from the ``--baseline`` command output.
        """
        result = self.__run("--baseline")

        return self.__parse_cluster_state(result)

    def set_baseline(self, baseline):
        """
        :param baseline: Baseline nodes or topology version (int) to set as baseline.
        :return: Cluster state parsed from the command output.
        """
        if isinstance(baseline, int):
            result = self.__run(f"--baseline version {baseline} --yes")
        else:
            result = self.__run(f"--baseline set {self.__node_ips(baseline)} --yes")

        return self.__parse_cluster_state(result)

    def add_to_baseline(self, nodes):
        """
        :param nodes: Nodes that should be added to baseline.
        :return: Cluster state parsed from the command output.
        """
        result = self.__run(f"--baseline add {self.__node_ips(nodes)} --yes")

        return self.__parse_cluster_state(result)

    def remove_from_baseline(self, nodes):
        """
        :param nodes: Nodes that should be removed from baseline.
        :return: Cluster state parsed from the command output.
        """
        result = self.__run(f"--baseline remove {self.__node_ips(nodes)} --yes")

        return self.__parse_cluster_state(result)

    def disable_baseline_auto_adjust(self):
        """
        Disable baseline auto adjust.
        :return: Raw command output.
        """
        return self.__run("--baseline auto_adjust disable --yes")

    def enable_baseline_auto_adjust(self, timeout=None):
        """
        Enable baseline auto adjust.
        :param timeout: Auto adjust timeout in millis. ``None`` keeps the timeout unspecified; ``0`` is passed as is.
        :return: Raw command output.
        """
        # Compare against None explicitly: a zero timeout is a legitimate value and must not be dropped.
        timeout_str = f"timeout {timeout}" if timeout is not None else ""
        return self.__run(f"--baseline auto_adjust enable {timeout_str} --yes")

    def activate(self):
        """
        Activate cluster.
        :return: Raw command output.
        """
        return self.__run("--activate --yes")

    def deactivate(self):
        """
        Deactivate cluster.
        :return: Raw command output.
        """
        return self.__run("--deactivate --yes")

    def tx(self, **kwargs):
        """
        Get list of transactions, various filters can be applied.
        :param kwargs: Filters: xid, clients, servers, min_duration, min_size, label_pattern, nodes, limit, order.
        :return: List of TxInfo if any transaction was parsed from the output, otherwise the raw output.
        """
        output = self.__run(self.__tx_command(**kwargs))
        res = self.__parse_tx_list(output)
        return res if res else output

    def tx_info(self, xid):
        """
        Get verbose transaction info by xid.
        :param xid: Transaction xid.
        :return: TxVerboseInfo, or None if the output could not be parsed.
        """
        return self.__parse_tx_info(self.__run(f"--tx --info {xid}"))

    def tx_kill(self, **kwargs):
        """
        Kill transaction by xid or by various filter.
        :param kwargs: Same filters as for :meth:`tx`.
        :return: List of TxInfo if any transaction was parsed from the output, otherwise the raw output.
        """
        output = self.__run(self.__tx_command(kill=True, **kwargs))
        res = self.__parse_tx_list(output)
        return res if res else output

    def validate_indexes(self):
        """
        Validate indexes and assert no issues were found.
        :return: Raw command output.
        """
        data = self.__run("--cache validate_indexes")

        assert ('no issues found.' in data), data
        return data

    def idle_verify(self, cache_names=None):
        """
        Idle verify and assert no conflicts were found.
        :param cache_names: Optional caches argument passed to ``--cache idle_verify`` as is.
        :return: Raw command output.
        """
        if cache_names is None:
            data = self.__run("--cache idle_verify")
        else:
            data = self.__run(f"--cache idle_verify {cache_names}")

        # The success message changed in 2.11.
        if self._cluster.config.version < V_2_11_0:
            msg = 'idle_verify check has finished, no conflicts have been found.'
        else:
            msg = 'The check procedure has finished, no conflicts have been found.'

        assert (msg in data), data
        return data

    def idle_verify_dump(self, node=None):
        """
        Idle verify dump.
        :param node: Node on which the command will be executed and the dump file will be located.
        :return: Path of the dump file taken from the command output.
        """
        data = self.__run("--cache idle_verify --dump", node=node)

        assert ('IdleVerifyDumpTask successfully' in data), data

        # Escape the dot so only an actual ".txt" suffix matches.
        match = re.search(r'/.*\.txt', data)
        assert match, data

        return match.group(0)

    def check_consistency(self, args):
        """
        Consistency check.
        :param args: Arguments passed to ``--consistency`` as is.
        :return: Raw command output.
        """
        data = self.__run(f"--consistency {args} --enable-experimental")

        assert ('Command [CONSISTENCY] finished with code: 0' in data), data
        return data

    def snapshot_create(self, snapshot_name: str, timeout_sec: int = 60):
        """
        Create snapshot and wait until a node reports it finished without an error.
        :param snapshot_name: Name of Snapshot.
        :param timeout_sec: Timeout to await snapshot to complete.
        :raise TimeoutError: If no node reports the snapshot as finished within ``timeout_sec``.
        """
        res = self.__run(f"--snapshot create {snapshot_name}")

        assert "Command [SNAPSHOT] finished with code: 0" in res, res

        deadline = datetime.now() + timedelta(seconds=timeout_sec)

        while datetime.now() < deadline:
            for node in self._cluster.nodes:
                mbean = JmxClient(node).find_mbean('.*name=snapshot.*', negative_pattern='group=views')

                # This node's last snapshot is a different one (or none yet).
                if snapshot_name != next(mbean.LastSnapshotName, ""):
                    continue

                start_time = int(next(mbean.LastSnapshotStartTime))
                end_time = int(next(mbean.LastSnapshotEndTime))
                err_msg = next(mbean.LastSnapshotErrorMessage)

                # End time later than start time and no error message: the snapshot is done.
                if (start_time < end_time) and (err_msg == ''):
                    assert snapshot_name == next(mbean.LastSnapshotName)
                    return

            # Pause between polling rounds instead of querying JMX in a tight loop.
            time.sleep(1)

        raise TimeoutError(f'Failed to wait for the snapshot operation to complete: '
                           f'snapshot_name={snapshot_name} in {timeout_sec} seconds.')

    def snapshot_check(self, snapshot_name: str):
        """
        Check snapshot and assert no conflicts were found.
        :param snapshot_name: Name of snapshot.
        :return: Raw command output.
        """
        res = self.__run(f"--snapshot check {snapshot_name}")

        assert "The check procedure has finished, no conflicts have been found." in res, res

        return res

    def start_performance_statistics(self):
        """
        Start performance statistics collecting in the cluster.
        :return: Raw command output.
        """
        output = self.__performance_statistics_cmd("start")

        assert "Started." in output, output

        return output

    def stop_performance_statistics(self):
        """
        Stop performance statistics collecting in the cluster.
        :return: Raw command output.
        """
        output = self.__performance_statistics_cmd("stop")

        assert "Stopped." in output, output

        return output

    def rotate_performance_statistics(self):
        """
        Rotate performance statistics collecting in the cluster.
        :return: Raw command output.
        """
        output = self.__performance_statistics_cmd("rotate")

        assert "Rotated." in output, output

        return output

    def is_performance_statistics_enabled(self):
        """
        Check status of performance statistics collecting in the cluster.
        :return: True if the status is "Enabled.", False if it is "Disabled.".
        """
        output = self.__performance_statistics_cmd("status")

        assert "Enabled." in output or "Disabled." in output, output

        return "Enabled." in output

    def run(self, cmd, node=None):
        """
        Run arbitrary control.sh subcommand.
        :param cmd: Command line parameters for the control.sh.
        :param node: Node to run the control.sh on.
        :return: Output of the commands as a string.
        """
        return self.__run(cmd, node)

    def __performance_statistics_cmd(self, sub_command):
        """Run a ``--performance-statistics`` sub-command and return its output."""
        return self.__run(f"--performance-statistics {sub_command}")

    @staticmethod
    def __node_ips(nodes):
        """Comma-separated externally routable IPs of the given nodes, as used by ``--baseline`` commands."""
        return ','.join(node.account.externally_routable_ip for node in nodes)

    @staticmethod
    def __tx_command(**kwargs):
        """Build ``--tx`` command arguments from the filter keyword arguments."""
        tokens = ["--tx"]

        if 'xid' in kwargs:
            tokens.append(f"--xid {kwargs['xid']}")

        if kwargs.get('clients'):
            tokens.append("--clients")

        if kwargs.get('servers'):
            tokens.append("--servers")

        if 'min_duration' in kwargs:
            tokens.append(f"--min-duration {kwargs.get('min_duration')}")

        if 'min_size' in kwargs:
            tokens.append(f"--min-size {kwargs.get('min_size')}")

        if 'label_pattern' in kwargs:
            tokens.append(f"--label {kwargs['label_pattern']}")

        if kwargs.get("nodes"):
            tokens.append(f"--nodes {','.join(kwargs.get('nodes'))}")

        if 'limit' in kwargs:
            tokens.append(f"--limit {kwargs['limit']}")

        if 'order' in kwargs:
            tokens.append(f"--order {kwargs['order']}")

        if kwargs.get('kill'):
            tokens.append("--kill --yes")

        return " ".join(tokens)

    @staticmethod
    def __parse_tx_info(output):
        """Parse ``--tx --info`` output into TxVerboseInfo, or return None if it does not match."""
        tx_info_pattern = re.compile(
            "Near XID version: "
            "(?P<xid_full>GridCacheVersion \\[topVer=\\d+, order=\\d+, nodeOrder=\\d+(, dataCenterId=\\d+)?\\])\\n\\s+"
            "Near XID version \\(UUID\\): (?P<xid>[^\\s]+)\\n\\s+"
            "Isolation: (?P<isolation>[^\\s]+)\\n\\s+"
            "Concurrency: (?P<concurrency>[^\\s]+)\\n\\s+"
            "Timeout: (?P<timeout>\\d+)\\n\\s+"
            "Initiator node: (?P<initiator_id>[^\\s]+)\\n\\s+"
            "Initiator node \\(consistent ID\\): (?P<initiator_consistent_id>[^\\s]+)\\n\\s+"
            "Label: (?P<label>[^\\s]+)\\n\\s+Topology version: AffinityTopologyVersion "
            "\\[topVer=(?P<top_ver>\\d+), minorTopVer=(?P<minor_top_ver>\\d+)\\]\\n\\s+"
            "Used caches \\(ID to name\\): {(?P<caches>.*)}\\n\\s+"
            "Used cache groups \\(ID to name\\): {(?P<cache_groups>.*)}\\n\\s+"
            "States across the cluster: \\[(?P<states>.*)\\]"
        )

        match = tx_info_pattern.search(output)

        if not match:
            return None

        str_fields = ['xid', 'xid_full', 'label', 'isolation', 'concurrency', 'initiator_id',
                      'initiator_consistent_id']
        dict_fields = ['caches', 'cache_groups']

        kwargs = {v: match.group(v) for v in str_fields}
        kwargs['timeout'] = int(match.group('timeout'))
        kwargs.update({v: parse_dict(match.group(v)) for v in dict_fields})
        kwargs['top_ver'] = (int(match.group('top_ver')), int(match.group('minor_top_ver')))
        kwargs['states'] = parse_list(match.group('states'))

        return TxVerboseInfo(**kwargs)

    @staticmethod
    def __parse_tx_list(output):
        """Parse every ``Tx: [...]`` line of ``--tx`` output into TxInfo."""
        tx_pattern = re.compile(
            "Tx: \\[xid=(?P<xid>[^\\s]+), "
            "label=(?P<label>[^\\s]+), state=(?P<state>[^\\s]+), "
            "startTime=(?P<start_time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}), "
            "duration=(?P<duration>\\d+)( sec)?, "
            "isolation=(?P<isolation>[^\\s]+), concurrency=(?P<concurrency>[^\\s]+), "
            "topVer=AffinityTopologyVersion \\[topVer=(?P<top_ver>\\d+), minorTopVer=(?P<minor_top_ver>\\d+)\\], "
            "timeout=(?P<timeout>\\d+)( sec)?, size=(?P<size>\\d+), dhtNodes=\\[(?P<dht_nodes>.*)\\], "
            "nearXid=(?P<near_xid>[^\\s]+), parentNodeIds=\\[(?P<parent_nodes>.*)\\]\\]")

        str_fields = ['xid', 'label', 'state', 'isolation', 'concurrency', 'near_xid']
        int_fields = ['timeout', 'size', 'duration']
        list_fields = ['parent_nodes', 'dht_nodes']

        tx_list = []
        for match in tx_pattern.finditer(output):
            kwargs = {v: match.group(v) for v in str_fields}
            kwargs.update({v: int(match.group(v)) for v in int_fields})
            kwargs['top_ver'] = (int(match.group('top_ver')), int(match.group('minor_top_ver')))
            kwargs.update({v: parse_list(match.group(v)) for v in list_fields})
            kwargs['start_time'] = time.strptime(match.group('start_time'), "%Y-%m-%d %H:%M:%S.%f")
            tx_list.append(TxInfo(**kwargs))

        return tx_list

    @staticmethod
    def __parse_cluster_state(output):
        """Parse ``--baseline`` output into ClusterState; missing state/topology become None."""
        state_pattern = re.compile("Cluster state: (?P<cluster_state>[^\\s]+)")
        topology_pattern = re.compile("Current topology version: (?P<topology_version>\\d+)")
        # Both "ConsistentId"/"ConsistentID" spellings and "Address"/"ADDRESS", "State"/"STATE" are accepted.
        baseline_pattern = re.compile("Consistent(Id|ID)=(?P<consistent_id>[^\\s,]+)"
                                      "(,\\sA(ddress|DDRESS)=(?P<address>[^\\s,]+))?"
                                      ",\\sS(tate|TATE)=(?P<state>[^\\s,]+)"
                                      "(,\\sOrder=(?P<order>\\d+))?")

        match = state_pattern.search(output)
        state = match.group("cluster_state") if match else None

        match = topology_pattern.search(output)
        topology = int(match.group("topology_version")) if match else None

        baseline = []
        for match in baseline_pattern.finditer(output):
            node = BaselineNode(consistent_id=match.group("consistent_id"),
                                state=match.group("state"),
                                address=match.group("address"),
                                order=int(match.group("order")) if match.group("order") else None)
            baseline.append(node)

        return ClusterState(state=state, topology_version=topology, baseline=baseline)

    def __run(self, cmd, node=None):
        """
        Run control.sh with the given arguments on ``node`` (a random alive node by default).
        :raise ControlUtilityError: If the command finished with a non-zero code.
        """
        if node is None:
            node = random.choice(self._cluster.alive_nodes)

        self.logger.debug(f"Run command {cmd} on node {node.name}")

        node_ip = socket.gethostbyname(node.account.hostname)

        raw_output = node.account.ssh_capture(self.__form_cmd(node_ip, cmd), allow_fail=True)
        code, output = self.__parse_output(raw_output)

        self.logger.debug(f"Output of command {cmd} on node {node.name}, exited with code {code}, is {output}")

        if code != 0:
            raise ControlUtilityError(node.account, cmd, code, output)

        return output

    def __form_cmd(self, node_ip, cmd):
        """Build the full shell command: env exports plus control.sh with host, SSL and auth arguments."""
        ssl = ""
        if hasattr(self, "ssl_params"):
            ssl = f" --keystore {self.ssl_params.key_store_path} " \
                  f"--keystore-password {self.ssl_params.key_store_password} " \
                  f"--truststore {self.ssl_params.trust_store_path} " \
                  f"--truststore-password {self.ssl_params.trust_store_password}"
        auth = ""
        if hasattr(self, "username"):
            auth = f" --user {self.username} --password {self.password} "

        return "%s %s" % \
            (envs_to_exports(self.envs()),
             self._cluster.script(f"{self.BASE_COMMAND} --host {node_ip} {cmd} {ssl} {auth}"))

    def envs(self):
        """
        :return: environment set.
        """
        return {
            'EXCLUDE_TEST_CLASSES': 'true',
            'CONTROL_JVM_OPTS': '-Dlog4j.configurationFile=file:' + self._cluster.log_config_file
        }

    @staticmethod
    def __parse_output(raw_output):
        """
        :return: Tuple of (exit code, output). The code reported in a "Command [...] finished with code: N"
                 line takes precedence over the process exit status.
        """
        # Drain the output BEFORE asking for the exit status: paramiko's recv_exit_status() may block
        # forever if unread output has filled the channel window.
        output = "".join(raw_output)
        exit_code = raw_output.channel_file.channel.recv_exit_status()

        pattern = re.compile("Command \\[[^\\s]*\\] finished with code: (\\d+)")
        match = pattern.search(output)

        if match:
            return int(match.group(1)), output
        return exit_code, output

    def __alives(self):
        """Nodes the cluster reports as alive. Not referenced inside this class."""
        return [node for node in self._cluster.nodes if self._cluster.alive(node)]

class BaselineNode(NamedTuple):
    """
    Baseline node info parsed from ``control.sh --baseline`` output.
    """
    consistent_id: str
    # Node state string as printed by control.sh.
    state: str
    # None when the output line has no Address field.
    address: Optional[str]
    # None when the output line has no Order field.
    order: Optional[int]

class ClusterState(NamedTuple):
    """
    Cluster state info parsed from ``control.sh --baseline`` output.
    """
    # Cluster state string; None if it was not found in the output.
    state: Optional[str]
    # Current topology version; None if it was not found in the output.
    topology_version: Optional[int]
    # List of BaselineNode entries.
    baseline: list

class TxInfo(NamedTuple):
    """
    Transaction info parsed from a ``Tx: [...]`` line of ``control.sh --tx`` output.
    """
    xid: str
    near_xid: str
    label: str
    state: str
    start_time: time.struct_time
    duration: int
    isolation: str
    concurrency: str
    # (topVer, minorTopVer) pair.
    top_ver: tuple
    timeout: int
    size: int
    # NOTE: the [] defaults below are single list objects shared by every instance created without
    # these fields; never mutate them in place.
    dht_nodes: list = []
    parent_nodes: list = []

class TxVerboseInfo(NamedTuple):
    """
    Transaction info returned with ``control.sh --tx --info``.
    """
    xid: str
    xid_full: str
    label: str
    isolation: str
    concurrency: str
    timeout: int
    # (topVer, minorTopVer) pair.
    top_ver: tuple
    initiator_id: str
    initiator_consistent_id: str
    # Cache ID -> name, both as strings.
    caches: dict
    # Cache group ID -> name, both as strings.
    cache_groups: dict
    states: list

class ControlUtilityError(RemoteCommandError):
    """
    Error is raised when control utility failed.
    """

    def __init__(self, account, cmd, exit_status, output):
        """
        :param account: Remote account the command was run on.
        :param cmd: control.sh arguments that were run.
        :param exit_status: Exit code of the command.
        :param output: Command output (a string or an iterable of strings); joined into one string
                       before being passed to the base class.
        """
        super().__init__(account, cmd, exit_status, "".join(output))

def parse_dict(raw):
    """
    Parse java Map.toString() to python dict.

    :param raw: Map content without the surrounding braces, e.g. ``"1=cache1, 2=cache2"``.
    :return: Dict of string keys to string values; an empty dict for blank input (an empty Java map).
    :raise ValueError: If a non-empty entry contains no ``=``.
    """
    if not raw.strip():
        return {}

    res = {}
    for token in raw.split(','):
        # Split on the first '=' only, so values that themselves contain '=' are kept intact.
        key, value = token.strip().split('=', 1)
        res[key] = value

    return res

def parse_list(raw):
    """
    Parse java List.toString() to python list.

    :param raw: List content without the surrounding brackets, e.g. ``"a, b, c"``.
    :return: List of stripped string items; an empty list for blank input (an empty Java list).
    """
    if not raw.strip():
        return []

    return [token.strip() for token in raw.split(',')]