oceanbase

Форк
0
/
ob_dbms_job_utils.cpp 
457 строк · 17.4 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_dbms_job_utils.h"
16

17
#include "lib/ob_errno.h"
18
#include "lib/oblog/ob_log_module.h"
19
#include "lib/time/ob_time_utility.h"
20
#include "lib/string/ob_string.h"
21
#include "lib/string/ob_sql_string.h"
22
#include "lib/mysqlclient/ob_mysql_proxy.h"
23
#include "lib/mysqlclient/ob_mysql_result.h"
24
#include "lib/mysqlclient/ob_isql_client.h"
25
#include "lib/mysqlclient/ob_mysql_transaction.h"
26
#include "rpc/obrpc/ob_rpc_packet.h"
27
#include "lib/worker.h"
28
#include "share/ob_dml_sql_splicer.h"
29
#include "share/inner_table/ob_inner_table_schema_constants.h"
30
#include "share/schema/ob_schema_utils.h"
31
#include "share/schema/ob_multi_version_schema_service.h"
32
#include "observer/omt/ob_tenant_config_mgr.h"
33
#include "observer/ob_server_struct.h"
34

35

36
namespace oceanbase
37
{
38

39
using namespace common;
40
using namespace share;
41
using namespace share::schema;
42
using namespace sqlclient;
43

44
namespace dbms_job
45
{
46
const char *ObDBMSJobInfo::__ALL_SERVER_BC = "__ALL_SERVER_BC";
47
int ObDBMSJobInfo::deep_copy(ObIAllocator &allocator, const ObDBMSJobInfo &other)
48
{
49
  int ret = OB_SUCCESS;
50
  tenant_id_ = other.tenant_id_;
51
  job_ = other.job_;
52
  last_modify_ = other.last_modify_;
53
  last_date_ = other.last_date_;
54
  this_date_ = other.this_date_;
55
  next_date_ = other.next_date_;
56
  total_ = other.total_;
57
  failures_ = other.failures_;
58
  flag_ = other.flag_;
59
  scheduler_flags_ = other.scheduler_flags_;
60

61
  OZ (ob_write_string(allocator, other.lowner_, lowner_));
62
  OZ (ob_write_string(allocator, other.powner_, powner_));
63
  OZ (ob_write_string(allocator, other.cowner_, cowner_));
64

65
  OZ (ob_write_string(allocator, other.interval_, interval_));
66

67
  OZ (ob_write_string(allocator, other.what_, what_));
68
  OZ (ob_write_string(allocator, other.nlsenv_, nlsenv_));
69
  OZ (ob_write_string(allocator, other.charenv_, charenv_));
70
  OZ (ob_write_string(allocator, other.field1_, field1_));
71
  OZ (ob_write_string(allocator, other.exec_env_, exec_env_));
72
  return ret;
73
}
74

75
int ObDBMSJobUtils::update_for_start(
76
  uint64_t tenant_id, ObDBMSJobInfo &job_info, bool update_nextdate)
77
{
78
  int ret = OB_SUCCESS;
79

80
  ObDMLSqlSplicer dml;
81
  ObSqlString sql;
82
  int64_t affected_rows = 0;
83
  const int64_t now = ObTimeUtility::current_time();
84
  int64_t delay = 0;
85
  int64_t dummy_execute_at = 0;
86

87
  CK (OB_NOT_NULL(sql_proxy_));
88
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
89
  CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
90

91
  OZ (calc_execute_at(
92
    job_info, (update_nextdate ? job_info.next_date_ : dummy_execute_at), delay, true));
93

94
  OX (job_info.this_date_ = now);
95
  OZ (dml.add_gmt_modified(now));
96
  OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
97
  OZ (dml.add_pk_column("job", job_info.job_));
98
  OZ (dml.add_time_column("this_date", job_info.this_date_));
99
  OZ (dml.splice_update_sql(OB_ALL_JOB_TNAME, sql));
100
  OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
101

102
  return ret;
103
}
104

105
int ObDBMSJobUtils::update_nextdate(
106
  uint64_t tenant_id, ObDBMSJobInfo &job_info)
107
{
108
  int ret = OB_SUCCESS;
109

110
  ObDMLSqlSplicer dml;
111
  ObSqlString sql;
112
  int64_t affected_rows = 0;
113
  const int64_t now = ObTimeUtility::current_time();
114

115
  CK (OB_NOT_NULL(sql_proxy_));
116
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
117
  CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
118

119
  OZ (dml.add_gmt_modified(now));
120
  OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
121
  OZ (dml.add_pk_column("job", job_info.job_));
122
  OZ (dml.add_time_column("next_date", job_info.next_date_));
123
  OZ (dml.splice_update_sql(OB_ALL_JOB_TNAME, sql));
124
  OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
125

126
  return ret;
127
}
128

129

130
int ObDBMSJobUtils::update_for_end(
131
  uint64_t tenant_id, ObDBMSJobInfo &job_info, int err, const ObString &errmsg)
132
{
133
  int ret = OB_SUCCESS;
134

135
  ObMySQLTransaction trans;
136
  ObDMLSqlSplicer dml1;
137
  ObSqlString sql1;
138
  ObDMLSqlSplicer dml2;
139
  ObSqlString sql2;
140
  int64_t affected_rows = 0;
141
  const int64_t now = ObTimeUtility::current_time();
142
  int64_t next_date;
143
  int64_t delay;
144

145
  UNUSED(errmsg);
146

147
  CK (OB_NOT_NULL(sql_proxy_));
148
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
149
  CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
150

151
  OX (job_info.failures_ = errmsg.empty() ? 0 : (job_info.failures_ + 1));
152
  // when dbms_job run end, if failures > 16 then set broken flag else clear it.
153
  OX (job_info.flag_ = job_info.failures_ > 15 ? (job_info.flag_ | 0x1) : (job_info.flag_ & 0xfffffffffffffffE));
154
  if (OB_SUCC(ret) && ((job_info.flag_ & 0x1) != 0)) {
155
    job_info.next_date_ = 64060560000000000; // 4000-01-01
156
  }
157
  CK (job_info.this_date_ > 0);
158
  OX (job_info.total_ += (now - job_info.this_date_));
159
  OZ (dml1.add_gmt_modified(now));
160
  OZ (dml1.add_pk_column("tenant_id",
161
        ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
162
  OZ (dml1.add_pk_column("job", job_info.job_));
163
  OZ (dml1.add_column(true, "this_date"));
164
  OZ (dml1.add_time_column("last_date", job_info.this_date_));
165
  OZ (dml1.add_time_column("next_date", job_info.next_date_));
166
  OZ (dml1.add_column("failures", job_info.failures_));
167
  OZ (dml1.add_column("flag", job_info.failures_ > 16 ? 1 : job_info.flag_));
168
  OZ (dml1.add_column("total", job_info.total_));
169
  OZ (dml1.splice_update_sql(OB_ALL_JOB_TNAME, sql1));
170

171
  char exec_addr[MAX_IP_PORT_LENGTH];
172
  int64_t offset = 0;
173
  OX (memset(exec_addr, 0, MAX_IP_PORT_LENGTH));
174
  OX (offset = GCTX.self_addr().to_string(exec_addr, MAX_IP_PORT_LENGTH));
175
  CK (offset < MAX_IP_PORT_LENGTH);
176
  OZ (dml2.add_gmt_create(now));
177
  OZ (dml2.add_gmt_modified(now));
178
  OZ (dml2.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
179
  OZ (dml2.add_pk_column("job", job_info.job_));
180
  OZ (dml2.add_time_column("time", now));
181
  OZ (dml2.add_pk_column("exec_addr", ObHexEscapeSqlStr(ObString(exec_addr))), ObString(exec_addr));
182
  OZ (dml2.add_column("code", err));
183
  OZ (dml2.add_column(
184
    "message", ObHexEscapeSqlStr(errmsg.empty() ? ObString("SUCCESS") : errmsg)));
185
  OZ (dml2.splice_insert_sql(OB_ALL_JOB_LOG_TNAME, sql2));
186

187
  OZ (trans.start(sql_proxy_, tenant_id, true));
188

189
  OZ (trans.write(tenant_id, sql1.ptr(), affected_rows), sql1);
190
  OZ (trans.write(tenant_id, sql2.ptr(), affected_rows), sql2);
191

192
  if (trans.is_started()) {
193
    int ret = OB_SUCCESS;
194
    if (OB_FAIL(trans.end(true))) {
195
      LOG_WARN("failed to end transaction", K(ret));
196
    }
197
  }
198

199
  return ret;
200
}
201

202
int ObDBMSJobUtils::check_job_can_running(int64_t tenant_id, bool &can_running)
203
{
204
  int ret = OB_SUCCESS;
205
  uint64_t job_queue_processor = 0;
206
  uint64_t job_running_cnt = 0;
207
  ObSqlString sql;
208
  omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
209
  share::schema::ObSchemaGetterGuard guard;
210
  bool is_restore = false;
211
  OX (can_running = false);
212
  CK (tenant_config.is_valid());
213
  OX (job_queue_processor = tenant_config->job_queue_processes);
214
  // found current running job count
215
  OZ (sql.append("select count(*) from __all_job where this_date is not null"));
216

217
  CK (OB_NOT_NULL(GCTX.schema_service_));
218
  OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, guard));
219
  OZ (guard.check_tenant_is_restore(tenant_id, is_restore));
220

221
  // job can not run in standy cluster and restore.
222
  if (OB_SUCC(ret) && job_queue_processor > 0
223
      && !GCTX.is_standby_cluster()
224
      && !is_restore) {
225
    SMART_VAR(ObMySQLProxy::MySQLResult, result) {
226
      if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
227
        LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
228
      } else if (OB_NOT_NULL(result.get_result())) {
229
        if (OB_SUCCESS == (ret = result.get_result()->next())) {
230
          int64_t int_value = 0;
231
          if (OB_FAIL(result.get_result()->get_int(static_cast<const int64_t>(0), int_value))) {
232
            LOG_WARN("failed to get column in row. ", K(ret));
233
          } else {
234
            job_running_cnt = static_cast<uint64_t>(int_value);
235
          }
236
        } else {
237
          LOG_WARN("failed to calc all running job, no row return", K(ret));
238
        }
239
      }
240
    }
241
    OX (can_running = (job_queue_processor > job_running_cnt));
242
  }
243
  return ret;
244
}
245

246
int ObDBMSJobUtils::extract_info(
247
  sqlclient::ObMySQLResult &result, ObIAllocator &allocator, ObDBMSJobInfo &job_info)
248
{
249
  int ret = OB_SUCCESS;
250
  ObDBMSJobInfo job_info_local;
251

252
  EXTRACT_INT_FIELD_MYSQL(result, "tenant_id", job_info_local.tenant_id_, uint64_t);
253
  EXTRACT_INT_FIELD_MYSQL(result, "job", job_info_local.job_, uint64_t);
254
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "lowner", job_info_local.lowner_);
255
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "powner", job_info_local.powner_);
256
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "cowner", job_info_local.cowner_);
257

