oceanbase

Форк
0
/
obmp_stmt_get_piece_data.cpp 
372 строки · 14.5 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "obmp_stmt_get_piece_data.h"
16

17
#include "lib/worker.h"
18
#include "lib/oblog/ob_log.h"
19
#include "sql/ob_sql_context.h"
20
#include "lib/stat/ob_session_stat.h"
21
#include "rpc/ob_request.h"
22
#include "share/schema/ob_schema_getter_guard.h"
23
#include "sql/ob_sql_context.h"
24
#include "sql/session/ob_sql_session_info.h"
25
#include "sql/ob_sql.h"
26
#include "observer/ob_req_time_service.h"
27
#include "observer/mysql/obmp_utils.h"
28
#include "observer/mysql/obmp_stmt_send_piece_data.h"
29
#include "rpc/obmysql/packet/ompk_piece.h"
30
#include "observer/omt/ob_tenant.h"
31
#include "sql/plan_cache/ob_ps_cache.h"
32

33
namespace oceanbase
34
{
35

36
using namespace rpc;
37
using namespace common;
38
using namespace share;
39
using namespace obmysql;
40
using namespace sql;
41

42
namespace observer
43
{
44

45
/**
 * Construct a processor for the "get piece data" prepared-statement command.
 *
 * The timing fields and the decoded request fields start from neutral values
 * (column_id_ starts at -1); the request fields are overwritten when
 * before_process() decodes the packet. The execution type of the processing
 * context is set to MpQuery.
 *
 * @param gctx global context forwarded to the ObMPBase base class.
 */
ObMPStmtGetPieceData::ObMPStmtGetPieceData(const ObGlobalContext &gctx)
  : ObMPBase(gctx),
    // timing bookkeeping, filled in by do_process()
    single_process_timestamp_(0),
    exec_start_timestamp_(0),
    exec_end_timestamp_(0),
    // request fields, filled in by before_process()
    stmt_id_(0),
    column_id_(-1),
    offset_(0),
    piece_size_(0)
{
  ctx_.exec_type_ = MpQuery;
}
57

58
/*
59
 * request packet:
60
 * 1  COM_STMT_GET_PIECE_data
61
 * 4  stmt_id
62
 * 2  offset
63
 * 4  offset_type
64
 * 4  column_id
65
 * 4  piece_size
66
 */
67
int ObMPStmtGetPieceData::before_process()
68
{
69
  int ret = OB_SUCCESS;
70
  if (OB_FAIL(ObMPBase::before_process())) {
71
    LOG_WARN("failed to pre processing packet", K(ret));
72
  } else {
73
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
74
    const char* pos = pkt.get_cdata();
75
    // stmt_id
76
    ObMySQLUtil::get_int4(pos, stmt_id_);
77
    int16_t offset_type = 0;
78
    ObMySQLUtil::get_int2(pos, offset_type);
79
    ObMySQLUtil::get_int4(pos, offset_);
80
    ObMySQLUtil::get_int2(pos, column_id_);
81
    ObMySQLUtil::get_int8(pos, piece_size_);
82
    LOG_DEBUG("get piece data", K(stmt_id_), K(column_id_), K(piece_size_));
83
  }
84
  return ret;
85
}
86

87
int ObMPStmtGetPieceData::process()
88
{
89
  int ret = OB_SUCCESS;
90
  ObSQLSessionInfo *sess = NULL;
91
  bool need_response_error = true;
92
  bool need_disconnect = true;
93
  bool async_resp_used = false; // 由事务提交线程异步回复客户端
94
  int64_t query_timeout = 0;
95
  ObSMConnection *conn = get_conn();
96

97
  if (OB_ISNULL(req_) || OB_ISNULL(conn)) {
98
    ret = OB_ERR_UNEXPECTED;
99
    LOG_WARN("req or conn is null", K_(req), K(stmt_id_), K(conn), K(ret));
100
  } else if (OB_UNLIKELY(!conn->is_in_authed_phase())) {
101
    ret = OB_ERR_NO_PRIVILEGE;
102
    LOG_WARN("receive sql without session", K(ret), K(stmt_id_));
103
  } else if (OB_ISNULL(conn->tenant_)) {
104
    ret = OB_ERR_UNEXPECTED;
105
    LOG_ERROR("invalid tenant", K(conn->tenant_), K(ret), K(stmt_id_));
106
  } else if (OB_FAIL(get_session(sess))) {
107
    LOG_WARN("get session fail", K(ret), K(stmt_id_));
108
  } else if (OB_ISNULL(sess)) {
109
    ret = OB_ERR_UNEXPECTED;
110
    LOG_WARN("session is NULL or invalid", K(sess), K(ret), K(stmt_id_));
111
  } else if (OB_FAIL(update_transmission_checksum_flag(*sess))) {
112
    LOG_WARN("update transmisson checksum flag failed", K(ret), K(stmt_id_));
113
  } else {
114
    ObSQLSessionInfo &session = *sess;
115
    THIS_WORKER.set_session(sess);
116
    ObSQLSessionInfo::LockGuard lock_guard(session.get_query_lock());
117
    session.set_current_trace_id(ObCurTraceId::get_trace_id());
118
    session.init_use_rich_format();
119
    session.get_raw_audit_record().request_memory_used_ = 0;
120
    observer::ObProcessMallocCallback pmcb(0,
121
          session.get_raw_audit_record().request_memory_used_);
122
    lib::ObMallocCallbackGuard guard(pmcb);
123
    int64_t tenant_version = 0;
124
    int64_t sys_version = 0;
125
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
126
    int64_t packet_len = pkt.get_clen();
127
    if (OB_UNLIKELY(!session.is_valid())) {
128
      ret = OB_ERR_UNEXPECTED;
129
      LOG_ERROR("invalid session", K_(stmt_id), K(ret));
130
    } else if (OB_FAIL(process_kill_client_session(session))) {
131
      LOG_WARN("client session has been killed", K(ret));
132
    } else if (OB_UNLIKELY(session.is_zombie())) {
133
      ret = OB_ERR_SESSION_INTERRUPTED;
134
      LOG_WARN("session has been killed", K(session.get_session_state()), K_(stmt_id),
135
               K(session.get_sessid()), "proxy_sessid", session.get_proxy_sessid(), K(ret));
136
    } else if (OB_UNLIKELY(packet_len > session.get_max_packet_size())) {
137
      ret = OB_ERR_NET_PACKET_TOO_LARGE;
138
      LOG_WARN("packet too large than allowd for the session", K_(stmt_id), K(ret));
139
    } else if (OB_FAIL(session.get_query_timeout(query_timeout))) {
140
      LOG_WARN("fail to get query timeout", K_(stmt_id), K(ret));
141
    } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
142
                session.get_effective_tenant_id(), tenant_version))) {
143
      LOG_WARN("fail get tenant broadcast version", K(ret));
144
    } else if (OB_FAIL(gctx_.schema_service_->get_tenant_received_broadcast_version(
145
                OB_SYS_TENANT_ID, sys_version))) {
146
      LOG_WARN("fail get tenant broadcast version", K(ret));
147
    } else if (pkt.exist_trace_info()
148
               && OB_FAIL(session.update_sys_variable(SYS_VAR_OB_TRACE_INFO,
149
                                                      pkt.get_trace_info()))) {
150
      LOG_WARN("fail to update trace info", K(ret));
151
    } else if (FALSE_IT(session.set_txn_free_route(pkt.txn_free_route()))) {
152
    } else if (OB_FAIL(process_extra_info(session, pkt, need_response_error))) {
153
      LOG_WARN("fail get process extra info", K(ret));
154
    } else if (FALSE_IT(session.post_sync_session_info())) {
155
    } else {
156
      need_disconnect = false;
157
      THIS_WORKER.set_timeout_ts(get_receive_timestamp() + query_timeout);
158
      session.partition_hit().reset();
159
      if (OB_FAIL(process_get_piece_data_stmt(session))) {
160
        LOG_WARN("execute sql failed", K_(stmt_id), K(ret));
161
      }
162
    }
163

164
    session.set_last_trace_id(ObCurTraceId::get_trace_id());
165
    THIS_WORKER.set_session(NULL);
166
    revert_session(sess); //current ignore revert session ret
167
  }
