oceanbase

Форк
0
/
ob_dbms_sched_job_utils.cpp 
392 строки · 15.2 Кб
1
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_dbms_sched_job_utils.h"
16

17
#include "lib/ob_errno.h"
18
#include "lib/oblog/ob_log_module.h"
19
#include "lib/time/ob_time_utility.h"
20
#include "lib/string/ob_string.h"
21
#include "lib/string/ob_sql_string.h"
22
#include "lib/mysqlclient/ob_mysql_proxy.h"
23
#include "lib/mysqlclient/ob_mysql_result.h"
24
#include "lib/mysqlclient/ob_isql_client.h"
25
#include "lib/mysqlclient/ob_mysql_transaction.h"
26
#include "rpc/obrpc/ob_rpc_packet.h"
27
#include "lib/worker.h"
28
#include "share/ob_dml_sql_splicer.h"
29
#include "share/inner_table/ob_inner_table_schema_constants.h"
30
#include "share/schema/ob_schema_utils.h"
31
#include "observer/omt/ob_tenant_config_mgr.h"
32
#include "observer/ob_server_struct.h"
33
#include "sql/session/ob_sql_session_mgr.h"
34
#include "share/schema/ob_schema_getter_guard.h"
35
#include "share/stat/ob_dbms_stats_maintenance_window.h"
36

37
namespace oceanbase
38
{
39

40
using namespace common;
41
using namespace share;
42
using namespace share::schema;
43
using namespace sqlclient;
44
using namespace sql;
45

46
namespace dbms_scheduler
47
{
48

49
// Copy the members listed below from `other` into this object.
// Value members are assigned directly; string members are duplicated through
// ob_write_string() using `allocator`.
//
// @param allocator  allocator passed to every ob_write_string() call
// @param other      job info to copy from
// @return OB_SUCCESS, or whatever error code the OZ-wrapped ob_write_string() calls leave in ret
//
// NOTE(review): add_dbms_sched_job() in this file reads job_style_, job_type_, job_action_ and
// repeat_interval_ from ObDBMSSchedJobInfo, none of which are copied here -- confirm intended.
int ObDBMSSchedJobInfo::deep_copy(ObIAllocator &allocator, const ObDBMSSchedJobInfo &other)
{
  int ret = OB_SUCCESS;

  // identity and tenant mode
  tenant_id_ = other.tenant_id_;
  job_ = other.job_;
  is_oracle_tenant_ = other.is_oracle_tenant_;

  // dates, intervals and durations
  last_modify_ = other.last_modify_;
  last_date_ = other.last_date_;
  this_date_ = other.this_date_;
  next_date_ = other.next_date_;
  start_date_ = other.start_date_;
  end_date_ = other.end_date_;
  interval_ts_ = other.interval_ts_;
  max_run_duration_ = other.max_run_duration_;

  // counters and flags
  total_ = other.total_;
  failures_ = other.failures_;
  flag_ = other.flag_;
  scheduler_flags_ = other.scheduler_flags_;
  enabled_ = other.enabled_;
  auto_drop_ = other.auto_drop_;

  // owners
  OZ (ob_write_string(allocator, other.lowner_, lowner_));
  OZ (ob_write_string(allocator, other.powner_, powner_));
  OZ (ob_write_string(allocator, other.cowner_, cowner_));

  // remaining string members
  OZ (ob_write_string(allocator, other.interval_, interval_));
  OZ (ob_write_string(allocator, other.what_, what_));
  OZ (ob_write_string(allocator, other.nlsenv_, nlsenv_));
  OZ (ob_write_string(allocator, other.charenv_, charenv_));
  OZ (ob_write_string(allocator, other.field1_, field1_));
  OZ (ob_write_string(allocator, other.exec_env_, exec_env_));
  OZ (ob_write_string(allocator, other.job_name_, job_name_));
  OZ (ob_write_string(allocator, other.job_class_, job_class_));
  OZ (ob_write_string(allocator, other.program_name_, program_name_));

  return ret;
}
86

87
// Copy the job-class description held by `other` into this object.
// The three value members are assigned directly; the string members are
// duplicated through ob_write_string() using `allocator`.
//
// @param allocator  allocator passed to every ob_write_string() call
// @param other      job class info to copy from
// @return OB_SUCCESS, or whatever error code the OZ-wrapped ob_write_string() calls leave in ret
int ObDBMSSchedJobClassInfo::deep_copy(common::ObIAllocator &allocator, const ObDBMSSchedJobClassInfo &other)
{
  int ret = OB_SUCCESS;

  // value members
  log_history_ = other.log_history_;
  is_oracle_tenant_ = other.is_oracle_tenant_;
  tenant_id_ = other.tenant_id_;

  // string members
  OZ (ob_write_string(allocator, other.job_class_name_, job_class_name_));
  OZ (ob_write_string(allocator, other.service_, service_));
  OZ (ob_write_string(allocator, other.resource_consumer_group_, resource_consumer_group_));
  OZ (ob_write_string(allocator, other.logging_level_, logging_level_));
  OZ (ob_write_string(allocator, other.comments_, comments_));

  return ret;
}
100

101
int ObDBMSSchedJobUtils::disable_dbms_sched_job(
102
    ObISQLClient &sql_client,
103
    const uint64_t tenant_id,
104
    const ObString &job_name,
105
    const bool if_exists)
106
{
107
  int ret = OB_SUCCESS;
108
  if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || job_name.empty())) {
109
    ret = OB_INVALID_ARGUMENT;
110
    LOG_WARN("invalid args", KR(ret), K(tenant_id), K(job_name));
111
  } else {
112
    const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
113
    ObDMLSqlSplicer dml;
114
    if (OB_FAIL(dml.add_pk_column(
115
        "tenant_id", ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id)))
116
        || OB_FAIL(dml.add_pk_column("job_name", job_name))
117
        || OB_FAIL(dml.add_column("enabled", false))) {
118
      LOG_WARN("add column failed", KR(ret));
119
    } else {
120
      ObDMLExecHelper exec(sql_client, exec_tenant_id);
121
      int64_t affected_rows = 0;
122
      if (OB_FAIL(exec.exec_update(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, dml, affected_rows))) {
123
        LOG_WARN("execute update failed", KR(ret));
124
      } else if (!if_exists && !is_double_row(affected_rows)) {
125
        ret = OB_ERR_UNEXPECTED;
126
        LOG_WARN("affected_rows unexpected to be two", KR(ret), K(affected_rows));
127
      }
128
    }
129
  }