258
#define EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, col_name, v)   \
259
do {                                                                  \
260
  ObObj obj;                                                          \
261
  OZ ((result).get_obj(col_name, obj));                               \
262
  if (OB_SUCC(ret)) {                                                 \
263
    if (obj.is_null()) {                                              \
264
      v = static_cast<int64_t>(0);                                    \
265
    } else {                                                          \
266
      OZ (obj.get_timestamp(v));                                      \
267
    }                                                                 \
268
  } else if (OB_ERR_COLUMN_NOT_FOUND == ret) {                        \
269
    ret = OB_SUCCESS;                                                 \
270
    v = static_cast<int64_t>(0);                                      \
271
  }                                                                   \
272
} while (false)
273

274
#define EXTRACT_NUMBER_FIELD_MYSQL_SKIP_RET(result, col_name, v)      \
275
do {                                                                  \
276
  common::number::ObNumber nmb_val;                                   \
277
  OZ ((result).get_number(col_name, nmb_val));                        \
278
  if (OB_ERR_NULL_VALUE == ret || OB_ERR_COLUMN_NOT_FOUND == ret) {   \
279
    ret = OB_SUCCESS;                                                 \
280
    v = static_cast<int64_t>(0);                                     \
281
  } else if (OB_SUCCESS == ret) {                                     \
282
    OZ (nmb_val.extract_valid_int64_with_trunc(v));                  \
283
  }                                                                   \
284
} while (false)
285

