oceanbase
485 строк · 17.5 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX SHARE_SCHEMA
14
15#include "rootserver/parallel_ddl/ob_index_name_checker.h"
16#include "share/schema/ob_schema_service_sql_impl.h"
17#include "share/schema/ob_multi_version_schema_service.h"
18using namespace oceanbase::lib;
19using namespace oceanbase::common;
20using namespace oceanbase::share;
21using namespace oceanbase::share::schema;
22using namespace oceanbase::rootserver;
23
24ObIndexNameCache::ObIndexNameCache(
25const uint64_t tenant_id,
26common::ObMySQLProxy &sql_proxy)
27: mutex_(common::ObLatchIds::IND_NAME_CACHE_LOCK),
28tenant_id_(tenant_id),
29sql_proxy_(sql_proxy),
30allocator_(ObMemAttr(OB_SYS_TENANT_ID, "IndNameInfo", ObCtxIds::SCHEMA_SERVICE)),
31cache_(ModulePageAllocator(allocator_)),
32loaded_(false)
33{
34}
35
36void ObIndexNameCache::reset_cache()
37{
38lib::ObMutexGuard guard(mutex_);
39(void) inner_reset_cache_();
40}
41
42void ObIndexNameCache::inner_reset_cache_()
43{
44cache_.destroy();
45allocator_.reset();
46loaded_ = false;
47FLOG_INFO("[INDEX NAME CACHE] reset index name map", K_(tenant_id));
48}
49
50int ObIndexNameCache::check_index_name_exist(
51const uint64_t tenant_id,
52const uint64_t database_id,
53const ObString &index_name,
54bool &is_exist)
55{
56int ret = OB_SUCCESS;
57is_exist = false;
58bool is_oracle_mode = false;
59if (OB_UNLIKELY(
60OB_INVALID_TENANT_ID == tenant_id
61|| tenant_id_ != tenant_id
62|| OB_INVALID_ID == database_id
63|| index_name.empty())) {
64ret = OB_INVALID_ARGUMENT;
65LOG_WARN("invalid arg", KR(ret), K(tenant_id_), K(tenant_id), K(database_id), K(index_name));
66} else if (OB_FAIL(ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(
67tenant_id, is_oracle_mode))) {
68LOG_WARN("fail to check is oracle mode", KR(ret), K(tenant_id));
69} else {
70lib::ObMutexGuard guard(mutex_);
71ObString idx_name;
72uint64_t data_table_id = OB_INVALID_ID;
73if (OB_FAIL(try_load_cache_())) {
74LOG_WARN("fail to load index name cache", KR(ret), K(tenant_id));
75} else if (is_recyclebin_database_id(database_id)) {
76idx_name = index_name;
77data_table_id = OB_INVALID_ID;
78} else {
79uint64_t data_table_id = ObSimpleTableSchemaV2::extract_data_table_id_from_index_name(index_name);
80if (OB_INVALID_ID == database_id) {
81ret = OB_INVALID_ARGUMENT;
82LOG_WARN("invalid index name", KR(ret), K(index_name));
83} else if (OB_FAIL(ObSimpleTableSchemaV2::get_index_name(index_name, idx_name))) {
84LOG_WARN("fail to get original index name", KR(ret), K(index_name));
85} else {
86data_table_id = (is_oracle_mode && !is_mysql_sys_database_id(database_id)) ?
87OB_INVALID_ID : data_table_id;
88}
89}
90if (OB_SUCC(ret)) {
91ObIndexSchemaHashWrapper index_name_wrapper(
92tenant_id,
93database_id,
94data_table_id,
95idx_name);
96ObIndexNameInfo *index_name_info = NULL;
97if (OB_FAIL(cache_.get_refactored(index_name_wrapper, index_name_info))) {
98if (OB_HASH_NOT_EXIST == ret) {
99ret = OB_SUCCESS;
100} else {
101LOG_WARN("fail to get index name info", KR(ret), K(index_name_wrapper));
102}
103} else if (OB_ISNULL(index_name_info)) {
104ret = OB_ERR_UNEXPECTED;
105LOG_WARN("index name info is null", KR(ret), K(index_name_wrapper));
106} else {
107is_exist = true;
108LOG_INFO("index name exist", KR(ret), KPC(index_name_info),
109K(database_id), K(index_name), K(data_table_id), K(idx_name));
110// Before call check_index_name_exist(), index_name will be locked by trans first.
111// And add_index_name() will be called before trans commit.
112//
113// It may has garbage when trans commit failed after add_index_name() is called.
114// So, we need to double check if index name actually exists in inner table when confict occurs.
115ObSchemaService *schema_service_impl = NULL;
116uint64_t index_id = OB_INVALID_ID;
117if (OB_ISNULL(schema_service_impl = GSCHEMASERVICE.get_schema_service())) {
118ret = OB_ERR_UNEXPECTED;
119LOG_WARN("schema service impl is null", KR(ret));
120} else if (OB_FAIL(schema_service_impl->get_index_id(
121sql_proxy_, tenant_id, database_id,
122index_name_info->get_index_name(), index_id))) {
123LOG_WARN("fail to get index id", KR(ret), KPC(index_name_info));
124} else if (OB_INVALID_ID != index_id) {
125is_exist = true;
126} else {
127is_exist = false;
128FLOG_INFO("garbage index name exist, should be erased", KPC(index_name_info),
129K(database_id), K(index_name), K(data_table_id), K(idx_name));
130if (OB_FAIL(cache_.erase_refactored(index_name_wrapper))) {
131LOG_WARN("fail to erase key", KR(ret), K(index_name_wrapper));
132if (OB_HASH_NOT_EXIST != ret) {
133(void) inner_reset_cache_();
134}
135}
136}
137}
138}
139}
140return ret;
141}
142
143int ObIndexNameCache::add_index_name(
144const share::schema::ObTableSchema &index_schema)
145{
146int ret = OB_SUCCESS;
147const uint64_t tenant_id = index_schema.get_tenant_id();
148const uint64_t database_id = index_schema.get_database_id();
149const ObString &index_name = index_schema.get_table_name_str();
150const ObTableType table_type = index_schema.get_table_type();
151uint64_t data_table_id = index_schema.get_data_table_id();
152bool is_oracle_mode = false;
153if (OB_UNLIKELY(
154OB_INVALID_TENANT_ID == tenant_id
155|| tenant_id_ != tenant_id
156|| OB_INVALID_ID == database_id
157|| index_name.empty()
158|| !is_index_table(table_type))) {
159ret = OB_INVALID_ARGUMENT;
160LOG_WARN("invalid arg", KR(ret), K(tenant_id_),
161K(tenant_id), K(database_id), K(index_name), K(table_type));
162} else if (OB_UNLIKELY(!is_recyclebin_database_id(database_id)
163&& index_schema.get_origin_index_name_str().empty())) {
164ret = OB_INVALID_ARGUMENT;
165LOG_WARN("invalid index schema", KR(ret), K(index_schema));
166} else if (OB_FAIL(ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(
167tenant_id, is_oracle_mode))) {
168LOG_WARN("fail to check is oracle mode", KR(ret), K(tenant_id));
169} else {
170lib::ObMutexGuard guard(mutex_);
171if (OB_FAIL(try_load_cache_())) {
172LOG_WARN("fail to load index name cache", KR(ret), K(tenant_id));
173} else {
174void *buf = NULL;
175ObIndexNameInfo *index_name_info = NULL;
176ObString idx_name;
177if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObIndexNameInfo)))) {
178ret = OB_ALLOCATE_MEMORY_FAILED;
179LOG_WARN("fail to alloc index name info", KR(ret));
180} else if (FALSE_IT(index_name_info = new (buf) ObIndexNameInfo())) {
181} else if (OB_FAIL(index_name_info->init(allocator_, index_schema))) {
182LOG_WARN("fail to init index name info", KR(ret), K(index_schema));
183} else if (is_recyclebin_database_id(database_id)) {
184data_table_id = OB_INVALID_ID;
185idx_name = index_name_info->get_index_name();
186} else {
187data_table_id = (is_oracle_mode && !is_mysql_sys_database_id(database_id)) ?
188OB_INVALID_ID : index_name_info->get_data_table_id();
189idx_name = index_name_info->get_original_index_name();
190}
191if (OB_SUCC(ret)) {
192int overwrite = 0;
193ObIndexSchemaHashWrapper index_name_wrapper(index_name_info->get_tenant_id(),
194database_id,
195data_table_id,
196idx_name);
197if (OB_FAIL(cache_.set_refactored(index_name_wrapper, index_name_info, overwrite))) {
198LOG_WARN("fail to set refactored", KR(ret), KPC(index_name_info));
199if (OB_HASH_EXIST == ret) {
200ObIndexNameInfo **exist_index_info = cache_.get(index_name_wrapper);
201if (OB_NOT_NULL(exist_index_info) && OB_NOT_NULL(*exist_index_info)) {
202FLOG_ERROR("[INDEX NAME CACHE] duplicated index info exist",
203KR(ret), KPC(index_name_info), KPC(*exist_index_info));
204}
205} else {
206(void) inner_reset_cache_();
207}
208} else {
209FLOG_INFO("[INDEX NAME CACHE] add index name to cache", KR(ret), KPC(index_name_info));
210}
211}
212}
213}
214return ret;
215}
216
217// need protect by mutex_
218int ObIndexNameCache::try_load_cache_()
219{
220int ret = OB_SUCCESS;
221if (loaded_) {
222// do nothing
223} else {
224(void) inner_reset_cache_();
225
226ObRefreshSchemaStatus schema_status;
227schema_status.tenant_id_ = tenant_id_;
228int64_t schema_version = OB_INVALID_VERSION;
229int64_t timeout_ts = OB_INVALID_TIMESTAMP;
230if (OB_FAIL(GSCHEMASERVICE.get_schema_version_in_inner_table(
231sql_proxy_, schema_status, schema_version))) {
232LOG_WARN("fail to get schema version", KR(ret), K(schema_status));
233} else if (!ObSchemaService::is_formal_version(schema_version)) {
234ret = OB_EAGAIN;
235LOG_WARN("schema version is informal, need retry", KR(ret), K(schema_status), K(schema_version));
236} else if (OB_FAIL(ObShareUtil::get_ctx_timeout(GCONF.internal_sql_execute_timeout, timeout_ts))) {
237LOG_WARN("fail to get timeout", KR(ret));
238} else {
239int64_t original_timeout_ts = THIS_WORKER.get_timeout_ts();
240int64_t schema_version = OB_INVALID_VERSION;
241THIS_WORKER.set_timeout_ts(timeout_ts);
242
243ObSchemaGetterGuard guard;
244int64_t start_time = ObTimeUtility::current_time();
245if (OB_FAIL(GSCHEMASERVICE.async_refresh_schema(tenant_id_, schema_version))) {
246LOG_WARN("fail to refresh schema", KR(ret), K_(tenant_id), K(schema_version));
247} else if (OB_FAIL(GSCHEMASERVICE.get_tenant_schema_guard(tenant_id_, guard))) {
248LOG_WARN("fail to get schema guard", KR(ret), K_(tenant_id));
249} else if (OB_FAIL(guard.get_schema_version(tenant_id_, schema_version))) {
250LOG_WARN("fail to get schema version", KR(ret), K_(tenant_id));
251} else if (OB_FAIL(guard.deep_copy_index_name_map(allocator_, cache_))) {
252LOG_WARN("fail to deep copy index name map", KR(ret), K_(tenant_id));
253} else {
254loaded_ = true;
255FLOG_INFO("[INDEX NAME CACHE] load index name map", KR(ret), K_(tenant_id),
256K(schema_version), "cost", ObTimeUtility::current_time() - start_time);
257}
258
259if (OB_FAIL(ret)) {
260(void) inner_reset_cache_();
261LOG_WARN("load index name map failed", KR(ret), K_(tenant_id),
262K(schema_version), "cost", ObTimeUtility::current_time() - start_time);
263}
264
265THIS_WORKER.set_timeout_ts(original_timeout_ts);
266}
267}
268return ret;
269}
270
271ObIndexNameChecker::ObIndexNameChecker()
272: rwlock_(),
273allocator_(ObMemAttr(OB_SYS_TENANT_ID, "IndNameCache", ObCtxIds::SCHEMA_SERVICE)),
274index_name_cache_map_(),
275sql_proxy_(NULL),
276inited_(false)
277{
278}
279
280ObIndexNameChecker::~ObIndexNameChecker()
281{
282destroy();
283}
284
285int ObIndexNameChecker::init(common::ObMySQLProxy &sql_proxy)
286{
287int ret = OB_SUCCESS;
288SpinWLockGuard guard(rwlock_);
289if (inited_) {
290ret = OB_INIT_TWICE;
291LOG_WARN("init twice", KR(ret));
292} else {
293const int64_t BUCKET_NUM = 1024;
294if (OB_FAIL(index_name_cache_map_.create(BUCKET_NUM, "IndNameMap", "IndNameMap"))) {
295LOG_WARN("fail to create hash map", KR(ret));
296} else {
297sql_proxy_ = &sql_proxy;
298inited_ = true;
299}
300}
301return ret;
302}
303
304void ObIndexNameChecker::destroy()
305{
306SpinWLockGuard guard(rwlock_);
307if (inited_) {
308FOREACH(it, index_name_cache_map_) {
309if (OB_NOT_NULL(it->second)) {
310(it->second)->~ObIndexNameCache();
311it->second = NULL;
312}
313}
314index_name_cache_map_.destroy();
315allocator_.reset();
316sql_proxy_ = NULL;
317inited_ = false;
318}
319}
320
321void ObIndexNameChecker::reset_all_cache()
322{
323int ret = OB_SUCCESS;
324SpinRLockGuard guard(rwlock_);
325if (inited_) {
326FOREACH(it, index_name_cache_map_) {
327if (OB_NOT_NULL(it->second)) {
328(void) (it->second)->reset_cache();
329}
330}
331}
332}
333
334int ObIndexNameChecker::reset_cache(const uint64_t tenant_id)
335{
336int ret = OB_SUCCESS;
337SpinRLockGuard guard(rwlock_);
338if (OB_UNLIKELY(!inited_)) {
339ret = OB_NOT_INIT;
340LOG_WARN("not init", KR(ret));
341} else {
342ObIndexNameCache *cache = NULL;
343if (OB_FAIL(index_name_cache_map_.get_refactored(tenant_id, cache))) {
344if (OB_HASH_NOT_EXIST != ret) {
345LOG_WARN("fail to get refactored", KR(ret), K(tenant_id));
346} else {
347// tenant not in cache, just skip
348ret = OB_SUCCESS;
349}
350} else if (OB_ISNULL(cache)) {
351ret = OB_ERR_UNEXPECTED;
352LOG_WARN("cache is null", KR(ret), K(tenant_id));
353} else {
354(void) cache->reset_cache();
355}
356}
357return ret;
358}
359
360int ObIndexNameChecker::check_index_name_exist(
361const uint64_t tenant_id,
362const uint64_t database_id,
363const ObString &index_name,
364bool &is_exist)
365{
366int ret = OB_SUCCESS;
367bool can_skip = false;
368is_exist = false;
369if (OB_UNLIKELY(!inited_)) {
370ret = OB_NOT_INIT;
371LOG_WARN("not init", KR(ret));
372} else if (OB_FAIL(check_tenant_can_be_skipped_(tenant_id, can_skip))) {
373LOG_WARN("fail to check tenant", KR(ret), K(tenant_id));
374} else if (can_skip) {
375// do nothing
376} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
377|| OB_INVALID_ID == database_id
378|| index_name.empty())) {
379ret = OB_INVALID_ARGUMENT;
380LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(database_id), K(index_name));
381} else if (OB_FAIL(try_init_index_name_cache_map_(tenant_id))) {
382LOG_WARN("fail to init index name cache", KR(ret), K(tenant_id));
383} else {
384SpinRLockGuard guard(rwlock_);
385ObIndexNameCache *cache = NULL;
386if (OB_FAIL(index_name_cache_map_.get_refactored(tenant_id, cache))) {
387LOG_WARN("fail to get refactored", KR(ret), K(tenant_id));
388} else if (OB_ISNULL(cache)) {
389ret = OB_ERR_UNEXPECTED;
390LOG_WARN("cache is null", KR(ret));
391} else if (OB_FAIL(cache->check_index_name_exist(
392tenant_id, database_id, index_name, is_exist))) {
393LOG_WARN("fail to check index name exist",
394KR(ret), K(tenant_id), K(database_id), K(index_name));
395}
396}
397return ret;
398}
399
400int ObIndexNameChecker::add_index_name(
401const share::schema::ObTableSchema &index_schema)
402{
403int ret = OB_SUCCESS;
404const uint64_t tenant_id = index_schema.get_tenant_id();
405bool can_skip = false;
406if (OB_UNLIKELY(!inited_)) {
407ret = OB_NOT_INIT;
408LOG_WARN("not init", KR(ret));
409} else if (OB_FAIL(check_tenant_can_be_skipped_(tenant_id, can_skip))) {
410LOG_WARN("fail to check tenant", KR(ret), K(tenant_id));
411} else if (can_skip) {
412// do nothing
413} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
414ret = OB_INVALID_ARGUMENT;
415LOG_WARN("invalid arg", KR(ret), K(tenant_id));
416} else if (OB_FAIL(try_init_index_name_cache_map_(tenant_id))) {
417LOG_WARN("fail to init index name cache", KR(ret), K(tenant_id));
418} else {
419SpinRLockGuard guard(rwlock_);
420ObIndexNameCache *cache = NULL;
421if (OB_FAIL(index_name_cache_map_.get_refactored(tenant_id, cache))) {
422LOG_WARN("fail to get refactored", KR(ret), K(tenant_id));
423} else if (OB_ISNULL(cache)) {
424ret = OB_ERR_UNEXPECTED;
425LOG_WARN("cache is null", KR(ret));
426} else if (OB_FAIL(cache->add_index_name(index_schema))) {
427LOG_WARN("fail to add index name", KR(ret), K(index_schema));
428}
429}
430return ret;
431}
432
433// only cache oracle tenant's index name map
434int ObIndexNameChecker::check_tenant_can_be_skipped_(
435const uint64_t tenant_id,
436bool &can_skip)
437{
438int ret = OB_SUCCESS;
439bool is_oracle_mode = false;
440can_skip = false;
441if (is_sys_tenant(tenant_id)
442|| is_meta_tenant(tenant_id)) {
443can_skip = true;
444} else if (OB_FAIL(ObCompatModeGetter::check_is_oracle_mode_with_tenant_id(
445tenant_id, is_oracle_mode))) {
446LOG_WARN("fail to check is oracle mode", KR(ret), K(tenant_id));
447} else {
448can_skip = !is_oracle_mode;
449}
450return ret;
451}
452
453int ObIndexNameChecker::try_init_index_name_cache_map_(const uint64_t tenant_id)
454{
455int ret = OB_SUCCESS;
456SpinWLockGuard guard(rwlock_);
457if (OB_UNLIKELY(!inited_)) {
458ret = OB_NOT_INIT;
459LOG_WARN("not init", KR(ret));
460} else if (OB_ISNULL(sql_proxy_)) {
461ret = OB_ERR_UNEXPECTED;
462LOG_WARN("sql_proxy is null", KR(ret));
463} else {
464ObIndexNameCache *cache = NULL;
465if (OB_FAIL(index_name_cache_map_.get_refactored(tenant_id, cache))) {
466if (OB_HASH_NOT_EXIST != ret) {
467LOG_WARN("fail to get cache", KR(ret), K(tenant_id));
468} else {
469ret = OB_SUCCESS;
470cache = NULL;
471void *buf = NULL;
472if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObIndexNameCache)))) {
473ret = OB_ALLOCATE_MEMORY_FAILED;
474LOG_WARN("fail to alloc memory", KR(ret));
475} else if (FALSE_IT(cache = new (buf) ObIndexNameCache(tenant_id, *sql_proxy_))) {
476} else if (OB_FAIL(index_name_cache_map_.set_refactored(tenant_id, cache))) {
477LOG_WARN("fail to set cache", KR(ret), K(tenant_id));
478}
479}
480} else {
481// cache exist, just skip
482}
483}
484return ret;
485}
486