apache-ignite

Форк
0
548 строк · 18.1 Кб
1
# Licensed to the Apache Software Foundation (ASF) under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#    http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
"""
17
This module contains control utility wrapper.
18
"""
19
import random
20
import re
21
import socket
22
import time
23
from datetime import datetime, timedelta
24
from typing import NamedTuple
25

26
from ducktape.cluster.remoteaccount import RemoteCommandError
27

28
from ignitetest.services.utils.auth import get_credentials, is_auth_enabled
29
from ignitetest.services.utils.ignite_spec import envs_to_exports
30
from ignitetest.services.utils.ssl.ssl_params import get_ssl_params, is_ssl_enabled, IGNITE_ADMIN_ALIAS
31
from ignitetest.services.utils.jmx_utils import JmxClient
32
from ignitetest.utils.version import V_2_11_0
33

34

35
class ControlUtility:
    """
    Control utility (control.sh) wrapper.
    """
    BASE_COMMAND = "control.sh"

    def __init__(self, cluster, ssl_params=None, username=None, password=None):
        """
        :param cluster: Cluster service whose nodes the utility runs commands on.
        :param ssl_params: Explicit SSL parameters; when omitted they are taken
                           from the test globals if SSL is enabled there.
        :param username: User name for authentication (used together with password).
        :param password: Password for authentication.
        """
        self._cluster = cluster
        self.logger = cluster.context.logger

        # NOTE: self.ssl_params / self.username are only set when configured;
        # other methods probe them with hasattr() before appending the
        # corresponding command-line options.
        if ssl_params:
            self.ssl_params = ssl_params
        elif is_ssl_enabled(cluster.context.globals):
            self.ssl_params = get_ssl_params(cluster.context.globals, cluster.shared_root, IGNITE_ADMIN_ALIAS)

        if username and password:
            self.username, self.password = username, password
        elif is_auth_enabled(cluster.context.globals):
            self.username, self.password = get_credentials(cluster.context.globals)
54

55
    def baseline(self):
56
        """
57
        :return Baseline nodes.
58
        """
59
        return self.cluster_state().baseline
60

61
    def cluster_state(self):
62
        """
63
        :return: Cluster state.
64
        """
65
        result = self.__run("--baseline")
66

67
        return self.__parse_cluster_state(result)
68

69
    def set_baseline(self, baseline):
70
        """
71
        :param baseline: Baseline nodes or topology version to set as baseline.
72
        """
73
        if isinstance(baseline, int):
74
            result = self.__run(f"--baseline version {baseline} --yes")
75
        else:
76
            result = self.__run(
77
                f"--baseline set {','.join([node.account.externally_routable_ip for node in baseline])} --yes")
78

79
        return self.__parse_cluster_state(result)
80

81
    def add_to_baseline(self, nodes):
82
        """
83
        :param nodes: Nodes that should be added to baseline.
84
        """
85
        result = self.__run(
86
            f"--baseline add {','.join([node.account.externally_routable_ip for node in nodes])} --yes")
87

88
        return self.__parse_cluster_state(result)
89

90
    def remove_from_baseline(self, nodes):
91
        """
92
        :param nodes: Nodes that should be removed to baseline.
93
        """
94
        result = self.__run(
95
            f"--baseline remove {','.join([node.account.externally_routable_ip for node in nodes])} --yes")
96

97
        return self.__parse_cluster_state(result)
98

99
    def disable_baseline_auto_adjust(self):
100
        """
101
        Disable baseline auto adjust.
102
        """
103
        return self.__run("--baseline auto_adjust disable --yes")
104

105
    def enable_baseline_auto_adjust(self, timeout=None):
106
        """
107
        Enable baseline auto adjust.
108
        :param timeout: Auto adjust timeout in millis.
109
        """
110
        timeout_str = f"timeout {timeout}" if timeout else ""
111
        return self.__run(f"--baseline auto_adjust enable {timeout_str} --yes")
112

113
    def activate(self):
114
        """
115
        Activate cluster.
116
        """
117
        return self.__run("--activate --yes")
118

119
    def deactivate(self):
120
        """
121
        Deactivate cluster.
122
        """
123
        return self.__run("--deactivate --yes")
124

125
    def tx(self, **kwargs):
126
        """
127
        Get list of transactions, various filters can be applied.
128
        """
129
        output = self.__run(self.__tx_command(**kwargs))
130
        res = self.__parse_tx_list(output)
131
        return res if res else output
