oceanbase
222 строки · 8.3 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS
14#include "ob_snapshot_info_manager.h"
15#include "share/ob_snapshot_table_proxy.h"
16#include "share/schema/ob_schema_utils.h"
17#include "share/ob_ddl_common.h"
18#include "common/ob_timeout_ctx.h"
19#include "lib/mysqlclient/ob_mysql_transaction.h"
20#include "ob_rs_event_history_table_operator.h"
21
22using namespace oceanbase::common;
23using namespace oceanbase::share;
24using namespace oceanbase::share::schema;
25using namespace oceanbase::palf;
26namespace oceanbase
27{
28namespace rootserver
29{
30int ObSnapshotInfoManager::init(const ObAddr &self_addr)
31{
32int ret = OB_SUCCESS;
33if (!self_addr.is_valid()) {
34ret = OB_INVALID_ARGUMENT;
35LOG_WARN("invalid argument", K(ret), K(self_addr));
36} else {
37self_addr_ = self_addr;
38}
39return ret;
40}
41
42int ObSnapshotInfoManager::acquire_snapshot(
43common::ObMySQLTransaction &trans,
44const uint64_t tenant_id,
45const ObSnapshotInfo &snapshot)
46{
47int ret = OB_SUCCESS;
48ObSnapshotTableProxy snapshot_proxy;
49if (!snapshot.is_valid()) {
50ret = OB_INVALID_ARGUMENT;
51LOG_WARN("invalid argument", K(ret), K(snapshot));
52} else if (OB_FAIL(snapshot_proxy.add_snapshot(trans, snapshot))) {
53LOG_WARN("fail to add snapshot", K(ret), K(tenant_id), K(snapshot));
54}
55ROOTSERVICE_EVENT_ADD("snapshot", "acquire_snapshot", K(ret), K(snapshot), "rs_addr", self_addr_);
56return ret;
57}
58
59int ObSnapshotInfoManager::batch_acquire_snapshot(
60common::ObMySQLProxy &proxy,
61share::ObSnapShotType snapshot_type,
62const uint64_t tenant_id,
63const int64_t schema_version,
64const SCN &snapshot_scn,
65const char *comment,
66const common::ObIArray<ObTabletID> &tablet_ids)
67{
68int ret = OB_SUCCESS;
69ObMySQLTransaction trans;
70ObSnapshotTableProxy snapshot_proxy;
71ObSnapshotInfo snapshot;
72ObTimeoutCtx timeout_ctx;
73snapshot.snapshot_type_ = snapshot_type;
74snapshot.tenant_id_ = tenant_id;
75snapshot.snapshot_scn_ = snapshot_scn;
76snapshot.schema_version_ = schema_version;
77snapshot.comment_ = comment;
78if (OB_UNLIKELY(!snapshot.is_valid() || tablet_ids.count() <= 0)) {
79ret = OB_INVALID_ARGUMENT;
80LOG_WARN("invalid argument", K(ret), K(tablet_ids.count()));
81} else {
82int64_t rpc_timeout = 0;
83int64_t trx_timeout = 0;
84if (OB_FAIL(ObDDLUtil::get_ddl_rpc_timeout(tablet_ids.count(), rpc_timeout))) {
85LOG_WARN("get ddl rpc timeout failed", K(ret), K(tablet_ids.count()));
86} else if (OB_FAIL(ObDDLUtil::get_ddl_tx_timeout(tablet_ids.count(), trx_timeout))) {
87LOG_WARN("get ddl tx timeout failed", K(ret), K(tablet_ids.count()));
88} else if (OB_FAIL(timeout_ctx.set_trx_timeout_us(trx_timeout))) {
89LOG_WARN("set trx timeout failed", K(ret), K(trx_timeout));
90} else if (OB_FAIL(timeout_ctx.set_timeout(rpc_timeout))) {
91LOG_WARN("set timeout failed", K(ret), K(rpc_timeout));
92} else if (OB_FAIL(trans.start(&proxy, tenant_id))) {
93LOG_WARN("fail to start trans", K(ret), K(tenant_id));
94} else if (OB_FAIL(snapshot_proxy.batch_add_snapshot(trans, snapshot_type,
95tenant_id, schema_version, snapshot.snapshot_scn_, comment, tablet_ids))) {
96LOG_WARN("batch add snapshot failed", K(ret));
97} else {
98bool need_commit = (ret == OB_SUCCESS);
99int tmp_ret = trans.end(need_commit);
100if (OB_SUCCESS != tmp_ret) {
101LOG_WARN("fail to end trans", K(tmp_ret), K(need_commit));
102}
103ret = OB_SUCC(ret) ? tmp_ret : ret;
104}
105ROOTSERVICE_EVENT_ADD("snapshot", "batch_acquire_snapshot", K(ret), K(snapshot), "rs_addr", self_addr_);
106}
107
108return ret;
109}
110
111int ObSnapshotInfoManager::release_snapshot(
112common::ObMySQLTransaction &trans,
113const uint64_t tenant_id,
114const ObSnapshotInfo &snapshot)
115{
116int ret = OB_SUCCESS;
117ObSnapshotTableProxy snapshot_proxy;
118if (!snapshot.is_valid()) {
119ret = OB_INVALID_ARGUMENT;
120LOG_WARN("invalid argument", K(ret), K(snapshot));
121} else if (OB_FAIL(snapshot_proxy.remove_snapshot(trans, tenant_id, snapshot))) {
122LOG_WARN("fail to remove snapshot", K(ret), K(tenant_id), K(snapshot));
123}
124ROOTSERVICE_EVENT_ADD("snapshot", "release_snapshot", K(ret), K(snapshot), "rs_addr", self_addr_);
125return ret;
126}
127
128int ObSnapshotInfoManager::batch_release_snapshot_in_trans(
129common::ObMySQLTransaction &trans,
130share::ObSnapShotType snapshot_type,
131const uint64_t tenant_id,
132const int64_t schema_version,
133const SCN &snapshot_scn,
134const common::ObIArray<ObTabletID> &tablet_ids)
135{
136int ret = OB_SUCCESS;
137ObSnapshotTableProxy snapshot_proxy;
138ObSnapshotInfo snapshot;
139snapshot.snapshot_type_ = snapshot_type;
140snapshot.tenant_id_ = tenant_id;
141snapshot.snapshot_scn_ = snapshot_scn;
142snapshot.schema_version_ = schema_version;
143if (OB_UNLIKELY(tablet_ids.count() <= 0)) {
144ret = OB_INVALID_ARGUMENT;
145LOG_WARN("invalid argument", K(ret), K(tablet_ids.count()));
146} else if (OB_FAIL(snapshot_proxy.batch_remove_snapshots(trans,
147snapshot_type,
148tenant_id,
149schema_version,
150snapshot.snapshot_scn_,
151tablet_ids))) {
152LOG_WARN("fail to batch remove snapshots", K(ret));
153}
154ROOTSERVICE_EVENT_ADD("snapshot", "batch_release_snapshot", K(ret), K(snapshot), "rs_addr", self_addr_);
155return ret;
156}
157
158int ObSnapshotInfoManager::get_snapshot(common::ObMySQLProxy &proxy,
159const uint64_t tenant_id,
160share::ObSnapShotType snapshot_type,
161const char *extra_info,
162ObSnapshotInfo &snapshot_info)
163{
164int ret = OB_SUCCESS;
165ObSnapshotTableProxy snapshot_proxy;
166if (OB_FAIL(snapshot_proxy.get_snapshot(proxy, tenant_id, snapshot_type, extra_info, snapshot_info))) {
167if (OB_ITER_END == ret) {
168} else {
169LOG_WARN("fail to get snapshot", K(ret));
170}
171}
172
173return ret;
174}
175
176int ObSnapshotInfoManager::get_snapshot(common::ObMySQLProxy &proxy,
177const uint64_t tenant_id,
178share::ObSnapShotType snapshot_type,
179const SCN &snapshot_scn,
180share::ObSnapshotInfo &snapshot_info)
181{
182int ret = OB_SUCCESS;
183ObSnapshotTableProxy snapshot_proxy;
184if (OB_FAIL(snapshot_proxy.get_snapshot(proxy, tenant_id, snapshot_type, snapshot_scn, snapshot_info))) {
185if (OB_ITER_END == ret) {
186} else {
187LOG_WARN("fail to get snapshot", KR(ret), K(snapshot_type), K(snapshot_scn));
188}
189}
190return ret;
191}
192
193int ObSnapshotInfoManager::check_restore_point(common::ObMySQLProxy &proxy,
194const uint64_t tenant_id,
195const int64_t table_id,
196bool &is_exist)
197{
198int ret = OB_SUCCESS;
199is_exist = false;
200ObSnapshotTableProxy snapshot_proxy;
201if (OB_FAIL(snapshot_proxy.check_snapshot_exist(proxy, tenant_id, table_id,
202share::SNAPSHOT_FOR_RESTORE_POINT, is_exist))) {
203LOG_WARN("fail to check snapshot exist", K(ret), K(tenant_id), K(table_id));
204}
205return ret;
206}
207
208int ObSnapshotInfoManager::get_snapshot_count(common::ObMySQLProxy &proxy,
209const uint64_t tenant_id,
210share::ObSnapShotType snapshot_type,
211int64_t &count)
212{
213int ret = OB_SUCCESS;
214ObSnapshotTableProxy snapshot_proxy;
215if (OB_FAIL(snapshot_proxy.get_snapshot_count(proxy, tenant_id, snapshot_type, count))) {
216LOG_WARN("fail to get snapshot count", K(ret), K(tenant_id), K(snapshot_type));
217}
218return ret;
219}
220
221} //end rootserver
222} //end oceanbase
223
224