130
  return ret;
131
}
132

133
int ObDBMSSchedJobUtils::remove_dbms_sched_job(
134
    ObISQLClient &sql_client,
135
    const uint64_t tenant_id,
136
    const ObString &job_name,
137
    const bool if_exists)
138
{
139
  int ret = OB_SUCCESS;
140
  if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || job_name.empty())) {
141
    ret = OB_INVALID_ARGUMENT;
142
    LOG_WARN("invalid args", KR(ret), K(tenant_id), K(job_name));
143
  } else {
144
    const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
145
    ObDMLSqlSplicer dml;
146
    if (OB_FAIL(dml.add_pk_column(
147
        "tenant_id", ObSchemaUtils::get_extract_tenant_id(exec_tenant_id, tenant_id)))
148
        || OB_FAIL(dml.add_pk_column("job_name", job_name))) {
149
      LOG_WARN("add column failed", KR(ret));
150
    } else {
151
      ObDMLExecHelper exec(sql_client, exec_tenant_id);
152
      int64_t affected_rows = 0;
153
      if (OB_FAIL(exec.exec_delete(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, dml, affected_rows))) {
154
        LOG_WARN("execute delete failed", KR(ret));
155
      } else if (!if_exists && !is_double_row(affected_rows)) {
156
        ret = OB_ERR_UNEXPECTED;
157
        LOG_WARN("affected_rows unexpected to be two", KR(ret), K(affected_rows));
158
      }
159
    }
160
  }
161
  return ret;
162
}
163

164
int ObDBMSSchedJobUtils::create_dbms_sched_job(
165
    common::ObISQLClient &sql_client,
166
    const uint64_t tenant_id,
167
    const int64_t job_id,
168
    const dbms_scheduler::ObDBMSSchedJobInfo &job_info)
