/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
13
#define USING_LOG_PREFIX SERVER
15
#include "observer/mysql/obmp_init_db.h"
17
#include "lib/worker.h"
18
#include "rpc/ob_request.h"
19
#include "rpc/obmysql/packet/ompk_ok.h"
20
#include "share/schema/ob_multi_version_schema_service.h"
21
#include "share/schema/ob_schema_getter_guard.h"
22
#include "sql/ob_sql_utils.h"
23
#include "sql/session/ob_sql_session_mgr.h"
24
#include "rpc/obmysql/obsm_struct.h"
25
#include "observer/mysql/obmp_utils.h"
26
#include "observer/mysql/ob_query_retry_ctrl.h"
28
using namespace oceanbase::rpc;
29
using namespace oceanbase::obmysql;
30
using namespace oceanbase::common;
31
using namespace oceanbase::sql;
32
using namespace oceanbase::observer;
33
using namespace oceanbase::share::schema;
35
// Reads the requested database name out of the incoming request packet.
//
// A null req_, or a request whose type is not ObRequest::OB_MYSQL, sets ret to
// OB_ERR_UNEXPECTED and logs a warning. Otherwise db_name_ is assigned from the
// packet's command data (pointer + length).
//
// NOTE(review): this listing looks like a damaged extraction -- bare numeric
// lines are interleaved with the code, and the braces, the declaration of
// `ret`, the final `else` and the return statement are not visible here.
// Confirm against the original file before relying on the exact control flow.
int ObMPInitDB::deserialize()
38
// Guard: a request object must be attached to this processor.
if (OB_ISNULL(req_)) {
39
ret = OB_ERR_UNEXPECTED;
40
LOG_WARN("invalid packet", K(ret), K_(req));
41
// Guard: only MySQL-protocol requests are accepted.
} else if (OB_UNLIKELY(req_->get_type() != ObRequest::OB_MYSQL)) {
42
ret = OB_ERR_UNEXPECTED;
43
LOG_WARN("invalid packet", K(ret), K_(req), K(req_->get_type()));
45
// The packet is reinterpreted as a raw MySQL packet without a checked cast;
// the type test above is the only protection.
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
46
// NOTE(review): db_name_ presumably references the packet buffer rather than
// copying it (assign_ptr) -- confirm the packet outlives db_name_.
// NOTE(review): the length is get_clen() - 1, which would be negative if
// get_clen() could return 0 -- TODO confirm callers guarantee clen >= 1.
db_name_.assign_ptr(const_cast<char *>(pkt.get_cdata()), pkt.get_clen()-1);
51
// Handles an "init db" request: validates the session, normalizes the requested
// database name, switches the session's default database through do_process()
// (retrying on schema errors), and finally answers the client.
//
// Visible phases:
//   1. Acquire and validate the session, query timeout and schema service.
//   2. Under the session query lock: remember the current database name, set
//      the worker timeout, read schema versions / collations / name-case mode.
//   3. Convert and check the database name.
//   4. Call do_process() in a loop, choosing between a local retry and a
//      packet retry when a schema error is reported.
//   5. Restore the previous default database (see note below), reset warning
//      buffers, send an error or OK packet, and revert the session.
//
// NOTE(review): this listing looks like a damaged extraction -- bare numeric
// lines are interleaved with the code and several lines (braces, `else`
// branches, `do {`, the declarations of `ret` and `tmp_db_name`, some call
// arguments, the return statement) are not visible. Comments below that depend
// on the missing lines are marked as assumptions.
int ObMPInitDB::process()
53
LOG_INFO("init db", K_(db_name));
55
// Stays true until the pre-checks pass; consulted when deciding how to answer
// a failure at the end of the function.
bool need_disconnect = true;
56
ObSQLSessionInfo *session = NULL;
58
// Scratch allocator over db_name_conv_buf, handed to the db-name conversion below.
// NOTE(review): db_name_conv_buf is not declared in the visible lines --
// presumably a class member; confirm.
ObDataBuffer allocator(db_name_conv_buf, sizeof(db_name_conv_buf));
59
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
60
int64_t query_timeout = 0;
61
// Set when the request is handed back for a packet-level retry; in that case
// neither the disconnect branch nor the error-packet branch below is taken.
bool is_packet_retry = false;
62
bool need_response_error = true; //temporary placeholder
63
// ---- Phase 1: session and environment checks ----
if (OB_FAIL(get_session(session))) {
64
LOG_WARN("get session fail", K(ret));
65
} else if (OB_ISNULL(session)) {
66
ret = OB_ERR_UNEXPECTED;
67
LOG_ERROR("null pointer");
68
} else if (OB_FAIL(process_kill_client_session(*session))) {
69
LOG_WARN("client session has been killed", K(ret));
70
} else if (OB_FAIL(session->get_query_timeout(query_timeout))) {
71
LOG_WARN("fail to get query timeout", K(ret));
72
} else if (OB_ISNULL(gctx_.schema_service_)) {
73
ret = OB_ERR_UNEXPECTED;
74
LOG_WARN("schema_service is null", K(ret));
76
// ---- Phase 2: per-session state, read while holding the query lock ----
ObCollationType old_db_coll_type = CS_TYPE_INVALID;
77
ObCollationType collation_connection = CS_TYPE_INVALID;
78
ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
80
// Remember the database that is current before the switch; it is passed to
// set_default_database() again further down.
tmp_db_name = session->get_database_name();
81
session->update_last_active_time();
82
const uint64_t effective_tenant_id = session->get_effective_tenant_id();
83
// global_version: tenant's received broadcast schema version.
// local_version:  tenant's locally refreshed schema version.
int64_t global_version = OB_INVALID_VERSION;
84
int64_t local_version = OB_INVALID_VERSION;
85
ObQueryRetryType retry_type = RETRY_TYPE_NONE;
86
int64_t retry_times = 0;
87
// Worker deadline = time the request was received + session query timeout.
THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
88
ObNameCaseMode mode = OB_NAME_CASE_INVALID;
89
if (OB_UNLIKELY(session->is_zombie())) {
90
ret = OB_ERR_SESSION_INTERRUPTED;
91
LOG_WARN("session has been killed", K(ret), KPC(session));
92
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(effective_tenant_id, global_version))) {
93
LOG_WARN("fail to get global_version", K(ret), K(effective_tenant_id));
94
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(effective_tenant_id, local_version))) {
95
LOG_WARN("fail to get local_version", K(ret), K(effective_tenant_id));
96
} else if (OB_FAIL(session->get_collation_database(old_db_coll_type))) {
97
LOG_WARN("fail to get collation_database", K(ret));
98
} else if (OB_FAIL(session->get_collation_connection(collation_connection))) {
99
LOG_WARN("fail to get collation_connection", K(ret));
100
} else if (OB_FAIL(session->get_name_case_mode(mode))) {
101
LOG_WARN("fail to get name case mode", K(mode), K(ret));
102
} else if (OB_FAIL(update_transmission_checksum_flag(*session))) {
103
LOG_WARN("update transmisson checksum flag failed", K(ret));
104
// FALSE_IT(...) is used to run a statement inside the else-if chain.
// NOTE(review): presumably it always evaluates to false so the chain
// continues -- confirm against the macro definition.
} else if (FALSE_IT(session->set_txn_free_route(pkt.txn_free_route()))) {
105
} else if (OB_FAIL(process_extra_info(*session, pkt, need_response_error))) {
106
LOG_WARN("fail get process extra info", K(ret));
107
} else if (FALSE_IT(session->post_sync_session_info())) {
109
// ---- Phase 3: normalize and validate the database name ----
// NOTE(review): presumably this is the final `else` of the chain above (the
// `} else {` line is not visible), i.e. all pre-checks passed -- confirm.
need_disconnect = false;
110
// Letter case is always preserved in Oracle mode; otherwise it is preserved
// unless the name-case mode is OB_LOWERCASE_AND_INSENSITIVE.
bool perserve_lettercase = lib::is_oracle_mode() ?
111
true : (mode != OB_LOWERCASE_AND_INSENSITIVE);
112
// NOTE(review): the remaining arguments of this call are on a line that is
// not visible in this listing.
if (OB_FAIL(ObSQLUtils::convert_sql_text_to_schema_for_storing(allocator,
113
session->get_dtc_params(),
115
LOG_WARN("fail to convert db name", K(ret), KPHEX(db_name_.ptr(), db_name_.length()));
116
} else if (OB_FAIL(ObSQLUtils::check_and_convert_db_name(
117
collation_connection, perserve_lettercase, db_name_))) {
118
LOG_WARN("failed to check database name", K(db_name_), K(ret));
120
// ---- Phase 4: switch database, retrying on schema errors ----
// NOTE(review): the loop header (presumably `do {`) is not visible; the
// matching `} while (force_local_retry);` appears further down.
bool force_local_retry = false;
122
retry_type = RETRY_TYPE_NONE;
123
ret = do_process(session);
124
// A retry is only considered for schema errors while the local schema version
// is behind the broadcast version and the worker has not timed out.
if (is_schema_error(ret)) {
125
if (local_version < global_version) {
126
if (!THIS_WORKER.is_timeout()) {
127
// Retry locally when forced to, or while under the local retry budget ...
if (force_local_retry
128
|| retry_times < ObQueryRetryCtrl::MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES) {
129
retry_type = RETRY_TYPE_LOCAL;
131
// ... NOTE(review): presumably the `else` branch (line not visible): fall
// back to a packet retry.
retry_type = RETRY_TYPE_PACKET;
134
// NOTE(review): no update of retry_times is visible in this listing;
// presumably it happens on a missing line -- confirm.
// Before a local retry, sleep for a duration scaled by the retry count.
if (RETRY_TYPE_LOCAL == retry_type) {
135
ob_usleep(ObQueryRetryCtrl::WAIT_LOCAL_SCHEMA_REFRESHED_US
136
* ObQueryRetryCtrl::linear_timeout_factor(retry_times));
138
// Re-read the local schema version; a failure here is only logged and does
// not overwrite ret.
int tmp_ret = gctx_.schema_service_->get_tenant_refreshed_schema_version(effective_tenant_id, local_version);
139
if (OB_SUCCESS != tmp_ret) {
140
LOG_WARN("fail to get local_version", K(ret), K(tmp_ret), K(effective_tenant_id));
143
LOG_WARN("schema err, need retry", K(ret),
144
K(retry_type), K(retry_times), K(force_local_retry),
145
LITERAL_K(ObQueryRetryCtrl::MAX_SCHEMA_ERROR_LOCAL_RETRY_TIMES));
148
// Translate the chosen retry type into the loop condition.
force_local_retry = false;
149
if (RETRY_TYPE_LOCAL == retry_type) {
151
force_local_retry = true;
152
} else if (RETRY_TYPE_PACKET == retry_type) {
154
// The worker refuses a packet retry: retry locally instead.
if (!THIS_WORKER.can_retry()) {
156
// FIXME: when will we be here?
157
force_local_retry = true;
158
LOG_WARN("fail to set retry flag, force to do local retry");
160
// NOTE(review): presumably the `else` branch (line not visible): mark the
// worker for retry and remember that the packet is being retried.
THIS_WORKER.set_need_retry();
161
is_packet_retry = true;
164
// clear_wb_content() is called before looping again.
// NOTE(review): presumably this discards already-buffered response data --
// confirm.
if (force_local_retry) {
165
clear_wb_content(*session);
167
} while (force_local_retry);
171
// ---- Phase 5: cleanup and response ----
// Re-applies the database name and collation captured before the switch.
// NOTE(review): presumably guarded by a failure check on a line that is not
// visible -- confirm. A failure here is logged only; ret is left untouched.
int set_db_ret = OB_SUCCESS;
172
if (OB_SUCCESS != (set_db_ret = session->set_default_database(tmp_db_name, old_db_coll_type))) {
173
LOG_WARN("failed to set default database", K(ret), K(set_db_ret), K(tmp_db_name));
177
session->set_show_warnings_buf(ret);
178
session->reset_warnings_buf();
179
ob_setup_tsi_warning_buffer(NULL);
180
} // end session guard
183
// Failure handling: neither branch is taken when the packet is being retried.
// NOTE(review): the statement that actually disconnects is not visible in this
// listing; only the log line is.
if (false == is_packet_retry && need_disconnect && is_conn_valid()) {
185
LOG_WARN("disconnect connection when process query", K(ret));
186
} else if (false == is_packet_retry && OB_FAIL(send_error_packet(ret, NULL))) { // overwrites ret; no need to propagate it further
187
LOG_WARN("failed to send error packet", K(ret));
189
// NOTE(review): presumably the success path (the enclosing condition is not
// visible): acknowledge with an OK packet built from default parameters.
} else if (OB_LIKELY(NULL != session)) {
190
ObOKPParam ok_param; // use default value
191
if (OB_FAIL(send_ok_packet(*session, ok_param))) {
192
LOG_WARN("fail to send ok packet", K(ok_param), K(ret));
195
// Hand the session back whenever one was obtained.
if (session != NULL) {
196
if (OB_FAIL(revert_session(session))) {
197
LOG_ERROR("failed to revert session", K(ret));
203
// Performs one attempt at switching the session's default database to db_name_.
//
// @param session  session whose default database is changed; a null session
//                 (or a null gctx_.schema_service_) is logged as
//                 "session not init".
// @return sret when it is not OB_SUCCESS, otherwise ret. sret is only assigned
//         when check_db_access() fails.
//
// NOTE(review): this listing looks like a damaged extraction -- bare numeric
// lines are interleaved with the code and several lines (braces, `else`
// branches, one assignment to `ret`, the last argument of get_database_id())
// are not visible. Comments that depend on missing lines are marked as
// assumptions.
int ObMPInitDB::do_process(sql::ObSQLSessionInfo *session)
205
int ret = OB_SUCCESS;
206
// Secondary error code; takes precedence over ret in the return statement.
int sret = OB_SUCCESS;
207
share::schema::ObSessionPrivInfo session_priv;
208
ObSchemaGetterGuard schema_guard;
210
if (OB_ISNULL(session) || OB_ISNULL(gctx_.schema_service_)) {
212
// NOTE(review): the statement assigning an error code to ret is not visible
// in this listing -- confirm.
LOG_WARN("session not init", K(ret), K(session), K(gctx_.schema_service_));
213
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
214
session->get_effective_tenant_id(), schema_guard))) {
215
LOG_WARN("fail to get schema guard", K(ret));
216
// When the session reports a tenant change, only OB_SYS_DATABASE_NAME is
// accepted (compared with case_compare, presumably case-insensitive).
} else if (session->is_tenant_changed() && 0 != db_name_.case_compare(OB_SYS_DATABASE_NAME)) {
217
ret = OB_ERR_NO_DB_PRIVILEGE;
218
LOG_WARN("can only access oceanbase database when tenant changed", K(ret));
220
// NOTE(review): presumably the final `else` of the chain above (line not
// visible): privilege check followed by the actual switch.
session->get_session_priv_info(session_priv);
221
if (OB_FAIL(ObSQLUtils::cvt_db_name_to_org(schema_guard, session, db_name_))) {
222
LOG_WARN("fail to cvt db name to orignal", K(db_name_), K(ret));
223
} else if (OB_FAIL(schema_guard.check_db_access(session_priv, db_name_))) {
224
LOG_WARN("fail to check db access.", K_(db_name), K(ret));
225
// OB_ERR_NO_DB_SELECTED is reported to the caller as OB_ERR_BAD_DATABASE.
if (OB_ERR_NO_DB_SELECTED == ret) {
226
sret = OB_ERR_BAD_DATABASE;// surface the error code so the outer layer can retry
228
// NOTE(review): presumably the `else` branch (line not visible).
sret = ret; // to be safe, surface this one as well
231
// Access granted: apply the db privilege set, switch the default database,
// refresh database variables, then record the database id.
uint64_t db_id = OB_INVALID_ID;
232
session->set_db_priv_set(session_priv.db_priv_set_);
233
if (OB_FAIL(session->set_default_database(db_name_))) {
234
LOG_WARN("failed to set default database", K(ret), K(db_name_));
235
} else if (OB_FAIL(session->update_database_variables(&schema_guard))) {
236
LOG_WARN("failed to update database variables", K(ret));
237
// NOTE(review): the final argument of this call (presumably db_id as the
// output) is on a line that is not visible.
} else if (OB_FAIL(schema_guard.get_database_id(session->get_effective_tenant_id(),
238
session->get_database_name(),
240
LOG_WARN("failed to get database id", K(ret));
242
session->set_database_id(db_id);
246
// sret wins over ret when it has been set.
return (OB_SUCCESS != sret) ? sret : ret;