2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
15
#include "obmp_stmt_get_piece_data.h"
17
#include "lib/worker.h"
18
#include "lib/oblog/ob_log.h"
19
#include "sql/ob_sql_context.h"
20
#include "lib/stat/ob_session_stat.h"
21
#include "rpc/ob_request.h"
22
#include "share/schema/ob_schema_getter_guard.h"
23
#include "sql/ob_sql_context.h"
24
#include "sql/session/ob_sql_session_info.h"
25
#include "sql/ob_sql.h"
26
#include "observer/ob_req_time_service.h"
27
#include "observer/mysql/obmp_utils.h"
28
#include "observer/mysql/obmp_stmt_send_piece_data.h"
29
#include "rpc/obmysql/packet/ompk_piece.h"
30
#include "observer/omt/ob_tenant.h"
31
#include "sql/plan_cache/ob_ps_cache.h"
37
using namespace common;
39
using namespace obmysql;
45
// Constructs the processor for get-piece-data requests: the three timing
// members start at 0 and the processing context is tagged as MpQuery.
// NOTE(review): some lines of this definition are not visible in this view
// (e.g. the head of the initializer list and the braces); further members may
// be initialized there — confirm against the complete file.
ObMPStmtGetPieceData::ObMPStmtGetPieceData(const ObGlobalContext &gctx)
47
single_process_timestamp_(0),
48
exec_start_timestamp_(0),
49
exec_end_timestamp_(0),
55
// Tag the context's exec type as MpQuery.
ctx_.exec_type_ = MpQuery;
60
* 1 COM_STMT_GET_PIECE_data
67
// Runs the base-class pre-processing, then decodes the request fields from the
// raw packet payload into members. Fields are read sequentially from `pos` in
// this order: stmt_id_ (get_int4), offset_type (get_int2), offset_ (get_int4),
// column_id_ (get_int2), piece_size_ (get_int8).
// NOTE(review): no check that the payload is long enough for these reads is
// visible in this view — confirm the packet length is validated before decoding.
int ObMPStmtGetPieceData::before_process()
70
if (OB_FAIL(ObMPBase::before_process())) {
71
LOG_WARN("failed to pre processing packet", K(ret));
73
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
74
// Read cursor over the packet payload; each get_intN call below is handed this cursor.
const char* pos = pkt.get_cdata();
76
ObMySQLUtil::get_int4(pos, stmt_id_);
77
// offset_type is decoded only into this local; it is not used again in the
// visible lines of this function.
int16_t offset_type = 0;
78
ObMySQLUtil::get_int2(pos, offset_type);
79
ObMySQLUtil::get_int4(pos, offset_);
80
ObMySQLUtil::get_int2(pos, column_id_);
81
ObMySQLUtil::get_int8(pos, piece_size_);
82
LOG_DEBUG("get piece data", K(stmt_id_), K(column_id_), K(piece_size_));
87
// Handles one get-piece-data request.
// Visible steps: validate the request/connection/tenant, fetch the session,
// prepare the session under its query lock, run a chain of pre-execution
// checks, call process_get_piece_data_stmt(), then release the session and, on
// failure, send an error packet and/or log a disconnect warning.
// NOTE(review): several lines of this function (braces, the `ret` declaration,
// the return, and possibly the disconnect call itself) are not visible in this
// view — confirm against the complete file.
int ObMPStmtGetPieceData::process()
90
ObSQLSessionInfo *sess = NULL;
91
// Gates the error packet sent near the end of this function; it is also handed
// to process_extra_info() below.
bool need_response_error = true;
92
// Stays true until every pre-execution check has passed (set to false below).
bool need_disconnect = true;
93
bool async_resp_used = false; // replied to the client asynchronously by the transaction commit thread
94
int64_t query_timeout = 0;
95
ObSMConnection *conn = get_conn();
97
// Validation chain: the first failing condition sets ret and skips the rest.
if (OB_ISNULL(req_) || OB_ISNULL(conn)) {
98
ret = OB_ERR_UNEXPECTED;
99
LOG_WARN("req or conn is null", K_(req), K(stmt_id_), K(conn), K(ret));
100
} else if (OB_UNLIKELY(!conn->is_in_authed_phase())) {
101
ret = OB_ERR_NO_PRIVILEGE;
102
LOG_WARN("receive sql without session", K(ret), K(stmt_id_));
103
} else if (OB_ISNULL(conn->tenant_)) {
104
ret = OB_ERR_UNEXPECTED;
105
LOG_ERROR("invalid tenant", K(conn->tenant_), K(ret), K(stmt_id_));
106
} else if (OB_FAIL(get_session(sess))) {
107
LOG_WARN("get session fail", K(ret), K(stmt_id_));
108
} else if (OB_ISNULL(sess)) {
109
ret = OB_ERR_UNEXPECTED;
110
LOG_WARN("session is NULL or invalid", K(sess), K(ret), K(stmt_id_));
111
} else if (OB_FAIL(update_transmission_checksum_flag(*sess))) {
112
LOG_WARN("update transmisson checksum flag failed", K(ret), K(stmt_id_));
114
ObSQLSessionInfo &session = *sess;
115
// Attach the session to the current worker; detached again after processing.
THIS_WORKER.set_session(sess);
116
ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
117
session.set_current_trace_id(ObCurTraceId::get_trace_id());
118
session.init_use_rich_format();
119
// Reset the per-request memory counter, then install a malloc callback guard
// constructed with a reference to that counter.
// NOTE(review): presumably the callback accumulates allocations into
// request_memory_used_ — confirm against ObProcessMallocCallback.
session.get_raw_audit_record().request_memory_used_ = 0;
120
observer::ObProcessMallocCallback pmcb(0,
121
session.get_raw_audit_record().request_memory_used_);
122
lib::ObMallocCallbackGuard guard(pmcb);
123
// The two versions below are filled by the schema service calls in the check
// chain; they are not otherwise used in the visible lines.
int64_t tenant_version = 0;
124
int64_t sys_version = 0;
125
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
126
int64_t packet_len = pkt.get_clen();
127
// Pre-execution checks: session validity, kill state, packet size limit,
// query timeout, schema broadcast versions, trace info, extra info.
if (OB_UNLIKELY(!session.is_valid())) {
128
ret = OB_ERR_UNEXPECTED;
129
LOG_ERROR("invalid session", K_(stmt_id), K(ret));
130
} else if (OB_FAIL(process_kill_client_session(session))) {
131
LOG_WARN("client session has been killed", K(ret));
132
} else if (OB_UNLIKELY(session.is_zombie())) {
133
ret = OB_ERR_SESSION_INTERRUPTED;
134
LOG_WARN("session has been killed", K(session.get_session_state()), K_(stmt_id),
135
K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(), K(ret));
136
} else if (OB_UNLIKELY(packet_len > session.get_max_packet_size())) {
137
ret = OB_ERR_NET_PACKET_TOO_LARGE;
138
LOG_WARN("packet too large than allowd for the session", K_(stmt_id), K(ret));
139
} else if (OB_FAIL(session.get_query_timeout(query_timeout))) {
140
LOG_WARN("fail to get query timeout", K_(stmt_id), K(ret));
141
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
142
session.get_effective_tenant_id(), tenant_version))) {
143
LOG_WARN("fail get tenant broadcast version", K(ret));
144
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
145
OB_SYS_TENANT_ID, sys_version))) {
146
LOG_WARN("fail get tenant broadcast version", K(ret));
147
} else if (pkt.exist_trace_info()
148
&& OB_FAIL(session.update_sys_variable(SYS_VAR_OB_TRACE_INFO,
149
pkt.get_trace_info()))) {
150
LOG_WARN("fail to update trace info", K(ret));
151
} else if (FALSE_IT(session.set_txn_free_route(pkt.txn_free_route()))) {
152
} else if (OB_FAIL(process_extra_info(session, pkt, need_response_error))) {
153
LOG_WARN("fail get process extra info", K(ret));
154
} else if (FALSE_IT(session.post_sync_session_info())) {
156
// Every pre-check passed: a failure from here on no longer requests a disconnect.
need_disconnect = false;
157
// Worker deadline = packet receive timestamp + session query timeout.
THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
158
session.partition_hit().reset();
159
if (OB_FAIL(process_get_piece_data_stmt(session))) {
160
LOG_WARN("execute sql failed", K_(stmt_id), K(ret));
164
// Cleanup: remember the trace id, detach the session from the worker, release it.
session.set_last_trace_id(ObCurTraceId::get_trace_id());
165
THIS_WORKER.set_session(NULL);
166
revert_session(sess); //current ignore revert session ret
169
// Report the failure to the client if an error response is still wanted and
// the connection is valid.
if (OB_FAIL(ret) && need_response_error && is_conn_valid()) {
170
send_error_packet(ret, NULL);
173
// Failure before the pre-checks completed: a disconnect is wanted.
// NOTE(review): only the warning log is visible here; the disconnect call
// itself is not visible in this view.
if (OB_FAIL(ret) && need_disconnect && is_conn_valid()) {
175
LOG_WARN("disconnect connection when process query", K(ret));
180
// Wraps do_process() for one request: sets up a virtual-table iterator factory
// and a session stat guard, applies the session's log-level map around the
// call when trace logging is enabled, then calls do_after_process().
// @param session  session the request runs on
// NOTE(review): some lines of this function (braces, the return) are not
// visible in this view.
int ObMPStmtGetPieceData::process_get_piece_data_stmt(ObSQLSessionInfo &session)
182
int ret = OB_SUCCESS;
183
// NOTE(review): need_response_error, tenant_version and sys_version are not
// used in the visible lines of this function — confirm whether they are dead.
bool need_response_error = true;
184
int64_t tenant_version = 0;
185
int64_t sys_version = 0;
188
ObVirtualTableIteratorFactory vt_iter_factory(*gctx_.vt_iter_creator_);
189
ObSessionStatEstGuard stat_est_guard(get_conn()->tenant_->id(), session.get_sessid());
190
const bool enable_trace_log = lib::is_trace_log_enabled();
191
// Apply the session's log-level map for the duration of do_process().
if (enable_trace_log) {
192
ObThreadLogLevelUtils::init(session.get_log_id_level_map());
194
ret = do_process(session);
195
if (enable_trace_log) {
196
ObThreadLogLevelUtils::clear();
199
// tracelog handling does not affect the normal logic, so its error code does not need to be assigned to ret
200
int tmp_ret = OB_SUCCESS;
202
tmp_ret = do_after_process(session, ctx_, false);
207
// Executes the request and records its statistics.
// Visible steps: bump the audit try counter, set up wait-event guards, mark the
// session active, call response_result(), record timestamps and wait-event
// data, update the session's warning buffer, send an error packet, fill the
// audit record when SQL audit is enabled, then clear the warning-buffer content.
// @param session  session the request runs on
// NOTE(review): some lines of this function (braces, else branches, the
// return) are not visible in this view, so the exact conditions guarding
// several statements below cannot be confirmed here.
int ObMPStmtGetPieceData::do_process(ObSQLSessionInfo &session)
209
int ret = OB_SUCCESS;
210
ObAuditRecordData &audit_record = session.get_raw_audit_record();
211
audit_record.try_cnt_++;
212
const bool enable_perf_event = lib::is_diagnose_info_enabled();
213
// Audit requires both the global config switch and the session-local switch.
const bool enable_sql_audit = GCONF.enable_sql_audit
214
&& session.get_local_ob_enable_sql_audit();
215
single_process_timestamp_ = ObTimeUtility::current_time();
216
// Never set to true in the visible lines of this function.
bool is_diagnostics_stmt = false;
218
ObWaitEventStat total_wait_desc;
219
ObDiagnoseSessionInfo *di = ObDiagnoseSessionInfo::get_local_diagnose_info();
221
ObMaxWaitGuard max_wait_guard(enable_perf_event
222
? &audit_record.exec_record_.max_wait_event_
224
ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
225
if (enable_perf_event) {
226
audit_record.exec_record_.record_start(di);
228
int64_t execution_id = 0;
229
// Text passed to set_session_active() as the statement for this request.
ObString sql = "get piece info";
231
exec_start_timestamp_ = ObTimeUtility::current_time();
232
if (FALSE_IT(execution_id = gctx_.sql_engine_->get_execution_id())) {
234
} else if (OB_FAIL(set_session_active(sql, session, ObTimeUtil::current_time(),
235
obmysql::ObMySQLCmd::COM_STMT_GET_PIECE_DATA))) {
236
LOG_WARN("fail to set session active", K(ret));
237
} else if (OB_FAIL(response_result(session))) {
238
exec_end_timestamp_ = ObTimeUtility::current_time();
240
session.set_current_execution_id(execution_id);
243
exec_end_timestamp_ = ObTimeUtility::current_time();
245
// some statistics must be recorded for plan stat, even though sql audit disabled
246
bool first_record = (1 == audit_record.try_cnt_);
247
ObExecStatUtils::record_exec_timestamp(*this, first_record, audit_record.exec_timestamp_);
248
audit_record.exec_timestamp_.update_stage_time();
250
if (enable_perf_event) {
251
audit_record.exec_record_.record_end(di);
252
audit_record.exec_record_.wait_time_end_ = total_wait_desc.time_waited_;
253
audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_;
254
audit_record.update_event_stage_state();
255
// Elapsed time from packet receipt to the end of execution.
const int64_t time_cost = exec_end_timestamp_ - get_receive_timestamp();
256
// NOTE(review): these are the PS-prepare counters although this is a
// get-piece-data request — confirm this is intended and not a copy/paste leftover.
EVENT_INC(SQL_PS_PREPARE_COUNT);
257
EVENT_ADD(SQL_PS_PREPARE_TIME, time_cost);
262
// store the warning message from the most recent statement in the current session
263
if (OB_SUCC(ret) && is_diagnostics_stmt) {
264
// if diagnostic stmt execute successfully, it doesn't clear the warning message
265
session.update_show_warnings_buf();
267
session.set_show_warnings_buf(ret); // TODO: moving this elsewhere would perform better and avoid some wb copies
272
bool is_partition_hit = session.partition_hit().get_bool();
273
int err = send_error_packet(ret, NULL, is_partition_hit);
274
if (OB_SUCCESS != err) { // failed to send the error packet
275
LOG_WARN("send error packet failed", K(ret), K(err));
278
if (enable_sql_audit) {
279
audit_record.status_ = ret;
280
audit_record.client_addr_ = session.get_peer_addr();
281
audit_record.user_client_addr_ = session.get_user_client_addr();
282
audit_record.user_group_ = THIS_WORKER.get_group_id();
283
audit_record.ps_stmt_id_ = stmt_id_;
284
// For this request type the audit record's plan_id_ and return_rows_ fields
// are filled with the requested column id and piece size respectively.
audit_record.plan_id_ = column_id_;
285
audit_record.return_rows_ = piece_size_;
286
audit_record.is_perf_event_closed_ = !lib::is_diagnose_info_enabled();
287
// Look up the prepared statement's SQL text through the PS cache so it can be
// attached to the audit record.
if (OB_NOT_NULL(session.get_ps_cache())) {
288
ObPsStmtInfoGuard guard;
289
ObPsStmtInfo *ps_info = NULL;
290
ObPsStmtId inner_stmt_id = OB_INVALID_ID;
291
if (OB_SUCC(session.get_inner_ps_stmt_id(stmt_id_, inner_stmt_id))
292
&& OB_SUCC(session.get_ps_cache()->get_stmt_info_guard(inner_stmt_id, guard))
293
&& OB_NOT_NULL(ps_info = guard.get_stmt_info())) {
294
audit_record.ps_inner_stmt_id_ = inner_stmt_id;
295
// NOTE(review): sql_ points into the buffer owned by ps_info, which is held by
// `guard`; confirm the audit record is consumed while that guard is still alive.
audit_record.sql_ = const_cast<char *>(ps_info->get_ps_sql().ptr());
296
audit_record.sql_len_ = min(ps_info->get_ps_sql().length(), OB_MAX_SQL_LENGTH);
298
LOG_WARN("get sql fail in get piece data", K(stmt_id_));
302
ObSQLUtils::handle_audit_record(false, EXECUTE_PS_GET_PIECE, session, ctx_.is_sensitive_);
304
clear_wb_content(session);
308
// Sends the requested piece back to the client.
// Visible steps: fetch the piece buffer from the session's piece cache, send it
// as an OMPKPiece packet, remove the cached piece once its last buffer has been
// returned, and send an extra OK packet when need_send_extra_ok_packet() says so.
// @param session  session whose piece cache is read
// NOTE(review): some lines of this function (call arguments, braces, the
// `ok_param` declaration, the return) are not visible in this view.
int ObMPStmtGetPieceData::response_result(ObSQLSessionInfo &session)
310
int ret = OB_SUCCESS;
311
ObPieceBuffer piece_buf;
312
ObPieceCache *piece_cache = session.get_piece_cache();
313
if (OB_ISNULL(piece_cache)) {
314
// must be init in fetch
315
ret = OB_ERR_UNEXPECTED;
316
LOG_WARN("piece cache is null.", K(ret), K(stmt_id_), K(column_id_));
317
} else if (OB_FAIL(piece_cache->get_piece_buffer(stmt_id_,
323
LOG_WARN("get piece buffer fail.", K(ret), K(stmt_id_));
324
} else if (NULL == piece_buf.get_piece_buffer()) {
325
ret = OB_ERR_UNEXPECTED;
326
LOG_WARN(" piece buffer is null. ", K(ret));
328
// response piece packet
329
OMPKPiece piece_packet(piece_buf.get_piece_mode(),
331
piece_buf.get_piece_buffer()->length(),
332
*piece_buf.get_piece_buffer());
333
if (OB_FAIL(response_packet(piece_packet, &session))) {
334
LOG_WARN("response piece packet fail.", K(ret), K(stmt_id_), K(column_id_));
336
ObPiece *piece = NULL;
337
if (OB_FAIL(update_last_pkt_pos())) {
338
LOG_WARN("failed to update last packet pos", K(ret));
339
} else if (OB_FAIL(piece_cache->get_piece(stmt_id_, column_id_, piece))) {
340
LOG_WARN("get piece fail", K(stmt_id_), K(column_id_), K(ret) );
341
} else if (NULL != piece) {
342
uint64_t count = NULL == piece->get_buffer_array()
344
: piece->get_buffer_array()->count();
345
// Remove the cached piece only when the requested offset is the last buffer
// index and the buffer just returned is marked as the last piece.
if (0 != count && offset_ == count - 1 && ObLastPiece == piece_buf.get_piece_mode()) {
347
if (OB_FAIL(piece_cache->remove_piece(piece_cache->get_piece_key(stmt_id_, column_id_),
349
// NOTE(review): this log omits K(ret), unlike the other failure logs in this function.
LOG_WARN("remove piece fail", K(stmt_id_), K(column_id_));
355
// in multi-stmt, send extra ok packet in the last stmt(has no more result)
356
if (need_send_extra_ok_packet()) {
358
ok_param.affected_rows_ = 0;
359
ok_param.is_partition_hit_ = session.partition_hit().get_bool();
360
ok_param.has_more_result_ = false;
361
if (OB_FAIL(send_ok_packet(session, ok_param))) {
362
LOG_WARN("fail to send ok packt", K(ok_param), K(ret));
371
} //end of namespace observer
372
} //end of namespace oceanbase