oceanbase

Форк
0
/
ob_mview_maintenance_task.cpp 
356 строк · 12.0 Кб
1
/**
2
 * Copyright (c) 2023 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "rootserver/mview/ob_mview_maintenance_task.h"
16
#include "observer/dbms_scheduler/ob_dbms_sched_job_utils.h"
17
#include "observer/ob_server_struct.h"
18
#include "observer/omt/ob_multi_tenant.h"
19
#include "share/ob_errno.h"
20
#include "share/schema/ob_mview_info.h"
21
#include "share/schema/ob_mview_refresh_stats_params.h"
22
#include "storage/mview/ob_mview_refresh_stats_purge.h"
23

24
namespace oceanbase
25
{
26
namespace rootserver
27
{
28
using namespace common;
29
using namespace share::schema;
30
using namespace dbms_scheduler;
31

32
// Constructs an idle, uninitialized task.
// No tenant is bound, the state machine sits at PREPARE, all counters are zero
// and all timestamps/costs hold -1. The object starts stopped (is_stop_ = true)
// and not scheduled; start() refuses to run until init() has succeeded.
ObMViewMaintenanceTask::ObMViewMaintenanceTask()
  : tenant_id_(OB_INVALID_TENANT_ID)
  , round_(0)
  , status_(StatusType::PREPARE)
  , error_code_(OB_SUCCESS)
  , last_fetch_mview_id_(OB_INVALID_ID)
  , mview_idx_(0)
  , gc_mview_id_(OB_INVALID_ID)
  , fetch_mview_num_(0)
  , gc_mview_num_(0)
  , gc_stats_num_(0)
  , start_time_(-1)
  , start_gc_mview_time_(-1)
  , cost_us_(-1)
  , prepare_cost_us_(-1)
  , gc_mview_cost_us_(-1)
  , fetch_finish_(false)
  , in_sched_(false)
  , is_stop_(true)
  , is_inited_(false)
{
}
54

55
// The destructor body is intentionally empty; explicit teardown lives in destroy().
ObMViewMaintenanceTask::~ObMViewMaintenanceTask()
{
}
56

57
int ObMViewMaintenanceTask::init()
58
{
59
  int ret = OB_SUCCESS;
60
  if (IS_INIT) {
61
    ret = OB_INIT_TWICE;
62
    LOG_WARN("ObMViewMaintenanceTask init twice", KR(ret), KP(this));
63
  } else {
64
    const uint64_t tenant_id = MTL_ID();
65
    tenant_id_ = tenant_id;
66
    mview_ids_.set_attr(ObMemAttr(tenant_id, "MVIds"));
67
    is_inited_ = true;
68
  }
69
  return ret;
70
}
71

72
int ObMViewMaintenanceTask::start()
73
{
74
  int ret = OB_SUCCESS;
75
  if (IS_NOT_INIT) {
76
    ret = OB_NOT_INIT;
77
    LOG_WARN("ObMViewMaintenanceTask not init", KR(ret), KP(this));
78
  } else {
79
    is_stop_ = false;
80
    if (!in_sched_ && OB_FAIL(schedule_task(MVIEW_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) {
81
      LOG_WARN("fail to schedule mview maintenance task", KR(ret));
82
    } else {
83
      in_sched_ = true;
84
    }
85
  }
86
  return ret;
87
}
88

89
void ObMViewMaintenanceTask::stop()
90
{
91
  is_stop_ = true;
92
  in_sched_ = false;
93
  cancel_task();
94
}
95

96
// Thin forwarder to wait_task().
void ObMViewMaintenanceTask::wait()
{
  wait_task();
}
97

98
void ObMViewMaintenanceTask::destroy()
99
{
100
  is_inited_ = false;
101
  is_stop_ = true;
102
  in_sched_ = false;
103
  cancel_task();
104
  wait_task();
105
  cleanup();
106
  tenant_id_ = OB_INVALID_TENANT_ID;
107
  mview_ids_.destroy();
108
}
109

110
void ObMViewMaintenanceTask::runTimerTask()
111
{
112
  int ret = OB_SUCCESS;
113
  if (IS_NOT_INIT) {
114
    ret = OB_NOT_INIT;
115
    LOG_WARN("ObMViewMaintenanceTask not init", KR(ret), KP(this));
116
  } else if (OB_UNLIKELY(is_stop_)) {
117
    // do nothing
118
  } else {
119
    switch (status_) {
120
      case StatusType::PREPARE:
121
        if (OB_FAIL(prepare())) {
122
          LOG_WARN("fail to prepare", KR(ret));
123
        }
124
        break;
125
      case StatusType::GC_MVIEW:
126
        if (OB_FAIL(gc_mview())) {
127
          LOG_WARN("fail to gc mview", KR(ret));
128
        }
129
        break;
130
      case StatusType::SUCCESS:
131
      case StatusType::FAIL:
132
        if (OB_FAIL(finish())) {
133
          LOG_WARN("fail to finish", KR(ret));
134
        }
135
        break;
136
      default:
137
        ret = OB_ERR_UNEXPECTED;
138
        LOG_WARN("unexpected status", KR(ret), K(status_));
139
        break;
140
    }
141
  }
142
}
143

144
// Returns true only for OB_EAGAIN, the single error code that makes
// switch_status() keep the current status so the step is retried.
bool ObMViewMaintenanceTask::is_retry_ret_code(int ret_code)
{
  return ret_code == OB_EAGAIN;
}
145

146
// Applies the outcome of a step to the state machine and re-arms the timer.
//   ret_code == OB_SUCCESS -> status_ becomes new_status
//   retryable ret_code     -> status_ unchanged (same step runs again)
//   any other ret_code     -> status_ becomes FAIL and the code is recorded
// While in_sched_ is set, the next one-shot tick is scheduled after
// MVIEW_MAINTENANCE_SCHED_INTERVAL; a scheduling failure is only logged.
void ObMViewMaintenanceTask::switch_status(StatusType new_status, int ret_code)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(OB_SUCCESS != ret_code)) {
    if (!is_retry_ret_code(ret_code)) {
      status_ = StatusType::FAIL;
      error_code_ = ret_code;
    }
    // retryable: leave status_ untouched
  } else {
    status_ = new_status;
  }
  if (in_sched_) {
    if (OB_FAIL(schedule_task(MVIEW_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) {
      LOG_WARN("fail to schedule mview maintenance task", KR(ret));
    }
  }
}
161

162
int ObMViewMaintenanceTask::prepare()
163
{
164
  int ret = OB_SUCCESS;
165
  uint64_t compat_version = 0;
166
  if (start_time_ == -1) {
167
    start_time_ = ObTimeUtil::current_time();
168
  }
169
  if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, compat_version))) {
170
    LOG_WARN("fail to get data version", KR(ret), K_(tenant_id));
171
  } else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_3_0_0)) {
172
    ret = OB_EAGAIN;
173
    LOG_WARN("version lower than 4.3, try again", KR(ret), K_(tenant_id), K(compat_version));
174
  } else {
175
    ++round_;
176
    prepare_cost_us_ = ObTimeUtil::current_time() - start_time_;
177
    LOG_INFO("mview maintenance task prepare success", K(tenant_id_), K(round_),
178
             K(prepare_cost_us_));
179
  }
180
  switch_status(StatusType::GC_MVIEW, ret);
181
  return ret;
182
}
183

184
int ObMViewMaintenanceTask::gc_mview()
185
{
186
  int ret = OB_SUCCESS;
187
  StatusType new_status = StatusType::GC_MVIEW;
188
  if (start_gc_mview_time_ == -1) {
189
    start_gc_mview_time_ = ObTimeUtil::current_time();
190
  }
191
  if (mview_idx_ >= mview_ids_.count() && OB_INVALID_ID == gc_mview_id_) { // fetch next batch
192
    mview_ids_.reset();
193
    mview_idx_ = 0;
194
    if (OB_FAIL(ObMViewInfo::batch_fetch_mview_ids(*GCTX.sql_proxy_, tenant_id_,
195
                                                   last_fetch_mview_id_, mview_ids_,
196
                                                   MVIEW_NUM_FETCH_PER_SCHED))) {
197
      LOG_WARN("fail to batch fetch mview ids", KR(ret), K(tenant_id_), K(last_fetch_mview_id_));
198
    } else {
199
      fetch_mview_num_ += mview_ids_.count();
200
      fetch_finish_ = mview_ids_.count() < MVIEW_NUM_FETCH_PER_SCHED;
201
      if (!mview_ids_.empty()) {
202
        last_fetch_mview_id_ = mview_ids_.at(mview_ids_.count() - 1);
203
      }
204
    }
205
  } else { // gc current batch
206
    ObSchemaGetterGuard schema_guard;
207
    int64_t tenant_schema_version = OB_INVALID_VERSION;
208
    int64_t gc_mview_num = 0;
209
    int64_t gc_stats_num = 0;
210
    int64_t affected_rows = 0;
211
    if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard))) {
212
      LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id_));
213
    } else if (OB_FAIL(schema_guard.get_schema_version(tenant_id_, tenant_schema_version))) {
214
      LOG_WARN("fail to get schema version", KR(ret), K(tenant_id_));
215
    }
216
    while (OB_SUCC(ret) && (mview_idx_ < mview_ids_.count() || OB_INVALID_ID != gc_mview_id_) &&
217
           gc_stats_num < MVREF_STATS_NUM_PURGE_PER_SCHED) {
218
      const int64_t limit = MVREF_STATS_NUM_PURGE_PER_SCHED - gc_stats_num;
219
      if (OB_INVALID_ID == gc_mview_id_) { // check next mview id
220
        const uint64_t mview_id = mview_ids_.at(mview_idx_);
221
        ObMViewInfo mview_info;
222
        const ObTableSchema *table_schema = nullptr;
223
        bool is_exist = false;
224
        if (OB_FAIL(
225
              ObMViewInfo::fetch_mview_info(*GCTX.sql_proxy_, tenant_id_, mview_id, mview_info))) {
226
          LOG_WARN("fail to fetch mview info", KR(ret), K(tenant_id_), K(mview_id));
227
        } else if (mview_info.get_schema_version() > tenant_schema_version) {
228
          is_exist = true; // skip, wait next round
229
        } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, mview_id, table_schema))) {
230
          LOG_WARN("fail to get table schema", KR(ret), K(tenant_id_), K(mview_id));
231
        } else {
232
          is_exist = (nullptr != table_schema);
233
        }
234
        if (OB_SUCC(ret)) {
235
          ++mview_idx_;
236
          if (!is_exist) {
237
            gc_mview_id_ = mview_id;
238
            LOG_INFO("gc one mview", K_(tenant_id), K(mview_id));
239
          }
240
        }
241
      } else {
242
        const uint64_t mview_id = gc_mview_id_;
243
        ObMViewRefreshStats::FilterParam filter_param;
244
        filter_param.set_mview_id(mview_id);
245
        if (OB_FAIL(ObMViewRefreshStatsPurgeUtil::purge_refresh_stats(
246
              *GCTX.sql_proxy_, tenant_id_, filter_param, affected_rows, limit))) {
247
          LOG_WARN("fail to purge refresh stats", KR(ret), K(tenant_id_), K(filter_param),
248
                   K(limit));
249
        } else {
250
          gc_stats_num += affected_rows;
251
        }
252
        if (OB_SUCC(ret) && affected_rows < limit) {
253
          if (OB_FAIL(drop_mview(mview_id))) {
254
            LOG_WARN("fail to drop mview", KR(ret), K(mview_id));
255
          } else {
256
            gc_mview_id_ = OB_INVALID_ID;
257
            ++gc_mview_num;
258
          }
259
        }
260
      }
261
    }
262
    gc_mview_num_ += gc_mview_num;
263
    gc_stats_num_ += gc_stats_num;
264
  }
265
  if (OB_SUCC(ret) && fetch_finish_ && mview_idx_ >= mview_ids_.count() &&
266
      OB_INVALID_ID == gc_mview_id_) { // goto next status
267
    gc_mview_cost_us_ = ObTimeUtility::current_time() - start_gc_mview_time_;
268
    LOG_INFO("mview maintenance task gc mview success", K(tenant_id_), K(round_),
269
             K(gc_mview_cost_us_), K(fetch_mview_num_), K(gc_mview_num_), K(gc_stats_num_));
270
    new_status = StatusType::SUCCESS;
271
  }
272
  switch_status(new_status, ret);
273
  return ret;
274
}
275

276
int ObMViewMaintenanceTask::finish()
277
{
278
  int ret = OB_SUCCESS;
279
  cost_us_ = ObTimeUtility::current_time() - start_time_;
280
  LOG_INFO("mview maintenace task finish", K(tenant_id_), K(round_), K(status_), K(error_code_),
281
           K(cost_us_), K(prepare_cost_us_), K(gc_mview_cost_us_), K(fetch_mview_num_),
282
           K(gc_mview_num_), K(gc_stats_num_));
283
  // cleanup
284
  cleanup();
285
  // schedule next round
286
  if (in_sched_ && OB_FAIL(schedule_task(MVIEW_MAINTENANCE_INTERVAL, false /*repeat*/))) {
287
    LOG_WARN("fail to schedule mview maintenance task", KR(ret));
288
  }