286
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "gmt_modified", job_info_local.last_modify_);
287
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "last_date", job_info_local.last_date_);
288
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "this_date", job_info_local.this_date_);
289
  EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "next_date", job_info_local.next_date_);
290
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "total", job_info_local.total_, uint64_t);
291

292
#undef EXTRACT_NUMBER_FIELD_MYSQL_SKIP_RET
293
#undef EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET
294

295
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "interval#", job_info_local.interval_);
296
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "failures", job_info_local.failures_, uint64_t);
297
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "flag", job_info_local.flag_, uint64_t);
298
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "what", job_info_local.what_);
299
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "nlsenv", job_info_local.nlsenv_);
300
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "charenv", job_info_local.charenv_);
301
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "field1", job_info_local.field1_);
302
  EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "scheduler_flags", job_info_local.scheduler_flags_, uint64_t);
303
  EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "exec_env", job_info_local.exec_env_);
304

305
  OZ (job_info.deep_copy(allocator, job_info_local));
306

307
  return ret;
308
}
309

310
int ObDBMSJobUtils::get_dbms_job_info(
311
  uint64_t tenant_id, uint64_t job_id, ObIAllocator &allocator, ObDBMSJobInfo &job_info)
312
{
313
  int ret = OB_SUCCESS;
314
  ObSqlString sql;
315
  int64_t affected_rows = 0;
316

317
  CK (OB_NOT_NULL(sql_proxy_));
318
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
319
  CK (OB_LIKELY(job_id != OB_INVALID_ID));
320

321
  OZ (sql.append_fmt("select * from %s where job = %ld", OB_ALL_JOB_TNAME, job_id));
322

323
  if (OB_SUCC(ret)) {
324
    SMART_VAR(ObMySQLProxy::MySQLResult, result) {
325
      if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
326
        LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_id));
327
      } else if (OB_NOT_NULL(result.get_result())) {
328
        if (OB_SUCCESS == (ret = result.get_result()->next())) {
329
          OZ (extract_info(*(result.get_result()), allocator, job_info));
330
          OX (job_info.tenant_id_ = tenant_id);
331
          if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) {
332
            LOG_ERROR("got more than one row for dbms job!", K(ret), K(tenant_id), K(job_id));
333
            ret = OB_ERR_UNEXPECTED;
334
          }
335
        } else if (OB_ITER_END == ret) {
336
          LOG_INFO("job not exists, may delete alreay!", K(ret), K(tenant_id), K(job_id));
337
          ret = OB_SUCCESS; // job not exist, do nothing ...
338
        } else {
339
          LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_id));
