oceanbase

Форк
0
/
upgrade_checker.py 
770 строк · 31.8 Кб
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
import sys
5
import os
6
import mysql.connector
7
from mysql.connector import errorcode
8
import logging
9
import getopt
10
import time
11

12
class UpgradeParams:
  # Defaults for the checker; main() replaces log_filename with the -l option.
  log_filename = 'upgrade_checker.log'  # default log destination
  old_version = '4.0.0.0'               # minimum observer version allowed as upgrade source
15
#### --------------start : my_error.py --------------
16
class MyError(Exception):
  """Script-wide exception type carrying an arbitrary payload value."""
  def __init__(self, value):
    self.value = value
  def __str__(self):
    # repr() so non-string payloads render unambiguously.
    return repr(self.value)
21
#### --------------start : actions.py------------
22
class Cursor:
  """Thin wrapper over a mysql.connector cursor that logs every statement.

  Fix: uses "except ... as e" (valid in Python 2.6+ and Python 3) instead of
  the Python-2-only comma form, and bare `raise` so the original traceback
  is preserved instead of being re-raised from this frame.
  """
  __cursor = None
  def __init__(self, cursor):
    self.__cursor = cursor
  def exec_sql(self, sql, print_when_succ = True):
    """Execute a non-query statement; return the affected row count."""
    try:
      self.__cursor.execute(sql)
      rowcount = self.__cursor.rowcount
      if True == print_when_succ:
        logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
      return rowcount
    except mysql.connector.Error:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise
    except Exception:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise
  def exec_query(self, sql, print_when_succ = True):
    """Execute a query; return (column descriptions, list of fetched rows)."""
    try:
      self.__cursor.execute(sql)
      results = self.__cursor.fetchall()
      rowcount = self.__cursor.rowcount
      if True == print_when_succ:
        logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
      return (self.__cursor.description, results)
    except mysql.connector.Error:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise
    except Exception:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise
53

54
def set_parameter(cur, parameter, value):
  # Apply the parameter via ALTER SYSTEM, then poll until every observer reports it.
  stmt = "alter system set %s = '%s'" % (parameter, value)
  logging.info(stmt)
  cur.execute(stmt)
  wait_parameter_sync(cur, parameter, value)
59

60
def wait_parameter_sync(cur, key, value):
  """Poll until parameter `key` reports `value` on every observer.

  Retries up to 10 times with a 5-second sleep between rounds.
  Fix: the original executed `raise e` where no `e` was bound, which would
  surface as a NameError; raise MyError with a descriptive message instead.
  Also use logging.error rather than logging.exception outside a handler.
  """
  sql = """select count(*) as cnt from oceanbase.__all_virtual_sys_parameter_stat
           where name = '{0}' and value != '{1}'""".format(key, value)
  times = 10
  while times > 0:
    logging.info(sql)
    cur.execute(sql)
    result = cur.fetchall()
    if len(result) != 1 or len(result[0]) != 1:
      logging.error('result cnt not match')
      raise MyError('result cnt not match')
    elif result[0][0] == 0:
      # zero rows differ from the target value -> fully synced
      logging.info("""{0} is sync, value is {1}""".format(key, value))
      break
    else:
      logging.info("""{0} is not sync, value should be {1}""".format(key, value))

    times -= 1
    if times == 0:
      logging.error("""check {0}:{1} sync timeout""".format(key, value))
      raise MyError("""check {0}:{1} sync timeout""".format(key, value))
    time.sleep(5)
82

83
#### --------------start :  opt.py --------------
84
help_str = \
85
"""
86
Help:
87
""" +\
88
sys.argv[0] + """ [OPTIONS]""" +\
89
'\n\n' +\
90
'-I, --help          Display this help and exit.\n' +\
91
'-V, --version       Output version information and exit.\n' +\
92
'-h, --host=name     Connect to host.\n' +\
93
'-P, --port=name     Port number to use for connection.\n' +\
94
'-u, --user=name     User for login.\n' +\
95
'-t, --timeout=name  Cmd/Query/Inspection execute timeout(s).\n' +\
96
'-p, --password=name Password to use when connecting to server. If password is\n' +\
97
'                    not given it\'s empty string "".\n' +\
98
'-m, --module=name   Modules to run. Modules should be a string combined by some of\n' +\
99
'                    the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
100
'                    system_variable_dml, special_action, all. "all" represents\n' +\
101
'                    that all modules should be run. They are splitted by ",".\n' +\
102
'                    For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
103
'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
104
'\n\n' +\
105
'Maybe you want to run cmd like that:\n' +\
106
sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
107

