3
####====XXXX======######==== I am a splitter ====######======XXXX====####
6
## -*- coding: utf-8 -*-
7
####====XXXX======######==== I am a splitter ====######======XXXX====####
10
## -*- coding: utf-8 -*-
17
#import mysql.connector
18
#from mysql.connector import errorcode
19
#from my_error import MyError
25
# def __init__(self, action_sql, rollback_sql):
26
# self.action_sql = action_sql
27
# self.rollback_sql = rollback_sql
29
#current_cluster_version = "4.3.0.1"
30
#current_data_version = "4.3.0.1"
32
#g_commit_sql_list = []
34
#def get_current_cluster_version():
35
# return current_cluster_version
37
#def get_current_data_version():
38
# return current_data_version
40
#def refresh_commit_sql_list():
41
# global g_succ_sql_list
42
# global g_commit_sql_list
43
# if len(g_commit_sql_list) < len(g_succ_sql_list):
44
# for i in range(len(g_commit_sql_list), len(g_succ_sql_list)):
45
# g_commit_sql_list.append(g_succ_sql_list[i])
47
#def get_succ_sql_list_str():
48
# global g_succ_sql_list
50
# for i in range(0, len(g_succ_sql_list)):
53
# ret_str += g_succ_sql_list[i].action_sql + ';'
56
#def get_commit_sql_list_str():
57
# global g_commit_sql_list
59
# for i in range(0, len(g_commit_sql_list)):
62
# ret_str += g_commit_sql_list[i].action_sql + ';'
65
#def get_rollback_sql_file_lines_str():
66
# global g_commit_sql_list
68
# g_commit_sql_list_len = len(g_commit_sql_list)
69
# for i in range(0, g_commit_sql_list_len):
72
# idx = g_commit_sql_list_len - 1 - i
73
# ret_str += '/*\n' + g_commit_sql_list[idx].action_sql + ';\n*/\n'
74
# ret_str += g_commit_sql_list[idx].rollback_sql + ';'
77
#def dump_rollback_sql_to_file(rollback_sql_filename):
78
# logging.info('===================== begin to dump rollback sql file ============================')
79
# rollback_sql_file = open(rollback_sql_filename, 'w')
80
# rollback_sql_file.write('# 此文件是回滚用的sql。\n')
81
# rollback_sql_file.write('# 注释的sql是已经成功commit的sql,它的下一条没被注释的sql则是对应的回滚sql。回滚的sql的排序跟commit的sql的排序刚好相反。\n')
82
# rollback_sql_file.write('# 跑升级脚本失败的时候可以参考本文件来进行回滚。\n')
83
# rollback_sql_file.write('\n')
84
# rollback_sql_file_lines_str = get_rollback_sql_file_lines_str()
85
# rollback_sql_file.write(rollback_sql_file_lines_str)
86
# rollback_sql_file.close()
87
# logging.info('=========== succeed to dump rollback sql file to: ' + rollback_sql_filename + '===============')
89
#def check_is_ddl_sql(sql):
90
# word_list = sql.split()
91
# if len(word_list) < 1:
92
# raise MyError('sql is empty, sql="{0}"'.format(sql))
93
# key_word = word_list[0].lower()
94
# if 'create' != key_word and 'alter' != key_word:
95
# raise MyError('sql must be ddl, key_word="{0}", sql="{1}"'.format(key_word, sql))
97
#def check_is_query_sql(sql):
98
# word_list = sql.split()
99
# if len(word_list) < 1:
100
# raise MyError('sql is empty, sql="{0}"'.format(sql))
101
# key_word = word_list[0].lower()
102
# if 'select' != key_word and 'show' != key_word and 'desc' != key_word:
103
# raise MyError('sql must be query, key_word="{0}", sql="{1}"'.format(key_word, sql))
105
#def check_is_update_sql(sql):
106
# word_list = sql.split()
107
# if len(word_list) < 1:
108
# raise MyError('sql is empty, sql="{0}"'.format(sql))
109
# key_word = word_list[0].lower()
110
# if 'insert' != key_word and 'update' != key_word and 'replace' != key_word and 'set' != key_word and 'delete' != key_word:
111
# # 还有类似这种:select @current_ts := now()
112
# if not (len(word_list) >= 3 and 'select' == word_list[0].lower()\
113
# and word_list[1].lower().startswith('@') and ':=' == word_list[2].lower()):
114
# raise MyError('sql must be update, key_word="{0}", sql="{1}"'.format(key_word, sql))
116
#def get_min_cluster_version(cur):
117
# min_cluster_version = 0
118
# sql = """select distinct value from oceanbase.GV$OB_PARAMETERS where name='min_observer_version'"""
121
# results = cur.fetchall()
122
# if len(results) != 1:
123
# logging.exception('min_observer_version is not sync')
125
# elif len(results[0]) != 1:
126
# logging.exception('column cnt not match')
129
# min_cluster_version = get_version(results[0][0])
130
# return min_cluster_version
132
#def set_parameter(cur, parameter, value, timeout = 0):
133
# sql = """alter system set {0} = '{1}'""".format(parameter, value)
136
# wait_parameter_sync(cur, False, parameter, value, timeout)
138
#def set_session_timeout(cur, seconds):
139
# sql = "set @@session.ob_query_timeout = {0}".format(seconds * 1000 * 1000)
143
#def set_default_timeout_by_tenant(cur, timeout, timeout_per_tenant, min_timeout):
145
# logging.info("use timeout from opt, timeout(s):{0}".format(timeout))
147
# query_cur = QueryCursor(cur)
148
# tenant_id_list = fetch_tenant_ids(query_cur)
149
# cal_timeout = len(tenant_id_list) * timeout_per_tenant
150
# timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout)
151
# logging.info("use default timeout caculated by tenants, "
152
# "timeout(s):{0}, tenant_count:{1}, "
153
# "timeout_per_tenant(s):{2}, min_timeout(s):{3}"
154
# .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout))
158
#def set_tenant_parameter(cur, parameter, value, timeout = 0):
161
# if get_min_cluster_version(cur) < get_version("4.2.1.0"):
162
# tenants_list = ['all']
164
# tenants_list = ['sys', 'all_user', 'all_meta']
166
# query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)
168
# set_session_timeout(cur, query_timeout)
170
# for tenants in tenants_list:
171
# sql = """alter system set {0} = '{1}' tenant = '{2}'""".format(parameter, value, tenants)
175
# set_session_timeout(cur, 10)
177
# wait_parameter_sync(cur, True, parameter, value, timeout)
179
#def get_ori_enable_ddl(cur, timeout):
180
# ori_value_str = fetch_ori_enable_ddl(cur)
181
# wait_parameter_sync(cur, False, 'enable_ddl', ori_value_str, timeout)
182
# ori_value = (ori_value_str == 'True')
185
#def fetch_ori_enable_ddl(cur):
187
# sql = """select value from oceanbase.__all_sys_parameter where name = 'enable_ddl'"""
191
# result = cur.fetchall()
193
# if len(result) == 0:
194
# # means default value, is True
196
# elif len(result) != 1 or len(result[0]) != 1:
197
# logging.exception('result cnt not match')
199
# elif result[0][0].lower() in ["1", "true", "on", "yes", 't']:
201
# elif result[0][0].lower() in ["0", "false", "off", "no", 'f']:
204
# logging.exception("""result value is invalid, result:{0}""".format(result[0][0]))
208
## print version like "x.x.x.x"
209
#def print_version(version):
210
# version = int(version)
211
# major = (version >> 32) & 0xffffffff
212
# minor = (version >> 16) & 0xffff
213
# major_patch = (version >> 8) & 0xff
214
# minor_patch = version & 0xff
215
# version_str = "{0}.{1}.{2}.{3}".format(major, minor, major_patch, minor_patch)
217
## version str should like "x.x.x.x"
218
#def get_version(version_str):
219
# versions = version_str.split(".")
221
# if len(versions) != 4:
222
# logging.exception("""version:{0} is invalid""".format(version_str))
225
# major = int(versions[0])
226
# minor = int(versions[1])
227
# major_patch = int(versions[2])
228
# minor_patch = int(versions[3])
230
# if major > 0xffffffff or minor > 0xffff or major_patch > 0xff or minor_patch > 0xff:
231
# logging.exception("""version:{0} is invalid""".format(version_str))
234
# version = (major << 32) | (minor << 16) | (major_patch << 8) | (minor_patch)
237
#def check_server_version_by_cluster(cur):
238
# sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server""";
241
# result = cur.fetchall()
242
# if len(result) != 1:
243
# raise MyError("servers build_version not match")
245
# logging.info("check server version success")
247
#def check_parameter(cur, is_tenant_config, key, value):
248
# table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info"
249
# sql = """select * from oceanbase.{0}
250
# where name = '{1}' and value = '{2}'""".format(table_name, key, value)
253
# result = cur.fetchall()
261
#def wait_parameter_sync(cur, is_tenant_config, key, value, timeout):
262
# table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info"
263
# sql = """select count(*) as cnt from oceanbase.{0}
264
# where name = '{1}' and value != '{2}'""".format(table_name, key, value)
268
# if not is_tenant_config or timeout > 0:
269
# wait_timeout = (timeout if timeout > 0 else 60)
270
# query_timeout = wait_timeout
272
# # is_tenant_config & timeout not set
273
# wait_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)
274
# query_timeout = set_default_timeout_by_tenant(cur, timeout, 2, 60)
276
# set_session_timeout(cur, query_timeout)
278
# times = wait_timeout / 5
282
# result = cur.fetchall()
283
# if len(result) != 1 or len(result[0]) != 1:
284
# logging.exception('result cnt not match')
286
# elif result[0][0] == 0:
287
# logging.info("""{0} is sync, value is {1}""".format(key, value))
290
# logging.info("""{0} is not sync, value should be {1}""".format(key, value))
294
# logging.exception("""check {0}:{1} sync timeout""".format(key, value))
298
# set_session_timeout(cur, 10)
300
#def do_begin_upgrade(cur, timeout):
302
# if not check_parameter(cur, False, "enable_upgrade_mode", "True"):
303
# action_sql = "alter system begin upgrade"
304
# rollback_sql = "alter system end upgrade"
305
# logging.info(action_sql)
307
# cur.execute(action_sql)
309
# global g_succ_sql_list
310
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
312
# wait_parameter_sync(cur, False, "enable_upgrade_mode", "True", timeout)
315
#def do_begin_rolling_upgrade(cur, timeout):
317
# if not check_parameter(cur, False, "_upgrade_stage", "DBUPGRADE"):
318
# action_sql = "alter system begin rolling upgrade"
319
# rollback_sql = "alter system end upgrade"
321
# logging.info(action_sql)
322
# cur.execute(action_sql)
324
# global g_succ_sql_list
325
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
327
# wait_parameter_sync(cur, False, "_upgrade_stage", "DBUPGRADE", timeout)
330
#def do_end_rolling_upgrade(cur, timeout):
332
# # maybe in upgrade_post_check stage or never run begin upgrade
333
# if check_parameter(cur, False, "enable_upgrade_mode", "False"):
336
# current_cluster_version = get_current_cluster_version()
337
# if not check_parameter(cur, False, "_upgrade_stage", "POSTUPGRADE") or not check_parameter(cur, False, "min_observer_version", current_cluster_version):
338
# action_sql = "alter system end rolling upgrade"
339
# rollback_sql = "alter system end upgrade"
341
# logging.info(action_sql)
342
# cur.execute(action_sql)
344
# global g_succ_sql_list
345
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
347
# wait_parameter_sync(cur, False, "min_observer_version", current_data_version, timeout)
348
# wait_parameter_sync(cur, False, "_upgrade_stage", "POSTUPGRADE", timeout)
351
#def do_end_upgrade(cur, timeout):
353
# if not check_parameter(cur, False, "enable_upgrade_mode", "False"):
354
# action_sql = "alter system end upgrade"
357
# logging.info(action_sql)
358
# cur.execute(action_sql)
360
# global g_succ_sql_list
361
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
363
# wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout)
365
#def do_suspend_merge(cur, timeout):
367
# if get_min_cluster_version(cur) < get_version("4.2.1.0"):
368
# tenants_list = ['all']
370
# tenants_list = ['sys', 'all_user', 'all_meta']
372
# query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)
374
# set_session_timeout(cur, query_timeout)
376
# for tenants in tenants_list:
377
# action_sql = "alter system suspend merge tenant = {0}".format(tenants)
378
# rollback_sql = "alter system resume merge tenant = {0}".format(tenants)
379
# logging.info(action_sql)
380
# cur.execute(action_sql)
382
# set_session_timeout(cur, 10)
384
#def do_resume_merge(cur, timeout):
386
# if get_min_cluster_version(cur) < get_version("4.2.1.0"):
387
# tenants_list = ['all']
389
# tenants_list = ['sys', 'all_user', 'all_meta']
391
# query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)
393
# set_session_timeout(cur, query_timeout)
395
# for tenants in tenants_list:
396
# action_sql = "alter system resume merge tenant = {0}".format(tenants)
397
# rollback_sql = "alter system suspend merge tenant = {0}".format(tenants)
398
# logging.info(action_sql)
399
# cur.execute(action_sql)
401
# set_session_timeout(cur, 10)
405
# def __init__(self, cursor):
406
# self.__cursor = cursor
407
# def exec_sql(self, sql, print_when_succ = True):
409
# self.__cursor.execute(sql)
410
# rowcount = self.__cursor.rowcount
411
# if True == print_when_succ:
412
# logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
414
# except mysql.connector.Error, e:
415
# logging.exception('mysql connector error, fail to execute sql: %s', sql)
417
# except Exception, e:
418
# logging.exception('normal error, fail to execute sql: %s', sql)
420
# def exec_query(self, sql, print_when_succ = True):
422
# self.__cursor.execute(sql)
423
# results = self.__cursor.fetchall()
424
# rowcount = self.__cursor.rowcount
425
# if True == print_when_succ:
426
# logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
427
# return (self.__cursor.description, results)
428
# except mysql.connector.Error, e:
429
# logging.exception('mysql connector error, fail to execute sql: %s', sql)
431
# except Exception, e:
432
# logging.exception('normal error, fail to execute sql: %s', sql)
437
# def __init__(self, cursor):
438
# self._cursor = Cursor(cursor)
439
# def exec_ddl(self, sql, print_when_succ = True):
441
# # 这里检查是不是ddl,不是ddl就抛错
442
# check_is_ddl_sql(sql)
443
# return self._cursor.exec_sql(sql, print_when_succ)
444
# except Exception, e:
445
# logging.exception('fail to execute ddl: %s', sql)
450
# def __init__(self, cursor):
451
# self._cursor = Cursor(cursor)
452
# def exec_query(self, sql, print_when_succ = True):
454
# # 这里检查是不是query,不是query就抛错
455
# check_is_query_sql(sql)
456
# return self._cursor.exec_query(sql, print_when_succ)
457
# except Exception, e:
458
# logging.exception('fail to execute dml query: %s', sql)
461
#class DMLCursor(QueryCursor):
462
# def exec_update(self, sql, print_when_succ = True):
464
# # 这里检查是不是update,不是update就抛错
465
# check_is_update_sql(sql)
466
# return self._cursor.exec_sql(sql, print_when_succ)
467
# except Exception, e:
468
# logging.exception('fail to execute dml update: %s', sql)
471
#class BaseDDLAction():
473
# _query_cursor = None
474
# def __init__(self, cursor):
475
# self.__ddl_cursor = DDLCursor(cursor)
476
# self._query_cursor = QueryCursor(cursor)
477
# def do_action(self):
478
# global g_succ_sql_list
479
# action_sql = self.get_action_ddl()
480
# rollback_sql = self.get_rollback_sql()
481
# self.__ddl_cursor.exec_ddl(action_sql)
482
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
483
# # ddl马上就提交了,因此刷新g_commit_sql_list
484
# refresh_commit_sql_list()
486
#class BaseDMLAction():
488
# _query_cursor = None
489
# def __init__(self, cursor):
490
# self.__dml_cursor = DMLCursor(cursor)
491
# self._query_cursor = QueryCursor(cursor)
492
# def do_action(self):
493
# global g_succ_sql_list
494
# action_sql = self.get_action_dml()
495
# rollback_sql = self.get_rollback_sql()
496
# self.__dml_cursor.exec_update(action_sql)
497
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
499
#class BaseEachTenantDMLAction():
501
# _query_cursor = None
502
# _tenant_id_list = None
504
# def __init__(self, cursor, tenant_id_list):
505
# self.__dml_cursor = DMLCursor(cursor)
506
# self._query_cursor = QueryCursor(cursor)
507
# self._tenant_id_list = tenant_id_list
508
# self._cursor = Cursor(cursor)
509
# def get_tenant_id_list(self):
510
# return self._tenant_id_list
511
# def do_each_tenant_action(self, tenant_id):
512
# global g_succ_sql_list
513
# action_sql = self.get_each_tenant_action_dml(tenant_id)
514
# rollback_sql = self.get_each_tenant_rollback_sql(tenant_id)
515
# self.__dml_cursor.exec_update(action_sql)
516
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
518
#class BaseEachTenantDDLAction():
520
# _query_cursor = None
521
# _tenant_id_list = None
522
# _all_table_name = "__all_table"
523
# def __init__(self, cursor, tenant_id_list):
524
# self.__ddl_cursor = DDLCursor(cursor)
525
# self._query_cursor = QueryCursor(cursor)
526
# self._tenant_id_list = tenant_id_list
527
# def get_tenant_id_list(self):
528
# return self._tenant_id_list
529
# def get_all_table_name(self):
530
# return self._all_table_name
531
# def set_all_table_name(self, table_name):
532
# self._all_table_name = table_name
533
# def do_each_tenant_action(self, tenant_id):
534
# global g_succ_sql_list
535
# action_sql = self.get_each_tenant_action_ddl(tenant_id)
536
# rollback_sql = self.get_each_tenant_rollback_sql(tenant_id)
537
# self.__ddl_cursor.exec_ddl(action_sql)
538
# g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
539
# # ddl马上就提交了,因此刷新g_commit_sql_list
540
# refresh_commit_sql_list()
542
#def actions_cls_compare(x, y):
543
# diff = x.get_seq_num() - y.get_seq_num()
545
# raise MyError('seq num is equal')
551
#def reflect_action_cls_list(action_module, action_name_prefix):
552
# action_cls_list = []
553
# cls_from_actions = dir(action_module)
554
# for cls in cls_from_actions:
555
# if cls.startswith(action_name_prefix):
556
# action_cls = getattr(action_module, cls)
557
# action_cls_list.append(action_cls)
558
# action_cls_list.sort(actions_cls_compare)
559
# return action_cls_list
561
#def fetch_observer_version(cur):
562
# sql = """select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'"""
565
# result = cur.fetchall()
566
# if len(result) != 1:
567
# raise MyError('query results count is not 1')
569
# logging.info('get observer version success, version = {0}'.format(result[0][0]))
572
#def fetch_tenant_ids(query_cur):
575
# (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""")
577
# tenant_id_list.append(r[0])
578
# return tenant_id_list
579
# except Exception, e:
580
# logging.exception('fail to fetch distinct tenant ids')
583
####====XXXX======######==== I am a splitter ====######======XXXX====####
585
##!/usr/bin/env python
586
## -*- coding: utf-8 -*-
588
#pre_upgrade_log_filename = 'upgrade_pre.log'
589
#pre_upgrade_sql_filename = 'upgrade_sql_pre.txt'
590
#pre_upgrade_rollback_sql_filename = 'rollback_sql_pre.txt'
592
#post_upgrade_log_filename = 'upgrade_post.log'
593
#post_upgrade_sql_filename = 'upgrade_sql_post.txt'
594
#post_upgrade_rollback_sql_filename = 'rollback_sql_post.txt'
596
####====XXXX======######==== I am a splitter ====######======XXXX====####
597
#filename:do_upgrade_post.py
598
##!/usr/bin/env python
599
## -*- coding: utf-8 -*-
601
#from my_error import MyError
603
#import mysql.connector
604
#from mysql.connector import errorcode
611
#import upgrade_health_checker
612
#import tenant_upgrade_action
613
#import upgrade_post_checker
615
## 由于用了/*+read_consistency(WEAK) */来查询,因此升级期间不能允许创建或删除租户
618
# log_filename = config.post_upgrade_log_filename
619
# sql_dump_filename = config.post_upgrade_sql_filename
620
# rollback_sql_filename = config.post_upgrade_rollback_sql_filename
622
#def config_logging_module(log_filenamme):
623
# logging.basicConfig(level=logging.INFO,\
624
# format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
625
# datefmt='%Y-%m-%d %H:%M:%S',\
626
# filename=log_filenamme,\
629
# formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
630
# #######################################
631
# # 定义一个Handler打印INFO及以上级别的日志到sys.stdout
632
# stdout_handler = logging.StreamHandler(sys.stdout)
633
# stdout_handler.setLevel(logging.INFO)
635
# stdout_handler.setFormatter(formatter)
636
# # 将定义好的stdout_handler日志handler添加到root logger
637
# logging.getLogger('').addHandler(stdout_handler)
640
# logging.info('==================================================================================')
641
# logging.info('============================== STATISTICS BEGIN ==================================')
642
# logging.info('==================================================================================')
643
# logging.info('succeed run sql(except sql of special actions): \n\n%s\n', actions.get_succ_sql_list_str())
644
# logging.info('commited sql(except sql of special actions): \n\n%s\n', actions.get_commit_sql_list_str())
645
# logging.info('==================================================================================')
646
# logging.info('=============================== STATISTICS END ===================================')
647
# logging.info('==================================================================================')
649
#def do_upgrade(my_host, my_port, my_user, my_passwd, timeout, my_module_set, upgrade_params):
651
# conn = mysql.connector.connect(user = my_user,
652
# password = my_passwd,
655
# database = 'oceanbase',
656
# raise_on_warnings = True)
657
# cur = conn.cursor(buffered=True)
659
# query_cur = actions.QueryCursor(cur)
660
# actions.check_server_version_by_cluster(cur)
663
# if run_modules.MODULE_HEALTH_CHECK in my_module_set:
664
# logging.info('================begin to run health check action ===============')
665
# upgrade_health_checker.do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, False) # need_check_major_status = False
666
# logging.info('================succeed to run health check action ===============')
668
# if run_modules.MODULE_END_ROLLING_UPGRADE in my_module_set:
669
# logging.info('================begin to run end rolling upgrade action ===============')
670
# conn.autocommit = True
671
# actions.do_end_rolling_upgrade(cur, timeout)
672
# conn.autocommit = False
673
# actions.refresh_commit_sql_list()
674
# logging.info('================succeed to run end rolling upgrade action ===============')
676
# if run_modules.MODULE_TENANT_UPRADE in my_module_set:
677
# logging.info('================begin to run tenant upgrade action ===============')
678
# conn.autocommit = True
679
# tenant_upgrade_action.do_upgrade(conn, cur, timeout, my_user, my_passwd)
680
# conn.autocommit = False
681
# actions.refresh_commit_sql_list()
682
# logging.info('================succeed to run tenant upgrade action ===============')
684
# if run_modules.MODULE_END_UPRADE in my_module_set:
685
# logging.info('================begin to run end upgrade action ===============')
686
# conn.autocommit = True
687
# actions.do_end_upgrade(cur, timeout)
688
# conn.autocommit = False
689
# actions.refresh_commit_sql_list()
690
# logging.info('================succeed to run end upgrade action ===============')
692
# if run_modules.MODULE_POST_CHECK in my_module_set:
693
# logging.info('================begin to run post check action ===============')
694
# conn.autocommit = True
695
# upgrade_post_checker.do_check(conn, cur, query_cur, timeout)
696
# conn.autocommit = False
697
# actions.refresh_commit_sql_list()
698
# logging.info('================succeed to run post check action ===============')
700
# except Exception, e:
701
# logging.exception('run error')
707
# # actions.dump_rollback_sql_to_file(upgrade_params.rollback_sql_filename)
710
# except mysql.connector.Error, e:
711
# logging.exception('connection error')
713
# except Exception, e:
714
# logging.exception('normal error')
717
#def do_upgrade_by_argv(argv):
718
# upgrade_params = UpgradeParams()
719
# opts.change_opt_defult_value('log-file', upgrade_params.log_filename)
720
# opts.parse_options(argv)
721
# if not opts.has_no_local_opts():
722
# opts.deal_with_local_opts('upgrade_post')
724
# opts.check_db_client_opts()
725
# log_filename = opts.get_opt_log_file()
726
# upgrade_params.log_filename = log_filename
727
# # 日志配置放在这里是为了前面的操作不要覆盖掉日志文件
728
# config_logging_module(upgrade_params.log_filename)
730
# host = opts.get_opt_host()
731
# port = int(opts.get_opt_port())
732
# user = opts.get_opt_user()
733
# password = opts.get_opt_password()
734
# timeout = int(opts.get_opt_timeout())
735
# cmd_module_str = opts.get_opt_module()
736
# module_set = set([])
737
# all_module_set = run_modules.get_all_module_set()
738
# cmd_module_list = cmd_module_str.split(',')
739
# for cmd_module in cmd_module_list:
740
# if run_modules.ALL_MODULE == cmd_module:
741
# module_set = module_set | all_module_set
742
# elif cmd_module in all_module_set:
743
# module_set.add(cmd_module)
745
# raise MyError('invalid module: {0}'.format(cmd_module))
746
# logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", module=\"%s\", log-file=\"%s\"',\
747
# host, port, user, password, timeout, module_set, log_filename)
748
# do_upgrade(host, port, user, password, timeout, module_set, upgrade_params)
749
# except mysql.connector.Error, e:
750
# logging.exception('mysql connctor error')
751
# logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
753
# except Exception, e:
754
# logging.exception('normal error')
755
# logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
760
####====XXXX======######==== I am a splitter ====######======XXXX====####
761
#filename:do_upgrade_pre.py
762
##!/usr/bin/env python
763
## -*- coding: utf-8 -*-
765
#from my_error import MyError
767
#import mysql.connector
768
#from mysql.connector import errorcode
775
#import special_upgrade_action_pre
776
#import upgrade_health_checker
778
## 由于用了/*+read_consistency(WEAK) */来查询,因此升级期间不能允许创建或删除租户
781
# log_filename = config.pre_upgrade_log_filename
782
# sql_dump_filename = config.pre_upgrade_sql_filename
783
# rollback_sql_filename = config.pre_upgrade_rollback_sql_filename
785
#def config_logging_module(log_filenamme):
786
# logging.basicConfig(level=logging.INFO,\
787
# format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
788
# datefmt='%Y-%m-%d %H:%M:%S',\
789
# filename=log_filenamme,\
792
# formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
793
# #######################################
794
# # 定义一个Handler打印INFO及以上级别的日志到sys.stdout
795
# stdout_handler = logging.StreamHandler(sys.stdout)
796
# stdout_handler.setLevel(logging.INFO)
798
# stdout_handler.setFormatter(formatter)
799
# # 将定义好的stdout_handler日志handler添加到root logger
800
# logging.getLogger('').addHandler(stdout_handler)
803
# logging.info('==================================================================================')
804
# logging.info('============================== STATISTICS BEGIN ==================================')
805
# logging.info('==================================================================================')
806
# logging.info('succeed run sql(except sql of special actions): \n\n%s\n', actions.get_succ_sql_list_str())
807
# logging.info('commited sql(except sql of special actions): \n\n%s\n', actions.get_commit_sql_list_str())
808
# logging.info('==================================================================================')
809
# logging.info('=============================== STATISTICS END ===================================')
810
# logging.info('==================================================================================')
812
#def do_upgrade(my_host, my_port, my_user, my_passwd, timeout, my_module_set, upgrade_params):
814
# conn = mysql.connector.connect(user = my_user,
815
# password = my_passwd,
818
# database = 'oceanbase',
819
# raise_on_warnings = True)
820
# cur = conn.cursor(buffered=True)
822
# query_cur = actions.QueryCursor(cur)
823
# actions.check_server_version_by_cluster(cur)
825
# if run_modules.MODULE_BEGIN_UPGRADE in my_module_set:
826
# logging.info('================begin to run begin upgrade action===============')
827
# conn.autocommit = True
828
# actions.do_begin_upgrade(cur, timeout)
829
# conn.autocommit = False
830
# actions.refresh_commit_sql_list()
831
# logging.info('================succeed to run begin upgrade action===============')
833
# if run_modules.MODULE_BEGIN_ROLLING_UPGRADE in my_module_set:
834
# logging.info('================begin to run begin rolling upgrade action===============')
835
# conn.autocommit = True
836
# actions.do_begin_rolling_upgrade(cur, timeout)
837
# conn.autocommit = False
838
# actions.refresh_commit_sql_list()
839
# logging.info('================succeed to run begin rolling upgrade action===============')
841
# if run_modules.MODULE_SPECIAL_ACTION in my_module_set:
842
# logging.info('================begin to run special action===============')
843
# conn.autocommit = True
844
# special_upgrade_action_pre.do_special_upgrade(conn, cur, timeout, my_user, my_passwd)
845
# conn.autocommit = False
846
# actions.refresh_commit_sql_list()
847
# logging.info('================succeed to run special action===============')
849
# if run_modules.MODULE_HEALTH_CHECK in my_module_set:
850
# logging.info('================begin to run health check action ===============')
851
# upgrade_health_checker.do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, True) # need_check_major_status = True
852
# logging.info('================succeed to run health check action ===============')
854
# except Exception, e:
855
# logging.exception('run error')
861
# # actions.dump_rollback_sql_to_file(upgrade_params.rollback_sql_filename)
864
# except mysql.connector.Error, e:
865
# logging.exception('connection error')
867
# except Exception, e:
868
# logging.exception('normal error')
871
#def do_upgrade_by_argv(argv):
872
# upgrade_params = UpgradeParams()
873
# opts.change_opt_defult_value('log-file', upgrade_params.log_filename)
874
# opts.parse_options(argv)
875
# if not opts.has_no_local_opts():
876
# opts.deal_with_local_opts('upgrade_pre')
878
# opts.check_db_client_opts()
879
# log_filename = opts.get_opt_log_file()
880
# upgrade_params.log_filename = log_filename
881
# # 日志配置放在这里是为了前面的操作不要覆盖掉日志文件
882
# config_logging_module(upgrade_params.log_filename)
884
# host = opts.get_opt_host()
885
# port = int(opts.get_opt_port())
886
# user = opts.get_opt_user()
887
# password = opts.get_opt_password()
888
# timeout = int(opts.get_opt_timeout())
889
# cmd_module_str = opts.get_opt_module()
890
# module_set = set([])
891
# all_module_set = run_modules.get_all_module_set()
892
# cmd_module_list = cmd_module_str.split(',')
893
# for cmd_module in cmd_module_list:
894
# if run_modules.ALL_MODULE == cmd_module:
895
# module_set = module_set | all_module_set
896
# elif cmd_module in all_module_set:
897
# module_set.add(cmd_module)
899
# raise MyError('invalid module: {0}'.format(cmd_module))
900
# logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", module=\"%s\", log-file=\"%s\"',\
901
# host, port, user, password, timeout, module_set, log_filename)
902
# do_upgrade(host, port, user, password, timeout, module_set, upgrade_params)
903
# except mysql.connector.Error, e:
904
# logging.exception('mysql connctor error')
905
# logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
907
# except Exception, e:
908
# logging.exception('normal error')
909
# logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
914
####====XXXX======######==== I am a splitter ====######======XXXX====####
916
##!/usr/bin/env python
917
## -*- coding: utf-8 -*-
919
#class MyError(Exception):
920
# def __init__(self, value):
923
# return repr(self.value)
924
####====XXXX======######==== I am a splitter ====######======XXXX====####
926
##!/usr/bin/env python
927
## -*- coding: utf-8 -*-
929
#import mysql.connector
930
#from mysql.connector import errorcode
931
#from my_error import MyError
932
#from actions import QueryCursor
935
#def results_to_str(desc, results):
938
# for col_desc in desc:
939
# max_width_list.append(len(str(col_desc[0])))
940
# col_count = len(max_width_list)
941
# for result in results:
942
# if col_count != len(result):
943
# raise MyError('column count is not equal, desc column count: {0}, data column count: {1}'.format(col_count, len(result)))
944
# for i in range(0, col_count):
945
# result_col_width = len(str(result[i]))
946
# if max_width_list[i] < result_col_width:
947
# max_width_list[i] = result_col_width
949
# for i in range(0, col_count):
951
# ret_str += ' ' # 空四格
952
# ret_str += str(desc[i][0])
954
# for j in range(0, max_width_list[i] - len(str(desc[i][0]))):
957
# for result in results:
958
# ret_str += '\n' # 先换行
959
# for i in range(0, col_count):
961
# ret_str += ' ' # 空四格
962
# ret_str += str(result[i])
964
# for j in range(0, max_width_list[i] - len(str(result[i]))):
968
#def query_and_dump_results(query_cur, sql):
969
# (desc, results) = query_cur.exec_query(sql)
970
# result_str = results_to_str(desc, results)
971
# logging.info('dump query results, sql: %s, results:\n%s', sql, result_str)
973
####====XXXX======######==== I am a splitter ====######======XXXX====####
975
##!/usr/bin/env python
976
## -*- coding: utf-8 -*-
978
#from my_error import MyError
987
#sys.argv[0] + """ [OPTIONS]""" +\
989
#'-I, --help Display this help and exit.\n' +\
990
#'-V, --version Output version information and exit.\n' +\
991
#'-h, --host=name Connect to host.\n' +\
992
#'-P, --port=name Port number to use for connection.\n' +\
993
#'-u, --user=name User for login.\n' +\
994
#'-p, --password=name Password to use when connecting to server. If password is\n' +\
995
#' not given it\'s empty string "".\n' +\
996
#'-t, --timeout=name Cmd/Query/Inspection execute timeout(s).\n' +\
997
#'-m, --module=name Modules to run. Modules should be a string combined by some of\n' +\
998
#' the following strings:\n' +\
999
#' 1. begin_upgrade \n' +\
1000
#' 2. begin_rolling_upgrade \n' +\
1001
#' 3. special_action \n' +\
1002
#' 4. health_check \n' +\
1003
#' 5. all: default value, run all sub modules above \n' +\
1004
#' that all modules should be run. They are splitted by ",".\n' +\
1005
#' For example: -m all, or --module=begin_upgrade,begin_rolling_upgrade,special_action\n' +\
1006
#'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
1008
#'Maybe you want to run cmd like that:\n' +\
1009
#sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
1015
#sys.argv[0] + """ [OPTIONS]""" +\
1017
#'-I, --help Display this help and exit.\n' +\
1018
#'-V, --version Output version information and exit.\n' +\
1019
#'-h, --host=name Connect to host.\n' +\
1020
#'-P, --port=name Port number to use for connection.\n' +\
1021
#'-u, --user=name User for login.\n' +\
1022
#'-p, --password=name Password to use when connecting to server. If password is\n' +\
1023
#' not given it\'s empty string "".\n' +\
1024
#'-t, --timeout=name Cmd/Query/Inspection execute timeout(s).\n' +\
1025
#'-m, --module=name Modules to run. Modules should be a string combined by some of\n' +\
1026
#' the following strings:\n' +\
1027
#' 1. health_check \n' +\
1028
#' 2. end_rolling_upgrade \n' +\
1029
#' 3. tenant_upgrade \n' +\
1030
#' 4. end_upgrade \n' +\
1031
#' 5. post_check \n' +\
1032
#' 6. all: default value, run all sub modules above \n' +\
1033
#' that all modules should be run. They are splitted by ",".\n' +\
1034
#' For example: -m all, or --module=health_check,end_rolling_upgrade\n' +\
1035
#'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
1037
#'Maybe you want to run cmd like that:\n' +\
1038
#sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
1040
#version_str = """version 1.0.0"""
1043
# __g_short_name_set = set([])
1044
# __g_long_name_set = set([])
1045
# __short_name = None
1047
# __is_with_param = None
1048
# __is_local_opt = None
1051
# def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
1052
# if short_name in Option.__g_short_name_set:
1053
# raise MyError('duplicate option short name: {0}'.format(short_name))
1054
# elif long_name in Option.__g_long_name_set:
1055
# raise MyError('duplicate option long name: {0}'.format(long_name))
1056
# Option.__g_short_name_set.add(short_name)
1057
# Option.__g_long_name_set.add(long_name)
1058
# self.__short_name = short_name
1059
# self.__long_name = long_name
1060
# self.__is_with_param = is_with_param
1061
# self.__is_local_opt = is_local_opt
1062
# self.__has_value = False
1063
# if None != default_value:
1064
# self.set_value(default_value)
1065
# def is_with_param(self):
1066
# return self.__is_with_param
1067
# def get_short_name(self):
1068
# return self.__short_name
1069
# def get_long_name(self):
1070
# return self.__long_name
1071
# def has_value(self):
1072
# return self.__has_value
1073
# def get_value(self):
1074
# return self.__value
1075
# def set_value(self, value):
1076
# self.__value = value
1077
# self.__has_value = True
1078
# def is_local_opt(self):
1079
# return self.__is_local_opt
1080
# def is_valid(self):
1081
# return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value
1085
#Option('I', 'help', False, True),\
1086
#Option('V', 'version', False, True),\
1087
#Option('h', 'host', True, False),\
1088
#Option('P', 'port', True, False),\
1089
#Option('u', 'user', True, False),\
1090
#Option('t', 'timeout', True, False, 0),\
1091
#Option('p', 'password', True, False, ''),\
1093
#Option('m', 'module', True, False, 'all'),\
1094
## 日志文件路径,不同脚本的main函数中中会改成不同的默认值
1095
#Option('l', 'log-file', True, False)
1098
#def change_opt_defult_value(opt_long_name, opt_default_val):
1101
# if opt.get_long_name() == opt_long_name:
1102
# opt.set_value(opt_default_val)
1105
#def has_no_local_opts():
1107
# no_local_opts = True
1109
# if opt.is_local_opt() and opt.has_value():
1110
# no_local_opts = False
1111
# return no_local_opts
1113
#def check_db_client_opts():
1116
# if not opt.is_local_opt() and not opt.has_value():
1117
# raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
1118
# .format(opt.get_short_name(), sys.argv[0]))
1120
#def parse_option(opt_name, opt_val):
1123
# if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
1124
# opt.set_value(opt_val)
1126
#def parse_options(argv):
1131
# if opt.is_with_param():
1132
# short_opt_str += opt.get_short_name() + ':'
1134
# short_opt_str += opt.get_short_name()
1136
# if opt.is_with_param():
1137
# long_opt_list.append(opt.get_long_name() + '=')
1139
# long_opt_list.append(opt.get_long_name())
1140
# (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
1141
# for (opt_name, opt_val) in opts:
1142
# parse_option(opt_name, opt_val)
1143
# if has_no_local_opts():
1144
# check_db_client_opts()
1146
#def deal_with_local_opt(opt, filename):
1147
# if 'help' == opt.get_long_name():
1148
# if 'upgrade_pre' == filename:
1149
# global pre_help_str
1151
# elif 'upgrade_post' == filename:
1152
# global post_help_str
1153
# print post_help_str
1155
# raise MyError('not supported filename:{0} for help option'.format(filename))
1156
# elif 'version' == opt.get_long_name():
1160
#def deal_with_local_opts(filename):
1162
# if has_no_local_opts():
1163
# raise MyError('no local options, can not deal with local options')
1166
# if opt.is_local_opt() and opt.has_value():
1167
# deal_with_local_opt(opt, filename)
1174
# if 'host' == opt.get_long_name():
1175
# return opt.get_value()
1180
# if 'port' == opt.get_long_name():
1181
# return opt.get_value()
1186
# if 'user' == opt.get_long_name():
1187
# return opt.get_value()
1189
#def get_opt_password():
1192
# if 'password' == opt.get_long_name():
1193
# return opt.get_value()
1195
#def get_opt_timeout():
1198
# if 'timeout' == opt.get_long_name():
1199
# return opt.get_value()
1201
#def get_opt_module():
1204
# if 'module' == opt.get_long_name():
1205
# return opt.get_value()
1207
#def get_opt_log_file():
1210
# if 'log-file' == opt.get_long_name():
1211
# return opt.get_value()
1213
##parse_options(sys.argv[1:])
1215
####====XXXX======######==== I am a splitter ====######======XXXX====####
1216
#filename:reset_upgrade_scripts.py
1217
##!/usr/bin/env python
1218
## -*- coding: utf-8 -*-
1222
#def clear_action_codes(action_filename_list, action_begin_line, \
1223
# action_end_line, is_special_upgrade_code):
1225
# for action_filename in action_filename_list:
1226
# new_action_file_lines = []
1227
# action_file = open(action_filename, 'r')
1228
# action_file_lines = action_file.readlines()
1229
# is_action_codes = False
1230
# for action_file_line in action_file_lines:
1231
# if is_action_codes and action_file_line == (action_end_line + char_enter):
1232
# is_action_codes = False
1233
# if not is_action_codes:
1234
# new_action_file_lines.append(action_file_line)
1235
# if not is_action_codes and action_file_line == (action_begin_line + char_enter):
1236
# is_action_codes = True
1237
# action_file.close()
1238
# new_action_file = open(action_filename, 'w')
1239
# for new_action_file_line in new_action_file_lines:
1240
# if is_special_upgrade_code:
1241
# if new_action_file_line == (action_end_line + char_enter):
1242
# new_action_file.write(' return\n')
1243
# new_action_file.write(new_action_file_line)
1244
# new_action_file.close()
1246
#def regenerate_upgrade_script():
1247
# print('\n=========run gen_upgrade_scripts.py, begin=========\n')
1248
# info = os.popen('./gen_upgrade_scripts.py;')
1250
# print('\n=========run gen_upgrade_scripts.py, end=========\n')
1252
#if __name__ == '__main__':
1253
# action_begin_line = '####========******####======== actions begin ========####******========####'
1254
# action_end_line = '####========******####========= actions end =========####******========####'
1255
# action_filename_list = \
1257
# 'normal_ddl_actions_pre.py',\
1258
# 'normal_ddl_actions_post.py',\
1259
# 'normal_dml_actions_pre.py',\
1260
# 'normal_dml_actions_post.py',\
1261
# 'each_tenant_dml_actions_pre.py',\
1262
# 'each_tenant_dml_actions_post.py',\
1263
# 'each_tenant_ddl_actions_post.py'\
1265
# special_upgrade_filename_list = \
1267
# 'special_upgrade_action_pre.py',\
1268
# 'special_upgrade_action_post.py'
1270
# clear_action_codes(action_filename_list, action_begin_line, action_end_line, False)
1271
# clear_action_codes(special_upgrade_filename_list, action_begin_line, action_end_line, True)
1272
# regenerate_upgrade_script()
1275
####====XXXX======######==== I am a splitter ====######======XXXX====####
1276
#filename:run_modules.py
1277
##!/usr/bin/env python
1278
## -*- coding: utf-8 -*-
1282
## module for upgrade_pre.py
1283
#MODULE_BEGIN_UPGRADE = 'begin_upgrade'
1284
#MODULE_BEGIN_ROLLING_UPGRADE = 'begin_rolling_upgrade'
1285
#MODULE_SPECIAL_ACTION = 'special_action'
1286
##MODULE_HEALTH_CHECK = 'health_check'
1288
## module for upgrade_post.py
1289
#MODULE_HEALTH_CHECK = 'health_check'
1290
#MODULE_END_ROLLING_UPGRADE = 'end_rolling_upgrade'
1291
#MODULE_TENANT_UPRADE = 'tenant_upgrade'
1292
#MODULE_END_UPRADE = 'end_upgrade'
1293
#MODULE_POST_CHECK = 'post_check'
1295
#def get_all_module_set():
1297
# module_set = set([])
1298
# attrs_from_run_module = dir(run_modules)
1299
# for attr in attrs_from_run_module:
1300
# if attr.startswith('MODULE_'):
1301
# module = getattr(run_modules, attr)
1302
# module_set.add(module)
1305
####====XXXX======######==== I am a splitter ====######======XXXX====####
1306
#filename:special_upgrade_action_pre.py
1307
##!/usr/bin/env python
1308
## -*- coding: utf-8 -*-
1310
#from my_error import MyError
1312
#import mysql.connector
1313
#from mysql.connector import errorcode
1317
#from random import Random
1318
#from actions import DMLCursor
1319
#from actions import QueryCursor
1324
#import upgrade_health_checker
1327
#def do_special_upgrade(conn, cur, timeout, user, passwd):
1328
# # special upgrade action
1329
##升级语句对应的action要写在下面的actions begin和actions end这两行之间,
1330
##因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
1331
##这两行之间的这些代码,如果不写在这两行之间的话会导致清空不掉相应的代码。
1332
# current_version = actions.fetch_observer_version(cur)
1333
# target_version = actions.get_current_cluster_version()
1334
# # when upgrade across version, disable enable_ddl/major_freeze
1335
# if current_version != target_version:
1336
# actions.set_parameter(cur, 'enable_ddl', 'False', timeout)
1337
# actions.set_parameter(cur, 'enable_major_freeze', 'False', timeout)
1338
# actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'False', timeout)
1339
# # wait scheduler in storage to notice adaptive_compaction is switched to false
1341
# query_cur = actions.QueryCursor(cur)
1342
# wait_major_timeout = 600
1343
# upgrade_health_checker.check_major_merge(query_cur, wait_major_timeout)
1344
# actions.do_suspend_merge(cur, timeout)
1345
# # When upgrading from a version prior to 4.2 to version 4.2, the bloom_filter should be disabled.
1346
# # The param _bloom_filter_enabled is no longer in use as of version 4.2, there is no need to enable it again.
1347
# if actions.get_version(current_version) < actions.get_version('4.2.0.0')\
1348
# and actions.get_version(target_version) >= actions.get_version('4.2.0.0'):
1349
# actions.set_tenant_parameter(cur, '_bloom_filter_enabled', 'False', timeout)
1351
#####========******####======== actions begin ========####******========####
1353
#####========******####========= actions end =========####******========####
1355
#def query(cur, sql):
1358
# results = cur.fetchall()
1364
#def get_oracle_tenant_ids(cur):
1365
# return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant where compatibility_mode = 1')]
1367
#def get_tenant_ids(cur):
1368
# return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')]
1370
####====XXXX======######==== I am a splitter ====######======XXXX====####
1371
#filename:tenant_upgrade_action.py
1372
##!/usr/bin/env python
1373
## -*- coding: utf-8 -*-
1377
#from actions import Cursor
1378
#from actions import DMLCursor
1379
#from actions import QueryCursor
1380
#import mysql.connector
1381
#from mysql.connector import errorcode
1384
#def do_upgrade(conn, cur, timeout, user, pwd):
1386
##升级语句对应的action要写在下面的actions begin和actions end这两行之间,
1387
##因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
1388
##这两行之间的这些代码,如果不写在这两行之间的话会导致清空不掉相应的代码。
1389
# across_version = upgrade_across_version(cur)
1391
# run_upgrade_job(conn, cur, "UPGRADE_ALL", timeout)
1393
# run_upgrade_job(conn, cur, "UPGRADE_VIRTUAL_SCHEMA", timeout)
1395
# run_root_inspection(cur, timeout)
1396
#####========******####======== actions begin ========####******========####
1397
# upgrade_syslog_level(conn, cur)
1400
#def upgrade_syslog_level(conn, cur):
1402
# cur.execute("""select svr_ip, svr_port, value from oceanbase.__all_virtual_sys_parameter_stat where name = 'syslog_level'""")
1403
# result = cur.fetchall()
1405
# logging.info("syslog level before upgrade: ip: {0}, port: {1}, value: {2}".format(r[0], r[1], r[2]))
1406
# cur.execute("""select count(*) cnt from oceanbase.__all_virtual_sys_parameter_stat where name = 'syslog_level' and value = 'INFO'""")
1407
# result = cur.fetchall()
1408
# info_cnt = result[0][0]
1410
# actions.set_parameter(cur, "syslog_level", "WDIAG")
1411
# except Exception, e:
1412
# logging.warn("upgrade syslog level failed!")
1414
#####========******####========= actions end =========####******========####
1416
#def query(cur, sql):
1419
# results = cur.fetchall()
1422
#def get_tenant_ids(cur):
1423
# return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')]
1425
#def run_root_inspection(cur, timeout):
1427
# query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600)
1429
# actions.set_session_timeout(cur, query_timeout)
1431
# sql = "alter system run job 'root_inspection'"
1435
# actions.set_session_timeout(cur, 10)
1437
#def upgrade_across_version(cur):
1438
# current_data_version = actions.get_current_data_version()
1439
# int_current_data_version = actions.get_version(current_data_version)
1441
# across_version = False
1444
# # 1. check if target_data_version/current_data_version match with current_data_version
1445
# sql = "select count(*) from oceanbase.__all_table where table_name = '__all_virtual_core_table'"
1446
# results = query(cur, sql)
1447
# if len(results) < 1 or len(results[0]) < 1:
1448
# logging.warn("row/column cnt not match")
1450
# elif results[0][0] <= 0:
1451
# # __all_virtual_core_table doesn't exist, this cluster is upgraded from 4.0.0.0
1452
# across_version = True
1455
# tenant_ids = get_tenant_ids(cur)
1456
# if len(tenant_ids) <= 0:
1457
# logging.warn("tenant_ids count is unexpected")
1459
# tenant_count = len(tenant_ids)
1461
# sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(int_current_data_version)
1462
# results = query(cur, sql)
1463
# if len(results) != 1 or len(results[0]) != 1:
1464
# logging.warn('result cnt not match')
1466
# elif 2 * tenant_count != results[0][0]:
1467
# logging.info('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(current_data_version, tenant_count, results[0][0]))
1468
# across_version = True
1470
# logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version))
1471
# across_version = False
1473
# # 2. check if compatible match with current_data_version
1474
# if not across_version:
1475
# sql = "select count(*) from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value != '{0}'".format(current_data_version)
1476
# results = query(cur, sql)
1477
# if len(results) < 1 or len(results[0]) < 1:
1478
# logging.warn("row/column cnt not match")
1480
# elif results[0][0] == 0:
1481
# logging.info("compatible are all matched")
1483
# logging.info("compatible unmatched")
1484
# across_version = True
1486
# return across_version
1488
#def get_max_used_job_id(cur):
1491
# sql = "select job_id from oceanbase.__all_rootservice_job order by job_id desc limit 1"
1492
# results = query(cur, sql)
1494
# if (len(results) == 0):
1496
# elif (len(results) != 1 or len(results[0]) != 1):
1497
# logging.warn("row cnt not match")
1500
# max_job_id = results[0][0]
1502
# logging.info("get max_used_job_id:{0}".format(max_job_id))
1505
# except Exception, e:
1506
# logging.warn("failed to get max_used_job_id")
1509
#def check_can_run_upgrade_job(cur, job_name):
1511
# sql = """select job_status from oceanbase.__all_rootservice_job
1512
# where job_type = '{0}' order by job_id desc limit 1""".format(job_name)
1513
# results = query(cur, sql)
1516
# if (len(results) == 0):
1518
# logging.info("upgrade job not created yet, should run upgrade job")
1519
# elif (len(results) != 1 or len(results[0]) != 1):
1520
# logging.warn("row cnt not match")
1522
# elif ("INPROGRESS" == results[0][0]):
1523
# logging.warn("upgrade job still running, should wait")
1525
# elif ("SUCCESS" == results[0][0]):
1527
# logging.info("maybe upgrade job remained, can run again")
1528
# elif ("FAILED" == results[0][0]):
1530
# logging.info("execute upgrade job failed, should run again")
1532
# logging.warn("invalid job status: {0}".format(results[0][0]))
1536
# except Exception, e:
1537
# logging.warn("failed to check if upgrade job can run")
1540
#def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id):
1542
# wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600)
1544
# times = wait_timeout / 10
1545
# while (times >= 0):
1546
# sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job
1547
# where job_type = '{0}' and job_id > {1} order by job_id desc limit 1
1548
# """.format(job_name, max_used_job_id)
1549
# results = query(cur, sql)
1551
# if (len(results) == 0):
1552
# logging.info("upgrade job not created yet")
1553
# elif (len(results) != 1 or len(results[0]) != 4):
1554
# logging.warn("row cnt not match")
1556
# elif ("INPROGRESS" == results[0][0]):
1557
# logging.info("upgrade job is still running")
1558
# # check if rs change
1559
# if times % 10 == 0:
1561
# port = results[0][2]
1562
# gmt_create = results[0][3]
1563
# sql = """select count(*) from oceanbase.__all_virtual_core_meta_table where role = 1 and svr_ip = '{0}' and svr_port = {1}""".format(ip, port)
1564
# results = query(cur, sql)
1565
# if (len(results) != 1 or len(results[0]) != 1):
1566
# logging.warn("row/column cnt not match")
1568
# elif results[0][0] == 1:
1569
# sql = """select count(*) from oceanbase.__all_rootservice_event_history where gmt_create > '{0}' and event = 'full_rootservice'""".format(gmt_create)
1570
# results = query(cur, sql)
1571
# if (len(results) != 1 or len(results[0]) != 1):
1572
# logging.warn("row/column cnt not match")
1574
# elif results[0][0] > 0:
1575
# logging.warn("rs changed, should check if upgrade job is still running")
1578
# logging.info("rs[{0}:{1}] still exist, keep waiting".format(ip, port))
1580
# logging.warn("rs changed or not exist, should check if upgrade job is still running")
1582
# elif ("SUCCESS" == results[0][0]):
1583
# logging.info("execute upgrade job successfully")
1585
# elif ("FAILED" == results[0][0]):
1586
# logging.warn("execute upgrade job failed")
1589
# logging.warn("invalid job status: {0}".format(results[0][0]))
1594
# logging.warn("""check {0} job timeout""".format(job_name))
1597
# except Exception, e:
1598
# logging.warn("failed to check upgrade job result")
1601
#def run_upgrade_job(conn, cur, job_name, timeout):
1603
# logging.info("start to run upgrade job, job_name:{0}".format(job_name))
1605
# if check_can_run_upgrade_job(cur, job_name):
1606
# conn.autocommit = True
1607
# # disable enable_ddl
1608
# ori_enable_ddl = actions.get_ori_enable_ddl(cur, timeout)
1609
# if ori_enable_ddl == 0:
1610
# actions.set_parameter(cur, 'enable_ddl', 'True', timeout)
1611
# # enable_sys_table_ddl
1612
# actions.set_parameter(cur, 'enable_sys_table_ddl', 'True', timeout)
1613
# # get max_used_job_id
1614
# max_used_job_id = get_max_used_job_id(cur)
1616
# sql = """alter system run upgrade job '{0}'""".format(job_name)
1619
# # check upgrade job result
1620
# check_upgrade_job_result(cur, job_name, timeout, max_used_job_id)
1621
# # reset enable_sys_table_ddl
1622
# actions.set_parameter(cur, 'enable_sys_table_ddl', 'False', timeout)
1624
# if ori_enable_ddl == 0:
1625
# actions.set_parameter(cur, 'enable_ddl', 'False', timeout)
1626
# except Exception, e:
1627
# logging.warn("run upgrade job failed, :{0}".format(job_name))
1629
# logging.info("run upgrade job success, job_name:{0}".format(job_name))
1630
####====XXXX======######==== I am a splitter ====######======XXXX====####
1631
#filename:upgrade_checker.py
1632
##!/usr/bin/env python
1633
## -*- coding: utf-8 -*-
1637
#import mysql.connector
1638
#from mysql.connector import errorcode
1643
#class UpgradeParams:
1644
# log_filename = 'upgrade_checker.log'
1645
# old_version = '4.0.0.0'
1646
##### --------------start : my_error.py --------------
1647
#class MyError(Exception):
1648
# def __init__(self, value):
1651
# return repr(self.value)
1652
##### --------------start : actions.py------------
1655
# def __init__(self, cursor):
1656
# self.__cursor = cursor
1657
# def exec_sql(self, sql, print_when_succ = True):
1659
# self.__cursor.execute(sql)
1660
# rowcount = self.__cursor.rowcount
1661
# if True == print_when_succ:
1662
# logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
1664
# except mysql.connector.Error, e:
1665
# logging.exception('mysql connector error, fail to execute sql: %s', sql)
1667
# except Exception, e:
1668
# logging.exception('normal error, fail to execute sql: %s', sql)
1670
# def exec_query(self, sql, print_when_succ = True):
1672
# self.__cursor.execute(sql)
1673
# results = self.__cursor.fetchall()
1674
# rowcount = self.__cursor.rowcount
1675
# if True == print_when_succ:
1676
# logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
1677
# return (self.__cursor.description, results)
1678
# except mysql.connector.Error, e:
1679
# logging.exception('mysql connector error, fail to execute sql: %s', sql)
1681
# except Exception, e:
1682
# logging.exception('normal error, fail to execute sql: %s', sql)
1685
#def set_parameter(cur, parameter, value):
1686
# sql = """alter system set {0} = '{1}'""".format(parameter, value)
1689
# wait_parameter_sync(cur, parameter, value)
1691
#def wait_parameter_sync(cur, key, value):
1692
# sql = """select count(*) as cnt from oceanbase.__all_virtual_sys_parameter_stat
1693
# where name = '{0}' and value != '{1}'""".format(key, value)
1698
# result = cur.fetchall()
1699
# if len(result) != 1 or len(result[0]) != 1:
1700
# logging.exception('result cnt not match')
1702
# elif result[0][0] == 0:
1703
# logging.info("""{0} is sync, value is {1}""".format(key, value))
1706
# logging.info("""{0} is not sync, value should be {1}""".format(key, value))
1710
# logging.exception("""check {0}:{1} sync timeout""".format(key, value))
1714
##### --------------start : opt.py --------------
1719
#sys.argv[0] + """ [OPTIONS]""" +\
1721
#'-I, --help Display this help and exit.\n' +\
1722
#'-V, --version Output version information and exit.\n' +\
1723
#'-h, --host=name Connect to host.\n' +\
1724
#'-P, --port=name Port number to use for connection.\n' +\
1725
#'-u, --user=name User for login.\n' +\
1726
#'-t, --timeout=name Cmd/Query/Inspection execute timeout(s).\n' +\
1727
#'-p, --password=name Password to use when connecting to server. If password is\n' +\
1728
#' not given it\'s empty string "".\n' +\
1729
#'-m, --module=name Modules to run. Modules should be a string combined by some of\n' +\
1730
#' the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
1731
#' system_variable_dml, special_action, all. "all" represents\n' +\
1732
#' that all modules should be run. They are splitted by ",".\n' +\
1733
#' For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
1734
#'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
1736
#'Maybe you want to run cmd like that:\n' +\
1737
#sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
1739
#version_str = """version 1.0.0"""
1742
# __g_short_name_set = set([])
1743
# __g_long_name_set = set([])
1744
# __short_name = None
1746
# __is_with_param = None
1747
# __is_local_opt = None
1750
# def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
1751
# if short_name in Option.__g_short_name_set:
1752
# raise MyError('duplicate option short name: {0}'.format(short_name))
1753
# elif long_name in Option.__g_long_name_set:
1754
# raise MyError('duplicate option long name: {0}'.format(long_name))
1755
# Option.__g_short_name_set.add(short_name)
1756
# Option.__g_long_name_set.add(long_name)
1757
# self.__short_name = short_name
1758
# self.__long_name = long_name
1759
# self.__is_with_param = is_with_param
1760
# self.__is_local_opt = is_local_opt
1761
# self.__has_value = False
1762
# if None != default_value:
1763
# self.set_value(default_value)
1764
# def is_with_param(self):
1765
# return self.__is_with_param
1766
# def get_short_name(self):
1767
# return self.__short_name
1768
# def get_long_name(self):
1769
# return self.__long_name
1770
# def has_value(self):
1771
# return self.__has_value
1772
# def get_value(self):
1773
# return self.__value
1774
# def set_value(self, value):
1775
# self.__value = value
1776
# self.__has_value = True
1777
# def is_local_opt(self):
1778
# return self.__is_local_opt
1779
# def is_valid(self):
1780
# return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value
1784
#Option('I', 'help', False, True),\
1785
#Option('V', 'version', False, True),\
1786
#Option('h', 'host', True, False),\
1787
#Option('P', 'port', True, False),\
1788
#Option('u', 'user', True, False),\
1789
#Option('t', 'timeout', True, False, 0),\
1790
#Option('p', 'password', True, False, ''),\
1792
#Option('m', 'module', True, False, 'all'),\
1793
## 日志文件路径,不同脚本的main函数中中会改成不同的默认值
1794
#Option('l', 'log-file', True, False)
1797
#def change_opt_defult_value(opt_long_name, opt_default_val):
1800
# if opt.get_long_name() == opt_long_name:
1801
# opt.set_value(opt_default_val)
1804
#def has_no_local_opts():
1806
# no_local_opts = True
1808
# if opt.is_local_opt() and opt.has_value():
1809
# no_local_opts = False
1810
# return no_local_opts
1812
#def check_db_client_opts():
1815
# if not opt.is_local_opt() and not opt.has_value():
1816
# raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
1817
# .format(opt.get_short_name(), sys.argv[0]))
1819
#def parse_option(opt_name, opt_val):
1822
# if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
1823
# opt.set_value(opt_val)
1825
#def parse_options(argv):
1830
# if opt.is_with_param():
1831
# short_opt_str += opt.get_short_name() + ':'
1833
# short_opt_str += opt.get_short_name()
1835
# if opt.is_with_param():
1836
# long_opt_list.append(opt.get_long_name() + '=')
1838
# long_opt_list.append(opt.get_long_name())
1839
# (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
1840
# for (opt_name, opt_val) in opts:
1841
# parse_option(opt_name, opt_val)
1842
# if has_no_local_opts():
1843
# check_db_client_opts()
1845
#def deal_with_local_opt(opt):
1846
# if 'help' == opt.get_long_name():
1849
# elif 'version' == opt.get_long_name():
1853
#def deal_with_local_opts():
1855
# if has_no_local_opts():
1856
# raise MyError('no local options, can not deal with local options')
1859
# if opt.is_local_opt() and opt.has_value():
1860
# deal_with_local_opt(opt)
1867
# if 'host' == opt.get_long_name():
1868
# return opt.get_value()
1873
# if 'port' == opt.get_long_name():
1874
# return opt.get_value()
1879
# if 'user' == opt.get_long_name():
1880
# return opt.get_value()
1882
#def get_opt_password():
1885
# if 'password' == opt.get_long_name():
1886
# return opt.get_value()
1888
#def get_opt_timeout():
1891
# if 'timeout' == opt.get_long_name():
1892
# return opt.get_value()
1894
#def get_opt_module():
1897
# if 'module' == opt.get_long_name():
1898
# return opt.get_value()
1900
#def get_opt_log_file():
1903
# if 'log-file' == opt.get_long_name():
1904
# return opt.get_value()
1905
##### ---------------end----------------------
1907
##### --------------start : do_upgrade_pre.py--------------
1908
#def config_logging_module(log_filenamme):
1909
# logging.basicConfig(level=logging.INFO,\
1910
# format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
1911
# datefmt='%Y-%m-%d %H:%M:%S',\
1912
# filename=log_filenamme,\
1915
# formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
1916
# #######################################
1917
# # 定义一个Handler打印INFO及以上级别的日志到sys.stdout
1918
# stdout_handler = logging.StreamHandler(sys.stdout)
1919
# stdout_handler.setLevel(logging.INFO)
1921
# stdout_handler.setFormatter(formatter)
1922
# # 将定义好的stdout_handler日志handler添加到root logger
1923
# logging.getLogger('').addHandler(stdout_handler)
1924
##### ---------------end----------------------
1929
#def get_version(version_str):
1930
# versions = version_str.split(".")
1932
# if len(versions) != 4:
1933
# logging.exception("""version:{0} is invalid""".format(version_str))
1936
# major = int(versions[0])
1937
# minor = int(versions[1])
1938
# major_patch = int(versions[2])
1939
# minor_patch = int(versions[3])
1941
# if major > 0xffffffff or minor > 0xffff or major_patch > 0xff or minor_patch > 0xff:
1942
# logging.exception("""version:{0} is invalid""".format(version_str))
1945
# version = (major << 32) | (minor << 16) | (major_patch << 8) | (minor_patch)
1950
#def check_observer_version(query_cur, upgrade_params):
1951
# (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERS where name='min_observer_version'""")
1952
# if len(results) != 1:
1953
# fail_list.append('min_observer_version is not sync')
1954
# elif cmp(results[0][0], upgrade_params.old_version) < 0 :
1955
# fail_list.append('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0]))
1956
# logging.info('check observer version success, version = {0}'.format(results[0][0]))
1958
#def check_data_version(query_cur):
1959
# min_cluster_version = 0
1960
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
1961
# (desc, results) = query_cur.exec_query(sql)
1962
# if len(results) != 1:
1963
# fail_list.append('min_observer_version is not sync')
1964
# elif len(results[0]) != 1:
1965
# fail_list.append('column cnt not match')
1967
# min_cluster_version = get_version(results[0][0])
1969
# # check data version
1970
# if min_cluster_version < get_version("4.1.0.0"):
1971
# # last barrier cluster version should be 4.1.0.0
1972
# fail_list.append('last barrier cluster version is 4.1.0.0. prohibit cluster upgrade from cluster version less than 4.1.0.0')
1974
# data_version_str = ''
1976
# # check compatible is same
1977
# sql = """select distinct value from oceanbase.__all_virtual_tenant_parameter_info where name='compatible'"""
1978
# (desc, results) = query_cur.exec_query(sql)
1979
# if len(results) != 1:
1980
# fail_list.append('compatible is not sync')
1981
# elif len(results[0]) != 1:
1982
# fail_list.append('column cnt not match')
1984
# data_version_str = results[0][0]
1985
# data_version = get_version(results[0][0])
1987
# if data_version < get_version("4.1.0.0"):
1988
# # last barrier data version should be 4.1.0.0
1989
# fail_list.append('last barrier data version is 4.1.0.0. prohibit cluster upgrade from data version less than 4.1.0.0')
1991
# # check target_data_version/current_data_version
1992
# sql = "select count(*) from oceanbase.__all_tenant"
1993
# (desc, results) = query_cur.exec_query(sql)
1994
# if len(results) != 1 or len(results[0]) != 1:
1995
# fail_list.append('result cnt not match')
1997
# tenant_count = results[0][0]
1999
# sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(data_version)
2000
# (desc, results) = query_cur.exec_query(sql)
2001
# if len(results) != 1 or len(results[0]) != 1:
2002
# fail_list.append('result cnt not match')
2003
# elif 2 * tenant_count != results[0][0]:
2004
# fail_list.append('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(data_version_str, tenant_count, results[0][0]))
2006
# logging.info("check data version success, all tenant's compatible/target_data_version/current_data_version is {0}".format(data_version_str))
2008
## 2. 检查paxos副本是否同步, paxos副本是否缺失
2009
#def check_paxos_replica(query_cur):
2010
# # 2.1 检查paxos副本是否同步
2011
# (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""")
2012
# if results[0][0] > 0 :
2013
# fail_list.append('{0} replicas unsync, please check'.format(results[0][0]))
2014
# # 2.2 检查paxos副本是否有缺失 TODO
2015
# logging.info('check paxos replica success')
2017
## 3. 检查是否有做balance, locality变更
2018
#def check_rebalance_task(query_cur):
2019
# # 3.1 检查是否有做locality变更
2020
# (desc, results) = query_cur.exec_query("""select count(1) as cnt from DBA_OB_TENANT_JOBS where job_status='INPROGRESS' and result_code is null""")
2021
# if results[0][0] > 0 :
2022
# fail_list.append('{0} locality tasks is doing, please check'.format(results[0][0]))
2023
# # 3.2 检查是否有做balance
2024
# (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from CDB_OB_LS_REPLICA_TASKS""")
2025
# if results[0][0] > 0 :
2026
# fail_list.append('{0} rebalance tasks is doing, please check'.format(results[0][0]))
2027
# logging.info('check rebalance task success')
2030
#def check_cluster_status(query_cur):
2032
# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""")
2033
# if results[0][0] > 0 :
2034
# fail_list.append('{0} tenant is merging, please check'.format(results[0][0]))
2035
# (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""")
2036
# if results[0][0] > 0 :
2037
# fail_list.append('{0} tablet is merging, please check'.format(results[0][0]))
2038
# logging.info('check cluster status success')
2040
## 5. 检查是否有异常租户(creating,延迟删除,恢复中)
2041
#def check_tenant_status(query_cur):
2043
# # check tenant schema
2044
# (desc, results) = query_cur.exec_query("""select count(*) as count from DBA_OB_TENANTS where status != 'NORMAL'""")
2045
# if len(results) != 1 or len(results[0]) != 1:
2046
# fail_list.append('results len not match')
2047
# elif 0 != results[0][0]:
2048
# fail_list.append('has abnormal tenant, should stop')
2050
# logging.info('check tenant status success')
2052
# # check tenant info
2053
# # don't support restore tenant upgrade
2054
# (desc, results) = query_cur.exec_query("""select count(*) as count from oceanbase.__all_virtual_tenant_info where tenant_role != 'PRIMARY' and tenant_role != 'STANDBY'""")
2055
# if len(results) != 1 or len(results[0]) != 1:
2056
# fail_list.append('results len not match')
2057
# elif 0 != results[0][0]:
2058
# fail_list.append('has abnormal tenant info, should stop')
2060
# logging.info('check tenant info success')
2062
# # check tenant lock status
2063
# (desc, results) = query_cur.exec_query("""select count(*) from DBA_OB_TENANTS where LOCKED = 'YES'""")
2064
# if len(results) != 1 or len(results[0]) != 1:
2065
# fail_list.append('results len not match')
2066
# elif 0 != results[0][0]:
2067
# fail_list.append('has locked tenant, should unlock')
2069
# logging.info('check tenant lock status success')
2072
#def check_restore_job_exist(query_cur):
2073
# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""")
2074
# if len(results) != 1 or len(results[0]) != 1:
2075
# fail_list.append('failed to restore job cnt')
2076
# elif results[0][0] != 0:
2077
# fail_list.append("""still has restore job, upgrade is not allowed temporarily""")
2078
# logging.info('check restore job success')
2080
#def check_is_primary_zone_distributed(primary_zone_str):
2081
# semicolon_pos = len(primary_zone_str)
2082
# for i in range(len(primary_zone_str)):
2083
# if primary_zone_str[i] == ';':
2086
# comma_pos = len(primary_zone_str)
2087
# for j in range(len(primary_zone_str)):
2088
# if primary_zone_str[j] == ',':
2091
# if comma_pos < semicolon_pos:
2096
## 7. 升级前需要primary zone只有一个
2097
#def check_tenant_primary_zone(query_cur):
2098
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
2099
# (desc, results) = query_cur.exec_query(sql)
2100
# if len(results) != 1:
2101
# fail_list.append('min_observer_version is not sync')
2102
# elif len(results[0]) != 1:
2103
# fail_list.append('column cnt not match')
2105
# min_cluster_version = get_version(results[0][0])
2106
# if min_cluster_version < get_version("4.1.0.0"):
2107
# (desc, results) = query_cur.exec_query("""select tenant_name,primary_zone from DBA_OB_TENANTS where tenant_id != 1""");
2108
# for item in results:
2109
# if cmp(item[1], "RANDOM") == 0:
2110
# fail_list.append('{0} tenant primary zone random before update not allowed'.format(item[0]))
2111
# elif check_is_primary_zone_distributed(item[1]):
2112
# fail_list.append('{0} tenant primary zone distributed before update not allowed'.format(item[0]))
2113
# logging.info('check tenant primary zone success')
2115
## 8. 修改永久下线的时间,避免升级过程中缺副本
2116
#def modify_server_permanent_offline_time(cur):
2117
# set_parameter(cur, 'server_permanent_offline_time', '72h')
2120
#def check_ddl_task_execute(query_cur):
2121
# (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_ddl_task_status""")
2122
# if 0 != results[0][0]:
2123
# fail_list.append("There are DDL task in progress")
2124
# logging.info('check ddl task execut status success')
2127
#def check_backup_job_exist(query_cur):
2128
# # Backup jobs cannot be in-progress during upgrade.
2129
# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_JOBS""")
2130
# if len(results) != 1 or len(results[0]) != 1:
2131
# fail_list.append('failed to backup job cnt')
2132
# elif results[0][0] != 0:
2133
# fail_list.append("""still has backup job, upgrade is not allowed temporarily""")
2135
# logging.info('check backup job success')
2138
#def check_archive_job_exist(query_cur):
2139
# min_cluster_version = 0
2140
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
2141
# (desc, results) = query_cur.exec_query(sql)
2142
# if len(results) != 1:
2143
# fail_list.append('min_observer_version is not sync')
2144
# elif len(results[0]) != 1:
2145
# fail_list.append('column cnt not match')
2147
# min_cluster_version = get_version(results[0][0])
2149
# # Archive jobs cannot be in-progress before upgrade from 4.0.
2150
# if min_cluster_version < get_version("4.1.0.0"):
2151
# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVELOG where status!='STOP'""")
2152
# if len(results) != 1 or len(results[0]) != 1:
2153
# fail_list.append('failed to archive job cnt')
2154
# elif results[0][0] != 0:
2155
# fail_list.append("""still has archive job, upgrade is not allowed temporarily""")
2157
# logging.info('check archive job success')
2160
#def check_archive_dest_exist(query_cur):
2161
# min_cluster_version = 0
2162
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
2163
# (desc, results) = query_cur.exec_query(sql)
2164
# if len(results) != 1:
2165
# fail_list.append('min_observer_version is not sync')
2166
# elif len(results[0]) != 1:
2167
# fail_list.append('column cnt not match')
2169
# min_cluster_version = get_version(results[0][0])
2170
# # archive dest need to be cleaned before upgrade from 4.0.
2171
# if min_cluster_version < get_version("4.1.0.0"):
2172
# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVE_DEST""")
2173
# if len(results) != 1 or len(results[0]) != 1:
2174
# fail_list.append('failed to archive dest cnt')
2175
# elif results[0][0] != 0:
2176
# fail_list.append("""still has archive destination, upgrade is not allowed temporarily""")
2178
# logging.info('check archive destination success')
2181
#def check_backup_dest_exist(query_cur):
2182
# min_cluster_version = 0
2183
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
2184
# (desc, results) = query_cur.exec_query(sql)
2185
# if len(results) != 1:
2186
# fail_list.append('min_observer_version is not sync')
2187
# elif len(results[0]) != 1:
2188
# fail_list.append('column cnt not match')
2190
# min_cluster_version = get_version(results[0][0])
2191
# # backup dest need to be cleaned before upgrade from 4.0.
2192
# if min_cluster_version < get_version("4.1.0.0"):
2193
# (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_PARAMETER where name='data_backup_dest' and (value!=NULL or value!='')""")
2194
# if len(results) != 1 or len(results[0]) != 1:
2195
# fail_list.append('failed to data backup dest cnt')
2196
# elif results[0][0] != 0:
2197
# fail_list.append("""still has backup destination, upgrade is not allowed temporarily""")
2199
# logging.info('check backup destination success')
2201
#def check_server_version(query_cur):
2202
# sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server""";
2203
# (desc, results) = query_cur.exec_query(sql);
2204
# if len(results) != 1:
2205
# fail_list.append("servers build_version not match")
2207
# logging.info("check server version success")
2210
#def check_observer_status(query_cur):
2211
# (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status != "active")""")
2212
# if results[0][0] > 0 :
2213
# fail_list.append('{0} observer not available , please check'.format(results[0][0]))
2214
# logging.info('check observer status success')
2217
#def check_schema_status(query_cur):
2218
# (desc, results) = query_cur.exec_query("""select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""")
2219
# if results[0][0] != 1 :
2220
# fail_list.append('{0} schema not available, please check'.format(results[0][0]))
2221
# logging.info('check schema status success')
2223
## 16. 检查是否存在名为all/all_user/all_meta的租户
2224
#def check_not_supported_tenant_name(query_cur):
2225
# names = ["all", "all_user", "all_meta"]
2226
# (desc, results) = query_cur.exec_query("""select tenant_name from oceanbase.DBA_OB_TENANTS""")
2227
# for i in range(len(results)):
2228
# if results[i][0].lower() in names:
2229
# fail_list.append('a tenant named all/all_user/all_meta (case insensitive) cannot exist in the cluster, please rename the tenant')
2231
# logging.info('check special tenant name success')
2232
## 17 检查日志传输压缩是否有使用zlib压缩算法,在升级前需要保证所有observer未开启日志传输压缩或使用非zlib压缩算法
2233
#def check_log_transport_compress_func(query_cur):
2234
# (desc, results) = query_cur.exec_query("""select count(1) as cnt from oceanbase.__all_virtual_tenant_parameter_info where (name like "log_transport_compress_func" and value like "zlib_1.0")""")
2235
# if results[0][0] > 0 :
2236
# fail_list.append('The zlib compression algorithm is no longer supported with log_transport_compress_func, please replace it with other compression algorithms')
2237
# logging.info('check log_transport_compress_func success')
2238
## 18 检查升级过程中是否有表使用zlib压缩,在升级前需要保证所有表都不使用zlib压缩
2239
#def check_table_compress_func(query_cur):
2240
# (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_table where (compress_func_name like '%zlib%')""")
2241
# if results[0][0] > 0 :
2242
# fail_list.append('There are tables use zlib compression, please replace it with other compression algorithms or do not use compression during the upgrade')
2243
# logging.info('check table compression method success')
2244
## 19 检查升级过程中 table_api/obkv 连接传输是否使用了zlib压缩,在升级前需要保证所有 obkv/table_api 连接未开启zlib压缩传输或者使用非zlib压缩算法
2245
#def check_table_api_transport_compress_func(query_cur):
2246
# (desc, results) = query_cur.exec_query("""select count(1) as cnt from GV$OB_PARAMETERS where (name like "tableapi_transport_compress_func" and value like "zlib%");""")
2247
# if results[0][0] > 0 :
2248
# fail_list.append('Table api connection is not allowed to use zlib as compression algorithm during the upgrade, please use other compression algorithms by setting table_api_transport_compress_func')
2249
# logging.info('check table_api_transport_compress_func success')
2252
#def check_tenant_clone_job_exist(query_cur):
2253
# min_cluster_version = 0
2254
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
2255
# (desc, results) = query_cur.exec_query(sql)
2256
# if len(results) != 1:
2257
# fail_list.append('min_observer_version is not sync')
2258
# elif len(results[0]) != 1:
2259
# fail_list.append('column cnt not match')
2261
# min_cluster_version = get_version(results[0][0])
2262
# if min_cluster_version >= get_version("4.3.0.0"):
2263
# (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_clone_job""")
2264
# if len(results) != 1 or len(results[0]) != 1:
2265
# fail_list.append('failed to tenant clone job cnt')
2266
# elif results[0][0] != 0:
2267
# fail_list.append("""still has tenant clone job, upgrade is not allowed temporarily""")
2269
# logging.info('check tenant clone job success')
2272
#def check_tenant_snapshot_task_exist(query_cur):
2273
# min_cluster_version = 0
2274
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
2275
# (desc, results) = query_cur.exec_query(sql)
2276
# if len(results) != 1:
2277
# fail_list.append('min_observer_version is not sync')
2278
# elif len(results[0]) != 1:
2279
# fail_list.append('column cnt not match')
2281
# min_cluster_version = get_version(results[0][0])
2282
# if min_cluster_version >= get_version("4.3.0.0"):
2283
# (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_tenant_snapshot where status!='NORMAL'""")
2284
# if len(results) != 1 or len(results[0]) != 1:
2285
# fail_list.append('failed to tenant snapshot task')
2286
# elif results[0][0] != 0:
2287
# fail_list.append("""still has tenant snapshot task, upgrade is not allowed temporarily""")
2289
# logging.info('check tenant snapshot task success')
2291
## 17. 检查是否有租户在升到4.3.0版本之前已将binlog_row_image设为MINIMAL
2292
#def check_variable_binlog_row_image(query_cur):
2293
## 4.3.0.0之前的版本,MINIMAL模式生成的日志CDC无法正常消费(DELETE日志).
2294
## 4.3.0版本开始,MINIMAL模式做了改进,支持CDC消费,需要在升级到4.3.0.0之后再打开.
2295
# min_cluster_version = 0
2296
# sql = """select distinct value from GV$OB_PARAMETERS where name='min_observer_version'"""
2297
# (desc, results) = query_cur.exec_query(sql)
2298
# if len(results) != 1:
2299
# fail_list.append('min_observer_version is not sync')
2300
# elif len(results[0]) != 1:
2301
# fail_list.append('column cnt not match')
2303
# min_cluster_version = get_version(results[0][0])
2304
# # check cluster version
2305
# if min_cluster_version < get_version("4.3.0.0"):
2306
# (desc, results) = query_cur.exec_query("""select count(*) from CDB_OB_SYS_VARIABLES where NAME='binlog_row_image' and VALUE = '0'""")
2307
# if results[0][0] > 0 :
2308
# fail_list.append('Sys Variable binlog_row_image is set to MINIMAL, please check'.format(results[0][0]))
2309
# logging.info('check variable binlog_row_image success')
2311
## last check of do_check, make sure no function execute after check_fail_list
2312
#def check_fail_list():
2313
# if len(fail_list) != 0 :
2314
# error_msg ="upgrade checker failed with " + str(len(fail_list)) + " reasons: " + ", ".join(['['+x+"] " for x in fail_list])
2315
# raise MyError(error_msg)
2317
#def set_query_timeout(query_cur, timeout):
2319
# sql = """set @@session.ob_query_timeout = {0}""".format(timeout * 1000 * 1000)
2320
# query_cur.exec_sql(sql)
2323
#def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
2325
# conn = mysql.connector.connect(user = my_user,
2326
# password = my_passwd,
2329
# database = 'oceanbase',
2330
# raise_on_warnings = True)
2331
# conn.autocommit = True
2332
# cur = conn.cursor(buffered=True)
2334
# query_cur = Cursor(cur)
2335
# set_query_timeout(query_cur, timeout)
2336
# check_observer_version(query_cur, upgrade_params)
2337
# check_data_version(query_cur)
2338
# check_paxos_replica(query_cur)
2339
# check_rebalance_task(query_cur)
2340
# check_cluster_status(query_cur)
2341
# check_tenant_status(query_cur)
2342
# check_restore_job_exist(query_cur)
2343
# check_tenant_primary_zone(query_cur)
2344
# check_ddl_task_execute(query_cur)
2345
# check_backup_job_exist(query_cur)
2346
# check_archive_job_exist(query_cur)
2347
# check_archive_dest_exist(query_cur)
2348
# check_backup_dest_exist(query_cur)
2349
# check_observer_status(query_cur)
2350
# check_schema_status(query_cur)
2351
# check_server_version(query_cur)
2352
# check_not_supported_tenant_name(query_cur)
2353
# check_tenant_clone_job_exist(query_cur)
2354
# check_tenant_snapshot_task_exist(query_cur)
2355
# check_log_transport_compress_func(query_cur)
2356
# check_table_compress_func(query_cur)
2357
# check_table_api_transport_compress_func(query_cur)
2358
# check_variable_binlog_row_image(query_cur)
2359
# # all check func should execute before check_fail_list
2361
# modify_server_permanent_offline_time(cur)
2362
# except Exception, e:
2363
# logging.exception('run error')
2368
# except mysql.connector.Error, e:
2369
# logging.exception('connection error')
2371
# except Exception, e:
2372
# logging.exception('normal error')
2375
#if __name__ == '__main__':
2376
# upgrade_params = UpgradeParams()
2377
# change_opt_defult_value('log-file', upgrade_params.log_filename)
2378
# parse_options(sys.argv[1:])
2379
# if not has_no_local_opts():
2380
# deal_with_local_opts()
2382
# check_db_client_opts()
2383
# log_filename = get_opt_log_file()
2384
# upgrade_params.log_filename = log_filename
2385
# # 日志配置放在这里是为了前面的操作不要覆盖掉日志文件
2386
# config_logging_module(upgrade_params.log_filename)
2388
# host = get_opt_host()
2389
# port = int(get_opt_port())
2390
# user = get_opt_user()
2391
# password = get_opt_password()
2392
# timeout = int(get_opt_timeout())
2393
# logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", log-file=\"%s\"',\
2394
# host, port, user, password, timeout, log_filename)
2395
# do_check(host, port, user, password, timeout, upgrade_params)
2396
# except mysql.connector.Error, e:
2397
# logging.exception('mysql connctor error')
2399
# except Exception, e:
2400
# logging.exception('normal error')
2402
####====XXXX======######==== I am a splitter ====######======XXXX====####
2403
#filename:upgrade_health_checker.py
2404
##!/usr/bin/env python
2405
## -*- coding: utf-8 -*-
2410
#import mysql.connector
2411
#from mysql.connector import errorcode
2415
#class UpgradeParams:
2416
# log_filename = 'upgrade_cluster_health_checker.log'
2418
##### --------------start : my_error.py --------------
2419
#class MyError(Exception):
2420
# def __init__(self, value):
2423
# return repr(self.value)
2425
##### --------------start : actions.py 只允许执行查询语句--------------
2428
# def __init__(self, cursor):
2429
# self.__cursor = cursor
2430
# def exec_sql(self, sql, print_when_succ = True):
2432
# self.__cursor.execute(sql)
2433
# rowcount = self.__cursor.rowcount
2434
# if True == print_when_succ:
2435
# logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
2437
# except mysql.connector.Error, e:
2438
# logging.exception('mysql connector error, fail to execute sql: %s', sql)
2440
# except Exception, e:
2441
# logging.exception('normal error, fail to execute sql: %s', sql)
2443
# def exec_query(self, sql, print_when_succ = True):
2445
# self.__cursor.execute(sql)
2446
# results = self.__cursor.fetchall()
2447
# rowcount = self.__cursor.rowcount
2448
# if True == print_when_succ:
2449
# logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
2450
# return (self.__cursor.description, results)
2451
# except mysql.connector.Error, e:
2452
# logging.exception('mysql connector error, fail to execute sql: %s', sql)
2454
# except Exception, e:
2455
# logging.exception('normal error, fail to execute sql: %s', sql)
2457
##### ---------------end----------------------
2459
##### --------------start : opt.py --------------
2464
#sys.argv[0] + """ [OPTIONS]""" +\
2466
#'-I, --help Display this help and exit.\n' +\
2467
#'-V, --version Output version information and exit.\n' +\
2468
#'-h, --host=name Connect to host.\n' +\
2469
#'-P, --port=name Port number to use for connection.\n' +\
2470
#'-u, --user=name User for login.\n' +\
2471
#'-p, --password=name Password to use when connecting to server. If password is\n' +\
2472
#' not given it\'s empty string "".\n' +\
2473
#'-m, --module=name Modules to run. Modules should be a string combined by some of\n' +\
2474
#' the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
2475
#' system_variable_dml, special_action, all. "all" represents\n' +\
2476
#' that all modules should be run. They are splitted by ",".\n' +\
2477
#' For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
2478
#'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
2479
#'-t, --timeout=name check timeout.\n' + \
2480
#'-z, --zone=name If zone is not specified, check all servers status in cluster. \n' +\
2481
#' Otherwise, only check servers status in specified zone. \n' + \
2483
#'Maybe you want to run cmd like that:\n' +\
2484
#sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
2486
#version_str = """version 1.0.0"""
2489
# __g_short_name_set = set([])
2490
# __g_long_name_set = set([])
2491
# __short_name = None
2493
# __is_with_param = None
2494
# __is_local_opt = None
2497
# def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
2498
# if short_name in Option.__g_short_name_set:
2499
# raise MyError('duplicate option short name: {0}'.format(short_name))
2500
# elif long_name in Option.__g_long_name_set:
2501
# raise MyError('duplicate option long name: {0}'.format(long_name))
2502
# Option.__g_short_name_set.add(short_name)
2503
# Option.__g_long_name_set.add(long_name)
2504
# self.__short_name = short_name
2505
# self.__long_name = long_name
2506
# self.__is_with_param = is_with_param
2507
# self.__is_local_opt = is_local_opt
2508
# self.__has_value = False
2509
# if None != default_value:
2510
# self.set_value(default_value)
2511
# def is_with_param(self):
2512
# return self.__is_with_param
2513
# def get_short_name(self):
2514
# return self.__short_name
2515
# def get_long_name(self):
2516
# return self.__long_name
2517
# def has_value(self):
2518
# return self.__has_value
2519
# def get_value(self):
2520
# return self.__value
2521
# def set_value(self, value):
2522
# self.__value = value
2523
# self.__has_value = True
2524
# def is_local_opt(self):
2525
# return self.__is_local_opt
2526
# def is_valid(self):
2527
# return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value
2531
#Option('I', 'help', False, True),\
2532
#Option('V', 'version', False, True),\
2533
#Option('h', 'host', True, False),\
2534
#Option('P', 'port', True, False),\
2535
#Option('u', 'user', True, False),\
2536
#Option('p', 'password', True, False, ''),\
2538
#Option('m', 'module', True, False, 'all'),\
2539
## 日志文件路径,不同脚本的main函数中中会改成不同的默认值
2540
#Option('l', 'log-file', True, False),\
2541
#Option('t', 'timeout', True, False, 0),\
2542
#Option('z', 'zone', True, False, ''),\
2545
#def change_opt_defult_value(opt_long_name, opt_default_val):
2548
# if opt.get_long_name() == opt_long_name:
2549
# opt.set_value(opt_default_val)
2552
#def has_no_local_opts():
2554
# no_local_opts = True
2556
# if opt.is_local_opt() and opt.has_value():
2557
# no_local_opts = False
2558
# return no_local_opts
2560
#def check_db_client_opts():
2563
# if not opt.is_local_opt() and not opt.has_value():
2564
# raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
2565
# .format(opt.get_short_name(), sys.argv[0]))
2567
#def parse_option(opt_name, opt_val):
2570
# if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
2571
# opt.set_value(opt_val)
2573
#def parse_options(argv):
2578
# if opt.is_with_param():
2579
# short_opt_str += opt.get_short_name() + ':'
2581
# short_opt_str += opt.get_short_name()
2583
# if opt.is_with_param():
2584
# long_opt_list.append(opt.get_long_name() + '=')
2586
# long_opt_list.append(opt.get_long_name())
2587
# (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
2588
# for (opt_name, opt_val) in opts:
2589
# parse_option(opt_name, opt_val)
2590
# if has_no_local_opts():
2591
# check_db_client_opts()
2593
#def deal_with_local_opt(opt):
2594
# if 'help' == opt.get_long_name():
2597
# elif 'version' == opt.get_long_name():
2601
#def deal_with_local_opts():
2603
# if has_no_local_opts():
2604
# raise MyError('no local options, can not deal with local options')
2607
# if opt.is_local_opt() and opt.has_value():
2608
# deal_with_local_opt(opt)
2615
# if 'host' == opt.get_long_name():
2616
# return opt.get_value()
2621
# if 'port' == opt.get_long_name():
2622
# return opt.get_value()
2627
# if 'user' == opt.get_long_name():
2628
# return opt.get_value()
2630
#def get_opt_password():
2633
# if 'password' == opt.get_long_name():
2634
# return opt.get_value()
2636
#def get_opt_module():
2639
# if 'module' == opt.get_long_name():
2640
# return opt.get_value()
2642
#def get_opt_log_file():
2645
# if 'log-file' == opt.get_long_name():
2646
# return opt.get_value()
2648
#def get_opt_timeout():
2651
# if 'timeout' == opt.get_long_name():
2652
# return opt.get_value()
2657
# if 'zone' == opt.get_long_name():
2658
# return opt.get_value()
2659
##### ---------------end----------------------
2661
##### --------------start : do_upgrade_pre.py--------------
2662
#def config_logging_module(log_filenamme):
2663
# logging.basicConfig(level=logging.INFO,\
2664
# format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
2665
# datefmt='%Y-%m-%d %H:%M:%S',\
2666
# filename=log_filenamme,\
2669
# formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
2670
# #######################################
2671
# # 定义一个Handler打印INFO及以上级别的日志到sys.stdout
2672
# stdout_handler = logging.StreamHandler(sys.stdout)
2673
# stdout_handler.setLevel(logging.INFO)
2675
# stdout_handler.setFormatter(formatter)
2676
# # 将定义好的stdout_handler日志handler添加到root logger
2677
# logging.getLogger('').addHandler(stdout_handler)
2678
##### ---------------end----------------------
2680
#def check_zone_valid(query_cur, zone):
2682
# sql = """select count(*) from oceanbase.DBA_OB_ZONES where zone = '{0}'""".format(zone)
2683
# (desc, results) = query_cur.exec_query(sql);
2684
# if len(results) != 1 or len(results[0]) != 1:
2685
# raise MyError("unmatched row/column cnt")
2686
# elif results[0][0] == 0:
2687
# raise MyError("zone:{0} doesn't exist".format(zone))
2689
# logging.info("zone:{0} is valid".format(zone))
2691
# logging.info("zone is empty, check all servers in cluster")
2693
#def fetch_tenant_ids(query_cur):
2695
# tenant_id_list = []
2696
# (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""")
2698
# tenant_id_list.append(r[0])
2699
# return tenant_id_list
2700
# except Exception, e:
2701
# logging.exception('fail to fetch distinct tenant ids')
2704
#def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout):
2706
# logging.info("use timeout from opt, timeout(s):{0}".format(timeout))
2708
# tenant_id_list = fetch_tenant_ids(query_cur)
2709
# cal_timeout = len(tenant_id_list) * timeout_per_tenant
2710
# timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout)
2711
# logging.info("use default timeout caculated by tenants, "
2712
# "timeout(s):{0}, tenant_count:{1}, "
2713
# "timeout_per_tenant(s):{2}, min_timeout(s):{3}"
2714
# .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout))
2719
## 0. 检查server版本是否严格一致
2720
#def check_server_version_by_zone(query_cur, zone):
2722
# logging.info("skip check server version by cluster")
2724
# sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone);
2725
# (desc, results) = query_cur.exec_query(sql);
2726
# if len(results) != 1:
2727
# raise MyError("servers build_version not match")
2729
# logging.info("check server version success")
2731
## 1. 检查paxos副本是否同步, paxos副本是否缺失
2732
#def check_paxos_replica(query_cur, timeout):
2733
# # 1.1 检查paxos副本是否同步
2734
# sql = """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'"""
2735
# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600)
2736
# check_until_timeout(query_cur, sql, 0, wait_timeout)
2738
# # 1.2 检查paxos副本是否有缺失 TODO
2739
# logging.info('check paxos replica success')
2741
## 2. 检查observer是否可服务
2742
#def check_observer_status(query_cur, zone, timeout):
2743
# sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')"""
2745
# sql += """ and zone = '{0}'""".format(zone)
2746
# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600)
2747
# check_until_timeout(query_cur, sql, 0, wait_timeout)
2750
#def check_schema_status(query_cur, timeout):
2751
# sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b"""
2752
# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600)
2753
# check_until_timeout(query_cur, sql, 1, wait_timeout)
2755
## 4. check major finish
2756
#def check_major_merge(query_cur, timeout):
2758
# (desc, results) = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""")
2759
# if len(results) != 1:
2761
# elif results[0][0] != 'True':
2763
# if need_check == 1:
2764
# wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600)
2765
# sql = """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')"""
2766
# check_until_timeout(query_cur, sql, 0, wait_timeout)
2767
# sql2 = """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0"""
2768
# check_until_timeout(query_cur, sql2, 0, wait_timeout)
2770
#def check_until_timeout(query_cur, sql, value, timeout):
2771
# times = timeout / 10
2773
# (desc, results) = query_cur.exec_query(sql)
2775
# if len(results) != 1 or len(results[0]) != 1:
2776
# raise MyError("unmatched row/column cnt")
2777
# elif results[0][0] == value:
2778
# logging.info("check value is {0} success".format(value))
2781
# logging.info("value is {0}, expected value is {1}, not matched".format(results[0][0], value))
2785
# logging.warn("""check {0} job timeout""".format(job_name))
2790
#def do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, need_check_major_status, zone = ''):
2792
# conn = mysql.connector.connect(user = my_user,
2793
# password = my_passwd,
2796
# database = 'oceanbase',
2797
# raise_on_warnings = True)
2798
# conn.autocommit = True
2799
# cur = conn.cursor(buffered=True)
2801
# query_cur = QueryCursor(cur)
2802
# check_zone_valid(query_cur, zone)
2803
# check_observer_status(query_cur, zone, timeout)
2804
# check_paxos_replica(query_cur, timeout)
2805
# check_schema_status(query_cur, timeout)
2806
# check_server_version_by_zone(query_cur, zone)
2807
# if True == need_check_major_status:
2808
# check_major_merge(query_cur, timeout)
2809
# except Exception, e:
2810
# logging.exception('run error')
2815
# except mysql.connector.Error, e:
2816
# logging.exception('connection error')
2818
# except Exception, e:
2819
# logging.exception('normal error')
2822
#if __name__ == '__main__':
2823
# upgrade_params = UpgradeParams()
2824
# change_opt_defult_value('log-file', upgrade_params.log_filename)
2825
# parse_options(sys.argv[1:])
2826
# if not has_no_local_opts():
2827
# deal_with_local_opts()
2829
# check_db_client_opts()
2830
# log_filename = get_opt_log_file()
2831
# upgrade_params.log_filename = log_filename
2832
# # 日志配置放在这里是为了前面的操作不要覆盖掉日志文件
2833
# config_logging_module(upgrade_params.log_filename)
2835
# host = get_opt_host()
2836
# port = int(get_opt_port())
2837
# user = get_opt_user()
2838
# password = get_opt_password()
2839
# timeout = int(get_opt_timeout())
2840
# zone = get_opt_zone()
2841
# logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", log-file=\"%s\", timeout=%s, zone=\"%s\"', \
2842
# host, port, user, password, log_filename, timeout, zone)
2843
# do_check(host, port, user, password, upgrade_params, timeout, False, zone) # need_check_major_status = False
2844
# except mysql.connector.Error, e:
2845
# logging.exception('mysql connctor error')
2847
# except Exception, e:
2848
# logging.exception('normal error')
2851
####====XXXX======######==== I am a splitter ====######======XXXX====####
2852
#filename:upgrade_post_checker.py
2853
##!/usr/bin/env python
2854
## -*- coding: utf-8 -*-
2859
#import mysql.connector
2860
#from mysql.connector import errorcode
2867
#def check_cluster_version(cur, timeout):
2868
# current_cluster_version = actions.get_current_cluster_version()
2869
# actions.wait_parameter_sync(cur, False, "min_observer_version", current_cluster_version, timeout)
2872
#def check_data_version(cur, query_cur, timeout):
2874
# # get tenant except standby tenant
2875
# sql = "select tenant_id from oceanbase.__all_tenant except select tenant_id from oceanbase.__all_virtual_tenant_info where tenant_role = 'STANDBY'"
2876
# (desc, results) = query_cur.exec_query(sql)
2877
# if len(results) == 0:
2878
# logging.warn('result cnt not match')
2880
# tenant_count = len(results)
2881
# tenant_ids_str = ''
2882
# for index, row in enumerate(results):
2883
# tenant_ids_str += """{0}{1}""".format((',' if index > 0 else ''), row[0])
2886
# sql = "select count(*) from oceanbase.__all_server";
2887
# (desc, results) = query_cur.exec_query(sql)
2888
# if len(results) != 1 or len(results[0]) != 1:
2889
# logging.warn('result cnt not match')
2891
# server_count = results[0][0]
2893
# # check compatible sync
2894
# parameter_count = int(server_count) * int(tenant_count)
2895
# current_data_version = actions.get_current_data_version()
2897
# query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 2, 60)
2898
# actions.set_session_timeout(cur, query_timeout)
2900
# sql = """select count(*) as cnt from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value = '{0}' and tenant_id in ({1})""".format(current_data_version, tenant_ids_str)
2902
# wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 60)
2903
# times = wait_timeout / 5
2907
# result = cur.fetchall()
2908
# if len(result) != 1 or len(result[0]) != 1:
2909
# logging.exception('result cnt not match')
2911
# elif result[0][0] == parameter_count:
2912
# logging.info("""'compatible' is sync, value is {0}""".format(current_data_version))
2915
# logging.info("""'compatible' is not sync, value should be {0}, expected_cnt should be {1}, current_cnt is {2}""".format(current_data_version, parameter_count, result[0][0]))
2919
# logging.exception("""check compatible:{0} sync timeout""".format(current_data_version))
2923
# actions.set_session_timeout(cur, 10)
2925
# # check target_data_version/current_data_version from __all_core_table
2926
# int_current_data_version = actions.get_version(current_data_version)
2927
# sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str)
2928
# (desc, results) = query_cur.exec_query(sql)
2929
# if len(results) != 1 or len(results[0]) != 1:
2930
# logging.warn('result cnt not match')
2932
# elif 2 * tenant_count != results[0][0]:
2933
# logging.warn('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(current_data_version, tenant_count, results[0][0]))
2936
# logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version))
2939
#def check_root_inspection(cur, query_cur, timeout):
2940
# sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'"
2942
# wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600)
2944
# times = wait_timeout / 10
2946
# (desc, results) = query_cur.exec_query(sql)
2947
# if results[0][0] == 0:
2953
# logging.warn('check root inspection failed!')
2955
# logging.info('check root inspection success')
2958
#def enable_ddl(cur, timeout):
2959
# actions.set_parameter(cur, 'enable_ddl', 'True', timeout)
2962
#def enable_rebalance(cur, timeout):
2963
# actions.set_parameter(cur, 'enable_rebalance', 'True', timeout)
2966
#def enable_rereplication(cur, timeout):
2967
# actions.set_parameter(cur, 'enable_rereplication', 'True', timeout)
2970
#def enable_major_freeze(cur, timeout):
2971
# actions.set_parameter(cur, 'enable_major_freeze', 'True', timeout)
2972
# actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout)
2973
# actions.do_resume_merge(cur, timeout)
2976
#def do_check(conn, cur, query_cur, timeout):
2978
# check_cluster_version(cur, timeout)
2979
# check_data_version(cur, query_cur, timeout)
2980
# check_root_inspection(cur, query_cur, timeout)
2981
# enable_ddl(cur, timeout)
2982
# enable_rebalance(cur, timeout)
2983
# enable_rereplication(cur, timeout)
2984
# enable_major_freeze(cur, timeout)
2985
# except Exception, e:
2986
# logging.exception('run error')
2988
####====XXXX======######==== I am a splitter ====######======XXXX====####
2995
from random import Random
2997
class SplitError(Exception):
    """Raised when this self-splitting script meets a malformed embedded
    section: a truncated file, a missing end sentinel, or a bad
    '#filename:' marker line."""

    def __init__(self, value):
        # Keep the human-readable description so callers can log it.
        self.value = value

    def __str__(self):
        # repr() so the message appears quoted/escaped in tracebacks.
        return repr(self.value)
