oceanbase

Форк
0
/
actions.py 
573 строки · 18.9 Кб
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
import time
5
import re
6
import json
7
import traceback
8
import sys
9
import mysql.connector
10
from mysql.connector import errorcode
11
from my_error import MyError
12
import logging
13

14
class SqlItem:
  """An executed SQL statement paired with the SQL used to roll it back."""
  # Class-level defaults; __init__ always overrides both on the instance.
  action_sql = None
  rollback_sql = None

  def __init__(self, action_sql, rollback_sql):
    """Remember the action statement and its rollback statement."""
    self.action_sql, self.rollback_sql = action_sql, rollback_sql
20

21
# Version strings handed out by get_current_cluster_version() and
# get_current_data_version(); both use the "x.x.x.x" form.
current_cluster_version = "4.3.0.1"
current_data_version = "4.3.0.1"
# SqlItem records appended after each successfully executed statement.
g_succ_sql_list = []
# Records copied over from g_succ_sql_list by refresh_commit_sql_list().
g_commit_sql_list = []
25

26
def get_current_cluster_version():
  """Return the module-level cluster version string ("x.x.x.x")."""
  global current_cluster_version
  return current_cluster_version
28

29
def get_current_data_version():
  """Return the module-level data version string ("x.x.x.x")."""
  global current_data_version
  return current_data_version
31

32
def refresh_commit_sql_list():
  """Copy the not-yet-committed tail of g_succ_sql_list into g_commit_sql_list.

  Items are appended in place, so the list object itself is never replaced.
  Nothing happens when g_commit_sql_list is already at least as long as
  g_succ_sql_list.
  """
  global g_succ_sql_list
  global g_commit_sql_list
  already_committed = len(g_commit_sql_list)
  # The slice is empty when nothing is pending, which makes extend() a no-op.
  g_commit_sql_list.extend(g_succ_sql_list[already_committed:])
38

39
def get_succ_sql_list_str():
  """Render every action SQL in g_succ_sql_list as one ';'-terminated line.

  Returns:
    The statements joined by newlines, or '' when the list is empty.
  """
  global g_succ_sql_list
  return '\n'.join(item.action_sql + ';' for item in g_succ_sql_list)
47

48
def get_commit_sql_list_str():
  """Render every action SQL in g_commit_sql_list as one ';'-terminated line.

  Returns:
    The statements joined by newlines, or '' when the list is empty.
  """
  global g_commit_sql_list
  return '\n'.join(item.action_sql + ';' for item in g_commit_sql_list)
56

57
def get_rollback_sql_file_lines_str():
  """Build the body of the rollback file from g_commit_sql_list.

  Committed items are emitted newest first. Each entry is the action SQL
  wrapped in a /* ... */ comment followed by its rollback SQL, and entries
  are separated by a single newline.

  Returns:
    The assembled text, or '' when nothing has been committed.
  """
  global g_commit_sql_list
  entries = []
  for item in reversed(g_commit_sql_list):
    commented_action = '/*\n' + item.action_sql + ';\n*/\n'
    entries.append(commented_action + item.rollback_sql + ';')
  return '\n'.join(entries)
68

69
def dump_rollback_sql_to_file(rollback_sql_filename):
  """Write the rollback SQL for all committed statements to a file.

  The file starts with a three-line '#' header (kept in Chinese, as the
  original emits it) explaining that commented statements are the committed
  ones and that each is followed by its rollback statement, in reverse
  commit order. The body comes from get_rollback_sql_file_lines_str().

  Args:
    rollback_sql_filename: path of the file to create or overwrite.
  """
  logging.info('===================== begin to dump rollback sql file ============================')
  # Build the body first so a failure here does not leave a truncated file handle open.
  rollback_sql_file_lines_str = get_rollback_sql_file_lines_str()
  # 'with' guarantees the handle is closed even if one of the writes raises.
  with open(rollback_sql_filename, 'w') as rollback_sql_file:
    rollback_sql_file.write('# 此文件是回滚用的sql。\n')
    rollback_sql_file.write('# 注释的sql是已经成功commit的sql,它的下一条没被注释的sql则是对应的回滚sql。回滚的sql的排序跟commit的sql的排序刚好相反。\n')
    rollback_sql_file.write('# 跑升级脚本失败的时候可以参考本文件来进行回滚。\n')
    rollback_sql_file.write('\n')
    rollback_sql_file.write(rollback_sql_file_lines_str)
  logging.info('=========== succeed to dump rollback sql file to: ' + rollback_sql_filename + '===============')
80

