oceanbase

Форк
0
/
ob_dbms_sched_table_operator.cpp 
601 строка · 24.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_dbms_sched_table_operator.h"
16

17
#include "lib/ob_errno.h"
18
#include "lib/oblog/ob_log_module.h"
19
#include "lib/time/ob_time_utility.h"
20
#include "lib/string/ob_string.h"
21
#include "lib/string/ob_sql_string.h"
22
#include "lib/mysqlclient/ob_mysql_proxy.h"
23
#include "lib/mysqlclient/ob_mysql_result.h"
24
#include "lib/mysqlclient/ob_isql_client.h"
25
#include "lib/mysqlclient/ob_mysql_transaction.h"
26
#include "rpc/obrpc/ob_rpc_packet.h"
27
#include "lib/worker.h"
28
#include "share/ob_dml_sql_splicer.h"
29
#include "share/inner_table/ob_inner_table_schema_constants.h"
30
#include "share/schema/ob_schema_utils.h"
31
#include "observer/omt/ob_tenant_config_mgr.h"
32
#include "observer/ob_server_struct.h"
33
#include "observer/dbms_scheduler/ob_dbms_sched_job_utils.h"
34
#include "share/schema/ob_multi_version_schema_service.h"
35

36

37
namespace oceanbase
38
{
39

40
using namespace common;
41
using namespace share;
42
using namespace share::schema;
43
using namespace sqlclient;
44
using namespace storage;
45

46
namespace dbms_scheduler
47
{
48

49
int ObDBMSSchedTableOperator::update_next_date(
50
  uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date)
51
{
52
  int ret = OB_SUCCESS;
53

54
  ObDMLSqlSplicer dml;
55
  ObSqlString sql;
56
  int64_t affected_rows = 0;
57
  const int64_t now = ObTimeUtility::current_time();
58

59
  CK (OB_NOT_NULL(sql_proxy_));
60
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
61
  CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
62

63
  OZ (dml.add_gmt_modified(now));
64
  OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
65
  OZ (dml.add_pk_column("job", job_info.job_));
66
  OZ (dml.add_pk_column("job_name", job_info.job_name_));
67
  OZ (dml.add_time_column("next_date", next_date));
68
  OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
69
  OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
70
  return ret;
71
}
72

73

74
int ObDBMSSchedTableOperator::update_for_start(
75
  uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date)
76
{
77
  int ret = OB_SUCCESS;
78

79
  ObDMLSqlSplicer dml;
80
  ObSqlString sql;
81
  int64_t affected_rows = 0;
82
  const int64_t now = ObTimeUtility::current_time();
83

84
  CK (OB_NOT_NULL(sql_proxy_));
85
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
86
  CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
87

88
  OX (job_info.this_date_ = now);
89
  OZ (dml.add_gmt_modified(now));
90
  OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
91
  OZ (dml.add_pk_column("job", job_info.job_));
92
  OZ (dml.add_pk_column("job_name", job_info.job_name_));
93
  OZ (dml.add_time_column("this_date", job_info.this_date_));
94
  OZ (dml.add_time_column("next_date", next_date));
95
  OZ (dml.add_column("state", "SCHEDULED"));
96
  OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
97
  OZ (sql.append_fmt(" and this_date is null"));
98
  OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
99
  CK (affected_rows == 1);
100
  return ret;
101
}
102

103
// Extracts the numeric id embedded in a job name of the form "JOB$_<digits>".
//
// job_id is set to the parsed number only when the name starts with "JOB$_",
// the whole remainder of the name is a decimal number, and that number is
// greater than JOB_ID_OFFSET. In every other case job_id is set to 0 so the
// caller can fall back to the id stored on the job itself.
//
// @param job_name  job name to inspect (not modified)
// @param job_id    out: parsed id, or 0 when the name carries no usable id
// @return always OB_SUCCESS; an unparsable name is not treated as an error
int ObDBMSSchedTableOperator::seperate_job_id_from_name(ObString &job_name, int64_t &job_id)
{
  int ret = OB_SUCCESS;
  const char* prefix = "JOB$_";
  job_id = 0;
  if (!job_name.prefix_match(prefix)) {
    // not a "JOB$_<id>" style name, keep job_id = 0
  } else if (static_cast<int64_t>(job_name.length()) >= static_cast<int64_t>(JOB_NAME_MAX_SIZE)) {
    // The name would not fit into the local buffer; parsing a truncated copy
    // could yield a number that is not the one encoded in the name.
    LOG_WARN("job_name is too long to extract job_id", K(job_name));
  } else {
    char nptr[JOB_NAME_MAX_SIZE];
    char *endptr = NULL;
    // ObString is not guaranteed to be NUL terminated, so copy it first.
    snprintf(nptr, JOB_NAME_MAX_SIZE, "%.*s", job_name.length(), job_name.ptr());
    job_id = strtoll(nptr + strlen(prefix), &endptr, 10);
    if (job_id <= 0) {
      LOG_WARN("job_id is not right", K(job_name), K(nptr), K(job_id));
      job_id = 0; // never hand a negative id back to the caller
    } else if (*endptr != '\0' || job_id <= JOB_ID_OFFSET) {
      job_id = 0; // use job_info.job_ when job_id is not formal
    }
  }
  return ret;
}
121

122

123
int ObDBMSSchedTableOperator::update_for_end(
124
  uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int err, const ObString &errmsg)
125
{
126
  int ret = OB_SUCCESS;
127

128
  ObMySQLTransaction trans;
129
  ObDMLSqlSplicer dml1;
130
  ObSqlString sql1;
131
  ObDMLSqlSplicer dml2;
132
  ObSqlString sql2;
133
  int64_t affected_rows = 0;
134
  const int64_t now = ObTimeUtility::current_time();
135

136
  UNUSED(errmsg);
137

138
  CK (OB_NOT_NULL(sql_proxy_));
139
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
140
  CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
141

142
  uint64_t data_version = 0;
143
  if (OB_SUCC(ret)) {
144
    if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
145
      LOG_WARN("fail to get tenant data version", KR(ret), K(data_version));
146
    } else if (MOCK_DATA_VERSION <= data_version) {
147
      CK (OB_LIKELY(!job_info.job_class_.empty()));
148
    }
149
  }
150

151
  ObDBMSSchedJobClassInfo job_class_info;
152
  ObArenaAllocator allocator("DBMSSchedTmp");
153
  if (MOCK_DATA_VERSION <= data_version) {
154
    OZ (get_dbms_sched_job_class_info(tenant_id, job_info.is_oracle_tenant(), job_info.get_job_class(), allocator, job_class_info));
155
  }
156
  // when if failures > 16 then set broken flag.
157
  OX (job_info.failures_ = errmsg.empty() ? 0 : (job_info.failures_ + 1));
158
  OX (job_info.flag_ = job_info.failures_ > 15 ? (job_info.flag_ | 0x1) : (job_info.flag_ & 0xfffffffffffffffE));
159
  if ((now >= job_info.end_date_ || job_info.get_interval_ts() == 0) && (true == job_info.auto_drop_)) {
160
    // when end_date is reach or no interval set, and auto_drop is set true, drop job.
161
    OZ (dml1.add_gmt_modified(now));
162
    OZ (dml1.add_pk_column("tenant_id",
163
          ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
164
    OZ (dml1.add_pk_column("job_name", job_info.job_name_));
165
    OZ (dml1.splice_delete_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql1));
166
  } else {
167
    if (OB_SUCC(ret) && ((job_info.flag_ & 0x1) != 0)) {
168
      // when if failures > 16 then set broken state.
169
      job_info.next_date_ = 64060560000000000; // 4000-01-01
170
      OZ (dml1.add_column("state", "BROKEN"));
171
    } else if (now >= job_info.end_date_) {
172
      // when end_date is reach and auto_drop is set false, disable set completed state.
173
      job_info.enabled_ = false;
174
      OZ (dml1.add_column("state", "COMPLETED"));
175
      OZ (dml1.add_column("enabled", job_info.enabled_));
176
    }
177
    CK (job_info.this_date_ > 0 || !errmsg.empty());
178
    OX (job_info.total_ += (job_info.this_date_ > 0 ? now - job_info.this_date_ : 0));
179
    OZ (dml1.add_gmt_modified(now));
180
    OZ (dml1.add_pk_column("tenant_id",
181
          ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
182
    OZ (dml1.add_pk_column("job", job_info.job_));
183
    OZ (dml1.add_pk_column("job_name", job_info.job_name_));
184
    OZ (dml1.add_column(true, "this_date"));
185
    OZ (dml1.add_time_column("last_date", job_info.this_date_));
186
    OZ (dml1.add_time_column("next_date", job_info.next_date_));
187
    OZ (dml1.add_column("failures", job_info.failures_));
188
    OZ (dml1.add_column("flag", job_info.failures_ > 16 ? 1 : job_info.flag_));
189
    OZ (dml1.add_column("total", job_info.total_));
190
    OZ (dml1.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql1));
191
  }
192

193
  //If a non-existent JOB CLASS is entered when creating a JOB,
194
  //job_run_detail still needs to be recorded.
195
  bool need_write_job_run_detail = true;
196
  if (MOCK_DATA_VERSION <= data_version) {
197
    ObString logging_level = job_class_info.get_logging_level();
198
    if (logging_level.empty()) {
199
      LOG_WARN("logging_level may not assigned");
200
    } else if (0 == logging_level.case_compare("OFF")) {
201
      need_write_job_run_detail = false;
202
    } else if (0 == logging_level.case_compare("RUNS")) {
203
      need_write_job_run_detail = true;
204
    } else if (0 == logging_level.case_compare("FAILED RUNS") && !errmsg.empty()) {
205
      need_write_job_run_detail = true;
206
    }
207
  }
208

209
  if (need_write_job_run_detail) {
210
    OZ (dml2.add_gmt_create(now));
211
    OZ (dml2.add_gmt_modified(now));
212
    OZ (dml2.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
213
    int64_t job_id = 0;
214
    OZ (seperate_job_id_from_name(job_info.get_job_name(), job_id));
215
    if (job_id <= 0) {
216
      job_id = job_info.get_job_id();
217
    }
218
    OZ (dml2.add_pk_column("job", job_id));
219
    OZ (dml2.add_time_column("time", now));
220
    OZ (dml2.add_column("code", err));
221
    OZ (dml2.add_column(
222
      "message", ObHexEscapeSqlStr(errmsg.empty() ? ObString("SUCCESS") : errmsg)));
223
    if (MOCK_DATA_VERSION <= data_version) {
224
      OZ (dml2.add_column("job_class", job_info.job_class_));
225
    }
226
    OZ (dml2.splice_insert_sql(OB_ALL_TENANT_SCHEDULER_JOB_RUN_DETAIL_TNAME, sql2));
227
  }
228

229
  OZ (trans.start(sql_proxy_, tenant_id, true));
230

231
  OZ (trans.write(tenant_id, sql1.ptr(), affected_rows));
232
  if (need_write_job_run_detail) {
233
    OZ (trans.write(tenant_id, sql2.ptr(), affected_rows));
234
  }
235

236
  int tmp_ret = OB_SUCCESS;
237
  if (trans.is_started()) {
238
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
239
      LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
240
      ret = OB_SUCC(ret) ? tmp_ret : ret;
241
    }
242
  }
243

244
  return ret;
245
}
246

247
// Decides whether another scheduler job may start for the tenant.
//
// When alive_job_count does not exceed the tenant's job_queue_processes
// setting the answer is yes without touching the job table. Otherwise the
// rows with a non-NULL this_date are counted, and the job may run only if the
// tenant is not in restore, job_queue_processes is greater than zero and the
// count is below job_queue_processes.
//
// @param tenant_id        tenant to check
// @param alive_job_count  number of jobs the caller currently considers alive
// @param can_running      out: true when a job may start; false on any error
// @return OB_SUCCESS, or the first error from the config check, schema guard or query
int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, int64_t alive_job_count, bool &can_running)
{
  int ret = OB_SUCCESS;
  uint64_t job_queue_processor = 0;
  uint64_t job_running_cnt = 0;
  ObSqlString sql;
  omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
  share::schema::ObSchemaGetterGuard guard;
  bool is_restore = false;
  OX (can_running = false);
  CK (tenant_config.is_valid());
  OX (job_queue_processor = tenant_config->job_queue_processes);
  // found current running job count
  if (OB_FAIL(ret)) {
  } else if (static_cast<uint64_t>(alive_job_count) <= job_queue_processor) {
    // same unsigned comparison the implicit conversion performed, made explicit
    can_running = true;
  } else {
    // the proxy is only needed on this path, so it is only required here
    CK (OB_NOT_NULL(sql_proxy_));
    OZ (sql.append_fmt("select count(*) from %s where this_date is not null", OB_ALL_TENANT_SCHEDULER_JOB_TNAME));

    CK (OB_NOT_NULL(GCTX.schema_service_));
    OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, guard));
    OZ (guard.check_tenant_is_restore(tenant_id, is_restore));

    // job can not run in standby cluster and restore.
    if (OB_SUCC(ret) && job_queue_processor > 0
        && !is_restore) {
      SMART_VAR(ObMySQLProxy::MySQLResult, result) {
        if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
          LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
        } else if (OB_ISNULL(result.get_result())) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id));
        } else {
          if (OB_SUCCESS == (ret = result.get_result()->next())) {
            int64_t int_value = 0;
            if (OB_FAIL(result.get_result()->get_int(static_cast<const int64_t>(0), int_value))) {
              LOG_WARN("failed to get column in row. ", K(ret));
            } else {
              job_running_cnt = static_cast<uint64_t>(int_value);
            }
          } else {
            LOG_WARN("failed to calc all running job, no row return", K(ret));
          }
        }
      }
      OX (can_running = (job_queue_processor > job_running_cnt));
    }
  }
  return ret;
}
297

