oceanbase

Форк
0
/
ob_mysql_request_manager.cpp 
334 строки · 11.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "ob_mysql_request_manager.h"
16
#include "share/ob_define.h"
17
#include "lib/time/ob_time_utility.h"
18
#include "lib/allocator/ob_malloc.h"
19
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
20
#include "lib/allocator/page_arena.h"
21
#include "lib/stat/ob_session_stat.h"
22
#include "lib/alloc/alloc_func.h"
23
#include "lib/thread/thread_mgr.h"
24
#include "lib/rc/ob_rc.h"
25
#include "common/ob_clock_generator.h"
26
#include "share/rc/ob_context.h"
27
#include "observer/mysql/ob_mysql_request_manager.h"
28
#include "observer/ob_server.h"
29
#include "observer/mysql/ob_mysql_request_manager.h"
30
#include "sql/plan_cache/ob_plan_cache.h"
31
#include "sql/plan_cache/ob_plan_cache_callback.h"
32
#include "sql/plan_cache/ob_plan_cache_value.h"
33
#include "sql/session/ob_basic_session_info.h"
34
#include "observer/mysql/ob_query_response_time.h"
35

36
namespace oceanbase
37
{
38
using namespace oceanbase::share::schema;
39
namespace obmysql
40
{
41

42
ObMySQLRequestRecord::~ObMySQLRequestRecord()
43
{
44

45
}
46

47
const int64_t ObMySQLRequestManager::EVICT_INTERVAL;
48

49
ObMySQLRequestManager::ObMySQLRequestManager()
50
  : inited_(false), destroyed_(false), request_id_(0), mem_limit_(0),
51
    allocator_(), queue_(), task_(),
52
    tenant_id_(OB_INVALID_TENANT_ID), tg_id_(-1), stop_flag_(true)
53
{
54
}
55

56
ObMySQLRequestManager::~ObMySQLRequestManager()
57
{
58
  if (inited_) {
59
    destroy();
60
  }
61
}
62

63
int ObMySQLRequestManager::init(uint64_t tenant_id,
64
                                const int64_t max_mem_size,
65
                                const int64_t queue_size)
66
{
67
  int ret = OB_SUCCESS;
68
  if (inited_) {
69
    ret = OB_INIT_TWICE;
70
  } else if (OB_FAIL(queue_.init(ObModIds::OB_MYSQL_REQUEST_RECORD, queue_size, tenant_id))) {
71
    SERVER_LOG(WARN, "Failed to init ObMySQLRequestQueue", K(ret));
72
  } else if (OB_FAIL(allocator_.init(SQL_AUDIT_PAGE_SIZE,
73
                                     ObModIds::OB_MYSQL_REQUEST_RECORD,
74
                                     tenant_id,
75
                                     INT64_MAX))) {
76
    SERVER_LOG(WARN, "failed to init allocator", K(ret));
77
  } else {
78
    //check FIFO mem used and sql audit records every 1 seconds
79
    if (OB_FAIL(task_.init(this))) {
80
      SERVER_LOG(WARN, "fail to init sql audit time tast", K(ret));
81
    } else {
82
      mem_limit_ = max_mem_size;
83
      tenant_id_ = tenant_id;
84
      inited_ = true;
85
      destroyed_ = false;
86
    }
87
  }
88
  if ((OB_FAIL(ret)) && (!inited_)) {
89
    destroy();
90
  }
91
  return ret;
92
}
93

94
int ObMySQLRequestManager::start()
95
{
96
  int ret = OB_SUCCESS;
97
  if (OB_UNLIKELY(!inited_)) {
98
    ret = OB_NOT_INIT;
99
    SERVER_LOG(WARN, "ObMySQLRequestManager is not inited", K(tenant_id_));
100
  } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::ReqMemEvict, tg_id_))) {
101
    SERVER_LOG(WARN, "create failed", K(ret));
102
  } else if (OB_FAIL(TG_START(tg_id_))) {
103
    SERVER_LOG(WARN, "init timer fail", K(ret));
104
  } else if (OB_FAIL(TG_SCHEDULE(tg_id_, task_, EVICT_INTERVAL, true))) {
105
    SERVER_LOG(WARN, "start eliminate task failed", K(ret));
106
  } else {
107
    stop_flag_ = false;
108
  }
109
  return ret;
110
}
111

112
void ObMySQLRequestManager::stop()
113
{
114
  if (inited_ && !stop_flag_) {
115
    TG_STOP(tg_id_);
116
  }
117
}
118

119
void ObMySQLRequestManager::wait()
120
{
121
  if (inited_ && !stop_flag_) {
122
    TG_WAIT(tg_id_);
123
    stop_flag_ = true;
124
  }
125
}
126

127
void ObMySQLRequestManager::destroy()
128
{
129
  if (!destroyed_) {
130
    TG_DESTROY(tg_id_);
131
    clear_queue();
132
    queue_.destroy();
133
    allocator_.destroy();
134
    inited_ = false;
135
    destroyed_ = true;
136
    stop_flag_ = true;
137
  }
138
}
139