81
def check_is_ddl_sql(sql):
  """Validate that `sql` starts with a DDL keyword.

  Only the first whitespace-separated word is inspected (case-insensitive);
  'create' and 'alter' are accepted.

  Raises:
    MyError: if `sql` contains no words or starts with any other keyword.
  """
  words = sql.split()
  if not words:
    raise MyError('sql is empty, sql="{0}"'.format(sql))
  key_word = words[0].lower()
  if key_word not in ('create', 'alter'):
    raise MyError('sql must be ddl, key_word="{0}", sql="{1}"'.format(key_word, sql))
88

89
def check_is_query_sql(sql):
  """Validate that `sql` starts with a query keyword.

  Only the first whitespace-separated word is inspected (case-insensitive);
  'select', 'show' and 'desc' are accepted.

  Raises:
    MyError: if `sql` contains no words or starts with any other keyword.
  """
  words = sql.split()
  if not words:
    raise MyError('sql is empty, sql="{0}"'.format(sql))
  key_word = words[0].lower()
  if key_word not in ('select', 'show', 'desc'):
    raise MyError('sql must be query, key_word="{0}", sql="{1}"'.format(key_word, sql))
96

97
def check_is_update_sql(sql):
  """Validate that `sql` is a data-modifying statement.

  Accepted first words (case-insensitive): insert, update, replace, set,
  delete. A user-variable assignment of the form
  "select @name := ..." is accepted as well.

  Raises:
    MyError: if `sql` contains no words or matches none of the forms above.
  """
  words = sql.split()
  if not words:
    raise MyError('sql is empty, sql="{0}"'.format(sql))
  key_word = words[0].lower()
  if key_word in ('insert', 'update', 'replace', 'set', 'delete'):
    return
  # Also allowed: statements such as "select @current_ts := now()".
  is_variable_assignment = (
      len(words) >= 3
      and key_word == 'select'
      and words[1].lower().startswith('@')
      and words[2].lower() == ':=')
  if not is_variable_assignment:
    raise MyError('sql must be update, key_word="{0}", sql="{1}"'.format(key_word, sql))
107

108
def get_min_cluster_version(cur):
  """Return min_observer_version as the integer produced by get_version().

  Selects the distinct values of 'min_observer_version' from
  oceanbase.GV$OB_PARAMETERS and requires exactly one single-column row.

  Args:
    cur: cursor object providing execute() and fetchall().

  Raises:
    MyError: if the query returns anything other than one row with one
      column. (The original raised an undefined name `e` here, which
      surfaced as a NameError instead of a meaningful error.)
  """
  sql = """select distinct value from oceanbase.GV$OB_PARAMETERS where name='min_observer_version'"""
  logging.info(sql)
  cur.execute(sql)
  results = cur.fetchall()
  if len(results) != 1:
    logging.error('min_observer_version is not sync')
    raise MyError('min_observer_version is not sync')
  if len(results[0]) != 1:
    logging.error('column cnt not match')
    raise MyError('column cnt not match')
  return get_version(results[0][0])
123

124
def set_parameter(cur, parameter, value, timeout = 0):
  """Issue "alter system set" for a cluster parameter and wait for it to sync.

  Args:
    cur: cursor object providing execute().
    parameter: parameter name, inserted verbatim into the statement.
    value: new value, inserted between single quotes.
    timeout: forwarded unchanged to wait_parameter_sync().
  """
  alter_sql = """alter system set {0} = '{1}'""".format(parameter, value)
  logging.info(alter_sql)
  cur.execute(alter_sql)
  # False: wait on the non-tenant parameter view.
  wait_parameter_sync(cur, False, parameter, value, timeout)
129

130
def set_session_timeout(cur, seconds):
  """Set the session variable ob_query_timeout.

  Args:
    cur: cursor object providing execute().
    seconds: timeout in seconds; multiplied by 1000 * 1000 before being
      written into the statement.
  """
  scaled_timeout = seconds * 1000 * 1000
  stmt = "set @@session.ob_query_timeout = {0}".format(scaled_timeout)
  logging.info(stmt)
  cur.execute(stmt)
134

135
def set_default_timeout_by_tenant(cur, timeout, timeout_per_tenant, min_timeout):
  """Pick a timeout: the caller's value if positive, else one scaled by tenant count.

  Args:
    cur: raw cursor; only used (wrapped in a QueryCursor) when `timeout`
      is not positive, to fetch the tenant id list.
    timeout: explicit timeout in seconds; used as-is when > 0.
    timeout_per_tenant: seconds budgeted per tenant for the computed value.
    min_timeout: lower bound in seconds for the computed value.

  Returns:
    `timeout` when it is positive, otherwise the larger of
    tenant_count * timeout_per_tenant and `min_timeout`.
  """
  if timeout > 0:
    logging.info("use timeout from opt, timeout(s):{0}".format(timeout))
    return timeout

  tenant_count = len(fetch_tenant_ids(QueryCursor(cur)))
  # On a tie max() returns its first argument, so min_timeout wins exactly
  # as in the explicit comparison this replaces.
  timeout = max(min_timeout, tenant_count * timeout_per_tenant)
  logging.info("use default timeout caculated by tenants, "
               "timeout(s):{0}, tenant_count:{1}, "
               "timeout_per_tenant(s):{2}, min_timeout(s):{3}"
               .format(timeout, tenant_count, timeout_per_tenant, min_timeout))
  return timeout
