oceanbase

Форк
0
/
obmp_init_db.cpp 
247 строк · 10.7 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "observer/mysql/obmp_init_db.h"
16

17
#include "lib/worker.h"
18
#include "rpc/ob_request.h"
19
#include "rpc/obmysql/packet/ompk_ok.h"
20
#include "share/schema/ob_multi_version_schema_service.h"
21
#include "share/schema/ob_schema_getter_guard.h"
22
#include "sql/ob_sql_utils.h"
23
#include "sql/session/ob_sql_session_mgr.h"
24
#include "rpc/obmysql/obsm_struct.h"
25
#include "observer/mysql/obmp_utils.h"
26
#include "observer/mysql/ob_query_retry_ctrl.h"
27

28
using namespace oceanbase::rpc;
29
using namespace oceanbase::obmysql;
30
using namespace oceanbase::common;
31
using namespace oceanbase::sql;
32
using namespace oceanbase::observer;
33
using namespace oceanbase::share::schema;
34

35
int ObMPInitDB::deserialize()
36
{
37
  int ret = OB_SUCCESS;
38
  if (OB_ISNULL(req_)) {
39
    ret = OB_ERR_UNEXPECTED;
40
    LOG_WARN("invalid packet", K(ret), K_(req));
41
  } else if (OB_UNLIKELY(req_->get_type() != ObRequest::OB_MYSQL)) {
42
    ret = OB_ERR_UNEXPECTED;
43
    LOG_WARN("invalid packet", K(ret), K_(req), K(req_->get_type()));
44
  } else {
45
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
46
    db_name_.assign_ptr(const_cast<char *>(pkt.get_cdata()), pkt.get_clen()-1);
47
  }
48
  return ret;
49
}
50

