oceanbase

Форк
0
/
ob_mview_refresh_stats_maintenance_task.cpp 
294 строки · 9.1 Кб
1
/**
2
 * Copyright (c) 2023 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "rootserver/mview/ob_mview_refresh_stats_maintenance_task.h"
16
#include "observer/ob_server_struct.h"
17
#include "observer/omt/ob_multi_tenant.h"
18
#include "share/ob_errno.h"
19
#include "share/schema/ob_mview_info.h"
20
#include "share/schema/ob_mview_refresh_stats.h"
21
#include "share/schema/ob_mview_refresh_stats_params.h"
22
#include "storage/mview/ob_mview_refresh_stats_purge.h"
23

24
namespace oceanbase
25
{
26
namespace rootserver
27
{
28
using namespace common;
29
using namespace share::schema;
30

31
/**
32
 * ObMViewRefreshStatsMaintenanceTask
33
 */
34

35
// Builds an idle task object: not initialised, stopped and not scheduled.
// Counters start at zero; timestamps and durations start at -1, the value
// prepare()/purge() test for to detect "not started yet".
ObMViewRefreshStatsMaintenanceTask::ObMViewRefreshStatsMaintenanceTask()
  : tenant_id_(OB_INVALID_TENANT_ID), round_(0),
    // state machine
    status_(StatusType::PREPARE), error_code_(OB_SUCCESS),
    // batch cursor over mview ids
    last_fetch_mview_id_(OB_INVALID_ID), mview_idx_(0),
    // per-round counters
    fetch_mview_num_(0), purge_mview_num_(0), purge_stats_num_(0),
    // per-round timing
    start_time_(-1), start_purge_time_(-1),
    cost_us_(-1), prepare_cost_us_(-1), purge_cost_us_(-1),
    // lifecycle flags
    fetch_finish_(false), in_sched_(false), is_stop_(true), is_inited_(false)
{
}
56

57
// Nothing to release explicitly here; teardown is done by destroy().
ObMViewRefreshStatsMaintenanceTask::~ObMViewRefreshStatsMaintenanceTask() = default;
58

59
int ObMViewRefreshStatsMaintenanceTask::init()
60
{
61
  int ret = OB_SUCCESS;
62
  if (IS_INIT) {
63
    ret = OB_INIT_TWICE;
64
    LOG_WARN("ObMViewRefreshStatsMaintenanceTask init twice", KR(ret), KP(this));
65
  } else {
66
    const uint64_t tenant_id = MTL_ID();
67
    tenant_id_ = tenant_id;
68
    mview_ids_.set_attr(ObMemAttr(tenant_id, "MVIds"));
69
    is_inited_ = true;
70
  }
71
  return ret;
72
}
73