168

169
  if (OB_FAIL(ret) && need_response_error && is_conn_valid()) {
170
    send_error_packet(ret, NULL);
171
  }
172

173
  if (OB_FAIL(ret) && need_disconnect && is_conn_valid()) {
174
    force_disconnect();
175
    LOG_WARN("disconnect connection when process query", K(ret));
176
  }
177
  return ret;
178
}
179

180
/**
 * Run do_process() for this request inside the per-statement environment:
 * the warning buffer is set up first, thread-level log settings are applied
 * around the call when trace logging is enabled, and do_after_process() is
 * invoked afterwards.
 *
 * @param session session the request belongs to.
 * @return the result of do_process(); the result of do_after_process() is
 *         deliberately not propagated.
 */
int ObMPStmtGetPieceData::process_get_piece_data_stmt(ObSQLSessionInfo &session)
{
  int ret = OB_SUCCESS;
  setup_wb(session);

  ObVirtualTableIteratorFactory vt_iter_factory(*gctx_.vt_iter_creator_);
  ObSessionStatEstGuard stat_est_guard(get_conn()->tenant_->id(), session.get_sessid());

  const bool trace_log_on = lib::is_trace_log_enabled();
  if (trace_log_on) {
    ObThreadLogLevelUtils::init(session.get_log_id_level_map());
  }
  ret = do_process(session);
  if (trace_log_on) {
    ObThreadLogLevelUtils::clear();
  }

  // Post-processing (clears the warning buffer) must not affect the normal
  // flow, so its error code is intentionally not assigned to ret.
  const int after_ret = do_after_process(session, ctx_, false);
  UNUSED(after_ret);
  return ret;
}
206

