oceanbase

Форк
0
/
tenant_upgrade_action.py 
258 строк · 9.6 Кб
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3

4
import logging
5
import time
6
from actions import Cursor
7
from actions import DMLCursor
8
from actions import QueryCursor
9
import mysql.connector
10
from mysql.connector import errorcode
11
import actions
12

13
def do_upgrade(conn, cur, timeout, user, pwd):
  """Run the tenant upgrade job, the root inspection, then the per-version actions.

  A full "UPGRADE_ALL" job is run when upgrade_across_version() reports a
  cross data version upgrade; otherwise only "UPGRADE_VIRTUAL_SCHEMA" is run.
  `user` and `pwd` are not used by this function.
  """
  if upgrade_across_version(cur):
    job_name = "UPGRADE_ALL"
  else:
    job_name = "UPGRADE_VIRTUAL_SCHEMA"
  run_upgrade_job(conn, cur, job_name, timeout)

  run_root_inspection(cur, timeout)
  # Upgrade actions must be written between the "actions begin" and
  # "actions end" marker lines: when the base version is updated,
  # reset_upgrade_scripts.py clears the code between those two lines, and
  # code written outside them would not be cleared.
####========******####======== actions begin ========####******========####
  upgrade_syslog_level(conn, cur)
  return

def upgrade_syslog_level(conn, cur):
  """Set syslog_level to 'WDIAG' when any server still reports 'INFO'.

  The current syslog_level of every server (from
  oceanbase.__all_virtual_sys_parameter_stat) is logged first. If at least
  one row has value 'INFO', the parameter is set through
  actions.set_parameter. `conn` is not used.

  Raises:
    Any error from the queries or from actions.set_parameter is logged and
    re-raised unchanged (same type, same traceback).
  """
  try:
    cur.execute("""select svr_ip, svr_port, value from oceanbase.__all_virtual_sys_parameter_stat where name = 'syslog_level'""")
    for r in cur.fetchall():
      logging.info("syslog level before upgrade: ip: {0}, port: {1}, value: {2}".format(r[0], r[1], r[2]))
    cur.execute("""select count(*) cnt from oceanbase.__all_virtual_sys_parameter_stat where name = 'syslog_level' and value = 'INFO'""")
    # count(*) always yields exactly one row
    info_cnt = cur.fetchall()[0][0]
    if info_cnt > 0:
      actions.set_parameter(cur, "syslog_level", "WDIAG")
  except Exception:
    logging.warning("upgrade syslog level failed!")
    # a bare raise keeps the original traceback; ``raise e`` resets it on Python 2
    raise
####========******####========= actions end =========####******========####

def query(cur, sql):
  """Execute `sql` on cursor `cur` and return every fetched row.

  The statement is logged before it is executed, so a statement that fails
  still appears in the log (the same order run_root_inspection and
  run_upgrade_job use).
  """
  logging.info(sql)
  cur.execute(sql)
  return cur.fetchall()


def get_tenant_ids(cur):
  """Return the tenant_id column of oceanbase.__all_tenant as a list."""
  rows = query(cur, 'select tenant_id from oceanbase.__all_tenant')
  tenant_ids = [row[0] for row in rows]
  return tenant_ids


def run_root_inspection(cur, timeout):
  """Trigger the 'root_inspection' job under a temporarily raised session timeout.

  The session timeout is taken from actions.set_default_timeout_by_tenant
  (called with 10 and 600) and is set back to 10 once the statement has been
  issued.
  """
  inspection_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 10, 600)
  actions.set_session_timeout(cur, inspection_timeout)
  stmt = "alter system run job 'root_inspection'"
  logging.info(stmt)
  cur.execute(stmt)
  # put the session timeout back to 10
  actions.set_session_timeout(cur, 10)


