oceanbase
637 строк · 24.1 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS
14#include "observer/ob_inner_sql_connection.h" //ObInnerSQLConnection
15#include "rootserver/parallel_ddl/ob_ddl_helper.h"
16#include "share/schema/ob_multi_version_schema_service.h"
17#include "share/schema/ob_ddl_sql_service.h"
18#include "share/ob_max_id_fetcher.h"
19#include "storage/tablelock/ob_table_lock_rpc_struct.h" //ObLockObjRequest
20#include "storage/tablelock/ob_lock_inner_connection_util.h" //ObInnerConnectionLockUtil
21
22using namespace oceanbase::lib;
23using namespace oceanbase::common;
24using namespace oceanbase::share;
25using namespace oceanbase::share::schema;
26using namespace oceanbase::rootserver;
27using namespace oceanbase::transaction::tablelock;
28
29ObDDLHelper::ObLockObjPair::ObLockObjPair()
30: obj_id_(0),
31lock_mode_(transaction::tablelock::MAX_LOCK_MODE)
32{
33}
34
35ObDDLHelper::ObLockObjPair::ObLockObjPair(
36const uint64_t obj_id,
37transaction::tablelock::ObTableLockMode lock_mode)
38: obj_id_(obj_id),
39lock_mode_(lock_mode)
40{
41}
42
43int ObDDLHelper::ObLockObjPair::init(
44const uint64_t obj_id,
45transaction::tablelock::ObTableLockMode lock_mode)
46{
47int ret = OB_SUCCESS;
48reset();
49obj_id_ = obj_id;
50lock_mode_ = lock_mode;
51return ret;
52}
53
54void ObDDLHelper::ObLockObjPair::reset()
55{
56obj_id_ = 0;
57lock_mode_ = transaction::tablelock::MAX_LOCK_MODE;
58}
59
60bool ObDDLHelper::ObLockObjPair::less_than(
61const ObLockObjPair &left,
62const ObLockObjPair &right)
63{
64bool bret = false;
65if (left.get_obj_id() != right.get_obj_id()) {
66bret = (left.get_obj_id() < right.get_obj_id());
67} else {
68bret = (left.get_lock_mode() < right.get_lock_mode());
69}
70return bret;
71}
72
73ObDDLHelper::ObDDLHelper(
74share::schema::ObMultiVersionSchemaService *schema_service,
75const uint64_t tenant_id)
76: inited_(false),
77schema_service_(schema_service),
78ddl_service_(NULL),
79sql_proxy_(NULL),
80ddl_trans_controller_(NULL),
81tenant_id_(tenant_id),
82task_id_(common::OB_INVALID_ID),
83schema_version_cnt_(0),
84object_id_cnt_(0),
85trans_(schema_service_,
86false, /*need_end_signal*/
87false, /*enable_query_stash*/
88true /*enable_ddl_parallel*/),
89lock_database_name_map_(),
90lock_object_name_map_(),
91lock_object_id_map_(),
92latest_schema_guard_(schema_service, tenant_id)
93{}
94
95ObDDLHelper::~ObDDLHelper()
96{
97}
98
99int ObDDLHelper::init(rootserver::ObDDLService &ddl_service)
100{
101int ret = OB_SUCCESS;
102if (OB_UNLIKELY(inited_)) {
103ret = OB_INIT_TWICE;
104LOG_WARN("ddl_helper already inited", KR(ret));
105} else if (OB_ISNULL(schema_service_)) {
106ret = OB_INVALID_ARGUMENT;
107LOG_WARN("schema_service is null", KR(ret));
108} else if (OB_FAIL(lock_database_name_map_.create(
109OBJECT_BUCKET_NUM, "LockDBNameMap", "LockDBNameMap"))) {
110LOG_WARN("fail to create lock database name map", KR(ret));
111} else if (OB_FAIL(lock_object_name_map_.create(
112OBJECT_BUCKET_NUM, "LockObjNameMap", "LockObjNameMap"))) {
113LOG_WARN("fail to create lock object name map", KR(ret));
114} else if (OB_FAIL(lock_object_id_map_.create(
115OBJECT_BUCKET_NUM, "LockObjIDMap", "LockObjIDMap"))) {
116LOG_WARN("fail to create lock object id map", KR(ret));
117} else {
118ddl_service_ = &ddl_service;
119sql_proxy_ = &(ddl_service.get_sql_proxy());
120ddl_trans_controller_ = &(schema_service_->get_ddl_trans_controller());
121task_id_ = OB_INVALID_ID;
122schema_version_cnt_ = 0;
123object_id_cnt_ = 0;
124inited_ = true;
125}
126return ret;
127}
128
129int ObDDLHelper::check_inner_stat_()
130{
131int ret = OB_SUCCESS;
132if (OB_UNLIKELY(!inited_)) {
133ret = OB_NOT_INIT;
134LOG_WARN("ddl_helper not init yet", KR(ret));
135} else if (OB_ISNULL(ddl_service_)
136|| OB_ISNULL(sql_proxy_)
137|| OB_ISNULL(schema_service_)
138|| OB_ISNULL(ddl_trans_controller_)) {
139ret = OB_NOT_INIT;
140LOG_WARN("ptr is null", KR(ret), KP_(ddl_service), KP_(schema_service),
141KP_(sql_proxy), K_(ddl_trans_controller));
142} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id_)) {
143ret = OB_NOT_INIT;
144LOG_WARN("invalid tenant_id", KR(ret), K_(tenant_id));
145}
146return ret;
147}
148
149int ObDDLHelper::start_ddl_trans_()
150{
151int ret = OB_SUCCESS;
152bool with_snapshot = false;
153int64_t fake_schema_version = 1000;
154if (OB_FAIL(check_inner_stat_())) {
155LOG_WARN("fail to check inner stat", KR(ret));
156} else if (OB_FAIL(trans_.start(sql_proxy_, tenant_id_, fake_schema_version, with_snapshot))) {
157LOG_WARN("fail to start trans", KR(ret), K_(tenant_id), K(fake_schema_version), K(with_snapshot));
158}
159RS_TRACE(start_ddl_trans);
160return ret;
161}
162
163int ObDDLHelper::gen_task_id_and_schema_versions_()
164{
165int ret = OB_SUCCESS;
166// just for interface compatibility, schema version can be fetched from TSISchemaVersionGenerator
167ObArray<int64_t> schema_versions;
168int64_t version_cnt = OB_INVALID_INDEX;
169auto *tsi_generator = GET_TSI(TSISchemaVersionGenerator);
170if (OB_FAIL(check_inner_stat_())) {
171LOG_WARN("fail to check inner stat", KR(ret));
172} else if (OB_FAIL(ddl_trans_controller_->create_task_and_assign_schema_version(
173tenant_id_, schema_version_cnt_, task_id_, schema_versions))) {
174LOG_WARN("fail to gen task id and schema_versions", KR(ret), K_(tenant_id), K_(schema_version_cnt));
175} else if (OB_UNLIKELY(OB_INVALID_ID == task_id_
176|| schema_version_cnt_ != schema_versions.count())) {
177ret = OB_INVALID_ARGUMENT;
178LOG_WARN("task_id or schema version cnt not match", KR(ret), K_(tenant_id), K_(task_id),
179K_(schema_version_cnt), "schema_versions_cnt", schema_versions.count());
180} else if (OB_ISNULL(tsi_generator)) {
181ret = OB_ERR_UNEXPECTED;
182LOG_WARN("tsi schema version generator is null", KR(ret));
183} else if (OB_FAIL(tsi_generator->get_version_cnt(version_cnt))) {
184LOG_WARN("fail to get id cnt", KR(ret));
185} else if (OB_UNLIKELY(schema_version_cnt_ != version_cnt)) {
186ret = OB_INVALID_ARGUMENT;
187LOG_WARN("schema version cnt not match", KR(ret), K_(tenant_id), K_(task_id),
188K_(schema_version_cnt), K(version_cnt));
189}
190RS_TRACE(gen_task_id_and_versions);
191return ret;
192}
193
194int ObDDLHelper::serialize_inc_schema_dict_()
195{
196int ret = OB_SUCCESS;
197auto *tsi_generator = GET_TSI(TSISchemaVersionGenerator);
198int64_t start_schema_version = OB_INVALID_VERSION;
199if (OB_FAIL(check_inner_stat_())) {
200LOG_WARN("fail to check inner stat", KR(ret));
201} else if (OB_ISNULL(tsi_generator)) {
202ret = OB_ERR_UNEXPECTED;
203LOG_WARN("tsi schema version generator is null", KR(ret));
204} else if (OB_FAIL(tsi_generator->get_start_version(start_schema_version))) {
205LOG_WARN("fail to get start schema version", KR(ret));
206} else if (OB_FAIL(trans_.serialize_inc_schemas(start_schema_version - 1))) {
207LOG_WARN("fail to serialize inc schemas", KR(ret), K_(tenant_id),
208"start_schema_version", start_schema_version - 1);
209}
210RS_TRACE(inc_schema_dict);
211return ret;
212}
213
214int ObDDLHelper::wait_ddl_trans_()
215{
216int ret = OB_SUCCESS;
217ObTimeoutCtx ctx;
218const int64_t DEFAULT_TS = 10 * 1000 * 1000L; // 10s
219if (OB_FAIL(check_inner_stat_())) {
220LOG_WARN("fail to check inner stat", KR(ret));
221} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, DEFAULT_TS))) {
222LOG_WARN("fail to set default ts", KR(ret));
223} else if (OB_FAIL(ddl_trans_controller_->wait_task_ready(tenant_id_, task_id_, ctx.get_timeout()))) {
224LOG_WARN("fail to wait ddl trans", KR(ret), K_(tenant_id), K_(task_id));
225}
226RS_TRACE(wait_ddl_trans);
227return ret;
228}
229
230// this function should be always called
231int ObDDLHelper::end_ddl_trans_(const int return_ret)
232{
233int ret = return_ret;
234
235// write 1503 ddl operation
236if (OB_SUCC(ret)) {
237auto *tsi_generator = GET_TSI(TSISchemaVersionGenerator);
238int64_t version_cnt = OB_INVALID_INDEX;
239int64_t boundary_schema_version = OB_INVALID_VERSION;
240share::schema::ObSchemaService *schema_service_impl = NULL;
241if (OB_ISNULL(tsi_generator)) {
242ret = OB_ERR_UNEXPECTED;
243LOG_WARN("tsi schema version generator is null", KR(ret));
244} else if (OB_FAIL(tsi_generator->get_version_cnt(version_cnt))) {
245LOG_WARN("fail to get version cnt", KR(ret), K(version_cnt));
246} else if (0 == version_cnt) {
247// no schema change, just skip
248} else if (OB_UNLIKELY(version_cnt < 2)) {
249ret = OB_ERR_UNEXPECTED;
250LOG_WARN("not enough version cnt for boudary ddl operation", KR(ret), K(version_cnt));
251} else if (OB_ISNULL(schema_service_)
252|| OB_ISNULL(schema_service_impl = schema_service_->get_schema_service())) {
253ret = OB_ERR_UNEXPECTED;
254LOG_WARN("ptr is null", KR(ret), KP_(schema_service));
255} else if (OB_FAIL(schema_service_->gen_new_schema_version(tenant_id_, boundary_schema_version))) {
256LOG_WARN("fail to gen new schema version", KR(ret), K_(tenant_id));
257} else {
258share::schema::ObDDLSqlService ddl_sql_service(*schema_service_impl);
259obrpc::ObDDLNopOpreatorArg arg;
260arg.schema_operation_.op_type_ = OB_DDL_END_SIGN;
261arg.schema_operation_.tenant_id_ = tenant_id_;
262if (OB_FAIL(ddl_sql_service.log_nop_operation(arg.schema_operation_,
263boundary_schema_version,
264NULL,
265trans_))) {
266LOG_WARN("fail to log ddl operation", KR(ret), K(arg));
267}
268}
269}
270
271if (trans_.is_started()) {
272int tmp_ret = OB_SUCCESS;
273bool is_commit = OB_SUCC(ret);
274if (OB_TMP_FAIL(trans_.end(is_commit))) {
275LOG_WARN("trans end failed", KR(ret), KR(tmp_ret), K(is_commit));
276ret = is_commit ? tmp_ret : ret;
277}
278}
279if (OB_NOT_NULL(ddl_trans_controller_) && OB_INVALID_ID != task_id_) {
280ddl_trans_controller_->remove_task(tenant_id_, task_id_);
281}
282RS_TRACE(end_ddl_trans);
283return ret;
284}
285
286int ObDDLHelper::execute()
287{
288return OB_NOT_IMPLEMENT;
289/*
290* Implement of parallel ddl should has following actions:
291*
292* ----------------------------------------------
293* 1. start ddl trans:
294* - to be exclusive with non-parallel ddl.
295* - to be concurrent with other parallel ddl.
296*
297* if (OB_FAIL(start_ddl_trans_())) {
298* LOG_WARN("fail to start ddl trans", KR(ret));
299* }
300*
301* ----------------------------------------------
302* 2. lock object by name/object_id
303* - to be exclusive with other parallel ddl which involving the same objects.
304* - lock object in trans
305* Attension:
306* 1) lock objects just for mutual exclusion, should check if related objects changed after acquire locks.
307* 2) For same object, lock object by name first. After that, lock object by id if it's neccessary.
308*
309* ----------------------------------------------
310* 3. fetch & generate schema:
311* - fetch the latest schemas from inner table.
312* - generate schema with arg and the latests schemas.
313*
314* ----------------------------------------------
315* 4. register task id & generate schema versions:
316* - generate an appropriate number of schema versions for this DDL and register task id.
317* - concurrent DDL trans will be committed in descending order of version later.
318*
319* if (FAILEDx(gen_task_id_and_schema_versions_())) {
320* LOG_WARN("fail to gen task id and schema versions", KR(ret));
321* }
322*
323* ----------------------------------------------
324* 5. create schema:
325* - persist schema in inner table.
326*
327* ----------------------------------------------
328* 6. [optional] serialize increment data dictionary:
329* - if table/database/tenant schema changed, records changed schemas in log and commits with DDL trans.
330*
331* if (FAILEDx(serialize_inc_schema_dict_())) {
332* LOG_WARN("fail to serialize inc schema dict", KR(ret));
333* }
334*
335* ----------------------------------------------
336* 7. wait concurrent ddl trans ended:
337* - wait concurrent DDL trans with smallest schema version ended.
338*
339* if (FAILEDx(wait_ddl_trans_())) {
340* LOG_WARN(fail to wait ddl trans, KR(ret));
341* }
342*
343* ----------------------------------------------
344* 8. end ddl trans:
345* - abort/commit ddl trans.
346*
347* if (OB_FAIL(end_ddl_trans_(ret))) { // won't overwrite ret
348* LOG_WARN("fail to end ddl trans", KR(ret));
349* }
350*/
351}
352
353int ObDDLHelper::add_lock_object_to_map_(
354const uint64_t lock_obj_id,
355const transaction::tablelock::ObTableLockMode lock_mode,
356ObjectLockMap &lock_map)
357{
358
359int ret = OB_SUCCESS;
360if (OB_FAIL(check_inner_stat_())) {
361LOG_WARN("fail to check inner stat", KR(ret));
362} else if (OB_UNLIKELY(transaction::tablelock::SHARE != lock_mode
363&& transaction::tablelock::EXCLUSIVE != lock_mode)) {
364ret = OB_NOT_SUPPORTED;
365LOG_WARN("not support lock mode to lock object by name", KR(ret), K(lock_mode));
366} else {
367bool need_update = false;
368transaction::tablelock::ObTableLockMode existed_lock_mode = transaction::tablelock::MAX_LOCK_MODE;
369if (OB_FAIL(lock_map.get_refactored(lock_obj_id, existed_lock_mode))) {
370if (OB_HASH_NOT_EXIST == ret) {
371ret = OB_SUCCESS;
372need_update = true;
373} else {
374LOG_WARN("fail to get lock object from map", KR(ret), K(lock_obj_id));
375}
376} else if (transaction::tablelock::SHARE == existed_lock_mode
377&& transaction::tablelock::EXCLUSIVE == lock_mode) {
378// upgrade lock
379need_update = true;
380}
381
382if (OB_SUCC(ret) && need_update) {
383int overwrite = 1;
384if (OB_FAIL(lock_map.set_refactored(lock_obj_id, lock_mode, overwrite))) {
385LOG_WARN("fail to set lock object to map", KR(ret), K(lock_obj_id), K(lock_mode));
386}
387}
388}
389return ret;
390}
391
392int ObDDLHelper::lock_objects_in_map_(
393const transaction::tablelock::ObLockOBJType obj_type,
394ObjectLockMap &lock_map)
395{
396int ret = OB_SUCCESS;
397ObArray<ObLockObjPair> lock_pairs;
398const int64_t lock_cnt = lock_map.size();
399ObTimeoutCtx ctx;
400observer::ObInnerSQLConnection *conn = NULL;
401if (OB_FAIL(check_inner_stat_())) {
402LOG_WARN("fail to check inner stat", KR(ret));
403} else if (OB_UNLIKELY(lock_cnt < 0)) {
404ret = OB_INVALID_ARGUMENT;
405LOG_WARN("unexpected lock cnt", KR(ret), K(lock_cnt));
406} else if (0 == lock_cnt) {
407// skip
408} else if (OB_FAIL(lock_pairs.reserve(lock_cnt))) {
409LOG_WARN("fail to reserve lock pairs", KR(ret), K(lock_cnt));
410} else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
411LOG_WARN("fail to set timeout ctx", KR(ret));
412} else if (OB_ISNULL(conn = dynamic_cast<observer::ObInnerSQLConnection *>
413(trans_.get_connection()))) {
414ret = OB_ERR_UNEXPECTED;
415LOG_WARN("trans conn is NULL", KR(ret));
416} else {
417ObLockObjPair pair;
418FOREACH_X(it, lock_map, OB_SUCC(ret)) {
419if (OB_FAIL(pair.init(it->first, it->second))) {
420LOG_WARN("fail to init lock pair", KR(ret),
421"obj_id", it->first, "lock_mode", it->second);
422} else if (OB_FAIL(lock_pairs.push_back(pair))) {
423LOG_WARN("fail to push back lock pair", KR(ret), K(pair));
424}
425} // end foreach
426if (OB_SUCC(ret)) {
427std::sort(lock_pairs.begin(), lock_pairs.end(), ObLockObjPair::less_than);
428FOREACH_X(it, lock_pairs, OB_SUCC(ret)) {
429const int64_t timeout = ctx.get_timeout();
430if (OB_UNLIKELY(timeout <= 0)) {
431ret = OB_TIMEOUT;
432LOG_WARN("already timeout", KR(ret), K(timeout));
433} else {
434transaction::tablelock::ObLockObjRequest lock_arg;
435lock_arg.obj_type_ = obj_type;
436lock_arg.owner_id_ = ObTableLockOwnerID(0);
437lock_arg.obj_id_ = it->get_obj_id();
438lock_arg.lock_mode_ = it->get_lock_mode();
439lock_arg.op_type_ = ObTableLockOpType::IN_TRANS_COMMON_LOCK;
440lock_arg.timeout_us_ = timeout;
441LOG_INFO("try lock object", KR(ret), K(lock_arg));
442if (OB_FAIL(ObInnerConnectionLockUtil::lock_obj(tenant_id_, lock_arg, conn))) {
443LOG_WARN("lock obj failed", KR(ret), K_(tenant_id), K(lock_arg));
444}
445}
446} // end foreach
447}
448}
449(void) lock_map.clear();
450return ret;
451}
452
453int ObDDLHelper::add_lock_object_by_database_name_(
454const ObString &database_name,
455const transaction::tablelock::ObTableLockMode lock_mode)
456{
457int ret = OB_SUCCESS;
458if (OB_FAIL(check_inner_stat_())) {
459LOG_WARN("fail to check inner stat", KR(ret));
460} else if (OB_UNLIKELY(database_name.empty())) {
461ret = OB_INVALID_ARGUMENT;
462LOG_WARN("database_name is invalid", KR(ret), K(database_name));
463} else {
464// use OB_ORIGIN_AND_INSENSITIVE and ignore end space to make more conficts for safety.
465common::ObCollationType cs_type = ObSchema::get_cs_type_with_cmp_mode(OB_ORIGIN_AND_INSENSITIVE);
466bool calc_end_space = false;
467uint64_t lock_obj_id = 0;
468lock_obj_id = common::ObCharset::hash(
469cs_type, database_name.ptr(), database_name.length(),
470lock_obj_id, calc_end_space, NULL);
471if (OB_FAIL(add_lock_object_to_map_(lock_obj_id, lock_mode, lock_database_name_map_))) {
472LOG_WARN("fail to add lock object to map", KR(ret), K(lock_obj_id), K(lock_mode));
473}
474LOG_INFO("add lock object by database name", KR(ret), K(database_name), K(lock_mode), K(lock_obj_id));
475}
476return ret;
477}
478
479int ObDDLHelper::lock_databases_by_name_()
480{
481return lock_objects_in_map_(ObLockOBJType::OBJ_TYPE_DATABASE_NAME, lock_database_name_map_);
482}
483
484int ObDDLHelper::add_lock_object_by_name_(
485const ObString &database_name,
486const ObString &object_name,
487const share::schema::ObSchemaType schema_type,
488const transaction::tablelock::ObTableLockMode lock_mode)
489{
490int ret = OB_SUCCESS;
491if (OB_FAIL(check_inner_stat_())) {
492LOG_WARN("fail to check inner stat", KR(ret));
493} else if (OB_UNLIKELY(database_name.empty() || object_name.empty())) {
494ret = OB_INVALID_ARGUMENT;
495LOG_WARN("database_name/object_name is invalid", KR(ret), K(database_name), K(object_name));
496} else {
497// 1. use OB_ORIGIN_AND_INSENSITIVE and ignore end space to make more conficts for safety.
498// 2. encoded with database name to make less conficts between different databases/users.
499common::ObCollationType cs_type = ObSchema::get_cs_type_with_cmp_mode(OB_ORIGIN_AND_INSENSITIVE);
500bool calc_end_space = false;
501uint64_t lock_obj_id = 0;
502lock_obj_id = common::ObCharset::hash(
503cs_type, database_name.ptr(), database_name.length(),
504lock_obj_id, calc_end_space, NULL);
505lock_obj_id = common::ObCharset::hash(
506cs_type, object_name.ptr(), object_name.length(),
507lock_obj_id, calc_end_space, NULL);
508if (OB_FAIL(add_lock_object_to_map_(lock_obj_id, lock_mode, lock_object_name_map_))) {
509LOG_WARN("fail to add lock object to map", KR(ret), K(lock_obj_id), K(lock_mode));
510}
511LOG_INFO("add lock object by name", KR(ret), K(database_name),
512K(object_name), K(schema_type), K(lock_mode), K(lock_obj_id));
513}
514return ret;
515}
516
517int ObDDLHelper::lock_existed_objects_by_name_()
518{
519return lock_objects_in_map_(ObLockOBJType::OBJ_TYPE_OBJECT_NAME, lock_object_name_map_);
520}
521
522int ObDDLHelper::add_lock_object_by_id_(
523const uint64_t lock_obj_id,
524const share::schema::ObSchemaType schema_type,
525const transaction::tablelock::ObTableLockMode lock_mode)
526{
527int ret = OB_SUCCESS;
528if (OB_FAIL(check_inner_stat_())) {
529LOG_WARN("fail to check inner stat", KR(ret));
530} else if (OB_UNLIKELY(OB_INVALID_ID == lock_obj_id)) {
531ret = OB_INVALID_ARGUMENT;
532LOG_WARN("object_id is invalid", KR(ret), K(lock_obj_id));
533} else if (OB_FAIL(add_lock_object_to_map_(lock_obj_id, lock_mode, lock_object_id_map_))) {
534LOG_WARN("fail to add lock object to map", KR(ret), K(lock_obj_id), K(lock_mode));
535}
536LOG_INFO("add lock object by id", KR(ret), K(lock_obj_id), K(schema_type), K(lock_mode));
537return ret;
538}
539
540int ObDDLHelper::lock_existed_objects_by_id_()
541{
542return lock_objects_in_map_(ObLockOBJType::OBJ_TYPE_COMMON_OBJ, lock_object_id_map_);
543}
544
545// 1. constraint name and foreign key name are in the same namespace in oracle tenant.
546// 2. constraint name and foreign key name are in different namespace in mysql tenant.
547int ObDDLHelper::check_constraint_name_exist_(
548const share::schema::ObTableSchema &table_schema,
549const common::ObString &constraint_name,
550const bool is_foreign_key,
551bool &exist)
552{
553int ret = OB_SUCCESS;
554bool is_oracle_mode = false;
555uint64_t constraint_id = OB_INVALID_ID;
556const uint64_t database_id = table_schema.get_database_id();
557exist = false;
558if (OB_FAIL(check_inner_stat_())) {
559LOG_WARN("fail to check inner stat", KR(ret));
560} else if (OB_FAIL(table_schema.check_if_oracle_compat_mode(is_oracle_mode))) {
561LOG_WARN("check if oracle compat mode failed", KR(ret), K_(tenant_id));
562} else {
563const bool check_fk = (is_oracle_mode || is_foreign_key);
564if (OB_SUCC(ret) && check_fk) {
565if (OB_FAIL(latest_schema_guard_.get_foreign_key_id(
566database_id, constraint_name, constraint_id))) {
567LOG_WARN("fail to get foreign key id", KR(ret), K_(tenant_id), K(database_id), K(constraint_name));
568} else if (OB_INVALID_ID != constraint_id) {
569exist = true;
570}
571}
572const bool check_cst = (is_oracle_mode || !is_foreign_key);
573if (OB_SUCC(ret) && !exist && check_cst) {
574if (table_schema.is_mysql_tmp_table()) {
575// tmp table in mysql mode, do nothing
576} else if (OB_FAIL(latest_schema_guard_.get_constraint_id(
577database_id, constraint_name, constraint_id))) {
578LOG_WARN("fail to get constraint id", KR(ret), K_(tenant_id), K(database_id), K(constraint_name));
579} else if (OB_INVALID_ID != constraint_id) {
580exist = true;
581}
582}
583}
584return ret;
585}
586
587int ObDDLHelper::gen_object_ids_(
588const int64_t object_cnt,
589share::ObIDGenerator &id_generator)
590{
591int ret = OB_SUCCESS;
592if (OB_FAIL(check_inner_stat_())) {
593LOG_WARN("fail to check inner stat", KR(ret));
594} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id_
595|| object_cnt < 0)) {
596ret = OB_INVALID_ARGUMENT;
597LOG_WARN("invalid tenant_id or object_cnt", KR(ret), K_(tenant_id), K(object_cnt));
598} else if (0 == object_cnt) {
599// skip
600} else {
601uint64_t max_object_id = OB_INVALID_ID;
602uint64_t min_object_id = OB_INVALID_ID;
603share::schema::ObSchemaService *schema_service_impl = NULL;
604if (OB_ISNULL(schema_service_)
605|| OB_ISNULL(schema_service_impl = schema_service_->get_schema_service())) {
606ret = OB_ERR_UNEXPECTED;
607LOG_WARN("ptr is null", KR(ret), KP_(schema_service));
608} else if (OB_FAIL(schema_service_impl->fetch_new_object_ids(tenant_id_, object_cnt, max_object_id))) {
609LOG_WARN("fail to fetch new object ids", KR(ret), K_(tenant_id), K(object_cnt));
610} else if (OB_UNLIKELY(OB_INVALID_ID == max_object_id)) {
611ret = OB_ERR_UNEXPECTED;
612LOG_WARN("object_id is invalid", KR(ret), K_(tenant_id), K(object_cnt));
613} else if (0 >= (min_object_id = max_object_id - object_cnt + 1)) {
614ret = OB_ERR_UNEXPECTED;
615LOG_WARN("min_object_id should be greator than 0",
616KR(ret), K_(tenant_id), K(min_object_id), K(max_object_id), K(object_cnt));
617} else if (OB_FAIL(id_generator.init(1 /*step*/, min_object_id, max_object_id))) {
618LOG_WARN("fail to init id generator", KR(ret), K_(tenant_id),
619K(min_object_id), K(max_object_id), K(object_cnt));
620}
621}
622return ret;
623}
624
625int ObDDLHelper::gen_partition_object_and_tablet_ids_(
626ObIArray<ObTableSchema> &table_schemas)
627{
628int ret = OB_SUCCESS;
629if (OB_FAIL(check_inner_stat_())) {
630LOG_WARN("fail to check inner stat", KR(ret));
631} else if (OB_FAIL(ddl_service_->generate_object_id_for_partition_schemas(table_schemas))) {
632LOG_WARN("fail to generate object_ids", KR(ret));
633} else if (OB_FAIL(ddl_service_->generate_tables_tablet_id(table_schemas))) {
634LOG_WARN("fail to generate tablet_ids", KR(ret));
635}
636return ret;
637}
638