149

150
def set_tenant_parameter(cur, parameter, value, timeout = 0):
  """Set a tenant-level parameter for all tenants and wait for it to sync.

  When min_observer_version is below 4.2.1.0 a single statement with
  tenant = 'all' is issued; otherwise one statement each for 'sys',
  'all_user' and 'all_meta'. The session query timeout is raised for the
  statements and set back to 10 seconds afterwards.

  Args:
    cur: cursor object providing execute() and fetchall().
    parameter: parameter name, inserted verbatim into the statement.
    value: new value, inserted between single quotes.
    timeout: explicit timeout in seconds; 0 lets the helpers derive one
      from the tenant count.
  """
  pre_421 = get_min_cluster_version(cur) < get_version("4.2.1.0")
  tenant_targets = ['all'] if pre_421 else ['sys', 'all_user', 'all_meta']

  set_session_timeout(cur, set_default_timeout_by_tenant(cur, timeout, 10, 60))

  stmt_template = """alter system set {0} = '{1}' tenant = '{2}'"""
  for target in tenant_targets:
    stmt = stmt_template.format(parameter, value, target)
    logging.info(stmt)
    cur.execute(stmt)

  set_session_timeout(cur, 10)

  # True: wait on the tenant parameter table.
  wait_parameter_sync(cur, True, parameter, value, timeout)
170

171
def get_ori_enable_ddl(cur, timeout):
  """Return the current enable_ddl setting as a bool once it is in sync.

  Reads the value via fetch_ori_enable_ddl(), waits until 'enable_ddl'
  reports that value through wait_parameter_sync(), then converts the
  'True'/'False' string to a bool.
  """
  value_str = fetch_ori_enable_ddl(cur)
  wait_parameter_sync(cur, False, 'enable_ddl', value_str, timeout)
  return value_str == 'True'
176

177
def fetch_ori_enable_ddl(cur):
  """Read enable_ddl from oceanbase.__all_sys_parameter as 'True' or 'False'.

  Args:
    cur: cursor object providing execute() and fetchall().

  Returns:
    'True' when no row exists (treated as the default) or the stored value
    is one of 1/true/on/yes/t; 'False' for 0/false/off/no/f. Matching is
    case-insensitive.

  Raises:
    MyError: if the result is not a single one-column row, or the stored
      value is not a recognised boolean spelling. (The original raised an
      undefined name `e`, which surfaced as a NameError.)
  """
  sql = """select value from oceanbase.__all_sys_parameter where name = 'enable_ddl'"""

  logging.info(sql)
  cur.execute(sql)
  result = cur.fetchall()

  if len(result) == 0:
    # No row means the parameter still has its default value, which is True.
    return 'True'
  if len(result) != 1 or len(result[0]) != 1:
    logging.error('result cnt not match')
    raise MyError('result cnt not match')

  normalized = result[0][0].lower()
  if normalized in ("1", "true", "on", "yes", 't'):
    return 'True'
  if normalized in ("0", "false", "off", "no", 'f'):
    return 'False'

  message = """result value is invalid, result:{0}""".format(result[0][0])
  logging.error(message)
  raise MyError(message)
199

200
# format version like "x.x.x.x"
def print_version(version):
  """Decode an integer version into its "x.x.x.x" string form.

  The layout mirrors get_version(): major in bits 32 and up, minor in
  bits 16-31, major_patch in bits 8-15, minor_patch in bits 0-7.

  Args:
    version: the encoded version; anything accepted by int().

  Returns:
    The dotted version string. The original built this string and then
    discarded it, so every call returned None.
  """
  version = int(version)
  major = (version >> 32) & 0xffffffff
  minor = (version >> 16) & 0xffff
  major_patch = (version >> 8) & 0xff
  minor_patch = version & 0xff
  return "{0}.{1}.{2}.{3}".format(major, minor, major_patch, minor_patch)
208