51
int ObMPInitDB::process()
52
{
53
  LOG_INFO("init db", K_(db_name));
54
  int ret = OB_SUCCESS;
55
  bool need_disconnect = true;
56
  ObSQLSessionInfo *session = NULL;
57
  ObString tmp_db_name;
58
  ObDataBuffer allocator(db_name_conv_buf, sizeof(db_name_conv_buf));
59
  const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
60
  int64_t query_timeout = 0;
61
  bool is_packet_retry = false;
62
  bool need_response_error = true; //temporary placeholder
63
  if (OB_FAIL(get_session(session))) {
64
    LOG_WARN("get session  fail", K(ret));
65
  } else if (OB_ISNULL(session)) {
66
    ret = OB_ERR_UNEXPECTED;
67
    LOG_ERROR("null pointer");
68
  } else if (OB_FAIL(process_kill_client_session(*session))) {
69
    LOG_WARN("client session has been killed", K(ret));
70
  } else if (OB_FAIL(session->get_query_timeout(query_timeout))) {
71
    LOG_WARN("fail to get query timeout", K(ret));
72
  } else if (OB_ISNULL(gctx_.schema_service_)) {
73
    ret = OB_ERR_UNEXPECTED;
74
    LOG_WARN("schema_service is null", K(ret));
75
  } else {
76
    ObCollationType old_db_coll_type = CS_TYPE_INVALID;
77
    ObCollationType collation_connection = CS_TYPE_INVALID;
78
    ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
79
    setup_wb(*session);
80
    tmp_db_name = session->get_database_name();
81
    session->update_last_active_time();
82
    const uint64_t effective_tenant_id = session->get_effective_tenant_id();
83
    int64_t global_version = OB_INVALID_VERSION;
84
    int64_t local_version = OB_INVALID_VERSION;
85
    ObQueryRetryType retry_type = RETRY_TYPE_NONE;
86
    int64_t retry_times = 0;
87
    THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
88
    ObNameCaseMode mode = OB_NAME_CASE_INVALID;
89
    if (OB_UNLIKELY(session->is_zombie())) {
90
      ret = OB_ERR_SESSION_INTERRUPTED;
91
      LOG_WARN("session has been killed", K(ret), KPC(session));
92
    } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(effective_tenant_id, global_version))) {
93
      LOG_WARN("fail to get global_version", K(ret), K(effective_tenant_id));
94
    } else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(effective_tenant_id, local_version))) {
95
      LOG_WARN("fail to get local_version", K(ret), K(effective_tenant_id));
96
    } else if (OB_FAIL(session->get_collation_database(old_db_coll_type))) {
97
      LOG_WARN("fail to get collation_database", K(ret));
98
    } else if (OB_FAIL(session->get_collation_connection(collation_connection))) {
99
      LOG_WARN("fail to get collation_connection", K(ret));
100
    } else if (OB_FAIL(session->get_name_case_mode(mode))) {
101
      LOG_WARN("fail to get name case mode", K(mode), K(ret));
102
    } else if (OB_FAIL(update_transmission_checksum_flag(*session))) {
103
      LOG_WARN("update transmisson checksum flag failed", K(ret));
104
    } else if (FALSE_IT(session->set_txn_free_route(pkt.txn_free_route()))) {
105
    } else if (OB_FAIL(process_extra_info(*session, pkt, need_response_error))) {
106
      LOG_WARN("fail get process extra info", K(ret));
107
    } else if (FALSE_IT(session->post_sync_session_info())) {
108
    } else {
109
      need_disconnect = false;
110
      bool perserve_lettercase = lib::is_oracle_mode() ?
111
          true : (mode != OB_LOWERCASE_AND_INSENSITIVE);
112
      if (OB_FAIL(ObSQLUtils::convert_sql_text_to_schema_for_storing(allocator,
113
                                                                     session->get_dtc_params(),
114
                                                                     db_name_))) {
115
        LOG_WARN("fail to convert db name", K(ret), KPHEX(db_name_.ptr(), db_name_.length()));
116
      } else if (OB_FAIL(ObSQLUtils::check_and_convert_db_name(
117
                  collation_connection, perserve_lettercase, db_name_))) {
118
        LOG_WARN("failed to check database name", K(db_name_), K(ret));
119
      } else {
120
        bool force_local_retry = false;
121
        do {
122
          retry_type = RETRY_TYPE_NONE;
123
          ret = do_process(session);
124
          if (is_schema_error(ret)) {
125
            if (local_version < global_version) {
126
              if (!THIS_WORKER.is_timeout()) {
127
                if (force_local_retry
128
                    || retry_times < ObQueryRetryCtrl::MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES) {
129
                  retry_type = RETRY_TYPE_LOCAL;
130
                } else {
131
                  retry_type = RETRY_TYPE_PACKET;
132
                }
133
                retry_times++;
134
                if (RETRY_TYPE_LOCAL == retry_type) {
135
                  ob_usleep(ObQueryRetryCtrl::WAIT_LOCAL_SCHEMA_REFRESHED_US
136
                         * ObQueryRetryCtrl::linear_timeout_factor(retry_times));
137
                }
138
                int tmp_ret = gctx_.schema_service_->get_tenant_refreshed_schema_version(effective_tenant_id, local_version);
139
                if (OB_SUCCESS != tmp_ret) {
140
                  LOG_WARN("fail to get local_version", K(ret), K(tmp_ret), K(effective_tenant_id));
141
                }
142
              }
143
              LOG_WARN("schema err, need retry", K(ret),
144
                       K(retry_type), K(retry_times), K(force_local_retry),
145
                       LITERAL_K(ObQueryRetryCtrl::MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES));
146
            }
147
          }
148
          force_local_retry = false;
149
          if (RETRY_TYPE_LOCAL == retry_type) {
150
            // 在本线程重试
151
            force_local_retry = true;
152
          } else if (RETRY_TYPE_PACKET == retry_type) {
153
            // 扔回队列中重试
154
            if (!THIS_WORKER.can_retry()) {
155
              // 不允许丢回队列,在本线程重试
156
              // FIXME: when will we be here?
157
              force_local_retry = true;
158
              LOG_WARN("fail to set retry flag, force to do local retry");
159
            } else {
160
              THIS_WORKER.set_need_retry();
161
              is_packet_retry = true;
162
            }
163
          }
164
          if (force_local_retry) {
165
            clear_wb_content(*session);
166
          }
167
        } while (force_local_retry);
168
      }
169
    }