169
{
170
  int ret = OB_SUCCESS;
171
  if (OB_INVALID_TENANT_ID == tenant_id) {
172
    ret = OB_INVALID_ARGUMENT;
173
    LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
174
  } else {
175
    if (OB_FAIL(add_dbms_sched_job(sql_client, tenant_id, job_id, job_info))) {
176
      LOG_WARN("failed to add dbms scheduler job", KR(ret));
177
    } else if (OB_FAIL(add_dbms_sched_job(sql_client, tenant_id, 0, job_info))) {
178
      LOG_WARN("failed to add dbms scheduler job", KR(ret));
179
    }
180
  }
181
  return ret;
182
}
183

184
int ObDBMSSchedJobUtils::add_dbms_sched_job(
185
    common::ObISQLClient &sql_client,
186
    const uint64_t tenant_id,
187
    const int64_t job_id,
188
    const dbms_scheduler::ObDBMSSchedJobInfo &job_info)
189
{
190
  int ret = OB_SUCCESS;
191
  if (OB_INVALID_TENANT_ID == tenant_id) {
192
    ret = OB_INVALID_ARGUMENT;
193
    LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
194
  } else {
195
    ObDMLSqlSplicer dml;
196
    const uint64_t exec_tenant_id = ObSchemaUtils::get_exec_tenant_id(tenant_id);
197
    ObDMLExecHelper exec(sql_client, exec_tenant_id);
198
    int64_t affected_rows = 0;
199
    const int64_t now = ObTimeUtility::current_time();
200

201
    OZ (dml.add_gmt_create(now));
202
    OZ (dml.add_gmt_modified(now));
203
    OZ (dml.add_pk_column("tenant_id",
204
        ObSchemaUtils::get_extract_tenant_id(job_info.tenant_id_, job_info.tenant_id_)));
205
    OZ (dml.add_pk_column("job", job_id));
206
    OZ (dml.add_column("lowner", ObHexEscapeSqlStr(job_info.lowner_)));
207
    OZ (dml.add_column("powner", ObHexEscapeSqlStr(job_info.powner_)));
208
    OZ (dml.add_column("cowner", ObHexEscapeSqlStr(job_info.cowner_)));
209
    OZ (dml.add_raw_time_column("next_date", job_info.start_date_));
210
    OZ (dml.add_column("total", 0));
211
    OZ (dml.add_column("`interval#`", ObHexEscapeSqlStr(
212
        job_info.repeat_interval_.empty() ? ObString("null") : job_info.repeat_interval_)));
213
    OZ (dml.add_column("flag", job_info.flag_));
214
    OZ (dml.add_column("job_name", ObHexEscapeSqlStr(job_info.job_name_)));
215
    OZ (dml.add_column("job_style", ObHexEscapeSqlStr(job_info.job_style_)));
216
    OZ (dml.add_column("job_type", ObHexEscapeSqlStr(job_info.job_type_)));
217
    OZ (dml.add_column("job_class", ObHexEscapeSqlStr(job_info.job_class_)));
218
    OZ (dml.add_column("job_action", ObHexEscapeSqlStr(job_info.job_action_)));
219
    OZ (dml.add_column("what", ObHexEscapeSqlStr(job_info.job_action_)));
220
    OZ (dml.add_raw_time_column("start_date", job_info.start_date_));
221
    OZ (dml.add_raw_time_column("end_date", job_info.end_date_));
222
    OZ (dml.add_column("repeat_interval", ObHexEscapeSqlStr(job_info.repeat_interval_)));
223
    OZ (dml.add_column("enabled", job_info.enabled_));
224
    OZ (dml.add_column("auto_drop", job_info.auto_drop_));
225
    OZ (dml.add_column("max_run_duration", job_info.max_run_duration_));
226
    OZ (dml.add_column("interval_ts", job_info.interval_ts_));
227
    OZ (dml.add_column("scheduler_flags", job_info.scheduler_flags_));
228
    OZ (dml.add_column("exec_env", job_info.exec_env_));
229

230
    if (OB_SUCC(ret) && OB_FAIL(exec.exec_insert(
231
        OB_ALL_TENANT_SCHEDULER_JOB_TNAME, dml, affected_rows))) {
232
      LOG_WARN("failed to execute insert", KR(ret));
233
    } else if (OB_UNLIKELY(!is_single_row(affected_rows))) {
234
      ret = OB_ERR_UNEXPECTED;
235
      LOG_WARN("affected_rows unexpected to be one", KR(ret), K(affected_rows));
236
    }
237
  }
238
  return ret;
239
}
240