207
/**
 * Execute one "get piece data" request and record its statistics.
 *
 * Steps:
 *  1. bump the audit try counter and start wait/perf accounting;
 *  2. mark the session active and send the piece via response_result();
 *  3. on success record execution timestamps and perf counters;
 *  4. store the warning buffer, and on failure send an error packet;
 *  5. fill and hand over the SQL audit record.
 *
 * The value returned is the result of step 2. The best-effort lookup of the
 * prepared-statement text for the audit record uses its own error variable,
 * so it can neither hide a real failure nor turn a successful response into
 * an error after the piece packet has already been sent.
 *
 * @param session session the request belongs to.
 * @return OB_SUCCESS, or the error from set_session_active()/response_result().
 */
int ObMPStmtGetPieceData::do_process(ObSQLSessionInfo &session)
{
  int ret = OB_SUCCESS;
  ObAuditRecordData &audit_record = session.get_raw_audit_record();
  audit_record.try_cnt_++;
  const bool enable_perf_event = lib::is_diagnose_info_enabled();
  const bool enable_sql_audit = GCONF.enable_sql_audit
                                && session.get_local_ob_enable_sql_audit();
  single_process_timestamp_ = ObTimeUtility::current_time();

  ObWaitEventStat total_wait_desc;
  ObDiagnoseSessionInfo *di = ObDiagnoseSessionInfo::get_local_diagnose_info();
  {
    ObMaxWaitGuard max_wait_guard(enable_perf_event
                                    ? &audit_record.exec_record_.max_wait_event_
                                    : NULL, di);
    ObTotalWaitGuard total_wait_guard(enable_perf_event ? &total_wait_desc : NULL, di);
    if (enable_perf_event) {
      audit_record.exec_record_.record_start(di);
    }
    int64_t execution_id = 0;
    ObString sql = "get piece info";
    // monitoring statistics start
    exec_start_timestamp_ = ObTimeUtility::current_time();
    if (FALSE_IT(execution_id = gctx_.sql_engine_->get_execution_id())) {
      //nothing to do
    } else if (OB_FAIL(set_session_active(sql, session, ObTimeUtil::current_time(),
                                          obmysql::ObMySQLCmd::COM_STMT_GET_PIECE_DATA))) {
      LOG_WARN("fail to set session active", K(ret));
    } else if (OB_FAIL(response_result(session))) {
      exec_end_timestamp_ = ObTimeUtility::current_time();
    } else {
      session.set_current_execution_id(execution_id);

      // monitoring statistics end
      exec_end_timestamp_ = ObTimeUtility::current_time();

      // some statistics must be recorded for plan stat, even though sql audit disabled
      bool first_record = (1 == audit_record.try_cnt_);
      ObExecStatUtils::record_exec_timestamp(*this, first_record, audit_record.exec_timestamp_);
      audit_record.exec_timestamp_.update_stage_time();

      if (enable_perf_event) {
        audit_record.exec_record_.record_end(di);
        audit_record.exec_record_.wait_time_end_ = total_wait_desc.time_waited_;
        audit_record.exec_record_.wait_count_end_ = total_wait_desc.total_waits_;
        audit_record.update_event_stage_state();
        const int64_t time_cost = exec_end_timestamp_ - get_receive_timestamp();
        // NOTE(review): this command is accounted under the PS-prepare
        // counters; confirm that is intended.
        EVENT_INC(SQL_PS_PREPARE_COUNT);
        EVENT_ADD(SQL_PS_PREPARE_TIME, time_cost);
      }
    }
  } // diagnose end

  // Store the warning message from the most recent statement in the current
  // session. This command is never a diagnostics statement, so the buffer is
  // always replaced rather than preserved.
  // TODO: moving this elsewhere would perform better (fewer warning-buffer copies)
  session.set_show_warnings_buf(ret);

  // report the failure to the client
  if (OB_FAIL(ret)) {
    bool is_partition_hit = session.partition_hit().get_bool();
    int err = send_error_packet(ret, NULL, is_partition_hit);
    if (OB_SUCCESS != err) {  // sending the error packet itself failed
      LOG_WARN("send error packet failed", K(ret), K(err));
    }
  }
  if (enable_sql_audit) {
    audit_record.status_ = ret;
    audit_record.client_addr_ = session.get_peer_addr();
    audit_record.user_client_addr_ = session.get_user_client_addr();
    audit_record.user_group_ = THIS_WORKER.get_group_id();
    audit_record.ps_stmt_id_ = stmt_id_;
    // the column id and piece size are reported through the plan_id_ and
    // return_rows_ fields of the audit record
    audit_record.plan_id_ = column_id_;
    audit_record.return_rows_ = piece_size_;
    audit_record.is_perf_event_closed_ = !lib::is_diagnose_info_enabled();
    if (OB_NOT_NULL(session.get_ps_cache())) {
      // Best-effort lookup of the statement text: keep its error code in
      // tmp_ret so that the function's own result (ret) is left untouched.
      int tmp_ret = OB_SUCCESS;
      ObPsStmtInfoGuard guard;
      ObPsStmtInfo *ps_info = NULL;
      ObPsStmtId inner_stmt_id = OB_INVALID_ID;
      if (OB_SUCCESS == (tmp_ret = session.get_inner_ps_stmt_id(stmt_id_, inner_stmt_id))
            && OB_SUCCESS == (tmp_ret = session.get_ps_cache()->get_stmt_info_guard(inner_stmt_id,
                                                                                   guard))
            && OB_NOT_NULL(ps_info = guard.get_stmt_info())) {
        audit_record.ps_inner_stmt_id_ = inner_stmt_id;
        // NOTE(review): sql_ points into ps_info, while `guard` goes out of
        // scope before handle_audit_record() below - confirm the statement
        // info is guaranteed to outlive that call.
        audit_record.sql_ = const_cast<char *>(ps_info->get_ps_sql().ptr());
        audit_record.sql_len_ = min(ps_info->get_ps_sql().length(), OB_MAX_SQL_LENGTH);
      } else {
        LOG_WARN("get sql fail in get piece data", K(tmp_ret), K(stmt_id_));
      }
    }
  }
  ObSQLUtils::handle_audit_record(false, EXECUTE_PS_GET_PIECE, session, ctx_.is_sensitive_);

  clear_wb_content(session);
  return ret;
}
307

