oceanbase
294 строки · 9.1 Кб
1/**
2* Copyright (c) 2023 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS14
15#include "rootserver/mview/ob_mview_refresh_stats_maintenance_task.h"16#include "observer/ob_server_struct.h"17#include "observer/omt/ob_multi_tenant.h"18#include "share/ob_errno.h"19#include "share/schema/ob_mview_info.h"20#include "share/schema/ob_mview_refresh_stats.h"21#include "share/schema/ob_mview_refresh_stats_params.h"22#include "storage/mview/ob_mview_refresh_stats_purge.h"23
24namespace oceanbase25{
26namespace rootserver27{
28using namespace common;29using namespace share::schema;30
31/**
32* ObMViewRefreshStatsMaintenanceTask
33*/
34
35ObMViewRefreshStatsMaintenanceTask::ObMViewRefreshStatsMaintenanceTask()36: tenant_id_(OB_INVALID_TENANT_ID),37round_(0),38status_(StatusType::PREPARE),39error_code_(OB_SUCCESS),40last_fetch_mview_id_(OB_INVALID_ID),41mview_idx_(0),42fetch_mview_num_(0),43purge_mview_num_(0),44purge_stats_num_(0),45start_time_(-1),46start_purge_time_(-1),47cost_us_(-1),48prepare_cost_us_(-1),49purge_cost_us_(-1),50fetch_finish_(false),51in_sched_(false),52is_stop_(true),53is_inited_(false)54{
55}
56
57ObMViewRefreshStatsMaintenanceTask::~ObMViewRefreshStatsMaintenanceTask() {}58
59int ObMViewRefreshStatsMaintenanceTask::init()60{
61int ret = OB_SUCCESS;62if (IS_INIT) {63ret = OB_INIT_TWICE;64LOG_WARN("ObMViewRefreshStatsMaintenanceTask init twice", KR(ret), KP(this));65} else {66const uint64_t tenant_id = MTL_ID();67tenant_id_ = tenant_id;68mview_ids_.set_attr(ObMemAttr(tenant_id, "MVIds"));69is_inited_ = true;70}71return ret;72}
73
74int ObMViewRefreshStatsMaintenanceTask::start()75{
76int ret = OB_SUCCESS;77if (IS_NOT_INIT) {78ret = OB_NOT_INIT;79LOG_WARN("ObMViewRefreshStatsMaintenanceTask not init", KR(ret), KP(this));80} else {81is_stop_ = false;82if (!in_sched_ &&83OB_FAIL(schedule_task(MVREF_STATS_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) {84LOG_WARN("fail to schedule mvref stats maintenance task", KR(ret));85} else {86in_sched_ = true;87}88}89return ret;90}
91
92void ObMViewRefreshStatsMaintenanceTask::stop()93{
94is_stop_ = true;95in_sched_ = false;96cancel_task();97}
98
99void ObMViewRefreshStatsMaintenanceTask::wait() { wait_task(); }100
101void ObMViewRefreshStatsMaintenanceTask::destroy()102{
103is_inited_ = false;104is_stop_ = true;105in_sched_ = false;106cancel_task();107wait_task();108cleanup();109tenant_id_ = OB_INVALID_TENANT_ID;110mview_ids_.destroy();111}
112
113void ObMViewRefreshStatsMaintenanceTask::runTimerTask()114{
115int ret = OB_SUCCESS;116if (IS_NOT_INIT) {117ret = OB_NOT_INIT;118LOG_WARN("ObMViewRefreshStatsMaintenanceTask not init", KR(ret), KP(this));119} else if (OB_UNLIKELY(is_stop_)) {120// do nothing121} else {122switch (status_) {123case StatusType::PREPARE:124if (OB_FAIL(prepare())) {125LOG_WARN("fail to prepare", KR(ret));126}127break;128case StatusType::PURGE:129if (OB_FAIL(purge())) {130LOG_WARN("fail to purge", KR(ret));131}132break;133case StatusType::SUCCESS:134case StatusType::FAIL:135if (OB_FAIL(finish())) {136LOG_WARN("fail to finish", KR(ret));137}138break;139default:140ret = OB_ERR_UNEXPECTED;141LOG_WARN("unexpected status", KR(ret), K(status_));142break;143}144}145}
146
147bool ObMViewRefreshStatsMaintenanceTask::is_retry_ret_code(int ret_code)148{
149return OB_EAGAIN == ret_code;150}
151
152void ObMViewRefreshStatsMaintenanceTask::switch_status(StatusType new_status, int ret_code)153{
154int ret = OB_SUCCESS;155if (OB_LIKELY(OB_SUCCESS == ret_code)) {156status_ = new_status;157} else if (is_retry_ret_code(ret_code)) {158// do nothing159} else {160status_ = StatusType::FAIL;161error_code_ = ret_code;162}163if (in_sched_ &&164OB_FAIL(schedule_task(MVREF_STATS_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) {165LOG_WARN("fail to schedule mvref stats maintenance task", KR(ret));166}167}
168
169int ObMViewRefreshStatsMaintenanceTask::prepare()170{
171int ret = OB_SUCCESS;172uint64_t compat_version = 0;173if (start_time_ == -1) {174start_time_ = ObTimeUtil::current_time();175}176if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, compat_version))) {177LOG_WARN("fail to get data version", KR(ret), K_(tenant_id));178} else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_3_0_0)) {179ret = OB_EAGAIN;180LOG_WARN("version lower than 4.3, try again", KR(ret), K_(tenant_id), K(compat_version));181} else {182++round_;183prepare_cost_us_ = ObTimeUtil::current_time() - start_time_;184LOG_INFO("mvref stats maintenance task prepare success", K(tenant_id_), K(round_),185K(prepare_cost_us_));186}187switch_status(StatusType::PURGE, ret);188return ret;189}
190
191int ObMViewRefreshStatsMaintenanceTask::purge()192{
193int ret = OB_SUCCESS;194StatusType new_status = StatusType::PURGE;195if (start_purge_time_ == -1) {196start_purge_time_ = ObTimeUtil::current_time();197}198if (mview_idx_ >= mview_ids_.count()) { // fetch next batch199mview_ids_.reset();200mview_idx_ = 0;201if (OB_FAIL(ObMViewInfo::batch_fetch_mview_ids(*GCTX.sql_proxy_, tenant_id_,202last_fetch_mview_id_, mview_ids_,203MVIEW_NUM_FETCH_PER_SCHED))) {204LOG_WARN("fail to batch fetch mview ids", KR(ret), K(tenant_id_), K(last_fetch_mview_id_));205} else {206fetch_mview_num_ += mview_ids_.count();207fetch_finish_ = mview_ids_.count() < MVIEW_NUM_FETCH_PER_SCHED;208if (!mview_ids_.empty()) {209last_fetch_mview_id_ = mview_ids_.at(mview_ids_.count() - 1);210}211}212} else { // purge current batch213int64_t purge_mview_num = 0;214int64_t purge_stats_num = 0;215int64_t affected_rows = 0;216while (OB_SUCC(ret) && mview_idx_ < mview_ids_.count() &&217purge_stats_num < MVREF_STATS_NUM_PURGE_PER_SCHED) {218const uint64_t mview_id = mview_ids_.at(mview_idx_);219const int64_t limit = MVREF_STATS_NUM_PURGE_PER_SCHED - purge_stats_num;220ObMViewRefreshStatsParams refresh_stats_params;221ObMViewRefreshStats::FilterParam filter_param;222if (OB_FAIL(ObMViewRefreshStatsParams::fetch_mview_refresh_stats_params(223*GCTX.sql_proxy_, tenant_id_, mview_id, refresh_stats_params,224true /*with_sys_defaults*/))) {225LOG_WARN("fail to fetch mview refresh stats params", KR(ret), K(tenant_id_), K(mview_id));226} else if (refresh_stats_params.get_retention_period() == -1) {227// never be purged, skip228affected_rows = 0;229} else {230filter_param.set_mview_id(mview_id);231filter_param.set_retention_period(refresh_stats_params.get_retention_period());232if (OB_FAIL(ObMViewRefreshStatsPurgeUtil::purge_refresh_stats(233*GCTX.sql_proxy_, tenant_id_, filter_param, affected_rows, limit))) {234LOG_WARN("fail to purge refresh stats", KR(ret), K(tenant_id_), K(filter_param),235K(limit));236}237}238if (OB_SUCC(ret)) {239purge_stats_num += affected_rows;240if (affected_rows < limit) {241++purge_mview_num;242++mview_idx_;243}244}245}246purge_mview_num_ += purge_mview_num;247purge_stats_num_ += purge_stats_num;248}249if (OB_SUCC(ret) && fetch_finish_ && mview_idx_ >= mview_ids_.count()) { // goto next status250purge_cost_us_ = ObTimeUtility::current_time() - start_purge_time_;251LOG_INFO("mvref stats maintenance task purge success", K(tenant_id_), K(round_),252K(purge_cost_us_), K(fetch_mview_num_), K(purge_mview_num_), K(purge_stats_num_));253new_status = StatusType::SUCCESS;254}255switch_status(new_status, ret);256return ret;257}
258
259int ObMViewRefreshStatsMaintenanceTask::finish()260{
261int ret = OB_SUCCESS;262cost_us_ = ObTimeUtility::current_time() - start_time_;263LOG_INFO("mvref stats maintenace task finish", K(tenant_id_), K(round_), K(status_),264K(error_code_), K(cost_us_), K(prepare_cost_us_), K(purge_cost_us_), K(fetch_mview_num_),265K(purge_mview_num_), K(purge_stats_num_));266// cleanup267cleanup();268// schedule next round269if (in_sched_ && OB_FAIL(schedule_task(MVREF_STATS_MAINTENANCE_INTERVAL, false /*repeat*/))) {270LOG_WARN("fail to schedule mvref stats maintenance task", KR(ret));271}272return ret;273}
274
275void ObMViewRefreshStatsMaintenanceTask::cleanup()276{
277status_ = StatusType::PREPARE;278error_code_ = OB_SUCCESS;279last_fetch_mview_id_ = OB_INVALID_ID;280mview_ids_.reset();281mview_idx_ = 0;282fetch_mview_num_ = 0;283purge_mview_num_ = 0;284purge_stats_num_ = 0;285start_time_ = -1;286start_purge_time_ = -1;287cost_us_ = -1;288prepare_cost_us_ = -1;289purge_cost_us_ = -1;290fetch_finish_ = false;291}
292
293} // namespace rootserver294} // namespace oceanbase295