74
int ObMViewRefreshStatsMaintenanceTask::start()
75
{
76
  int ret = OB_SUCCESS;
77
  if (IS_NOT_INIT) {
78
    ret = OB_NOT_INIT;
79
    LOG_WARN("ObMViewRefreshStatsMaintenanceTask not init", KR(ret), KP(this));
80
  } else {
81
    is_stop_ = false;
82
    if (!in_sched_ &&
83
        OB_FAIL(schedule_task(MVREF_STATS_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) {
84
      LOG_WARN("fail to schedule mvref stats maintenance task", KR(ret));
85
    } else {
86
      in_sched_ = true;
87
    }
88
  }
89
  return ret;
90
}
91

92
void ObMViewRefreshStatsMaintenanceTask::stop()
93
{
94
  is_stop_ = true;
95
  in_sched_ = false;
96
  cancel_task();
97
}
98

99
// Delegates to wait_task().
void ObMViewRefreshStatsMaintenanceTask::wait()
{
  wait_task();
}
100

101
void ObMViewRefreshStatsMaintenanceTask::destroy()
102
{
103
  is_inited_ = false;
104
  is_stop_ = true;
105
  in_sched_ = false;
106
  cancel_task();
107
  wait_task();
108
  cleanup();
109
  tenant_id_ = OB_INVALID_TENANT_ID;
110
  mview_ids_.destroy();
111
}
112

113
void ObMViewRefreshStatsMaintenanceTask::runTimerTask()
114
{
115
  int ret = OB_SUCCESS;
116
  if (IS_NOT_INIT) {
117
    ret = OB_NOT_INIT;
118
    LOG_WARN("ObMViewRefreshStatsMaintenanceTask not init", KR(ret), KP(this));
119
  } else if (OB_UNLIKELY(is_stop_)) {
120
    // do nothing
121
  } else {
122
    switch (status_) {
123
      case StatusType::PREPARE:
124
        if (OB_FAIL(prepare())) {
125
          LOG_WARN("fail to prepare", KR(ret));
126
        }
127
        break;
128
      case StatusType::PURGE:
129
        if (OB_FAIL(purge())) {
130
          LOG_WARN("fail to purge", KR(ret));
131
        }
132
        break;
133
      case StatusType::SUCCESS:
134
      case StatusType::FAIL:
135
        if (OB_FAIL(finish())) {
136
          LOG_WARN("fail to finish", KR(ret));
137
        }
138
        break;
139
      default:
140
        ret = OB_ERR_UNEXPECTED;
141
        LOG_WARN("unexpected status", KR(ret), K(status_));
142
        break;
143
    }
144
  }
145
}
146

147
bool ObMViewRefreshStatsMaintenanceTask::is_retry_ret_code(int ret_code)
148
{
149
  return OB_EAGAIN == ret_code;
150
}
151

152
// Applies the outcome of a step to the state machine and re-arms the timer.
//   ret_code == OB_SUCCESS -> status becomes new_status
//   retryable ret_code     -> status unchanged, so the same step runs again
//   any other ret_code     -> status becomes FAIL and the code is remembered
// While in_sched_ is set, a one-shot run is scheduled after
// MVREF_STATS_MAINTENANCE_SCHED_INTERVAL; a scheduling failure is only logged.
void ObMViewRefreshStatsMaintenanceTask::switch_status(StatusType new_status, int ret_code)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(OB_SUCCESS != ret_code)) {
    if (!is_retry_ret_code(ret_code)) {
      status_ = StatusType::FAIL;
      error_code_ = ret_code;
    }
    // retryable: keep the current status
  } else {
    status_ = new_status;
  }
  if (!in_sched_) {
    // stopped in the meantime, do not re-arm
  } else if (OB_FAIL(schedule_task(MVREF_STATS_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) {
    LOG_WARN("fail to schedule mvref stats maintenance task", KR(ret));
  }
}
168

169
int ObMViewRefreshStatsMaintenanceTask::prepare()
170
{
171
  int ret = OB_SUCCESS;
172
  uint64_t compat_version = 0;
173
  if (start_time_ == -1) {
174
    start_time_ = ObTimeUtil::current_time();
175
  }
176
  if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, compat_version))) {
177
    LOG_WARN("fail to get data version", KR(ret), K_(tenant_id));
178
  } else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_3_0_0)) {
179
    ret = OB_EAGAIN;
180
    LOG_WARN("version lower than 4.3, try again", KR(ret), K_(tenant_id), K(compat_version));
181
  } else {
182
    ++round_;
183
    prepare_cost_us_ = ObTimeUtil::current_time() - start_time_;
184
    LOG_INFO("mvref stats maintenance task prepare success", K(tenant_id_), K(round_),
185
             K(prepare_cost_us_));
186
  }
187
  switch_status(StatusType::PURGE, ret);
188
  return ret;
189
}
190

