8
from mysql.connector import errorcode
# Default log file name. __main__ reads it via UpgradeParams().log_filename
# and installs it as the 'log-file' option default.
# NOTE(review): this listing has lost its indentation and interleaves bare
# integers (apparently original line numbers) with the code; the enclosing
# class header (presumably `class UpgradeParams:`) is not visible — confirm.
13
log_filename = 'upgrade_cluster_health_checker.log'
# Project-specific exception raised by option handling (duplicate option
# names, missing client options) and by the health checks.
# NOTE(review): the lines between __init__ and `return repr(self.value)` are
# missing from this listing — presumably `self.value = value` and a
# `def __str__(self):` header; confirm against the full file.
16
class MyError(Exception):
17
def __init__(self, value):
20
return repr(self.value)
# Constructor of the cursor wrapper (the class header, presumably
# `class QueryCursor:`, is not visible here; do_check calls QueryCursor(cur)).
# Keeps the given DB-API cursor in a name-mangled private attribute that
# exec_sql / exec_query execute against.
25
def __init__(self, cursor):
26
self.__cursor = cursor
# Execute `sql` on the wrapped cursor; when print_when_succ is True, log the
# statement and the cursor's rowcount at INFO level.
# On failure the error is logged with its traceback via logging.exception.
# NOTE(review): the `try:` line, the second `except` clause header (for the
# "normal error" branch) and whatever follows each logging.exception call
# (return value / re-raise) are not visible in this listing — confirm that
# errors are re-raised rather than swallowed.
27
def exec_sql(self, sql, print_when_succ = True):
29
self.__cursor.execute(sql)
30
rowcount = self.__cursor.rowcount
31
if True == print_when_succ:
32
logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
# `except Class, name` is Python 2-only syntax; Python 3 spells it
# `except Class as name`.
34
except mysql.connector.Error, e:
35
logging.exception('mysql connector error, fail to execute sql: %s', sql)
38
logging.exception('normal error, fail to execute sql: %s', sql)
# Execute the query `sql` on the wrapped cursor, fetch all rows, and return
# the tuple (cursor.description, rows). When print_when_succ is True the
# query and the cursor's rowcount are logged at INFO level.
# NOTE(review): the `try:` line, the second `except` clause header and what
# follows each logging.exception call are not visible in this listing —
# confirm errors are re-raised, since callers unpack the returned tuple
# (e.g. `(desc, results) = query_cur.exec_query(sql)`).
40
def exec_query(self, sql, print_when_succ = True):
42
self.__cursor.execute(sql)
43
results = self.__cursor.fetchall()
44
rowcount = self.__cursor.rowcount
45
if True == print_when_succ:
46
logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
47
return (self.__cursor.description, results)
# Python 2-only `except Class, name` syntax (Python 3: `except Class as name`).
48
except mysql.connector.Error, e:
49
logging.exception('mysql connector error, fail to execute sql: %s', sql)
52
logging.exception('normal error, fail to execute sql: %s', sql)
# Continuation of the --help text (the assignment it belongs to is not
# visible in this listing): program name, option descriptions and a usage
# example, joined with `+` and backslash line continuations.
# NOTE(review): "splitted" should read "split"; "-t, --timeout" does not
# state its unit (the code logs it as "timeout(s)"); the -l default described
# here (<script name>.log) differs from the default __main__ installs
# ('upgrade_cluster_health_checker.log' via change_opt_defult_value) unless
# the script keeps exactly that name; and no visible __main__ line reads
# --module. These are runtime strings, so fixing them is a code change.
61
sys.argv[0] + """ [OPTIONS]""" +\
63
'-I, --help Display this help and exit.\n' +\
64
'-V, --version Output version information and exit.\n' +\
65
'-h, --host=name Connect to host.\n' +\
66
'-P, --port=name Port number to use for connection.\n' +\
67
'-u, --user=name User for login.\n' +\
68
'-p, --password=name Password to use when connecting to server. If password is\n' +\
69
' not given it\'s empty string "".\n' +\
70
'-m, --module=name Modules to run. Modules should be a string combined by some of\n' +\
71
' the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
72
' system_variable_dml, special_action, all. "all" represents\n' +\
73
' that all modules should be run. They are splitted by ",".\n' +\
74
' For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
75
'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
76
'-t, --timeout=name check timeout.\n' + \
77
'-z, --zone=name If zone is not specified, check all servers status in cluster. \n' +\
78
' Otherwise, only check servers status in specified zone. \n' + \
80
'Maybe you want to run cmd like that:\n' +\
81
sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
# Version text, presumably printed for -V/--version by deal_with_local_opt
# (that branch's body is not visible in this listing).
83
version_str = """version 1.0.0"""
# Members of the command-line option class (the code refers to it as
# `Option`; its class header is not visible in this listing).
# NOTE(review): several lines are missing here, including the `def` lines
# for the has-value / get_value / validity accessors and the line in
# set_value that stores the value (the validity check reads self.__value).
# Class-level registries of every short/long name constructed so far; shared
# by all instances and used by __init__ to reject duplicates.
86
__g_short_name_set = set([])
87
__g_long_name_set = set([])
90
__is_with_param = None
# Register a new option. Raises MyError if short_name or long_name was
# already used by an earlier Option. A non-None default_value is applied
# through set_value, so such an option counts as "has a value" from the start.
94
def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
95
if short_name in Option.__g_short_name_set:
96
raise MyError('duplicate option short name: {0}'.format(short_name))
97
elif long_name in Option.__g_long_name_set:
98
raise MyError('duplicate option long name: {0}'.format(long_name))
99
Option.__g_short_name_set.add(short_name)
100
Option.__g_long_name_set.add(long_name)
101
self.__short_name = short_name
102
self.__long_name = long_name
103
self.__is_with_param = is_with_param
104
self.__is_local_opt = is_local_opt
105
self.__has_value = False
106
if None != default_value:
107
self.set_value(default_value)
# True when the option takes an argument; parse_options then registers it
# with getopt as `x:` (short) / `name=` (long).
108
def is_with_param(self):
109
return self.__is_with_param
110
def get_short_name(self):
111
return self.__short_name
112
def get_long_name(self):
113
return self.__long_name
# Body of the has-value accessor (its def line is not visible).
115
return self.__has_value
# Mark the option as set; the line that stores `value` is not visible here.
118
def set_value(self, value):
120
self.__has_value = True
# True for options handled by deal_with_local_opts (help/version in the
# visible option table) rather than the DB client options.
121
def is_local_opt(self):
122
return self.__is_local_opt
# Body of a validity check (def line not visible): both names are non-None,
# a value has been assigned, and that value is not None.
124
return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value
# Entries of the option table (the list header and closing bracket are not
# visible in this listing). Arguments: Option(short, long, is_with_param,
# is_local_opt[, default_value]); entries given a default start with a value.
# NOTE(review): 'log-file' has no default here; __main__ supplies one through
# change_opt_defult_value before parsing argv. The 'timeout' default is the
# int 0 (__main__ converts it with int()); how 0 is interpreted by
# set_default_timeout_by_tenant is not fully visible — confirm it means
# "derive the timeout from the tenant count".
128
Option('I', 'help', False, True),\
129
Option('V', 'version', False, True),\
130
Option('h', 'host', True, False),\
131
Option('P', 'port', True, False),\
132
Option('u', 'user', True, False),\
133
Option('p', 'password', True, False, ''),\
135
Option('m', 'module', True, False, 'all'),\
137
Option('l', 'log-file', True, False),\
138
Option('t', 'timeout', True, False, 0),\
139
Option('z', 'zone', True, False, ''),\
142
# Give the option whose long name is opt_long_name the value opt_default_val
# (via set_value, so it then counts as having a value). __main__ calls this
# for 'log-file' before parse_options, so an explicit -l/--log-file on the
# command line overwrites it.
# NOTE(review): "defult" is a typo for "default"; renaming would break
# callers, so prefer adding a correctly spelled alias. The loop over the
# option table is not visible in this listing.
def change_opt_defult_value(opt_long_name, opt_default_val):
145
if opt.get_long_name() == opt_long_name:
146
opt.set_value(opt_default_val)
# Report whether none of the local options (help/version in the visible
# option table) was given: no_local_opts is cleared when a local option has
# a value.
# NOTE(review): the flag's initialization, the loop header and the return
# statement are not visible in this listing.
149
def has_no_local_opts():
153
if opt.is_local_opt() and opt.has_value():
154
no_local_opts = False
# Raise MyError naming a non-local option that has no value (the loop over
# the option table is not visible here). Options constructed with a default
# (password, module, timeout, zone) always pass, so in practice this requires
# host, port, user and — unless a default was installed first — log-file.
157
def check_db_client_opts():
160
if not opt.is_local_opt() and not opt.has_value():
161
raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
162
.format(opt.get_short_name(), sys.argv[0]))
# Store opt_val on the option matching opt_name, which getopt reports as
# '-<short>' or '--<long>'. A name that matches no option changes nothing
# here (getopt itself raises GetoptError for unrecognized options).
164
def parse_option(opt_name, opt_val):
167
if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
168
opt.set_value(opt_val)
# Parse argv with getopt using the option table. Options that take a parameter
# get a trailing ':' (short) or '=' (long). Each parsed (name, value) pair is
# applied through parse_option. When no local option (help/version) was given,
# the DB client options are validated with check_db_client_opts.
# getopt.getopt raises getopt.GetoptError for unknown options or missing
# arguments; no handler for it is visible here. Positional leftovers (`args`)
# are ignored.
# NOTE(review): the initialization of short_opt_str / long_opt_list, the loop
# headers over the option table and the `else:` lines are not visible in this
# listing.
170
def parse_options(argv):
175
if opt.is_with_param():
176
short_opt_str += opt.get_short_name() + ':'
178
short_opt_str += opt.get_short_name()
180
if opt.is_with_param():
181
long_opt_list.append(opt.get_long_name() + '=')
183
long_opt_list.append(opt.get_long_name())
184
(opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
185
for (opt_name, opt_val) in opts:
186
parse_option(opt_name, opt_val)
187
if has_no_local_opts():
188
check_db_client_opts()
# Handle one local option by branching on its long name, 'help' or 'version'.
# The branch bodies are not visible in this listing; presumably they print the
# help text and version_str — confirm.
190
def deal_with_local_opt(opt):
191
if 'help' == opt.get_long_name():
194
elif 'version' == opt.get_long_name():
# Handle every local option that has a value. Raises MyError when called
# while no local option was given; __main__ only calls it after checking
# `not has_no_local_opts()`. The loop header is not visible in this listing.
198
def deal_with_local_opts():
200
if has_no_local_opts():
201
raise MyError('no local options, can not deal with local options')
204
if opt.is_local_opt() and opt.has_value():
205
deal_with_local_opt(opt)
# Accessors that each return the value of one named option. Only the headers
# of get_opt_password, get_opt_log_file and get_opt_timeout are visible in
# this listing; the bodies for host, port, user, module and zone are
# identified by the long name they compare against. Each scans the option
# table (loop headers not visible). What is returned when no option matches
# is not visible.
# NOTE(review): values parsed from argv are strings, while the timeout
# default is the int 0; __main__ converts port and timeout with int().
212
if 'host' == opt.get_long_name():
213
return opt.get_value()
218
if 'port' == opt.get_long_name():
219
return opt.get_value()
224
if 'user' == opt.get_long_name():
225
return opt.get_value()
227
def get_opt_password():
230
if 'password' == opt.get_long_name():
231
return opt.get_value()
236
if 'module' == opt.get_long_name():
237
return opt.get_value()
239
def get_opt_log_file():
242
if 'log-file' == opt.get_long_name():
243
return opt.get_value()
245
def get_opt_timeout():
248
if 'timeout' == opt.get_long_name():
249
return opt.get_value()
254
if 'zone' == opt.get_long_name():
255
return opt.get_value()
# Configure the root logger. logging.basicConfig sends INFO-level records to
# the file log_filenamme, and an extra StreamHandler sends them to stdout with
# the same '[time] LEVEL file:line message' format.
# NOTE(review): "log_filenamme" is a typo, but the parameter name is part of
# the interface (the visible caller passes it positionally). The lines that
# close the basicConfig(...) call are not visible in this listing.
# basicConfig does nothing if the root logger already has handlers, whereas
# the stdout handler is added on every call, so calling this twice duplicates
# stdout output.
259
def config_logging_module(log_filenamme):
260
logging.basicConfig(level=logging.INFO,\
261
format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
262
datefmt='%Y-%m-%d %H:%M:%S',\
263
filename=log_filenamme,\
266
formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
269
stdout_handler = logging.StreamHandler(sys.stdout)
270
stdout_handler.setLevel(logging.INFO)
272
stdout_handler.setFormatter(formatter)
274
logging.getLogger('').addHandler(stdout_handler)
# Validate the --zone argument. For a non-empty zone, count the matching rows
# in oceanbase.DBA_OB_ZONES. Raise MyError if the result is not a single cell
# or the count is 0. An empty zone only logs that all servers will be checked.
# SECURITY NOTE(review): `zone` comes from the command line and is spliced into
# the SQL with str.format, so a quote in it breaks or alters the query. Pass it
# as a bound parameter to cursor.execute; QueryCursor.exec_query would need a
# `params` argument for that.
# NOTE(review): the `if`/`else` lines selecting between the two branches are
# not visible in this listing. The trailing ';' after exec_query(sql) is a
# no-op.
277
def check_zone_valid(query_cur, zone):
279
sql = """select count(*) from oceanbase.DBA_OB_ZONES where zone = '{0}'""".format(zone)
280
(desc, results) = query_cur.exec_query(sql);
281
if len(results) != 1 or len(results[0]) != 1:
282
raise MyError("unmatched row/column cnt")
283
elif results[0][0] == 0:
284
raise MyError("zone:{0} doesn't exist".format(zone))
286
logging.info("zone:{0} is valid".format(zone))
288
logging.info("zone is empty, check all servers in cluster")
# Return the distinct tenant ids from oceanbase.__all_tenant, in descending
# order, as a list of first-column values. Failures are logged with a
# traceback.
# NOTE(review): the list initialization, the loop header, the try/except
# headers and what follows logging.exception (re-raise or not) are not visible
# in this listing. The visible caller takes len() of the result, so falling
# through and returning None would surface later as a TypeError.
290
def fetch_tenant_ids(query_cur):
293
(desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""")
295
tenant_id_list.append(r[0])
296
return tenant_id_list
298
logging.exception('fail to fetch distinct tenant ids')
# Pick the wait timeout for a check; the log messages label it in seconds.
# In one branch the caller-supplied timeout is used as-is. Otherwise it is
# len(tenant ids) * timeout_per_tenant, but never less than min_timeout.
# Callers use the result as the budget passed to check_until_timeout.
# NOTE(review): the condition selecting the branch (presumably `timeout > 0`)
# and the return statement are not visible in this listing. The conditional
# expression is equivalent to max(cal_timeout, min_timeout). "caculated" in
# the log text is a typo; it is a runtime string and is left unchanged here.
301
def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout):
303
logging.info("use timeout from opt, timeout(s):{0}".format(timeout))
305
tenant_id_list = fetch_tenant_ids(query_cur)
306
cal_timeout = len(tenant_id_list) * timeout_per_tenant
307
timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout)
308
logging.info("use default timeout caculated by tenants, "
309
"timeout(s):{0}, tenant_count:{1}, "
310
"timeout_per_tenant(s):{2}, min_timeout(s):{3}"
311
.format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout))
# Check that all servers in `zone` report the same build version, i.e. the
# part of build_version before the first '_' (MySQL substring_index). Raise
# MyError otherwise. One branch only logs that the cluster-wide check is
# skipped; its condition line is not visible (presumably an empty zone).
# SECURITY NOTE(review): `zone` is command-line input formatted into the SQL;
# use a bound parameter instead.
# NOTE(review): the trailing ';' characters after the statements are no-ops.
# If the zone has no servers, the query returns 0 rows and this raises
# "servers build_version not match", which is misleading.
317
def check_server_version_by_zone(query_cur, zone):
319
logging.info("skip check server version by cluster")
321
sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone);
322
(desc, results) = query_cur.exec_query(sql);
323
if len(results) != 1:
324
raise MyError("servers build_version not match")
326
logging.info("check server version success")
# Wait until no row of oceanbase.GV$OB_LOG_STAT reports in_sync = 'NO'.
# The wait budget comes from set_default_timeout_by_tenant with 10 per tenant
# and a 600 minimum, unless the explicit `timeout` branch is taken there.
329
def check_paxos_replica(query_cur, timeout):
331
sql = """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'"""
332
wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600)
333
check_until_timeout(query_cur, sql, 0, wait_timeout)
336
logging.info('check paxos replica success')
# Wait until no server has start_service_time <= 0 or status 'inactive'. When
# the zone filter is appended, only servers in `zone` are counted. The wait
# budget comes from set_default_timeout_by_tenant with 10 per tenant and a
# 600 minimum.
# SECURITY NOTE(review): `zone` is command-line input formatted into the SQL;
# use a bound parameter instead. The condition guarding the zone filter is not
# visible in this listing.
339
def check_observer_status(query_cur, zone, timeout):
340
sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')"""
342
sql += """ and zone = '{0}'""".format(zone)
343
wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600)
344
check_until_timeout(query_cur, sql, 0, wait_timeout)
# Wait until the query yields 1. That happens when the number of rows in
# __all_virtual_server_schema_info with refreshed_schema_version > 1 and
# divisible by 8 equals the row count of __all_server JOIN __all_tenant.
# That JOIN has no ON clause, which makes it a cross join in MySQL semantics.
# The wait budget comes from set_default_timeout_by_tenant with 30 per tenant
# and a 600 minimum.
# NOTE(review): why "% 8 = 0" marks a fully refreshed schema version is not
# visible here; confirm against OceanBase schema-version conventions.
347
def check_schema_status(query_cur, timeout):
348
sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b"""
349
wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600)
350
check_until_timeout(query_cur, sql, 1, wait_timeout)
# Check that major compaction has finished.
# First it reads the distinct values of the enable_major_freeze parameter from
# GV$OB_PARAMETERS. The handling of "not exactly one value" and of a value
# other than 'True' is not visible in this listing.
# It then waits until CDB_OB_MAJOR_COMPACTION has no row with
# GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE'. Next it waits until no
# row of __all_virtual_tablet_compaction_info has
# max_received_scn > finished_scn (with max_received_scn > 0).
# The wait budget comes from set_default_timeout_by_tenant with 30 per tenant
# and a 600 minimum.
# NOTE(review): each of the two waits gets the full wait_timeout, so the total
# wait can reach twice that budget. The ';' inside the first SQL literal is
# sent to the server as part of the statement.
353
def check_major_merge(query_cur, timeout):
355
(desc, results) = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""")
356
if len(results) != 1:
358
elif results[0][0] != 'True':
361
wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600)
362
sql = """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')"""
363
check_until_timeout(query_cur, sql, 0, wait_timeout)
364
sql2 = """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0"""
365
check_until_timeout(query_cur, sql2, 0, wait_timeout)
# Poll `sql`, which is expected to return a single cell, until that cell
# equals `value`. Raises MyError if the result is not exactly one row with one
# column. Logs success on a match; otherwise logs the observed and expected
# values.
# NOTE(review): the polling loop, the sleep between attempts and the timeout
# bookkeeping are not visible in this listing.
# BUG(review): `job_name` in the timeout warning is neither a parameter nor
# assigned in any visible line of this function. Unless a module-level
# `job_name` exists, reaching that line raises NameError. Also,
# logging.warn is a deprecated alias of logging.warning.
367
def check_until_timeout(query_cur, sql, value, timeout):
370
(desc, results) = query_cur.exec_query(sql)
372
if len(results) != 1 or len(results[0]) != 1:
373
raise MyError("unmatched row/column cnt")
374
elif results[0][0] == value:
375
logging.info("check value is {0} success".format(value))
378
logging.info("value is {0}, expected value is {1}, not matched".format(results[0][0], value))
382
logging.warn("""check {0} job timeout""".format(job_name))
# Connect to the cluster and run the health checks in this order:
#   1. zone validity
#   2. observer status
#   3. paxos replica sync
#   4. schema refresh
#   5. build version per zone
#   6. major compaction, only when need_check_major_status is True
# The connection uses database 'oceanbase', raise_on_warnings=True and
# autocommit, with a buffered cursor wrapped in QueryCursor.
# Errors are logged as 'run error', 'connection error' or 'normal error'. The
# try/except headers and what follows each logging.exception are not visible.
# NOTE(review): `upgrade_params` is not used by any visible line. No visible
# line closes the cursor or the connection, so make sure they are closed in a
# finally block. The host/port arguments of connect() are not visible either.
387
def do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, need_check_major_status, zone = ''):
389
conn = mysql.connector.connect(user = my_user,
390
password = my_passwd,
393
database = 'oceanbase',
394
raise_on_warnings = True)
395
conn.autocommit = True
396
cur = conn.cursor(buffered=True)
398
query_cur = QueryCursor(cur)
399
check_zone_valid(query_cur, zone)
400
check_observer_status(query_cur, zone, timeout)
401
check_paxos_replica(query_cur, timeout)
402
check_schema_status(query_cur, timeout)
403
check_server_version_by_zone(query_cur, zone)
404
if True == need_check_major_status:
405
check_major_merge(query_cur, timeout)
407
logging.exception('run error')
412
except mysql.connector.Error, e:
413
logging.exception('connection error')
416
logging.exception('normal error')
# Script entry point.
# It installs the default log file name and parses argv. If a local option
# (help/version) was given, it is handled by deal_with_local_opts. The
# remaining steps, whose enclosing `else:` is not visible, then:
#   - validate the client options,
#   - configure logging,
#   - read host/port/user/password/timeout/zone,
#   - run do_check with need_check_major_status=False, so check_major_merge
#     is never run from here.
# NOTE(review): the `try:` / `else:` lines and the bodies after each
# logging.exception are not visible in this listing. parse_options already
# calls check_db_client_opts when no local option was given, so the later
# call appears redundant. int(get_opt_port()) and int(get_opt_timeout())
# raise ValueError on non-numeric input. "connctor" in the log message is a
# typo; it is a runtime string and is left unchanged here.
419
if __name__ == '__main__':
420
upgrade_params = UpgradeParams()
421
change_opt_defult_value('log-file', upgrade_params.log_filename)
422
parse_options(sys.argv[1:])
423
if not has_no_local_opts():
424
deal_with_local_opts()
426
check_db_client_opts()
427
log_filename = get_opt_log_file()
428
upgrade_params.log_filename = log_filename
430
config_logging_module(upgrade_params.log_filename)
432
host = get_opt_host()
433
port = int(get_opt_port())
434
user = get_opt_user()
435
password = get_opt_password()
436
timeout = int(get_opt_timeout())
437
zone = get_opt_zone()
# SECURITY NOTE(review): the next statement logs the password in clear text.
# It goes to the log file and, through the stdout handler that
# config_logging_module installs, to stdout. Mask it (e.g. log '******').
438
logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", log-file=\"%s\", timeout=%s, zone=\"%s\"', \
439
host, port, user, password, log_filename, timeout, zone)
440
do_check(host, port, user, password, upgrade_params, timeout, False, zone)
441
except mysql.connector.Error, e:
442
logging.exception('mysql connctor error')
445
logging.exception('normal error')