289
  return ret;
290
}
291

292
void ObMViewMaintenanceTask::cleanup()
293
{
294
  status_ = StatusType::PREPARE;
295
  error_code_ = OB_SUCCESS;
296
  last_fetch_mview_id_ = OB_INVALID_ID;
297
  mview_ids_.reset();
298
  mview_idx_ = 0;
299
  gc_mview_id_ = OB_INVALID_ID;
300
  fetch_mview_num_ = 0;
301
  gc_mview_num_ = 0;
302
  gc_stats_num_ = 0;
303
  start_time_ = -1;
304
  start_gc_mview_time_ = -1;
305
  cost_us_ = -1;
306
  prepare_cost_us_ = -1;
307
  gc_mview_cost_us_ = -1;
308
  fetch_finish_ = false;
309
}
310

311
// Removes the metadata of one mview inside a single transaction:
// locks its mview-info row (for update, nowait), drops its refresh-stats
// params, removes its refresh job when a job name is set, then drops the
// mview-info row itself.
//
// @param mview_id  id of the mview to remove; OB_INVALID_ID is rejected.
// @return OB_INVALID_ARGUMENT for an invalid id; OB_SUCCESS when everything was
//         dropped, and also when the row could not be locked or no longer
//         exists (both are logged and skipped); otherwise the first error,
//         or the transaction-end error if the steps themselves succeeded.
int ObMViewMaintenanceTask::drop_mview(uint64_t mview_id)
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(OB_INVALID_ID == mview_id)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid args", KR(ret), K(mview_id));
  } else {
    ObMySQLTransaction trans;
    ObMViewInfo mview_info;
    if (OB_FAIL(trans.start(GCTX.sql_proxy_, tenant_id_))) {
      LOG_WARN("fail to start trans", KR(ret), K(tenant_id_));
    } else if (OB_FAIL(ObMViewInfo::fetch_mview_info(trans, tenant_id_, mview_id, mview_info,
                                                     true /*for_update*/, true /*nowait*/))) {
      if (OB_LIKELY(OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT == ret)) {
        // Row is locked by someone else.
        LOG_WARN("can not lock mview info", KR(ret), K_(tenant_id), K(mview_id));
        ret = OB_SUCCESS; // skip, wait next round
      } else if (OB_LIKELY(OB_ENTRY_NOT_EXIST == ret)) {
        // Row is already gone: nothing left to drop.
        LOG_WARN("mview info not exist", KR(ret), K_(tenant_id), K(mview_id));
        ret = OB_SUCCESS;
      } else {
        LOG_WARN("fail to fetch mview info", KR(ret), K(tenant_id_), K(mview_id));
      }
    } else if (OB_FAIL(ObMViewRefreshStatsParams::drop_mview_refresh_stats_params(
                 trans, tenant_id_, mview_id, true /*if_exists*/))) {
      LOG_WARN("fail to drop mview refresh stats params", KR(ret), K(tenant_id_), K(mview_id));
    } else if (!mview_info.get_refresh_job().empty() &&
               OB_FAIL(ObDBMSSchedJobUtils::remove_dbms_sched_job(
                 trans, tenant_id_, mview_info.get_refresh_job(), true /*if_exists*/))) {
      // Only reached when a refresh job name is set and its removal failed.
      LOG_WARN("fail to remove dbms sched job", KR(ret), K(tenant_id_), "job_name",
               mview_info.get_refresh_job());
    } else if (OB_FAIL(ObMViewInfo::drop_mview_info(trans, mview_info))) {
      LOG_WARN("fail to drop mview info", KR(ret), K(mview_info));
    }

    if (trans.is_started()) {
      // Commit on success, roll back otherwise; never mask an earlier error.
      const int tmp_ret = trans.end(OB_SUCC(ret));
      if (OB_SUCCESS != tmp_ret) {
        LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
        ret = COVER_SUCC(tmp_ret);
      }
    }
  }
  return ret;
}
354

355
} // namespace rootserver
356
} // namespace oceanbase
357

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.