191
int ObMViewRefreshStatsMaintenanceTask::purge()
192
{
193
  int ret = OB_SUCCESS;
194
  StatusType new_status = StatusType::PURGE;
195
  if (start_purge_time_ == -1) {
196
    start_purge_time_ = ObTimeUtil::current_time();
197
  }
198
  if (mview_idx_ >= mview_ids_.count()) { // fetch next batch
199
    mview_ids_.reset();
200
    mview_idx_ = 0;
201
    if (OB_FAIL(ObMViewInfo::batch_fetch_mview_ids(*GCTX.sql_proxy_, tenant_id_,
202
                                                   last_fetch_mview_id_, mview_ids_,
203
                                                   MVIEW_NUM_FETCH_PER_SCHED))) {
204
      LOG_WARN("fail to batch fetch mview ids", KR(ret), K(tenant_id_), K(last_fetch_mview_id_));
205
    } else {
206
      fetch_mview_num_ += mview_ids_.count();
207
      fetch_finish_ = mview_ids_.count() < MVIEW_NUM_FETCH_PER_SCHED;
208
      if (!mview_ids_.empty()) {
209
        last_fetch_mview_id_ = mview_ids_.at(mview_ids_.count() - 1);
210
      }
211
    }
212
  } else { // purge current batch
213
    int64_t purge_mview_num = 0;
214
    int64_t purge_stats_num = 0;
215
    int64_t affected_rows = 0;
216
    while (OB_SUCC(ret) && mview_idx_ < mview_ids_.count() &&
217
           purge_stats_num < MVREF_STATS_NUM_PURGE_PER_SCHED) {
218
      const uint64_t mview_id = mview_ids_.at(mview_idx_);
219
      const int64_t limit = MVREF_STATS_NUM_PURGE_PER_SCHED - purge_stats_num;
220
      ObMViewRefreshStatsParams refresh_stats_params;
221
      ObMViewRefreshStats::FilterParam filter_param;
222
      if (OB_FAIL(ObMViewRefreshStatsParams::fetch_mview_refresh_stats_params(
223
            *GCTX.sql_proxy_, tenant_id_, mview_id, refresh_stats_params,
224
            true /*with_sys_defaults*/))) {
225
        LOG_WARN("fail to fetch mview refresh stats params", KR(ret), K(tenant_id_), K(mview_id));
226
      } else if (refresh_stats_params.get_retention_period() == -1) {
227
        // never be purged, skip
228
        affected_rows = 0;
229
      } else {
230
        filter_param.set_mview_id(mview_id);
231
        filter_param.set_retention_period(refresh_stats_params.get_retention_period());
232
        if (OB_FAIL(ObMViewRefreshStatsPurgeUtil::purge_refresh_stats(
233
              *GCTX.sql_proxy_, tenant_id_, filter_param, affected_rows, limit))) {
234
          LOG_WARN("fail to purge refresh stats", KR(ret), K(tenant_id_), K(filter_param),
235
                   K(limit));
236
        }
237
      }
238
      if (OB_SUCC(ret)) {
239
        purge_stats_num += affected_rows;
240
        if (affected_rows < limit) {
241
          ++purge_mview_num;
242
          ++mview_idx_;
243
        }
244
      }
245
    }
246
    purge_mview_num_ += purge_mview_num;
247
    purge_stats_num_ += purge_stats_num;
248
  }
249
  if (OB_SUCC(ret) && fetch_finish_ && mview_idx_ >= mview_ids_.count()) { // goto next status
250
    purge_cost_us_ = ObTimeUtility::current_time() - start_purge_time_;
251
    LOG_INFO("mvref stats maintenance task purge success", K(tenant_id_), K(round_),
252
             K(purge_cost_us_), K(fetch_mview_num_), K(purge_mview_num_), K(purge_stats_num_));
253
    new_status = StatusType::SUCCESS;
254
  }
255
  switch_status(new_status, ret);
256
  return ret;
257
}
258

259
int ObMViewRefreshStatsMaintenanceTask::finish()
260
{
261
  int ret = OB_SUCCESS;
262
  cost_us_ = ObTimeUtility::current_time() - start_time_;
263
  LOG_INFO("mvref stats maintenace task finish", K(tenant_id_), K(round_), K(status_),
264
           K(error_code_), K(cost_us_), K(prepare_cost_us_), K(purge_cost_us_), K(fetch_mview_num_),
265
           K(purge_mview_num_), K(purge_stats_num_));
266
  // cleanup
267
  cleanup();
268
  // schedule next round
269
  if (in_sched_ && OB_FAIL(schedule_task(MVREF_STATS_MAINTENANCE_INTERVAL, false /*repeat*/))) {
270
    LOG_WARN("fail to schedule mvref stats maintenance task", KR(ret));
271
  }
272
  return ret;
273
}
274

275
void ObMViewRefreshStatsMaintenanceTask::cleanup()
276
{
277
  status_ = StatusType::PREPARE;
278
  error_code_ = OB_SUCCESS;
279
  last_fetch_mview_id_ = OB_INVALID_ID;
280
  mview_ids_.reset();
281
  mview_idx_ = 0;
282
  fetch_mview_num_ = 0;
283
  purge_mview_num_ = 0;
284
  purge_stats_num_ = 0;
285
  start_time_ = -1;
286
  start_purge_time_ = -1;
287
  cost_us_ = -1;
288
  prepare_cost_us_ = -1;
289
  purge_cost_us_ = -1;
290
  fetch_finish_ = false;
291
}
292

293
} // namespace rootserver
294
} // namespace oceanbase
295

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.