241
int ObDBMSSchedJobUtils::init_session(
242
  ObSQLSessionInfo &session,
243
  ObSchemaGetterGuard &schema_guard,
244
  const ObString &tenant_name,
245
  uint64_t tenant_id,
246
  const ObString &database_name,
247
  uint64_t database_id,
248
  const ObUserInfo* user_info,
249
  const ObDBMSSchedJobInfo &job_info)
250
{
251
  int ret = OB_SUCCESS;
252
  ObArenaAllocator *allocator = NULL;
253
  const bool print_info_log = true;
254
  const bool is_sys_tenant = true;
255
  ObPCMemPctConf pc_mem_conf;
256
  ObObj compatibility_mode;
257
  ObObj sql_mode;
258
  if (job_info.is_oracle_tenant_) {
259
    compatibility_mode.set_int(1);
260
    sql_mode.set_uint(ObUInt64Type, DEFAULT_ORACLE_MODE);
261
  } else {
262
    compatibility_mode.set_int(0);
263
    sql_mode.set_uint(ObUInt64Type, DEFAULT_MYSQL_MODE);
264
  }
265
  OX (session.set_inner_session());
266
  OZ (session.load_default_sys_variable(print_info_log, is_sys_tenant));
267
  OZ (session.update_max_packet_size());
268
  OZ (session.init_tenant(tenant_name.ptr(), tenant_id));
269
  OZ (session.load_all_sys_vars(schema_guard));
270
  OZ (session.update_sys_variable(share::SYS_VAR_SQL_MODE, sql_mode));
271
  OZ (session.update_sys_variable(share::SYS_VAR_OB_COMPATIBILITY_MODE, compatibility_mode));
272
  OZ (session.update_sys_variable(share::SYS_VAR_NLS_DATE_FORMAT,
273
                                  ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT));
274
  OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_FORMAT,
275
                                  ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT));
276
  OZ (session.update_sys_variable(share::SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT,
277
                                  ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT));
278
  OZ (session.set_default_database(database_name));
279
  OZ (session.get_pc_mem_conf(pc_mem_conf));
280
  CK (OB_NOT_NULL(GCTX.sql_engine_));
281
  OX (session.set_database_id(database_id));
282
  OZ (session.set_user(
283
    user_info->get_user_name(), user_info->get_host_name_str(), user_info->get_user_id()));
284
  OX (session.set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT));
285
  if (OB_SUCC(ret) && job_info.is_date_expression_job_class()) {
286
    // set larger timeout for mview scheduler jobs
287
    const int64_t QUERY_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
288
    const int64_t TRX_TIMEOUT_US = (24 * 60 * 60 * 1000000L); // 24hours
289
    ObObj query_timeout_obj;
290
    ObObj trx_timeout_obj;
291
    query_timeout_obj.set_int(QUERY_TIMEOUT_US);
292
    trx_timeout_obj.set_int(TRX_TIMEOUT_US);
293
    OZ (session.update_sys_variable(SYS_VAR_OB_QUERY_TIMEOUT, query_timeout_obj));
294
    OZ (session.update_sys_variable(SYS_VAR_OB_TRX_TIMEOUT, trx_timeout_obj));
295
  }
296
  return ret;
297
}
298

299
int ObDBMSSchedJobUtils::init_env(
300
    ObDBMSSchedJobInfo &job_info,
301
    ObSQLSessionInfo &session)