132

133
    def tx_info(self, xid):
134
        """
135
        Get verbose transaction info by xid.
136
        """
137
        return self.__parse_tx_info(self.__run(f"--tx --info {xid}"))
138

139
    def tx_kill(self, **kwargs):
140
        """
141
        Kill transaction by xid or by various filter.
142
        """
143
        output = self.__run(self.__tx_command(kill=True, **kwargs))
144
        res = self.__parse_tx_list(output)
145
        return res if res else output
146

147
    def validate_indexes(self):
148
        """
149
        Validate indexes.
150
        """
151
        data = self.__run("--cache validate_indexes")
152

153
        assert ('no issues found.' in data), data
154

155
    def idle_verify(self, cache_names=None):
        """
        Run idle_verify and assert that no conflicts were found.

        :param cache_names: Caches to check; all caches when None.
        :return: Raw command output.
        """
        cmd = "--cache idle_verify" if cache_names is None else f"--cache idle_verify {cache_names}"
        data = self.__run(cmd)

        # The success message wording changed in Ignite 2.11.0.
        if self._cluster.config.version < V_2_11_0:
            msg = 'idle_verify check has finished, no conflicts have been found.'
        else:
            msg = 'The check procedure has finished, no conflicts have been found.'

        assert (msg in data), data
        return data
172

173
    def idle_verify_dump(self, node=None):
174
        """
175
        Idle verify dump.
176
        :param node: Node on which the command will be executed and the dump file will be located.
177
        """
178
        data = self.__run("--cache idle_verify --dump", node=node)
179

180
        assert ('IdleVerifyDumpTask successfully' in data), data
181

182
        return re.search(r'/.*.txt', data).group(0)
183

184
    def check_consistency(self, args):
185
        """
186
        Consistency check.
187
        """
188
        data = self.__run(f"--consistency {args} --enable-experimental")
189

190
        assert ('Command [CONSISTENCY] finished with code: 0' in data), data
191
        return data
192

193
    def snapshot_create(self, snapshot_name: str, timeout_sec: int = 60):
        """
        Create snapshot.
        :param snapshot_name: Name of Snapshot.
        :param timeout_sec: Timeout to await snapshot to complete.
        :raises TimeoutError: If no node reported snapshot completion within timeout_sec.
        """
        res = self.__run(f"--snapshot create {snapshot_name}")

        assert "Command [SNAPSHOT] finished with code: 0" in res

        delta_time = datetime.now() + timedelta(seconds=timeout_sec)

        # Poll snapshot JMX metrics on the cluster nodes until some node
        # reports that this snapshot finished successfully, or the deadline passes.
        while datetime.now() < delta_time:
            for node in self._cluster.nodes:
                mbean = JmxClient(node).find_mbean('.*name=snapshot.*', negative_pattern='group=views')

                # Skip nodes whose metrics refer to a different (or no) snapshot.
                if snapshot_name != next(mbean.LastSnapshotName, ""):
                    continue

                start_time = int(next(mbean.LastSnapshotStartTime))
                end_time = int(next(mbean.LastSnapshotEndTime))
                err_msg = next(mbean.LastSnapshotErrorMessage)

                # An end time past the start time with an empty error message
                # is treated as successful completion.
                if (start_time < end_time) and (err_msg == ''):
                    assert snapshot_name == next(mbean.LastSnapshotName)
                    return

        raise TimeoutError(f'Failed to wait for the snapshot operation to complete: '
                           f'snapshot_name={snapshot_name} in {timeout_sec} seconds.')
222

223
    def snapshot_check(self, snapshot_name: str):
224
        """
225
        Check snapshot.
226
        :param snapshot_name: Name of snapshot.
227
        """
228
        res = self.__run(f"--snapshot check {snapshot_name}")
229

230
        assert "The check procedure has finished, no conflicts have been found." in res
231

232
        return res
233

234
    def start_performance_statistics(self):
235
        """
236
        Start performance statistics collecting in the cluster.
237
        """
238
        output = self.__performance_statistics_cmd("start")
239

240
        assert "Started." in output
241

242
        return output
243

244
    def stop_performance_statistics(self):
245
        """
246
        Stop performance statistics collecting in the cluster.
247
        """
248
        output = self.__performance_statistics_cmd("stop")
249

250
        assert "Stopped." in output
251

252
        return output
253

254
    def rotate_performance_statistics(self):
255
        """
256
        Rotate performance statistics collecting in the cluster.
257
        """
