oceanbase

Форк
0
/
upgrade_health_checker.py 
446 строк · 15.9 Кб
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
import sys
5
import os
6
import time
7
import mysql.connector
8
from mysql.connector import errorcode
9
import logging
10
import getopt
11

12
class UpgradeParams:
  """Run-time parameters shared by the health checker."""
  # Name of the log file. The __main__ section uses it as the default of
  # -l/--log-file and then overwrites it with the value actually chosen.
  log_filename = 'upgrade_cluster_health_checker.log'
14

15
#### --------------start : my_error.py --------------
16
class MyError(Exception):
  """Error raised by this script; its string form is repr() of the payload."""

  def __init__(self, value):
    # keep the payload so handlers can inspect it
    self.value = value

  def __str__(self):
    return '%r' % (self.value,)
21

22
#### --------------start : actions.py (only query statements may be executed)--------------
23
class QueryCursor:
  """Thin wrapper around a DB-API cursor that executes SQL and logs the outcome.

  The surrounding section header says only query statements should be run
  through it; the class itself does not enforce that.
  """
  __cursor = None

  def __init__(self, cursor):
    self.__cursor = cursor

  def exec_sql(self, sql, print_when_succ = True):
    """Execute `sql` and return the cursor's rowcount.

    On success the statement is logged when `print_when_succ` is truthy.
    On failure the traceback is logged and the original exception is
    re-raised unchanged.
    """
    try:
      self.__cursor.execute(sql)
      rowcount = self.__cursor.rowcount
    except mysql.connector.Error:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      # bare raise keeps the original traceback (``raise e`` resets it on Python 2)
      raise
    except Exception:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise
    if print_when_succ:
      logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
    return rowcount

  def exec_query(self, sql, print_when_succ = True):
    """Execute `sql`, fetch every row and return (cursor.description, rows).

    On success the query and its rowcount are logged when `print_when_succ`
    is truthy. On failure the traceback is logged and the original exception
    is re-raised unchanged.
    """
    try:
      self.__cursor.execute(sql)
      results = self.__cursor.fetchall()
      rowcount = self.__cursor.rowcount
    except mysql.connector.Error:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise
    except Exception:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise
    if print_when_succ:
      logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
    return (self.__cursor.description, results)
54
#### ---------------end----------------------
55

56
#### --------------start :  opt.py --------------
57
# Text printed for -I/--help. The log-file default quoted here is
# UpgradeParams.log_filename, which the __main__ section installs as the
# -l/--log-file default before it parses the command line.
help_str = (
  '\nHelp:\n' +
  sys.argv[0] + ' [OPTIONS]' +
  '\n\n' +
  '-I, --help          Display this help and exit.\n' +
  '-V, --version       Output version information and exit.\n' +
  '-h, --host=name     Connect to host.\n' +
  '-P, --port=name     Port number to use for connection.\n' +
  '-u, --user=name     User for login.\n' +
  '-p, --password=name Password to use when connecting to server. If password is\n' +
  '                    not given it\'s empty string "".\n' +
  '-m, --module=name   Modules to run. Modules should be a string combined by some of\n' +
  '                    the following strings: ddl, normal_dml, each_tenant_dml,\n' +
  '                    system_variable_dml, special_action, all. "all" represents\n' +
  '                    that all modules should be run. They are separated by ",".\n' +
  '                    For example: -m all, or --module=ddl,normal_dml,special_action\n' +
  '-l, --log-file=name Log file path. If log file path is not given it\'s ' + UpgradeParams.log_filename + '\n' +
  '-t, --timeout=name  Check timeout in seconds. If it is not given or not greater\n' +
  '                    than 0, a default based on the number of tenants is used.\n' +
  '-z, --zone=name     If zone is not specified, check the status of all servers in\n' +
  '                    the cluster. Otherwise, only check servers in the specified zone.\n' +
  '\n\n' +
  'Maybe you want to run cmd like that:\n' +
  sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
)
82

83
# Text printed for -V/--version.
version_str = 'version 1.0.0'
84