3003
def random_str(rand_str_len = 8):
    """Return a random alphanumeric string of length ``rand_str_len``.

    Characters are drawn from [A-Za-z0-9].  Used to build a unique
    extraction-directory suffix; this uses the non-cryptographic PRNG,
    so it must NOT be used for security-sensitive tokens.
    """
    chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
    # randint is inclusive on both ends, so the valid index range is
    # [0, len(chars) - 1].
    last_index = len(chars) - 1
    rand = Random()
    # Accumulate in a list and join once; also avoids shadowing the
    # builtin `str` as the original draft did.
    picked = []
    for _ in range(rand_str_len):
        picked.append(chars[rand.randint(0, last_index)])
    return ''.join(picked)
3012
def split_py_files(sub_files_dir):
    """Split this self-contained script into its embedded sub py files.

    The script's own source is a sequence of sections separated by
    ``file_splitter_line`` markers.  The line right after a marker is
    either the end sentinel (``#sub file module end``) or a
    ``#filename:xxx.py`` line naming the next section; every payload
    line of a section is commented out with a single leading '#', which
    is stripped when the section is written into *sub_files_dir*.

    Raises SplitError on a truncated file or a malformed filename line.
    Raises OSError if *sub_files_dir* already exists (each run builds a
    fresh timestamped+random name, so a collision is a real error).
    """
    file_splitter_line = '####====XXXX======######==== I am a splitter ====######======XXXX====####'
    sub_filename_line_prefix = '#filename:'
    sub_file_module_end_line = '#sub file module end'
    os.makedirs(sub_files_dir)
    # NOTE(review): char_enter is a module-level constant of the
    # enclosing script, presumably '\n' (lines come from readlines(),
    # which keeps the trailing newline) -- confirm at file top.
    print('succeed to create run dir: ' + sub_files_dir + char_enter)
    with open(sys.argv[0], 'r') as cur_file:
        cur_file_lines = cur_file.readlines()
    cur_file_lines_count = len(cur_file_lines)
    sub_file_lines = []
    sub_filename = ''
    begin_read_sub_py_file = False
    is_first_splitter_line = True
    i = 0
    while i < cur_file_lines_count:
        if (file_splitter_line + char_enter) != cur_file_lines[i]:
            # Ordinary line: collect it only once a section has started.
            if begin_read_sub_py_file:
                sub_file_lines.append(cur_file_lines[i])
        else:
            if is_first_splitter_line:
                # Everything before the first marker is the splitter
                # itself; nothing to flush yet.
                is_first_splitter_line = False
            else:
                # A marker closes the previous section: write it out,
                # stripping the single leading '#' from each line.
                with open(sub_files_dir + '/' + sub_filename, 'w') as sub_file:
                    for sub_file_line in sub_file_lines:
                        sub_file.write(sub_file_line[1:])
                sub_file_lines = []
            # Advance to the line after the marker: end sentinel or the
            # '#filename:' line of the next section.
            i += 1
            if i >= cur_file_lines_count:
                raise SplitError('invalid line index:' + str(i) + ', lines_count:' + str(cur_file_lines_count))
            elif (sub_file_module_end_line + char_enter) == cur_file_lines[i]:
                print('succeed to split all sub py files')
                break
            else:
                mark_idx = cur_file_lines[i].find(sub_filename_line_prefix)
                if 0 != mark_idx:
                    raise SplitError('invalid sub file name line, mark_idx = ' + str(mark_idx) + ', line = ' + cur_file_lines[i])
                # Drop the prefix and the trailing newline to get the
                # bare file name.
                sub_filename = cur_file_lines[i][len(sub_filename_line_prefix):-1]
                begin_read_sub_py_file = True
        i += 1
3061
if __name__ == '__main__':
    import importlib
    # Name of this script without its directory part.
    cur_filename = sys.argv[0][sys.argv[0].rfind(os.sep)+1:]
    # Full path and bare name, both with the .py extension removed.
    (cur_file_short_name, cur_file_ext_name1) = os.path.splitext(sys.argv[0])
    (cur_file_real_name, cur_file_ext_name2) = os.path.splitext(cur_filename)
    # Unique extraction dir: microsecond timestamp plus a random suffix
    # so concurrent runs never collide.
    sub_files_dir_suffix = '_extract_files_' + datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + '_' + random_str()
    sub_files_dir = cur_file_short_name + sub_files_dir_suffix
    sub_files_short_dir = cur_file_real_name + sub_files_dir_suffix
    split_py_files(sub_files_dir)
    # Import the freshly extracted driver module.  importlib.import_module
    # replaces the original exec('from ... import ...') -- identical
    # effect without building and executing a dynamic code string.
    do_upgrade_by_argv = getattr(
        importlib.import_module(sub_files_short_dir + '.do_upgrade_pre'),
        'do_upgrade_by_argv')
    do_upgrade_by_argv(sys.argv[1:])