209
# version str should look like "x.x.x.x"
def get_version(version_str):
  """Encode an "x.x.x.x" version string as a single comparable integer.

  Layout: major << 32 | minor << 16 | major_patch << 8 | minor_patch.

  Args:
    version_str: dotted version string with exactly four integer parts.

  Returns:
    The encoded integer.

  Raises:
    MyError: if the string does not have four parts or a part is outside
      the range of its bit field. (The original raised an undefined name
      `e`, which surfaced as a NameError.)
    ValueError: from int() if a part is not an integer literal.
  """
  versions = version_str.split(".")

  if len(versions) != 4:
    message = """version:{0} is invalid""".format(version_str)
    logging.error(message)
    raise MyError(message)

  major, minor, major_patch, minor_patch = [int(part) for part in versions]

  # A negative part would corrupt the neighbouring bit fields, so reject it
  # together with parts that are too large for their field.
  out_of_range = (
      min(major, minor, major_patch, minor_patch) < 0
      or major > 0xffffffff or minor > 0xffff
      or major_patch > 0xff or minor_patch > 0xff)
  if out_of_range:
    message = """version:{0} is invalid""".format(version_str)
    logging.error(message)
    raise MyError(message)

  return (major << 32) | (minor << 16) | (major_patch << 8) | minor_patch
228