108
# Printed verbatim for -V/--version.
version_str = """version 1.0.0"""
109

110
class Option:
  """One command line option (short + long spelling) with an optional value.

  Two class-level registries guarantee that no two Option instances ever
  share a short or long name; a duplicate raises MyError at construction.
  """
  __g_short_name_set = set([])
  __g_long_name_set = set([])
  __short_name = None
  __long_name = None
  __is_with_param = None
  __is_local_opt = None
  __has_value = None
  __value = None
  def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
    if short_name in Option.__g_short_name_set:
      raise MyError('duplicate option short name: {0}'.format(short_name))
    if long_name in Option.__g_long_name_set:
      raise MyError('duplicate option long name: {0}'.format(long_name))
    Option.__g_short_name_set.add(short_name)
    Option.__g_long_name_set.add(long_name)
    self.__short_name = short_name
    self.__long_name = long_name
    self.__is_with_param = is_with_param
    self.__is_local_opt = is_local_opt
    self.__has_value = False
    if default_value is not None:
      self.set_value(default_value)
  def is_with_param(self):
    return self.__is_with_param
  def get_short_name(self):
    return self.__short_name
  def get_long_name(self):
    return self.__long_name
  def has_value(self):
    return self.__has_value
  def get_value(self):
    return self.__value
  def set_value(self, value):
    # Remember both the value and the fact that one was supplied.
    self.__value = value
    self.__has_value = True
  def is_local_opt(self):
    return self.__is_local_opt
  def is_valid(self):
    # Valid means: fully constructed AND a non-None value has been set.
    if self.__short_name is None or self.__long_name is None:
      return False
    return self.__has_value and self.__value is not None
150

151
# Registry of every command line option the script understands.
g_opts =\
[\
Option('I', 'help', False, True),\
Option('V', 'version', False, True),\
Option('h', 'host', True, False),\
Option('P', 'port', True, False),\
Option('u', 'user', True, False),\
Option('t', 'timeout', True, False, 0),\
Option('p', 'password', True, False, ''),\
# which modules to run; defaults to all of them
Option('m', 'module', True, False, 'all'),\
# log file path; each script's main() overrides this with its own default
Option('l', 'log-file', True, False)
]\
165

166
def change_opt_defult_value(opt_long_name, opt_default_val):
  # Overwrite the default of the first option whose long name matches, if any.
  global g_opts
  for candidate in g_opts:
    if candidate.get_long_name() != opt_long_name:
      continue
    candidate.set_value(opt_default_val)
    return
172

173
def has_no_local_opts():
  # True unless some local option (help/version) was given on the command line.
  global g_opts
  for opt in g_opts:
    if opt.is_local_opt() and opt.has_value():
      return False
  return True
180

181
def check_db_client_opts():
  # Every non-local option must have received a value once parsing is done.
  global g_opts
  for opt in g_opts:
    if opt.is_local_opt() or opt.has_value():
      continue
    raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
        .format(opt.get_short_name(), sys.argv[0]))
187

188
def parse_option(opt_name, opt_val):
  # Store opt_val on every registered option matching either spelling.
  global g_opts
  for opt in g_opts:
    spellings = ('-' + opt.get_short_name(), '--' + opt.get_long_name())
    if opt_name in spellings:
      opt.set_value(opt_val)
193

