oceanbase
342 строки · 13.8 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS
14#include "ob_tablet_drop.h"
15#include "ob_root_service.h"
16#include "share/ob_share_util.h"
17#include "share/tablet/ob_tablet_to_table_history_operator.h" // ObTabletToTableHistoryOperator
18#include "share/tablet/ob_tablet_to_ls_operator.h"
19#include "observer/ob_inner_sql_connection.h"
20
21namespace oceanbase
22{
23namespace rootserver
24{
25
26ObTabletDrop::~ObTabletDrop()
27{
28FOREACH(iter, args_map_) {
29if (OB_NOT_NULL(iter->second)
30&& FALSE_IT(iter->second->~ObIArray<ObTabletID>())) {
31}
32}
33}
34
35void ObTabletDrop::reset()
36{
37FOREACH(iter, args_map_) {
38if (OB_NOT_NULL(iter->second)
39&& FALSE_IT(iter->second->~ObIArray<ObTabletID>())) {
40}
41}
42args_map_.clear();
43}
44
45int ObTabletDrop::init()
46{
47int ret = OB_SUCCESS;
48if (OB_UNLIKELY(inited_)) {
49ret = OB_INIT_TWICE;
50LOG_WARN("ObTabletDrop init twice", KR(ret));
51} else if (OB_FAIL(args_map_.create(MAP_BUCKET_NUM, "TabletCtr"))) {
52LOG_WARN("fail to args map", KR(ret));
53} else {
54inited_ = true;
55}
56return ret;
57}
58
59int ObTabletDrop::add_drop_tablets_of_table_arg(
60const common::ObIArray<const share::schema::ObTableSchema*> &schemas)
61{
62int ret = OB_SUCCESS;
63common::ObArray<share::ObLSID> ls_ids;
64int64_t all_part_num = 0;
65if (OB_UNLIKELY(!inited_)) {
66ret = OB_NOT_INIT;
67LOG_WARN("ObTabletCreator not init", KR(ret));
68} else if (schemas.count() < 1) {
69ret = OB_INVALID_ARGUMENT;
70LOG_WARN("schemas count is less 1", KR(ret), K(schemas));
71} else if (OB_ISNULL(schemas.at(0))) {
72ret = OB_INVALID_ARGUMENT;
73LOG_WARN("NULL ptr", KR(ret), K(schemas));
74} else {
75const share::schema::ObTableSchema &table_schema = *schemas.at(0);
76if (is_inner_table(table_schema.get_table_id())) {
77ret = OB_INVALID_ARGUMENT;
78LOG_WARN("sys table cannot drop", K(table_schema), KR(ret));
79} else if (schemas.count() > 1) {
80int64_t data_table_id = OB_INVALID_ID;
81if (table_schema.is_index_local_storage()
82|| table_schema.is_aux_lob_table()
83|| table_schema.is_mlog_table()) {
84data_table_id = table_schema.get_data_table_id();
85} else {
86data_table_id = table_schema.get_table_id();
87}
88for (int64_t i = 1; OB_SUCC(ret) && i < schemas.count(); ++i) {
89const share::schema::ObTableSchema *aux_table_schema = schemas.at(i);
90if (OB_ISNULL(aux_table_schema)) {
91ret = OB_INVALID_ARGUMENT;
92LOG_WARN("ptr is null", KR(ret), K(schemas));
93} else if (is_inner_table(aux_table_schema->get_table_id())) {
94ret = OB_INVALID_ARGUMENT;
95LOG_WARN("sys table cannot drop", KPC(aux_table_schema), KR(ret));
96} else if (!aux_table_schema->is_index_local_storage()
97&& !aux_table_schema->is_aux_lob_table()
98&& !aux_table_schema->is_mlog_table()) {
99ret = OB_INVALID_ARGUMENT;
100LOG_WARN("aux_table_schema must be local index or aux lob table", KR(ret), K(schemas), KPC(aux_table_schema));
101} else if (data_table_id != aux_table_schema->get_data_table_id()) {
102ret = OB_INVALID_ARGUMENT;
103LOG_WARN("aux table schema must be of same data table", KR(ret), K(schemas), KPC(aux_table_schema));
104}
105}
106}
107if (OB_FAIL(ret)) {
108} else if (OB_FAIL(get_ls_from_table(table_schema, ls_ids))) {
109LOG_WARN("fail to get ls from table", KR(ret));
110} else {
111int64_t ls_idx = 0;
112ObPartitionLevel part_level = table_schema.get_part_level();
113if (PARTITION_LEVEL_ZERO == part_level) {
114if (OB_FAIL(drop_tablet_(schemas, ls_ids.at(ls_idx++),
115OB_INVALID_INDEX, OB_INVALID_INDEX))) {
116LOG_WARN("fail to drop tablet", K(table_schema), KR(ret));
117}
118} else {
119ObPartition **part_array = table_schema.get_part_array();
120int64_t part_num = table_schema.get_partition_num();
121if (OB_ISNULL(part_array)) {
122ret = OB_ERR_UNEXPECTED;
123LOG_WARN("part array is null", K(table_schema), KR(ret));
124} else {
125for (int64_t i = 0; i < part_num && OB_SUCC(ret); ++i) {
126if (OB_ISNULL(part_array[i])) {
127ret = OB_ERR_UNEXPECTED;
128LOG_WARN("NULL ptr", K(i), K(table_schema), KR(ret));
129} else if (PARTITION_LEVEL_ONE == part_level) {
130if (OB_FAIL(drop_tablet_(schemas, ls_ids.at(ls_idx++), i,
131OB_INVALID_INDEX))) {
132LOG_WARN("fail to drop tablet", K(table_schema), KR(ret));
133}
134} else if (PARTITION_LEVEL_TWO == part_level) {
135ObSubPartition **subpart_array = part_array[i]->get_subpart_array();
136int64_t sub_part_num = part_array[i]->get_subpartition_num();
137if (OB_ISNULL(subpart_array)) {
138ret = OB_ERR_UNEXPECTED;
139LOG_WARN("part array is null", K(table_schema), KR(ret));
140} else {
141for (int64_t j = 0; j < sub_part_num && OB_SUCC(ret); j++) {
142if (OB_ISNULL(subpart_array[j])) {
143ret = OB_ERR_UNEXPECTED;
144LOG_WARN("NULL ptr", K(j), K(table_schema), KR(ret));
145} else {
146if (OB_FAIL(drop_tablet_(schemas, ls_ids.at(ls_idx++), i, j))) {
147LOG_WARN("fail to drop tablet", K(table_schema), KR(ret));
148}
149}
150}
151}
152} else {
153ret = OB_ERR_UNEXPECTED;
154LOG_WARN("4.0 not support part type", K(table_schema), KR(ret));
155}
156}
157}
158}
159}
160}
161return ret;
162}
163
164int ObTabletDrop::get_ls_from_table(const share::schema::ObTableSchema &table_schema,
165common::ObIArray<share::ObLSID> &assign_ls_id_array)
166{
167int ret = OB_SUCCESS;
168int64_t all_part_num = 0;
169if (-1 == (all_part_num = table_schema.get_all_part_num())) {
170ret = OB_ERR_UNEXPECTED;
171LOG_WARN("fail to get tablet num", K(table_schema), KR(ret));
172} else if (OB_FAIL(assign_ls_id_array.reserve(all_part_num))) {
173LOG_WARN("fail to reserve array", KR(ret), K(all_part_num));
174} else if (is_sys_tenant(table_schema.get_tenant_id())) {
175for (int64_t i = 0; i < all_part_num && OB_SUCC(ret); i++) {
176if (OB_FAIL(assign_ls_id_array.push_back(share::SYS_LS))) {
177LOG_WARN("failed to push_back", KR(ret));
178}
179}
180} else {
181ObArray<ObTabletID> tablet_ids;
182if (OB_FAIL(table_schema.get_tablet_ids(tablet_ids))) {
183LOG_WARN("fail to get tablet ids", KR(ret), K(table_schema));
184} else if (OB_FAIL(share::ObTabletToLSTableOperator::batch_get_ls(trans_, tenant_id_, tablet_ids, assign_ls_id_array))) {
185LOG_WARN("fail to batch_get_ls", KR(ret), K(tenant_id_), K(table_schema));
186}
187}
188
189if (OB_FAIL(ret)) {
190} else if (assign_ls_id_array.count() != all_part_num) {
191ret = OB_ERR_UNEXPECTED;
192LOG_WARN("the ls of tablet is not equal partition num",
193KR(ret), K(assign_ls_id_array.count()), K(all_part_num));
194}
195return ret;
196}
197
198int ObTabletDrop::drop_tablet_(
199const common::ObIArray<const share::schema::ObTableSchema *> &table_schema_ptr_array,
200const share::ObLSID &ls_id,
201const int64_t part_idx,
202const int64_t subpart_idx)
203{
204int ret = OB_SUCCESS;
205common::ObIArray<ObTabletID> *tablet_id_array = NULL;
206
207if (table_schema_ptr_array.count() < 1) {
208ret = OB_ERR_UNEXPECTED;
209LOG_WARN("table_schema_ptr_array is empty", K(table_schema_ptr_array), KR(ret));
210} else if (OB_SUCC(args_map_.get_refactored(ls_id, tablet_id_array))) {
211//already exist
212} else if (OB_HASH_NOT_EXIST == ret) {
213//create new tablet_id_array
214void *ptr1 = allocator_.alloc(sizeof(ObArray<ObTabletID>));
215void *ptr2 = allocator_.alloc(sizeof(ObArray<ObTabletID>));
216if (OB_ISNULL(ptr1) || OB_ISNULL(ptr2)) {
217ret = OB_ALLOCATE_MEMORY_FAILED;
218LOG_WARN("fail alloc memory", KR(ret));
219} else {
220tablet_id_array = new (ptr1)ObArray<ObTabletID, ObIAllocator &>(
221OB_MALLOC_NORMAL_BLOCK_SIZE, allocator_);
222if (OB_FAIL(args_map_.set_refactored(ls_id, tablet_id_array, 0/*not overwrite*/))) {
223LOG_WARN("fail to set refactored", KR(ret), K(ls_id));
224}
225}
226} else {
227LOG_WARN("fail to get element from args_map", KR(ret), K(ls_id));
228}
229
230if (OB_FAIL(ret)) {
231} else if (OB_ISNULL(tablet_id_array)) {
232ret = OB_ERR_UNEXPECTED;
233LOG_WARN("NULL ptr", K(table_schema_ptr_array), KR(ret));
234} else if (OB_ISNULL(table_schema_ptr_array.at(0))) {
235ret = OB_ERR_UNEXPECTED;
236LOG_WARN("NULL ptr", K(table_schema_ptr_array), KR(ret));
237} else {
238bool only_drop_index = table_schema_ptr_array.at(0)->is_index_local_storage();
239ObBasePartition *first_table_part = NULL;
240ObBasePartition *part = NULL;
241for (int r = 0; r < table_schema_ptr_array.count() && OB_SUCC(ret); r++) {
242const share::schema::ObTableSchema *table_schema_ptr = table_schema_ptr_array.at(r);
243if (OB_ISNULL(table_schema_ptr)) {
244ret = OB_ERR_UNEXPECTED;
245LOG_WARN("NULL ptr", K(r), K(table_schema_ptr_array), KR(ret));
246} else if (PARTITION_LEVEL_ZERO == table_schema_ptr->get_part_level()) {
247ObTabletID tablet_id = table_schema_ptr->get_tablet_id();
248if (OB_FAIL(tablet_id_array->push_back(tablet_id))) {
249LOG_WARN("failed to assign table schema point", KR(ret), KPC(table_schema_ptr));
250}
251} else if (OB_FAIL(table_schema_ptr->get_part_by_idx(part_idx, subpart_idx, part))) {
252LOG_WARN("fail to get index part", KR(ret), KPC(table_schema_ptr), K(part_idx), K(subpart_idx));
253} else if (OB_ISNULL(part)) {
254ret = OB_INVALID_ARGUMENT;
255LOG_WARN("NULL ptr", KR(ret), KPC(table_schema_ptr), K(part_idx), K(subpart_idx));
256} else {
257if (r == 0) {
258first_table_part = part;
259} else {
260if (OB_UNLIKELY(!first_table_part->same_base_partition(*part))) {
261ret = OB_INVALID_ARGUMENT;
262LOG_WARN("parts in table and index table is not equal", KR(ret), KPC(first_table_part), KPC(part));
263}
264}
265if (OB_FAIL(ret)) {
266} else if (OB_FAIL(tablet_id_array->push_back(part->get_tablet_id()))) {
267LOG_WARN("failed to assign table schema point", KR(ret), K(part_idx), K(subpart_idx));
268}
269}
270}
271}
272return ret;
273}
274int ObTabletDrop::execute()
275{
276int ret = OB_SUCCESS;
277ObTimeoutCtx ctx;
278const int64_t default_timeout_ts = GCONF.rpc_timeout;
279const int64_t SLEEP_INTERVAL = 100 * 1000L; // 100ms
280observer::ObInnerSQLConnection *conn = NULL;
281if (OB_UNLIKELY(!inited_)) {
282ret = OB_NOT_INIT;
283LOG_WARN("ObTabletCreator not init", KR(ret));
284} else if (OB_ISNULL(conn = dynamic_cast<observer::ObInnerSQLConnection *>
285(trans_.get_connection()))) {
286ret = OB_ERR_UNEXPECTED;
287LOG_WARN("conn_ is NULL", KR(ret));
288} else if (OB_UNLIKELY(0 >= args_map_.size())) {
289ret = OB_INVALID_ARGUMENT;
290LOG_WARN("batch arg count is invalid", KR(ret));
291} else {
292FOREACH_X(iter, args_map_, OB_SUCC(ret)) {
293common::ObIArray<ObTabletID> *tablet_ids = iter->second;
294obrpc::ObBatchRemoveTabletArg arg;
295if (OB_ISNULL(tablet_ids)) {
296ret = OB_ERR_UNEXPECTED;
297LOG_WARN("tablet_ids is NULL ptr", KR(ret));
298} else if (tablet_ids->count() < 1) {
299ret = OB_ERR_UNEXPECTED;
300LOG_WARN("tablet_ids is empty", KR(ret));
301} else if (OB_FAIL(share::ObTabletToLSTableOperator::batch_remove(trans_, tenant_id_, *tablet_ids))) {
302LOG_WARN("tablet_ids count less than 1", KR(ret));
303} else if (OB_FAIL(share::ObTabletToTableHistoryOperator::drop_tablet_to_table_history(
304trans_, tenant_id_, schema_version_, *tablet_ids))) {
305LOG_WARN("fail to create tablet to table history", KR(ret), K_(tenant_id), K(schema_version_));
306} else if (OB_FAIL(arg.init(*(tablet_ids), iter->first))) {
307LOG_WARN("failed to find leader", KR(ret), K(iter->first), KPC(tablet_ids));
308} else {
309LOG_INFO("generate remove arg", K(arg), K(lbt()), KPC(tablet_ids));
310int64_t buf_len = arg.get_serialize_size();
311int64_t pos = 0;
312char *buf = (char*)allocator_.alloc(buf_len);
313if (OB_ISNULL(buf)) {
314ret = OB_ALLOCATE_MEMORY_FAILED;
315LOG_WARN("fail alloc memory", KR(ret));
316} else if (OB_FAIL(arg.serialize(buf, buf_len, pos))) {
317LOG_WARN("fail to serialize", KR(ret), K(arg));
318} else if (OB_FAIL(share::ObShareUtil::set_default_timeout_ctx(ctx, default_timeout_ts))) {
319LOG_WARN("fail to set timeout ctx", KR(ret), K(default_timeout_ts));
320} else {
321do {
322if (ctx.is_timeouted()) {
323ret = OB_TIMEOUT;
324LOG_WARN("already timeout", KR(ret), K(ctx));
325} else if (OB_FAIL(conn->register_multi_data_source(tenant_id_, iter->first,
326transaction::ObTxDataSourceType::DELETE_TABLET_NEW_MDS, buf, buf_len))) {
327LOG_WARN("fail to register_tx_data", KR(ret), K(arg), K(buf), K(buf_len));
328if (OB_LS_LOCATION_LEADER_NOT_EXIST == ret || OB_NOT_MASTER == ret) {
329LOG_INFO("fail to find leader, try again", K_(tenant_id), K(arg));
330ob_usleep(SLEEP_INTERVAL);
331}
332}
333} while (OB_LS_LOCATION_LEADER_NOT_EXIST == ret || OB_NOT_MASTER == ret);
334}
335}
336}
337}
338return ret;
339}
340
341} // rootserver
342} // oceanbase
343
344