229
def check_server_version_by_cluster(cur):
  """Verify that all rows of __all_server share one build version.

  The build version is the part of build_version before the first '_'.

  Args:
    cur: cursor object providing execute() and fetchall().

  Raises:
    MyError: if the number of distinct versions is not exactly one.
  """
  sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server"""
  logging.info(sql)
  cur.execute(sql)
  distinct_versions = cur.fetchall()
  if len(distinct_versions) != 1:
    raise MyError("servers build_version not match")
  logging.info("check server version success")
238

239
def check_parameter(cur, is_tenant_config, key, value):
  """Tell whether at least one parameter row has the given name and value.

  Args:
    cur: cursor object providing execute() and fetchall().
    is_tenant_config: when true, query
      oceanbase.__all_virtual_tenant_parameter_info; otherwise
      oceanbase.GV$OB_PARAMETERS.
    key: parameter name to match.
    value: parameter value to match.

  Returns:
    True if the query returns any row, else False.
  """
  if is_tenant_config:
    table_name = "__all_virtual_tenant_parameter_info"
  else:
    table_name = "GV$OB_PARAMETERS"
  sql = """select * from oceanbase.{0}
           where name = '{1}' and value = '{2}'""".format(table_name, key, value)
  logging.info(sql)
  cur.execute(sql)
  return len(cur.fetchall()) > 0
252

253
def wait_parameter_sync(cur, is_tenant_config, key, value, timeout):
  """Poll until no parameter row named `key` has a value other than `value`.

  The check runs every 5 seconds. The number of polls is
  wait_timeout // 5 + 1, where wait_timeout is `timeout` when positive,
  60 for non-tenant parameters, and a tenant-count based default for
  tenant parameters. The session query timeout is raised for the polling
  and set back to 10 seconds once the parameter is in sync.

  Args:
    cur: cursor object providing execute() and fetchall().
    is_tenant_config: when true, poll
      oceanbase.__all_virtual_tenant_parameter_info; otherwise
      oceanbase.GV$OB_PARAMETERS.
    key: parameter name.
    value: value every row is expected to reach.
    timeout: overall wait budget in seconds; values <= 0 select a default.

  Raises:
    MyError: if the count query returns an unexpected shape, or the
      parameter is still out of sync after the last poll. (The original
      raised an undefined name `e`, which surfaced as a NameError.)
  """
  table_name = "GV$OB_PARAMETERS" if not is_tenant_config else "__all_virtual_tenant_parameter_info"
  sql = """select count(*) as cnt from oceanbase.{0}
           where name = '{1}' and value != '{2}'""".format(table_name, key, value)

  if not is_tenant_config or timeout > 0:
    wait_timeout = (timeout if timeout > 0 else 60)
    query_timeout = wait_timeout
  else:
    # Tenant config without an explicit timeout: scale by tenant count.
    wait_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)
    query_timeout = set_default_timeout_by_tenant(cur, timeout, 2, 60)

  set_session_timeout(cur, query_timeout)

  # Floor division keeps the poll count an integer. With true division a
  # timeout that is not a multiple of 5 (or any value under Python 3)
  # skipped the "== -1" timeout check and fell out of the loop silently.
  remaining_polls = int(wait_timeout // 5) + 1
  while True:
    logging.info(sql)
    cur.execute(sql)
    result = cur.fetchall()
    if len(result) != 1 or len(result[0]) != 1:
      logging.error('result cnt not match')
      raise MyError('result cnt not match')
    if result[0][0] == 0:
      logging.info("""{0} is sync, value is {1}""".format(key, value))
      break
    logging.info("""{0} is not sync, value should be {1}""".format(key, value))

    remaining_polls -= 1
    if remaining_polls <= 0:
      message = """check {0}:{1} sync timeout""".format(key, value)
      logging.error(message)
      raise MyError(message)
    time.sleep(5)

  set_session_timeout(cur, 10)
291

292
def do_begin_upgrade(cur, timeout):
  """Run "alter system begin upgrade" if needed and wait for upgrade mode.

  The statement is skipped when enable_upgrade_mode already reads "True".
  When it is executed, it is recorded in g_succ_sql_list together with
  "alter system end upgrade" as its rollback. In both cases the function
  then waits for enable_upgrade_mode = "True" to be in sync.
  """
  global g_succ_sql_list

  in_upgrade_mode = check_parameter(cur, False, "enable_upgrade_mode", "True")
  if not in_upgrade_mode:
    begin_sql = "alter system begin upgrade"
    logging.info(begin_sql)
    cur.execute(begin_sql)
    g_succ_sql_list.append(SqlItem(begin_sql, "alter system end upgrade"))

  wait_parameter_sync(cur, False, "enable_upgrade_mode", "True", timeout)
305

306

307
def do_begin_rolling_upgrade(cur, timeout):
  """Run "alter system begin rolling upgrade" if needed and wait for DBUPGRADE.

  The statement is skipped when _upgrade_stage already reads "DBUPGRADE".
  When it is executed, it is recorded in g_succ_sql_list together with
  "alter system end upgrade" as its rollback. In both cases the function
  then waits for _upgrade_stage = "DBUPGRADE" to be in sync.
  """
  global g_succ_sql_list

  in_db_upgrade_stage = check_parameter(cur, False, "_upgrade_stage", "DBUPGRADE")
  if not in_db_upgrade_stage:
    begin_sql = "alter system begin rolling upgrade"
    logging.info(begin_sql)
    cur.execute(begin_sql)
    g_succ_sql_list.append(SqlItem(begin_sql, "alter system end upgrade"))

  wait_parameter_sync(cur, False, "_upgrade_stage", "DBUPGRADE", timeout)
320

321

322
def do_end_rolling_upgrade(cur, timeout):
  """Run "alter system end rolling upgrade" if needed and wait for POSTUPGRADE.

  Returns immediately when enable_upgrade_mode reads "False" (the cluster
  may be in the upgrade_post_check stage, or begin upgrade never ran).
  Otherwise the statement is executed unless _upgrade_stage is already
  "POSTUPGRADE" and min_observer_version already equals the current
  cluster version. Afterwards the function waits for both parameters.

  Args:
    cur: cursor object providing execute() and fetchall().
    timeout: forwarded to wait_parameter_sync().
  """
  global g_succ_sql_list

  # Maybe in the upgrade_post_check stage, or begin upgrade was never run.
  if check_parameter(cur, False, "enable_upgrade_mode", "False"):
    return

  current_cluster_version = get_current_cluster_version()
  stage_done = check_parameter(cur, False, "_upgrade_stage", "POSTUPGRADE")
  # Short-circuit exactly like the original: the version is only queried
  # when the stage check already passed.
  if not stage_done or not check_parameter(cur, False, "min_observer_version", current_cluster_version):
    action_sql = "alter system end rolling upgrade"
    rollback_sql = "alter system end upgrade"

    logging.info(action_sql)
    cur.execute(action_sql)

    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))

  # Wait for the same cluster version that was checked above. The original
  # waited for the module-level current_data_version instead, which only
  # works while the two version constants happen to be equal.
  wait_parameter_sync(cur, False, "min_observer_version", current_cluster_version, timeout)
  wait_parameter_sync(cur, False, "_upgrade_stage", "POSTUPGRADE", timeout)
341

342

343
def do_end_upgrade(cur, timeout):
  """Run "alter system end upgrade" if needed and wait for upgrade mode to end.

  The statement is skipped when enable_upgrade_mode already reads "False".
  When it is executed, it is recorded in g_succ_sql_list with an empty
  rollback statement. In both cases the function then waits for
  enable_upgrade_mode = "False" to be in sync.
  """
  global g_succ_sql_list

  upgrade_mode_off = check_parameter(cur, False, "enable_upgrade_mode", "False")
  if not upgrade_mode_off:
    end_sql = "alter system end upgrade"
    logging.info(end_sql)
    cur.execute(end_sql)
    g_succ_sql_list.append(SqlItem(end_sql, ""))

  wait_parameter_sync(cur, False, "enable_upgrade_mode", "False", timeout)
356

357
def do_suspend_merge(cur, timeout):
  """Issue "alter system suspend merge" for all tenants.

  When min_observer_version is below 4.2.1.0 a single statement with
  tenant = all is issued; otherwise one statement each for sys, all_user
  and all_meta. The session query timeout is raised for the statements
  and set back to 10 seconds afterwards. Nothing is recorded in
  g_succ_sql_list.

  Args:
    cur: cursor object providing execute() and fetchall().
    timeout: explicit timeout in seconds; values <= 0 derive one from the
      tenant count.
  """
  if get_min_cluster_version(cur) < get_version("4.2.1.0"):
    tenants_list = ['all']
  else:
    tenants_list = ['sys', 'all_user', 'all_meta']

  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)

  set_session_timeout(cur, query_timeout)

  for tenants in tenants_list:
    # The original also built a matching "resume merge" rollback string
    # here but never used it; that dead local has been dropped.
    action_sql = "alter system suspend merge tenant = {0}".format(tenants)
    logging.info(action_sql)
    cur.execute(action_sql)

  set_session_timeout(cur, 10)
375

376
def do_resume_merge(cur, timeout):
  """Issue "alter system resume merge" for all tenants.

  When min_observer_version is below 4.2.1.0 a single statement with
  tenant = all is issued; otherwise one statement each for sys, all_user
  and all_meta. The session query timeout is raised for the statements
  and set back to 10 seconds afterwards. Nothing is recorded in
  g_succ_sql_list.

  Args:
    cur: cursor object providing execute() and fetchall().
    timeout: explicit timeout in seconds; values <= 0 derive one from the
      tenant count.
  """
  if get_min_cluster_version(cur) < get_version("4.2.1.0"):
    tenants_list = ['all']
  else:
    tenants_list = ['sys', 'all_user', 'all_meta']

  query_timeout = set_default_timeout_by_tenant(cur, timeout, 10, 60)

  set_session_timeout(cur, query_timeout)

  for tenants in tenants_list:
    # The original also built a matching "suspend merge" rollback string
    # here but never used it; that dead local has been dropped.
    action_sql = "alter system resume merge tenant = {0}".format(tenants)
    logging.info(action_sql)
    cur.execute(action_sql)

  set_session_timeout(cur, 10)
394

395
class Cursor:
  """Logging wrapper around a raw cursor offering execute()/fetchall()."""
  # Class-level default; replaced per instance in __init__.
  __cursor = None

  def __init__(self, cursor):
    """Wrap `cursor`, the raw cursor all statements are sent to."""
    self.__cursor = cursor

  def exec_sql(self, sql, print_when_succ = True):
    """Execute `sql` and return the cursor's rowcount.

    Args:
      sql: statement text passed to the raw cursor's execute().
      print_when_succ: log a success line when true.

    Raises:
      Whatever the raw cursor raises; the failure is logged with its
      traceback and re-raised unchanged.
    """
    try:
      self.__cursor.execute(sql)
      rowcount = self.__cursor.rowcount
      if print_when_succ:
        logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
      return rowcount
    except mysql.connector.Error:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      # Bare raise keeps the original traceback ("raise e" reset it on Python 2).
      raise
    except Exception:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise

  def exec_query(self, sql, print_when_succ = True):
    """Execute `sql` and return (cursor.description, all fetched rows).

    Args:
      sql: query text passed to the raw cursor's execute().
      print_when_succ: log a success line when true.

    Raises:
      Whatever the raw cursor raises; the failure is logged with its
      traceback and re-raised unchanged.
    """
    try:
      self.__cursor.execute(sql)
      results = self.__cursor.fetchall()
      rowcount = self.__cursor.rowcount
      if print_when_succ:
        logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
      return (self.__cursor.description, results)
    except mysql.connector.Error:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise
    except Exception:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise
426

427
class DDLCursor:
  """Cursor wrapper that only lets DDL statements through."""
  # Class-level default; replaced per instance in __init__.
  _cursor = None

  def __init__(self, cursor):
    """Wrap the raw `cursor` in a logging Cursor."""
    self._cursor = Cursor(cursor)

  def exec_ddl(self, sql, print_when_succ = True):
    """Check that `sql` is DDL, execute it and return the rowcount.

    Raises:
      MyError: from check_is_ddl_sql() when `sql` is not DDL.
      Any error raised while executing; it is logged and re-raised
      unchanged.
    """
    try:
      # Reject anything that is not DDL before touching the database.
      check_is_ddl_sql(sql)
      return self._cursor.exec_sql(sql, print_when_succ)
    except Exception:
      logging.exception('fail to execute ddl: %s', sql)
      # Bare raise keeps the original traceback ("raise e" reset it on Python 2).
      raise
439

440
class QueryCursor:
  """Cursor wrapper that only lets query statements through."""
  # Class-level default; replaced per instance in __init__.
  _cursor = None

  def __init__(self, cursor):
    """Wrap the raw `cursor` in a logging Cursor."""
    self._cursor = Cursor(cursor)

  def exec_query(self, sql, print_when_succ = True):
    """Check that `sql` is a query, run it and return (description, rows).

    Raises:
      MyError: from check_is_query_sql() when `sql` is not a query.
      Any error raised while executing; it is logged and re-raised
      unchanged.
    """
    try:
      # Reject anything that is not a query before touching the database.
      check_is_query_sql(sql)
      return self._cursor.exec_query(sql, print_when_succ)
    except Exception:
      logging.exception('fail to execute dml query: %s', sql)
      # Bare raise keeps the original traceback ("raise e" reset it on Python 2).
      raise
452

453
class DMLCursor(QueryCursor):
  """QueryCursor that can additionally execute data-modifying statements."""

  def exec_update(self, sql, print_when_succ = True):
    """Check that `sql` is an update statement, run it and return the rowcount.

    Raises:
      MyError: from check_is_update_sql() when `sql` is not an update.
      Any error raised while executing; it is logged and re-raised
      unchanged.
    """
    try:
      # Reject anything that is not an update before touching the database.
      check_is_update_sql(sql)
      return self._cursor.exec_sql(sql, print_when_succ)
    except Exception:
      logging.exception('fail to execute dml update: %s', sql)
      # Bare raise keeps the original traceback ("raise e" reset it on Python 2).
      raise
462

463
class BaseDDLAction():
  """Base class for a single DDL upgrade step.

  do_action() calls self.get_action_ddl() and self.get_rollback_sql(),
  neither of which is defined in this class.
  """
  # Class-level defaults; replaced per instance in __init__.
  __ddl_cursor = None
  _query_cursor = None

  def __init__(self, cursor):
    """Create the DDL and query wrappers around the raw `cursor`."""
    self.__ddl_cursor = DDLCursor(cursor)
    self._query_cursor = QueryCursor(cursor)

  def do_action(self):
    """Execute the action DDL and record it as succeeded and committed."""
    global g_succ_sql_list
    ddl_sql = self.get_action_ddl()
    undo_sql = self.get_rollback_sql()
    self.__ddl_cursor.exec_ddl(ddl_sql)
    g_succ_sql_list.append(SqlItem(ddl_sql, undo_sql))
    # DDL is committed immediately, so refresh g_commit_sql_list right away.
    refresh_commit_sql_list()
477

478
class BaseDMLAction():
  """Base class for a single DML upgrade step.

  do_action() calls self.get_action_dml() and self.get_rollback_sql(),
  neither of which is defined in this class.
  """
  # Class-level defaults; replaced per instance in __init__.
  __dml_cursor = None
  _query_cursor = None

  def __init__(self, cursor):
    """Create the DML and query wrappers around the raw `cursor`."""
    self.__dml_cursor = DMLCursor(cursor)
    self._query_cursor = QueryCursor(cursor)

  def do_action(self):
    """Execute the action DML and record it in g_succ_sql_list."""
    global g_succ_sql_list
    dml_sql = self.get_action_dml()
    undo_sql = self.get_rollback_sql()
    self.__dml_cursor.exec_update(dml_sql)
    g_succ_sql_list.append(SqlItem(dml_sql, undo_sql))
490

491
class BaseEachTenantDMLAction():
  """Base class for a DML upgrade step that runs once per tenant.

  do_each_tenant_action() calls self.get_each_tenant_action_dml() and
  self.get_each_tenant_rollback_sql(), neither of which is defined in
  this class.
  """
  # Class-level defaults; replaced per instance in __init__.
  __dml_cursor = None
  _query_cursor = None
  _tenant_id_list = None
  _cursor = None

  def __init__(self, cursor, tenant_id_list):
    """Create the cursor wrappers and remember the tenant id list."""
    self.__dml_cursor = DMLCursor(cursor)
    self._query_cursor = QueryCursor(cursor)
    self._tenant_id_list = tenant_id_list
    self._cursor = Cursor(cursor)

  def get_tenant_id_list(self):
    """Return the tenant id list given at construction."""
    return self._tenant_id_list

  def do_each_tenant_action(self, tenant_id):
    """Execute the action DML for `tenant_id` and record it in g_succ_sql_list."""
    global g_succ_sql_list
    dml_sql = self.get_each_tenant_action_dml(tenant_id)
    undo_sql = self.get_each_tenant_rollback_sql(tenant_id)
    self.__dml_cursor.exec_update(dml_sql)
    g_succ_sql_list.append(SqlItem(dml_sql, undo_sql))
509

510
class BaseEachTenantDDLAction():
  """Base class for a DDL upgrade step that runs once per tenant.

  do_each_tenant_action() calls self.get_each_tenant_action_ddl() and
  self.get_each_tenant_rollback_sql(), neither of which is defined in
  this class.
  """
  # Class-level defaults; replaced per instance in __init__. The original
  # declared "__dml_cursor" here although this class only ever uses
  # "__ddl_cursor", so the default never matched the real attribute.
  __ddl_cursor = None
  _query_cursor = None
  _tenant_id_list = None
  _all_table_name = "__all_table"

  def __init__(self, cursor, tenant_id_list):
    """Create the cursor wrappers and remember the tenant id list."""
    self.__ddl_cursor = DDLCursor(cursor)
    self._query_cursor = QueryCursor(cursor)
    self._tenant_id_list = tenant_id_list

  def get_tenant_id_list(self):
    """Return the tenant id list given at construction."""
    return self._tenant_id_list

  def get_all_table_name(self):
    """Return the configured table name ("__all_table" by default)."""
    return self._all_table_name

  def set_all_table_name(self, table_name):
    """Override the table name for this instance."""
    self._all_table_name = table_name

  def do_each_tenant_action(self, tenant_id):
    """Execute the action DDL for `tenant_id` and record it as committed."""
    global g_succ_sql_list
    action_sql = self.get_each_tenant_action_ddl(tenant_id)
    rollback_sql = self.get_each_tenant_rollback_sql(tenant_id)
    self.__ddl_cursor.exec_ddl(action_sql)
    g_succ_sql_list.append(SqlItem(action_sql, rollback_sql))
    # DDL is committed immediately, so refresh g_commit_sql_list right away.
    refresh_commit_sql_list()
533

534
def actions_cls_compare(x, y):
  """cmp-style comparator ordering two objects by get_seq_num().

  Returns:
    -1 when x's sequence number is smaller than y's, 1 when it is larger.

  Raises:
    MyError: when both sequence numbers are equal.
  """
  delta = x.get_seq_num() - y.get_seq_num()
  if delta == 0:
    raise MyError('seq num is equal')
  return -1 if delta < 0 else 1
542

543
def reflect_action_cls_list(action_module, action_name_prefix):
  """Collect the attributes of a module whose names start with a prefix.

  Args:
    action_module: module (or any object) to inspect with dir().
    action_name_prefix: name prefix selecting the attributes to return.

  Returns:
    The matching attributes sorted with actions_cls_compare(), i.e. by
    ascending get_seq_num().

  Raises:
    MyError: from actions_cls_compare() when two entries share a
      sequence number.
  """
  # Local import: cmp_to_key works on Python 2.7 and 3, whereas passing a
  # cmp function positionally to list.sort() only works on Python 2.
  import functools

  action_cls_list = [
      getattr(action_module, name)
      for name in dir(action_module)
      if name.startswith(action_name_prefix)
  ]
  action_cls_list.sort(key=functools.cmp_to_key(actions_cls_compare))
  return action_cls_list
552

553
def fetch_observer_version(cur):
  """Return the single distinct min_observer_version value.

  The value is read from __all_virtual_sys_parameter_stat and returned
  as stored in the first column of the row.

  Args:
    cur: cursor object providing execute() and fetchall().

  Raises:
    MyError: if the query does not return exactly one row.
  """
  sql = """select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'"""
  logging.info(sql)
  cur.execute(sql)
  rows = cur.fetchall()
  if len(rows) != 1:
    raise MyError('query results count is not 1')
  observer_version = rows[0][0]
  logging.info('get observer version success, version = {0}'.format(observer_version))
  return observer_version
563

564
def fetch_tenant_ids(query_cur):
  """Return all distinct tenant ids from oceanbase.__all_tenant.

  Args:
    query_cur: object whose exec_query(sql) returns (description, rows),
      such as a QueryCursor.

  Returns:
    A list holding the first column of every row, in query order
    (the query sorts by tenant_id descending).

  Raises:
    Whatever exec_query() raises; the failure is logged and re-raised
    unchanged.
  """
  try:
    (desc, results) = query_cur.exec_query("""select distinct tenant_id from oceanbase.__all_tenant order by tenant_id desc""")
    return [row[0] for row in results]
  except Exception:
    logging.exception('fail to fetch distinct tenant ids')
    # Bare raise keeps the original traceback ("raise e" reset it on Python 2).
    raise
574

575

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.