upgrade_pre.py 
3070 lines · 117.5 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:__init__.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:actions.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#import time
#import re
#import json
#import traceback
#import sys
#import mysql.connector
#from mysql.connector import errorcode
#from my_error import MyError
#import logging
#
#class SqlItem:
#  action_sql = None
#  rollback_sql = None
#  def __init__(self, action_sql, rollback_sql):
#    self.action_sql = action_sql
#    self.rollback_sql = rollback_sql
#
#current_cluster_version = "4.3.0.1"
#current_data_version = "4.3.0.1"
#g_succ_sql_list = []
#g_commit_sql_list = []
#
#def get_current_cluster_version():
#  return current_cluster_version
#
#def get_current_data_version():
#  return current_data_version
#
#def refresh_commit_sql_list():
#  global g_succ_sql_list
#  global g_commit_sql_list
#  if len(g_commit_sql_list) < len(g_succ_sql_list):
#    for i in range(len(g_commit_sql_list), len(g_succ_sql_list)):
#      g_commit_sql_list.append(g_succ_sql_list[i])
#
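## Bookkeeping note: g_succ_sql_list holds every statement executed so far, while
## g_commit_sql_list holds only those already known to be committed;
## refresh_commit_sql_list() above copies the newly committed tail of the first list
## into the second.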
#def get_succ_sql_list_str():
#  global g_succ_sql_list
#  ret_str = ''
#  for i in range(0, len(g_succ_sql_list)):
#    if i > 0:
#      ret_str += '\n'
#    ret_str += g_succ_sql_list[i].action_sql + ';'
#  return ret_str
#
#def get_commit_sql_list_str():
#  global g_commit_sql_list
#  ret_str = ''
#  for i in range(0, len(g_commit_sql_list)):
#    if i > 0:
#      ret_str += '\n'
#    ret_str += g_commit_sql_list[i].action_sql + ';'
#  return ret_str
#
#def get_rollback_sql_file_lines_str():
#  global g_commit_sql_list
#  ret_str = ''
#  g_commit_sql_list_len = len(g_commit_sql_list)
#  for i in range(0, g_commit_sql_list_len):
#    if i > 0:
#      ret_str += '\n'
#    idx = g_commit_sql_list_len - 1 - i
#    ret_str += '/*\n' + g_commit_sql_list[idx].action_sql + ';\n*/\n'
#    ret_str += g_commit_sql_list[idx].rollback_sql + ';'
#  return ret_str
#
#def dump_rollback_sql_to_file(rollback_sql_filename):
#  logging.info('===================== begin to dump rollback sql file ============================')
#  rollback_sql_file = open(rollback_sql_filename, 'w')
#  rollback_sql_file.write('# This file contains the rollback SQL.\n')
#  rollback_sql_file.write('# Each commented-out SQL statement has already been committed; the uncommented statement right after it is the corresponding rollback SQL. The rollback statements are ordered in reverse of the committed ones.\n')
#  rollback_sql_file.write('# If the upgrade script fails, this file can be used as a reference for rolling back.\n')
#  rollback_sql_file.write('\n')
#  rollback_sql_file_lines_str = get_rollback_sql_file_lines_str()
#  rollback_sql_file.write(rollback_sql_file_lines_str)
#  rollback_sql_file.close()
#  logging.info('=========== succeed to dump rollback sql file to: ' + rollback_sql_filename + '===============')
#
#def check_is_ddl_sql(sql):
#  word_list = sql.split()
#  if len(word_list) < 1:
#    raise MyError('sql is empty, sql="{0}"'.format(sql))
#  key_word = word_list[0].lower()
#  if 'create' != key_word and 'alter' != key_word:
#    raise MyError('sql must be ddl, key_word="{0}", sql="{1}"'.format(key_word, sql))
#
#def check_is_query_sql(sql):
#  word_list = sql.split()
#  if len(word_list) < 1:
#    raise MyError('sql is empty, sql="{0}"'.format(sql))
#  key_word = word_list[0].lower()
#  if 'select' != key_word and 'show' != key_word and 'desc' != key_word:
#    raise MyError('sql must be query, key_word="{0}", sql="{1}"'.format(key_word, sql))
#
#def check_is_update_sql(sql):
#  word_list = sql.split()
#  if len(word_list) < 1:
#    raise MyError('sql is empty, sql="{0}"'.format(sql))
#  key_word = word_list[0].lower()
#  if 'insert' != key_word and 'update' != key_word and 'replace' != key_word and 'set' != key_word and 'delete' != key_word:
#    # also allow statements like: select @current_ts := now()
#    if not (len(word_list) >= 3 and 'select' == word_list[0].lower()\
#        and word_list[1].lower().startswith('@') and ':=' == word_list[2].lower()):
#      raise MyError('sql must be update, key_word="{0}", sql="{1}"'.format(key_word, sql))
#
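## Example for check_is_update_sql above: "update t set c1 = 1" and
## "select @current_ts := now()" are both accepted, while a plain "select 1"
## raises MyError because it is neither an update statement nor a variable assignment.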
#def get_min_cluster_version(cur):
#  min_cluster_version = 0
#  sql = """select distinct value from oceanbase.GV$OB_PARAMETERS where name='min_observer_version'"""
#  logging.info(sql)
#  cur.execute(sql)
#  results = cur.fetchall()
#  if len(results) != 1:
#    logging.exception('min_observer_version is not sync')
#    raise e
#  elif len(results[0]) != 1:
#    logging.exception('column cnt not match')
#    raise e
#  else:
#    min_cluster_version = get_version(results[0][0])
#  return min_cluster_version
#
#def set_parameter(cur, parameter, value, timeout = 0):
#  sql = """alter system set {0} = '{1}'""".format(parameter, value)
#  logging.info(sql)
#  cur.execute(sql)
#  wait_parameter_sync(cur, False, parameter, value, timeout)
#
#def set_session_timeout(cur, seconds):
#  sql = "set @@session.ob_query_timeout = {0}".format(seconds * 1000 * 1000)
#  logging.info(sql)
#  cur.execute(sql)
#
#def set_default_timeout_by_tenant(cur, timeout, timeout_per_tenant, min_timeout):
#  if timeout > 0:
#    logging.info("use timeout from opt, timeout(s):{0}".format(timeout))
#  else:
#    query_cur = QueryCursor(cur)
#    tenant_id_list = fetch_tenant_ids(query_cur)
#    cal_timeout = len(tenant_id_list) * timeout_per_tenant
#    timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout)
#    logging.info("use default timeout calculated by tenants, "
#                 "timeout(s):{0}, tenant_count:{1}, "
#                 "timeout_per_tenant(s):{2}, min_timeout(s):{3}"
#                 .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout))
#
#  return timeout
#
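## Worked example for set_default_timeout_by_tenant above: with timeout = 0, 3 tenants,
## timeout_per_tenant = 10 and min_timeout = 60, the calculated 3 * 10 = 30 falls below
## the minimum, so 60 is returned; with 20 tenants it would be 20 * 10 = 200.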
#def set_tenant_parameter(cur, parameter, value, timeout = 0):
#
#  tenants_list = []
#  if get_min_cluster_version(cur) < get_version("4.2.1.0"):
#    tenants_list = ['all']
#  else:
#    tenants_list = ['sys', 'all_user', 'all_meta']
#
#  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)
#
#  set_session_timeout(cur, query_timeout)
#
#  for tenants in tenants_list:
#    sql = """alter system set {0} = '{1}' tenant = '{2}'""".format(parameter, value, tenants)
#    logging.info(sql)
#    cur.execute(sql)
#
#  set_session_timeout(cur, 10)
#
#  wait_parameter_sync(cur, True, parameter, value, timeout)
#
#def get_ori_enable_ddl(cur, timeout):
#  ori_value_str = fetch_ori_enable_ddl(cur)
#  wait_parameter_sync(cur, False, 'enable_ddl', ori_value_str, timeout)
#  ori_value = (ori_value_str == 'True')
#  return ori_value
#
#def fetch_ori_enable_ddl(cur):
#  ori_value = 'True'
#  sql = """select value from oceanbase.__all_sys_parameter where name = 'enable_ddl'"""
#
#  logging.info(sql)
#  cur.execute(sql)
#  result = cur.fetchall()
#
#  if len(result) == 0:
#    # means default value, is True
#    ori_value = 'True'
#  elif len(result) != 1 or len(result[0]) != 1:
#    logging.exception('result cnt not match')
#    raise e
#  elif result[0][0].lower() in ["1", "true", "on", "yes", 't']:
#    ori_value = 'True'
#  elif result[0][0].lower() in ["0", "false", "off", "no", 'f']:
#    ori_value = 'False'
#  else:
#    logging.exception("""result value is invalid, result:{0}""".format(result[0][0]))
#    raise e
#  return ori_value
#
## print version like "x.x.x.x"
#def print_version(version):
#  version = int(version)
#  major = (version >> 32) & 0xffffffff
#  minor = (version >> 16) & 0xffff
#  major_patch = (version >> 8) & 0xff
#  minor_patch = version & 0xff
#  version_str = "{0}.{1}.{2}.{3}".format(major, minor, major_patch, minor_patch)
#
## version str should look like "x.x.x.x"
#def get_version(version_str):
#  versions = version_str.split(".")
#
#  if len(versions) != 4:
#    logging.exception("""version:{0} is invalid""".format(version_str))
#    raise e
#
#  major = int(versions[0])
#  minor = int(versions[1])
#  major_patch = int(versions[2])
#  minor_patch = int(versions[3])
#
#  if major > 0xffffffff or minor > 0xffff or major_patch > 0xff or minor_patch > 0xff:
#    logging.exception("""version:{0} is invalid""".format(version_str))
#    raise e
#
#  version = (major << 32) | (minor << 16) | (major_patch << 8) | (minor_patch)
#  return version
#
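## Worked example of the packing scheme above:
##   get_version("4.2.1.0") == (4 << 32) | (2 << 16) | (1 << 8) | 0 == 17180000512
## and print_version() unpacks that integer back into major 4, minor 2, patches 1 and 0.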
#def check_server_version_by_cluster(cur):
#  sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server""";
#  logging.info(sql)
#  cur.execute(sql)
#  result = cur.fetchall()
#  if len(result) != 1:
#    raise MyError("servers build_version not match")
#  else:
#    logging.info("check server version success")
#
#def check_parameter(cur, is_tenant_config, key, value):
#  table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info"
#  sql = """select * from oceanbase.{0}
#           where name = '{1}' and value = '{2}'""".format(table_name, key, value)
#  logging.info(sql)
#  cur.execute(sql)
#  result = cur.fetchall()
#  bret = False
#  if len(result) > 0:
#    bret = True
#  else:
#    bret = False
#  return bret
#
#def wait_parameter_sync(cur, is_tenant_config, key, value, timeout):
#  table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info"
#  sql = """select count(*) as cnt from oceanbase.{0}
#           where name = '{1}' and value != '{2}'""".format(table_name, key, value)
#
#  wait_timeout = 0
#  query_timeout = 0
#  if not is_tenant_config or timeout > 0:
#    wait_timeout = (timeout if timeout > 0 else 60)
#    query_timeout = wait_timeout
#  else:
#    # is_tenant_config & timeout not set
#    wait_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)
#    query_timeout = set_default_timeout_by_tenant(cur, timeout, 2, 60)
#
#  set_session_timeout(cur, query_timeout)
#
#  times = wait_timeout / 5
#  while times >= 0:
#    logging.info(sql)
#    cur.execute(sql)
#    result = cur.fetchall()
#    if len(result) != 1 or len(result[0]) != 1:
#      logging.exception('result cnt not match')
#      raise e
#    elif result[0][0] == 0:
#      logging.info("""{0} is sync, value is {1}""".format(key, value))
#      break
#    else:
#      logging.info("""{0} is not sync, value should be {1}""".format(key, value))
#
#    times -= 1
#    if times == -1:
#      logging.exception("""check {0}:{1} sync timeout""".format(key, value))
#      raise e
#    time.sleep(5)
#
#  set_session_timeout(cur, 10)
#
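## wait_parameter_sync above polls the parameter table every 5 seconds, up to roughly
## wait_timeout seconds in total, until no server reports a stale value for the key;
## it raises once the retries are exhausted without the value converging.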
#def do_begin_upgrade(cur, timeout):
#
#  if not check_parameter(cur, False, "enable_upgrade_mode", "True"):
#    action_sql = "alter system begin upgrade"
#    rollback_sql = "alter system end upgrade"
#    logging.info(action_sql)
#
#    cur.execute(action_sql)
#
#    global g_succ_sql_list
#    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
#  wait_parameter_sync(cur, False, "enable_upgrade_mode", "True", timeout)
#
#
#def do_begin_rolling_upgrade(cur, timeout):
#
#  if not check_parameter(cur, False, "_upgrade_stage", "DBUPGRADE"):
#    action_sql = "alter system begin rolling upgrade"
#    rollback_sql = "alter system end upgrade"
#
#    logging.info(action_sql)
#    cur.execute(action_sql)
#
#    global g_succ_sql_list
#    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
#  wait_parameter_sync(cur, False, "_upgrade_stage", "DBUPGRADE", timeout)
#
#
#def do_end_rolling_upgrade(cur, timeout):
#
#  # maybe in upgrade_post_check stage or never run begin upgrade
#  if check_parameter(cur, False, "enable_upgrade_mode", "False"):
#    return
#
#  current_cluster_version = get_current_cluster_version()
#  if not check_parameter(cur, False, "_upgrade_stage", "POSTUPGRADE") or not check_parameter(cur, False, "min_observer_version", current_cluster_version):
#    action_sql = "alter system end rolling upgrade"
#    rollback_sql = "alter system end upgrade"
#
#    logging.info(action_sql)
#    cur.execute(action_sql)
#
#    global g_succ_sql_list
#    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
#  wait_parameter_sync(cur, False, "min_observer_version", current_data_version, timeout)
#  wait_parameter_sync(cur, False, "_upgrade_stage", "POSTUPGRADE", timeout)
#
#
#def do_end_upgrade(cur, timeout):
#
#  if not check_parameter(cur, False, "enable_upgrade_mode", "False"):
#    action_sql = "alter system end upgrade"
#    rollback_sql = ""
#
#    logging.info(action_sql)
#    cur.execute(action_sql)
#
#    global g_succ_sql_list
#    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
#  wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout)
#
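## The four helpers above drive the upgrade-stage state machine: "begin upgrade" turns
## enable_upgrade_mode on, "begin rolling upgrade" moves _upgrade_stage to DBUPGRADE,
## "end rolling upgrade" moves it to POSTUPGRADE once min_observer_version has caught
## up, and "end upgrade" finally switches enable_upgrade_mode off. Each step is skipped
## when check_parameter shows the cluster is already in the target state.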
#def do_suspend_merge(cur, timeout):
#  tenants_list = []
#  if get_min_cluster_version(cur) < get_version("4.2.1.0"):
#    tenants_list = ['all']
#  else:
#    tenants_list = ['sys', 'all_user', 'all_meta']
#
#  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)
#
#  set_session_timeout(cur, query_timeout)
#
#  for tenants in tenants_list:
#    action_sql = "alter system suspend merge tenant = {0}".format(tenants)
#    rollback_sql = "alter system resume merge tenant = {0}".format(tenants)
#    logging.info(action_sql)
#    cur.execute(action_sql)
#
#  set_session_timeout(cur, 10)
#
#def do_resume_merge(cur, timeout):
#  tenants_list = []
#  if get_min_cluster_version(cur) < get_version("4.2.1.0"):
#    tenants_list = ['all']
#  else:
#    tenants_list = ['sys', 'all_user', 'all_meta']
#
#  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)
#
#  set_session_timeout(cur, query_timeout)
#
#  for tenants in tenants_list:
#    action_sql = "alter system resume merge tenant = {0}".format(tenants)
#    rollback_sql = "alter system suspend merge tenant = {0}".format(tenants)
#    logging.info(action_sql)
#    cur.execute(action_sql)
#
#  set_session_timeout(cur, 10)
#
#class Cursor:
#  __cursor = None
#  def __init__(self, cursor):
#    self.__cursor = cursor
#  def exec_sql(self, sql, print_when_succ = True):
#    try:
#      self.__cursor.execute(sql)
#      rowcount = self.__cursor.rowcount
#      if True == print_when_succ:
#        logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
#      return rowcount
#    except mysql.connector.Error, e:
#      logging.exception('mysql connector error, fail to execute sql: %s', sql)
#      raise e
#    except Exception, e:
#      logging.exception('normal error, fail to execute sql: %s', sql)
#      raise e
#  def exec_query(self, sql, print_when_succ = True):
#    try:
#      self.__cursor.execute(sql)
#      results = self.__cursor.fetchall()
#      rowcount = self.__cursor.rowcount
#      if True == print_when_succ:
#        logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
#      return (self.__cursor.description, results)
#    except mysql.connector.Error, e:
#      logging.exception('mysql connector error, fail to execute sql: %s', sql)
#      raise e
#    except Exception, e:
#      logging.exception('normal error, fail to execute sql: %s', sql)
#      raise e
#
#class DDLCursor:
#  _cursor = None
#  def __init__(self, cursor):
#    self._cursor = Cursor(cursor)
#  def exec_ddl(self, sql, print_when_succ = True):
#    try:
#      # check that the sql is DDL; raise if it is not
#      check_is_ddl_sql(sql)
#      return self._cursor.exec_sql(sql, print_when_succ)
#    except Exception, e:
#      logging.exception('fail to execute ddl: %s', sql)
#      raise e
#
#class QueryCursor:
#  _cursor = None
#  def __init__(self, cursor):
#    self._cursor = Cursor(cursor)
#  def exec_query(self, sql, print_when_succ = True):
#    try:
#      # check that the sql is a query; raise if it is not
#      check_is_query_sql(sql)
#      return self._cursor.exec_query(sql, print_when_succ)
#    except Exception, e:
#      logging.exception('fail to execute dml query: %s', sql)
#      raise e
#
#class DMLCursor(QueryCursor):
#  def exec_update(self, sql, print_when_succ = True):
#    try:
#      # check that the sql is an update; raise if it is not
#      check_is_update_sql(sql)
#      return self._cursor.exec_sql(sql, print_when_succ)
#    except Exception, e:
#      logging.exception('fail to execute dml update: %s', sql)
#      raise e
#
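## Cursor is a thin logging wrapper around the raw mysql.connector cursor; DDLCursor,
## QueryCursor and DMLCursor wrap it and validate the statement type via the
## check_is_*_sql helpers before executing, so an action class cannot accidentally run
## the wrong kind of statement.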
#class BaseDDLAction():
#  __ddl_cursor = None
#  _query_cursor = None
#  def __init__(self, cursor):
#    self.__ddl_cursor = DDLCursor(cursor)
#    self._query_cursor = QueryCursor(cursor)
#  def do_action(self):
#    global g_succ_sql_list
#    action_sql = self.get_action_ddl()
#    rollback_sql = self.get_rollback_sql()
#    self.__ddl_cursor.exec_ddl(action_sql)
#    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#    # DDL commits immediately, so refresh g_commit_sql_list right away
#    refresh_commit_sql_list()
#
#class BaseDMLAction():
#  __dml_cursor = None
#  _query_cursor = None
#  def __init__(self, cursor):
#    self.__dml_cursor = DMLCursor(cursor)
#    self._query_cursor = QueryCursor(cursor)
#  def do_action(self):
#    global g_succ_sql_list
#    action_sql = self.get_action_dml()
#    rollback_sql = self.get_rollback_sql()
#    self.__dml_cursor.exec_update(action_sql)
#    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
#class BaseEachTenantDMLAction():
#  __dml_cursor = None
#  _query_cursor = None
#  _tenant_id_list = None
#  _cursor = None
#  def __init__(self, cursor, tenant_id_list):
#    self.__dml_cursor = DMLCursor(cursor)
#    self._query_cursor = QueryCursor(cursor)
#    self._tenant_id_list = tenant_id_list
#    self._cursor = Cursor(cursor)
#  def get_tenant_id_list(self):
#    return self._tenant_id_list
#  def do_each_tenant_action(self, tenant_id):
#    global g_succ_sql_list
#    action_sql = self.get_each_tenant_action_dml(tenant_id)
#    rollback_sql = self.get_each_tenant_rollback_sql(tenant_id)
#    self.__dml_cursor.exec_update(action_sql)
#    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#
#class BaseEachTenantDDLAction():
#  __dml_cursor = None
#  _query_cursor = None
#  _tenant_id_list = None
#  _all_table_name = "__all_table"
#  def __init__(self, cursor, tenant_id_list):
#    self.__ddl_cursor = DDLCursor(cursor)
#    self._query_cursor = QueryCursor(cursor)
#    self._tenant_id_list = tenant_id_list
#  def get_tenant_id_list(self):
#    return self._tenant_id_list
#  def get_all_table_name(self):
#    return self._all_table_name
#  def set_all_table_name(self, table_name):
#    self._all_table_name = table_name
#  def do_each_tenant_action(self, tenant_id):
#    global g_succ_sql_list
#    action_sql = self.get_each_tenant_action_ddl(tenant_id)
#    rollback_sql = self.get_each_tenant_rollback_sql(tenant_id)
#    self.__ddl_cursor.exec_ddl(action_sql)
#    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
#    # DDL commits immediately, so refresh g_commit_sql_list right away
#    refresh_commit_sql_list()
#
#def actions_cls_compare(x, y):
#  diff = x.get_seq_num() - y.get_seq_num()
#  if 0 == diff:
#    raise MyError('seq num is equal')
#  elif diff < 0:
#    return -1
#  else:
#    return 1
#
#def reflect_action_cls_list(action_module, action_name_prefix):
#  action_cls_list = []
#  cls_from_actions = dir(action_module)
#  for cls in cls_from_actions:
#    if cls.startswith(action_name_prefix):
#      action_cls = getattr(action_module, cls)
#      action_cls_list.append(action_cls)
#  action_cls_list.sort(actions_cls_compare)
#  return action_cls_list
#
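## reflect_action_cls_list above discovers concrete action classes by reflection: every
## attribute of action_module whose name starts with action_name_prefix is collected,
## and the list is ordered by each class's get_seq_num() through actions_cls_compare,
## which deliberately rejects duplicate sequence numbers. For illustration only
## (hypothetical names): NormalDDLActionPreFoo (seq 101) and NormalDDLActionPreBar
## (seq 102) would come back in that order for the prefix 'NormalDDLActionPre'.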
#def fetch_observer_version(cur):
#  sql = """select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'"""
#  logging.info(sql)
#  cur.execute(sql)
#  result = cur.fetchall()
#  if len(result) != 1:
#    raise MyError('query results count is not 1')
#  else:
#    logging.info('get observer version success, version = {0}'.format(result[0][0]))
#  return result[0][0]
#
#def fetch_tenant_ids(query_cur):
#  try:
#    tenant_id_list = []
#    (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""")
#    for r in results:
#      tenant_id_list.append(r[0])
#    return tenant_id_list
#  except Exception, e:
#    logging.exception('fail to fetch distinct tenant ids')
#    raise e
#
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:config.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#pre_upgrade_log_filename = 'upgrade_pre.log'
#pre_upgrade_sql_filename = 'upgrade_sql_pre.txt'
#pre_upgrade_rollback_sql_filename = 'rollback_sql_pre.txt'
#
#post_upgrade_log_filename = 'upgrade_post.log'
#post_upgrade_sql_filename = 'upgrade_sql_post.txt'
#post_upgrade_rollback_sql_filename = 'rollback_sql_post.txt'
#
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:do_upgrade_post.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#from my_error import MyError
#import sys
#import mysql.connector
#from mysql.connector import errorcode
#import logging
#import json
#import config
#import opts
#import run_modules
#import actions
#import upgrade_health_checker
#import tenant_upgrade_action
#import upgrade_post_checker
#
## Queries use /*+read_consistency(WEAK) */, so creating or dropping tenants must not be allowed during the upgrade
#
#class UpgradeParams:
#  log_filename = config.post_upgrade_log_filename
#  sql_dump_filename = config.post_upgrade_sql_filename
#  rollback_sql_filename =  config.post_upgrade_rollback_sql_filename
#
#def config_logging_module(log_filenamme):
#  logging.basicConfig(level=logging.INFO,\
#      format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
#      datefmt='%Y-%m-%d %H:%M:%S',\
#      filename=log_filenamme,\
#      filemode='w')
#  # define the log output format
#  formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
#  #######################################
#  # define a handler that prints logs of level INFO and above to sys.stdout
#  stdout_handler = logging.StreamHandler(sys.stdout)
#  stdout_handler.setLevel(logging.INFO)
#  # set the log output format
#  stdout_handler.setFormatter(formatter)
#  # add the stdout_handler defined above to the root logger
#  logging.getLogger('').addHandler(stdout_handler)
#
#def print_stats():
#  logging.info('==================================================================================')
#  logging.info('============================== STATISTICS BEGIN ==================================')
#  logging.info('==================================================================================')
#  logging.info('succeed run sql(except sql of special actions): \n\n%s\n', actions.get_succ_sql_list_str())
#  logging.info('committed sql(except sql of special actions): \n\n%s\n', actions.get_commit_sql_list_str())
#  logging.info('==================================================================================')
#  logging.info('=============================== STATISTICS END ===================================')
#  logging.info('==================================================================================')
#
#def do_upgrade(my_host, my_port, my_user, my_passwd, timeout, my_module_set, upgrade_params):
#  try:
#    conn = mysql.connector.connect(user = my_user,
#                                   password = my_passwd,
#                                   host = my_host,
#                                   port = my_port,
#                                   database = 'oceanbase',
#                                   raise_on_warnings = True)
#    cur = conn.cursor(buffered=True)
#    try:
#      query_cur = actions.QueryCursor(cur)
#      actions.check_server_version_by_cluster(cur)
#      conn.commit()
#
#      if run_modules.MODULE_HEALTH_CHECK in my_module_set:
#        logging.info('================begin to run health check action ===============')
#        upgrade_health_checker.do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, False)  # need_check_major_status = False
#        logging.info('================succeed to run health check action ===============')
#
#      if run_modules.MODULE_END_ROLLING_UPGRADE in my_module_set:
#        logging.info('================begin to run end rolling upgrade action ===============')
#        conn.autocommit = True
#        actions.do_end_rolling_upgrade(cur, timeout)
#        conn.autocommit = False
#        actions.refresh_commit_sql_list()
#        logging.info('================succeed to run end rolling upgrade action ===============')
#
#      if run_modules.MODULE_TENANT_UPRADE in my_module_set:
#        logging.info('================begin to run tenant upgrade action ===============')
#        conn.autocommit = True
#        tenant_upgrade_action.do_upgrade(conn, cur, timeout, my_user, my_passwd)
#        conn.autocommit = False
#        actions.refresh_commit_sql_list()
#        logging.info('================succeed to run tenant upgrade action ===============')
#
#      if run_modules.MODULE_END_UPRADE in my_module_set:
#        logging.info('================begin to run end upgrade action ===============')
#        conn.autocommit = True
#        actions.do_end_upgrade(cur, timeout)
#        conn.autocommit = False
#        actions.refresh_commit_sql_list()
#        logging.info('================succeed to run end upgrade action ===============')
#
#      if run_modules.MODULE_POST_CHECK in my_module_set:
#        logging.info('================begin to run post check action ===============')
#        conn.autocommit = True
#        upgrade_post_checker.do_check(conn, cur, query_cur, timeout)
#        conn.autocommit = False
#        actions.refresh_commit_sql_list()
#        logging.info('================succeed to run post check action ===============')
#
#    except Exception, e:
#      logging.exception('run error')
#      raise e
#    finally:
#      # print statistics
#      print_stats()
#      # dump the rollback sql to a file
#      # actions.dump_rollback_sql_to_file(upgrade_params.rollback_sql_filename)
#      cur.close()
#      conn.close()
#  except mysql.connector.Error, e:
#    logging.exception('connection error')
#    raise e
#  except Exception, e:
#    logging.exception('normal error')
#    raise e
#
#def do_upgrade_by_argv(argv):
#  upgrade_params = UpgradeParams()
#  opts.change_opt_defult_value('log-file', upgrade_params.log_filename)
#  opts.parse_options(argv)
#  if not opts.has_no_local_opts():
#    opts.deal_with_local_opts('upgrade_post')
#  else:
#    opts.check_db_client_opts()
#    log_filename = opts.get_opt_log_file()
#    upgrade_params.log_filename = log_filename
#    # logging is configured here so that the earlier steps do not overwrite the log file
#    config_logging_module(upgrade_params.log_filename)
#    try:
#      host = opts.get_opt_host()
#      port = int(opts.get_opt_port())
#      user = opts.get_opt_user()
#      password = opts.get_opt_password()
#      timeout = int(opts.get_opt_timeout())
#      cmd_module_str = opts.get_opt_module()
#      module_set = set([])
#      all_module_set = run_modules.get_all_module_set()
#      cmd_module_list = cmd_module_str.split(',')
#      for cmd_module in cmd_module_list:
#        if run_modules.ALL_MODULE == cmd_module:
#          module_set = module_set | all_module_set
#        elif cmd_module in all_module_set:
#          module_set.add(cmd_module)
#        else:
#          raise MyError('invalid module: {0}'.format(cmd_module))
#      logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", module=\"%s\", log-file=\"%s\"',\
#          host, port, user, password, timeout, module_set, log_filename)
#      do_upgrade(host, port, user, password, timeout, module_set, upgrade_params)
#    except mysql.connector.Error, e:
#      logging.exception('mysql connector error')
#      logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
#      raise e
#    except Exception, e:
#      logging.exception('normal error')
#      logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
#      raise e
#
#
#
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:do_upgrade_pre.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#from my_error import MyError
#import sys
#import mysql.connector
#from mysql.connector import errorcode
#import logging
#
#import config
#import opts
#import run_modules
#import actions
#import special_upgrade_action_pre
#import upgrade_health_checker
#
## Queries use /*+read_consistency(WEAK) */, so creating or dropping tenants must not be allowed during the upgrade
#
#class UpgradeParams:
#  log_filename = config.pre_upgrade_log_filename
#  sql_dump_filename = config.pre_upgrade_sql_filename
#  rollback_sql_filename = config.pre_upgrade_rollback_sql_filename
#
#def config_logging_module(log_filenamme):
#  logging.basicConfig(level=logging.INFO,\
#      format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
#      datefmt='%Y-%m-%d %H:%M:%S',\
#      filename=log_filenamme,\
#      filemode='w')
#  # define the log output format
#  formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
#  #######################################
#  # define a handler that prints logs of level INFO and above to sys.stdout
#  stdout_handler = logging.StreamHandler(sys.stdout)
#  stdout_handler.setLevel(logging.INFO)
#  # set the log output format
#  stdout_handler.setFormatter(formatter)
#  # add the stdout_handler defined above to the root logger
#  logging.getLogger('').addHandler(stdout_handler)
#
#def print_stats():
#  logging.info('==================================================================================')
#  logging.info('============================== STATISTICS BEGIN ==================================')
#  logging.info('==================================================================================')
#  logging.info('succeed run sql(except sql of special actions): \n\n%s\n', actions.get_succ_sql_list_str())
#  logging.info('committed sql(except sql of special actions): \n\n%s\n', actions.get_commit_sql_list_str())
#  logging.info('==================================================================================')
#  logging.info('=============================== STATISTICS END ===================================')
#  logging.info('==================================================================================')
#
#def do_upgrade(my_host, my_port, my_user, my_passwd, timeout, my_module_set, upgrade_params):
#  try:
#    conn = mysql.connector.connect(user = my_user,
#                                   password = my_passwd,
#                                   host = my_host,
#                                   port = my_port,
#                                   database = 'oceanbase',
#                                   raise_on_warnings = True)
#    cur = conn.cursor(buffered=True)
#    try:
#      query_cur = actions.QueryCursor(cur)
#      actions.check_server_version_by_cluster(cur)
#
#      if run_modules.MODULE_BEGIN_UPGRADE in my_module_set:
#        logging.info('================begin to run begin upgrade action===============')
#        conn.autocommit = True
#        actions.do_begin_upgrade(cur, timeout)
#        conn.autocommit = False
#        actions.refresh_commit_sql_list()
#        logging.info('================succeed to run begin upgrade action===============')
#
#      if run_modules.MODULE_BEGIN_ROLLING_UPGRADE in my_module_set:
#        logging.info('================begin to run begin rolling upgrade action===============')
#        conn.autocommit = True
#        actions.do_begin_rolling_upgrade(cur, timeout)
#        conn.autocommit = False
#        actions.refresh_commit_sql_list()
#        logging.info('================succeed to run begin rolling upgrade action===============')
#
#      if run_modules.MODULE_SPECIAL_ACTION in my_module_set:
#        logging.info('================begin to run special action===============')
#        conn.autocommit = True
#        special_upgrade_action_pre.do_special_upgrade(conn, cur, timeout, my_user, my_passwd)
#        conn.autocommit = False
#        actions.refresh_commit_sql_list()
#        logging.info('================succeed to run special action===============')
#
#      if run_modules.MODULE_HEALTH_CHECK in my_module_set:
#        logging.info('================begin to run health check action ===============')
#        upgrade_health_checker.do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, True) # need_check_major_status = True
#        logging.info('================succeed to run health check action ===============')
#
#    except Exception, e:
#      logging.exception('run error')
#      raise e
#    finally:
#      # print statistics
#      print_stats()
#      # dump the rollback sql to a file
#      # actions.dump_rollback_sql_to_file(upgrade_params.rollback_sql_filename)
#      cur.close()
#      conn.close()
#  except mysql.connector.Error, e:
#    logging.exception('connection error')
#    raise e
#  except Exception, e:
#    logging.exception('normal error')
#    raise e
#
#def do_upgrade_by_argv(argv):
#  upgrade_params = UpgradeParams()
#  opts.change_opt_defult_value('log-file', upgrade_params.log_filename)
#  opts.parse_options(argv)
#  if not opts.has_no_local_opts():
#    opts.deal_with_local_opts('upgrade_pre')
#  else:
#    opts.check_db_client_opts()
#    log_filename = opts.get_opt_log_file()
#    upgrade_params.log_filename = log_filename
#    # logging is configured here so that the earlier steps do not overwrite the log file
#    config_logging_module(upgrade_params.log_filename)
#    try:
#      host = opts.get_opt_host()
#      port = int(opts.get_opt_port())
#      user = opts.get_opt_user()
#      password = opts.get_opt_password()
#      timeout = int(opts.get_opt_timeout())
#      cmd_module_str = opts.get_opt_module()
#      module_set = set([])
#      all_module_set = run_modules.get_all_module_set()
#      cmd_module_list = cmd_module_str.split(',')
#      for cmd_module in cmd_module_list:
#        if run_modules.ALL_MODULE == cmd_module:
#          module_set = module_set | all_module_set
#        elif cmd_module in all_module_set:
#          module_set.add(cmd_module)
#        else:
#          raise MyError('invalid module: {0}'.format(cmd_module))
#      logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", module=\"%s\", log-file=\"%s\"',\
#          host, port, user, password, timeout, module_set, log_filename)
#      do_upgrade(host, port, user, password, timeout, module_set, upgrade_params)
#    except mysql.connector.Error, e:
#      logging.exception('mysql connector error')
#      logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
#      raise e
#    except Exception, e:
#      logging.exception('normal error')
#      logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
#      raise e
#
#
#
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:my_error.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#class MyError(Exception):
#  def __init__(self, value):
#    self.value = value
#  def __str__(self):
#    return repr(self.value)
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:my_utils.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#import mysql.connector
#from mysql.connector import errorcode
#from my_error import MyError
#from actions import QueryCursor
#import logging
#
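## results_to_str below renders a query result as a fixed-width text table: it first
## measures the widest value of every column (including the column name), then emits
## the header row and each data row padded to that width, with four spaces between
## columns.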
#def results_to_str(desc, results):
#  ret_str = ''
#  max_width_list = []
#  for col_desc in desc:
#    max_width_list.append(len(str(col_desc[0])))
#  col_count = len(max_width_list)
#  for result in results:
#    if col_count != len(result):
#      raise MyError('column count is not equal, desc column count: {0}, data column count: {1}'.format(col_count, len(result)))
#    for i in range(0, col_count):
#      result_col_width = len(str(result[i]))
#      if max_width_list[i] < result_col_width:
#        max_width_list[i] = result_col_width
#  # print the column names
#  for i in range(0, col_count):
#    if i > 0:
#      ret_str += '    ' # four-space gap
#    ret_str += str(desc[i][0])
#    # pad with spaces
#    for j in range(0, max_width_list[i] - len(str(desc[i][0]))):
#      ret_str += ' '
#  # print the data rows
#  for result in results:
#    ret_str += '\n' # newline before each row
#    for i in range(0, col_count):
#      if i > 0:
#        ret_str += '    ' # four-space gap
#      ret_str += str(result[i])
#      # pad with spaces
#      for j in range(0, max_width_list[i] - len(str(result[i]))):
#        ret_str += ' '
#  return ret_str
#
#def query_and_dump_results(query_cur, sql):
#  (desc, results) = query_cur.exec_query(sql)
#  result_str = results_to_str(desc, results)
#  logging.info('dump query results, sql: %s, results:\n%s', sql, result_str)
#
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:opts.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#from my_error import MyError
#import sys
#import os
#import getopt
#
#pre_help_str = \
#"""
#Help:
#""" +\
#sys.argv[0] + """ [OPTIONS]""" +\
#'\n\n' +\
#'-I, --help          Display this help and exit.\n' +\
#'-V, --version       Output version information and exit.\n' +\
#'-h, --host=name     Connect to host.\n' +\
#'-P, --port=name     Port number to use for connection.\n' +\
#'-u, --user=name     User for login.\n' +\
#'-p, --password=name Password to use when connecting to server. If password is\n' +\
#'                    not given it\'s empty string "".\n' +\
#'-t, --timeout=name  Cmd/Query/Inspection execute timeout(s).\n' +\
#'-m, --module=name   Modules to run. Modules should be a string combined by some of\n' +\
#'                    the following strings:\n' +\
#'                    1. begin_upgrade \n' +\
#'                    2. begin_rolling_upgrade \n' +\
#'                    3. special_action \n' +\
#'                    4. health_check \n' +\
#'                    5. all: default value, run all sub modules above \n' +\
#'                    that all modules should be run. They are split by ",".\n' +\
#'                    For example: -m all, or --module=begin_upgrade,begin_rolling_upgrade,special_action\n' +\
#'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
#'\n\n' +\
#'Maybe you want to run cmd like that:\n' +\
#sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
#
#post_help_str = \
#"""
#Help:
#""" +\
#sys.argv[0] + """ [OPTIONS]""" +\
#'\n\n' +\
#'-I, --help          Display this help and exit.\n' +\
#'-V, --version       Output version information and exit.\n' +\
#'-h, --host=name     Connect to host.\n' +\
#'-P, --port=name     Port number to use for connection.\n' +\
#'-u, --user=name     User for login.\n' +\
#'-p, --password=name Password to use when connecting to server. If password is\n' +\
#'                    not given it\'s empty string "".\n' +\
#'-t, --timeout=name  Cmd/Query/Inspection execute timeout(s).\n' +\
#'-m, --module=name   Modules to run. Modules should be a string combined by some of\n' +\
#'                    the following strings:\n' +\
#'                    1. health_check \n' +\
#'                    2. end_rolling_upgrade \n' +\
#'                    3. tenant_upgrade \n' +\
#'                    4. end_upgrade \n' +\
#'                    5. post_check \n' +\
#'                    6. all: default value, run all sub modules above \n' +\
#'                    that all modules should be run. They are split by ",".\n' +\
#'                    For example: -m all, or --module=health_check,end_rolling_upgrade\n' +\
#'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
#'\n\n' +\
#'Maybe you want to run cmd like that:\n' +\
#sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
#
#version_str = """version 1.0.0"""
#
#class Option:
#  __g_short_name_set = set([])
#  __g_long_name_set = set([])
#  __short_name = None
#  __long_name = None
#  __is_with_param = None
#  __is_local_opt = None
#  __has_value = None
#  __value = None
#  def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
#    if short_name in Option.__g_short_name_set:
#      raise MyError('duplicate option short name: {0}'.format(short_name))
#    elif long_name in Option.__g_long_name_set:
#      raise MyError('duplicate option long name: {0}'.format(long_name))
#    Option.__g_short_name_set.add(short_name)
#    Option.__g_long_name_set.add(long_name)
#    self.__short_name = short_name
#    self.__long_name = long_name
#    self.__is_with_param = is_with_param
#    self.__is_local_opt = is_local_opt
#    self.__has_value = False
#    if None != default_value:
#      self.set_value(default_value)
#  def is_with_param(self):
#    return self.__is_with_param
#  def get_short_name(self):
#    return self.__short_name
#  def get_long_name(self):
#    return self.__long_name
#  def has_value(self):
#    return self.__has_value
#  def get_value(self):
#    return self.__value
#  def set_value(self, value):
#    self.__value = value
#    self.__has_value = True
#  def is_local_opt(self):
#    return self.__is_local_opt
#  def is_valid(self):
#    return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value
#
#g_opts =\
#[\
#Option('I', 'help', False, True),\
#Option('V', 'version', False, True),\
#Option('h', 'host', True, False),\
#Option('P', 'port', True, False),\
#Option('u', 'user', True, False),\
#Option('t', 'timeout', True, False, 0),\
#Option('p', 'password', True, False, ''),\
## which modules to run; all of them by default
#Option('m', 'module', True, False, 'all'),\
## log file path; each script's main function overrides this with its own default
#Option('l', 'log-file', True, False)
#]\
#
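## The getopt short/long option strings used by parse_options below are derived from
## g_opts. As an illustrative (hypothetical) invocation, following the example already
## shown in the help text:
##   upgrade_pre.py -h 127.0.0.1 -P 3306 -u admin -p admin -m begin_upgrade,health_check
## fills the host, port, user, password and module options, while -I/--help and
## -V/--version are "local" options handled by deal_with_local_opts() instead of
## connecting to the server.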
#def change_opt_defult_value(opt_long_name, opt_default_val):
#  global g_opts
#  for opt in g_opts:
#    if opt.get_long_name() == opt_long_name:
#      opt.set_value(opt_default_val)
#      return
#
#def has_no_local_opts():
#  global g_opts
#  no_local_opts = True
#  for opt in g_opts:
#    if opt.is_local_opt() and opt.has_value():
#      no_local_opts = False
#  return no_local_opts
#
#def check_db_client_opts():
#  global g_opts
#  for opt in g_opts:
#    if not opt.is_local_opt() and not opt.has_value():
#      raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
#          .format(opt.get_short_name(), sys.argv[0]))
#
#def parse_option(opt_name, opt_val):
#  global g_opts
#  for opt in g_opts:
#    if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
#      opt.set_value(opt_val)
#
#def parse_options(argv):
#  global g_opts
#  short_opt_str = ''
#  long_opt_list = []
#  for opt in g_opts:
#    if opt.is_with_param():
#      short_opt_str += opt.get_short_name() + ':'
#    else:
#      short_opt_str += opt.get_short_name()
#  for opt in g_opts:
#    if opt.is_with_param():
#      long_opt_list.append(opt.get_long_name() + '=')
#    else:
#      long_opt_list.append(opt.get_long_name())
#  (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
#  for (opt_name, opt_val) in opts:
#    parse_option(opt_name, opt_val)
#  if has_no_local_opts():
#    check_db_client_opts()
#
#def deal_with_local_opt(opt, filename):
#  if 'help' == opt.get_long_name():
#    if 'upgrade_pre' == filename:
#      global pre_help_str
#      print pre_help_str
#    elif 'upgrade_post' == filename:
#      global post_help_str
#      print post_help_str
#    else:
#      raise MyError('not supported filename:{0} for help option'.format(filename))
#  elif 'version' == opt.get_long_name():
#    global version_str
#    print version_str
#
#def deal_with_local_opts(filename):
#  global g_opts
#  if has_no_local_opts():
#    raise MyError('no local options, can not deal with local options')
#  else:
#    for opt in g_opts:
#      if opt.is_local_opt() and opt.has_value():
#        deal_with_local_opt(opt, filename)
#        # handle only one local option
#        return
#
#def get_opt_host():
#  global g_opts
#  for opt in g_opts:
#    if 'host' == opt.get_long_name():
#      return opt.get_value()
#
#def get_opt_port():
#  global g_opts
#  for opt in g_opts:
#    if 'port' == opt.get_long_name():
#      return opt.get_value()
#
#def get_opt_user():
#  global g_opts
#  for opt in g_opts:
#    if 'user' == opt.get_long_name():
#      return opt.get_value()
#
#def get_opt_password():
#  global g_opts
#  for opt in g_opts:
#    if 'password' == opt.get_long_name():
#      return opt.get_value()
#
#def get_opt_timeout():
#  global g_opts
#  for opt in g_opts:
#    if 'timeout' == opt.get_long_name():
#      return opt.get_value()
#
#def get_opt_module():
#  global g_opts
#  for opt in g_opts:
#    if 'module' == opt.get_long_name():
#      return opt.get_value()
#
#def get_opt_log_file():
#  global g_opts
#  for opt in g_opts:
#    if 'log-file' == opt.get_long_name():
#      return opt.get_value()
#
##parse_options(sys.argv[1:])
#
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:reset_upgrade_scripts.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#import os
#
#def clear_action_codes(action_filename_list, action_begin_line, \
#    action_end_line, is_special_upgrade_code):
#  char_enter = '\n'
#  for action_filename in action_filename_list:
#    new_action_file_lines = []
#    action_file = open(action_filename, 'r')
#    action_file_lines = action_file.readlines()
#    is_action_codes = False
#    for action_file_line in action_file_lines:
#      if is_action_codes and action_file_line == (action_end_line + char_enter):
#        is_action_codes = False
#      if not is_action_codes:
#        new_action_file_lines.append(action_file_line)
#      if not is_action_codes and action_file_line == (action_begin_line + char_enter):
#        is_action_codes = True
#    action_file.close()
#    new_action_file = open(action_filename, 'w')
#    for new_action_file_line in new_action_file_lines:
#      if is_special_upgrade_code:
#        if new_action_file_line == (action_end_line + char_enter):
#          new_action_file.write('  return\n')
#      new_action_file.write(new_action_file_line)
#    new_action_file.close()
#
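## clear_action_codes above rewrites each listed file in place: everything strictly
## between the "actions begin" and "actions end" marker lines is dropped (the markers
## themselves are kept), and for the special upgrade files a bare "  return" is written
## back just before the end marker so the emptied function still terminates cleanly.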
#def regenerate_upgrade_script():
#  print('\n=========run gen_upgrade_scripts.py, begin=========\n')
#  info = os.popen('./gen_upgrade_scripts.py;')
#  print(info.read())
#  print('\n=========run gen_upgrade_scripts.py, end=========\n')
#
#if __name__ == '__main__':
#  action_begin_line = '####========******####======== actions begin ========####******========####'
#  action_end_line = '####========******####========= actions end =========####******========####'
#  action_filename_list = \
#      [\
#      'normal_ddl_actions_pre.py',\
#      'normal_ddl_actions_post.py',\
#      'normal_dml_actions_pre.py',\
#      'normal_dml_actions_post.py',\
#      'each_tenant_dml_actions_pre.py',\
#      'each_tenant_dml_actions_post.py',\
#      'each_tenant_ddl_actions_post.py'\
#      ]
#  special_upgrade_filename_list = \
#      [\
#      'special_upgrade_action_pre.py',\
#      'special_upgrade_action_post.py'
#      ]
#  clear_action_codes(action_filename_list, action_begin_line, action_end_line, False)
#  clear_action_codes(special_upgrade_filename_list, action_begin_line, action_end_line, True)
#  regenerate_upgrade_script()
#
#
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:run_modules.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#ALL_MODULE = 'all'
#
## module for upgrade_pre.py
#MODULE_BEGIN_UPGRADE          = 'begin_upgrade'
#MODULE_BEGIN_ROLLING_UPGRADE  = 'begin_rolling_upgrade'
#MODULE_SPECIAL_ACTION         = 'special_action'
##MODULE_HEALTH_CHECK          = 'health_check'
#
## module for upgrade_post.py
#MODULE_HEALTH_CHECK           = 'health_check'
#MODULE_END_ROLLING_UPGRADE    = 'end_rolling_upgrade'
#MODULE_TENANT_UPRADE          = 'tenant_upgrade'
#MODULE_END_UPRADE             = 'end_upgrade'
#MODULE_POST_CHECK             = 'post_check'
#
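## get_all_module_set below collects module names by reflection: every module-level
## attribute whose name starts with 'MODULE_' contributes its string value, so for this
## file the set is {'begin_upgrade', 'begin_rolling_upgrade', 'special_action',
## 'health_check', 'end_rolling_upgrade', 'tenant_upgrade', 'end_upgrade', 'post_check'}.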
#def get_all_module_set():
#  import run_modules
#  module_set = set([])
#  attrs_from_run_module = dir(run_modules)
#  for attr in attrs_from_run_module:
#    if attr.startswith('MODULE_'):
#      module = getattr(run_modules, attr)
#      module_set.add(module)
#  return module_set
#
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:special_upgrade_action_pre.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#from my_error import MyError
#import time
#import mysql.connector
#from mysql.connector import errorcode
#import logging
#import re
#import string
#from random import Random
#from actions import DMLCursor
#from actions import QueryCursor
#import binascii
#import my_utils
#import actions
#import sys
#import upgrade_health_checker
#
## upgrade actions that need to be executed on the primary cluster
#def do_special_upgrade(conn, cur, timeout, user, passwd):
#  # special upgrade action
##The actions for upgrade statements must be written between the "actions begin" and "actions end" lines below,
##because reset_upgrade_scripts.py is called when the baseline version is updated and it clears the code between
##the "actions begin" and "actions end" lines; code written outside them cannot be cleared.
#  current_version = actions.fetch_observer_version(cur)
#  target_version = actions.get_current_cluster_version()
#  # when upgrade across version, disable enable_ddl/major_freeze
#  if current_version != target_version:
#    actions.set_parameter(cur, 'enable_ddl', 'False', timeout)
#    actions.set_parameter(cur, 'enable_major_freeze', 'False', timeout)
#    actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'False', timeout)
#    # wait scheduler in storage to notice adaptive_compaction is switched to false
#    time.sleep(60 * 2)
#    query_cur = actions.QueryCursor(cur)
#    wait_major_timeout = 600
#    upgrade_health_checker.check_major_merge(query_cur, wait_major_timeout)
#    actions.do_suspend_merge(cur, timeout)
#  # When upgrading from a version prior to 4.2 to version 4.2, the bloom_filter should be disabled.
#  # The param _bloom_filter_enabled is no longer in use as of version 4.2, there is no need to enable it again.
#  if actions.get_version(current_version) < actions.get_version('4.2.0.0')\
#      and actions.get_version(target_version) >= actions.get_version('4.2.0.0'):
#    actions.set_tenant_parameter(cur, '_bloom_filter_enabled', 'False', timeout)
#
#####========******####======== actions begin ========####******========####
#  return
#####========******####========= actions end =========####******========####
#
#def query(cur, sql):
#  log(sql)
#  cur.execute(sql)
#  results = cur.fetchall()
#  return results
#
#def log(msg):
#  logging.info(msg)
#
#def get_oracle_tenant_ids(cur):
#  return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant where compatibility_mode = 1')]
#
#def get_tenant_ids(cur):
#  return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')]
#
1370
####====XXXX======######==== I am a splitter ====######======XXXX====####
1371
#filename:tenant_upgrade_action.py
1372
##!/usr/bin/env python
1373
## -*- coding: utf-8 -*-
1374
#
1375
#import logging
1376
#import time
1377
#from actions import Cursor
1378
#from actions import DMLCursor
1379
#from actions import QueryCursor
1380
#import mysql.connector
1381
#from mysql.connector import errorcode
1382
#import actions
1383
#
1384
#def do_upgrade(conn, cur, timeout, user, pwd):
1385
#  # upgrade action
1386
##The actions for the upgrade statements must be written between the "actions begin" and "actions end" lines below,
1387
##because reset_upgrade_scripts.py is invoked when the baseline version is updated to clear the code between
1388
##"actions begin" and "actions end"; code written outside these two lines cannot be cleared.
1389
#  across_version = upgrade_across_version(cur)
1390
#  if across_version:
1391
#    run_upgrade_job(conn, cur, "UPGRADE_ALL", timeout)
1392
#  else:
1393
#    run_upgrade_job(conn, cur, "UPGRADE_VIRTUAL_SCHEMA", timeout)
1394
#
1395
#  run_root_inspection(cur, timeout)
1396
#####========******####======== actions begin ========####******========####
1397
#  upgrade_syslog_level(conn, cur)
1398
#  return
1399
#
1400
#def upgrade_syslog_level(conn, cur):
1401
#  try:
1402
#    cur.execute("""select svr_ip, svr_port, value from oceanbase.__all_virtual_sys_parameter_stat where name = 'syslog_level'""")
1403
#    result = cur.fetchall()
1404
#    for r in result:
1405
#      logging.info("syslog level before upgrade: ip: {0}, port: {1}, value: {2}".format(r[0], r[1], r[2]))
1406
#    cur.execute("""select count(*) cnt from oceanbase.__all_virtual_sys_parameter_stat where name = 'syslog_level' and value = 'INFO'""")
1407
#    result = cur.fetchall()
1408
#    info_cnt = result[0][0]
1409
#    if info_cnt > 0:
1410
#      actions.set_parameter(cur, "syslog_level", "WDIAG")
1411
#  except Exception, e:
1412
#    logging.warn("upgrade syslog level failed!")
1413
#    raise e
1414
#####========******####========= actions end =========####******========####
1415
#
1416
#def query(cur, sql):
1417
#  cur.execute(sql)
1418
#  logging.info(sql)
1419
#  results = cur.fetchall()
1420
#  return results
1421
#
1422
#def get_tenant_ids(cur):
1423
#  return [_[0] for _ in query(cur, 'select tenant_id from oceanbase.__all_tenant')]
1424
#
1425
#def run_root_inspection(cur, timeout):
1426
#
1427
#  query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600)
1428
#
1429
#  actions.set_session_timeout(cur, query_timeout)
1430
#
1431
#  sql = "alter system run job 'root_inspection'"
1432
#  logging.info(sql)
1433
#  cur.execute(sql)
1434
#
1435
#  actions.set_session_timeout(cur, 10)
1436
#
1437
#def upgrade_across_version(cur):
1438
#  current_data_version = actions.get_current_data_version()
1439
#  int_current_data_version = actions.get_version(current_data_version)
1440
#
1441
#  across_version = False
1442
#  sys_tenant_id = 1
1443
#
1444
#  # 1. check if target_data_version/current_data_version match with current_data_version
1445
#  sql = "select count(*) from oceanbase.__all_table where table_name = '__all_virtual_core_table'"
1446
#  results = query(cur, sql)
1447
#  if len(results) < 1 or len(results[0]) < 1:
1448
#    logging.warn("row/column cnt not match")
1449
#    raise Exception('row/column cnt not match')
1450
#  elif results[0][0] <= 0:
1451
#    # __all_virtual_core_table doesn't exist, this cluster is upgraded from 4.0.0.0
1452
#    across_version = True
1453
#  else:
1454
#    # check
1455
#    tenant_ids = get_tenant_ids(cur)
1456
#    if len(tenant_ids) <= 0:
1457
#      logging.warn("tenant_ids count is unexpected")
1458
#      raise Exception('tenant_ids count is unexpected')
1459
#    tenant_count = len(tenant_ids)
1460
#
1461
#    sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(int_current_data_version)
1462
#    results = query(cur, sql)
1463
#    if len(results) != 1 or len(results[0]) != 1:
1464
#      logging.warn('result cnt not match')
1465
#      raise Exception('result cnt not match')
1466
#    elif 2 * tenant_count != results[0][0]:
1467
#      logging.info('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(current_data_version, tenant_count, results[0][0]))
1468
#      across_version = True
1469
#    else:
1470
#      logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version))
1471
#      across_version = False
1472
#
1473
#  # 2. check if compatible match with current_data_version
1474
#  if not across_version:
1475
#    sql = "select count(*) from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value != '{0}'".format(current_data_version)
1476
#    results = query(cur, sql)
1477
#    if len(results) < 1 or len(results[0]) < 1:
1478
#      logging.warn("row/column cnt not match")
1479
#      raise Exception('row/column cnt not match')
1480
#    elif results[0][0] == 0:
1481
#      logging.info("compatible are all matched")
1482
#    else:
1483
#      logging.info("compatible unmatched")
1484
#      across_version = True
1485
#
1486
#  return across_version
1487
#
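## Rationale for the 2 * tenant_count comparison above: each tenant is expected to report one
## 'target_data_version' row and one 'current_data_version' row in __all_virtual_core_table, so a
## cluster that is fully on the current data version yields exactly two matching rows per tenant;
## any other count means at least one tenant still has to cross a data version boundary.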
1488
#def get_max_used_job_id(cur):
1489
#  try:
1490
#    max_job_id = 0
1491
#    sql = "select job_id from oceanbase.__all_rootservice_job order by job_id desc limit 1"
1492
#    results = query(cur, sql)
1493
#
1494
#    if (len(results) == 0):
1495
#      max_job_id = 0
1496
#    elif (len(results) != 1 or len(results[0]) != 1):
1497
#      logging.warn("row cnt not match")
1498
#      raise Exception('row cnt not match')
1499
#    else:
1500
#      max_job_id = results[0][0]
1501
#
1502
#    logging.info("get max_used_job_id:{0}".format(max_job_id))
1503
#
1504
#    return max_job_id
1505
#  except Exception, e:
1506
#    logging.warn("failed to get max_used_job_id")
1507
#    raise e
1508
#
1509
#def check_can_run_upgrade_job(cur, job_name):
1510
#  try:
1511
#    sql = """select job_status from oceanbase.__all_rootservice_job
1512
#             where job_type = '{0}' order by job_id desc limit 1""".format(job_name)
1513
#    results = query(cur, sql)
1514
#
1515
#    bret = True
1516
#    if (len(results) == 0):
1517
#      bret = True
1518
#      logging.info("upgrade job not created yet, should run upgrade job")
1519
#    elif (len(results) != 1 or len(results[0]) != 1):
1520
#      logging.warn("row cnt not match")
1521
#      raise Exception('row cnt not match')
1522
#    elif ("INPROGRESS" == results[0][0]):
1523
#      logging.warn("upgrade job still running, should wait")
1524
#      raise Exception('upgrade job still running, should wait')
1525
#    elif ("SUCCESS" == results[0][0]):
1526
#      bret = True
1527
#      logging.info("maybe upgrade job remained, can run again")
1528
#    elif ("FAILED" == results[0][0]):
1529
#      bret = True
1530
#      logging.info("execute upgrade job failed, should run again")
1531
#    else:
1532
#      logging.warn("invalid job status: {0}".format(results[0][0]))
1533
#      raise Exception('invalid job status: {0}'.format(results[0][0]))
1534
#
1535
#    return bret
1536
#  except Exception, e:
1537
#    logging.warn("failed to check if upgrade job can run")
1538
#    raise e
1539
#
1540
#def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id):
1541
#  try:
1542
#    wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600)
1543
#
1544
#    times = wait_timeout / 10
1545
#    while (times >= 0):
1546
#      sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job
1547
#               where job_type = '{0}' and job_id > {1} order by job_id desc limit 1
1548
#            """.format(job_name, max_used_job_id)
1549
#      results = query(cur, sql)
1550
#
1551
#      if (len(results) == 0):
1552
#        logging.info("upgrade job not created yet")
1553
#      elif (len(results) != 1 or len(results[0]) != 4):
1554
#        logging.warn("row cnt not match")
1555
#        raise Exception('row cnt not match')
1556
#      elif ("INPROGRESS" == results[0][0]):
1557
#        logging.info("upgrade job is still running")
1558
#        # check if rs change
1559
#        if times % 10 == 0:
1560
#          ip = results[0][1]
1561
#          port = results[0][2]
1562
#          gmt_create = results[0][3]
1563
#          sql = """select count(*) from oceanbase.__all_virtual_core_meta_table where role = 1 and svr_ip = '{0}' and svr_port = {1}""".format(ip, port)
1564
#          results = query(cur, sql)
1565
#          if (len(results) != 1 or len(results[0]) != 1):
1566
#            logging.warn("row/column cnt not match")
1567
#            raise Exception('row/column cnt not match')
1568
#          elif results[0][0] == 1:
1569
#            sql = """select count(*) from oceanbase.__all_rootservice_event_history where gmt_create > '{0}' and event = 'full_rootservice'""".format(gmt_create)
1570
#            results = query(cur, sql)
1571
#            if (len(results) != 1 or len(results[0]) != 1):
1572
#              logging.warn("row/column cnt not match")
1573
#              raise Exception('row/column cnt not match')
1574
#            elif results[0][0] > 0:
1575
#              logging.warn("rs changed, should check if upgrade job is still running")
1576
#              raise Exception('rs changed, should check if upgrade job is still running')
1577
#            else:
1578
#              logging.info("rs[{0}:{1}] still exist, keep waiting".format(ip, port))
1579
#          else:
1580
#            logging.warn("rs changed or not exist, should check if upgrade job is still running")
1581
#            raise Exception('rs changed or not exist, should check if upgrade job is still running')
1582
#      elif ("SUCCESS" == results[0][0]):
1583
#        logging.info("execute upgrade job successfully")
1584
#        break
1585
#      elif ("FAILED" == results[0][0]):
1586
#        logging.warn("execute upgrade job failed")
1587
#        raise Exception('execute upgrade job failed')
1588
#      else:
1589
#        logging.warn("invalid job status: {0}".format(results[0][0]))
1590
#        raise Exception('invalid job status: {0}'.format(results[0][0]))
1591
#
1592
#      times = times - 1
1593
#      if times == -1:
1594
#        logging.warn("""check {0} job timeout""".format(job_name))
1595
#        raise Exception('check {0} job timeout'.format(job_name))
1596
#      time.sleep(10)
1597
#  except Exception, e:
1598
#    logging.warn("failed to check upgrade job result")
1599
#    raise e
1600
#
1601
#def run_upgrade_job(conn, cur, job_name, timeout):
1602
#  try:
1603
#    logging.info("start to run upgrade job, job_name:{0}".format(job_name))
1604
#    # pre check
1605
#    if check_can_run_upgrade_job(cur, job_name):
1606
#      conn.autocommit = True
1607
#      # disable enable_ddl
1608
#      ori_enable_ddl = actions.get_ori_enable_ddl(cur, timeout)
1609
#      if ori_enable_ddl == 0:
1610
#        actions.set_parameter(cur, 'enable_ddl', 'True', timeout)
1611
#      # enable_sys_table_ddl
1612
#      actions.set_parameter(cur, 'enable_sys_table_ddl', 'True', timeout)
1613
#      # get max_used_job_id
1614
#      max_used_job_id = get_max_used_job_id(cur)
1615
#      # run upgrade job
1616
#      sql = """alter system run upgrade job '{0}'""".format(job_name)
1617
#      logging.info(sql)
1618
#      cur.execute(sql)
1619
#      # check upgrade job result
1620
#      check_upgrade_job_result(cur, job_name, timeout, max_used_job_id)
1621
#      # reset enable_sys_table_ddl
1622
#      actions.set_parameter(cur, 'enable_sys_table_ddl', 'False', timeout)
1623
#      # reset enable_ddl
1624
#      if ori_enable_ddl == 0:
1625
#        actions.set_parameter(cur, 'enable_ddl', 'False', timeout)
1626
#  except Exception, e:
1627
#    logging.warn("run upgrade job failed, :{0}".format(job_name))
1628
#    raise e
1629
#  logging.info("run upgrade job success, job_name:{0}".format(job_name))
1630
####====XXXX======######==== I am a splitter ====######======XXXX====####
1631
#filename:upgrade_checker.py
1632
##!/usr/bin/env python
1633
## -*- coding: utf-8 -*-
1634
#
1635
#import sys
1636
#import os
1637
#import mysql.connector
1638
#from mysql.connector import errorcode
1639
#import logging
1640
#import getopt
1641
#import time
1642
#
1643
#class UpgradeParams:
1644
#  log_filename = 'upgrade_checker.log'
1645
#  old_version = '4.0.0.0'
1646
##### --------------start : my_error.py --------------
1647
#class MyError(Exception):
1648
#  def __init__(self, value):
1649
#    self.value = value
1650
#  def __str__(self):
1651
#    return repr(self.value)
1652
##### --------------start : actions.py------------
1653
#class Cursor:
1654
#  __cursor = None
1655
#  def __init__(self, cursor):
1656
#    self.__cursor = cursor
1657
#  def exec_sql(self, sql, print_when_succ = True):
1658
#    try:
1659
#      self.__cursor.execute(sql)
1660
#      rowcount = self.__cursor.rowcount
1661
#      if True == print_when_succ:
1662
#        logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
1663
#      return rowcount
1664
#    except mysql.connector.Error, e:
1665
#      logging.exception('mysql connector error, fail to execute sql: %s', sql)
1666
#      raise e
1667
#    except Exception, e:
1668
#      logging.exception('normal error, fail to execute sql: %s', sql)
1669
#      raise e
1670
#  def exec_query(self, sql, print_when_succ = True):
1671
#    try:
1672
#      self.__cursor.execute(sql)
1673
#      results = self.__cursor.fetchall()
1674
#      rowcount = self.__cursor.rowcount
1675
#      if True == print_when_succ:
1676
#        logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
1677
#      return (self.__cursor.description, results)
1678
#    except mysql.connector.Error, e:
1679
#      logging.exception('mysql connector error, fail to execute sql: %s', sql)
1680
#      raise e
1681
#    except Exception, e:
1682
#      logging.exception('normal error, fail to execute sql: %s', sql)
1683
#      raise e
1684
#
1685
#def set_parameter(cur, parameter, value):
1686
#  sql = """alter system set {0} = '{1}'""".format(parameter, value)
1687
#  logging.info(sql)
1688
#  cur.execute(sql)
1689
#  wait_parameter_sync(cur, parameter, value)
1690
#
1691
#def wait_parameter_sync(cur, key, value):
1692
#  sql = """select count(*) as cnt from oceanbase.__all_virtual_sys_parameter_stat
1693
#           where name = '{0}' and value != '{1}'""".format(key, value)
1694
#  times = 10
1695
#  while times > 0:
1696
#    logging.info(sql)
1697
#    cur.execute(sql)
1698
#    result = cur.fetchall()
1699
#    if len(result) != 1 or len(result[0]) != 1:
1700
#      logging.exception('result cnt not match')
1701
#      raise MyError('result cnt not match')
1702
#    elif result[0][0] == 0:
1703
#      logging.info("""{0} is sync, value is {1}""".format(key, value))
1704
#      break
1705
#    else:
1706
#      logging.info("""{0} is not sync, value should be {1}""".format(key, value))
1707
#
1708
#    times -= 1
1709
#    if times == 0:
1710
#      logging.exception("""check {0}:{1} sync timeout""".format(key, value))
1711
#      raise MyError('check {0}:{1} sync timeout'.format(key, value))
1712
#    time.sleep(5)
1713
#
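## Note on wait_parameter_sync above: the value is polled at most 10 times with a 5 second sleep
## between attempts, so the function gives up (and raises) after roughly 50 seconds if the new
## value has not yet reached every observer.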
1714
##### --------------start :  opt.py --------------
1715
#help_str = \
1716
#"""
1717
#Help:
1718
#""" +\
1719
#sys.argv[0] + """ [OPTIONS]""" +\
1720
#'\n\n' +\
1721
#'-I, --help          Display this help and exit.\n' +\
1722
#'-V, --version       Output version information and exit.\n' +\
1723
#'-h, --host=name     Connect to host.\n' +\
1724
#'-P, --port=name     Port number to use for connection.\n' +\
1725
#'-u, --user=name     User for login.\n' +\
1726
#'-t, --timeout=name  Cmd/Query/Inspection execute timeout(s).\n' +\
1727
#'-p, --password=name Password to use when connecting to server. If password is\n' +\
1728
#'                    not given it\'s empty string "".\n' +\
1729
#'-m, --module=name   Modules to run. Modules should be a string combined by some of\n' +\
1730
#'                    the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
1731
#'                    system_variable_dml, special_action, all. "all" represents\n' +\
1732
#'                    that all modules should be run. They are separated by ",".\n' +\
1733
#'                    For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
1734
#'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
1735
#'\n\n' +\
1736
#'Maybe you want to run cmd like that:\n' +\
1737
#sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
1738
#
1739
#version_str = """version 1.0.0"""
1740
#
1741
#class Option:
1742
#  __g_short_name_set = set([])
1743
#  __g_long_name_set = set([])
1744
#  __short_name = None
1745
#  __long_name = None
1746
#  __is_with_param = None
1747
#  __is_local_opt = None
1748
#  __has_value = None
1749
#  __value = None
1750
#  def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
1751
#    if short_name in Option.__g_short_name_set:
1752
#      raise MyError('duplicate option short name: {0}'.format(short_name))
1753
#    elif long_name in Option.__g_long_name_set:
1754
#      raise MyError('duplicate option long name: {0}'.format(long_name))
1755
#    Option.__g_short_name_set.add(short_name)
1756
#    Option.__g_long_name_set.add(long_name)
1757
#    self.__short_name = short_name
1758
#    self.__long_name = long_name
1759
#    self.__is_with_param = is_with_param
1760
#    self.__is_local_opt = is_local_opt
1761
#    self.__has_value = False
1762
#    if None != default_value:
1763
#      self.set_value(default_value)
1764
#  def is_with_param(self):
1765
#    return self.__is_with_param
1766
#  def get_short_name(self):
1767
#    return self.__short_name
1768
#  def get_long_name(self):
1769
#    return self.__long_name
1770
#  def has_value(self):
1771
#    return self.__has_value
1772
#  def get_value(self):
1773
#    return self.__value
1774
#  def set_value(self, value):
1775
#    self.__value = value
1776
#    self.__has_value = True
1777
#  def is_local_opt(self):
1778
#    return self.__is_local_opt
1779
#  def is_valid(self):
1780
#    return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value
1781
#
1782
#g_opts =\
1783
#[\
1784
#Option('I', 'help', False, True),\
1785
#Option('V', 'version', False, True),\
1786
#Option('h', 'host', True, False),\
1787
#Option('P', 'port', True, False),\
1788
#Option('u', 'user', True, False),\
1789
#Option('t', 'timeout', True, False, 0),\
1790
#Option('p', 'password', True, False, ''),\
1791
## which modules to run; all by default
1792
#Option('m', 'module', True, False, 'all'),\
1793
## log file path; each script's main() overrides it with its own default
1794
#Option('l', 'log-file', True, False)
1795
#]\
1796
#
1797
#def change_opt_defult_value(opt_long_name, opt_default_val):
1798
#  global g_opts
1799
#  for opt in g_opts:
1800
#    if opt.get_long_name() == opt_long_name:
1801
#      opt.set_value(opt_default_val)
1802
#      return
1803
#
1804
#def has_no_local_opts():
1805
#  global g_opts
1806
#  no_local_opts = True
1807
#  for opt in g_opts:
1808
#    if opt.is_local_opt() and opt.has_value():
1809
#      no_local_opts = False
1810
#  return no_local_opts
1811
#
1812
#def check_db_client_opts():
1813
#  global g_opts
1814
#  for opt in g_opts:
1815
#    if not opt.is_local_opt() and not opt.has_value():
1816
#      raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
1817
#          .format(opt.get_short_name(), sys.argv[0]))
1818
#
1819
#def parse_option(opt_name, opt_val):
1820
#  global g_opts
1821
#  for opt in g_opts:
1822
#    if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
1823
#      opt.set_value(opt_val)
1824
#
1825
#def parse_options(argv):
1826
#  global g_opts
1827
#  short_opt_str = ''
1828
#  long_opt_list = []
1829
#  for opt in g_opts:
1830
#    if opt.is_with_param():
1831
#      short_opt_str += opt.get_short_name() + ':'
1832
#    else:
1833
#      short_opt_str += opt.get_short_name()
1834
#  for opt in g_opts:
1835
#    if opt.is_with_param():
1836
#      long_opt_list.append(opt.get_long_name() + '=')
1837
#    else:
1838
#      long_opt_list.append(opt.get_long_name())
1839
#  (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
1840
#  for (opt_name, opt_val) in opts:
1841
#    parse_option(opt_name, opt_val)
1842
#  if has_no_local_opts():
1843
#    check_db_client_opts()
1844
#
1845
#def deal_with_local_opt(opt):
1846
#  if 'help' == opt.get_long_name():
1847
#    global help_str
1848
#    print help_str
1849
#  elif 'version' == opt.get_long_name():
1850
#    global version_str
1851
#    print version_str
1852
#
1853
#def deal_with_local_opts():
1854
#  global g_opts
1855
#  if has_no_local_opts():
1856
#    raise MyError('no local options, can not deal with local options')
1857
#  else:
1858
#    for opt in g_opts:
1859
#      if opt.is_local_opt() and opt.has_value():
1860
#        deal_with_local_opt(opt)
1861
#        # handle only one local option
1862
#        return
1863
#
1864
#def get_opt_host():
1865
#  global g_opts
1866
#  for opt in g_opts:
1867
#    if 'host' == opt.get_long_name():
1868
#      return opt.get_value()
1869
#
1870
#def get_opt_port():
1871
#  global g_opts
1872
#  for opt in g_opts:
1873
#    if 'port' == opt.get_long_name():
1874
#      return opt.get_value()
1875
#
1876
#def get_opt_user():
1877
#  global g_opts
1878
#  for opt in g_opts:
1879
#    if 'user' == opt.get_long_name():
1880
#      return opt.get_value()
1881
#
1882
#def get_opt_password():
1883
#  global g_opts
1884
#  for opt in g_opts:
1885
#    if 'password' == opt.get_long_name():
1886
#      return opt.get_value()
1887
#
1888
#def get_opt_timeout():
1889
#  global g_opts
1890
#  for opt in g_opts:
1891
#    if 'timeout' == opt.get_long_name():
1892
#      return opt.get_value()
1893
#
1894
#def get_opt_module():
1895
#  global g_opts
1896
#  for opt in g_opts:
1897
#    if 'module' == opt.get_long_name():
1898
#      return opt.get_value()
1899
#
1900
#def get_opt_log_file():
1901
#  global g_opts
1902
#  for opt in g_opts:
1903
#    if 'log-file' == opt.get_long_name():
1904
#      return opt.get_value()
1905
##### ---------------end----------------------
1906
#
1907
##### --------------start :  do_upgrade_pre.py--------------
1908
#def config_logging_module(log_filenamme):
1909
#  logging.basicConfig(level=logging.INFO,\
1910
#      format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
1911
#      datefmt='%Y-%m-%d %H:%M:%S',\
1912
#      filename=log_filenamme,\
1913
#      filemode='w')
1914
#  # define the log output format
1915
#  formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
1916
#  #######################################
1917
#  # define a handler that prints logs of INFO level and above to sys.stdout
1918
#  stdout_handler = logging.StreamHandler(sys.stdout)
1919
#  stdout_handler.setLevel(logging.INFO)
1920
#  # set the log output format for the handler
1921
#  stdout_handler.setFormatter(formatter)
1922
#  # add the configured stdout_handler to the root logger
1923
#  logging.getLogger('').addHandler(stdout_handler)
1924
##### ---------------end----------------------
1925
#
1926
#
1927
#fail_list=[]
1928
#
1929
#def get_version(version_str):
1930
#  versions = version_str.split(".")
1931
#
1932
#  if len(versions) != 4:
1933
#    logging.exception("""version:{0} is invalid""".format(version_str))
1934
#    raise MyError('version:{0} is invalid'.format(version_str))
1935
#
1936
#  major = int(versions[0])
1937
#  minor = int(versions[1])
1938
#  major_patch = int(versions[2])
1939
#  minor_patch = int(versions[3])
1940
#
1941
#  if major > 0xffffffff or minor > 0xffff or major_patch > 0xff or minor_patch > 0xff:
1942
#    logging.exception("""version:{0} is invalid""".format(version_str))
1943
#    raise MyError('version:{0} is invalid'.format(version_str))
1944
#
1945
#  version = (major << 32) | (minor << 16) | (major_patch << 8) | (minor_patch)
1946
#  return version
1947
#
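## A worked example of the encoding above (major<<32 | minor<<16 | major_patch<<8 | minor_patch):
##   get_version('4.1.0.0') == (4 << 32) | (1 << 16)     == 17179934720
##   get_version('4.3.0.1') == (4 << 32) | (3 << 16) | 1 == 17180065793
## so comparing the returned integers is equivalent to comparing the dotted version strings
## component by component.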
1948
##### START ####
1949
## 1. Check the pre-upgrade observer version
1950
#def check_observer_version(query_cur, upgrade_params):
1951
#  (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'""")
1952
#  if len(results) != 1:
1953
#    fail_list.append('min_observer_version is not sync')
1954
#  elif cmp(results[0][0], upgrade_params.old_version) < 0 :
1955
#    fail_list.append('old observer version is expected to be equal to or higher than: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0]))
1956
#  logging.info('check observer version success, version = {0}'.format(results[0][0]))
1957
#
1958
#def check_data_version(query_cur):
1959
#  min_cluster_version = 0
1960
#  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
1961
#  (desc, results) = query_cur.exec_query(sql)
1962
#  if len(results) != 1:
1963
#    fail_list.append('min_observer_version is not sync')
1964
#  elif len(results[0]) != 1:
1965
#    fail_list.append('column cnt not match')
1966
#  else:
1967
#    min_cluster_version = get_version(results[0][0])
1968
#
1969
#    # check data version
1970
#    if min_cluster_version < get_version("4.1.0.0"):
1971
#      # last barrier cluster version should be 4.1.0.0
1972
#      fail_list.append('last barrier cluster version is 4.1.0.0. prohibit cluster upgrade from cluster version less than 4.1.0.0')
1973
#    else:
1974
#      data_version_str = ''
1975
#      data_version = 0
1976
#      # check compatible is same
1977
#      sql = """select distinct value from oceanbase.__all_virtual_tenant_parameter_info where name='compatible'"""
1978
#      (desc, results) = query_cur.exec_query(sql)
1979
#      if len(results) != 1:
1980
#        fail_list.append('compatible is not sync')
1981
#      elif len(results[0]) != 1:
1982
#        fail_list.append('column cnt not match')
1983
#      else:
1984
#        data_version_str = results[0][0]
1985
#        data_version = get_version(results[0][0])
1986
#
1987
#        if data_version < get_version("4.1.0.0"):
1988
#          # last barrier data version should be 4.1.0.0
1989
#          fail_list.append('last barrier data version is 4.1.0.0. prohibit cluster upgrade from data version less than 4.1.0.0')
1990
#        else:
1991
#          # check target_data_version/current_data_version
1992
#          sql = "select count(*) from oceanbase.__all_tenant"
1993
#          (desc, results) = query_cur.exec_query(sql)
1994
#          if len(results) != 1 or len(results[0]) != 1:
1995
#            fail_list.append('result cnt not match')
1996
#          else:
1997
#            tenant_count = results[0][0]
1998
#
1999
#            sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(data_version)
2000
#            (desc, results) = query_cur.exec_query(sql)
2001
#            if len(results) != 1 or len(results[0]) != 1:
2002
#              fail_list.append('result cnt not match')
2003
#            elif 2 * tenant_count != results[0][0]:
2004
#              fail_list.append('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(data_version_str, tenant_count, results[0][0]))
2005
#            else:
2006
#              logging.info("check data version success, all tenant's compatible/target_data_version/current_data_version is {0}".format(data_version_str))
2007
#
2008
## 2. Check whether paxos replicas are in sync and whether any paxos replica is missing
2009
#def check_paxos_replica(query_cur):
2010
#  # 2.1 check whether paxos replicas are in sync
2011
#  (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""")
2012
#  if results[0][0] > 0 :
2013
#    fail_list.append('{0} replicas unsync, please check'.format(results[0][0]))
2014
#  # 2.2 check whether any paxos replica is missing TODO
2015
#  logging.info('check paxos replica success')
2016
#
2017
## 3. Check whether any balance or locality change is in progress
2018
#def check_rebalance_task(query_cur):
2019
#  # 3.1 check whether a locality change is in progress
2020
#  (desc, results) = query_cur.exec_query("""select count(1) as cnt from DBA_OB_TENANT_JOBS where job_status='INPROGRESS' and result_code is null""")
2021
#  if results[0][0] > 0 :
2022
#    fail_list.append('{0} locality tasks are in progress, please check'.format(results[0][0]))
2023
#  # 3.2 check whether rebalance tasks are in progress
2024
#  (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from CDB_OB_LS_REPLICA_TASKS""")
2025
#  if results[0][0] > 0 :
2026
#    fail_list.append('{0} rebalance tasks are in progress, please check'.format(results[0][0]))
2027
#  logging.info('check rebalance task success')
2028
#
2029
## 4. Check cluster status
2030
#def check_cluster_status(query_cur):
2031
#  # 4.1 check that no major compaction is in progress
2032
#  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""")
2033
#  if results[0][0] > 0 :
2034
#    fail_list.append('{0} tenant is merging, please check'.format(results[0][0]))
2035
#  (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""")
2036
#  if results[0][0] > 0 :
2037
#    fail_list.append('{0} tablet is merging, please check'.format(results[0][0]))
2038
#  logging.info('check cluster status success')
2039
#
2040
## 5. Check for abnormal tenants (creating, delay-dropped, restoring)
2041
#def check_tenant_status(query_cur):
2042
#
2043
#  # check tenant schema
2044
#  (desc, results) = query_cur.exec_query("""select count(*) as count from DBA_OB_TENANTS where status != 'NORMAL'""")
2045
#  if len(results) != 1 or len(results[0]) != 1:
2046
#    fail_list.append('results len not match')
2047
#  elif 0 != results[0][0]:
2048
#    fail_list.append('has abnormal tenant, should stop')
2049
#  else:
2050
#    logging.info('check tenant status success')
2051
#
2052
#  # check tenant info
2053
#  # don't support restore tenant upgrade
2054
#  (desc, results) = query_cur.exec_query("""select count(*) as count from oceanbase.__all_virtual_tenant_info where tenant_role != 'PRIMARY' and tenant_role != 'STANDBY'""")
2055
#  if len(results) != 1 or len(results[0]) != 1:
2056
#    fail_list.append('results len not match')
2057
#  elif 0 != results[0][0]:
2058
#    fail_list.append('has abnormal tenant info, should stop')
2059
#  else:
2060
#    logging.info('check tenant info success')
2061
#
2062
#  # check tenant lock status
2063
#  (desc, results) = query_cur.exec_query("""select count(*) from DBA_OB_TENANTS where LOCKED = 'YES'""")
2064
#  if len(results) != 1 or len(results[0]) != 1:
2065
#    fail_list.append('results len not match')
2066
#  elif 0 != results[0][0]:
2067
#    fail_list.append('has locked tenant, should unlock')
2068
#  else:
2069
#    logging.info('check tenant lock status success')
2070
#
2071
## 6. Check that no restore job exists
2072
#def check_restore_job_exist(query_cur):
2073
#  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""")
2074
#  if len(results) != 1 or len(results[0]) != 1:
2075
#    fail_list.append('failed to get restore job cnt')
2076
#  elif results[0][0] != 0:
2077
#      fail_list.append("""still has restore job, upgrade is not allowed temporarily""")
2078
#  logging.info('check restore job success')
2079
#
2080
#def check_is_primary_zone_distributed(primary_zone_str):
2081
#  semicolon_pos = len(primary_zone_str)
2082
#  for i in range(len(primary_zone_str)):
2083
#    if primary_zone_str[i] == ';':
2084
#      semicolon_pos = i
2085
#      break
2086
#  comma_pos = len(primary_zone_str)
2087
#  for j in range(len(primary_zone_str)):
2088
#    if primary_zone_str[j] == ',':
2089
#      comma_pos = j
2090
#      break
2091
#  if comma_pos < semicolon_pos:
2092
#    return True
2093
#  else:
2094
#    return False
2095
#
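## Illustration of the rule above, assuming the usual primary_zone syntax where ',' separates
## zones of equal priority and ';' separates priority levels:
##   'z1,z2;z3' -> True   (two zones share the highest priority, i.e. distributed)
##   'z1;z2,z3' -> False  (a single zone at the highest priority)
##   'z1'       -> False  (no separators at all)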
2096
## 7. Before upgrade, each tenant's primary zone must contain exactly one zone at the highest priority
2097
#def check_tenant_primary_zone(query_cur):
2098
#  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
2099
#  (desc, results) = query_cur.exec_query(sql)
2100
#  if len(results) != 1:
2101
#    fail_list.append('min_observer_version is not sync')
2102
#  elif len(results[0]) != 1:
2103
#    fail_list.append('column cnt not match')
2104
#  else:
2105
#    min_cluster_version = get_version(results[0][0])
2106
#    if min_cluster_version < get_version("4.1.0.0"):
2107
#      (desc, results) = query_cur.exec_query("""select tenant_name,primary_zone from DBA_OB_TENANTS where  tenant_id != 1""");
2108
#      for item in results:
2109
#        if cmp(item[1], "RANDOM") == 0:
2110
#          fail_list.append('{0} tenant primary zone is RANDOM, which is not allowed before upgrade'.format(item[0]))
2111
#        elif check_is_primary_zone_distributed(item[1]):
2112
#          fail_list.append('{0} tenant primary zone is distributed, which is not allowed before upgrade'.format(item[0]))
2113
#      logging.info('check tenant primary zone success')
2114
#
2115
## 8. Increase the server permanent offline time to avoid missing replicas during the upgrade
2116
#def modify_server_permanent_offline_time(cur):
2117
#  set_parameter(cur, 'server_permanent_offline_time', '72h')
2118
#
2119
## 9. Check whether any DDL task is running
2120
#def check_ddl_task_execute(query_cur):
2121
#  (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_ddl_task_status""")
2122
#  if 0 != results[0][0]:
2123
#    fail_list.append("There are DDL task in progress")
2124
#  logging.info('check ddl task execute status success')
2125
#
2126
## 10. Check that no backup job exists
2127
#def check_backup_job_exist(query_cur):
2128
#  # Backup jobs cannot be in-progress during upgrade.
2129
#  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_JOBS""")
2130
#  if len(results) != 1 or len(results[0]) != 1:
2131
#    fail_list.append('failed to get backup job cnt')
2132
#  elif results[0][0] != 0:
2133
#    fail_list.append("""still has backup job, upgrade is not allowed temporarily""")
2134
#  else:
2135
#    logging.info('check backup job success')
2136
#
2137
## 11. Check that no log archive job exists
2138
#def check_archive_job_exist(query_cur):
2139
#  min_cluster_version = 0
2140
#  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
2141
#  (desc, results) = query_cur.exec_query(sql)
2142
#  if len(results) != 1:
2143
#    fail_list.append('min_observer_version is not sync')
2144
#  elif len(results[0]) != 1:
2145
#    fail_list.append('column cnt not match')
2146
#  else:
2147
#    min_cluster_version = get_version(results[0][0])
2148
#
2149
#    # Archive jobs cannot be in-progress before upgrade from 4.0.
2150
#    if min_cluster_version < get_version("4.1.0.0"):
2151
#      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVELOG where status!='STOP'""")
2152
#      if len(results) != 1 or len(results[0]) != 1:
2153
#        fail_list.append('failed to get archive job cnt')
2154
#      elif results[0][0] != 0:
2155
#        fail_list.append("""still has archive job, upgrade is not allowed temporarily""")
2156
#      else:
2157
#        logging.info('check archive job success')
2158
#
2159
## 12. Check that the archive destination has been cleared
2160
#def check_archive_dest_exist(query_cur):
2161
#  min_cluster_version = 0
2162
#  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
2163
#  (desc, results) = query_cur.exec_query(sql)
2164
#  if len(results) != 1:
2165
#    fail_list.append('min_observer_version is not sync')
2166
#  elif len(results[0]) != 1:
2167
#    fail_list.append('column cnt not match')
2168
#  else:
2169
#    min_cluster_version = get_version(results[0][0])
2170
#    # archive dest need to be cleaned before upgrade from 4.0.
2171
#    if min_cluster_version < get_version("4.1.0.0"):
2172
#      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVE_DEST""")
2173
#      if len(results) != 1 or len(results[0]) != 1:
2174
#        fail_list.append('failed to get archive dest cnt')
2175
#      elif results[0][0] != 0:
2176
#        fail_list.append("""still has archive destination, upgrade is not allowed temporarily""")
2177
#      else:
2178
#        logging.info('check archive destination success')
2179
#
2180
## 13. Check that the backup destination has been cleared
2181
#def check_backup_dest_exist(query_cur):
2182
#  min_cluster_version = 0
2183
#  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
2184
#  (desc, results) = query_cur.exec_query(sql)
2185
#  if len(results) != 1:
2186
#    fail_list.append('min_observer_version is not sync')
2187
#  elif len(results[0]) != 1:
2188
#    fail_list.append('column cnt not match')
2189
#  else:
2190
#    min_cluster_version = get_version(results[0][0])
2191
#    # backup dest need to be cleaned before upgrade from 4.0.
2192
#    if min_cluster_version < get_version("4.1.0.0"):
2193
#      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_PARAMETER where name='data_backup_dest' and (value!=NULL or value!='')""")
2194
#      if len(results) != 1 or len(results[0]) != 1:
2195
#        fail_list.append('failed to get data backup dest cnt')
2196
#      elif results[0][0] != 0:
2197
#        fail_list.append("""still has backup destination, upgrade is not allowed temporarily""")
2198
#      else:
2199
#        logging.info('check backup destination success')
2200
#
2201
#def check_server_version(query_cur):
2202
#    sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server""";
2203
#    (desc, results) = query_cur.exec_query(sql);
2204
#    if len(results) != 1:
2205
#      fail_list.append("servers build_version not match")
2206
#    else:
2207
#      logging.info("check server version success")
2208
#
2209
## 14. Check whether all servers are able to serve
2210
#def check_observer_status(query_cur):
2211
#  (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status != "active")""")
2212
#  if results[0][0] > 0 :
2213
#    fail_list.append('{0} observer not available, please check'.format(results[0][0]))
2214
#  logging.info('check observer status success')
2215
#
2216
## 15. Check whether the schema has been refreshed successfully
2217
#def check_schema_status(query_cur):
2218
#  (desc, results) = query_cur.exec_query("""select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""")
2219
#  if results[0][0] != 1 :
2220
#    fail_list.append('{0} schema not available, please check'.format(results[0][0]))
2221
#  logging.info('check schema status success')
2222
#
2223
## 16. Check that no tenant is named all/all_user/all_meta
2224
#def check_not_supported_tenant_name(query_cur):
2225
#  names = ["all", "all_user", "all_meta"]
2226
#  (desc, results) = query_cur.exec_query("""select tenant_name from oceanbase.DBA_OB_TENANTS""")
2227
#  for i in range(len(results)):
2228
#    if results[i][0].lower() in names:
2229
#      fail_list.append('a tenant named all/all_user/all_meta (case insensitive) cannot exist in the cluster, please rename the tenant')
2230
#      break
2231
#  logging.info('check special tenant name success')
2232
## 17. Check whether log transport compression uses zlib; before upgrade, all observers must have log transport compression disabled or use a non-zlib algorithm
2233
#def check_log_transport_compress_func(query_cur):
2234
#  (desc, results) = query_cur.exec_query("""select count(1) as cnt from oceanbase.__all_virtual_tenant_parameter_info where (name like "log_transport_compress_func" and value like "zlib_1.0")""")
2235
#  if results[0][0] > 0 :
2236
#    fail_list.append('The zlib compression algorithm is no longer supported with log_transport_compress_func, please replace it with other compression algorithms')
2237
#  logging.info('check log_transport_compress_func success')
2238
## 18. Check whether any table uses zlib compression; before upgrade, no table may use zlib compression
2239
#def check_table_compress_func(query_cur):
2240
#  (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_table where (compress_func_name like '%zlib%')""")
2241
#  if results[0][0] > 0 :
2242
#    fail_list.append('There are tables use zlib compression, please replace it with other compression algorithms or do not use compression during the upgrade')
2243
#  logging.info('check table compression method success')
2244
## 19. Check whether table_api/obkv connections use zlib transport compression; before upgrade, all obkv/table_api connections must have zlib transport compression disabled or use a non-zlib algorithm
2245
#def check_table_api_transport_compress_func(query_cur):
2246
#  (desc, results) = query_cur.exec_query("""select count(1) as cnt from GV$OB_PARAMETERS where (name like "tableapi_transport_compress_func" and value like "zlib%");""")
2247
#  if results[0][0] > 0 :
2248
#    fail_list.append('Table api connection is not allowed to use zlib as compression algorithm during the upgrade, please use other compression algorithms by setting table_api_transport_compress_func')
2249
#  logging.info('check table_api_transport_compress_func success')
2250
#
2251
## 20. Check that no tenant clone job exists
2252
#def check_tenant_clone_job_exist(query_cur):
2253
#  min_cluster_version = 0
2254
#  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
2255
#  (desc, results) = query_cur.exec_query(sql)
2256
#  if len(results) != 1:
2257
#    fail_list.append('min_observer_version is not sync')
2258
#  elif len(results[0]) != 1:
2259
#    fail_list.append('column cnt not match')
2260
#  else:
2261
#    min_cluster_version = get_version(results[0][0])
2262
#    if min_cluster_version >= get_version("4.3.0.0"):
2263
#      (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_clone_job""")
2264
#      if len(results) != 1 or len(results[0]) != 1:
2265
#        fail_list.append('failed to get tenant clone job cnt')
2266
#      elif results[0][0] != 0:
2267
#        fail_list.append("""still has tenant clone job, upgrade is not allowed temporarily""")
2268
#      else:
2269
#        logging.info('check tenant clone job success')
2270
#
2271
## 21. Check that no tenant snapshot task exists
2272
#def check_tenant_snapshot_task_exist(query_cur):
2273
#  min_cluster_version = 0
2274
#  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
2275
#  (desc, results) = query_cur.exec_query(sql)
2276
#  if len(results) != 1:
2277
#    fail_list.append('min_observer_version is not sync')
2278
#  elif len(results[0]) != 1:
2279
#    fail_list.append('column cnt not match')
2280
#  else:
2281
#    min_cluster_version = get_version(results[0][0])
2282
#    if min_cluster_version >= get_version("4.3.0.0"):
2283
#      (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_tenant_snapshot where status!='NORMAL'""")
2284
#      if len(results) != 1 or len(results[0]) != 1:
2285
#        fail_list.append('failed to get tenant snapshot task cnt')
2286
#      elif results[0][0] != 0:
2287
#        fail_list.append("""still has tenant snapshot task, upgrade is not allowed temporarily""")
2288
#      else:
2289
#        logging.info('check tenant snapshot task success')
2290
#
2291
## 22. Check whether any tenant has set binlog_row_image to MINIMAL before upgrading to 4.3.0
2292
#def check_variable_binlog_row_image(query_cur):
2293
## In versions before 4.3.0.0, logs generated in MINIMAL mode cannot be consumed correctly by CDC (DELETE logs).
2294
## Starting from 4.3.0, MINIMAL mode has been improved to support CDC consumption; it should only be turned on after upgrading to 4.3.0.0.
2295
#  min_cluster_version = 0
2296
#  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
2297
#  (desc, results) = query_cur.exec_query(sql)
2298
#  if len(results) != 1:
2299
#    fail_list.append('min_observer_version is not sync')
2300
#  elif len(results[0]) != 1:
2301
#    fail_list.append('column cnt not match')
2302
#  else:
2303
#    min_cluster_version = get_version(results[0][0])
2304
#    # check cluster version
2305
#    if min_cluster_version < get_version("4.3.0.0"):
2306
#      (desc, results) = query_cur.exec_query("""select count(*) from CDB_OB_SYS_VARIABLES where NAME='binlog_row_image' and VALUE = '0'""")
2307
#      if results[0][0] > 0 :
2308
#        fail_list.append('{0} tenants have sys variable binlog_row_image set to MINIMAL, please check'.format(results[0][0]))
2309
#    logging.info('check variable binlog_row_image success')
2310
#
2311
## last check of do_check, make sure no check function executes after check_fail_list
2312
#def check_fail_list():
2313
#  if len(fail_list) != 0 :
2314
#     error_msg ="upgrade checker failed with " + str(len(fail_list)) + " reasons: " + ", ".join(['['+x+"] " for x in fail_list])
2315
#     raise MyError(error_msg)
2316
#
2317
#def set_query_timeout(query_cur, timeout):
2318
#  if timeout != 0:
2319
#    sql = """set @@session.ob_query_timeout = {0}""".format(timeout * 1000 * 1000)
2320
#    query_cur.exec_sql(sql)
2321
#
2322
## Run the checks before the upgrade
2323
#def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
2324
#  try:
2325
#    conn = mysql.connector.connect(user = my_user,
2326
#                                   password = my_passwd,
2327
#                                   host = my_host,
2328
#                                   port = my_port,
2329
#                                   database = 'oceanbase',
2330
#                                   raise_on_warnings = True)
2331
#    conn.autocommit = True
2332
#    cur = conn.cursor(buffered=True)
2333
#    try:
2334
#      query_cur = Cursor(cur)
2335
#      set_query_timeout(query_cur, timeout)
2336
#      check_observer_version(query_cur, upgrade_params)
2337
#      check_data_version(query_cur)
2338
#      check_paxos_replica(query_cur)
2339
#      check_rebalance_task(query_cur)
2340
#      check_cluster_status(query_cur)
2341
#      check_tenant_status(query_cur)
2342
#      check_restore_job_exist(query_cur)
2343
#      check_tenant_primary_zone(query_cur)
2344
#      check_ddl_task_execute(query_cur)
2345
#      check_backup_job_exist(query_cur)
2346
#      check_archive_job_exist(query_cur)
2347
#      check_archive_dest_exist(query_cur)
2348
#      check_backup_dest_exist(query_cur)
2349
#      check_observer_status(query_cur)
2350
#      check_schema_status(query_cur)
2351
#      check_server_version(query_cur)
2352
#      check_not_supported_tenant_name(query_cur)
2353
#      check_tenant_clone_job_exist(query_cur)
2354
#      check_tenant_snapshot_task_exist(query_cur)
2355
#      check_log_transport_compress_func(query_cur)
2356
#      check_table_compress_func(query_cur)
2357
#      check_table_api_transport_compress_func(query_cur)
2358
#      check_variable_binlog_row_image(query_cur)
2359
#      # all check func should execute before check_fail_list
2360
#      check_fail_list()
2361
#      modify_server_permanent_offline_time(cur)
2362
#    except Exception, e:
2363
#      logging.exception('run error')
2364
#      raise e
2365
#    finally:
2366
#      cur.close()
2367
#      conn.close()
2368
#  except mysql.connector.Error, e:
2369
#    logging.exception('connection error')
2370
#    raise e
2371
#  except Exception, e:
2372
#    logging.exception('normal error')
2373
#    raise e
2374
#
2375
#if __name__ == '__main__':
2376
#  upgrade_params = UpgradeParams()
2377
#  change_opt_defult_value('log-file', upgrade_params.log_filename)
2378
#  parse_options(sys.argv[1:])
2379
#  if not has_no_local_opts():
2380
#    deal_with_local_opts()
2381
#  else:
2382
#    check_db_client_opts()
2383
#    log_filename = get_opt_log_file()
2384
#    upgrade_params.log_filename = log_filename
2385
#    # logging is configured here so that the preceding operations do not overwrite the log file
2386
#    config_logging_module(upgrade_params.log_filename)
2387
#    try:
2388
#      host = get_opt_host()
2389
#      port = int(get_opt_port())
2390
#      user = get_opt_user()
2391
#      password = get_opt_password()
2392
#      timeout = int(get_opt_timeout())
2393
#      logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", log-file=\"%s\"',\
2394
#          host, port, user, password, timeout, log_filename)
2395
#      do_check(host, port, user, password, timeout, upgrade_params)
2396
#    except mysql.connector.Error, e:
2397
#      logging.exception('mysql connector error')
2398
#      raise e
2399
#    except Exception, e:
2400
#      logging.exception('normal error')
2401
#      raise e
2402
####====XXXX======######==== I am a splitter ====######======XXXX====####
2403
#filename:upgrade_health_checker.py
2404
##!/usr/bin/env python
2405
## -*- coding: utf-8 -*-
2406
#
2407
#import sys
2408
#import os
2409
#import time
2410
#import mysql.connector
2411
#from mysql.connector import errorcode
2412
#import logging
2413
#import getopt
2414
#
2415
#class UpgradeParams:
2416
#  log_filename = 'upgrade_cluster_health_checker.log'
2417
#
2418
##### --------------start : my_error.py --------------
2419
#class MyError(Exception):
2420
#  def __init__(self, value):
2421
#    self.value = value
2422
#  def __str__(self):
2423
#    return repr(self.value)
2424
#
2425
##### --------------start : actions.py, only query statements are allowed--------------
2426
#class QueryCursor:
2427
#  __cursor = None
2428
#  def __init__(self, cursor):
2429
#    self.__cursor = cursor
2430
#  def exec_sql(self, sql, print_when_succ = True):
2431
#    try:
2432
#      self.__cursor.execute(sql)
2433
#      rowcount = self.__cursor.rowcount
2434
#      if True == print_when_succ:
2435
#        logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
2436
#      return rowcount
2437
#    except mysql.connector.Error, e:
2438
#      logging.exception('mysql connector error, fail to execute sql: %s', sql)
2439
#      raise e
2440
#    except Exception, e:
2441
#      logging.exception('normal error, fail to execute sql: %s', sql)
2442
#      raise e
2443
#  def exec_query(self, sql, print_when_succ = True):
2444
#    try:
2445
#      self.__cursor.execute(sql)
2446
#      results = self.__cursor.fetchall()
2447
#      rowcount = self.__cursor.rowcount
2448
#      if True == print_when_succ:
2449
#        logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
2450
#      return (self.__cursor.description, results)
2451
#    except mysql.connector.Error, e:
2452
#      logging.exception('mysql connector error, fail to execute sql: %s', sql)
2453
#      raise e
2454
#    except Exception, e:
2455
#      logging.exception('normal error, fail to execute sql: %s', sql)
2456
#      raise e
2457
##### ---------------end----------------------
2458
#
2459
##### --------------start :  opt.py --------------
2460
#help_str = \
2461
#"""
2462
#Help:
2463
#""" +\
2464
#sys.argv[0] + """ [OPTIONS]""" +\
2465
#'\n\n' +\
2466
#'-I, --help          Display this help and exit.\n' +\
2467
#'-V, --version       Output version information and exit.\n' +\
2468
#'-h, --host=name     Connect to host.\n' +\
2469
#'-P, --port=name     Port number to use for connection.\n' +\
2470
#'-u, --user=name     User for login.\n' +\
2471
#'-p, --password=name Password to use when connecting to server. If password is\n' +\
2472
#'                    not given it\'s empty string "".\n' +\
2473
#'-m, --module=name   Modules to run. Modules should be a string combined by some of\n' +\
2474
#'                    the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
2475
#'                    system_variable_dml, special_action, all. "all" represents\n' +\
2476
#'                    that all modules should be run. They are separated by ",".\n' +\
2477
#'                    For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
2478
#'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
2479
#'-t, --timeout=name  check timeout.\n' + \
2480
#'-z, --zone=name     If zone is not specified, check all servers status in cluster. \n' +\
2481
#'                    Otherwise, only check servers status in specified zone. \n' + \
2482
#'\n\n' +\
2483
#'Maybe you want to run cmd like that:\n' +\
2484
#sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
2485
#
2486
#version_str = """version 1.0.0"""
2487
#
2488
#class Option:
2489
#  __g_short_name_set = set([])
2490
#  __g_long_name_set = set([])
2491
#  __short_name = None
2492
#  __long_name = None
2493
#  __is_with_param = None
2494
#  __is_local_opt = None
2495
#  __has_value = None
2496
#  __value = None
2497
#  def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
2498
#    if short_name in Option.__g_short_name_set:
2499
#      raise MyError('duplicate option short name: {0}'.format(short_name))
2500
#    elif long_name in Option.__g_long_name_set:
2501
#      raise MyError('duplicate option long name: {0}'.format(long_name))
2502
#    Option.__g_short_name_set.add(short_name)
2503
#    Option.__g_long_name_set.add(long_name)
2504
#    self.__short_name = short_name
2505
#    self.__long_name = long_name
2506
#    self.__is_with_param = is_with_param
2507
#    self.__is_local_opt = is_local_opt
2508
#    self.__has_value = False
2509
#    if None != default_value:
2510
#      self.set_value(default_value)
2511
#  def is_with_param(self):
2512
#    return self.__is_with_param
2513
#  def get_short_name(self):
2514
#    return self.__short_name
2515
#  def get_long_name(self):
2516
#    return self.__long_name
2517
#  def has_value(self):
2518
#    return self.__has_value
2519
#  def get_value(self):
2520
#    return self.__value
2521
#  def set_value(self, value):
2522
#    self.__value = value
2523
#    self.__has_value = True
2524
#  def is_local_opt(self):
2525
#    return self.__is_local_opt
2526
#  def is_valid(self):
2527
#    return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value
2528
#
2529
#g_opts =\
2530
#[\
2531
#Option('I', 'help', False, True),\
2532
#Option('V', 'version', False, True),\
2533
#Option('h', 'host', True, False),\
2534
#Option('P', 'port', True, False),\
2535
#Option('u', 'user', True, False),\
2536
#Option('p', 'password', True, False, ''),\
2537
## which modules to run; all by default
2538
#Option('m', 'module', True, False, 'all'),\
2539
## log file path; each script's main() overrides it with its own default
2540
#Option('l', 'log-file', True, False),\
2541
#Option('t', 'timeout', True, False, 0),\
2542
#Option('z', 'zone', True, False, ''),\
2543
#]\
2544
#
2545
#def change_opt_defult_value(opt_long_name, opt_default_val):
2546
#  global g_opts
2547
#  for opt in g_opts:
2548
#    if opt.get_long_name() == opt_long_name:
2549
#      opt.set_value(opt_default_val)
2550
#      return
2551
#
2552
#def has_no_local_opts():
2553
#  global g_opts
2554
#  no_local_opts = True
2555
#  for opt in g_opts:
2556
#    if opt.is_local_opt() and opt.has_value():
2557
#      no_local_opts = False
2558
#  return no_local_opts
2559
#
2560
#def check_db_client_opts():
2561
#  global g_opts
2562
#  for opt in g_opts:
2563
#    if not opt.is_local_opt() and not opt.has_value():
2564
#      raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
2565
#          .format(opt.get_short_name(), sys.argv[0]))
2566
#
2567
#def parse_option(opt_name, opt_val):
2568
#  global g_opts
2569
#  for opt in g_opts:
2570
#    if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
2571
#      opt.set_value(opt_val)
2572
#
2573
#def parse_options(argv):
2574
#  global g_opts
2575
#  short_opt_str = ''
2576
#  long_opt_list = []
2577
#  for opt in g_opts:
2578
#    if opt.is_with_param():
2579
#      short_opt_str += opt.get_short_name() + ':'
2580
#    else:
2581
#      short_opt_str += opt.get_short_name()
2582
#  for opt in g_opts:
2583
#    if opt.is_with_param():
2584
#      long_opt_list.append(opt.get_long_name() + '=')
2585
#    else:
2586
#      long_opt_list.append(opt.get_long_name())
2587
#  (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
2588
#  for (opt_name, opt_val) in opts:
2589
#    parse_option(opt_name, opt_val)
2590
#  if has_no_local_opts():
2591
#    check_db_client_opts()
2592
#
2593
#def deal_with_local_opt(opt):
2594
#  if 'help' == opt.get_long_name():
2595
#    global help_str
2596
#    print help_str
2597
#  elif 'version' == opt.get_long_name():
2598
#    global version_str
2599
#    print version_str
2600
#
2601
#def deal_with_local_opts():
2602
#  global g_opts
2603
#  if has_no_local_opts():
2604
#    raise MyError('no local options, can not deal with local options')
2605
#  else:
2606
#    for opt in g_opts:
2607
#      if opt.is_local_opt() and opt.has_value():
2608
#        deal_with_local_opt(opt)
2609
#        # handle only one local option
2610
#        return
2611
#
2612
#def get_opt_host():
2613
#  global g_opts
2614
#  for opt in g_opts:
2615
#    if 'host' == opt.get_long_name():
2616
#      return opt.get_value()
2617
#
2618
#def get_opt_port():
2619
#  global g_opts
2620
#  for opt in g_opts:
2621
#    if 'port' == opt.get_long_name():
2622
#      return opt.get_value()
2623
#
2624
#def get_opt_user():
2625
#  global g_opts
2626
#  for opt in g_opts:
2627
#    if 'user' == opt.get_long_name():
2628
#      return opt.get_value()
2629
#
2630
#def get_opt_password():
2631
#  global g_opts
2632
#  for opt in g_opts:
2633
#    if 'password' == opt.get_long_name():
2634
#      return opt.get_value()
2635
#
2636
#def get_opt_module():
2637
#  global g_opts
2638
#  for opt in g_opts:
2639
#    if 'module' == opt.get_long_name():
2640
#      return opt.get_value()
2641
#
2642
#def get_opt_log_file():
2643
#  global g_opts
2644
#  for opt in g_opts:
2645
#    if 'log-file' == opt.get_long_name():
2646
#      return opt.get_value()
2647
#
2648
#def get_opt_timeout():
2649
#  global g_opts
2650
#  for opt in g_opts:
2651
#    if 'timeout' == opt.get_long_name():
2652
#      return opt.get_value()
2653
#
2654
#def get_opt_zone():
2655
#  global g_opts
2656
#  for opt in g_opts:
2657
#    if 'zone' == opt.get_long_name():
2658
#      return opt.get_value()
2659
##### ---------------end----------------------
#
##### --------------start :  do_upgrade_pre.py--------------
#def config_logging_module(log_filenamme):
#  logging.basicConfig(level=logging.INFO,\
#      format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
#      datefmt='%Y-%m-%d %H:%M:%S',\
#      filename=log_filenamme,\
#      filemode='w')
#  # define the log output format
#  formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
#  #######################################
#  # define a handler that prints logs of INFO level and above to sys.stdout
#  stdout_handler = logging.StreamHandler(sys.stdout)
#  stdout_handler.setLevel(logging.INFO)
#  # set the log output format
#  stdout_handler.setFormatter(formatter)
#  # add the configured stdout_handler to the root logger
#  logging.getLogger('').addHandler(stdout_handler)
##### ---------------end----------------------
#
#def check_zone_valid(query_cur, zone):
#  if zone != '':
#    sql = """select count(*) from oceanbase.DBA_OB_ZONES where zone = '{0}'""".format(zone)
#    (desc, results) = query_cur.exec_query(sql);
#    if len(results) != 1 or len(results[0]) != 1:
#      raise MyError("unmatched row/column cnt")
#    elif results[0][0] == 0:
#      raise MyError("zone:{0} doesn't exist".format(zone))
#    else:
#      logging.info("zone:{0} is valid".format(zone))
#  else:
#    logging.info("zone is empty, check all servers in cluster")
#
#def fetch_tenant_ids(query_cur):
#  try:
#    tenant_id_list = []
#    (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""")
#    for r in results:
#      tenant_id_list.append(r[0])
#    return tenant_id_list
#  except Exception, e:
#    logging.exception('fail to fetch distinct tenant ids')
#    raise e
#
#def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout):
#  if timeout > 0:
#    logging.info("use timeout from opt, timeout(s):{0}".format(timeout))
#  else:
#    tenant_id_list = fetch_tenant_ids(query_cur)
#    cal_timeout = len(tenant_id_list) * timeout_per_tenant
#    timeout = (cal_timeout if cal_timeout > min_timeout else min_timeout)
#    logging.info("use default timeout calculated by tenants, "
#                 "timeout(s):{0}, tenant_count:{1}, "
#                 "timeout_per_tenant(s):{2}, min_timeout(s):{3}"
#                 .format(timeout, len(tenant_id_list), timeout_per_tenant, min_timeout))
#
#  return timeout
#
##### START ####
## 0. check that server versions are strictly consistent
#def check_server_version_by_zone(query_cur, zone):
#  if zone == '':
#    logging.info("skip check server version by cluster")
#  else:
#    sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone);
#    (desc, results) = query_cur.exec_query(sql);
#    if len(results) != 1:
#      raise MyError("servers build_version not match")
#    else:
#      logging.info("check server version success")
#
## 1. check whether paxos replicas are in sync and whether any paxos replica is missing
#def check_paxos_replica(query_cur, timeout):
#  # 1.1 check whether paxos replicas are in sync
#  sql = """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'"""
#  wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600)
#  check_until_timeout(query_cur, sql, 0, wait_timeout)
#
#  # 1.2 check whether any paxos replica is missing TODO
#  logging.info('check paxos replica success')
#
## 2. check whether observers are able to serve
#def check_observer_status(query_cur, zone, timeout):
#  sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')"""
#  if zone != '':
#    sql += """ and zone = '{0}'""".format(zone)
#  wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 10, 600)
#  check_until_timeout(query_cur, sql, 0, wait_timeout)
#
## 3. check whether the schema has been refreshed successfully
#def check_schema_status(query_cur, timeout):
#  sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b"""
#  wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600)
#  check_until_timeout(query_cur, sql, 1, wait_timeout)
#
## 4. check major finish
#def check_major_merge(query_cur, timeout):
#  need_check = 0
#  (desc, results) = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""")
#  if len(results) != 1:
#    need_check = 1
#  elif results[0][0] != 'True':
#    need_check = 1
#  if need_check == 1:
#    wait_timeout = set_default_timeout_by_tenant(query_cur, timeout, 30, 600)
#    sql = """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')"""
#    check_until_timeout(query_cur, sql, 0, wait_timeout)
#    sql2 = """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0"""
#    check_until_timeout(query_cur, sql2, 0, wait_timeout)
#
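## check_until_timeout: poll the given sql every 10 seconds and return once its
## single-cell result equals the expected value; raise after the timeout budget
## (timeout / 10 rounds) is exhausted.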
#def check_until_timeout(query_cur, sql, value, timeout):
#  times = timeout / 10
#  while times >= 0:
#    (desc, results) = query_cur.exec_query(sql)
#
#    if len(results) != 1 or len(results[0]) != 1:
#      raise MyError("unmatched row/column cnt")
#    elif results[0][0] == value:
#      logging.info("check value is {0} success".format(value))
#      break
#    else:
#      logging.info("value is {0}, expected value is {1}, not matched".format(results[0][0], value))
#
#    times -= 1
#    if times == -1:
#      logging.warn("""check job timeout, sql: {0}""".format(sql))
#      raise MyError('check until timeout')
#    time.sleep(10)
#
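## do_check connects to the cluster and runs the pre-upgrade health checks in order:
## zone validity, observer status, paxos replica sync, schema refresh and server
## build_version consistency; the major merge status is only checked when
## need_check_major_status is True.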
## start the health check
#def do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, need_check_major_status, zone = ''):
#  try:
#    conn = mysql.connector.connect(user = my_user,
#                                   password = my_passwd,
#                                   host = my_host,
#                                   port = my_port,
#                                   database = 'oceanbase',
#                                   raise_on_warnings = True)
#    conn.autocommit = True
#    cur = conn.cursor(buffered=True)
#    try:
#      query_cur = QueryCursor(cur)
#      check_zone_valid(query_cur, zone)
#      check_observer_status(query_cur, zone, timeout)
#      check_paxos_replica(query_cur, timeout)
#      check_schema_status(query_cur, timeout)
#      check_server_version_by_zone(query_cur, zone)
#      if True == need_check_major_status:
#        check_major_merge(query_cur, timeout)
#    except Exception, e:
#      logging.exception('run error')
#      raise e
#    finally:
#      cur.close()
#      conn.close()
#  except mysql.connector.Error, e:
#    logging.exception('connection error')
#    raise e
#  except Exception, e:
#    logging.exception('normal error')
#    raise e
#
#if __name__ == '__main__':
#  upgrade_params = UpgradeParams()
#  change_opt_defult_value('log-file', upgrade_params.log_filename)
#  parse_options(sys.argv[1:])
#  if not has_no_local_opts():
#    deal_with_local_opts()
#  else:
#    check_db_client_opts()
#    log_filename = get_opt_log_file()
#    upgrade_params.log_filename = log_filename
#    # configure logging here so that the preceding operations do not overwrite the log file
#    config_logging_module(upgrade_params.log_filename)
#    try:
#      host = get_opt_host()
#      port = int(get_opt_port())
#      user = get_opt_user()
#      password = get_opt_password()
#      timeout = int(get_opt_timeout())
#      zone = get_opt_zone()
#      logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", log-file=\"%s\", timeout=%s, zone=\"%s\"', \
#          host, port, user, password, log_filename, timeout, zone)
#      do_check(host, port, user, password, upgrade_params, timeout, False, zone) # need_check_major_status = False
#    except mysql.connector.Error, e:
#      logging.exception('mysql connector error')
#      raise e
#    except Exception, e:
#      logging.exception('normal error')
#      raise e
#
####====XXXX======######==== I am a splitter ====######======XXXX====####
#filename:upgrade_post_checker.py
##!/usr/bin/env python
## -*- coding: utf-8 -*-
#
#import sys
#import os
#import time
#import mysql.connector
#from mysql.connector import errorcode
#import logging
#import time
#import actions
#
##### START
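## Post-upgrade checker: verify that min_observer_version and every tenant's
## 'compatible'/data_version have converged, wait for the internal upgrade
## inspection to report 'succeed', then re-enable ddl, rebalance, rereplication
## and major freeze (see do_check at the end of this module).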
## 1 check the cluster version
#def check_cluster_version(cur, timeout):
#  current_cluster_version = actions.get_current_cluster_version()
#  actions.wait_parameter_sync(cur, False, "min_observer_version", current_cluster_version, timeout)
#
## 2 check the tenant data version
#def check_data_version(cur, query_cur, timeout):
#
#  # get tenant except standby tenant
#  sql = "select tenant_id from oceanbase.__all_tenant except select tenant_id from oceanbase.__all_virtual_tenant_info where tenant_role = 'STANDBY'"
#  (desc, results) = query_cur.exec_query(sql)
#  if len(results) == 0:
#    logging.warn('result cnt not match')
#    raise Exception('result cnt not match')
#  tenant_count = len(results)
#  tenant_ids_str = ''
#  for index, row in enumerate(results):
#    tenant_ids_str += """{0}{1}""".format((',' if index > 0 else ''), row[0])
#
#  # get server cnt
#  sql = "select count(*) from oceanbase.__all_server";
#  (desc, results) = query_cur.exec_query(sql)
#  if len(results) != 1 or len(results[0]) != 1:
#    logging.warn('result cnt not match')
#    raise Exception('result cnt not match')
#  server_count = results[0][0]
#
#  # check compatible sync
#  parameter_count = int(server_count) * int(tenant_count)
#  current_data_version = actions.get_current_data_version()
#
#  query_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 2, 60)
#  actions.set_session_timeout(cur, query_timeout)
#
#  sql = """select count(*) as cnt from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value = '{0}' and tenant_id in ({1})""".format(current_data_version, tenant_ids_str)
#
#  wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 60)
#  times = wait_timeout / 5
#  while times >= 0:
#    logging.info(sql)
#    cur.execute(sql)
#    result = cur.fetchall()
#    if len(result) != 1 or len(result[0]) != 1:
#      logging.exception('result cnt not match')
#      raise Exception('result cnt not match')
#    elif result[0][0] == parameter_count:
#      logging.info("""'compatible' is sync, value is {0}""".format(current_data_version))
#      break
#    else:
#      logging.info("""'compatible' is not sync, value should be {0}, expected_cnt should be {1}, current_cnt is {2}""".format(current_data_version, parameter_count, result[0][0]))
#
#    times -= 1
#    if times == -1:
#      logging.exception("""check compatible:{0} sync timeout""".format(current_data_version))
#      raise Exception("""check compatible:{0} sync timeout""".format(current_data_version))
#    time.sleep(5)
#
#  actions.set_session_timeout(cur, 10)
#
#  # check target_data_version/current_data_version from __all_core_table
#  int_current_data_version = actions.get_version(current_data_version)
#  sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0} and tenant_id in ({1})".format(int_current_data_version, tenant_ids_str)
#  (desc, results) = query_cur.exec_query(sql)
#  if len(results) != 1 or len(results[0]) != 1:
#    logging.warn('result cnt not match')
#    raise Exception('result cnt not match')
#  elif 2 * tenant_count != results[0][0]:
#    logging.warn('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(current_data_version, tenant_count, results[0][0]))
#    raise Exception('target_data_version/current_data_version not match')
#  else:
#    logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version))
#
## 3 check whether the internal table inspection succeeded
#def check_root_inspection(cur, query_cur, timeout):
#  sql = "select count(*) from oceanbase.__all_virtual_upgrade_inspection where info != 'succeed'"
#
#  wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600)
#
#  times = wait_timeout / 10
#  while times >= 0 :
#    (desc, results) = query_cur.exec_query(sql)
#    if results[0][0] == 0:
#      break
#    time.sleep(10)
#    times -= 1
#
#  if times == -1:
#    logging.warn('check root inspection failed!')
#    raise Exception('check root inspection failed')
#  logging.info('check root inspection success')
#
## 4 enable ddl
#def enable_ddl(cur, timeout):
#  actions.set_parameter(cur, 'enable_ddl', 'True', timeout)
#
## 5 enable rebalance
#def enable_rebalance(cur, timeout):
#  actions.set_parameter(cur, 'enable_rebalance', 'True', timeout)
#
## 6 enable rereplication
#def enable_rereplication(cur, timeout):
#  actions.set_parameter(cur, 'enable_rereplication', 'True', timeout)
#
## 7 enable major freeze
#def enable_major_freeze(cur, timeout):
#  actions.set_parameter(cur, 'enable_major_freeze', 'True', timeout)
#  actions.set_tenant_parameter(cur, '_enable_adaptive_compaction', 'True', timeout)
#  actions.do_resume_merge(cur, timeout)
#
## start post-upgrade checks
#def do_check(conn, cur, query_cur, timeout):
#  try:
#    check_cluster_version(cur, timeout)
#    check_data_version(cur, query_cur, timeout)
#    check_root_inspection(cur, query_cur, timeout)
#    enable_ddl(cur, timeout)
#    enable_rebalance(cur, timeout)
#    enable_rereplication(cur, timeout)
#    enable_major_freeze(cur, timeout)
#  except Exception, e:
#    logging.exception('run error')
#    raise e
####====XXXX======######==== I am a splitter ====######======XXXX====####
#sub file module end


import os
import sys
import datetime
from random import Random

class SplitError(Exception):
  def __init__(self, value):
    self.value = value
  def __str__(self):
    return repr(self.value)

def random_str(rand_str_len = 8):
  str = ''
  chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
  length = len(chars) - 1
  random = Random()
  for i in range(rand_str_len):
    str += chars[random.randint(0, length)]
  return str

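# split_py_files reads this script itself (sys.argv[0]) and extracts the embedded
# sub modules: blocks are delimited by the splitter line, the line right after a
# splitter names the target file ('#filename:...'), and each embedded line is
# written out with its leading '#' stripped, until the '#sub file module end'
# marker is reached.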
def split_py_files(sub_files_dir):
  char_enter = '\n'
  file_splitter_line = '####====XXXX======######==== I am a splitter ====######======XXXX====####'
  sub_filename_line_prefix = '#filename:'
  sub_file_module_end_line = '#sub file module end'
  os.makedirs(sub_files_dir)
  print('succeed to create run dir: ' + sub_files_dir + char_enter)
  cur_file = open(sys.argv[0], 'r')
  cur_file_lines = cur_file.readlines()
  cur_file_lines_count = len(cur_file_lines)
  sub_file_lines = []
  sub_filename = ''
  begin_read_sub_py_file = False
  is_first_splitter_line = True
  i = 0
  while i < cur_file_lines_count:
    if (file_splitter_line + char_enter) != cur_file_lines[i]:
      if begin_read_sub_py_file:
        sub_file_lines.append(cur_file_lines[i])
    else:
      if is_first_splitter_line:
        is_first_splitter_line = False
      else:
        # finished reading one sub file, write it to disk
        sub_file = open(sub_files_dir + '/' + sub_filename, 'w')
        for sub_file_line in sub_file_lines:
          sub_file.write(sub_file_line[1:])
        sub_file.close()
        # reset sub_file_lines
        sub_file_lines = []
      # read the next line: either the next sub file name or the end-of-module marker
      i += 1
      if i >= cur_file_lines_count:
        raise SplitError('invalid line index:' + str(i) + ', lines_count:' + str(cur_file_lines_count))
      elif (sub_file_module_end_line + char_enter) == cur_file_lines[i]:
        print 'succeed to split all sub py files'
        break
      else:
        mark_idx = cur_file_lines[i].find(sub_filename_line_prefix)
        if 0 != mark_idx:
          raise SplitError('invalid sub file name line, mark_idx = ' + str(mark_idx) + ', line = ' + cur_file_lines[i])
        else:
          sub_filename = cur_file_lines[i][len(sub_filename_line_prefix):-1]
          begin_read_sub_py_file = True
    i += 1
  cur_file.close()



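# Bootstrap: create a uniquely named extraction directory next to this script,
# split the embedded sub files into it, import do_upgrade_by_argv from the
# extracted do_upgrade_pre module, and forward the command-line arguments to it.
# Illustrative invocation only (the accepted flags are defined by g_opts inside
# the extracted do_upgrade_pre.py and may differ):
#   python upgrade_pre.py --host=127.0.0.1 --port=2881 --user=root --password=xxx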
if __name__ == '__main__':
  cur_filename = sys.argv[0][sys.argv[0].rfind(os.sep)+1:]
  (cur_file_short_name,cur_file_ext_name1) = os.path.splitext(sys.argv[0])
  (cur_file_real_name,cur_file_ext_name2) = os.path.splitext(cur_filename)
  sub_files_dir_suffix = '_extract_files_' + datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + '_' + random_str()
  sub_files_dir = cur_file_short_name + sub_files_dir_suffix
  sub_files_short_dir = cur_file_real_name + sub_files_dir_suffix
  split_py_files(sub_files_dir)
  exec('from ' + sub_files_short_dir + '.do_upgrade_pre import do_upgrade_by_argv')
  do_upgrade_by_argv(sys.argv[1:])