194
def parse_options(argv):
  """Parse argv against g_opts via getopt, then validate DB client options."""
  global g_opts
  short_opt_str = ''
  long_opt_list = []
  # Build both getopt specs in a single pass; ':'/'=' mark value-taking options.
  for opt in g_opts:
    takes_value = opt.is_with_param()
    short_opt_str += opt.get_short_name() + (':' if takes_value else '')
    long_opt_list.append(opt.get_long_name() + ('=' if takes_value else ''))
  (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
  for (opt_name, opt_val) in opts:
    parse_option(opt_name, opt_val)
  # Only enforce mandatory DB options when no local option short-circuits the run.
  if has_no_local_opts():
    check_db_client_opts()
213

214
def deal_with_local_opt(opt):
  """Handle one local option: print the help or version text.

  Fix: uses print() calls -- identical output in Python 2 for a single
  argument and valid Python 3 -- instead of the Python-2-only print statement.
  """
  if 'help' == opt.get_long_name():
    global help_str
    print(help_str)
  elif 'version' == opt.get_long_name():
    global version_str
    print(version_str)
221

222
def deal_with_local_opts():
  # Dispatch exactly one local option -- the first one holding a value.
  global g_opts
  if has_no_local_opts():
    raise MyError('no local options, can not deal with local options')
  for opt in g_opts:
    if opt.is_local_opt() and opt.has_value():
      deal_with_local_opt(opt)
      return
232

233
def get_opt_host():
  # Value of -h/--host, or None when never set.
  global g_opts
  for entry in g_opts:
    if entry.get_long_name() == 'host':
      return entry.get_value()
238

239
def get_opt_port():
  # Value of -P/--port, or None when never set.
  global g_opts
  for entry in g_opts:
    if entry.get_long_name() == 'port':
      return entry.get_value()
244

245
def get_opt_user():
  # Value of -u/--user, or None when never set.
  global g_opts
  for entry in g_opts:
    if entry.get_long_name() == 'user':
      return entry.get_value()
250

251
def get_opt_password():
  # Value of -p/--password (defaults to '' in g_opts).
  global g_opts
  for entry in g_opts:
    if entry.get_long_name() == 'password':
      return entry.get_value()
256

257
def get_opt_timeout():
  # Value of -t/--timeout (defaults to 0 in g_opts, meaning "no override").
  global g_opts
  for entry in g_opts:
    if entry.get_long_name() == 'timeout':
      return entry.get_value()
262

263
def get_opt_module():
  # Value of -m/--module (defaults to 'all' in g_opts).
  global g_opts
  for entry in g_opts:
    if entry.get_long_name() == 'module':
      return entry.get_value()
268

269
def get_opt_log_file():
  # Value of -l/--log-file, or None when never set.
  global g_opts
  for entry in g_opts:
    if entry.get_long_name() == 'log-file':
      return entry.get_value()
274
#### ---------------end----------------------
275

276
#### --------------start :  do_upgrade_pre.py--------------
277
def config_logging_module(log_filenamme):
  """Configure root logging: INFO+ records go to `log_filenamme` (truncated)
  and are mirrored to stdout with the same format."""
  log_format = '[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s'
  date_format = '%Y-%m-%d %H:%M:%S'
  logging.basicConfig(level=logging.INFO,
      format=log_format,
      datefmt=date_format,
      filename=log_filenamme,
      filemode='w')
  # Echo the same stream of records to stdout.
  stdout_handler = logging.StreamHandler(sys.stdout)
  stdout_handler.setLevel(logging.INFO)
  stdout_handler.setFormatter(logging.Formatter(log_format, date_format))
  logging.getLogger('').addHandler(stdout_handler)
293
#### ---------------end----------------------
294

295

296
# Global accumulator of failure reasons; every check_* function appends here
# and check_fail_list() raises MyError at the end if it is non-empty.
fail_list=[]
297

298
def get_version(version_str):
  """Parse an 'a.b.c.d' version string into one comparable integer.

  Bit layout: major(32 bits) | minor(16) | major_patch(8) | minor_patch(8).
  Raises MyError on malformed input.  Fix: the original executed `raise e`
  with no `e` in scope, which would surface as a NameError instead of a
  meaningful error; also use logging.error outside an exception handler.
  """
  versions = version_str.split(".")

  if len(versions) != 4:
    logging.error("""version:{0} is invalid""".format(version_str))
    raise MyError("""version:{0} is invalid""".format(version_str))

  major = int(versions[0])
  minor = int(versions[1])
  major_patch = int(versions[2])
  minor_patch = int(versions[3])

  # Each component must fit in its bit field.
  if major > 0xffffffff or minor > 0xffff or major_patch > 0xff or minor_patch > 0xff:
    logging.error("""version:{0} is invalid""".format(version_str))
    raise MyError("""version:{0} is invalid""".format(version_str))

  return (major << 32) | (minor << 16) | (major_patch << 8) | (minor_patch)
316

317
#### START ####
318
# 1. 检查前置版本
319
def check_observer_version(query_cur, upgrade_params):
  """Check 1: min_observer_version must be synced and >= old_version.

  Fixes: Python-2-only cmp() replaced by a direct comparison (string
  comparison, exactly what cmp() did); the success log line previously ran
  unconditionally, so it both claimed success after recording a failure and
  crashed with IndexError when the result set was empty -- it is now guarded.
  """
  (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'""")
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif results[0][0] < upgrade_params.old_version:
    fail_list.append('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0]))
  else:
    logging.info('check observer version success, version = {0}'.format(results[0][0]))
326

327
def check_data_version(query_cur):
  """Verify cluster/data/compatible versions are consistent and >= 4.1.0.0.

  Walks a chain of guards: min_observer_version must be synced and at least
  4.1.0.0; the per-tenant 'compatible' value must be synced and at least
  4.1.0.0; and every tenant's target_data_version and current_data_version
  (two rows per tenant in __all_virtual_core_table) must equal that value.
  Any violation is appended to the global fail_list.
  """
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])

    # check data version
    if min_cluster_version < get_version("4.1.0.0"):
      # last barrier cluster version should be 4.1.0.0
      fail_list.append('last barrier cluster version is 4.1.0.0. prohibit cluster upgrade from cluster version less than 4.1.0.0')
    else:
      data_version_str = ''
      data_version = 0
      # check compatible is same
      sql = """select distinct value from oceanbase.__all_virtual_tenant_parameter_info where name='compatible'"""
      (desc, results) = query_cur.exec_query(sql)
      if len(results) != 1:
        fail_list.append('compatible is not sync')
      elif len(results[0]) != 1:
        fail_list.append('column cnt not match')
      else:
        data_version_str = results[0][0]
        data_version = get_version(results[0][0])

        if data_version < get_version("4.1.0.0"):
          # last barrier data version should be 4.1.0.0
          fail_list.append('last barrier data version is 4.1.0.0. prohibit cluster upgrade from data version less than 4.1.0.0')
        else:
          # check target_data_version/current_data_version
          sql = "select count(*) from oceanbase.__all_tenant"
          (desc, results) = query_cur.exec_query(sql)
          if len(results) != 1 or len(results[0]) != 1:
            fail_list.append('result cnt not match')
          else:
            tenant_count = results[0][0]

            # Expect exactly two matching rows (target + current) per tenant.
            sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(data_version)
            (desc, results) = query_cur.exec_query(sql)
            if len(results) != 1 or len(results[0]) != 1:
              fail_list.append('result cnt not match')
            elif 2 * tenant_count != results[0][0]:
              fail_list.append('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(data_version_str, tenant_count, results[0][0]))
            else:
              logging.info("check data version success, all tenant's compatible/target_data_version/current_data_version is {0}".format(data_version_str))
376

377
# 2. 检查paxos副本是否同步, paxos副本是否缺失
378
def check_paxos_replica(query_cur):
  """Check 2: no paxos replica may be out of sync."""
  # 2.1 count log streams that are still catching up
  (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""")
  unsync_cnt = results[0][0]
  if unsync_cnt > 0:
    fail_list.append('{0} replicas unsync, please check'.format(unsync_cnt))
  # 2.2 missing-replica check: TODO (not implemented upstream)
  logging.info('check paxos replica success')
385

386
# 3. 检查是否有做balance, locality变更
387
def check_rebalance_task(query_cur):
  """Check 3: no in-flight locality change or replica rebalance task."""
  # 3.1 locality changes still running
  (desc, results) = query_cur.exec_query("""select count(1) as cnt from DBA_OB_TENANT_JOBS where job_status='INPROGRESS' and result_code is null""")
  pending = results[0][0]
  if pending > 0:
    fail_list.append('{0} locality tasks is doing, please check'.format(pending))
  # 3.2 replica rebalance tasks still queued
  (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from CDB_OB_LS_REPLICA_TASKS""")
  pending = results[0][0]
  if pending > 0:
    fail_list.append('{0} rebalance tasks is doing, please check'.format(pending))
  logging.info('check rebalance task success')
397

398
# 4. 检查集群状态
399
def check_cluster_status(query_cur):
  """Check 4: no major compaction / tablet merge may be in progress."""
  # tenant-level merge status
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""")
  merging = results[0][0]
  if merging > 0:
    fail_list.append('{0} tenant is merging, please check'.format(merging))
  # tablet-level merge progress
  (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""")
  merging = results[0][0]
  if merging > 0:
    fail_list.append('{0} tablet is merging, please check'.format(merging))
  logging.info('check cluster status success')
408

409
# 5. 检查是否有异常租户(creating,延迟删除,恢复中)
410
def check_tenant_status(query_cur):
  """Check 5: no abnormal (creating/deleting/restoring) or locked tenants.

  Runs three probes in order; each appends to fail_list on a bad result
  shape or a non-zero count, and logs success otherwise.
  """
  probes = [
    ("""select count(*) as count from DBA_OB_TENANTS where status != 'NORMAL'""",
     'has abnormal tenant, should stop', 'check tenant status success'),
    # restore tenants (neither PRIMARY nor STANDBY role) block the upgrade
    ("""select count(*) as count from oceanbase.__all_virtual_tenant_info where tenant_role != 'PRIMARY' and tenant_role != 'STANDBY'""",
     'has abnormal tenant info, should stop', 'check tenant info success'),
    ("""select count(*) from DBA_OB_TENANTS where LOCKED = 'YES'""",
     'has locked tenant, should unlock', 'check tenant lock status success'),
  ]
  for (sql, fail_msg, ok_msg) in probes:
    (desc, results) = query_cur.exec_query(sql)
    if len(results) != 1 or len(results[0]) != 1:
      fail_list.append('results len not match')
    elif 0 != results[0][0]:
      fail_list.append(fail_msg)
    else:
      logging.info(ok_msg)
439

440
# 6. 检查无恢复任务
441
def check_restore_job_exist(query_cur):
  """Check 6: no tenant restore job may be running."""
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""")
  bad_shape = len(results) != 1 or len(results[0]) != 1
  if bad_shape:
    fail_list.append('failed to restore job cnt')
  elif results[0][0] != 0:
    fail_list.append("""still has restore job, upgrade is not allowed temporarily""")
  logging.info('check restore job success')
448

449
def check_is_primary_zone_distributed(primary_zone_str):
  """Return True when the primary zone spec spreads leaders across zones.

  In a primary_zone string, ',' separates zones of equal priority and ';'
  separates priority groups, so leaders are distributed exactly when a ','
  occurs before any ';'.  Fix: the original re-implemented str.find with two
  manual index loops; positions default to len(s) when a separator is
  absent, matching the original behavior.
  """
  semicolon_pos = primary_zone_str.find(';')
  if semicolon_pos < 0:
    semicolon_pos = len(primary_zone_str)
  comma_pos = primary_zone_str.find(',')
  if comma_pos < 0:
    comma_pos = len(primary_zone_str)
  return comma_pos < semicolon_pos
464

465
# 7. 升级前需要primary zone只有一个
466
def check_tenant_primary_zone(query_cur):
  """Check 7: before 4.1, every non-sys tenant needs a single primary zone.

  Fix: replaces the Python-2-only cmp(a, b) == 0 with plain equality, which
  is exactly equivalent; also drops stray trailing semicolons.
  """
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    if min_cluster_version < get_version("4.1.0.0"):
      (desc, results) = query_cur.exec_query("""select tenant_name,primary_zone from DBA_OB_TENANTS where  tenant_id != 1""")
      for item in results:
        if item[1] == "RANDOM":
          fail_list.append('{0} tenant primary zone random before update not allowed'.format(item[0]))
        elif check_is_primary_zone_distributed(item[1]):
          fail_list.append('{0} tenant primary zone distributed before update not allowed'.format(item[0]))
      logging.info('check tenant primary zone success')
483

484
# 8. 修改永久下线的时间,避免升级过程中缺副本
485
def modify_server_permanent_offline_time(cur):
  """Step 8: extend server_permanent_offline_time to 72h so replicas are not
  dropped while observers restart during the upgrade."""
  set_parameter(cur, 'server_permanent_offline_time', '72h')
487

488
# 9. 检查是否有DDL任务在执行
489
def check_ddl_task_execute(query_cur):
  """Check 9: no DDL task may be executing during the upgrade."""
  (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_ddl_task_status""")
  running = results[0][0]
  if running != 0:
    fail_list.append("There are DDL task in progress")
  logging.info('check ddl task execut status success')