298
int ObDBMSSchedTableOperator::extract_info(
299
  sqlclient::ObMySQLResult &result, int64_t tenant_id, bool is_oracle_tenant,
300
  ObIAllocator &allocator, ObDBMSSchedJobInfo &job_info)
301
{
302
  int ret = OB_SUCCESS;
303
  ObDBMSSchedJobInfo job_info_local;
304

305
  job_info_local.tenant_id_ = tenant_id;
306
  job_info_local.is_oracle_tenant_ = is_oracle_tenant;
307
  EXTRACT_INT_FIELD_MYSQL(result, "job", job_info_local.job_, uint64_t);
308
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "lowner", job_info_local.lowner_);
309
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "powner", job_info_local.powner_);
310
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "cowner", job_info_local.cowner_);
311

312
#define EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, col_name, v)   \
313
do {                                                                  \
314
  ObObj obj;                                                          \
315
  OZ ((result).get_obj(col_name, obj));                               \
316
  if (OB_SUCC(ret)) {                                                 \
317
    if (obj.is_null()) {                                              \
318
      v = static_cast<int64_t>(0);                                    \
319
    } else {                                                          \
320
      OZ (obj.get_timestamp(v));                                      \
321
    }                                                                 \
322
  } else if (OB_ERR_COLUMN_NOT_FOUND == ret) {                        \
323
    ret = OB_SUCCESS;                                                 \
324
    v = static_cast<int64_t>(0);                                      \
325
  }                                                                   \
326
} while (false)
327