def upgrade_across_version(cur):
  """Return True when the cluster must run the full (cross data version) upgrade.

  True is returned when any of the following holds:
    1. oceanbase.__all_table has no '__all_virtual_core_table' entry (per the
       original comment: the cluster was upgraded from 4.0.0.0);
    2. the number of target_data_version/current_data_version rows in
       __all_virtual_core_table equal to the current data version differs
       from 2 * (number of tenants);
    3. some row of oceanbase.__all_virtual_tenant_parameter_info named
       'compatible' has a value other than the current data version.

  Raises:
    Exception: when a query returns an unexpected row/column count or when
      no tenant is found.
  """
  current_data_version = actions.get_current_data_version()
  int_current_data_version = actions.get_version(current_data_version)

  across_version = False

  # 1. check if target_data_version/current_data_version match with current_data_version
  sql = "select count(*) from oceanbase.__all_table where table_name = '__all_virtual_core_table'"
  results = query(cur, sql)
  if len(results) < 1 or len(results[0]) < 1:
    msg = "row/column cnt not match"
    logging.warning(msg)
    raise Exception(msg)
  elif results[0][0] <= 0:
    # __all_virtual_core_table doesn't exist, this cluster is upgraded from 4.0.0.0
    across_version = True
  else:
    tenant_ids = get_tenant_ids(cur)
    if len(tenant_ids) <= 0:
      msg = "tenant_ids count is unexpected"
      logging.warning(msg)
      raise Exception(msg)
    tenant_count = len(tenant_ids)

    sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(int_current_data_version)
    results = query(cur, sql)
    if len(results) != 1 or len(results[0]) != 1:
      msg = 'result cnt not match'
      logging.warning(msg)
      raise Exception(msg)
    elif 2 * tenant_count != results[0][0]:
      # two matching rows (target + current) are expected per tenant
      logging.info('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(current_data_version, tenant_count, results[0][0]))
      across_version = True
    else:
      logging.info("all tenant's target_data_version/current_data_version are match with {0}".format(current_data_version))
      across_version = False

  # 2. check if compatible match with current_data_version
  if not across_version:
    sql = "select count(*) from oceanbase.__all_virtual_tenant_parameter_info where name = 'compatible' and value != '{0}'".format(current_data_version)
    results = query(cur, sql)
    if len(results) < 1 or len(results[0]) < 1:
      msg = "row/column cnt not match"
      logging.warning(msg)
      raise Exception(msg)
    elif results[0][0] == 0:
      logging.info("compatible are all matched")
    else:
      logging.info("compatible unmatched")
      across_version = True

  return across_version


def get_max_used_job_id(cur):
  """Return the largest job_id in oceanbase.__all_rootservice_job, or 0 if none.

  Raises:
    Exception: when the query returns an unexpected row/column count. Any
      error is logged and re-raised with its original traceback.
  """
  try:
    sql = "select job_id from oceanbase.__all_rootservice_job order by job_id desc limit 1"
    results = query(cur, sql)

    if len(results) == 0:
      max_job_id = 0
    elif len(results) != 1 or len(results[0]) != 1:
      msg = "row cnt not match"
      logging.warning(msg)
      # raise a real error; the old ``raise e`` raised UnboundLocalError here
      raise Exception(msg)
    else:
      max_job_id = results[0][0]

    logging.info("get max_used_job_id:{0}".format(max_job_id))

    return max_job_id
  except Exception:
    logging.warning("failed to get max_used_job_id")
    raise


def check_can_run_upgrade_job(cur, job_name):
  """Return True if a new `job_name` upgrade job may be started.

  Reads the status of the latest `job_name` job in
  oceanbase.__all_rootservice_job:
    * no job yet, SUCCESS or FAILED -> returns True;
    * INPROGRESS -> raises (the running job should be waited for);
    * any other status or an unexpected result shape -> raises.
  The function never returns False.

  Raises:
    Exception: in the cases above. Errors are logged and re-raised with their
      original traceback.
  """
  try:
    sql = """select job_status from oceanbase.__all_rootservice_job
             where job_type = '{0}' order by job_id desc limit 1""".format(job_name)
    results = query(cur, sql)

    if len(results) == 0:
      logging.info("upgrade job not created yet, should run upgrade job")
    elif len(results) != 1 or len(results[0]) != 1:
      msg = "row cnt not match"
      logging.warning(msg)
      raise Exception(msg)
    elif "INPROGRESS" == results[0][0]:
      msg = "upgrade job still running, should wait"
      logging.warning(msg)
      raise Exception(msg)
    elif "SUCCESS" == results[0][0]:
      logging.info("maybe upgrade job remained, can run again")
    elif "FAILED" == results[0][0]:
      logging.info("execute upgrade job failed, should run again")
    else:
      msg = "invalid job status: {0}".format(results[0][0])
      logging.warning(msg)
      raise Exception(msg)

    return True
  except Exception:
    logging.warning("failed to check if upgrade job can run")
    raise


def check_upgrade_job_result(cur, job_name, timeout, max_used_job_id):
  """Wait until the `job_name` job created after `max_used_job_id` reports SUCCESS.

  Polls oceanbase.__all_rootservice_job every 10 seconds, for about
  wait_timeout seconds, where wait_timeout comes from
  actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600). While the
  job is INPROGRESS, every 10th poll also checks the rootservice recorded on
  the job with _check_upgrade_job_rs_alive().

  Raises:
    Exception: if the job reports FAILED or an unknown status, a query returns
      an unexpected shape, the rootservice changed, or the wait times out.
      Errors are logged and re-raised with their original traceback.
  """
  try:
    wait_timeout = actions.set_default_timeout_by_tenant(cur, timeout, 100, 3600)

    # Floor division keeps `times` integral. With `/` a non-multiple-of-10 or
    # float timeout (or Python 3 true division) made `times` skip -1, so the
    # loop ended silently without reporting a timeout.
    times = wait_timeout // 10
    while times >= 0:
      sql = """select job_status, rs_svr_ip, rs_svr_port, gmt_create from oceanbase.__all_rootservice_job
               where job_type = '{0}' and job_id > {1} order by job_id desc limit 1
            """.format(job_name, max_used_job_id)
      results = query(cur, sql)

      if len(results) == 0:
        logging.info("upgrade job not created yet")
      elif len(results) != 1 or len(results[0]) != 4:
        msg = "row cnt not match"
        logging.warning(msg)
        raise Exception(msg)
      else:
        job_status = results[0][0]
        if "INPROGRESS" == job_status:
          logging.info("upgrade job is still running")
          # check if rs change
          if times % 10 == 0:
            _check_upgrade_job_rs_alive(cur, results[0][1], results[0][2], results[0][3])
        elif "SUCCESS" == job_status:
          logging.info("execute upgrade job successfully")
          break
        elif "FAILED" == job_status:
          msg = "execute upgrade job failed"
          logging.warning(msg)
          raise Exception(msg)
        else:
          msg = "invalid job status: {0}".format(job_status)
          logging.warning(msg)
          raise Exception(msg)

      times = times - 1
      if times == -1:
        msg = """check {0} job timeout""".format(job_name)
        logging.warning(msg)
        raise Exception(msg)
      time.sleep(10)
  except Exception:
    logging.warning("failed to check upgrade job result")
    raise


def _check_upgrade_job_rs_alive(cur, ip, port, gmt_create):
  """Raise unless the rootservice recorded on an upgrade job looks unchanged.

  Passes only when (ip, port) still has exactly one row with role = 1 in
  oceanbase.__all_virtual_core_meta_table AND no 'full_rootservice' event
  was written to oceanbase.__all_rootservice_event_history after gmt_create.
  """
  sql = """select count(*) from oceanbase.__all_virtual_core_meta_table where role = 1 and svr_ip = '{0}' and svr_port = {1}""".format(ip, port)
  rows = query(cur, sql)
  if len(rows) != 1 or len(rows[0]) != 1:
    msg = "row/column cnt not match"
    logging.warning(msg)
    raise Exception(msg)
  if rows[0][0] != 1:
    msg = "rs changed or not exist, should check if upgrade job is still running"
    logging.warning(msg)
    raise Exception(msg)

  sql = """select count(*) from oceanbase.__all_rootservice_event_history where gmt_create > '{0}' and event = 'full_rootservice'""".format(gmt_create)
  rows = query(cur, sql)
  if len(rows) != 1 or len(rows[0]) != 1:
    msg = "row/column cnt not match"
    logging.warning(msg)
    raise Exception(msg)
  if rows[0][0] > 0:
    msg = "rs changed, should check if upgrade job is still running"
    logging.warning(msg)
    raise Exception(msg)
  logging.info("rs[{0}:{1}] still exist, keep waiting".format(ip, port))


def run_upgrade_job(conn, cur, job_name, timeout):
  """Start the `job_name` upgrade job and wait for it to finish.

  When check_can_run_upgrade_job() allows it, the function:
    1. switches the connection to autocommit;
    2. sets enable_ddl to 'True' if actions.get_ori_enable_ddl() returned 0;
    3. sets enable_sys_table_ddl to 'True';
    4. issues "alter system run upgrade job '<job_name>'";
    5. waits via check_upgrade_job_result();
    6. sets enable_sys_table_ddl back to 'False', and enable_ddl back to
       'False' if it was switched on in step 2.

  NOTE(review): when any step raises, steps 6 are skipped, so enable_ddl /
  enable_sys_table_ddl stay 'True'. A re-run would then presumably read
  enable_ddl as already on and never switch it off. Confirm this is intended.

  Raises:
    Any error from the steps above is logged and re-raised with its
    original traceback.
  """
  try:
    logging.info("start to run upgrade job, job_name:{0}".format(job_name))
    # pre check
    if check_can_run_upgrade_job(cur, job_name):
      conn.autocommit = True
      # turn enable_ddl on for the job when it was reported as 0
      # (presumably "off"), and remember that so it can be reset afterwards
      ori_enable_ddl = actions.get_ori_enable_ddl(cur, timeout)
      if ori_enable_ddl == 0:
        actions.set_parameter(cur, 'enable_ddl', 'True', timeout)
      # enable_sys_table_ddl
      actions.set_parameter(cur, 'enable_sys_table_ddl', 'True', timeout)
      # get max_used_job_id so only a job created by this run is awaited
      max_used_job_id = get_max_used_job_id(cur)
      # run upgrade job
      sql = """alter system run upgrade job '{0}'""".format(job_name)
      logging.info(sql)
      cur.execute(sql)
      # check upgrade job result
      check_upgrade_job_result(cur, job_name, timeout, max_used_job_id)
      # reset enable_sys_table_ddl
      actions.set_parameter(cur, 'enable_sys_table_ddl', 'False', timeout)
      # reset enable_ddl only if this function switched it on
      if ori_enable_ddl == 0:
        actions.set_parameter(cur, 'enable_ddl', 'False', timeout)
  except Exception:
    logging.warning("run upgrade job failed, job_name:{0}".format(job_name))
    raise
  logging.info("run upgrade job success, job_name:{0}".format(job_name))

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.