494

495
# 10. 检查无备份任务
496
def check_backup_job_exist(query_cur):
  """Check 10: backup jobs cannot be in progress during the upgrade."""
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_JOBS""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('failed to backup job cnt')
    return
  if results[0][0] != 0:
    fail_list.append("""still has backup job, upgrade is not allowed temporarily""")
    return
  logging.info('check backup job success')
505

506
# 11. 检查无归档任务
507
def check_archive_job_exist(query_cur):
  """Check 11: before upgrading from 4.0, no archive job may be active."""
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
    return
  if len(results[0]) != 1:
    fail_list.append('column cnt not match')
    return
  # Archive jobs cannot be in-progress before upgrade from 4.0.
  if get_version(results[0][0]) >= get_version("4.1.0.0"):
    return
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVELOG where status!='STOP'""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('failed to archive job cnt')
  elif results[0][0] != 0:
    fail_list.append("""still has archive job, upgrade is not allowed temporarily""")
  else:
    logging.info('check archive job success')
527

528
# 12. 检查归档路径是否清空
529
def check_archive_dest_exist(query_cur):
  """Check 12: archive destinations must be cleared before upgrading from 4.0."""
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
    return
  if len(results[0]) != 1:
    fail_list.append('column cnt not match')
    return
  # archive dest need to be cleaned before upgrade from 4.0.
  if get_version(results[0][0]) >= get_version("4.1.0.0"):
    return
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVE_DEST""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('failed to archive dest cnt')
  elif results[0][0] != 0:
    fail_list.append("""still has archive destination, upgrade is not allowed temporarily""")
  else:
    logging.info('check archive destination success')