328
#define EXTRACT_NUMBER_FIELD_MYSQL_SKIP_RET(result, col_name, v)      \
329
do {                                                                  \
330
  common::number::ObNumber nmb_val;                                   \
331
  OZ ((result).get_number(col_name, nmb_val));                        \
332
  if (OB_ERR_NULL_VALUE == ret || OB_ERR_COLUMN_NOT_FOUND == ret) {   \
333
    ret = OB_SUCCESS;                                                 \
334
    v = static_cast<int64_t>(0);                                     \
335
  } else if (OB_SUCCESS == ret) {                                     \
336
    OZ (nmb_val.extract_valid_int64_with_trunc(v));                  \
337
  }                                                                   \
338
} while (false)
339

340
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "gmt_modified", job_info_local.last_modify_);
341
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "last_date", job_info_local.last_date_);
342
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "this_date", job_info_local.this_date_);
343
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "next_date", job_info_local.next_date_);
344
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "total", job_info_local.total_, uint64_t);
345
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "start_date", job_info_local.start_date_);
346
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "end_date", job_info_local.end_date_);
347

348
#undef EXTRACT_NUMBER_FIELD_MYSQL_SKIP_RET
349
#undef EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET
350

351
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "interval#", job_info_local.interval_);
352
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "failures", job_info_local.failures_, uint64_t);
353
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "flag", job_info_local.flag_, uint64_t);
354
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "what", job_info_local.what_);
355
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "nlsenv", job_info_local.nlsenv_);
356
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "charenv", job_info_local.charenv_);
357
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "field1", job_info_local.field1_);
358
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "scheduler_flags", job_info_local.scheduler_flags_, uint64_t);
359
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "exec_env", job_info_local.exec_env_);
360
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_name", job_info_local.job_name_);
361
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_class", job_info_local.job_class_);
362
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "program_name", job_info_local.program_name_);
363
  EXTRACT_BOOL_FIELD_MYSQL_SKIP_RET(result, "enabled", job_info_local.enabled_);