170
    if (OB_FAIL(ret)) {
171
      int set_db_ret = OB_SUCCESS;
172
      if (OB_SUCCESS != (set_db_ret = session->set_default_database(tmp_db_name, old_db_coll_type))) {
173
        LOG_WARN("failed to set default database", K(ret), K(set_db_ret), K(tmp_db_name));
174
      }
175
    }
176

177
    session->set_show_warnings_buf(ret);
178
    session->reset_warnings_buf();
179
    ob_setup_tsi_warning_buffer(NULL);
180
  }  // end session guard
181

182
  if (OB_FAIL(ret)) {
183
    if (false == is_packet_retry && need_disconnect && is_conn_valid()) {
184
      force_disconnect();
185
      LOG_WARN("disconnect connection when process query", K(ret));
186
    } else  if (false == is_packet_retry && OB_FAIL(send_error_packet(ret, NULL))) { // 覆盖ret, 无需继续抛出
187
      LOG_WARN("failed to send error packet", K(ret));
188
    }
189
  } else if (OB_LIKELY(NULL != session)) {
190
    ObOKPParam ok_param; // use defualt value
191
    if (OB_FAIL(send_ok_packet(*session, ok_param))) {
192
      LOG_WARN("fail to send ok packet", K(ok_param), K(ret));
193
    }
194
  }
195
  if (session != NULL) {
196
    if (OB_FAIL(revert_session(session))) {
197
      LOG_ERROR("failed to revert session", K(ret));
198
    }
199
  }
200
  return ret;
201
}
202

203
int ObMPInitDB::do_process(sql::ObSQLSessionInfo *session)
204
{
205
  int ret = OB_SUCCESS;
206
  int sret = OB_SUCCESS;
207
  share::schema::ObSessionPrivInfo session_priv;
208
  ObSchemaGetterGuard schema_guard;
209

210
  if (OB_ISNULL(session) || OB_ISNULL(gctx_.schema_service_)) {
211
    ret = OB_NOT_INIT;
212
    LOG_WARN("session not init", K(ret), K(session), K(gctx_.schema_service_));
213
  } else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
214
                                  session->get_effective_tenant_id(), schema_guard))) {
215
    LOG_WARN("fail to get schema guard", K(ret));
216
  } else if (session->is_tenant_changed() && 0 != db_name_.case_compare(OB_SYS_DATABASE_NAME)) {
217
    ret = OB_ERR_NO_DB_PRIVILEGE;
218
    LOG_WARN("can only access oceanbase database when tenant changed", K(ret));
219
  } else {
220
    session->get_session_priv_info(session_priv);
221
    if (OB_FAIL(ObSQLUtils::cvt_db_name_to_org(schema_guard, session, db_name_))) {
222
      LOG_WARN("fail to cvt db name to orignal", K(db_name_), K(ret));
223
    } else if (OB_FAIL(schema_guard.check_db_access(session_priv, db_name_))) {
224
      LOG_WARN("fail to check db access.", K_(db_name), K(ret));
225
      if (OB_ERR_NO_DB_SELECTED == ret) {
226
        sret = OB_ERR_BAD_DATABASE;// 将错误码抛出让外层重试
227
      } else {
228
        sret = ret; // 保险起见,也抛出
229
      }
230
    } else {
231
      uint64_t db_id = OB_INVALID_ID;
232
      session->set_db_priv_set(session_priv.db_priv_set_);
233
      if (OB_FAIL(session->set_default_database(db_name_))) {
234
        LOG_WARN("failed to set default database", K(ret), K(db_name_));
235
      } else if (OB_FAIL(session->update_database_variables(&schema_guard))) {
236
        LOG_WARN("failed to update database variables", K(ret));
237
      } else if (OB_FAIL(schema_guard.get_database_id(session->get_effective_tenant_id(),
238
                                                      session->get_database_name(),
239
                                                      db_id))) {
240
        LOG_WARN("failed to get database id", K(ret));
241
      } else {
242
        session->set_database_id(db_id);
243
      }
244
    }
245
  }
246
  return (OB_SUCCESS != sret) ? sret : ret;
247
}
248

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.