13
#define USING_LOG_PREFIX SERVER
15
#include "ob_mysql_request_manager.h"
16
#include "share/ob_define.h"
17
#include "lib/time/ob_time_utility.h"
18
#include "lib/allocator/ob_malloc.h"
19
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
20
#include "lib/allocator/page_arena.h"
21
#include "lib/stat/ob_session_stat.h"
22
#include "lib/alloc/alloc_func.h"
23
#include "lib/thread/thread_mgr.h"
24
#include "lib/rc/ob_rc.h"
25
#include "common/ob_clock_generator.h"
26
#include "share/rc/ob_context.h"
27
#include "observer/mysql/ob_mysql_request_manager.h"
28
#include "observer/ob_server.h"
29
#include "observer/mysql/ob_mysql_request_manager.h"
30
#include "sql/plan_cache/ob_plan_cache.h"
31
#include "sql/plan_cache/ob_plan_cache_callback.h"
32
#include "sql/plan_cache/ob_plan_cache_value.h"
33
#include "sql/session/ob_basic_session_info.h"
34
#include "observer/mysql/ob_query_response_time.h"
38
using namespace oceanbase::share::schema;
42
// Destructor of ObMySQLRequestRecord.
ObMySQLRequestRecord::~ObMySQLRequestRecord()
47
// Out-of-class definition of the static constant member EVICT_INTERVAL.
// No initializer is given here, so the value comes from the in-class
// declaration. start() passes it to TG_SCHEDULE as the interval for task_.
const int64_t ObMySQLRequestManager::EVICT_INTERVAL;
49
// Default constructor. Leaves the manager in the "not initialised" state:
// inited_ and destroyed_ are false, request_id_ and mem_limit_ are 0,
// tenant_id_ is OB_INVALID_TENANT_ID, tg_id_ is -1 and stop_flag_ is true.
// allocator_, queue_ and task_ are default-constructed; init() sets them up.
ObMySQLRequestManager::ObMySQLRequestManager()
50
: inited_(false), destroyed_(false), request_id_(0), mem_limit_(0),
51
allocator_(), queue_(), task_(),
52
tenant_id_(OB_INVALID_TENANT_ID), tg_id_(-1), stop_flag_(true)
56
// Destructor of ObMySQLRequestManager.
ObMySQLRequestManager::~ObMySQLRequestManager()
63
// Initialises the request manager for one tenant.
//
// @param tenant_id     tenant the manager belongs to; forwarded to
//                      queue_.init() and stored in tenant_id_.
// @param max_mem_size  stored in mem_limit_.
// @param queue_size    capacity handed to queue_.init().
// @return an OceanBase error code (OB_FAIL() is used to test each step).
//
// Visible steps: initialise the record queue, initialise the allocator with
// SQL_AUDIT_PAGE_SIZE, initialise the timer task with `this`, then record the
// memory limit and tenant id.
int ObMySQLRequestManager::init(uint64_t tenant_id,
64
const int64_t max_mem_size,
65
const int64_t queue_size)
70
// Both the queue and the allocator use the OB_MYSQL_REQUEST_RECORD module id.
} else if (OB_FAIL(queue_.init(ObModIds::OB_MYSQL_REQUEST_RECORD, queue_size, tenant_id))) {
71
SERVER_LOG(WARN, "Failed to init ObMySQLRequestQueue", K(ret));
72
} else if (OB_FAIL(allocator_.init(SQL_AUDIT_PAGE_SIZE,
73
ObModIds::OB_MYSQL_REQUEST_RECORD,
76
SERVER_LOG(WARN, "failed to init allocator", K(ret));
79
// task_ is given a pointer to this manager; start() later schedules it.
if (OB_FAIL(task_.init(this))) {
80
// NOTE(review): "tast" in the message below looks like a typo for "task".
SERVER_LOG(WARN, "fail to init sql audit time tast", K(ret));
82
mem_limit_ = max_mem_size;
83
tenant_id_ = tenant_id;
88
// Failure path: taken when a step above failed and inited_ is still false.
// Presumably this rolls back partial initialisation — TODO confirm.
if ((OB_FAIL(ret)) && (!inited_)) {
94
// Starts the background timer for this manager.
//
// Requires init() to have succeeded (inited_ must be true). Creates a tenant
// thread group of type ReqMemEvict, starts it, and schedules task_ on it with
// EVICT_INTERVAL as the interval.
// @return an OceanBase error code; each failing step logs a warning.
int ObMySQLRequestManager::start()
97
if (OB_UNLIKELY(!inited_)) {
99
SERVER_LOG(WARN, "ObMySQLRequestManager is not inited", K(tenant_id_));
100
// The id of the created thread group is written to tg_id_.
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::ReqMemEvict, tg_id_))) {
101
SERVER_LOG(WARN, "create failed", K(ret));
102
} else if (OB_FAIL(TG_START(tg_id_))) {
103
SERVER_LOG(WARN, "init timer fail", K(ret));
104
// The trailing `true` is presumably the "repeat" flag — TODO confirm
// against the TG_SCHEDULE definition.
} else if (OB_FAIL(TG_SCHEDULE(tg_id_, task_, EVICT_INTERVAL, true))) {
105
SERVER_LOG(WARN, "start eliminate task failed", K(ret));
112
// Stops the manager. The guarded section runs only when the manager has been
// initialised and stop_flag_ is not set.
void ObMySQLRequestManager::stop()
114
if (inited_ && !stop_flag_) {
119
// Waits on the manager. Uses the same guard as stop(): the guarded section
// runs only when the manager is initialised and stop_flag_ is not set.
void ObMySQLRequestManager::wait()
121
if (inited_ && !stop_flag_) {
127
// Tears the manager down; among other things it destroys allocator_.
void ObMySQLRequestManager::destroy()
133
allocator_.destroy();
155
// Stores one audit record in the request queue.
//
// A single buffer is allocated that holds an ObMySQLRequestRecord header
// followed by copies of the variable-length strings referenced by
// audit_record (SQL text, parameter values, rule name, tenant/user/database
// names). The pointers in record->data_ are redirected to those copies, and
// the record is then pushed into queue_.
//
// @param audit_record  audit data to copy.
// @param enable_query_response_time_stats  when true, the elapsed time is also
//        reported to ObRSTCollector.
// @return OB_SUCCESS, OB_ALLOCATE_MEMORY_FAILED when the buffer cannot be
//         allocated, or the error returned by the queue push.
int ObMySQLRequestManager::record_request(const ObAuditRecordData &audit_record,
156
const bool enable_query_response_time_stats,
159
int ret = OB_SUCCESS;
163
ObMySQLRequestRecord *record = NULL;
166
// pos is the write offset inside buf; string payloads start after the header.
int64_t pos = sizeof(ObMySQLRequestRecord);
167
// Size the buffer from the unclamped lengths; the copies below use
// min(len, MAX) for some fields, so they never copy more than was counted.
int64_t total_size = sizeof(ObMySQLRequestRecord)
168
+ audit_record.sql_len_
169
+ audit_record.tenant_name_len_
170
+ audit_record.user_name_len_
171
+ audit_record.db_name_len_
172
+ audit_record.params_value_len_
173
+ audit_record.rule_name_len_;
174
if (NULL == (buf = (char*)alloc(total_size))) {
175
// Log at most once per interval (presumably microseconds — TODO confirm).
if (REACH_TIME_INTERVAL(100 * 1000)) {
176
// NOTE(review): ret is logged here before it is assigned below, so the
// message appears to print the previous value of ret — confirm.
SERVER_LOG(WARN, "record concurrent fifoallocator alloc mem failed",
177
K(total_size), K(tenant_id_), K(mem_limit_), K(request_id_), K(ret));
179
ret = OB_ALLOCATE_MEMORY_FAILED;
181
// Construct the record header in place at the start of the buffer.
record = new(buf)ObMySQLRequestRecord();
182
record->allocator_ = &allocator_;
183
// Copy of the audit data; the string pointers copied here still refer to the
// caller's memory until they are redirected below.
record->data_ = audit_record;
185
// SQL text: copy length is capped at OB_MAX_SQL_LENGTH.
// NOTE(review): the pos advance for this field is not visible in this
// excerpt — verify it advances by the clamped length.
if ((audit_record.sql_len_ > 0) && (NULL != audit_record.sql_)) {
186
int64_t stmt_len = min(audit_record.sql_len_, OB_MAX_SQL_LENGTH);
187
MEMCPY(buf + pos, audit_record.sql_, stmt_len);
188
record->data_.sql_ = buf + pos;
192
// Parameter values: copied with their full length.
if ((audit_record.params_value_len_ > 0) && (NULL != audit_record.params_value_)) {
193
MEMCPY(buf + pos, audit_record.params_value_, audit_record.params_value_len_);
194
record->data_.params_value_ = buf + pos;
195
pos += audit_record.params_value_len_;
198
// Rule name: copied with its full length.
if ((audit_record.rule_name_len_ > 0) && (NULL != audit_record.rule_name_)) {
199
MEMCPY(buf + pos, audit_record.rule_name_, audit_record.rule_name_len_);
200
record->data_.rule_name_ = buf + pos;
201
pos += audit_record.rule_name_len_;
204
// Tenant name: copy length is capped at OB_MAX_TENANT_NAME_LENGTH.
if ((audit_record.tenant_name_len_ > 0) && (NULL != audit_record.tenant_name_)) {
205
int64_t tenant_len = min(audit_record.tenant_name_len_, OB_MAX_TENANT_NAME_LENGTH);
206
MEMCPY(buf + pos, audit_record.tenant_name_, tenant_len);
207
record->data_.tenant_name_ = buf + pos;
211
// User name: copy length is capped at OB_MAX_USER_NAME_LENGTH.
if ((audit_record.user_name_len_ > 0) && (NULL != audit_record.user_name_)) {
212
int64_t user_len = min(audit_record.user_name_len_, OB_MAX_USER_NAME_LENGTH);
213
MEMCPY(buf + pos, audit_record.user_name_, user_len);
214
record->data_.user_name_ = buf + pos;
218
// Database name: copy length is capped at OB_MAX_DATABASE_NAME_LENGTH.
if ((audit_record.db_name_len_ > 0) && (NULL != audit_record.db_name_)) {
219
int64_t db_len = min(audit_record.db_name_len_, OB_MAX_DATABASE_NAME_LENGTH);
220
MEMCPY(buf + pos, audit_record.db_name_, db_len);
221
record->data_.db_name_ = buf + pos;
227
// Diagnostic only: warn when more than US_PER_HOUR has passed between the
// request's receive timestamp and the current clock reading.
if (lib::is_diagnose_info_enabled()
228
&& OB_UNLIKELY(ObClockGenerator::getClock() - audit_record.exec_timestamp_.receive_ts_ > US_PER_HOUR)) {
229
SERVER_LOG(WARN, "record: query too slow ",
230
"elapsed", ObClockGenerator::getClock() - audit_record.exec_timestamp_.receive_ts_,
231
"receive_ts", audit_record.exec_timestamp_.receive_ts_);
235
// Optional per-tenant response-time statistics.
if (enable_query_response_time_stats) {
236
observer::ObRSTCollector::get_instance().collect_query_response_time(audit_record.tenant_id_,audit_record.get_elapsed_time());
244
// Hand the record to the queue. record->data_.request_id_ is passed as the
// second argument; presumably the queue writes the assigned sequence number
// into it — TODO confirm against push_with_imme_seq.
} else if (OB_FAIL(queue_.push_with_imme_seq(record, record->data_.request_id_))) {
246
// Rate-limit the failure log (presumably one message per 2 seconds if the
// interval is in microseconds — TODO confirm).
if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
247
SERVER_LOG(WARN, "push into queue failed", K(ret));
258
// Computes the SQL-audit memory limit for a tenant.
//
// mem_limit is first set to the tenant memory limit multiplied by
// SQL_AUDIT_MEM_FACTOR. The global system variable named by
// OB_SV_SQL_AUDIT_PERCENTAGE is then read; if it is an integer in [0, 100],
// mem_limit becomes that percentage of the tenant memory limit.
//
// @param tenant_id  tenant whose limit is computed.
// @return OB_SUCCESS, OB_INVALID_ARGUMENT when the percentage is outside
//         [0, 100], or the error from reading/decoding the system variable.
int ObMySQLRequestManager::get_mem_limit(uint64_t tenant_id,
261
int ret = OB_SUCCESS;
262
int64_t tenant_mem_limit = lib::get_tenant_memory_limit(tenant_id);
264
// Default based on the compile-time factor.
mem_limit = static_cast<int64_t>(static_cast<double>(tenant_mem_limit) * SQL_AUDIT_MEM_FACTOR);
267
// Allocator local to this call (declared here; its use is not visible in
// this excerpt).
ObArenaAllocator alloc;
270
if (OB_FAIL(ObBasicSessionInfo::get_global_sys_variable(tenant_id,
272
ObDataTypeCastParams(),
273
ObString(OB_SV_SQL_AUDIT_PERCENTAGE),
275
LOG_WARN("failed to get global sys variable", K(ret), K(tenant_id), K(OB_SV_SQL_AUDIT_PERCENTAGE), K(obj_val));
276
} else if (OB_FAIL(obj_val.get_int(mem_pct))) {
277
LOG_WARN("failed to get int", K(ret), K(obj_val));
278
} else if (mem_pct < 0 || mem_pct > 100) {
279
ret = OB_INVALID_ARGUMENT;
280
LOG_WARN("invalid value of sql audit mem percentage", K(ret), K(mem_pct));
282
// NOTE(review): the product tenant_mem_limit * mem_pct is formed before the
// cast to double, i.e. in the operands' own (presumably integer) type.
mem_limit = static_cast<int64_t>(static_cast<double>(tenant_mem_limit * mem_pct) / 100);
283
LOG_DEBUG("tenant sql audit memory limit",
284
K(tenant_id), K(tenant_mem_limit), K(mem_pct), K(mem_limit));
289
// Allocation hook: creates a new ObMySQLRequestManager.
//
// The object is created with OB_NEW using a memory attribute built from
// MTL_ID() and the OB_MYSQL_REQUEST_RECORD module id.
// @param req_mgr  out: receives the new manager, or nullptr on failure.
// @return OB_ALLOCATE_MEMORY_FAILED when the allocation returns nullptr.
int ObMySQLRequestManager::mtl_new(ObMySQLRequestManager* &req_mgr)
291
int ret = OB_SUCCESS;
292
req_mgr = OB_NEW(ObMySQLRequestManager, ObMemAttr(MTL_ID(), ObModIds::OB_MYSQL_REQUEST_RECORD));
293
if (nullptr == req_mgr) {
294
ret = OB_ALLOCATE_MEMORY_FAILED;
295
LOG_WARN("failed to alloc memory for ObMySQLRequestManager", K(ret));
300
// Initialisation hook: initialises a manager previously created by mtl_new().
//
// The tenant id is taken from lib::current_resource_owner_id(). The memory
// limit is the tenant memory limit scaled by SQL_AUDIT_MEM_FACTOR. The queue
// size is MINI_MODE_MAX_QUEUE_SIZE in mini mode or for a meta tenant, and
// MAX_QUEUE_SIZE otherwise.
// @param req_mgr  manager to initialise; must not be nullptr. On failure the
//                 manager is deleted via common::ob_delete().
// @return OB_ERR_UNEXPECTED when req_mgr is nullptr, otherwise the result of
//         req_mgr->init().
int ObMySQLRequestManager::mtl_init(ObMySQLRequestManager* &req_mgr)
302
int ret = OB_SUCCESS;
303
if (nullptr == req_mgr) {
304
ret = OB_ERR_UNEXPECTED;
305
LOG_WARN("ObMySQLRequestManager not alloc yet", K(ret));
307
uint64_t tenant_id = lib::current_resource_owner_id();
308
int64_t mem_limit = lib::get_tenant_memory_limit(tenant_id);
309
mem_limit = static_cast<int64_t>(static_cast<double>(mem_limit) * SQL_AUDIT_MEM_FACTOR);
310
// Use the smaller queue when the process or tenant runs in mini mode, or when
// the tenant is a meta tenant.
bool use_mini_queue = lib::is_mini_mode() || MTL_IS_MINI_MODE() || is_meta_tenant(tenant_id);
311
int64_t queue_size = use_mini_queue ? MINI_MODE_MAX_QUEUE_SIZE : MAX_QUEUE_SIZE;
312
if (OB_FAIL(req_mgr->init(tenant_id, mem_limit, queue_size))) {
313
LOG_WARN("failed to init request manager", K(ret));
317
LOG_INFO("mtl init finish", K(tenant_id), K(mem_limit), K(queue_size), K(ret));
319
// Release the manager when any step above failed.
if (OB_FAIL(ret) && req_mgr != nullptr) {
321
common::ob_delete(req_mgr);
327
// Destruction hook: releases the manager with common::ob_delete().
void ObMySQLRequestManager::mtl_destroy(ObMySQLRequestManager* &req_mgr)
329
common::ob_delete(req_mgr);