364
  EXTRACT_BOOL_FIELD_MYSQL_SKIP_RET(result, "auto_drop", job_info_local.auto_drop_);
365
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "interval_ts", job_info_local.interval_ts_, uint64_t);
366
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "max_run_duration", job_info_local.max_run_duration_, uint64_t);
367

368
  OZ (job_info.deep_copy(allocator, job_info_local));
369

370
  return ret;
371
}
372

373
int ObDBMSSchedTableOperator::get_dbms_sched_job_info(
374
  uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name,
375
  ObIAllocator &allocator, ObDBMSSchedJobInfo &job_info)
376
{
377
  int ret = OB_SUCCESS;
378
  ObSqlString sql;
379
  int64_t affected_rows = 0;
380

381
  CK (OB_NOT_NULL(sql_proxy_));
382
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
383
  CK (OB_LIKELY(job_id != OB_INVALID_ID));
384

385
  if (!job_name.empty()) {
386
    OZ (sql.append_fmt("select * from %s where tenant_id = %lu and job_name = \'%.*s\' and job = %ld",
387
        OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
388
        ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
389
        job_name.length(),
390
        job_name.ptr(),
391
        job_id));
392
  } else {
393
    OZ (sql.append_fmt("select * from %s where tenant_id = %lu and job = %ld",
394
        OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
395
        ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
396
        job_id));
397
  }
398

399

400
  if (OB_SUCC(ret)) {
401
    SMART_VAR(ObMySQLProxy::MySQLResult, result) {
402
      if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
403
        LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_id));
404
      } else if (OB_ISNULL(result.get_result())) {
405
        ret = OB_ERR_UNEXPECTED;
406
        LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_id));