85
class Option:
  """One command-line option and its current value.

  An option has a short name, a long name, a flag saying whether it takes an
  argument, a flag saying whether it is a "local" option (such as help or
  version), and an optional default value. Short and long names are recorded
  in class-wide sets, so defining the same name twice raises MyError.
  """
  __g_short_name_set = set([])
  __g_long_name_set = set([])

  def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
    if short_name in Option.__g_short_name_set:
      raise MyError('duplicate option short name: {0}'.format(short_name))
    elif long_name in Option.__g_long_name_set:
      raise MyError('duplicate option long name: {0}'.format(long_name))
    Option.__g_short_name_set.add(short_name)
    Option.__g_long_name_set.add(long_name)
    self.__short_name = short_name
    self.__long_name = long_name
    self.__is_with_param = is_with_param
    self.__is_local_opt = is_local_opt
    self.__has_value = False
    self.__value = None
    # None means "no default": the option stays unset until a value is given
    if default_value is not None:
      self.set_value(default_value)

  def is_with_param(self):
    """Return whether the option takes an argument."""
    return self.__is_with_param

  def get_short_name(self):
    return self.__short_name

  def get_long_name(self):
    return self.__long_name

  def has_value(self):
    """Return True once a default or parsed value has been stored."""
    return self.__has_value

  def get_value(self):
    """Return the stored value, or None if nothing was stored."""
    return self.__value

  def set_value(self, value):
    self.__value = value
    self.__has_value = True

  def is_local_opt(self):
    return self.__is_local_opt

  def is_valid(self):
    """Return True when both names are set and a non-None value is stored."""
    return (self.__short_name is not None and
            self.__long_name is not None and
            self.__has_value and
            self.__value is not None)
125

126
# Every option the script understands, in the order handed to getopt.
# Option(short_name, long_name, is_with_param, is_local_opt[, default_value])
g_opts = [
  Option('I', 'help', False, True),
  Option('V', 'version', False, True),
  Option('h', 'host', True, False),
  Option('P', 'port', True, False),
  Option('u', 'user', True, False),
  Option('p', 'password', True, False, ''),
  # which modules to run; all of them by default
  Option('m', 'module', True, False, 'all'),
  # log file path; each script's main function replaces this with its own default
  Option('l', 'log-file', True, False),
  Option('t', 'timeout', True, False, 0),
  Option('z', 'zone', True, False, ''),
]
141

142
def change_opt_defult_value(opt_long_name, opt_default_val):
  """Store `opt_default_val` on the first option in g_opts named `opt_long_name`.

  Used to install a script-specific default before the command line is
  parsed. An unknown name is silently ignored.
  """
  target = next((candidate for candidate in g_opts
                 if candidate.get_long_name() == opt_long_name), None)
  if target is not None:
    target.set_value(opt_default_val)
148

149
def has_no_local_opts():
  """Return True when no local option in g_opts (e.g. help/version) has a value."""
  return not any(candidate.is_local_opt() and candidate.has_value()
                 for candidate in g_opts)
156

157
def check_db_client_opts():
  """Ensure every non-local option in g_opts has a value.

  Raises:
    MyError: naming the first non-local option that was neither given on the
      command line nor has a default.
  """
  for candidate in g_opts:
    if candidate.is_local_opt() or candidate.has_value():
      continue
    raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'
                  .format(candidate.get_short_name(), sys.argv[0]))
163

164
def parse_option(opt_name, opt_val):
  """Store `opt_val` on every option whose '-short' or '--long' spelling is `opt_name`."""
  for candidate in g_opts:
    spellings = ('-' + candidate.get_short_name(), '--' + candidate.get_long_name())
    if opt_name in spellings:
      candidate.set_value(opt_val)
169

170
def parse_options(argv):
  """Parse `argv` with getopt and store the values on the options in g_opts.

  Options that take an argument get a trailing ':' (short form) or '='
  (long form) in the getopt specs. Positional arguments are ignored. When no
  local option was given, check_db_client_opts() validates the rest.

  Raises:
    getopt.GetoptError: for unknown options or a missing option argument.
    MyError: from check_db_client_opts() when a required option is missing.
  """
  short_specs = []
  long_specs = []
  for candidate in g_opts:
    if candidate.is_with_param():
      short_specs.append(candidate.get_short_name() + ':')
      long_specs.append(candidate.get_long_name() + '=')
    else:
      short_specs.append(candidate.get_short_name())
      long_specs.append(candidate.get_long_name())
  parsed, _ = getopt.getopt(argv, ''.join(short_specs), long_specs)
  for name, value in parsed:
    parse_option(name, value)
  if has_no_local_opts():
    check_db_client_opts()
189