258
        output = self.__performance_statistics_cmd("rotate")
259

260
        assert "Rotated." in output
261

262
        return output
263

264
    def is_performance_statistics_enabled(self):
265
        """
266
        Check status of performance statistics collecting in the cluster.
267
        """
268
        output = self.__performance_statistics_cmd("status")
269

270
        assert "Enabled." in output or "Disabled." in output
271

272
        return "Enabled." in output
273

274
    def run(self, cmd, node=None):
275
        """
276
        Run arbitrary control.sh subcommand.
277
        :param cmd: Command line parameters for the control.sh.
278
        :param node: Node to run the control.sh on.
279
        :return: Output of the commands as a string.
280
        """
281
        return self.__run(cmd, node)
282

283
    def __performance_statistics_cmd(self, sub_command):
284
        return self.__run(f"--performance-statistics {sub_command}")
285

286
    @staticmethod
287
    def __tx_command(**kwargs):
288
        tokens = ["--tx"]
289

290
        if 'xid' in kwargs:
291
            tokens.append(f"--xid {kwargs['xid']}")
292

293
        if kwargs.get('clients'):
294
            tokens.append("--clients")
295

296
        if kwargs.get('servers'):
297
            tokens.append("--servers")
298

299
        if 'min_duration' in kwargs:
300
            tokens.append(f"--min-duration {kwargs.get('min_duration')}")
301

302
        if 'min_size' in kwargs:
303
            tokens.append(f"--min-size {kwargs.get('min_size')}")
304

305
        if 'label_pattern' in kwargs:
306
            tokens.append(f"--label {kwargs['label_pattern']}")
307

308
        if kwargs.get("nodes"):
309
            tokens.append(f"--nodes {','.join(kwargs.get('nodes'))}")
310

311
        if 'limit' in kwargs:
312
            tokens.append(f"--limit {kwargs['limit']}")
313

314
        if 'order' in kwargs:
315
            tokens.append(f"--order {kwargs['order']}")
316

317
        if kwargs.get('kill'):
318
            tokens.append("--kill --yes")
319

320
        return " ".join(tokens)
321

322
    @staticmethod
    def __parse_tx_info(output):
        """
        Parse the output of '--tx --info' into a TxVerboseInfo.

        :param output: Raw control.sh output.
        :return: TxVerboseInfo, or None when the output does not match the expected format.
        """
        tx_info_pattern = re.compile(
            "Near XID version: "
            "(?P<xid_full>GridCacheVersion \\[topVer=\\d+, order=\\d+, nodeOrder=\\d+(, dataCenterId=\\d+)?\\])\\n\\s+"
            "Near XID version \\(UUID\\): (?P<xid>[^\\s]+)\\n\\s+"
            "Isolation: (?P<isolation>[^\\s]+)\\n\\s+"
            "Concurrency: (?P<concurrency>[^\\s]+)\\n\\s+"
            "Timeout: (?P<timeout>\\d+)\\n\\s+"
            "Initiator node: (?P<initiator_id>[^\\s]+)\\n\\s+"
            "Initiator node \\(consistent ID\\): (?P<initiator_consistent_id>[^\\s+]+)\\n\\s+"
            "Label: (?P<label>[^\\s]+)\\n\\s+Topology version: AffinityTopologyVersion "
            "\\[topVer=(?P<top_ver>\\d+), minorTopVer=(?P<minor_top_ver>\\d+)\\]\\n\\s+"
            "Used caches \\(ID to name\\): {(?P<caches>.*)}\\n\\s+"
            "Used cache groups \\(ID to name\\): {(?P<cache_groups>.*)}\\n\\s+"
            "States across the cluster: \\[(?P<states>.*)\\]"
        )

        match = tx_info_pattern.search(output)

        # Named groups copied verbatim as strings.
        str_fields = ['xid', 'xid_full', 'label', 'timeout', 'isolation', 'concurrency', 'initiator_id',
                      'initiator_consistent_id']
        # Named groups holding java Map.toString() content.
        dict_fields = ['caches', 'cache_groups']

        if match:
            kwargs = {v: match.group(v) for v in str_fields}
            # 'timeout' is re-assigned as an int, overriding the string captured above.
            kwargs['timeout'] = int(match.group('timeout'))
            kwargs.update({v: parse_dict(match.group(v)) for v in dict_fields})
            kwargs['top_ver'] = (int(match.group('top_ver')), int(match.group('minor_top_ver')))
            kwargs['states'] = parse_list(match.group('states'))

            return TxVerboseInfo(**kwargs)

        return None