407
      } else {
408
        if (OB_SUCCESS == (ret = result.get_result()->next())) {
409
          OZ (extract_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_info));
410
          if (OB_SUCC(ret)) {
411
            int tmp_ret = result.get_result()->next();
412
            if (OB_SUCCESS == tmp_ret) {
413
              ret = OB_ERR_UNEXPECTED;
414
              LOG_ERROR("got more than one row for dbms sched job!", K(ret), K(tenant_id), K(job_id));
415
            } else if (tmp_ret != OB_ITER_END) {
416
              ret = tmp_ret;
417
              LOG_ERROR("got next row for dbms sched job failed", K(ret), K(tenant_id), K(job_id));
418
            }
419
          }
420
        } else if (OB_ITER_END == ret) {
421
          LOG_WARN("job not exists, may delete alreay!", K(ret), K(tenant_id), K(job_id));
422
        } else {
423
          LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_id));
424
        }
425
      }
426
    }
427
  }
428
  return ret;
429
}
430

431
int ObDBMSSchedTableOperator::get_dbms_sched_job_infos_in_tenant(
432
  uint64_t tenant_id, bool is_oracle_tenant, ObIAllocator &allocator, ObIArray<ObDBMSSchedJobInfo> &job_infos)
433
{
434
  int ret = OB_SUCCESS;
435
  ObSqlString sql;
436
  int64_t affected_rows = 0;
437

438
  CK (OB_NOT_NULL(sql_proxy_));
439
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
440

441
  OZ (sql.append_fmt("select * from %s where job > 0 and job_name != \'%s\' and (state is NULL or state != \'%s\')",
442
      OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
443
      "__dummy_guard",
444
      "COMPLETED"));
445

446
  if (OB_SUCC(ret)) {
447
    SMART_VAR(ObMySQLProxy::MySQLResult, result) {
448
      if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
449
        LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
450
      } else if (OB_ISNULL(result.get_result())) {
451
        ret = OB_ERR_UNEXPECTED;
452
        LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id));
453
      } else {
454
        do {
455
          if (OB_FAIL(result.get_result()->next())) {
456
            LOG_INFO("failed to get result", K(ret));
457
          } else {
458
            ObDBMSSchedJobInfo job_info;
459
            OZ (extract_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_info));
460
            OZ (job_infos.push_back(job_info));
461
          }
462
        } while (OB_SUCC(ret));