340
        }
341
      }
342
    }
343
  }
344
  return ret;
345
}
346

347
int ObDBMSJobUtils::get_dbms_job_infos_in_tenant(
348
  uint64_t tenant_id, ObIAllocator &allocator, ObIArray<ObDBMSJobInfo> &job_infos)
349
{
350
  int ret = OB_SUCCESS;
351
  ObSqlString sql;
352
  int64_t affected_rows = 0;
353

354
  CK (OB_NOT_NULL(sql_proxy_));
355
  CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
356

357
  OZ (sql.append_fmt("select * from %s", OB_ALL_JOB_TNAME));
358

359
  if (OB_SUCC(ret)) {
360
    SMART_VAR(ObMySQLProxy::MySQLResult, result) {
361
      if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
362
        LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
363
      } else if (OB_NOT_NULL(result.get_result())) {
364
        do {
365
          if (OB_FAIL(result.get_result()->next())) {
366
            if (ret != OB_ITER_END) {
367
              LOG_WARN("failed to get result from result", K(ret));
368
            }
369
          } else {
370
            ObDBMSJobInfo job_info;
371
            OZ (extract_info(*(result.get_result()), allocator, job_info));
372
            OX (job_info.tenant_id_ = tenant_id);
373
            OZ (job_infos.push_back(job_info));
374
          }
375
        } while (OB_SUCC(ret));
376
        ret = OB_ITER_END == ret ? OB_SUCCESS : ret;
377
      }
378
    }
379
  }
