oceanbase

Форк
0
/
ob_dbms_job_executor.cpp 
213 строк · 7.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "ob_dbms_job_utils.h"
16
#include "ob_dbms_job_executor.h"
17

18
#include "lib/oblog/ob_log.h"
19
#include "lib/mysqlclient/ob_isql_connection.h"
20
#include "share/ob_define.h"
21
#include "share/ob_errno.h"
22
#include "share/schema/ob_schema_getter_guard.h"
23

24
#include "observer/ob_inner_sql_connection_pool.h"
25
#include "sql/session/ob_sql_session_info.h"
26
#include "sql/ob_sql.h"
27
#ifdef OB_BUILD_AUDIT_SECURITY
28
#include "pl/sys_package/ob_dbms_audit_mgmt.h"
29
#endif
30

31
namespace oceanbase
32
{
33
using namespace common;
34
using namespace common::sqlclient;
35
using namespace share::schema;
36
using namespace observer;
37
using namespace sql;
38

39
namespace dbms_job
40
{
41

42
int ObDBMSJobExecutor::init(
43
  common::ObMySQLProxy *sql_proxy, ObMultiVersionSchemaService *schema_service)
44
{
45
  int ret = OB_SUCCESS;
46
  if (inited_) {
47
    ret = OB_INIT_TWICE;
48
    LOG_WARN("job scheduler executor already init", K(inited_), K(ret));
49
  } else if (OB_ISNULL(sql_proxy_ = sql_proxy)
50
          || OB_ISNULL(schema_service_ = schema_service)) {
51
    ret = OB_INVALID_ARGUMENT;
52
    LOG_WARN("sql proxy or schema service is null", K(sql_proxy), K(ret));
53
  } else if (OB_FAIL(job_utils_.init(sql_proxy_))) {
54
    LOG_WARN("fail to init action record", K(ret));
55
  } else {
56
    inited_ = true;
57
  }
58
  return ret;
59
}
60

61
int ObDBMSJobExecutor::init_session(
62
  sql::ObSQLSessionInfo &session,
63
  ObSchemaGetterGuard &schema_guard,
64
  const ObString &tenant_name, uint64_t tenant_id,
65
  const ObString &database_name, uint64_t database_id,
66
  const ObUserInfo* user_info,
67
  ObExecEnv &exec_env)
68
{
69
  int ret = OB_SUCCESS;
70
  ObObj oracle_sql_mode;
71
  ObArenaAllocator *allocator = NULL;
72
  const bool print_info_log = true;
73
  const bool is_sys_tenant = true;
74
  ObPCMemPctConf pc_mem_conf;
75
  ObObj oracle_mode;
76
  oracle_mode.set_int(1);
77
  oracle_sql_mode.set_uint(ObUInt64Type, DEFAULT_ORACLE_MODE);
78
  OZ (session.init(1, 1, allocator));
79
  OX (session.set_inner_session());
80
  OZ (session.load_default_sys_variable(print_info_log, is_sys_tenant));
81
  OZ (session.update_max_packet_size());
82
  OZ (session.init_tenant(tenant_name.ptr(), tenant_id));
83
  OZ (session.load_all_sys_vars(schema_guard));
84
  OZ (session.update_sys_variable(share::SYS_VAR_SQL_MODE, oracle_sql_mode));
85
  OZ (session.update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, oracle_mode));
86
  OZ (session.update_sys_variable(share::SYS_VAR_NLS_DATE_FORMAT,
87
                                  ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT));
88
  OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_FORMAT,
89
                                  ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT));
90
  OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT,
91
                                  ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT));
92
  OZ (session.set_default_database(database_name));
93
  OZ (session.get_pc_mem_conf(pc_mem_conf));
94
  CK (OB_NOT_NULL(GCTX.sql_engine_));
95
  OX (session.set_database_id(database_id));
96
  OZ (session.set_user(
97
    user_info->get_user_name(), user_info->get_host_name_str(), user_info->get_user_id()));
98
  OX (session.set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT));
99
  OX (session.init_use_rich_format());
100

101
  OZ (exec_env.store(session));
102
  return ret;
103
}
104

105
// Resolve the schema objects a job needs and initialize its session.
//
// Looks up the tenant, the job owner's user record and the database whose
// name equals the job's owner (job_info.get_lowner() is used for both
// lookups), loads the job's stored execution environment, and hands all of
// it to init_session().
//
// @param job_info  job description; must satisfy job_info.valid()
// @param session   session to initialize for running the job
// @return OB_SUCCESS, or the first error from a failed lookup/check.
//         OB_ERR_UNEXPECTED is returned when the owner does not resolve to
//         exactly one user.
int ObDBMSJobExecutor::init_env(ObDBMSJobInfo &job_info, ObSQLSessionInfo &session)
{
  int ret = OB_SUCCESS;
  ObSchemaGetterGuard schema_guard;
  const ObTenantSchema *tenant_info = NULL;
  ObSEArray<const ObUserInfo *, 1> user_infos;
  const ObUserInfo* user_info = NULL;
  const ObDatabaseSchema *database_schema = NULL;
  ObExecEnv exec_env;
  CK (OB_NOT_NULL(schema_service_));
  CK (job_info.valid());
  OZ (schema_service_->get_tenant_schema_guard(job_info.get_tenant_id(), schema_guard));
  OZ (schema_guard.get_tenant_info(job_info.get_tenant_id(), tenant_info));
  OZ (schema_guard.get_user_info(
    job_info.get_tenant_id(), job_info.get_lowner(), user_infos));
  OZ (schema_guard.get_database_schema(
    job_info.get_tenant_id(), job_info.get_lowner(), database_schema));
  // The owner must map to exactly one user; only then is at(0) accessed.
  OV (1 == user_infos.count(), OB_ERR_UNEXPECTED, K(job_info));
  CK (OB_NOT_NULL(user_info = user_infos.at(0)));
  CK (OB_NOT_NULL(tenant_info));
  CK (OB_NOT_NULL(database_schema));
  OZ (exec_env.init(job_info.get_exec_env()));
  OZ (init_session(session,
                   schema_guard,
                   tenant_info->get_tenant_name(),
                   job_info.get_tenant_id(),
                   database_schema->get_database_name(),
                   database_schema->get_database_id(),
                   user_info,
                   exec_env));
  return ret;
}
140