308
/**
 * Look up the requested piece in the session's piece cache and send it to
 * the client as a piece packet.
 *
 * After the packet is sent, the cached piece is removed once the request
 * addressed the last buffer of the piece and that buffer is marked as the
 * last piece. When need_send_extra_ok_packet() says so, an additional OK
 * packet is sent at the end.
 *
 * @param session session owning the piece cache.
 * @return OB_SUCCESS, OB_ERR_UNEXPECTED when the cache or the piece buffer is
 *         missing, or the error of the failing cache/packet call.
 */
int ObMPStmtGetPieceData::response_result(ObSQLSessionInfo &session)
{
  int ret = OB_SUCCESS;
  ObPieceBuffer piece_buf;
  ObPieceCache *piece_cache = session.get_piece_cache();

  // Stage 1: locate the buffer that holds the requested piece.
  if (OB_ISNULL(piece_cache)) {
    // the cache must already have been initialized by fetch
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("piece cache is null.", K(ret), K(stmt_id_), K(column_id_));
  } else if (OB_FAIL(piece_cache->get_piece_buffer(stmt_id_, column_id_, offset_,
                                                  piece_size_, piece_buf, session))) {
    LOG_WARN("get piece buffer fail.", K(ret), K(stmt_id_));
  } else if (NULL == piece_buf.get_piece_buffer()) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN(" piece buffer is null. ", K(ret));
  }

  // Stage 2: send the piece and clean up the cache entry when it is complete.
  if (OB_SUCC(ret)) {
    OMPKPiece piece_packet(piece_buf.get_piece_mode(),
                           piece_buf.is_null(),
                           piece_buf.get_piece_buffer()->length(),
                           *piece_buf.get_piece_buffer());
    ObPiece *piece = NULL;
    if (OB_FAIL(response_packet(piece_packet, &session))) {
      LOG_WARN("response piece packet fail.", K(ret), K(stmt_id_), K(column_id_));
    } else if (OB_FAIL(update_last_pkt_pos())) {
      LOG_WARN("failed to update last packet pos", K(ret));
    } else if (OB_FAIL(piece_cache->get_piece(stmt_id_, column_id_, piece))) {
      LOG_WARN("get piece fail", K(stmt_id_), K(column_id_), K(ret) );
    } else if (NULL != piece) {
      uint64_t buffer_cnt = 0;
      if (NULL != piece->get_buffer_array()) {
        buffer_cnt = piece->get_buffer_array()->count();
      }
      if (0 != buffer_cnt
          && offset_ == buffer_cnt - 1
          && ObLastPiece == piece_buf.get_piece_mode()) {
        // the last buffer was just delivered, so all data has been sent
        if (OB_FAIL(piece_cache->remove_piece(piece_cache->get_piece_key(stmt_id_, column_id_),
                                              session))) {
          LOG_WARN("remove piece fail", K(stmt_id_), K(column_id_));
        }
      }
    }

    // for obproxy: in multi-stmt, send extra ok packet in the last stmt
    // (has no more result)
    if (OB_SUCC(ret) && need_send_extra_ok_packet()) {
      ObOKPParam ok_param;
      ok_param.affected_rows_ = 0;
      ok_param.is_partition_hit_ = session.partition_hit().get_bool();
      ok_param.has_more_result_ = false;
      if (OB_FAIL(send_ok_packet(session, ok_param))) {
        LOG_WARN("fail to send ok packt", K(ok_param), K(ret));
      }
    }
  }
  return ret;
}
370

371
} //end of namespace observer
372
} //end of namespace oceanbase
373

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.