380

381
  return ret;
382
}
383

384
int ObDBMSJobUtils::calc_execute_at(
385
  ObDBMSJobInfo &job_info, int64_t &execute_at, int64_t &delay, bool ignore_nextdate)
386
{
387
  int ret = OB_SUCCESS;
388

389
  ObString &interval = job_info.get_interval();
390

391
  const int64_t now = ObTimeUtility::current_time();
392
  int64_t last_sub_next =
393
    (job_info.get_last_modify() / 1000 / 1000) - (job_info.get_next_date() / 1000/ 1000);
394
  if (job_info.get_next_date() != 0 && (!ignore_nextdate || job_info.get_next_date() != execute_at)) {
395
    if (job_info.get_next_date() > now) {
396
      execute_at = job_info.get_next_date();
397
      delay = job_info.get_next_date() - now;
398
    } else if (last_sub_next < 5 && last_sub_next >= -5) {
399
      execute_at = now;
400
      delay = 0;
401
    } else {
402
      delay = -1;
403
    }
404
  } else {
405
    delay = -1;
406
  }
407

408
  if (delay < 0 && !interval.empty()) {
409
    ObSqlString sql;
410
    ObOracleSqlProxy oracle_proxy(*(static_cast<ObMySQLProxy *>(sql_proxy_)));
411
    // NOTE: we need utc timestamp.
412
    OZ (sql.append_fmt(
413
      "select to_date(to_char(sys_extract_utc(to_timestamp(to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS'),"
414
               "'YYYY-MM-DD HH24:MI:SS')), 'YYYY-MM-DD HH24:MI:SS'), 'YYYY-MM-DD HH24:MI:SS'),"
415
      " to_date(to_char(sys_extract_utc(to_timestamp(to_char(%.*s, 'YYYY-MM-DD HH24:MI:SS'),"
416
                "'YYYY-MM-DD HH24:MI:SS')), 'YYYY-MM-DD HH24:MI:SS'), 'YYYY-MM-DD HH24:MI:SS') from dual;",
417
      interval.length(), interval.ptr()));
418
    if (OB_SUCC(ret)) {
419
      SMART_VAR(ObMySQLProxy::MySQLResult, result) {
420
        if (OB_FAIL(oracle_proxy.read(result, job_info.get_tenant_id(), sql.ptr()))) {
421
          LOG_WARN("execute query failed", K(ret), K(sql), K(job_info));
422
        } else if (OB_NOT_NULL(result.get_result())) {
423
          if (OB_FAIL(result.get_result()->next())) {
424
            LOG_WARN("failed to get result", K(ret));
425
          } else {
426
            int64_t sysdate = 0;
427
            int64_t col_idx = 0;
428
            OZ (result.get_result()->get_datetime(col_idx, sysdate));
429
            if (OB_SUCC(ret)
430
                && OB_FAIL(result.get_result()->get_datetime(col_idx + 1, execute_at))) {
431
              if (OB_ERR_NULL_VALUE == ret) {
432
                ret = OB_SUCCESS;
433
                delay = -1;
434
              }
435
            }
436
            if (OB_FAIL(ret)) {
437
            } else if (job_info.get_next_date() > execute_at) {
438
              execute_at = job_info.get_next_date();
439
              delay = execute_at - sysdate;
440
            } else {
441
              delay = execute_at - sysdate;
442
            }
443
            if (OB_SUCC(ret)) {
444
              OX (job_info.next_date_ = execute_at);
445
              OZ (update_nextdate(job_info.get_tenant_id(), job_info));
446
            }
447
          }
448
        }
449
      }
450
    }
451
  }
452

453
  return ret;
454
}
455

456
} // end for namespace dbms_job
457
} // end for namespace oceanbase
458

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.