356

357
    @staticmethod
    def __parse_tx_list(output):
        """
        Parse the output of a '--tx' command into a list of TxInfo.

        :param output: Raw control.sh output.
        :return: List of TxInfo, one per matched 'Tx: [...]' line (empty when none matched).
        """
        tx_pattern = re.compile(
            "Tx: \\[xid=(?P<xid>[^\\s]+), "
            "label=(?P<label>[^\\s]+), state=(?P<state>[^\\s]+), "
            "startTime=(?P<start_time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}), "
            "duration=(?P<duration>\\d+)( sec)?, "
            "isolation=(?P<isolation>[^\\s]+), concurrency=(?P<concurrency>[^\\s]+), "
            "topVer=AffinityTopologyVersion \\[topVer=(?P<top_ver>\\d+), minorTopVer=(?P<minor_top_ver>\\d+)\\], "
            "timeout=(?P<timeout>\\d+)( sec)?, size=(?P<size>\\d+), dhtNodes=\\[(?P<dht_nodes>.*)\\], "
            "nearXid=(?P<near_xid>[^\\s]+), parentNodeIds=\\[(?P<parent_nodes>.*)\\]\\]")

        # Named groups copied verbatim as strings.
        str_fields = ['xid', 'label', 'state', 'isolation', 'concurrency', 'near_xid']
        # Named groups converted to int.
        int_fields = ['timeout', 'size', 'duration']
        # Named groups holding java List.toString() content.
        list_fields = ['parent_nodes', 'dht_nodes']

        tx_list = []
        for match in tx_pattern.finditer(output):
            kwargs = {v: match.group(v) for v in str_fields}
            kwargs.update({v: int(match.group(v)) for v in int_fields})
            kwargs['top_ver'] = (int(match.group('top_ver')), int(match.group('minor_top_ver')))
            kwargs.update({v: parse_list(match.group(v)) for v in list_fields})
            kwargs['start_time'] = time.strptime(match.group('start_time'), "%Y-%m-%d %H:%M:%S.%f")
            tx_list.append(TxInfo(**kwargs))

        return tx_list
383

384
    @staticmethod
    def __parse_cluster_state(output):
        """
        Parse '--baseline' output into a ClusterState.

        :param output: Raw control.sh output.
        :return: ClusterState; fields are None/empty when the corresponding
                 section is absent from the output.
        """
        state_pattern = re.compile("Cluster state: (?P<cluster_state>[^\\s]+)")
        topology_pattern = re.compile("Current topology version: (?P<topology_version>\\d+)")
        # The baseline pattern accepts both mixed-case and upper-case field
        # spellings (ConsistentId/ConsistentID, Address/ADDRESS, State/STATE);
        # Address and Order are optional.
        baseline_pattern = re.compile("Consistent(Id|ID)=(?P<consistent_id>[^\\s,]+)"
                                      "(,\\sA(ddress|DDRESS)=(?P<address>[^\\s,]+))?"
                                      ",\\sS(tate|TATE)=(?P<state>[^\\s,]+)"
                                      "(,\\sOrder=(?P<order>\\d+))?")

        match = state_pattern.search(output)
        state = match.group("cluster_state") if match else None

        match = topology_pattern.search(output)
        topology = int(match.group("topology_version")) if match else None

        baseline = []
        for match in baseline_pattern.finditer(output):
            node = BaselineNode(consistent_id=match.group("consistent_id"),
                                state=match.group("state"),
                                address=match.group("address"),
                                order=int(match.group("order")) if match.group("order") else None)
            baseline.append(node)

        return ClusterState(state=state, topology_version=topology, baseline=baseline)
408

409
    def __run(self, cmd, node=None):
410
        if node is None:
411
            node = random.choice(self._cluster.alive_nodes)
412

413
        self.logger.debug(f"Run command {cmd} on node {node.name}")
414

415
        node_ip = socket.gethostbyname(node.account.hostname)
416

417
        raw_output = node.account.ssh_capture(self.__form_cmd(node_ip, cmd), allow_fail=True)
418
        code, output = self.__parse_output(raw_output)
419

420
        self.logger.debug(f"Output of command {cmd} on node {node.name}, exited with code {code}, is {output}")
421

422
        if code != 0:
423
            raise ControlUtilityError(node.account, cmd, code, output)