548

549
# 13. 检查备份路径是否清空
550
def check_backup_dest_exist(query_cur):
  """Check 13: the data backup destination must be cleared before upgrading
  from a pre-4.1 cluster; violations are appended to fail_list."""
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    # backup dest need to be cleaned before upgrade from 4.0.
    if min_cluster_version < get_version("4.1.0.0"):
      # NOTE(review): in MySQL "value!=NULL" always evaluates to NULL (never
      # true), so this predicate degenerates to value!='' -- the intent was
      # probably "value IS NOT NULL"; confirm upstream before changing.
      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_PARAMETER where name='data_backup_dest' and (value!=NULL or value!='')""")
      if len(results) != 1 or len(results[0]) != 1:
        fail_list.append('failed to data backup dest cnt')
      elif results[0][0] != 0:
        fail_list.append("""still has backup destination, upgrade is not allowed temporarily""")
      else:
        logging.info('check backup destination success')
569

570
def check_server_version(query_cur):
  """All observers must already run the same build version before upgrading."""
  sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server"""
  (desc, results) = query_cur.exec_query(sql)
  # More than one distinct build version means a partially upgraded cluster.
  if len(results) != 1:
    fail_list.append("servers build_version not match")
  else:
    logging.info("check server version success")
577

578
# 14. 检查server是否可服务
579
def check_observer_status(query_cur):
  """Check 14: every observer must be active and already serving."""
  (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status != "active")""")
  unavailable = results[0][0]
  if unavailable > 0:
    fail_list.append('{0} observer not available , please check'.format(unavailable))
  logging.info('check observer status success')
584

585
# 15  检查schema是否刷新成功
586
def check_schema_status(query_cur):
  """Check 15: every server must hold a refreshed schema for every tenant."""
  (desc, results) = query_cur.exec_query("""select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""")
  passed = results[0][0]
  if passed != 1:
    fail_list.append('{0} schema not available, please check'.format(passed))
  logging.info('check schema status success')
591

592
# 16. 检查是否存在名为all/all_user/all_meta的租户
593
def check_not_supported_tenant_name(query_cur):
  """Check 16: no tenant may be named all/all_user/all_meta (case-insensitive)."""
  reserved = ("all", "all_user", "all_meta")
  (desc, results) = query_cur.exec_query("""select tenant_name from oceanbase.DBA_OB_TENANTS""")
  for row in results:
    if row[0].lower() in reserved:
      fail_list.append('a tenant named all/all_user/all_meta (case insensitive) cannot exist in the cluster, please rename the tenant')
      break
  logging.info('check special tenant name success')
601
# 17  检查日志传输压缩是否有使用zlib压缩算法,在升级前需要保证所有observer未开启日志传输压缩或使用非zlib压缩算法
602
def check_log_transport_compress_func(query_cur):
  """Check 17: zlib log transport compression is no longer supported."""
  (desc, results) = query_cur.exec_query("""select count(1) as cnt from oceanbase.__all_virtual_tenant_parameter_info where (name like "log_transport_compress_func" and value like "zlib_1.0")""")
  zlib_users = results[0][0]
  if zlib_users > 0:
    fail_list.append('The zlib compression algorithm is no longer supported with log_transport_compress_func, please replace it with other compression algorithms')
  logging.info('check log_transport_compress_func success')
607
# 18 检查升级过程中是否有表使用zlib压缩,在升级前需要保证所有表都不使用zlib压缩
608
def check_table_compress_func(query_cur):
  """Check 18: no table may use zlib compression during the upgrade."""
  (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_table where (compress_func_name like '%zlib%')""")
  zlib_tables = results[0][0]
  if zlib_tables > 0:
    fail_list.append('There are tables use zlib compression, please replace it with other compression algorithms or do not use compression during the upgrade')
  logging.info('check table compression method success')
