2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
15
#include "ob_dbms_job_utils.h"
16
#include "ob_dbms_job_executor.h"
18
#include "lib/oblog/ob_log.h"
19
#include "lib/mysqlclient/ob_isql_connection.h"
20
#include "share/ob_define.h"
21
#include "share/ob_errno.h"
22
#include "share/schema/ob_schema_getter_guard.h"
24
#include "observer/ob_inner_sql_connection_pool.h"
25
#include "sql/session/ob_sql_session_info.h"
26
#include "sql/ob_sql.h"
27
#ifdef OB_BUILD_AUDIT_SECURITY
28
#include "pl/sys_package/ob_dbms_audit_mgmt.h"
33
using namespace common;
34
using namespace common::sqlclient;
35
using namespace share::schema;
36
using namespace observer;
42
int ObDBMSJobExecutor::init(
43
common::ObMySQLProxy *sql_proxy, ObMultiVersionSchemaService *schema_service)
48
LOG_WARN("job scheduler executor already init", K(inited_), K(ret));
49
} else if (OB_ISNULL(sql_proxy_ = sql_proxy)
50
|| OB_ISNULL(schema_service_ = schema_service)) {
51
ret = OB_INVALID_ARGUMENT;
52
LOG_WARN("sql proxy or schema service is null", K(sql_proxy), K(ret));
53
} else if (OB_FAIL(job_utils_.init(sql_proxy_))) {
54
LOG_WARN("fail to init action record", K(ret));
61
int ObDBMSJobExecutor::init_session(
62
sql::ObSQLSessionInfo &session,
63
ObSchemaGetterGuard &schema_guard,
64
const ObString &tenant_name, uint64_t tenant_id,
65
const ObString &database_name, uint64_t database_id,
66
const ObUserInfo* user_info,
70
ObObj oracle_sql_mode;
71
ObArenaAllocator *allocator = NULL;
72
const bool print_info_log = true;
73
const bool is_sys_tenant = true;
74
ObPCMemPctConf pc_mem_conf;
76
oracle_mode.set_int(1);
77
oracle_sql_mode.set_uint(ObUInt64Type, DEFAULT_ORACLE_MODE);
78
OZ (session.init(1, 1, allocator));
79
OX (session.set_inner_session());
80
OZ (session.load_default_sys_variable(print_info_log, is_sys_tenant));
81
OZ (session.update_max_packet_size());
82
OZ (session.init_tenant(tenant_name.ptr(), tenant_id));
83
OZ (session.load_all_sys_vars(schema_guard));
84
OZ (session.update_sys_variable(share::SYS_VAR_SQL_MODE, oracle_sql_mode));
85
OZ (session.update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, oracle_mode));
86
OZ (session.update_sys_variable(share::SYS_VAR_NLS_DATE_FORMAT,
87
ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT));
88
OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_FORMAT,
89
ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT));
90
OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT,
91
ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT));
92
OZ (session.set_default_database(database_name));
93
OZ (session.get_pc_mem_conf(pc_mem_conf));
94
CK (OB_NOT_NULL(GCTX.sql_engine_));
95
OX (session.set_database_id(database_id));
97
user_info->get_user_name(), user_info->get_host_name_str(), user_info->get_user_id()));
98
OX (session.set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT));
99
OX (session.init_use_rich_format());
101
OZ (exec_env.store(session));
105
int ObDBMSJobExecutor::init_env(ObDBMSJobInfo &job_info, ObSQLSessionInfo &session)
107
int ret = OB_SUCCESS;
108
ObSchemaGetterGuard schema_guard;
109
const ObTenantSchema *tenant_info = NULL;
110
const ObSysVariableSchema *sys_variable_schema = NULL;
111
ObSEArray<const ObUserInfo *, 1> user_infos;
112
const ObUserInfo* user_info = NULL;
113
const ObDatabaseSchema *database_schema = NULL;
114
share::schema::ObUserLoginInfo login_info;
116
CK (OB_NOT_NULL(schema_service_));
117
CK (job_info.valid());
118
OZ (schema_service_->get_tenant_schema_guard(job_info.get_tenant_id(), schema_guard));
119
OZ (schema_guard.get_tenant_info(job_info.get_tenant_id(), tenant_info));
120
OZ (schema_guard.get_user_info(
121
job_info.get_tenant_id(), job_info.get_lowner(), user_infos));
122
OZ (schema_guard.get_database_schema(
123
job_info.get_tenant_id(), job_info.get_lowner(), database_schema));
124
OV (1 == user_infos.count(), OB_ERR_UNEXPECTED, K(job_info));
125
CK (OB_NOT_NULL(user_info = user_infos.at(0)));
126
CK (OB_NOT_NULL(user_info));
127
CK (OB_NOT_NULL(tenant_info));
128
CK (OB_NOT_NULL(database_schema));
129
OZ (exec_env.init(job_info.get_exec_env()));
130
OZ (init_session(session,
132
tenant_info->get_tenant_name(),
133
job_info.get_tenant_id(),
134
database_schema->get_database_name(),
135
database_schema->get_database_id(),
141
int ObDBMSJobExecutor::run_dbms_job(
142
uint64_t tenant_id, ObDBMSJobInfo &job_info, ObIAllocator &allocator)
144
int ret = OB_SUCCESS;
145
#ifndef OB_BUILD_ORACLE_PL
146
UNUSEDx(tenant_id, job_info, allocator);
147
ret = OB_NOT_SUPPORTED;
148
LOG_WARN("not support", K(ret));
151
ObInnerSQLConnectionPool *pool = NULL;
152
ObInnerSQLConnection *conn = NULL;
153
SMART_VAR(ObSQLSessionInfo, session) {
154
int64_t affected_rows = 0;
155
CK (OB_LIKELY(inited_));
156
CK (OB_NOT_NULL(sql_proxy_));
157
CK (sql_proxy_->is_inited());
159
if (job_info.is_mysql_audit_job()) {
160
pl::ObDbmsAuditMgmt::ObAuditParam audit_param;
161
OZ (pl::ObDbmsAuditMgmt::parse_mysql_job_param(job_info.get_what(), audit_param));
162
OZ (pl::ObDbmsAuditMgmt::clean_audit_trail_impl(audit_param, allocator, sql_proxy_, tenant_id), audit_param);
164
ObOracleSqlProxy oracle_proxy(*(static_cast<ObMySQLProxy *>(sql_proxy_)));
165
CK (job_info.valid());
166
CK (job_info.get_what().length() != 0);
167
OZ (what.append_fmt("BEGIN %.*s END;",
168
job_info.get_what().length(), job_info.get_what().ptr()));
169
CK (OB_NOT_NULL(pool = static_cast<ObInnerSQLConnectionPool *>(oracle_proxy.get_pool())));
170
OZ (init_env(job_info, session));
171
OZ (pool->acquire_spi_conn(&session, conn));
172
OZ (conn->execute_write(tenant_id, what.string().ptr(), affected_rows));
173
if (OB_NOT_NULL(conn)) {
174
sql_proxy_->close(conn, ret);
183
int ObDBMSJobExecutor::run_dbms_job(uint64_t tenant_id, uint64_t job_id)
185
int ret = OB_SUCCESS;
186
ObDBMSJobInfo job_info;
187
ObArenaAllocator allocator;
189
THIS_WORKER.set_timeout_ts(INT64_MAX);
191
OZ (job_utils_.get_dbms_job_info(tenant_id, job_id, allocator, job_info));
194
OZ (job_utils_.update_for_start(tenant_id, job_info));
196
OZ (run_dbms_job(tenant_id, job_info, allocator));
198
int tmp_ret = OB_SUCCESS;
199
ObString errmsg = common::ob_get_tsi_err_msg(ret);
200
if (errmsg.empty() && ret != OB_SUCCESS) {
201
errmsg = ObString(strlen(ob_errpkt_strerror(ret, lib::is_oracle_mode())),
202
ob_errpkt_strerror(ret, lib::is_oracle_mode()));
204
if ((tmp_ret = job_utils_.update_for_end(tenant_id, job_info, ret, errmsg)) != OB_SUCCESS) {
205
LOG_WARN("update dbms job failed", K(tmp_ret), K(ret));
207
ret = OB_SUCCESS == ret ? tmp_ret : ret;