463
        ret = OB_ITER_END == ret ? OB_SUCCESS : ret;
464
      }
465
    }
466
  }
467

468
  return ret;
469
}
470

471
int ObDBMSSchedTableOperator::extract_job_class_info(
472
  sqlclient::ObMySQLResult &result, int64_t tenant_id, bool is_oracle_tenant,
473
  ObIAllocator &allocator, ObDBMSSchedJobClassInfo &job_class_info)
474
{
475
  int ret = OB_SUCCESS;
476
  ObDBMSSchedJobClassInfo job_class_info_local;
477

478
  job_class_info_local.tenant_id_ = tenant_id;
479
  job_class_info_local.is_oracle_tenant_ = is_oracle_tenant;
480
  // EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "log_history", job_class_info_local.log_history_, uint64_t);
481
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_class_name", job_class_info_local.job_class_name_);
482
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "resource_consumer_group", job_class_info_local.resource_consumer_group_);
483
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "service", job_class_info_local.service_);
484
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "logging_level", job_class_info_local.logging_level_);
485
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "comments", job_class_info_local.comments_);
486

487
  OZ (job_class_info.deep_copy(allocator, job_class_info_local));
488

489
  return ret;
490
}
491

492
int ObDBMSSchedTableOperator::get_dbms_sched_job_class_info(
493
  uint64_t tenant_id, bool is_oracle_tenant, const common::ObString job_class_name,
494
  common::ObIAllocator &allocator, ObDBMSSchedJobClassInfo &job_class_info) {
495
  int ret = OB_SUCCESS;
496
  ObSqlString sql;
497
  int64_t affected_rows = 0;
498

499
  CK (OB_NOT_NULL(sql_proxy_));
500
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
501
  CK (OB_LIKELY(!job_class_name.empty()));
502
  OZ (sql.append_fmt("select * from %s where tenant_id = %lu and job_class_name = \'%.*s\'",
503
      OB_ALL_TENANT_SCHEDULER_JOB_CLASS_TNAME, ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id), job_class_name.length(), job_class_name.ptr()));
504
  if (OB_SUCC(ret)) {
505
    SMART_VAR(ObMySQLProxy::MySQLResult, result) {
506
      if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
507
        LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_class_name));
508
      } else if (OB_ISNULL(result.get_result())) {
509
        ret = OB_ERR_UNEXPECTED;
510
        LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id), K(job_class_name));
511
      } else {
512
        if (OB_SUCCESS == (ret = result.get_result()->next())) {
513
          OZ (extract_job_class_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_class_info));
514
          if (OB_SUCC(ret)) {
515
            int tmp_ret = result.get_result()->next();
516
            if (OB_SUCCESS == tmp_ret) {
517
              ret = OB_ERR_UNEXPECTED;
518
              LOG_ERROR("got more than one row for dbms sched job class!", K(ret), K(tenant_id), K(job_class_name));
519
            } else if (tmp_ret != OB_ITER_END) {
520
              ret = tmp_ret;
521
              LOG_ERROR("got next row for dbms sched job class failed", K(ret), K(tenant_id), K(job_class_name));
522
            }
523
          }
524
        } else if (OB_ITER_END == ret) {
525
          LOG_INFO("job_class_name not exists, may delete alreay!", K(ret), K(tenant_id), K(job_class_name));
526
          ret = OB_SUCCESS; // job not exist, do nothing ...
527
        } else {
528
          LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_class_name));
529
        }
530
      }
531
    }
532
  }
533
  return ret;
534
}
535