302
{
303
  int ret = OB_SUCCESS;
304
  ObSchemaGetterGuard schema_guard;
305
  const ObTenantSchema *tenant_info = NULL;
306
  const ObSysVariableSchema *sys_variable_schema = NULL;
307
  ObSEArray<const ObUserInfo *, 1> user_infos;
308
  const ObUserInfo* user_info = NULL;
309
  const ObDatabaseSchema *database_schema = NULL;
310
  share::schema::ObUserLoginInfo login_info;
311
  ObExecEnv exec_env;
312
  CK (OB_NOT_NULL(GCTX.schema_service_));
313
  CK (job_info.valid());
314
  OZ (GCTX.schema_service_->get_tenant_schema_guard(job_info.get_tenant_id(), schema_guard));
315
  OZ (schema_guard.get_tenant_info(job_info.get_tenant_id(), tenant_info));
316
  OZ (schema_guard.get_user_info(
317
    job_info.get_tenant_id(), job_info.get_powner(), user_infos));
318
  OZ (schema_guard.get_database_schema(
319
    job_info.get_tenant_id(), job_info.get_cowner(), database_schema));
320
  if (OB_SUCC(ret) &&
321
      user_infos.count() > 1 &&
322
      ObDbmsStatsMaintenanceWindow::is_stats_job(job_info.get_job_name())) {
323
    OZ(ObDbmsStatsMaintenanceWindow::reset_opt_stats_user_infos(user_infos));
324
  }
325
  OV (1 == user_infos.count(), OB_ERR_UNEXPECTED, K(job_info), K(user_infos));
326
  CK (OB_NOT_NULL(user_info = user_infos.at(0)));
327
  CK (OB_NOT_NULL(user_info));
328
  CK (OB_NOT_NULL(tenant_info));
329
  CK (OB_NOT_NULL(database_schema));
330
  OZ (exec_env.init(job_info.get_exec_env()));
331
  OZ (init_session(session,
332
                   schema_guard,
333
                   tenant_info->get_tenant_name(),
334
                   job_info.get_tenant_id(),
335
                   database_schema->get_database_name(),
336
                   database_schema->get_database_id(),
337
                   user_info,
338
                   job_info));
339
  OZ (exec_env.store(session));
340
  return ret;
341
}
342

343
int ObDBMSSchedJobUtils::create_session(
344
    const uint64_t tenant_id,
345
    ObFreeSessionCtx &free_session_ctx,
346
    ObSQLSessionInfo *&session_info)
347
{
348
  int ret = OB_SUCCESS;
349
  uint32_t sid = sql::ObSQLSessionInfo::INVALID_SESSID;
350
  uint64_t proxy_sid = 0;
351
  if (OB_ISNULL(GCTX.session_mgr_)) {
352
    ret = OB_ERR_UNEXPECTED;
353
    LOG_WARN("session_mgr_ is null", KR(ret));
354
  } else if (OB_FAIL(GCTX.session_mgr_->create_sessid(sid))) {
355
    LOG_WARN("alloc session id failed", KR(ret));
356
  } else if (OB_FAIL(GCTX.session_mgr_->create_session(
357
                tenant_id, sid, proxy_sid, ObTimeUtility::current_time(), session_info))) {
358
    LOG_WARN("create session failed", K(ret), K(sid));
359
    GCTX.session_mgr_->mark_sessid_unused(sid);
360
    session_info = NULL;
361
  } else if (OB_ISNULL(session_info)) {
362
    ret = OB_ERR_UNEXPECTED;
363
    LOG_WARN("unexpected session info is null", K(ret));
364
  } else {
365
    free_session_ctx.sessid_ = sid;
366
    free_session_ctx.proxy_sessid_ = proxy_sid;
367
  }
368
  return ret;
369
}
370

371
int ObDBMSSchedJobUtils::destroy_session(
372
    ObFreeSessionCtx &free_session_ctx,
373
    ObSQLSessionInfo *session_info)
374
{
375
  int ret = OB_SUCCESS;
376
  if (OB_ISNULL(GCTX.session_mgr_)) {
377
    ret = OB_ERR_UNEXPECTED;
378
    LOG_WARN("session_mgr_ is null", KR(ret));
379
  } else if (OB_ISNULL(session_info)) {
380
    ret = OB_INVALID_ARGUMENT;
381
    LOG_WARN("session_info is null", KR(ret));
382
  } else {
383
    session_info->set_session_sleep();
384
    GCTX.session_mgr_->revert_session(session_info);
385
    GCTX.session_mgr_->free_session(free_session_ctx);
386
    GCTX.session_mgr_->mark_sessid_unused(free_session_ctx.sessid_);
387
  }
388
  return ret;
389
}
390

391
} // end for namespace dbms_scheduler
392
} // end for namespace oceanbase
393

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.