141
int ObDBMSJobExecutor::run_dbms_job(
142
  uint64_t tenant_id, ObDBMSJobInfo &job_info, ObIAllocator &allocator)
143
{
144
  int ret = OB_SUCCESS;
145
#ifndef OB_BUILD_ORACLE_PL
146
    UNUSEDx(tenant_id, job_info, allocator);
147
    ret = OB_NOT_SUPPORTED;
148
    LOG_WARN("not support", K(ret));
149
#else
150
  ObSqlString what;
151
  ObInnerSQLConnectionPool *pool = NULL;
152
  ObInnerSQLConnection *conn = NULL;
153
  SMART_VAR(ObSQLSessionInfo, session) {
154
    int64_t affected_rows = 0;
155
    CK (OB_LIKELY(inited_));
156
    CK (OB_NOT_NULL(sql_proxy_));
157
    CK (sql_proxy_->is_inited());
158
    if (OB_SUCC(ret)) {
159
      if (job_info.is_mysql_audit_job()) {
160
        pl::ObDbmsAuditMgmt::ObAuditParam audit_param;
161
        OZ (pl::ObDbmsAuditMgmt::parse_mysql_job_param(job_info.get_what(), audit_param));
162
        OZ (pl::ObDbmsAuditMgmt::clean_audit_trail_impl(audit_param, allocator, sql_proxy_, tenant_id), audit_param);
163
      } else {
164
        ObOracleSqlProxy oracle_proxy(*(static_cast<ObMySQLProxy *>(sql_proxy_)));
165
        CK (job_info.valid());
166
        CK (job_info.get_what().length() != 0);
167
        OZ (what.append_fmt("BEGIN %.*s END;",
168
                            job_info.get_what().length(), job_info.get_what().ptr()));
169
        CK (OB_NOT_NULL(pool = static_cast<ObInnerSQLConnectionPool *>(oracle_proxy.get_pool())));
170
        OZ (init_env(job_info, session));
171
        OZ (pool->acquire_spi_conn(&session, conn));
172
        OZ (conn->execute_write(tenant_id, what.string().ptr(), affected_rows));
173
        if (OB_NOT_NULL(conn)) {
174
          sql_proxy_->close(conn, ret);
175
        }
176
      }
177
    }
178
  }
179
#endif
180
  return ret;
181
}
182

183
// Load a job by id, run it, and record the outcome.
//
// The worker timeout is set to INT64_MAX before anything else. If the job
// cannot be loaded, the error is returned and nothing is recorded. Otherwise
// the job is marked as started and executed, and update_for_end() is always
// called with the resulting code and an error message (the thread-local
// message if there is one, else the generic text for the error code).
//
// @param tenant_id  tenant the job belongs to
// @param job_id     id of the job to run
// @return the error from loading/starting/running the job if any; otherwise
//         the result of update_for_end().
int ObDBMSJobExecutor::run_dbms_job(uint64_t tenant_id, uint64_t job_id)
{
  int ret = OB_SUCCESS;
  ObDBMSJobInfo job_info;
  ObArenaAllocator allocator;

  THIS_WORKER.set_timeout_ts(INT64_MAX);

  OZ (job_utils_.get_dbms_job_info(tenant_id, job_id, allocator, job_info));

  if (OB_SUCC(ret)) {
    OZ (job_utils_.update_for_start(tenant_id, job_info));
    OZ (run_dbms_job(tenant_id, job_info, allocator));

    // Prefer the message left in thread-local storage; fall back to the
    // generic description of the error code when there is none.
    ObString errmsg = common::ob_get_tsi_err_msg(ret);
    if (OB_SUCCESS != ret && errmsg.empty()) {
      const char *generic_msg = ob_errpkt_strerror(ret, lib::is_oracle_mode());
      errmsg = ObString(strlen(generic_msg), generic_msg);
    }

    // Record the outcome even when the job failed.
    int tmp_ret = job_utils_.update_for_end(tenant_id, job_info, ret, errmsg);
    if (OB_SUCCESS != tmp_ret) {
      LOG_WARN("update dbms job failed", K(tmp_ret), K(ret));
    }
    // A failure of the job itself takes precedence over a bookkeeping failure.
    if (OB_SUCCESS == ret) {
      ret = tmp_ret;
    }
  }
  return ret;
}
211

212
}
213
}
214

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.