/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
13
#define USING_LOG_PREFIX SERVER
15
#include "observer/ob_server_schema_updater.h"
16
#include "lib/thread/thread_mgr.h"
17
#include "share/schema/ob_multi_version_schema_service.h"
18
#include "share/inner_table/ob_inner_table_schema.h"
19
#include "observer/ob_server.h"
21
using namespace oceanbase::common;
22
using namespace oceanbase::share;
23
using namespace oceanbase::share::schema;
29
ObServerSchemaTask::ObServerSchemaTask()
30
: type_(INVALID), did_retry_(false), schema_info_()
34
ObServerSchemaTask::ObServerSchemaTask(
37
: type_(type), did_retry_(did_retry), schema_info_()
41
ObServerSchemaTask::ObServerSchemaTask(
44
const ObRefreshSchemaInfo &schema_info)
45
: type_(type), did_retry_(did_retry), schema_info_(schema_info)
49
ObServerSchemaTask::ObServerSchemaTask(TYPE type)
50
: type_(type), did_retry_(false), schema_info_()
54
ObServerSchemaTask::ObServerSchemaTask(
56
const uint64_t tenant_id,
57
const int64_t schema_version)
58
: type_(type), did_retry_(false), schema_info_()
60
schema_info_.set_tenant_id(tenant_id);
61
schema_info_.set_schema_version(schema_version);
64
bool ObServerSchemaTask::need_process_alone() const
66
return REFRESH == type_ || RELEASE == type_;
69
bool ObServerSchemaTask::is_valid() const
71
return INVALID != type_;
74
void ObServerSchemaTask::reset()
81
int64_t ObServerSchemaTask::hash() const
83
uint64_t hash_val = 0;
84
hash_val = murmurhash(&type_, sizeof(type_), hash_val);
85
if (ASYNC_REFRESH == type_) {
86
const uint64_t tenant_id = get_tenant_id();
87
const int64_t schema_version = get_schema_version();
88
hash_val = murmurhash(&tenant_id, sizeof(tenant_id), hash_val);
89
hash_val = murmurhash(&schema_version, sizeof(schema_version), hash_val);
91
return static_cast<int64_t>(hash_val);
94
bool ObServerSchemaTask::operator ==(const ObServerSchemaTask &other) const
96
bool bret = (type_ == other.type_);
97
if (bret && ASYNC_REFRESH == type_) {
98
bret = (get_tenant_id() == other.get_tenant_id())
99
&& (get_schema_version() == other.get_schema_version());
104
bool ObServerSchemaTask::operator <(const ObServerSchemaTask &other) const
106
bool bret = (type_ < other.type_);
107
if (!bret && ASYNC_REFRESH == type_ && ASYNC_REFRESH == other.type_) {
108
if (get_tenant_id() < other.get_tenant_id()) {
110
} else if (get_tenant_id() == other.get_tenant_id()
111
&& get_schema_version() < other.get_schema_version()) {
120
bool ObServerSchemaTask::greator_than(
121
const ObServerSchemaTask <,
122
const ObServerSchemaTask &rt)
124
bool bret = (lt.type_ > rt.type_);
125
if (!bret && ASYNC_REFRESH == lt.type_ && ASYNC_REFRESH == rt.type_) {
126
if (lt.get_tenant_id() > rt.get_tenant_id()) {
128
} else if (lt.get_tenant_id() == rt.get_tenant_id()
129
&& lt.get_schema_version() > rt.get_schema_version()) {
138
bool ObServerSchemaTask::compare_without_version(const ObServerSchemaTask &other) const
140
return (*this == other);
143
uint64_t ObServerSchemaTask::get_group_id() const
145
return static_cast<uint64_t>(type_);
148
bool ObServerSchemaTask::is_barrier() const
153
// Initializes the updater with the local server address and the schema service.
// Visible behavior: returns OB_INVALID_ARGUMENT when schema_mgr is NULL;
// otherwise initializes task_queue_ (with `this` as its first argument) and
// stores schema_mgr in schema_mgr_. MAX_THREAD_CNT / MAX_TENANT_CNT are defined
// for the queue; how they are passed is not visible in this copy.
// NOTE(review): lines of this function are missing from this copy (remaining
// task_queue_.init() arguments, the success branch header and closing braces);
// presumably `host` is stored in host_ and an init flag is set there — confirm.
int ObServerSchemaUpdater::init(const common::ObAddr &host, ObMultiVersionSchemaService *schema_mgr)
155
int ret = OB_SUCCESS;
156
const int64_t MAX_THREAD_CNT = 2;
157
const int64_t MAX_TENANT_CNT = 1024;
158
if (NULL == schema_mgr) {
159
ret = OB_INVALID_ARGUMENT;
160
LOG_WARN("schema_mgr must not null");
161
} else if (OB_FAIL(task_queue_.init(this,
165
LOG_WARN("init task queue failed", KR(ret), LITERAL_K(MAX_THREAD_CNT), K(MAX_TENANT_CNT));
168
schema_mgr_ = schema_mgr;
174
// Stops the updater.
// Visible here: logs "not init" with OB_NOT_INIT (presumably on the
// not-initialized path).
// NOTE(review): the condition, the normal-path work and the braces of this body
// are missing from this copy.
void ObServerSchemaUpdater::stop()
177
LOG_WARN_RET(OB_NOT_INIT, "not init");
183
// Waits for the updater to finish.
// Visible here: logs "not init" with OB_NOT_INIT (presumably on the
// not-initialized path).
// NOTE(review): the condition, the normal-path work and the braces of this body
// are missing from this copy.
void ObServerSchemaUpdater::wait()
186
LOG_WARN_RET(OB_NOT_INIT, "not init");
192
// Tears down the updater.
// NOTE(review): the body of this function is missing from this copy, so none of
// its behavior can be confirmed here.
void ObServerSchemaUpdater::destroy()
203
int ObServerSchemaUpdater::process_barrier(const ObServerSchemaTask &task, bool &stopped)
205
UNUSEDx(task, stopped);
206
return OB_NOT_SUPPORTED;
209
int ObServerSchemaUpdater::batch_process_tasks(
210
const ObIArray<ObServerSchemaTask> &batch_tasks, bool &stopped)
212
int ret = OB_SUCCESS;
213
ObCurTraceId::init(host_);
214
ObArray<ObServerSchemaTask> tasks;
217
LOG_WARN("ob_server_schema_updeter is not inited.", KR(ret));
218
} else if (stopped) {
220
LOG_WARN("ob_server_schema_updeter is stopped.", KR(ret));
221
} else if (batch_tasks.count() <= 0) {
222
ret = OB_INVALID_ARGUMENT;
223
LOG_WARN("batch_tasks cnt is 0", KR(ret));
224
} else if (OB_FAIL(tasks.assign(batch_tasks))) {
225
LOG_WARN("fail to assign task", KR(ret), "task_cnt", batch_tasks.count());
227
std::sort(tasks.begin(), tasks.end(), ObServerSchemaTask::greator_than);
228
ObServerSchemaTask::TYPE type = tasks.at(0).type_;
229
if ((ObServerSchemaTask::REFRESH == type || ObServerSchemaTask::RELEASE == type)
230
&& (1 != tasks.count())) {
231
ret = OB_ERR_UNEXPECTED;
232
LOG_WARN("refresh/release schema task should process alone",
233
KR(ret), "task_cnt", tasks.count());
234
} else if (ObServerSchemaTask::REFRESH == type) {
235
if (OB_FAIL(process_refresh_task(tasks.at(0)))) {
236
LOG_WARN("fail to process refresh task", KR(ret), K(tasks.at(0)));
238
} else if (ObServerSchemaTask::RELEASE == type) {
239
if (OB_FAIL(process_release_task())) {
240
LOG_WARN("fail to process release task", KR(ret), K(tasks.at(0)));
242
} else if (ObServerSchemaTask::ASYNC_REFRESH == type) {
243
if (OB_FAIL(process_async_refresh_tasks(tasks))) {
244
LOG_WARN("fail to process async refresh tasks", KR(ret));
247
ret = OB_ERR_UNEXPECTED;
248
LOG_WARN("invalid type", KR(ret), K(type));
251
ObCurTraceId::reset();
255
// Handles a REFRESH task (dispatched from batch_process_tasks()).
//
// Visible flow:
//   1. OB_ERR_UNEXPECTED if schema_mgr_ or GCTX.root_service_ is NULL.
//   2. While the local rootservice is in service but not yet in full service,
//      the task is abandoned with a "try again" warning.
//      NOTE(review): the line assigning ret on this path is missing from this
//      copy; the later `OB_EAGAIN == ret` check suggests OB_EAGAIN — confirm.
//   3. The refresh is skipped when the local last-refreshed sequence_id is not
//      OB_INVALID_ID and is >= the task's sequence_id.
//   4. Otherwise schema_mgr_->refresh_and_add_schema() is called: for the task's
//      tenant only when the local schema info is valid and
//      local sequence_id + 1 == new sequence_id, else for all tenants (empty
//      tenant_ids). check_bootstrap is true when the new sequence_id is
//      OB_INVALID_ID. On success, schema_info is recorded via
//      set_last_refreshed_schema_info().
//   5. Afterwards (nesting not confirmable from this copy) tenant baseline schema
//      versions are reloaded best-effort, sessions are checked when ret is
//      OB_EAGAIN, and schema statistics are dumped at most once per 10 minutes.
// NOTE(review): several lines (braces, `} else {` branches and a few statements)
// are missing from this copy, so it does not compile as shown.
int ObServerSchemaUpdater::process_refresh_task(const ObServerSchemaTask &task)
257
int ret = OB_SUCCESS;
258
const ObRefreshSchemaInfo &schema_info = task.schema_info_;
259
ObTaskController::get().switch_task(share::ObTaskType::SCHEMA);
260
THIS_WORKER.set_timeout_ts(INT64_MAX);
261
LOG_INFO("[REFRESH_SCHEMA] start to process schema refresh task", KR(ret), K(schema_info));
262
if (OB_ISNULL(schema_mgr_)) {
263
ret = OB_ERR_UNEXPECTED;
264
LOG_WARN("schema_mgr_ is NULL", KR(ret));
265
} else if (OB_ISNULL(GCTX.root_service_)) {
266
ret = OB_ERR_UNEXPECTED;
267
LOG_WARN("rootservice is null", KR(ret));
268
} else if (GCTX.root_service_->in_service()
269
&& !GCTX.root_service_->is_full_service()) {
270
// Rootservice will refresh schema(holding mutex lock) when it restarts,
271
// so refresh schema task triggered by heartbeat should be avoided in such a situation.
273
LOG_WARN("rootservice is not in full service, try again", KR(ret),
274
K(GCTX.root_service_->in_service()), K(GCTX.root_service_->is_full_service()));
276
ObRefreshSchemaInfo local_schema_info;
277
const uint64_t new_sequence_id = schema_info.get_sequence_id();
278
uint64_t last_sequence_id = OB_INVALID_ID;
280
} else if (OB_FAIL(schema_mgr_->get_last_refreshed_schema_info(local_schema_info))) {
281
LOG_WARN("fail to get local schema info", KR(ret));
282
} else if (FALSE_IT(last_sequence_id = local_schema_info.get_sequence_id())) {
283
// Local schema is already at or beyond the broadcast sequence_id: nothing to do.
} else if (OB_INVALID_ID != last_sequence_id && last_sequence_id >= new_sequence_id) {
285
LOG_INFO("[REFRESH_SCHEMA] local schema info is newer, no need to refresh schema",
286
KR(ret), K(local_schema_info), K(schema_info));
288
// empty tenant_ids means refresh all tenants' schema.
289
ObSEArray<uint64_t, 1> tenant_ids;
290
uint64_t local_sequence_id = local_schema_info.get_sequence_id();
291
// NOTE(review): shadows the outer `const uint64_t new_sequence_id` (same value).
uint64_t new_sequence_id = schema_info.get_sequence_id();
292
// A consecutive sequence_id (local + 1 == new) means no heartbeat was lost, so only the specified tenant's schema needs refreshing.
293
if (local_schema_info.is_valid()
294
// NOTE(review): compares a uint64_t sequence id with OB_INVALID_VERSION, while the
// check above uses OB_INVALID_ID; confirm both sentinels match after conversion.
&& OB_INVALID_VERSION != local_sequence_id
295
&& local_sequence_id + 1 == new_sequence_id) {
296
uint64_t refresh_tenant_id = schema_info.get_tenant_id();
297
if (OB_FAIL(tenant_ids.push_back(refresh_tenant_id))) {
298
LOG_WARN("fail to push back tenant_id", KR(ret), K(refresh_tenant_id));
300
LOG_INFO("[REFRESH_SCHEMA] refresh schema by tenant",
301
KR(ret), K(refresh_tenant_id), K(local_schema_info), K(schema_info));
304
int64_t begin_time = ::oceanbase::common::ObTimeUtility::current_time();
305
LOG_INFO("[REFRESH_SCHEMA] begin refresh schema, ", K(begin_time));
306
// An invalid new sequence_id is passed on as check_bootstrap = true.
bool check_bootstrap = (OB_INVALID_ID == new_sequence_id);
307
if (FAILEDx(schema_mgr_->refresh_and_add_schema(tenant_ids, check_bootstrap))) {
308
LOG_WARN("fail to refresh and add schema", KR(ret));
309
} else if (OB_FAIL(schema_mgr_->set_last_refreshed_schema_info(schema_info))) {
310
LOG_WARN("fail to set last_refreshed_schema_info", KR(ret));
313
LOG_INFO("[REFRESH_SCHEMA] end refresh schema with new mode, ",
314
KR(ret), K(tenant_ids), "used time", ObTimeUtility::current_time() - begin_time);
318
// Best-effort: a failure here is only logged and does not change ret.
int tmp_ret = OB_SUCCESS;
319
if (OB_TMP_FAIL(try_load_baseline_schema_version_())) { // ignore ret
320
LOG_WARN("fail to load tenant baseline schema version", KR(tmp_ret));
323
// For performance, schema_guard will be cached in one session instead of each SQL statement constructs its own new schema_guard,
324
// which may lead to lack of schema slots since schema guard will hold reference of schema mgr in long time.
325
// To avoid -4023 error caused by lack of schema slots while refresh schema, observer should try to release cached schema_guard in different sessions in such situation.
326
if (OB_EAGAIN == ret) {
327
OBSERVER.get_sql_session_mgr().try_check_session();
330
// dump schema statistics info
331
if (REACH_TIME_INTERVAL(10 * 60 * 1000 * 1000)) { // 10min
332
if (OB_NOT_NULL(schema_mgr_)) {
333
schema_mgr_->dump_schema_statistics();
339
int ObServerSchemaUpdater::process_release_task()
341
int ret = OB_SUCCESS;
342
ObTaskController::get().switch_task(share::ObTaskType::SCHEMA);
343
THIS_WORKER.set_timeout_ts(INT64_MAX);
344
if (OB_ISNULL(schema_mgr_)) {
345
ret = OB_ERR_UNEXPECTED;
346
LOG_WARN("schema_mgr_ is NULL", KR(ret));
347
} else if (!schema_mgr_->is_sys_full_schema()) {
348
// auto release unused memory of schema after schema split
349
} else if (OB_FAIL(schema_mgr_->try_eliminate_schema_mgr())) {
350
LOG_WARN("fail to eliminate schema mgr", KR(ret));
352
LOG_INFO("try to release schema", KR(ret));
356
int ObServerSchemaUpdater::process_async_refresh_tasks(
357
const ObIArray<ObServerSchemaTask> &tasks)
359
int ret = OB_SUCCESS;
360
ObTaskController::get().switch_task(share::ObTaskType::SCHEMA);
361
THIS_WORKER.set_timeout_ts(INT64_MAX);
362
if (OB_ISNULL(schema_mgr_)) {
363
ret = OB_ERR_UNEXPECTED;
364
LOG_WARN("schema_mgr_ is NULL", KR(ret));
366
// For each tenant, we can only execute the async refresh schema task which has maximum schema_version.
367
ObSEArray<uint64_t, UNIQ_TASK_QUEUE_BATCH_EXECUTE_NUM> tenant_ids;
368
for (int64_t i = 0; OB_SUCC(ret) && i < tasks.count(); i++) {
369
const ObServerSchemaTask &cur_task = tasks.at(i);
370
if (ObServerSchemaTask::ASYNC_REFRESH != cur_task.type_) {
371
ret = OB_ERR_UNEXPECTED;
372
LOG_WARN("cur task type should be ASYNC_REFRESH", KR(ret), K(cur_task));
374
const ObServerSchemaTask &last_task = tasks.at(i - 1);
375
if (last_task.get_tenant_id() < cur_task.get_tenant_id()
376
|| (last_task.get_tenant_id() == cur_task.get_tenant_id()
377
&& last_task.get_schema_version() < cur_task.get_schema_version())) {
378
ret = OB_ERR_UNEXPECTED;
379
LOG_WARN("cur task should be less than last task",
380
KR(ret), K(last_task), K(cur_task));
384
int64_t cnt = tenant_ids.count();
385
if (0 == cnt || tenant_ids.at(cnt - 1) != cur_task.get_tenant_id()) {
386
const uint64_t tenant_id = cur_task.get_tenant_id();
387
int64_t local_version = OB_INVALID_VERSION;
388
int tmp_ret = OB_SUCCESS;
389
if (i > 0 && tasks.at(i - 1).get_tenant_id() == cur_task.get_tenant_id()) {
390
// Tasks have been sorted by (tenant_id, schema_version) in desc order, so we just get first task by tenant.
391
} else if (OB_SUCCESS != (tmp_ret = schema_mgr_->get_tenant_refreshed_schema_version(
392
tenant_id, local_version))) { // ignore ret
393
if (OB_ENTRY_NOT_EXIST != tmp_ret) {
394
LOG_WARN("failed to get tenant refreshed schema version", KR(tmp_ret), K(tenant_id));
396
} else if (cur_task.get_schema_version() > local_version) {
397
if (OB_FAIL(tenant_ids.push_back(cur_task.get_tenant_id()))) {
398
LOG_WARN("fail to push back task", KR(ret), K(cur_task));
404
if (OB_SUCC(ret) && tenant_ids.count() > 0) {
405
if (OB_FAIL(schema_mgr_->refresh_and_add_schema(tenant_ids))) {
406
LOG_WARN("fail to refresh schema", KR(ret), K(tenant_ids));
410
LOG_INFO("try to async refresh schema", KR(ret));
414
int ObServerSchemaUpdater::try_reload_schema(
415
const ObRefreshSchemaInfo &schema_info,
416
const bool set_received_schema_version)
419
int ret = OB_SUCCESS;
422
LOG_WARN("ob_server_schema_updeter is not inited.", KR(ret));
423
} else if (OB_ISNULL(schema_mgr_)) {
424
ret = OB_ERR_UNEXPECTED;
425
LOG_WARN("schema_service is null", KR(ret));
427
// Try to update received_broadcast_version which used to check if local schema is new enough for SQL execution.
428
// Here, we ignore error since set_tenant_received_broadcast_version() may fail before tenant firstly refresh schema.
429
int tmp_ret = OB_SUCCESS;
430
if (OB_INVALID_TENANT_ID != schema_info.get_tenant_id()
431
&& schema_info.get_schema_version() > 0
432
&& set_received_schema_version
433
&& OB_TMP_FAIL(schema_mgr_->set_tenant_received_broadcast_version(
434
schema_info.get_tenant_id(), schema_info.get_schema_version()))) {
435
LOG_WARN("fail to set tenant received broadcast version", K(tmp_ret), K(schema_info));
438
const bool did_retry = true;
439
ObServerSchemaTask refresh_task(ObServerSchemaTask::REFRESH, did_retry, schema_info);
440
if (OB_FAIL(task_queue_.add(refresh_task))) {
441
if (OB_EAGAIN != ret) {
442
LOG_WARN("schedule fetch new schema task failed", KR(ret), K(schema_info));
445
LOG_INFO("schedule fetch new schema task", KR(ret), K(schema_info));
451
int ObServerSchemaUpdater::try_release_schema()
453
int ret = OB_SUCCESS;
454
ObServerSchemaTask release_task(ObServerSchemaTask::RELEASE);
457
LOG_WARN("ob_server_schema_updeter is not inited.", KR(ret));
458
} else if (OB_FAIL(task_queue_.add(release_task))) {
459
if (OB_EAGAIN != ret) {
460
LOG_WARN("schedule release schema task failed", KR(ret));
463
LOG_INFO("schedule release schema task", KR(ret));
468
int ObServerSchemaUpdater::async_refresh_schema(
469
const uint64_t tenant_id,
470
const int64_t schema_version)
472
int ret = OB_SUCCESS;
473
ObServerSchemaTask refresh_task(ObServerSchemaTask::ASYNC_REFRESH,
474
tenant_id, schema_version);
477
LOG_WARN("ob_server_schema_updeter is not inited.", KR(ret));
478
} else if (OB_INVALID_TENANT_ID == tenant_id
479
|| OB_INVALID_ID == tenant_id) {
480
ret = OB_INVALID_ARGUMENT;
481
LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(schema_version));
482
} else if (OB_FAIL(task_queue_.add(refresh_task))) {
483
if (OB_EAGAIN != ret) {
484
LOG_WARN("schedule async refresh schema task failed",
485
KR(ret), K(tenant_id), K(schema_version));
488
LOG_INFO("schedule async refresh schema task",
489
KR(ret), K(tenant_id), K(schema_version));
494
// Refreshes each available tenant's baseline schema version.
// Takes a sys-tenant schema guard, lists tenants via get_available_tenant_ids(),
// then, for each tenant, sets a GCONF.rpc_timeout timeout on `ctx` and calls
// schema_mgr_->get_baseline_schema_version(tenant_id, auto_update = true, ...).
// Per-tenant failures are kept in tmp_ret and logged.
// NOTE(review): this copy is missing some lines, including the declaration of
// `ctx`; the function also continues past the end of this excerpt.
int ObServerSchemaUpdater::try_load_baseline_schema_version_()
496
int ret = OB_SUCCESS;
497
ObArray<uint64_t> tenant_ids;
498
if (OB_ISNULL(schema_mgr_)) {
499
ret = OB_ERR_UNEXPECTED;
500
LOG_WARN("schema_mgr_ is NULL", KR(ret));
504
ObSchemaGetterGuard guard;
505
if (FAILEDx(schema_mgr_->get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
506
LOG_WARN("fail to get schema guard", KR(ret));
507
} else if (OB_FAIL(guard.get_available_tenant_ids(tenant_ids))) {
508
LOG_WARN("fail to get avaliable tenant ids", KR(ret));
512
int64_t timeout = GCONF.rpc_timeout;
513
int64_t baseline_schema_version = OB_INVALID_VERSION; // not used
514
// Per-tenant errors go to tmp_ret, so ret stays OB_SUCCESS inside the loop.
FOREACH_X(tenant_id, tenant_ids, OB_SUCC(ret)) { // ignore ret
515
int tmp_ret = OB_SUCCESS;
517
if (OB_TMP_FAIL(ctx.set_timeout(timeout))) {
518
LOG_WARN("fail to set timeout", KR(tmp_ret), K(*tenant_id), K(timeout));
519
} else if (OB_TMP_FAIL(schema_mgr_->get_baseline_schema_version(
520
*tenant_id, true/*auto_update*/, baseline_schema_version))) {
521
LOG_WARN("fail to update baseline schema version", KR(tmp_ret), K(*tenant_id));