oceanbase
392 строки · 15.2 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS
14
15#include "ob_dbms_sched_job_utils.h"
16
17#include "lib/ob_errno.h"
18#include "lib/oblog/ob_log_module.h"
19#include "lib/time/ob_time_utility.h"
20#include "lib/string/ob_string.h"
21#include "lib/string/ob_sql_string.h"
22#include "lib/mysqlclient/ob_mysql_proxy.h"
23#include "lib/mysqlclient/ob_mysql_result.h"
24#include "lib/mysqlclient/ob_isql_client.h"
25#include "lib/mysqlclient/ob_mysql_transaction.h"
26#include "rpc/obrpc/ob_rpc_packet.h"
27#include "lib/worker.h"
28#include "share/ob_dml_sql_splicer.h"
29#include "share/inner_table/ob_inner_table_schema_constants.h"
30#include "share/schema/ob_schema_utils.h"
31#include "observer/omt/ob_tenant_config_mgr.h"
32#include "observer/ob_server_struct.h"
33#include "sql/session/ob_sql_session_mgr.h"
34#include "share/schema/ob_schema_getter_guard.h"
35#include "share/stat/ob_dbms_stats_maintenance_window.h"
36
37namespace oceanbase
38{
39
40using namespace common;
41using namespace share;
42using namespace share::schema;
43using namespace sqlclient;
44using namespace sql;
45
46namespace dbms_scheduler
47{
48
49int ObDBMSSchedJobInfo::deep_copy(ObIAllocator &allocator, const ObDBMSSchedJobInfo &other)
50{
51int ret = OB_SUCCESS;
52tenant_id_ = other.tenant_id_;
53job_ = other.job_;
54last_modify_ = other.last_modify_;
55last_date_ = other.last_date_;
56this_date_ = other.this_date_;
57next_date_ = other.next_date_;
58total_ = other.total_;
59failures_ = other.failures_;
60flag_ = other.flag_;
61scheduler_flags_ = other.scheduler_flags_;
62start_date_ = other.start_date_;
63end_date_ = other.end_date_;
64enabled_ = other.enabled_;
65auto_drop_ = other.auto_drop_;
66interval_ts_ = other.interval_ts_;
67is_oracle_tenant_ = other.is_oracle_tenant_;
68max_run_duration_ = other.max_run_duration_;
69
70OZ (ob_write_string(allocator, other.lowner_, lowner_));
71OZ (ob_write_string(allocator, other.powner_, powner_));
72OZ (ob_write_string(allocator, other.cowner_, cowner_));
73
74OZ (ob_write_string(allocator, other.interval_, interval_));
75
76OZ (ob_write_string(allocator, other.what_, what_));
77OZ (ob_write_string(allocator, other.nlsenv_, nlsenv_));
78OZ (ob_write_string(allocator, other.charenv_, charenv_));
79OZ (ob_write_string(allocator, other.field1_, field1_));
80OZ (ob_write_string(allocator, other.exec_env_, exec_env_));
81OZ (ob_write_string(allocator, other.job_name_, job_name_));
82OZ (ob_write_string(allocator, other.job_class_, job_class_));
83OZ (ob_write_string(allocator, other.program_name_, program_name_));
84return ret;
85}
86
87int ObDBMSSchedJobClassInfo::deep_copy(common::ObIAllocator &allocator, const ObDBMSSchedJobClassInfo &other)
88{
89int ret = OB_SUCCESS;
90tenant_id_ = other.tenant_id_;
91is_oracle_tenant_ = other.is_oracle_tenant_;
92log_history_ = other.log_history_;
93OZ (ob_write_string(allocator, other.job_class_name_, job_class_name_));
94OZ (ob_write_string(allocator, other.service_, service_));
95OZ (ob_write_string(allocator, other.resource_consumer_group_, resource_consumer_group_));
96OZ (ob_write_string(allocator, other.logging_level_, logging_level_));
97OZ (ob_write_string(allocator, other.comments_, comments_));
98return ret;
99}
100
101int ObDBMSSchedJobUtils::disable_dbms_sched_job(
102ObISQLClient &sql_client,
103const uint64_t tenant_id,
104const ObString &job_name,
105const bool if_exists)
106{
107int ret = OB_SUCCESS;
108if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || job_name.empty())) {
109ret = OB_INVALID_ARGUMENT;
110LOG_WARN("invalid args", KR(ret), K(tenant_id), K(job_name));
111} else {
112const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
113ObDMLSqlSplicer dml;
114if (OB_FAIL(dml.add_pk_column(
115"tenant_id", ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id)))
116|| OB_FAIL(dml.add_pk_column("job_name", job_name))
117|| OB_FAIL(dml.add_column("enabled", false))) {
118LOG_WARN("add column failed", KR(ret));
119} else {
120ObDMLExecHelper exec(sql_client, exec_tenant_id);
121int64_t affected_rows = 0;
122if (OB_FAIL(exec.exec_update(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, dml, affected_rows))) {
123LOG_WARN("execute update failed", KR(ret));
124} else if (!if_exists && !is_double_row(affected_rows)) {
125ret = OB_ERR_UNEXPECTED;
126LOG_WARN("affected_rows unexpected to be two", KR(ret), K(affected_rows));
127}
128}
129}
130return ret;
131}
132
133int ObDBMSSchedJobUtils::remove_dbms_sched_job(
134ObISQLClient &sql_client,
135const uint64_t tenant_id,
136const ObString &job_name,
137const bool if_exists)
138{
139int ret = OB_SUCCESS;
140if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || job_name.empty())) {
141ret = OB_INVALID_ARGUMENT;
142LOG_WARN("invalid args", KR(ret), K(tenant_id), K(job_name));
143} else {
144const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
145ObDMLSqlSplicer dml;
146if (OB_FAIL(dml.add_pk_column(
147"tenant_id", ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id)))
148|| OB_FAIL(dml.add_pk_column("job_name", job_name))) {
149LOG_WARN("add column failed", KR(ret));
150} else {
151ObDMLExecHelper exec(sql_client, exec_tenant_id);
152int64_t affected_rows = 0;
153if (OB_FAIL(exec.exec_delete(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, dml, affected_rows))) {
154LOG_WARN("execute delete failed", KR(ret));
155} else if (!if_exists && !is_double_row(affected_rows)) {
156ret = OB_ERR_UNEXPECTED;
157LOG_WARN("affected_rows unexpected to be two", KR(ret), K(affected_rows));
158}
159}
160}
161return ret;
162}
163
164int ObDBMSSchedJobUtils::create_dbms_sched_job(
165common::ObISQLClient &sql_client,
166const uint64_t tenant_id,
167const int64_t job_id,
168const dbms_scheduler::ObDBMSSchedJobInfo &job_info)
169{
170int ret = OB_SUCCESS;
171if (OB_INVALID_TENANT_ID == tenant_id) {
172ret = OB_INVALID_ARGUMENT;
173LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
174} else {
175if (OB_FAIL(add_dbms_sched_job(sql_client, tenant_id, job_id, job_info))) {
176LOG_WARN("failed to add dbms scheduler job", KR(ret));
177} else if (OB_FAIL(add_dbms_sched_job(sql_client, tenant_id, 0, job_info))) {
178LOG_WARN("failed to add dbms scheduler job", KR(ret));
179}
180}
181return ret;
182}
183
184int ObDBMSSchedJobUtils::add_dbms_sched_job(
185common::ObISQLClient &sql_client,
186const uint64_t tenant_id,
187const int64_t job_id,
188const dbms_scheduler::ObDBMSSchedJobInfo &job_info)
189{
190int ret = OB_SUCCESS;
191if (OB_INVALID_TENANT_ID == tenant_id) {
192ret = OB_INVALID_ARGUMENT;
193LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
194} else {
195ObDMLSqlSplicer dml;
196const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
197ObDMLExecHelper exec(sql_client, exec_tenant_id);
198int64_t affected_rows = 0;
199const int64_t now = ObTimeUtility::current_time();
200
201OZ (dml.add_gmt_create(now));
202OZ (dml.add_gmt_modified(now));
203OZ (dml.add_pk_column("tenant_id",
204ObSchemaUtils::get_extract_tenant_id(job_info.tenant_id_, job_info.tenant_id_)));
205OZ (dml.add_pk_column("job", job_id));
206OZ (dml.add_column("lowner", ObHexEscapeSqlStr(job_info.lowner_)));
207OZ (dml.add_column("powner", ObHexEscapeSqlStr(job_info.powner_)));
208OZ (dml.add_column("cowner", ObHexEscapeSqlStr(job_info.cowner_)));
209OZ (dml.add_raw_time_column("next_date", job_info.start_date_));
210OZ (dml.add_column("total", 0));
211OZ (dml.add_column("`interval#`", ObHexEscapeSqlStr(
212job_info.repeat_interval_.empty() ? ObString("null") : job_info.repeat_interval_)));
213OZ (dml.add_column("flag", job_info.flag_));
214OZ (dml.add_column("job_name", ObHexEscapeSqlStr(job_info.job_name_)));
215OZ (dml.add_column("job_style", ObHexEscapeSqlStr(job_info.job_style_)));
216OZ (dml.add_column("job_type", ObHexEscapeSqlStr(job_info.job_type_)));
217OZ (dml.add_column("job_class", ObHexEscapeSqlStr(job_info.job_class_)));
218OZ (dml.add_column("job_action", ObHexEscapeSqlStr(job_info.job_action_)));
219OZ (dml.add_column("what", ObHexEscapeSqlStr(job_info.job_action_)));
220OZ (dml.add_raw_time_column("start_date", job_info.start_date_));
221OZ (dml.add_raw_time_column("end_date", job_info.end_date_));
222OZ (dml.add_column("repeat_interval", ObHexEscapeSqlStr(job_info.repeat_interval_)));
223OZ (dml.add_column("enabled", job_info.enabled_));
224OZ (dml.add_column("auto_drop", job_info.auto_drop_));
225OZ (dml.add_column("max_run_duration", job_info.max_run_duration_));
226OZ (dml.add_column("interval_ts", job_info.interval_ts_));
227OZ (dml.add_column("scheduler_flags", job_info.scheduler_flags_));
228OZ (dml.add_column("exec_env", job_info.exec_env_));
229
230if (OB_SUCC(ret) && OB_FAIL(exec.exec_insert(
231OB_ALL_TENANT_SCHEDULER_JOB_TNAME, dml, affected_rows))) {
232LOG_WARN("failed to execute insert", KR(ret));
233} else if (OB_UNLIKELY(!is_single_row(affected_rows))) {
234ret = OB_ERR_UNEXPECTED;
235LOG_WARN("affected_rows unexpected to be one", KR(ret), K(affected_rows));
236}
237}
238return ret;
239}
240
241int ObDBMSSchedJobUtils::init_session(
242ObSQLSessionInfo &session,
243ObSchemaGetterGuard &schema_guard,
244const ObString &tenant_name,
245uint64_t tenant_id,
246const ObString &database_name,
247uint64_t database_id,
248const ObUserInfo* user_info,
249const ObDBMSSchedJobInfo &job_info)
250{
251int ret = OB_SUCCESS;
252ObArenaAllocator *allocator = NULL;
253const bool print_info_log = true;
254const bool is_sys_tenant = true;
255ObPCMemPctConf pc_mem_conf;
256ObObj compatibility_mode;
257ObObj sql_mode;
258if (job_info.is_oracle_tenant_) {
259compatibility_mode.set_int(1);
260sql_mode.set_uint(ObUInt64Type, DEFAULT_ORACLE_MODE);
261} else {
262compatibility_mode.set_int(0);
263sql_mode.set_uint(ObUInt64Type, DEFAULT_MYSQL_MODE);
264}
265OX (session.set_inner_session());
266OZ (session.load_default_sys_variable(print_info_log, is_sys_tenant));
267OZ (session.update_max_packet_size());
268OZ (session.init_tenant(tenant_name.ptr(), tenant_id));
269OZ (session.load_all_sys_vars(schema_guard));
270OZ (session.update_sys_variable(share::SYS_VAR_SQL_MODE, sql_mode));
271OZ (session.update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, compatibility_mode));
272OZ (session.update_sys_variable(share::SYS_VAR_NLS_DATE_FORMAT,
273ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT));
274OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_FORMAT,
275ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT));
276OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT,
277ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT));
278OZ (session.set_default_database(database_name));
279OZ (session.get_pc_mem_conf(pc_mem_conf));
280CK (OB_NOT_NULL(GCTX.sql_engine_));
281OX (session.set_database_id(database_id));
282OZ (session.set_user(
283user_info->get_user_name(), user_info->get_host_name_str(), user_info->get_user_id()));
284OX (session.set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT));
285if (OB_SUCC(ret) && job_info.is_date_expression_job_class()) {
286// set larger timeout for mview scheduler jobs
287const int64_t QUERY_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
288const int64_t TRX_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
289ObObj query_timeout_obj;
290ObObj trx_timeout_obj;
291query_timeout_obj.set_int(QUERY_TIMEOUT_US);
292trx_timeout_obj.set_int(TRX_TIMEOUT_US);
293OZ (session.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_timeout_obj));
294OZ (session.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_timeout_obj));
295}
296return ret;
297}
298
299int ObDBMSSchedJobUtils::init_env(
300ObDBMSSchedJobInfo &job_info,
301ObSQLSessionInfo &session)
302{
303int ret = OB_SUCCESS;
304ObSchemaGetterGuard schema_guard;
305const ObTenantSchema *tenant_info = NULL;
306const ObSysVariableSchema *sys_variable_schema = NULL;
307ObSEArray<const ObUserInfo *, 1> user_infos;
308const ObUserInfo* user_info = NULL;
309const ObDatabaseSchema *database_schema = NULL;
310share::schema::ObUserLoginInfo login_info;
311ObExecEnv exec_env;
312CK (OB_NOT_NULL(GCTX.schema_service_));
313CK (job_info.valid());
314OZ (GCTX.schema_service_->get_tenant_schema_guard(job_info.get_tenant_id(), schema_guard));
315OZ (schema_guard.get_tenant_info(job_info.get_tenant_id(), tenant_info));
316OZ (schema_guard.get_user_info(
317job_info.get_tenant_id(), job_info.get_powner(), user_infos));
318OZ (schema_guard.get_database_schema(
319job_info.get_tenant_id(), job_info.get_cowner(), database_schema));
320if (OB_SUCC(ret) &&
321user_infos.count() > 1 &&
322ObDbmsStatsMaintenanceWindow::is_stats_job(job_info.get_job_name())) {
323OZ(ObDbmsStatsMaintenanceWindow::reset_opt_stats_user_infos(user_infos));
324}
325OV (1 == user_infos.count(), OB_ERR_UNEXPECTED, K(job_info), K(user_infos));
326CK (OB_NOT_NULL(user_info = user_infos.at(0)));
327CK (OB_NOT_NULL(user_info));
328CK (OB_NOT_NULL(tenant_info));
329CK (OB_NOT_NULL(database_schema));
330OZ (exec_env.init(job_info.get_exec_env()));
331OZ (init_session(session,
332schema_guard,
333tenant_info->get_tenant_name(),
334job_info.get_tenant_id(),
335database_schema->get_database_name(),
336database_schema->get_database_id(),
337user_info,
338job_info));
339OZ (exec_env.store(session));
340return ret;
341}
342
343int ObDBMSSchedJobUtils::create_session(
344const uint64_t tenant_id,
345ObFreeSessionCtx &free_session_ctx,
346ObSQLSessionInfo *&session_info)
347{
348int ret = OB_SUCCESS;
349uint32_t sid = sql::ObSQLSessionInfo::INVALID_SESSID;
350uint64_t proxy_sid = 0;
351if (OB_ISNULL(GCTX.session_mgr_)) {
352ret = OB_ERR_UNEXPECTED;
353LOG_WARN("session_mgr_ is null", KR(ret));
354} else if (OB_FAIL(GCTX.session_mgr_->create_sessid(sid))) {
355LOG_WARN("alloc session id failed", KR(ret));
356} else if (OB_FAIL(GCTX.session_mgr_->create_session(
357tenant_id, sid, proxy_sid, ObTimeUtility::current_time(), session_info))) {
358LOG_WARN("create session failed", K(ret), K(sid));
359GCTX.session_mgr_->mark_sessid_unused(sid);
360session_info = NULL;
361} else if (OB_ISNULL(session_info)) {
362ret = OB_ERR_UNEXPECTED;
363LOG_WARN("unexpected session info is null", K(ret));
364} else {
365free_session_ctx.sessid_ = sid;
366free_session_ctx.proxy_sessid_ = proxy_sid;
367}
368return ret;
369}
370
371int ObDBMSSchedJobUtils::destroy_session(
372ObFreeSessionCtx &free_session_ctx,
373ObSQLSessionInfo *session_info)
374{
375int ret = OB_SUCCESS;
376if (OB_ISNULL(GCTX.session_mgr_)) {
377ret = OB_ERR_UNEXPECTED;
378LOG_WARN("session_mgr_ is null", KR(ret));
379} else if (OB_ISNULL(session_info)) {
380ret = OB_INVALID_ARGUMENT;
381LOG_WARN("session_info is null", KR(ret));
382} else {
383session_info->set_session_sleep();
384GCTX.session_mgr_->revert_session(session_info);
385GCTX.session_mgr_->free_session(free_session_ctx);
386GCTX.session_mgr_->mark_sessid_unused(free_session_ctx.sessid_);
387}
388return ret;
389}
390
391} // end for namespace dbms_scheduler
392} // end for namespace oceanbase
393