140
/*
141
 * record infomation
142
 * 1.server addr           addr
143
 * 2.server port           int
144
 * 3.client addr           addr
145
 * 4.client port           int
146
 * 5.user_name             varchar
147
 * 6.request_id            int
148
 * 7.sql_id                int
149
 * 8.sql                   varchar
150
 * 9.request_time          int
151
 *10.elipsed_time          int
152
 *11.tenant_name           varchar
153
 */
154

155
int ObMySQLRequestManager::record_request(const ObAuditRecordData &audit_record,
156
                                          const bool enable_query_response_time_stats,
157
                                          bool is_sensitive)
158
{
159
  int ret = OB_SUCCESS;
160
  if (!inited_) {
161
    ret = OB_NOT_INIT;
162
  } else {
163
    ObMySQLRequestRecord *record = NULL;
164
    char *buf = NULL;
165
    //alloc mem from allocator
166
    int64_t pos = sizeof(ObMySQLRequestRecord);
167
    int64_t total_size = sizeof(ObMySQLRequestRecord)
168
                     + audit_record.sql_len_
169
                     + audit_record.tenant_name_len_
170
                     + audit_record.user_name_len_
171
                     + audit_record.db_name_len_
172
                     + audit_record.params_value_len_
173
                     + audit_record.rule_name_len_;
174
    if (NULL == (buf = (char*)alloc(total_size))) {
175
      if (REACH_TIME_INTERVAL(100 * 1000)) {
176
        SERVER_LOG(WARN, "record concurrent fifoallocator alloc mem failed",
177
            K(total_size), K(tenant_id_), K(mem_limit_), K(request_id_), K(ret));
178
      }
179
      ret = OB_ALLOCATE_MEMORY_FAILED;
180
    } else {
181
      record = new(buf)ObMySQLRequestRecord();
182
      record->allocator_ = &allocator_;
183
      record->data_ = audit_record;
184
      //deep copy sql
185
      if ((audit_record.sql_len_ > 0) && (NULL != audit_record.sql_)) {
186
        int64_t stmt_len = min(audit_record.sql_len_, OB_MAX_SQL_LENGTH);
187
        MEMCPY(buf + pos, audit_record.sql_, stmt_len);
188
        record->data_.sql_ = buf + pos;
189
        pos += stmt_len;
190
      }
191
      //deep copy params value
192
      if ((audit_record.params_value_len_ > 0) && (NULL != audit_record.params_value_)) {
193
        MEMCPY(buf + pos, audit_record.params_value_, audit_record.params_value_len_);
194
        record->data_.params_value_ = buf + pos;
195
        pos += audit_record.params_value_len_;
196
      }
197
      //deep copy rule name
198
      if ((audit_record.rule_name_len_ > 0) && (NULL != audit_record.rule_name_)) {
199
        MEMCPY(buf + pos, audit_record.rule_name_, audit_record.rule_name_len_);
200
        record->data_.rule_name_ = buf + pos;
201
        pos += audit_record.rule_name_len_;
202
      }
203
      //deep copy tenant_name
204
      if ((audit_record.tenant_name_len_ > 0) && (NULL != audit_record.tenant_name_)) {
205
        int64_t tenant_len = min(audit_record.tenant_name_len_, OB_MAX_TENANT_NAME_LENGTH);
206
        MEMCPY(buf + pos, audit_record.tenant_name_, tenant_len);
207
        record->data_.tenant_name_ = buf + pos;
208
        pos += tenant_len;
209
      }
210
      //deep copy user_name
211
      if ((audit_record.user_name_len_ > 0) && (NULL != audit_record.user_name_)) {
212
        int64_t user_len = min(audit_record.user_name_len_, OB_MAX_USER_NAME_LENGTH);
213
        MEMCPY(buf + pos, audit_record.user_name_, user_len);
214
        record->data_.user_name_ = buf + pos;
215
        pos += user_len;
216
      }
217
      //deep copy db_name
218
      if ((audit_record.db_name_len_ > 0) && (NULL != audit_record.db_name_)) {
219
        int64_t db_len = min(audit_record.db_name_len_, OB_MAX_DATABASE_NAME_LENGTH);
220
        MEMCPY(buf + pos, audit_record.db_name_, db_len);
221
        record->data_.db_name_ = buf + pos;
222
        pos += db_len;
223
      }
224
      //for find bug
225
      // only print this log if enable_perf_event is enable,
226
      // for `receive_ts_` might be invalid if `enable_perf_event` is false
227
      if (lib::is_diagnose_info_enabled()
228
          && OB_UNLIKELY(ObClockGenerator::getClock() - audit_record.exec_timestamp_.receive_ts_ > US_PER_HOUR)) {
229
        SERVER_LOG(WARN, "record: query too slow ",
230
                   "elapsed", ObClockGenerator::getClock() - audit_record.exec_timestamp_.receive_ts_,
231
                   "receive_ts", audit_record.exec_timestamp_.receive_ts_);
232
      }
233

234
      // query response time
235
      if (enable_query_response_time_stats) {
236
        observer::ObRSTCollector::get_instance().collect_query_response_time(audit_record.tenant_id_,audit_record.get_elapsed_time());
237
      }
238

239
      //push into queue
240
      if (OB_SUCC(ret)) {
241
        if (is_sensitive) {
242
          free(record);
243
          record = NULL;
244
        } else if (OB_FAIL(queue_.push_with_imme_seq(record, record->data_.request_id_))) {
245
          //sql audit槽位已满时会push失败, 依赖后台线程进行淘汰获得可用槽位
246
          if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
247
            SERVER_LOG(WARN, "push into queue failed", K(ret));
248
          }
249
          free(record);
250
          record = NULL;
251
        }
252
      }
253
    }
254
  } // end
