oceanbase
356 строк · 12.0 Кб
1/**
2* Copyright (c) 2023 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS
14
15#include "rootserver/mview/ob_mview_maintenance_task.h"
16#include "observer/dbms_scheduler/ob_dbms_sched_job_utils.h"
17#include "observer/ob_server_struct.h"
18#include "observer/omt/ob_multi_tenant.h"
19#include "share/ob_errno.h"
20#include "share/schema/ob_mview_info.h"
21#include "share/schema/ob_mview_refresh_stats_params.h"
22#include "storage/mview/ob_mview_refresh_stats_purge.h"
23
24namespace oceanbase
25{
26namespace rootserver
27{
28using namespace common;
29using namespace share::schema;
30using namespace dbms_scheduler;
31
32ObMViewMaintenanceTask::ObMViewMaintenanceTask()
33: tenant_id_(OB_INVALID_TENANT_ID),
34round_(0),
35status_(StatusType::PREPARE),
36error_code_(OB_SUCCESS),
37last_fetch_mview_id_(OB_INVALID_ID),
38mview_idx_(0),
39gc_mview_id_(OB_INVALID_ID),
40fetch_mview_num_(0),
41gc_mview_num_(0),
42gc_stats_num_(0),
43start_time_(-1),
44start_gc_mview_time_(-1),
45cost_us_(-1),
46prepare_cost_us_(-1),
47gc_mview_cost_us_(-1),
48fetch_finish_(false),
49in_sched_(false),
50is_stop_(true),
51is_inited_(false)
52{
53}
54
55ObMViewMaintenanceTask::~ObMViewMaintenanceTask() {}
56
57int ObMViewMaintenanceTask::init()
58{
59int ret = OB_SUCCESS;
60if (IS_INIT) {
61ret = OB_INIT_TWICE;
62LOG_WARN("ObMViewMaintenanceTask init twice", KR(ret), KP(this));
63} else {
64const uint64_t tenant_id = MTL_ID();
65tenant_id_ = tenant_id;
66mview_ids_.set_attr(ObMemAttr(tenant_id, "MVIds"));
67is_inited_ = true;
68}
69return ret;
70}
71
72int ObMViewMaintenanceTask::start()
73{
74int ret = OB_SUCCESS;
75if (IS_NOT_INIT) {
76ret = OB_NOT_INIT;
77LOG_WARN("ObMViewMaintenanceTask not init", KR(ret), KP(this));
78} else {
79is_stop_ = false;
80if (!in_sched_ && OB_FAIL(schedule_task(MVIEW_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) {
81LOG_WARN("fail to schedule mview maintenance task", KR(ret));
82} else {
83in_sched_ = true;
84}
85}
86return ret;
87}
88
89void ObMViewMaintenanceTask::stop()
90{
91is_stop_ = true;
92in_sched_ = false;
93cancel_task();
94}
95
96void ObMViewMaintenanceTask::wait() { wait_task(); }
97
98void ObMViewMaintenanceTask::destroy()
99{
100is_inited_ = false;
101is_stop_ = true;
102in_sched_ = false;
103cancel_task();
104wait_task();
105cleanup();
106tenant_id_ = OB_INVALID_TENANT_ID;
107mview_ids_.destroy();
108}
109
110void ObMViewMaintenanceTask::runTimerTask()
111{
112int ret = OB_SUCCESS;
113if (IS_NOT_INIT) {
114ret = OB_NOT_INIT;
115LOG_WARN("ObMViewMaintenanceTask not init", KR(ret), KP(this));
116} else if (OB_UNLIKELY(is_stop_)) {
117// do nothing
118} else {
119switch (status_) {
120case StatusType::PREPARE:
121if (OB_FAIL(prepare())) {
122LOG_WARN("fail to prepare", KR(ret));
123}
124break;
125case StatusType::GC_MVIEW:
126if (OB_FAIL(gc_mview())) {
127LOG_WARN("fail to gc mview", KR(ret));
128}
129break;
130case StatusType::SUCCESS:
131case StatusType::FAIL:
132if (OB_FAIL(finish())) {
133LOG_WARN("fail to finish", KR(ret));
134}
135break;
136default:
137ret = OB_ERR_UNEXPECTED;
138LOG_WARN("unexpected status", KR(ret), K(status_));
139break;
140}
141}
142}
143
144bool ObMViewMaintenanceTask::is_retry_ret_code(int ret_code) { return OB_EAGAIN == ret_code; }
145
146void ObMViewMaintenanceTask::switch_status(StatusType new_status, int ret_code)
147{
148int ret = OB_SUCCESS;
149if (OB_LIKELY(OB_SUCCESS == ret_code)) {
150status_ = new_status;
151} else if (is_retry_ret_code(ret_code)) {
152// do nothing
153} else {
154status_ = StatusType::FAIL;
155error_code_ = ret_code;
156}
157if (in_sched_ && OB_FAIL(schedule_task(MVIEW_MAINTENANCE_SCHED_INTERVAL, false /*repeat*/))) {
158LOG_WARN("fail to schedule mview maintenance task", KR(ret));
159}
160}
161
162int ObMViewMaintenanceTask::prepare()
163{
164int ret = OB_SUCCESS;
165uint64_t compat_version = 0;
166if (start_time_ == -1) {
167start_time_ = ObTimeUtil::current_time();
168}
169if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, compat_version))) {
170LOG_WARN("fail to get data version", KR(ret), K_(tenant_id));
171} else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_3_0_0)) {
172ret = OB_EAGAIN;
173LOG_WARN("version lower than 4.3, try again", KR(ret), K_(tenant_id), K(compat_version));
174} else {
175++round_;
176prepare_cost_us_ = ObTimeUtil::current_time() - start_time_;
177LOG_INFO("mview maintenance task prepare success", K(tenant_id_), K(round_),
178K(prepare_cost_us_));
179}
180switch_status(StatusType::GC_MVIEW, ret);
181return ret;
182}
183
184int ObMViewMaintenanceTask::gc_mview()
185{
186int ret = OB_SUCCESS;
187StatusType new_status = StatusType::GC_MVIEW;
188if (start_gc_mview_time_ == -1) {
189start_gc_mview_time_ = ObTimeUtil::current_time();
190}
191if (mview_idx_ >= mview_ids_.count() && OB_INVALID_ID == gc_mview_id_) { // fetch next batch
192mview_ids_.reset();
193mview_idx_ = 0;
194if (OB_FAIL(ObMViewInfo::batch_fetch_mview_ids(*GCTX.sql_proxy_, tenant_id_,
195last_fetch_mview_id_, mview_ids_,
196MVIEW_NUM_FETCH_PER_SCHED))) {
197LOG_WARN("fail to batch fetch mview ids", KR(ret), K(tenant_id_), K(last_fetch_mview_id_));
198} else {
199fetch_mview_num_ += mview_ids_.count();
200fetch_finish_ = mview_ids_.count() < MVIEW_NUM_FETCH_PER_SCHED;
201if (!mview_ids_.empty()) {
202last_fetch_mview_id_ = mview_ids_.at(mview_ids_.count() - 1);
203}
204}
205} else { // gc current batch
206ObSchemaGetterGuard schema_guard;
207int64_t tenant_schema_version = OB_INVALID_VERSION;
208int64_t gc_mview_num = 0;
209int64_t gc_stats_num = 0;
210int64_t affected_rows = 0;
211if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard))) {
212LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id_));
213} else if (OB_FAIL(schema_guard.get_schema_version(tenant_id_, tenant_schema_version))) {
214LOG_WARN("fail to get schema version", KR(ret), K(tenant_id_));
215}
216while (OB_SUCC(ret) && (mview_idx_ < mview_ids_.count() || OB_INVALID_ID != gc_mview_id_) &&
217gc_stats_num < MVREF_STATS_NUM_PURGE_PER_SCHED) {
218const int64_t limit = MVREF_STATS_NUM_PURGE_PER_SCHED - gc_stats_num;
219if (OB_INVALID_ID == gc_mview_id_) { // check next mview id
220const uint64_t mview_id = mview_ids_.at(mview_idx_);
221ObMViewInfo mview_info;
222const ObTableSchema *table_schema = nullptr;
223bool is_exist = false;
224if (OB_FAIL(
225ObMViewInfo::fetch_mview_info(*GCTX.sql_proxy_, tenant_id_, mview_id, mview_info))) {
226LOG_WARN("fail to fetch mview info", KR(ret), K(tenant_id_), K(mview_id));
227} else if (mview_info.get_schema_version() > tenant_schema_version) {
228is_exist = true; // skip, wait next round
229} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, mview_id, table_schema))) {
230LOG_WARN("fail to get table schema", KR(ret), K(tenant_id_), K(mview_id));
231} else {
232is_exist = (nullptr != table_schema);
233}
234if (OB_SUCC(ret)) {
235++mview_idx_;
236if (!is_exist) {
237gc_mview_id_ = mview_id;
238LOG_INFO("gc one mview", K_(tenant_id), K(mview_id));
239}
240}
241} else {
242const uint64_t mview_id = gc_mview_id_;
243ObMViewRefreshStats::FilterParam filter_param;
244filter_param.set_mview_id(mview_id);
245if (OB_FAIL(ObMViewRefreshStatsPurgeUtil::purge_refresh_stats(
246*GCTX.sql_proxy_, tenant_id_, filter_param, affected_rows, limit))) {
247LOG_WARN("fail to purge refresh stats", KR(ret), K(tenant_id_), K(filter_param),
248K(limit));
249} else {
250gc_stats_num += affected_rows;
251}
252if (OB_SUCC(ret) && affected_rows < limit) {
253if (OB_FAIL(drop_mview(mview_id))) {
254LOG_WARN("fail to drop mview", KR(ret), K(mview_id));
255} else {
256gc_mview_id_ = OB_INVALID_ID;
257++gc_mview_num;
258}
259}
260}
261}
262gc_mview_num_ += gc_mview_num;
263gc_stats_num_ += gc_stats_num;
264}
265if (OB_SUCC(ret) && fetch_finish_ && mview_idx_ >= mview_ids_.count() &&
266OB_INVALID_ID == gc_mview_id_) { // goto next status
267gc_mview_cost_us_ = ObTimeUtility::current_time() - start_gc_mview_time_;
268LOG_INFO("mview maintenance task gc mview success", K(tenant_id_), K(round_),
269K(gc_mview_cost_us_), K(fetch_mview_num_), K(gc_mview_num_), K(gc_stats_num_));
270new_status = StatusType::SUCCESS;
271}
272switch_status(new_status, ret);
273return ret;
274}
275
276int ObMViewMaintenanceTask::finish()
277{
278int ret = OB_SUCCESS;
279cost_us_ = ObTimeUtility::current_time() - start_time_;
280LOG_INFO("mview maintenace task finish", K(tenant_id_), K(round_), K(status_), K(error_code_),
281K(cost_us_), K(prepare_cost_us_), K(gc_mview_cost_us_), K(fetch_mview_num_),
282K(gc_mview_num_), K(gc_stats_num_));
283// cleanup
284cleanup();
285// schedule next round
286if (in_sched_ && OB_FAIL(schedule_task(MVIEW_MAINTENANCE_INTERVAL, false /*repeat*/))) {
287LOG_WARN("fail to schedule mview maintenance task", KR(ret));
288}
289return ret;
290}
291
292void ObMViewMaintenanceTask::cleanup()
293{
294status_ = StatusType::PREPARE;
295error_code_ = OB_SUCCESS;
296last_fetch_mview_id_ = OB_INVALID_ID;
297mview_ids_.reset();
298mview_idx_ = 0;
299gc_mview_id_ = OB_INVALID_ID;
300fetch_mview_num_ = 0;
301gc_mview_num_ = 0;
302gc_stats_num_ = 0;
303start_time_ = -1;
304start_gc_mview_time_ = -1;
305cost_us_ = -1;
306prepare_cost_us_ = -1;
307gc_mview_cost_us_ = -1;
308fetch_finish_ = false;
309}
310
311int ObMViewMaintenanceTask::drop_mview(uint64_t mview_id)
312{
313int ret = OB_SUCCESS;
314if (OB_UNLIKELY(OB_INVALID_ID == mview_id)) {
315ret = OB_INVALID_ARGUMENT;
316LOG_WARN("invalid args", KR(ret), K(mview_id));
317} else {
318ObMySQLTransaction trans;
319ObMViewInfo mview_info;
320if (OB_FAIL(trans.start(GCTX.sql_proxy_, tenant_id_))) {
321LOG_WARN("fail to start trans", KR(ret), K(tenant_id_));
322} else if (OB_FAIL(ObMViewInfo::fetch_mview_info(trans, tenant_id_, mview_id, mview_info,
323true /*for_update*/, true /*nowait*/))) {
324if (OB_LIKELY(OB_ERR_EXCLUSIVE_LOCK_CONFLICT_NOWAIT == ret)) {
325LOG_WARN("can not lock mview info", KR(ret), K_(tenant_id), K(mview_id));
326ret = OB_SUCCESS; // skip, wait next round
327} else if (OB_LIKELY(OB_ENTRY_NOT_EXIST == ret)) {
328LOG_WARN("mview info not exist", KR(ret), K_(tenant_id), K(mview_id));
329ret = OB_SUCCESS;
330} else {
331LOG_WARN("fail to fetch mview info", KR(ret), K(tenant_id_), K(mview_id));
332}
333} else if (OB_FAIL(ObMViewRefreshStatsParams::drop_mview_refresh_stats_params(
334trans, tenant_id_, mview_id, true /*if_exists*/))) {
335LOG_WARN("fail to drop mview refresh stats params", KR(ret), K(tenant_id_), K(mview_id));
336} else if (!mview_info.get_refresh_job().empty() &&
337OB_FAIL(ObDBMSSchedJobUtils::remove_dbms_sched_job(
338trans, tenant_id_, mview_info.get_refresh_job(), true /*if_exists*/))) {
339LOG_WARN("fail to remove dbms sched job", KR(ret), K(tenant_id_), "job_name",
340mview_info.get_refresh_job());
341} else if (OB_FAIL(ObMViewInfo::drop_mview_info(trans, mview_info))) {
342LOG_WARN("fail to drop mview info", KR(ret), K(mview_info));
343}
344if (trans.is_started()) {
345int tmp_ret = OB_SUCCESS;
346if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
347LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
348ret = COVER_SUCC(tmp_ret);
349}
350}
351}
352return ret;
353}
354
355} // namespace rootserver
356} // namespace oceanbase
357