613
# 19 检查升级过程中 table_api/obkv 连接传输是否使用了zlib压缩,在升级前需要保证所有 obkv/table_api 连接未开启zlib压缩传输或者使用非zlib压缩算法
614
def check_table_api_transport_compress_func(query_cur):
  """Check 19: table_api/obkv connections must not use zlib transport compression."""
  (desc, results) = query_cur.exec_query("""select count(1) as cnt from GV$OB_PARAMETERS where (name like "tableapi_transport_compress_func" and value like "zlib%");""")
  zlib_conns = results[0][0]
  if zlib_conns > 0:
    fail_list.append('Table api connection is not allowed to use zlib as compression algorithm during the upgrade, please use other compression algorithms by setting table_api_transport_compress_func')
  logging.info('check table_api_transport_compress_func success')
619

620
# 17. 检查无租户克隆任务
621
def check_tenant_clone_job_exist(query_cur):
  """Check: no tenant clone job may be running (only applies from 4.3.0.0)."""
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
    return
  if len(results[0]) != 1:
    fail_list.append('column cnt not match')
    return
  # __all_virtual_clone_job only exists from 4.3.0.0 on.
  if get_version(results[0][0]) < get_version("4.3.0.0"):
    return
  (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_clone_job""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('failed to tenant clone job cnt')
  elif results[0][0] != 0:
    fail_list.append("""still has tenant clone job, upgrade is not allowed temporarily""")
  else:
    logging.info('check tenant clone job success')
639

640
# 18. 检查无租户快照任务
641
def check_tenant_snapshot_task_exist(query_cur):
  """Check: no tenant snapshot task may be active (only applies from 4.3.0.0)."""
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
    return
  if len(results[0]) != 1:
    fail_list.append('column cnt not match')
    return
  # __all_virtual_tenant_snapshot only exists from 4.3.0.0 on.
  if get_version(results[0][0]) < get_version("4.3.0.0"):
    return
  (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_tenant_snapshot where status!='NORMAL'""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('failed to tenant snapshot task')
  elif results[0][0] != 0:
    fail_list.append("""still has tenant snapshot task, upgrade is not allowed temporarily""")
  else:
    logging.info('check tenant snapshot task success')
659

660
# 17. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL
661
def check_variable_binlog_row_image(query_cur):
  """Check: binlog_row_image must not be MINIMAL before reaching 4.3.0.0.

  Pre-4.3 MINIMAL row images produce DELETE logs that CDC cannot consume;
  4.3 fixed this, so the variable may only be enabled after the upgrade.
  """
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
    return
  if len(results[0]) != 1:
    fail_list.append('column cnt not match')
    return
  if get_version(results[0][0]) < get_version("4.3.0.0"):
    # VALUE = '0' encodes MINIMAL for this system variable.
    (desc, results) = query_cur.exec_query("""select count(*) from CDB_OB_SYS_VARIABLES where NAME='binlog_row_image' and VALUE = '0'""")
    if results[0][0] > 0 :
      fail_list.append('Sys Variable binlog_row_image is set to MINIMAL, please check'.format(results[0][0]))
  logging.info('check variable binlog_row_image success')
679

680
# last check of do_check, make sure no function execute after check_fail_list
681
def check_fail_list():
  # Must run after every other check: raise if anything recorded a failure.
  if fail_list:
    reasons = ", ".join(['[' + x + "] " for x in fail_list])
    error_msg = "upgrade checker failed with " + str(len(fail_list)) + " reasons: " + reasons
    raise MyError(error_msg)
685

686
def set_query_timeout(query_cur, timeout):
  # timeout == 0 means "keep the server default"; otherwise seconds -> microseconds.
  if timeout != 0:
    query_cur.exec_sql("""set @@session.ob_query_timeout = {0}""".format(timeout * 1000 * 1000))
690

691
# 开始升级前的检查
692
def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
  """Run every pre-upgrade check against the cluster at my_host:my_port.

  Connects to the 'oceanbase' database, executes all check_* functions
  (which record problems in the global fail_list), raises via
  check_fail_list() if anything failed, then extends the permanent-offline
  window.  Fix: uses "except ... as e"-style handlers (valid in Python 2.6+
  and 3) instead of the Python-2-only comma form, and bare `raise` to keep
  the original traceback.
  """
  try:
    conn = mysql.connector.connect(user = my_user,
                                   password = my_passwd,
                                   host = my_host,
                                   port = my_port,
                                   database = 'oceanbase',
                                   raise_on_warnings = True)
    conn.autocommit = True
    cur = conn.cursor(buffered=True)
    try:
      query_cur = Cursor(cur)
      set_query_timeout(query_cur, timeout)
      check_observer_version(query_cur, upgrade_params)
      check_data_version(query_cur)
      check_paxos_replica(query_cur)
      check_rebalance_task(query_cur)
      check_cluster_status(query_cur)
      check_tenant_status(query_cur)
      check_restore_job_exist(query_cur)
      check_tenant_primary_zone(query_cur)
      check_ddl_task_execute(query_cur)
      check_backup_job_exist(query_cur)
      check_archive_job_exist(query_cur)
      check_archive_dest_exist(query_cur)
      check_backup_dest_exist(query_cur)
      check_observer_status(query_cur)
      check_schema_status(query_cur)
      check_server_version(query_cur)
      check_not_supported_tenant_name(query_cur)
      check_tenant_clone_job_exist(query_cur)
      check_tenant_snapshot_task_exist(query_cur)
      check_log_transport_compress_func(query_cur)
      check_table_compress_func(query_cur)
      check_table_api_transport_compress_func(query_cur)
      check_variable_binlog_row_image(query_cur)
      # all check func should execute before check_fail_list
      check_fail_list()
      modify_server_permanent_offline_time(cur)
    except Exception:
      logging.exception('run error')
      raise
    finally:
      cur.close()
      conn.close()
  except mysql.connector.Error:
    logging.exception('connection error')
    raise
  except Exception:
    logging.exception('normal error')
    raise
743

744
if __name__ == '__main__':
  # Entry point: parse options, handle -I/-V locally, otherwise run all checks.
  upgrade_params = UpgradeParams()
  change_opt_defult_value('log-file', upgrade_params.log_filename)
  parse_options(sys.argv[1:])
  if not has_no_local_opts():
    deal_with_local_opts()
  else:
    check_db_client_opts()
    log_filename = get_opt_log_file()
    upgrade_params.log_filename = log_filename
    # Configure logging only now, so the option handling above cannot
    # clobber the log file before its path is known.
    config_logging_module(upgrade_params.log_filename)
    try:
      host = get_opt_host()
      port = int(get_opt_port())
      user = get_opt_user()
      password = get_opt_password()
      timeout = int(get_opt_timeout())
      # NOTE(review): the password is written to the log in plain text here;
      # consider masking it.
      logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", log-file=\"%s\"',\
          host, port, user, password, timeout, log_filename)
      do_check(host, port, user, password, timeout, upgrade_params)
    # "except ... as"/bare raise: Python 2.6+/3 compatible, traceback preserved.
    except mysql.connector.Error:
      logging.exception('mysql connctor error')
      raise
    except Exception:
      logging.exception('normal error')
      raise
771

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.