190
def deal_with_local_opt(opt):
  """Print help_str for the help option or version_str for the version option.

  Any other option is ignored.
  """
  printable = {'help': help_str, 'version': version_str}
  long_name = opt.get_long_name()
  if long_name in printable:
    print(printable[long_name])
197

198
def deal_with_local_opts():
  """Handle the first local option in g_opts that has a value; only one is handled.

  Raises:
    MyError: if no local option has a value.
  """
  if has_no_local_opts():
    raise MyError('no local options, can not deal with local options')
  first_local = next(candidate for candidate in g_opts
                     if candidate.is_local_opt() and candidate.has_value())
  deal_with_local_opt(first_local)
208

209
def get_opt_value_by_long_name(long_name, opts = None):
  """Return the value of the option whose long name is `long_name`.

  Searches `opts`, or the module-level g_opts when `opts` is None. Returns
  None when no option has that long name, or when the option has no value.
  """
  if opts is None:
    opts = g_opts
  for opt in opts:
    if opt.get_long_name() == long_name:
      return opt.get_value()
  return None

def get_opt_host():
  """Return the -h/--host value."""
  return get_opt_value_by_long_name('host')

def get_opt_port():
  """Return the -P/--port value (as given, not converted to int)."""
  return get_opt_value_by_long_name('port')

def get_opt_user():
  """Return the -u/--user value."""
  return get_opt_value_by_long_name('user')

def get_opt_password():
  """Return the -p/--password value."""
  return get_opt_value_by_long_name('password')

def get_opt_module():
  """Return the -m/--module value."""
  return get_opt_value_by_long_name('module')

def get_opt_log_file():
  """Return the -l/--log-file value."""
  return get_opt_value_by_long_name('log-file')

def get_opt_timeout():
  """Return the -t/--timeout value (as given, not converted to int)."""
  return get_opt_value_by_long_name('timeout')

def get_opt_zone():
  """Return the -z/--zone value."""
  return get_opt_value_by_long_name('zone')
256
#### ---------------end----------------------
257

258
#### --------------start :  do_upgrade_pre.py--------------
259
def config_logging_module(log_filenamme):
  """Send INFO-and-above log records to `log_filenamme` and to stdout.

  The file is opened in 'w' mode. Both destinations use the same
  '[time] LEVEL file:line message' layout.
  """
  line_format = '[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s'
  date_format = '%Y-%m-%d %H:%M:%S'
  # file output, configured on the root logger
  logging.basicConfig(level=logging.INFO,
                      format=line_format,
                      datefmt=date_format,
                      filename=log_filenamme,
                      filemode='w')
  # console output: an extra root-logger handler printing INFO and above to stdout
  console = logging.StreamHandler(sys.stdout)
  console.setLevel(logging.INFO)
  console.setFormatter(logging.Formatter(line_format, date_format))
  logging.getLogger('').addHandler(console)
275
#### ---------------end----------------------
276

277
def check_zone_valid(query_cur, zone):
  """Verify that `zone` exists in oceanbase.DBA_OB_ZONES.

  An empty `zone` is accepted without querying (all servers are checked).
  Note that `zone` is interpolated directly into the SQL text.

  Raises:
    MyError: if the count query does not return exactly one cell, or the
      count is 0.
  """
  if zone == '':
    logging.info("zone is empty, check all servers in cluster")
    return
  sql = """select count(*) from oceanbase.DBA_OB_ZONES where zone = '{0}'""".format(zone)
  _, rows = query_cur.exec_query(sql)
  if len(rows) != 1 or len(rows[0]) != 1:
    raise MyError("unmatched row/column cnt")
  if rows[0][0] == 0:
    raise MyError("zone:{0} doesn't exist".format(zone))
  logging.info("zone:{0} is valid".format(zone))
289