536
// Inserts the "DEFAULT_JOB_CLASS" row for a tenant.
//
// The row is written with log_history = 0, logging_level = "RUNS" and
// comments = "DEFAULT_JOB_CLASS" inside its own transaction; the insert must
// affect exactly one row.
//
// @param tenant_id  tenant to register the class for; must not be OB_INVALID_ID
// @return OB_SUCCESS, or the first error from the checks, the splicer, the
//         write or the end of the transaction
int ObDBMSSchedTableOperator::register_default_job_class(uint64_t tenant_id)
{
  int ret = OB_SUCCESS;
  ObMySQLTransaction trans;
  ObSqlString sql;
  ObDMLSqlSplicer dml;
  int64_t affected_rows = 0;

  // same preconditions as the other table operations in this class
  CK (OB_NOT_NULL(sql_proxy_));
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));

  OZ (dml.add_pk_column("tenant_id",
    share::schema::ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
  OZ (dml.add_pk_column("job_class_name", ObHexEscapeSqlStr("DEFAULT_JOB_CLASS")));
  OZ (dml.add_column("log_history", 0));
  OZ (dml.add_column("logging_level", ObHexEscapeSqlStr("RUNS")));
  OZ (dml.add_column("comments", "DEFAULT_JOB_CLASS"));
  OZ (dml.splice_insert_sql(OB_ALL_TENANT_SCHEDULER_JOB_CLASS_TNAME, sql));

  OZ (trans.start(sql_proxy_, tenant_id, true));
  OZ (trans.write(tenant_id, sql.ptr(), affected_rows));
  CK (1 == affected_rows);

  // End the transaction with the success state of everything above.
  int tmp_ret = OB_SUCCESS;
  if (trans.is_started()) {
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
      LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
      ret = OB_SUCC(ret) ? tmp_ret : ret;
    }
  }
  LOG_INFO("register default job class", K(ret), K(tenant_id));
  return ret;
}
564

565
// Deletes expired rows from the job run detail table of a tenant.
//
// A row is deleted when its time is older than
//   now - log_history * one day (in microseconds),
// where log_history is taken from the job class whose job_class_name equals
// the row's job_class, and counts as 0 when the subquery yields NULL.
// The delete runs in its own transaction.
//
// @param tenant_id  tenant to purge; must not be OB_INVALID_ID
// @return OB_SUCCESS, or the first error from the checks, the write or the
//         end of the transaction
int ObDBMSSchedTableOperator::purge_run_detail_histroy(uint64_t tenant_id)
{
  int ret = OB_SUCCESS;
  const int64_t DAY_INTERVAL_USEC = 24 * 60 * 60 * 1000000LL;
  const int64_t now = ObTimeUtility::current_time();

  ObMySQLTransaction trans;
  ObSqlString sql;
  int64_t affected_rows = 0;

  CK (OB_NOT_NULL(sql_proxy_));
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));

  // the extracted tenant id is unsigned, hence %lu
  OZ (sql.append_fmt("delete from %s where tenant_id = %lu and time < usec_to_time(%ld - NVL((select log_history from %s where %s.job_class = %s.job_class_name),0) * %ld)",
      OB_ALL_TENANT_SCHEDULER_JOB_RUN_DETAIL_TNAME,
      share::schema::ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
      now,
      OB_ALL_TENANT_SCHEDULER_JOB_CLASS_TNAME,
      OB_ALL_TENANT_SCHEDULER_JOB_RUN_DETAIL_TNAME,
      OB_ALL_TENANT_SCHEDULER_JOB_CLASS_TNAME,
      DAY_INTERVAL_USEC
      ));
  OZ (trans.start(sql_proxy_, tenant_id, true));
  OZ (trans.write(tenant_id, sql.ptr(), affected_rows));

  // End the transaction with the success state of everything above.
  int tmp_ret = OB_SUCCESS;
  if (trans.is_started()) {
    if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
      LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
      ret = OB_SUCC(ret) ? tmp_ret : ret;
    }
  }
  LOG_INFO("purge run detail history", K(ret), K(tenant_id), K(sql.ptr()));
  return ret;
}
599

600
} // end for namespace dbms_scheduler
601
} // end for namespace oceanbase
602

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.