7
from mysql.connector import errorcode
13
log_filename = 'upgrade_checker.log'
14
old_version = '4.0.0.0'
15
#### --------------start : my_error.py --------------
16
class MyError(Exception):
17
def __init__(self, value):
20
return repr(self.value)
21
#### --------------start : actions.py------------
24
def __init__(self, cursor):
25
self.__cursor = cursor
26
def exec_sql(self, sql, print_when_succ = True):
28
self.__cursor.execute(sql)
29
rowcount = self.__cursor.rowcount
30
if True == print_when_succ:
31
logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
33
except mysql.connector.Error, e:
34
logging.exception('mysql connector error, fail to execute sql: %s', sql)
37
logging.exception('normal error, fail to execute sql: %s', sql)
39
def exec_query(self, sql, print_when_succ = True):
41
self.__cursor.execute(sql)
42
results = self.__cursor.fetchall()
43
rowcount = self.__cursor.rowcount
44
if True == print_when_succ:
45
logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
46
return (self.__cursor.description, results)
47
except mysql.connector.Error, e:
48
logging.exception('mysql connector error, fail to execute sql: %s', sql)
51
logging.exception('normal error, fail to execute sql: %s', sql)
54
def set_parameter(cur, parameter, value):
55
sql = """alter system set {0} = '{1}'""".format(parameter, value)
58
wait_parameter_sync(cur, parameter, value)
60
def wait_parameter_sync(cur, key, value):
61
sql = """select count(*) as cnt from oceanbase.__all_virtual_sys_parameter_stat
62
where name = '{0}' and value != '{1}'""".format(key, value)
67
result = cur.fetchall()
68
if len(result) != 1 or len(result[0]) != 1:
69
logging.exception('result cnt not match')
71
elif result[0][0] == 0:
72
logging.info("""{0} is sync, value is {1}""".format(key, value))
75
logging.info("""{0} is not sync, value should be {1}""".format(key, value))
79
logging.exception("""check {0}:{1} sync timeout""".format(key, value))
83
#### --------------start : opt.py --------------
88
sys.argv[0] + """ [OPTIONS]""" +\
90
'-I, --help Display this help and exit.\n' +\
91
'-V, --version Output version information and exit.\n' +\
92
'-h, --host=name Connect to host.\n' +\
93
'-P, --port=name Port number to use for connection.\n' +\
94
'-u, --user=name User for login.\n' +\
95
'-t, --timeout=name Cmd/Query/Inspection execute timeout(s).\n' +\
96
'-p, --password=name Password to use when connecting to server. If password is\n' +\
97
' not given it\'s empty string "".\n' +\
98
'-m, --module=name Modules to run. Modules should be a string combined by some of\n' +\
99
' the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
100
' system_variable_dml, special_action, all. "all" represents\n' +\
101
' that all modules should be run. They are splitted by ",".\n' +\
102
' For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
103
'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
105
'Maybe you want to run cmd like that:\n' +\
106
sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
108
version_str = """version 1.0.0"""
111
__g_short_name_set = set([])
112
__g_long_name_set = set([])
115
__is_with_param = None
116
__is_local_opt = None
119
def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
120
if short_name in Option.__g_short_name_set:
121
raise MyError('duplicate option short name: {0}'.format(short_name))
122
elif long_name in Option.__g_long_name_set:
123
raise MyError('duplicate option long name: {0}'.format(long_name))
124
Option.__g_short_name_set.add(short_name)
125
Option.__g_long_name_set.add(long_name)
126
self.__short_name = short_name
127
self.__long_name = long_name
128
self.__is_with_param = is_with_param
129
self.__is_local_opt = is_local_opt
130
self.__has_value = False
131
if None != default_value:
132
self.set_value(default_value)
133
def is_with_param(self):
134
return self.__is_with_param
135
def get_short_name(self):
136
return self.__short_name
137
def get_long_name(self):
138
return self.__long_name
140
return self.__has_value
143
def set_value(self, value):
145
self.__has_value = True
146
def is_local_opt(self):
147
return self.__is_local_opt
149
return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value
153
Option('I', 'help', False, True),\
154
Option('V', 'version', False, True),\
155
Option('h', 'host', True, False),\
156
Option('P', 'port', True, False),\
157
Option('u', 'user', True, False),\
158
Option('t', 'timeout', True, False, 0),\
159
Option('p', 'password', True, False, ''),\
161
Option('m', 'module', True, False, 'all'),\
162
# 日志文件路径,不同脚本的main函数中中会改成不同的默认值
163
Option('l', 'log-file', True, False)
166
def change_opt_defult_value(opt_long_name, opt_default_val):
169
if opt.get_long_name() == opt_long_name:
170
opt.set_value(opt_default_val)
173
def has_no_local_opts():
177
if opt.is_local_opt() and opt.has_value():
178
no_local_opts = False
181
def check_db_client_opts():
184
if not opt.is_local_opt() and not opt.has_value():
185
raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
186
.format(opt.get_short_name(), sys.argv[0]))
188
def parse_option(opt_name, opt_val):
191
if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
192
opt.set_value(opt_val)
194
def parse_options(argv):
199
if opt.is_with_param():
200
short_opt_str += opt.get_short_name() + ':'
202
short_opt_str += opt.get_short_name()
204
if opt.is_with_param():
205
long_opt_list.append(opt.get_long_name() + '=')
207
long_opt_list.append(opt.get_long_name())
208
(opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
209
for (opt_name, opt_val) in opts:
210
parse_option(opt_name, opt_val)
211
if has_no_local_opts():
212
check_db_client_opts()
214
def deal_with_local_opt(opt):
215
if 'help' == opt.get_long_name():
218
elif 'version' == opt.get_long_name():
222
def deal_with_local_opts():
224
if has_no_local_opts():
225
raise MyError('no local options, can not deal with local options')
228
if opt.is_local_opt() and opt.has_value():
229
deal_with_local_opt(opt)
236
if 'host' == opt.get_long_name():
237
return opt.get_value()
242
if 'port' == opt.get_long_name():
243
return opt.get_value()
248
if 'user' == opt.get_long_name():
249
return opt.get_value()
251
def get_opt_password():
254
if 'password' == opt.get_long_name():
255
return opt.get_value()
257
def get_opt_timeout():
260
if 'timeout' == opt.get_long_name():
261
return opt.get_value()
266
if 'module' == opt.get_long_name():
267
return opt.get_value()
269
def get_opt_log_file():
272
if 'log-file' == opt.get_long_name():
273
return opt.get_value()
274
#### ---------------end----------------------
276
#### --------------start : do_upgrade_pre.py--------------
277
def config_logging_module(log_filenamme):
278
logging.basicConfig(level=logging.INFO,\
279
format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
280
datefmt='%Y-%m-%d %H:%M:%S',\
281
filename=log_filenamme,\
284
formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
285
#######################################
286
# 定义一个Handler打印INFO及以上级别的日志到sys.stdout
287
stdout_handler = logging.StreamHandler(sys.stdout)
288
stdout_handler.setLevel(logging.INFO)
290
stdout_handler.setFormatter(formatter)
291
# 将定义好的stdout_handler日志handler添加到root logger
292
logging.getLogger('').addHandler(stdout_handler)
293
#### ---------------end----------------------
298
def get_version(version_str):
299
versions = version_str.split(".")
301
if len(versions) != 4:
302
logging.exception("""version:{0} is invalid""".format(version_str))
305
major = int(versions[0])
306
minor = int(versions[1])
307
major_patch = int(versions[2])
308
minor_patch = int(versions[3])
310
if major > 0xffffffff or minor > 0xffff or major_patch > 0xff or minor_patch > 0xff:
311
logging.exception("""version:{0} is invalid""".format(version_str))
314
version = (major << 32) | (minor << 16) | (major_patch << 8) | (minor_patch)
319
# 1. check observer version
def check_observer_version(query_cur, upgrade_params):
  """Check that min_observer_version is in sync cluster-wide and not older
  than the supported upgrade base version.

  Failures are appended to the module-level fail_list (reported later by
  check_fail_list) so that all checks can run in one pass.
  """
  (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERS where name='min_observer_version'""")
  if len(results) != 1:
    # more than one distinct value: the parameter has not converged on all servers
    fail_list.append('min_observer_version is not sync')
  elif results[0][0] < upgrade_params.old_version:
    # plain lexicographic string comparison; equivalent to the Python-2-only
    # cmp(a, b) < 0 this originally used
    fail_list.append('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0]))
  logging.info('check observer version success, version = {0}'.format(results[0][0]))
327
def check_data_version(query_cur):
328
min_cluster_version = 0
329
sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
330
(desc, results) = query_cur.exec_query(sql)
331
if len(results) != 1:
332
fail_list.append('min_observer_version is not sync')
333
elif len(results[0]) != 1:
334
fail_list.append('column cnt not match')
336
min_cluster_version = get_version(results[0][0])
339
if min_cluster_version < get_version("4.1.0.0"):
340
# last barrier cluster version should be 4.1.0.0
341
fail_list.append('last barrier cluster version is 4.1.0.0. prohibit cluster upgrade from cluster version less than 4.1.0.0')
343
data_version_str = ''
345
# check compatible is same
346
sql = """select distinct value from oceanbase.__all_virtual_tenant_parameter_info where name='compatible'"""
347
(desc, results) = query_cur.exec_query(sql)
348
if len(results) != 1:
349
fail_list.append('compatible is not sync')
350
elif len(results[0]) != 1:
351
fail_list.append('column cnt not match')
353
data_version_str = results[0][0]
354
data_version = get_version(results[0][0])
356
if data_version < get_version("4.1.0.0"):
357
# last barrier data version should be 4.1.0.0
358
fail_list.append('last barrier data version is 4.1.0.0. prohibit cluster upgrade from data version less than 4.1.0.0')
360
# check target_data_version/current_data_version
361
sql = "select count(*) from oceanbase.__all_tenant"
362
(desc, results) = query_cur.exec_query(sql)
363
if len(results) != 1 or len(results[0]) != 1:
364
fail_list.append('result cnt not match')
366
tenant_count = results[0][0]
368
sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(data_version)
369
(desc, results) = query_cur.exec_query(sql)
370
if len(results) != 1 or len(results[0]) != 1:
371
fail_list.append('result cnt not match')
372
elif 2 * tenant_count != results[0][0]:
373
fail_list.append('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(data_version_str, tenant_count, results[0][0]))
375
logging.info("check data version success, all tenant's compatible/target_data_version/current_data_version is {0}".format(data_version_str))
377
# 2. 检查paxos副本是否同步, paxos副本是否缺失
378
def check_paxos_replica(query_cur):
  """Fail the upgrade check when any paxos replica is out of sync."""
  # 2.1 check whether paxos replicas are in sync
  (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""")
  if results[0][0] > 0 :
    fail_list.append('{0} replicas unsync, please check'.format(results[0][0]))
  # 2.2 checking for missing paxos replicas is still TODO
  logging.info('check paxos replica success')
386
# 3. 检查是否有做balance, locality变更
387
def check_rebalance_task(query_cur):
  """Fail when locality changes or replica rebalance tasks are in progress."""
  # 3.1 check for in-progress locality changes
  (desc, results) = query_cur.exec_query("""select count(1) as cnt from DBA_OB_TENANT_JOBS where job_status='INPROGRESS' and result_code is null""")
  if results[0][0] > 0 :
    fail_list.append('{0} locality tasks is doing, please check'.format(results[0][0]))
  # 3.2 check for in-progress rebalance tasks
  (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from CDB_OB_LS_REPLICA_TASKS""")
  if results[0][0] > 0 :
    fail_list.append('{0} rebalance tasks is doing, please check'.format(results[0][0]))
  logging.info('check rebalance task success')
399
def check_cluster_status(query_cur):
  """Fail when a major compaction (merge) is still running anywhere."""
  # tenant-level merge status: any tenant not idle or not caught up to the broadcast SCN
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""")
  if results[0][0] > 0 :
    fail_list.append('{0} tenant is merging, please check'.format(results[0][0]))
  # tablet-level merge progress; long query_timeout hint because the virtual table scan can be slow
  (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""")
  if results[0][0] > 0 :
    fail_list.append('{0} tablet is merging, please check'.format(results[0][0]))
  logging.info('check cluster status success')
409
# 5. 检查是否有异常租户(creating,延迟删除,恢复中)
410
def check_tenant_status(query_cur):
412
# check tenant schema
413
(desc, results) = query_cur.exec_query("""select count(*) as count from DBA_OB_TENANTS where status != 'NORMAL'""")
414
if len(results) != 1 or len(results[0]) != 1:
415
fail_list.append('results len not match')
416
elif 0 != results[0][0]:
417
fail_list.append('has abnormal tenant, should stop')
419
logging.info('check tenant status success')
422
# don't support restore tenant upgrade
423
(desc, results) = query_cur.exec_query("""select count(*) as count from oceanbase.__all_virtual_tenant_info where tenant_role != 'PRIMARY' and tenant_role != 'STANDBY'""")
424
if len(results) != 1 or len(results[0]) != 1:
425
fail_list.append('results len not match')
426
elif 0 != results[0][0]:
427
fail_list.append('has abnormal tenant info, should stop')
429
logging.info('check tenant info success')
431
# check tenant lock status
432
(desc, results) = query_cur.exec_query("""select count(*) from DBA_OB_TENANTS where LOCKED = 'YES'""")
433
if len(results) != 1 or len(results[0]) != 1:
434
fail_list.append('results len not match')
435
elif 0 != results[0][0]:
436
fail_list.append('has locked tenant, should unlock')
438
logging.info('check tenant lock status success')
441
def check_restore_job_exist(query_cur):
  """Fail when any tenant restore job is still in progress."""
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""")
  if len(results) != 1 or len(results[0]) != 1:
    # count(*) must come back as exactly one row with one column
    fail_list.append('failed to restore job cnt')
  elif results[0][0] != 0:
    fail_list.append("""still has restore job, upgrade is not allowed temporarily""")
  logging.info('check restore job success')
449
def check_is_primary_zone_distributed(primary_zone_str):
450
semicolon_pos = len(primary_zone_str)
451
for i in range(len(primary_zone_str)):
452
if primary_zone_str[i] == ';':
455
comma_pos = len(primary_zone_str)
456
for j in range(len(primary_zone_str)):
457
if primary_zone_str[j] == ',':
460
if comma_pos < semicolon_pos:
465
# 7. 升级前需要primary zone只有一个
466
def check_tenant_primary_zone(query_cur):
467
sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
468
(desc, results) = query_cur.exec_query(sql)
469
if len(results) != 1:
470
fail_list.append('min_observer_version is not sync')
471
elif len(results[0]) != 1:
472
fail_list.append('column cnt not match')
474
min_cluster_version = get_version(results[0][0])
475
if min_cluster_version < get_version("4.1.0.0"):
476
(desc, results) = query_cur.exec_query("""select tenant_name,primary_zone from DBA_OB_TENANTS where tenant_id != 1""");
478
if cmp(item[1], "RANDOM") == 0:
479
fail_list.append('{0} tenant primary zone random before update not allowed'.format(item[0]))
480
elif check_is_primary_zone_distributed(item[1]):
481
fail_list.append('{0} tenant primary zone distributed before update not allowed'.format(item[0]))
482
logging.info('check tenant primary zone success')
484
# 8. 修改永久下线的时间,避免升级过程中缺副本
485
def modify_server_permanent_offline_time(cur):
  """Raise server_permanent_offline_time to 72h so that replicas are not
  permanently offlined while servers restart during the upgrade."""
  set_parameter(cur, 'server_permanent_offline_time', '72h')
489
def check_ddl_task_execute(query_cur):
  """Fail when there are unfinished DDL tasks."""
  (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_ddl_task_status""")
  if 0 != results[0][0]:
    fail_list.append("There are DDL task in progress")
  # fixed typo in the original log message ("execut")
  logging.info('check ddl task execute status success')
496
def check_backup_job_exist(query_cur):
497
# Backup jobs cannot be in-progress during upgrade.
498
(desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_JOBS""")
499
if len(results) != 1 or len(results[0]) != 1:
500
fail_list.append('failed to backup job cnt')
501
elif results[0][0] != 0:
502
fail_list.append("""still has backup job, upgrade is not allowed temporarily""")
504
logging.info('check backup job success')
507
def check_archive_job_exist(query_cur):
508
min_cluster_version = 0
509
sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
510
(desc, results) = query_cur.exec_query(sql)
511
if len(results) != 1:
512
fail_list.append('min_observer_version is not sync')
513
elif len(results[0]) != 1:
514
fail_list.append('column cnt not match')
516
min_cluster_version = get_version(results[0][0])
518
# Archive jobs cannot be in-progress before upgrade from 4.0.
519
if min_cluster_version < get_version("4.1.0.0"):
520
(desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVELOG where status!='STOP'""")
521
if len(results) != 1 or len(results[0]) != 1:
522
fail_list.append('failed to archive job cnt')
523
elif results[0][0] != 0:
524
fail_list.append("""still has archive job, upgrade is not allowed temporarily""")
526
logging.info('check archive job success')
529
def check_archive_dest_exist(query_cur):
530
min_cluster_version = 0
531
sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
532
(desc, results) = query_cur.exec_query(sql)
533
if len(results) != 1:
534
fail_list.append('min_observer_version is not sync')
535
elif len(results[0]) != 1:
536
fail_list.append('column cnt not match')
538
min_cluster_version = get_version(results[0][0])
539
# archive dest need to be cleaned before upgrade from 4.0.
540
if min_cluster_version < get_version("4.1.0.0"):
541
(desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVE_DEST""")
542
if len(results) != 1 or len(results[0]) != 1:
543
fail_list.append('failed to archive dest cnt')
544
elif results[0][0] != 0:
545
fail_list.append("""still has archive destination, upgrade is not allowed temporarily""")
547
logging.info('check archive destination success')
550
def check_backup_dest_exist(query_cur):
551
min_cluster_version = 0
552
sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
553
(desc, results) = query_cur.exec_query(sql)
554
if len(results) != 1:
555
fail_list.append('min_observer_version is not sync')
556
elif len(results[0]) != 1:
557
fail_list.append('column cnt not match')
559
min_cluster_version = get_version(results[0][0])
560
# backup dest need to be cleaned before upgrade from 4.0.
561
if min_cluster_version < get_version("4.1.0.0"):
562
(desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_PARAMETER where name='data_backup_dest' and (value!=NULL or value!='')""")
563
if len(results) != 1 or len(results[0]) != 1:
564
fail_list.append('failed to data backup dest cnt')
565
elif results[0][0] != 0:
566
fail_list.append("""still has backup destination, upgrade is not allowed temporarily""")
568
logging.info('check backup destination success')
570
def check_server_version(query_cur):
571
sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server""";
572
(desc, results) = query_cur.exec_query(sql);
573
if len(results) != 1:
574
fail_list.append("servers build_version not match")
576
logging.info("check server version success")
579
def check_observer_status(query_cur):
  """Fail when any observer is inactive or has not started service yet."""
  (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status != "active")""")
  if results[0][0] > 0 :
    fail_list.append('{0} observer not available , please check'.format(results[0][0]))
  logging.info('check observer status success')
586
def check_schema_status(query_cur):
  """Fail unless every server has a fully refreshed schema for every tenant.

  The query compares the count of servers with a valid refreshed schema
  version (> 1 and a multiple of 8) against servers x tenants; they must match.
  """
  (desc, results) = query_cur.exec_query("""select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""")
  if results[0][0] != 1 :
    fail_list.append('{0} schema not available, please check'.format(results[0][0]))
  logging.info('check schema status success')
592
# 16. 检查是否存在名为all/all_user/all_meta的租户
593
def check_not_supported_tenant_name(query_cur):
594
names = ["all", "all_user", "all_meta"]
595
(desc, results) = query_cur.exec_query("""select tenant_name from oceanbase.DBA_OB_TENANTS""")
596
for i in range(len(results)):
597
if results[i][0].lower() in names:
598
fail_list.append('a tenant named all/all_user/all_meta (case insensitive) cannot exist in the cluster, please rename the tenant')
600
logging.info('check special tenant name success')
601
# 17 检查日志传输压缩是否有使用zlib压缩算法,在升级前需要保证所有observer未开启日志传输压缩或使用非zlib压缩算法
602
# 17. log transport compression must not use the zlib algorithm; every observer
# must have it disabled or use a non-zlib algorithm before the upgrade
def check_log_transport_compress_func(query_cur):
  """Fail when any tenant still uses zlib_1.0 for log transport compression."""
  (desc, results) = query_cur.exec_query("""select count(1) as cnt from oceanbase.__all_virtual_tenant_parameter_info where (name like "log_transport_compress_func" and value like "zlib_1.0")""")
  if results[0][0] > 0 :
    fail_list.append('The zlib compression algorithm is no longer supported with log_transport_compress_func, please replace it with other compression algorithms')
  logging.info('check log_transport_compress_func success')
607
# 18 检查升级过程中是否有表使用zlib压缩,在升级前需要保证所有表都不使用zlib压缩
608
# 18. no table may use zlib compression during the upgrade
def check_table_compress_func(query_cur):
  """Fail when any table is configured with a zlib compression function."""
  (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_table where (compress_func_name like '%zlib%')""")
  if results[0][0] > 0 :
    fail_list.append('There are tables use zlib compression, please replace it with other compression algorithms or do not use compression during the upgrade')
  logging.info('check table compression method success')
613
# 19 检查升级过程中 table_api/obkv 连接传输是否使用了zlib压缩,在升级前需要保证所有 obkv/table_api 连接未开启zlib压缩传输或者使用非zlib压缩算法
614
# 19. table_api/obkv connections must not use zlib transport compression during the upgrade
def check_table_api_transport_compress_func(query_cur):
  """Fail when tableapi_transport_compress_func is set to any zlib variant."""
  (desc, results) = query_cur.exec_query("""select count(1) as cnt from GV$OB_PARAMETERS where (name like "tableapi_transport_compress_func" and value like "zlib%");""")
  if results[0][0] > 0 :
    fail_list.append('Table api connection is not allowed to use zlib as compression algorithm during the upgrade, please use other compression algorithms by setting table_api_transport_compress_func')
  logging.info('check table_api_transport_compress_func success')
621
def check_tenant_clone_job_exist(query_cur):
622
min_cluster_version = 0
623
sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
624
(desc, results) = query_cur.exec_query(sql)
625
if len(results) != 1:
626
fail_list.append('min_observer_version is not sync')
627
elif len(results[0]) != 1:
628
fail_list.append('column cnt not match')
630
min_cluster_version = get_version(results[0][0])
631
if min_cluster_version >= get_version("4.3.0.0"):
632
(desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_clone_job""")
633
if len(results) != 1 or len(results[0]) != 1:
634
fail_list.append('failed to tenant clone job cnt')
635
elif results[0][0] != 0:
636
fail_list.append("""still has tenant clone job, upgrade is not allowed temporarily""")
638
logging.info('check tenant clone job success')
641
def check_tenant_snapshot_task_exist(query_cur):
642
min_cluster_version = 0
643
sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
644
(desc, results) = query_cur.exec_query(sql)
645
if len(results) != 1:
646
fail_list.append('min_observer_version is not sync')
647
elif len(results[0]) != 1:
648
fail_list.append('column cnt not match')
650
min_cluster_version = get_version(results[0][0])
651
if min_cluster_version >= get_version("4.3.0.0"):
652
(desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_tenant_snapshot where status!='NORMAL'""")
653
if len(results) != 1 or len(results[0]) != 1:
654
fail_list.append('failed to tenant snapshot task')
655
elif results[0][0] != 0:
656
fail_list.append("""still has tenant snapshot task, upgrade is not allowed temporarily""")
658
logging.info('check tenant snapshot task success')
660
# 17. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL
661
def check_variable_binlog_row_image(query_cur):
662
# 4.3.0.0之前的版本,MINIMAL模式生成的日志CDC无法正常消费(DELETE日志).
663
# 4.3.0版本开始,MINIMAL模式做了改进,支持CDC消费,需要在升级到4.3.0.0之后再打开.
664
min_cluster_version = 0
665
sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
666
(desc, results) = query_cur.exec_query(sql)
667
if len(results) != 1:
668
fail_list.append('min_observer_version is not sync')
669
elif len(results[0]) != 1:
670
fail_list.append('column cnt not match')
672
min_cluster_version = get_version(results[0][0])
673
# check cluster version
674
if min_cluster_version < get_version("4.3.0.0"):
675
(desc, results) = query_cur.exec_query("""select count(*) from CDB_OB_SYS_VARIABLES where NAME='binlog_row_image' and VALUE = '0'""")
676
if results[0][0] > 0 :
677
fail_list.append('Sys Variable binlog_row_image is set to MINIMAL, please check'.format(results[0][0]))
678
logging.info('check variable binlog_row_image success')
680
# last check of do_check, make sure no function execute after check_fail_list
681
# last check of do_check; no check function may run after this one
def check_fail_list():
  """Raise MyError summarizing every accumulated failure; no-op when fail_list is empty."""
  if len(fail_list) != 0 :
    error_msg = "upgrade checker failed with " + str(len(fail_list)) + " reasons: " + ", ".join(['[' + x + "] " for x in fail_list])
    raise MyError(error_msg)
686
def set_query_timeout(query_cur, timeout):
688
sql = """set @@session.ob_query_timeout = {0}""".format(timeout * 1000 * 1000)
689
query_cur.exec_sql(sql)
692
def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
694
conn = mysql.connector.connect(user = my_user,
695
password = my_passwd,
698
database = 'oceanbase',
699
raise_on_warnings = True)
700
conn.autocommit = True
701
cur = conn.cursor(buffered=True)
703
query_cur = Cursor(cur)
704
set_query_timeout(query_cur, timeout)
705
check_observer_version(query_cur, upgrade_params)
706
check_data_version(query_cur)
707
check_paxos_replica(query_cur)
708
check_rebalance_task(query_cur)
709
check_cluster_status(query_cur)
710
check_tenant_status(query_cur)
711
check_restore_job_exist(query_cur)
712
check_tenant_primary_zone(query_cur)
713
check_ddl_task_execute(query_cur)
714
check_backup_job_exist(query_cur)
715
check_archive_job_exist(query_cur)
716
check_archive_dest_exist(query_cur)
717
check_backup_dest_exist(query_cur)
718
check_observer_status(query_cur)
719
check_schema_status(query_cur)
720
check_server_version(query_cur)
721
check_not_supported_tenant_name(query_cur)
722
check_tenant_clone_job_exist(query_cur)
723
check_tenant_snapshot_task_exist(query_cur)
724
check_log_transport_compress_func(query_cur)
725
check_table_compress_func(query_cur)
726
check_table_api_transport_compress_func(query_cur)
727
check_variable_binlog_row_image(query_cur)
728
# all check func should execute before check_fail_list
730
modify_server_permanent_offline_time(cur)
732
logging.exception('run error')
737
except mysql.connector.Error, e:
738
logging.exception('connection error')
741
logging.exception('normal error')
744
if __name__ == '__main__':
745
upgrade_params = UpgradeParams()
746
change_opt_defult_value('log-file', upgrade_params.log_filename)
747
parse_options(sys.argv[1:])
748
if not has_no_local_opts():
749
deal_with_local_opts()
751
check_db_client_opts()
752
log_filename = get_opt_log_file()
753
upgrade_params.log_filename = log_filename
754
# 日志配置放在这里是为了前面的操作不要覆盖掉日志文件
755
config_logging_module(upgrade_params.log_filename)
757
host = get_opt_host()
758
port = int(get_opt_port())
759
user = get_opt_user()
760
password = get_opt_password()
761
timeout = int(get_opt_timeout())
762
logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", log-file=\"%s\"',\
763
host, port, user, password, timeout, log_filename)
764
do_check(host, port, user, password, timeout, upgrade_params)
765
except mysql.connector.Error, e:
766
logging.exception('mysql connctor error')
769
logging.exception('normal error')