290
def fetch_tenant_ids(query_cur):
  """Return the distinct tenant ids from oceanbase.__all_tenant, in descending order.

  Any error is logged with its traceback and re-raised.
  """
  try:
    _, rows = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""")
    return [row[0] for row in rows]
  except Exception as e:
    logging.exception('fail to fetch distinct tenant ids')
    raise e
300

301
def set_default_timeout_by_tenant(query_cur, timeout, timeout_per_tenant, min_timeout):
  """Return the wait timeout in seconds.

  A positive `timeout` (e.g. from -t/--timeout) is returned unchanged.
  Otherwise the result is `timeout_per_tenant` times the number of tenants,
  but never less than `min_timeout`.
  """
  if not timeout > 0:
    tenant_count = len(fetch_tenant_ids(query_cur))
    timeout = max(tenant_count * timeout_per_tenant, min_timeout)
    logging.info("use default timeout caculated by tenants, "
                 "timeout(s):{0}, tenant_count:{1}, "
                 "timeout_per_tenant(s):{2}, min_timeout(s):{3}"
                 .format(timeout, tenant_count, timeout_per_tenant, min_timeout))
  else:
    logging.info("use timeout from opt, timeout(s):{0}".format(timeout))
  return timeout
314

315
#### START ####
316
# 0. Check that the server versions are strictly identical
def check_server_version_by_zone(query_cur, zone):
  """Check that all servers in `zone` report the same build version.

  The version compared is the part of build_version before the first '_'.
  With an empty `zone` the check is skipped.

  Raises:
    MyError: unless the query returns exactly one distinct version (zero rows
      also fail).
  """
  if zone == '':
    logging.info("skip check server version by cluster")
    return
  sql = """select distinct(substring_index(build_version, '_', 1)) from oceanbase.__all_server where zone = '{0}'""".format(zone)
  _, versions = query_cur.exec_query(sql)
  if len(versions) != 1:
    raise MyError("servers build_version not match")
  logging.info("check server version success")
327

328
# 1. Check that paxos replicas are in sync and that none are missing
def check_paxos_replica(query_cur, timeout):
  """Wait until no row of GV$OB_LOG_STAT reports in_sync = 'NO'.

  The wait budget is 10s per tenant with a 600s minimum, unless `timeout` is
  positive. The missing-replica check (step 1.2) is still a TODO.
  """
  budget = set_default_timeout_by_tenant(query_cur, timeout, 10, 600)
  # 1.1 replicas must be in sync
  check_until_timeout(query_cur,
                      """select count(*) from oceanbase.GV$OB_LOG_STAT where in_sync = 'NO'""",
                      0, budget)
  # 1.2 check for missing paxos replicas: TODO
  logging.info('check paxos replica success')
337

338
# 2. Check that the observers are able to serve
def check_observer_status(query_cur, zone, timeout):
  """Wait until no server in __all_server has start_service_time <= 0 or status 'inactive'.

  Only servers in `zone` are considered when `zone` is non-empty. The wait
  budget is 10s per tenant with a 600s minimum, unless `timeout` is positive.
  """
  zone_clause = """ and zone = '{0}'""".format(zone) if zone != '' else ''
  sql = """select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status='inactive')""" + zone_clause
  budget = set_default_timeout_by_tenant(query_cur, timeout, 10, 600)
  check_until_timeout(query_cur, sql, 0, budget)
345

346
# 3. Check that the schemas have been refreshed successfully
def check_schema_status(query_cur, timeout):
  """Wait until the schema-refresh query returns 1.

  The query compares two counts. The first is the number of
  __all_virtual_server_schema_info rows whose refreshed_schema_version is
  > 1 and divisible by 8. The second is the number of (server, tenant)
  pairs from joining __all_server with __all_tenant. The wait budget is
  30s per tenant with a 600s minimum, unless `timeout` is positive.
  """
  schema_sql = """select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b"""
  budget = set_default_timeout_by_tenant(query_cur, timeout, 30, 600)
  check_until_timeout(query_cur, schema_sql, 1, budget)
351

352
# 4. Check that major compaction has finished
def check_major_merge(query_cur, timeout):
  """Wait for major compaction to finish unless enable_major_freeze is uniformly 'True'.

  If the distinct values of enable_major_freeze in GV$OB_PARAMETERS are
  exactly ['True'], nothing else is checked. Otherwise the function waits
  until:
    1. CDB_OB_MAJOR_COMPACTION has no row with
       GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE';
    2. __all_virtual_tablet_compaction_info has no row with
       max_received_scn > finished_scn (and > 0).
  Both waits use 30s per tenant with a 600s minimum, unless `timeout` is
  positive.
  """
  _, values = query_cur.exec_query("""select distinct value from oceanbase.GV$OB_PARAMETERS where name = 'enable_major_freeze';""")
  freeze_enabled_everywhere = len(values) == 1 and values[0][0] == 'True'
  if freeze_enabled_everywhere:
    return
  budget = set_default_timeout_by_tenant(query_cur, timeout, 30, 600)
  check_until_timeout(query_cur,
                      """select count(1) from oceanbase.CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""",
                      0, budget)
  check_until_timeout(query_cur,
                      """select /*+ query_timeout(1000000000) */ count(1) from oceanbase.__all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""",
                      0, budget)
366

367
def check_until_timeout(query_cur, sql, value, timeout):
  """Poll `sql` every 10 seconds until its single result cell equals `value`.

  The query runs at most timeout // 10 + 1 times (at least once), sleeping
  10 seconds between attempts.

  Args:
    query_cur: object whose exec_query(sql) returns (description, rows).
    sql: query expected to return exactly one row with one column.
    value: expected value of that cell.
    timeout: wait budget in seconds.

  Raises:
    MyError: if the result is not exactly one row and one column, or if the
      cell still differs from `value` after the last attempt.
  """
  # Floor division keeps the counter integral on Python 3 too. With '/' a
  # float counter never reached -1 and the timeout was silently ignored.
  # max(..., 0) guarantees one check even for a zero or negative budget.
  times = max(timeout // 10, 0)
  while times >= 0:
    _, results = query_cur.exec_query(sql)
    if len(results) != 1 or len(results[0]) != 1:
      raise MyError("unmatched row/column cnt")
    if results[0][0] == value:
      logging.info("check value is {0} success".format(value))
      return
    logging.info("value is {0}, expected value is {1}, not matched".format(results[0][0], value))
    times -= 1
    if times < 0:
      # previously referenced the undefined names `job_name` and `e`
      logging.warning('check timeout, sql: %s', sql)
      raise MyError('check timeout after {0}s: sql "{1}" did not return {2}'.format(timeout, sql, value))
    time.sleep(10)
385

386
# Start the health checks
def do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout, need_check_major_status, zone = ''):
  """Connect to the cluster and run the health checks in order.

  The checks are check_zone_valid, check_observer_status,
  check_paxos_replica, check_schema_status and check_server_version_by_zone,
  followed by check_major_merge when `need_check_major_status` is truthy.
  `upgrade_params` is not used by this function.

  Every exception is logged and re-raised. Once opened, the cursor and the
  connection are always closed.
  """
  try:
    conn = mysql.connector.connect(user = my_user,
                                   password = my_passwd,
                                   host = my_host,
                                   port = my_port,
                                   database = 'oceanbase',
                                   raise_on_warnings = True)
    try:
      conn.autocommit = True
      cur = conn.cursor(buffered=True)
      try:
        query_cur = QueryCursor(cur)
        check_zone_valid(query_cur, zone)
        check_observer_status(query_cur, zone, timeout)
        check_paxos_replica(query_cur, timeout)
        check_schema_status(query_cur, timeout)
        check_server_version_by_zone(query_cur, zone)
        if need_check_major_status:
          check_major_merge(query_cur, timeout)
      except Exception:
        logging.exception('run error')
        raise
      finally:
        cur.close()
    finally:
      # Closed in its own finally so the connection is released even if
      # enabling autocommit or opening the cursor fails.
      conn.close()
  except mysql.connector.Error:
    logging.exception('connection error')
    raise
  except Exception:
    logging.exception('normal error')
    raise
418

419
if __name__ == '__main__':
  upgrade_params = UpgradeParams()
  change_opt_defult_value('log-file', upgrade_params.log_filename)
  parse_options(sys.argv[1:])
  if not has_no_local_opts():
    deal_with_local_opts()
  else:
    check_db_client_opts()
    log_filename = get_opt_log_file()
    upgrade_params.log_filename = log_filename
    # Logging is configured only here so that the earlier steps do not
    # overwrite the log file.
    config_logging_module(upgrade_params.log_filename)
    try:
      host = get_opt_host()
      port = int(get_opt_port())
      user = get_opt_user()
      password = get_opt_password()
      timeout = int(get_opt_timeout())
      zone = get_opt_zone()
      # never write the real password to the log file or stdout
      masked_password = '******' if password else ''
      logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", log-file=\"%s\", timeout=%s, zone=\"%s\"', \
          host, port, user, masked_password, log_filename, timeout, zone)
      do_check(host, port, user, password, upgrade_params, timeout, False, zone) # need_check_major_status = False
    except mysql.connector.Error:
      logging.exception('mysql connctor error')
      raise
    except Exception:
      logging.exception('normal error')
      raise
447

448

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.