255
  return ret;
256
}
257

258
int ObMySQLRequestManager::get_mem_limit(uint64_t tenant_id,
259
                                         int64_t &mem_limit)
260
{
261
  int ret = OB_SUCCESS;
262
  int64_t tenant_mem_limit = lib::get_tenant_memory_limit(tenant_id);
263
  // default mem limit
264
  mem_limit = static_cast<int64_t>(static_cast<double>(tenant_mem_limit) * SQL_AUDIT_MEM_FACTOR);
265

266
  // get mem_percentage from session info
267
  ObArenaAllocator alloc;
268
  ObObj obj_val;
269
  int64_t mem_pct = 0;
270
  if (OB_FAIL(ObBasicSessionInfo::get_global_sys_variable(tenant_id,
271
                                                          alloc,
272
                                                          ObDataTypeCastParams(),
273
                                                          ObString(OB_SV_SQL_AUDIT_PERCENTAGE),
274
                                                          obj_val))) {
275
    LOG_WARN("failed to get global sys variable", K(ret), K(tenant_id), K(OB_SV_SQL_AUDIT_PERCENTAGE), K(obj_val));
276
  } else if (OB_FAIL(obj_val.get_int(mem_pct))) {
277
    LOG_WARN("failed to get int", K(ret), K(obj_val));
278
  } else if (mem_pct < 0 || mem_pct > 100) {
279
    ret = OB_INVALID_ARGUMENT;
280
    LOG_WARN("invalid value of sql audit mem percentage", K(ret), K(mem_pct));
281
  } else {
282
    mem_limit = static_cast<int64_t>(static_cast<double>(tenant_mem_limit * mem_pct) / 100);
283
    LOG_DEBUG("tenant sql audit memory limit",
284
             K(tenant_id), K(tenant_mem_limit), K(mem_pct), K(mem_limit));
285
  }
286
  return ret;
287
}
288

289
int ObMySQLRequestManager::mtl_new(ObMySQLRequestManager* &req_mgr)
290
{
291
  int ret = OB_SUCCESS;
292
  req_mgr = OB_NEW(ObMySQLRequestManager, ObMemAttr(MTL_ID(), ObModIds::OB_MYSQL_REQUEST_RECORD));
293
  if (nullptr == req_mgr) {
294
    ret = OB_ALLOCATE_MEMORY_FAILED;
295
    LOG_WARN("failed to alloc memory for ObMySQLRequestManager", K(ret));
296
  }
297
  return ret;
298
}
299

300
int ObMySQLRequestManager::mtl_init(ObMySQLRequestManager* &req_mgr)
301
{
302
  int ret = OB_SUCCESS;
303
  if (nullptr == req_mgr) {
304
    ret = OB_ERR_UNEXPECTED;
305
    LOG_WARN("ObMySQLRequestManager not alloc yet", K(ret));
306
  } else {
307
    uint64_t tenant_id = lib::current_resource_owner_id();
308
    int64_t mem_limit = lib::get_tenant_memory_limit(tenant_id);
309
    mem_limit = static_cast<int64_t>(static_cast<double>(mem_limit) * SQL_AUDIT_MEM_FACTOR);
310
    bool use_mini_queue = lib::is_mini_mode() || MTL_IS_MINI_MODE() || is_meta_tenant(tenant_id);
311
    int64_t queue_size = use_mini_queue ? MINI_MODE_MAX_QUEUE_SIZE : MAX_QUEUE_SIZE;
312
    if (OB_FAIL(req_mgr->init(tenant_id, mem_limit, queue_size))) {
313
      LOG_WARN("failed to init request manager", K(ret));
314
    } else {
315
      // do nothing
316
    }
317
    LOG_INFO("mtl init finish", K(tenant_id), K(mem_limit), K(queue_size), K(ret));
318
  }
319
  if (OB_FAIL(ret) && req_mgr != nullptr) {
320
    // cleanup
321
    common::ob_delete(req_mgr);
322
    req_mgr = nullptr;
323
  }
324
  return ret;
325
}
326

327
void ObMySQLRequestManager::mtl_destroy(ObMySQLRequestManager* &req_mgr)
328
{
329
  common::ob_delete(req_mgr);
330
  req_mgr = nullptr;
331
}
332

333
} // end of namespace obmysql
334
} // end of namespace oceanbase
335

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.