424

425
        return output
426

427
    def __form_cmd(self, node_ip, cmd):
        """
        Build the full shell command: environment exports followed by the
        control.sh invocation with optional SSL and auth options.
        """
        # SSL and auth attributes are only present when configured (see __init__).
        ssl = ""
        if hasattr(self, "ssl_params"):
            ssl = f" --keystore {self.ssl_params.key_store_path} " \
                  f"--keystore-password {self.ssl_params.key_store_password} " \
                  f"--truststore {self.ssl_params.trust_store_path} " \
                  f"--truststore-password {self.ssl_params.trust_store_password}"

        auth = ""
        if hasattr(self, "username"):
            auth = f" --user {self.username} --password {self.password} "

        script = self._cluster.script(f"{self.BASE_COMMAND} --host {node_ip} {cmd} {ssl} {auth}")
        return f"{envs_to_exports(self.envs())} {script}"
441

442
    def envs(self):
443
        """
444
        :return: environment set.
445
        """
446
        return {
447
            'EXCLUDE_TEST_CLASSES': 'true',
448
            'CONTROL_JVM_OPTS': '-Dlog4j.configurationFile=file:' + self._cluster.log_config_file
449
        }
450

451
    @staticmethod
452
    def __parse_output(raw_output):
453
        exit_code = raw_output.channel_file.channel.recv_exit_status()
454
        output = "".join(raw_output)
455

456
        pattern = re.compile("Command \\[[^\\s]*\\] finished with code: (\\d+)")
457
        match = pattern.search(output)
458

459
        if match:
460
            return int(match.group(1)), output
461
        return exit_code, output
462

463
    def __alives(self):
464
        return [node for node in self._cluster.nodes if self._cluster.alive(node)]
465

466

467
class BaselineNode(NamedTuple):
    """
    Baseline node info parsed from '--baseline' output.
    """
    # Consistent id of the node.
    consistent_id: str
    # Node state string as printed by control.sh.
    state: str
    # Node address; None when absent from the output.
    address: str
    # Topology order; None when absent from the output.
    order: int
475

476

477
class ClusterState(NamedTuple):
    """
    Cluster state info parsed from '--baseline' output.
    """
    # Cluster state string as printed by control.sh; None when not found.
    state: str
    # Current topology version; None when not found.
    topology_version: int
    # List of BaselineNode entries (possibly empty).
    baseline: list
484

485

486
class TxInfo(NamedTuple):
    """
    Transaction info parsed from '--tx' output.
    """
    xid: str
    near_xid: str
    label: str
    state: str
    start_time: time.struct_time
    duration: int
    isolation: str
    concurrency: str
    # (topVer, minorTopVer) pair.
    top_ver: tuple
    timeout: int
    size: int
    # NOTE(review): these mutable default lists are shared by every instance
    # that omits the fields — safe only as long as callers never mutate them.
    dht_nodes: list = []
    parent_nodes: list = []
503

504

505
class TxVerboseInfo(NamedTuple):
    """
    Transaction info returned with --info
    """
    xid: str
    # Full GridCacheVersion string of the near XID.
    xid_full: str
    label: str
    isolation: str
    concurrency: str
    timeout: int
    # (topVer, minorTopVer) pair.
    top_ver: tuple
    initiator_id: str
    initiator_consistent_id: str
    # Cache id -> cache name mapping.
    caches: dict
    # Cache group id -> group name mapping.
    cache_groups: dict
    # Transaction states across the cluster.
    states: list
521

522

523
class ControlUtilityError(RemoteCommandError):
    """
    Error is raised when control utility failed.
    """

    def __init__(self, account, cmd, exit_status, output):
        # 'output' may be any iterable of strings; it is joined into one message.
        super().__init__(account, cmd, exit_status, "".join(output))
530

531

532
def parse_dict(raw):
    """
    Parse java Map.toString() content to python dict.

    :param raw: Contents between the braces, e.g. "1=one, 2=two".
    :return: Dict of string keys to string values.
    :raises ValueError: When a token contains no '=' at all.
    """
    res = {}
    for token in raw.split(','):
        # Split on the first '=' only, so values that themselves contain '='
        # are preserved intact instead of raising ValueError.
        key, value = token.strip().split('=', 1)
        res[key] = value

    return res
542

543

544
def parse_list(raw):
    """
    Parse java List.toString() to python list
    """
    return list(map(str.strip, raw.split(',')))
549

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.