/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
13
#define USING_LOG_PREFIX RS
15
#include "ob_dbms_job_utils.h"
17
#include "lib/ob_errno.h"
18
#include "lib/oblog/ob_log_module.h"
19
#include "lib/time/ob_time_utility.h"
20
#include "lib/string/ob_string.h"
21
#include "lib/string/ob_sql_string.h"
22
#include "lib/mysqlclient/ob_mysql_proxy.h"
23
#include "lib/mysqlclient/ob_mysql_result.h"
24
#include "lib/mysqlclient/ob_isql_client.h"
25
#include "lib/mysqlclient/ob_mysql_transaction.h"
26
#include "rpc/obrpc/ob_rpc_packet.h"
27
#include "lib/worker.h"
28
#include "share/ob_dml_sql_splicer.h"
29
#include "share/inner_table/ob_inner_table_schema_constants.h"
30
#include "share/schema/ob_schema_utils.h"
31
#include "share/schema/ob_multi_version_schema_service.h"
32
#include "observer/omt/ob_tenant_config_mgr.h"
33
#include "observer/ob_server_struct.h"
39
using namespace common;
41
using namespace share::schema;
42
using namespace sqlclient;
46
// Out-of-class definition of the static string constant ObDBMSJobInfo::__ALL_SERVER_BC.
// NOTE(review): the name suggests a marker meaning "broadcast to all servers"; no use of
// it is visible in this section, so confirm the meaning against its consumers.
const char *ObDBMSJobInfo::__ALL_SERVER_BC = "__ALL_SERVER_BC";
47
// Copies the job description held by `other` into this object.
//
// @param allocator  passed to ob_write_string() for every string member that is copied.
// @param other      source job info; it is only read.
//
// Scalar members are assigned directly; string members go through ob_write_string().
// OZ is a project macro that is not defined in this file; presumably it evaluates its
// argument only while `ret` is still OB_SUCCESS and stores the result in `ret`.
//
// NOTE(review): this view of the function is incomplete (the declaration of `ret`, the
// braces and the return statement are not visible), and no assignment of job_ or flag_
// appears below although other functions in this file read both members. Confirm against
// the complete file that those two members are copied as well.
int ObDBMSJobInfo::deep_copy(ObIAllocator &allocator, const ObDBMSJobInfo &other)
50
// Plain value members: direct assignment.
tenant_id_ = other.tenant_id_;
52
last_modify_ = other.last_modify_;
53
last_date_ = other.last_date_;
54
this_date_ = other.this_date_;
55
next_date_ = other.next_date_;
56
total_ = other.total_;
57
failures_ = other.failures_;
59
scheduler_flags_ = other.scheduler_flags_;
61
// String members: each one is written through ob_write_string() with `allocator`
// (presumably so the copy does not alias the buffers referenced by `other`).
OZ (ob_write_string(allocator, other.lowner_, lowner_));
62
OZ (ob_write_string(allocator, other.powner_, powner_));
63
OZ (ob_write_string(allocator, other.cowner_, cowner_));
65
OZ (ob_write_string(allocator, other.interval_, interval_));
67
OZ (ob_write_string(allocator, other.what_, what_));
68
OZ (ob_write_string(allocator, other.nlsenv_, nlsenv_));
69
OZ (ob_write_string(allocator, other.charenv_, charenv_));
70
OZ (ob_write_string(allocator, other.field1_, field1_));
71
OZ (ob_write_string(allocator, other.exec_env_, exec_env_));
75
// Records that a job is starting: sets job_info.this_date_ to the current time and
// writes it to the job's row in OB_ALL_JOB_TNAME with an UPDATE statement.
//
// @param tenant_id        tenant the statement is executed for; must not be OB_INVALID_ID.
// @param job_info         job being started; job_ must not be OB_INVALID_ID. this_date_ is
//                         modified here.
// @param update_nextdate  selects which variable is handed to the call continued below:
//                         job_info.next_date_ when true, a local dummy otherwise.
//
// NOTE(review): several original lines are not visible in this view (the declarations of
// `ret`, `sql`, `dml` and `delay`, the braces and the return statement).
int ObDBMSJobUtils::update_for_start(
76
uint64_t tenant_id, ObDBMSJobInfo &job_info, bool update_nextdate)
82
int64_t affected_rows = 0;
83
const int64_t now = ObTimeUtility::current_time();
85
// Scratch target used when the caller does not want job_info.next_date_ touched.
int64_t dummy_execute_at = 0;
87
// Argument checks (CK is a project macro; presumably it sets an error code in `ret`
// when the condition is false).
CK (OB_NOT_NULL(sql_proxy_));
88
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
89
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
// The next statement is the tail of a call whose opening line is not visible here.
// NOTE(review): the four arguments match the parameter list of calc_execute_at()
// (job_info, execute_at, delay, ignore_nextdate) — confirm against the full file.
92
job_info, (update_nextdate ? job_info.next_date_ : dummy_execute_at), delay, true));
94
// Stamp the start time in memory, then build "UPDATE ... SET this_date = ..." keyed by
// (tenant_id, job).
OX (job_info.this_date_ = now);
95
OZ (dml.add_gmt_modified(now));
96
OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
97
OZ (dml.add_pk_column("job", job_info.job_));
98
OZ (dml.add_time_column("this_date", job_info.this_date_));
99
OZ (dml.splice_update_sql(OB_ALL_JOB_TNAME, sql));
100
// Execute the statement for the given tenant.
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
105
// Writes job_info.next_date_ to the "next_date" column of the job's row in
// OB_ALL_JOB_TNAME. job_info itself is not modified by the visible statements.
//
// @param tenant_id  tenant the statement is executed for; must not be OB_INVALID_ID.
// @param job_info   job whose next_date_ is persisted; job_ must not be OB_INVALID_ID.
//
// NOTE(review): the declarations of `sql` and `dml`, the braces and the return statement
// are not visible in this view.
int ObDBMSJobUtils::update_nextdate(
106
uint64_t tenant_id, ObDBMSJobInfo &job_info)
108
int ret = OB_SUCCESS;
112
int64_t affected_rows = 0;
113
const int64_t now = ObTimeUtility::current_time();
115
// Argument checks.
CK (OB_NOT_NULL(sql_proxy_));
116
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
117
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
119
// Build the UPDATE keyed by (tenant_id, job); gmt_modified is set to the current time.
OZ (dml.add_gmt_modified(now));
120
OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
121
OZ (dml.add_pk_column("job", job_info.job_));
122
OZ (dml.add_time_column("next_date", job_info.next_date_));
123
OZ (dml.splice_update_sql(OB_ALL_JOB_TNAME, sql));
124
OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
130
// Records the end of a job run. Two statements are built and executed inside one
// transaction:
//   1. an UPDATE of the job's row in OB_ALL_JOB_TNAME (failures, flag, dates, total);
//   2. an INSERT of a row into OB_ALL_JOB_LOG_TNAME describing this run.
//
// @param tenant_id  tenant the statements are executed for; must not be OB_INVALID_ID.
// @param job_info   job that finished; failures_, flag_, total_ and possibly next_date_
//                   are modified in place. job_ must not be OB_INVALID_ID and this_date_
//                   must be > 0.
// @param err        value written to the "code" column of the log row.
// @param errmsg     value written to the "message" column; an empty string is treated
//                   as success (the failure counter is reset and "SUCCESS" is logged).
//
// NOTE(review): the declarations of `sql1`, `sql2` and `offset`, several braces and the
// return statement are not visible in this view.
int ObDBMSJobUtils::update_for_end(
131
uint64_t tenant_id, ObDBMSJobInfo &job_info, int err, const ObString &errmsg)
133
int ret = OB_SUCCESS;
135
ObMySQLTransaction trans;
136
// dml1 builds the UPDATE of the job row, dml2 the INSERT of the log row.
ObDMLSqlSplicer dml1;
138
ObDMLSqlSplicer dml2;
140
int64_t affected_rows = 0;
141
const int64_t now = ObTimeUtility::current_time();
147
CK (OB_NOT_NULL(sql_proxy_));
148
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
149
CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
151
// An empty errmsg resets the failure counter; anything else increments it.
OX (job_info.failures_ = errmsg.empty() ? 0 : (job_info.failures_ + 1));
152
// when dbms_job run end, if failures > 16 then set broken flag else clear it.
// NOTE(review): the comment above says "> 16" but the statement below tests "> 15",
// i.e. bit 0 of flag_ is set from the 16th failure on — confirm which is intended.
153
OX (job_info.flag_ = job_info.failures_ > 15 ? (job_info.flag_ | 0x1) : (job_info.flag_ & 0xfffffffffffffffE));
154
// When bit 0 of flag_ is set, next_date_ is pushed to a far-future timestamp.
if (OB_SUCC(ret) && ((job_info.flag_ & 0x1) != 0)) {
155
job_info.next_date_ = 64060560000000000; // 4000-01-01
157
CK (job_info.this_date_ > 0);
158
// Add the time elapsed since this_date_ to the running total.
OX (job_info.total_ += (now - job_info.this_date_));
159
OZ (dml1.add_gmt_modified(now));
160
OZ (dml1.add_pk_column("tenant_id",
161
ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
162
OZ (dml1.add_pk_column("job", job_info.job_));
// NOTE(review): add_column(true, "this_date") presumably writes NULL to this_date (the
// two-argument overload is not visible here) — confirm against ObDMLSqlSplicer.
163
OZ (dml1.add_column(true, "this_date"));
// The start time of the run that just ended becomes last_date.
164
OZ (dml1.add_time_column("last_date", job_info.this_date_));
165
OZ (dml1.add_time_column("next_date", job_info.next_date_));
166
OZ (dml1.add_column("failures", job_info.failures_));
// NOTE(review): for failures_ > 16 the literal 1 is written instead of flag_, which
// drops any other bits held in flag_; flag_ already has bit 0 set from failures_ > 15
// (see above), and the two thresholds differ. Confirm whether writing flag_
// unconditionally is what is intended.
167
OZ (dml1.add_column("flag", job_info.failures_ > 16 ? 1 : job_info.flag_));
168
OZ (dml1.add_column("total", job_info.total_));
169
OZ (dml1.splice_update_sql(OB_ALL_JOB_TNAME, sql1));
171
// Text form of this server's address, stored in the log row's exec_addr column.
char exec_addr[MAX_IP_PORT_LENGTH];
173
OX (memset(exec_addr, 0, MAX_IP_PORT_LENGTH));
174
OX (offset = GCTX.self_addr().to_string(exec_addr, MAX_IP_PORT_LENGTH));
175
CK (offset < MAX_IP_PORT_LENGTH);
176
OZ (dml2.add_gmt_create(now));
177
OZ (dml2.add_gmt_modified(now));
178
OZ (dml2.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
179
OZ (dml2.add_pk_column("job", job_info.job_));
180
OZ (dml2.add_time_column("time", now));
181
OZ (dml2.add_pk_column("exec_addr", ObHexEscapeSqlStr(ObString(exec_addr))), ObString(exec_addr));
182
OZ (dml2.add_column("code", err));
// The next line is the tail of a call whose opening line is not visible in this view;
// it supplies the "message" column, using the literal "SUCCESS" when errmsg is empty.
184
"message", ObHexEscapeSqlStr(errmsg.empty() ? ObString("SUCCESS") : errmsg)));
185
OZ (dml2.splice_insert_sql(OB_ALL_JOB_LOG_TNAME, sql2));
187
// Run both statements in one transaction.
OZ (trans.start(sql_proxy_, tenant_id, true));
189
OZ (trans.write(tenant_id, sql1.ptr(), affected_rows), sql1);
190
OZ (trans.write(tenant_id, sql2.ptr(), affected_rows), sql2);
192
// NOTE(review): the inner `ret` declared below shadows the function-level `ret`, so a
// failure of trans.end() is logged but does not reach the outer `ret`. trans.end() is
// also called with `true` even when an earlier step failed; if that argument means
// "commit" (ObMySQLTransaction is not visible here), a partially applied transaction
// could be committed. Confirm whether trans.end(OB_SUCC(ret)) with the outer `ret`
// was intended.
if (trans.is_started()) {
193
int ret = OB_SUCCESS;
194
if (OB_FAIL(trans.end(true))) {
195
LOG_WARN("failed to end transaction", K(ret));
202
// Decides whether another job may be started for the tenant.
//
// @param tenant_id    tenant to check. NOTE(review): declared int64_t here while the
//                     other functions in this file take uint64_t.
// @param can_running  output; set to false first and, when no earlier step failed, to
//                     (job_queue_processes > number of rows in __all_job whose this_date
//                     is not null).
//
// The count query is only issued when the tenant's job_queue_processes setting is > 0
// and the cluster is not a standby cluster; otherwise the running count stays 0.
// NOTE(review): the declaration of `sql`, part of the condition (original line 224,
// presumably the `is_restore` test), several braces/else branches and the return
// statement are not visible in this view.
int ObDBMSJobUtils::check_job_can_running(int64_t tenant_id, bool &can_running)
204
int ret = OB_SUCCESS;
205
uint64_t job_queue_processor = 0;
206
uint64_t job_running_cnt = 0;
208
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
209
share::schema::ObSchemaGetterGuard guard;
210
bool is_restore = false;
211
OX (can_running = false);
212
CK (tenant_config.is_valid());
213
// Limit taken from the tenant configuration item job_queue_processes.
OX (job_queue_processor = tenant_config->job_queue_processes);
214
// Count the jobs that are currently running (rows whose this_date is not null).
215
OZ (sql.append("select count(*) from __all_job where this_date is not null"));
217
// Ask the schema service whether the tenant is being restored.
CK (OB_NOT_NULL(GCTX.schema_service_));
218
OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, guard));
219
OZ (guard.check_tenant_is_restore(tenant_id, is_restore));
221
// Jobs cannot run in a standby cluster or while the tenant is being restored.
222
if (OB_SUCC(ret) && job_queue_processor > 0
223
&& !GCTX.is_standby_cluster()
225
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
226
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
227
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
228
} else if (OB_NOT_NULL(result.get_result())) {
229
// Exactly one row with one column (the count) is read.
if (OB_SUCCESS == (ret = result.get_result()->next())) {
230
int64_t int_value = 0;
231
if (OB_FAIL(result.get_result()->get_int(static_cast<const int64_t>(0), int_value))) {
232
LOG_WARN("failed to get column in row. ", K(ret));
234
job_running_cnt = static_cast<uint64_t>(int_value);
237
LOG_WARN("failed to calc all running job, no row return", K(ret));
241
// A new job may run only while fewer jobs are running than the configured limit.
OX (can_running = (job_queue_processor > job_running_cnt));
246
// Reads the current row of `result` into a local ObDBMSJobInfo column by column and then
// deep-copies it into `job_info` using `allocator`.
//
// @param result     result set positioned on the row to read.
// @param allocator  forwarded to ObDBMSJobInfo::deep_copy().
// @param job_info   output; written only through the final deep_copy() call.
//
// NOTE(review): the EXTRACT_*_FIELD_MYSQL* macros used here are project macros defined
// outside this file; the "_SKIP_RET" variants presumably tolerate a missing or NULL
// column — confirm. Some original lines (braces, parts of the two local macros, the
// return statement) are not visible in this view.
int ObDBMSJobUtils::extract_info(
247
sqlclient::ObMySQLResult &result, ObIAllocator &allocator, ObDBMSJobInfo &job_info)
249
int ret = OB_SUCCESS;
250
// Staging object: fields read from the row are stored here first.
ObDBMSJobInfo job_info_local;
252
EXTRACT_INT_FIELD_MYSQL(result, "tenant_id", job_info_local.tenant_id_, uint64_t);
253
EXTRACT_INT_FIELD_MYSQL(result, "job", job_info_local.job_, uint64_t);
254
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "lowner", job_info_local.lowner_);
255
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "powner", job_info_local.powner_);
256
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "cowner", job_info_local.cowner_);
258
// Function-local helper macro: fetches column `col_name` as an object and stores its
// timestamp in `v`; `v` is set to 0 when the object is null or when `ret` is
// OB_ERR_COLUMN_NOT_FOUND. It is #undef'ed further down.
#define EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, col_name, v) \
261
OZ ((result).get_obj(col_name, obj)); \
262
if (OB_SUCC(ret)) { \
263
if (obj.is_null()) { \
264
v = static_cast<int64_t>(0); \
266
OZ (obj.get_timestamp(v)); \
268
} else if (OB_ERR_COLUMN_NOT_FOUND == ret) { \
270
v = static_cast<int64_t>(0); \
274
// Function-local helper macro: fetches column `col_name` as an ObNumber and truncates
// it to int64 in `v`; `v` is set to 0 when `ret` is OB_ERR_NULL_VALUE or
// OB_ERR_COLUMN_NOT_FOUND. NOTE(review): no use of this macro is visible in this view
// before it is #undef'ed.
#define EXTRACT_NUMBER_FIELD_MYSQL_SKIP_RET(result, col_name, v) \
276
common::number::ObNumber nmb_val; \
277
OZ ((result).get_number(col_name, nmb_val)); \
278
if (OB_ERR_NULL_VALUE == ret || OB_ERR_COLUMN_NOT_FOUND == ret) { \
280
v = static_cast<int64_t>(0); \
281
} else if (OB_SUCCESS == ret) { \
282
OZ (nmb_val.extract_valid_int64_with_trunc(v)); \
286
// Timestamp columns; the row's gmt_modified is stored as last_modify_.
EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "gmt_modified", job_info_local.last_modify_);
287
EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "last_date", job_info_local.last_date_);
288
EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "this_date", job_info_local.this_date_);
289
EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "next_date", job_info_local.next_date_);
290
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "total", job_info_local.total_, uint64_t);
292
#undef EXTRACT_NUMBER_FIELD_MYSQL_SKIP_RET
293
#undef EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET
295
// Note the column name is literally "interval#".
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "interval#", job_info_local.interval_);
296
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "failures", job_info_local.failures_, uint64_t);
297
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "flag", job_info_local.flag_, uint64_t);
298
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "what", job_info_local.what_);
299
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "nlsenv", job_info_local.nlsenv_);
300
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "charenv", job_info_local.charenv_);
301
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "field1", job_info_local.field1_);
302
EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "scheduler_flags", job_info_local.scheduler_flags_, uint64_t);
303
EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "exec_env", job_info_local.exec_env_);
305
// Hand the staged values to the caller's object; strings are copied with `allocator`.
OZ (job_info.deep_copy(allocator, job_info_local));
310
// Loads one job by id from OB_ALL_JOB_TNAME into `job_info`.
//
// @param tenant_id  tenant the query is executed for; must not be OB_INVALID_ID. It is
//                   also stored in job_info.tenant_id_ after a successful read.
// @param job_id     value compared with the "job" column; must not be OB_INVALID_ID.
// @param allocator  forwarded to extract_info().
// @param job_info   output; filled only when a row is found.
//
// Outcomes visible below: no row -> logged at INFO level and `ret` reset to OB_SUCCESS
// with job_info left untouched by this function; more than one row -> OB_ERR_UNEXPECTED.
// NOTE(review): the declaration of `sql`, some braces/else lines and the return
// statement are not visible in this view; `affected_rows` has no visible use.
int ObDBMSJobUtils::get_dbms_job_info(
311
uint64_t tenant_id, uint64_t job_id, ObIAllocator &allocator, ObDBMSJobInfo &job_info)
313
int ret = OB_SUCCESS;
315
int64_t affected_rows = 0;
317
CK (OB_NOT_NULL(sql_proxy_));
318
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
319
CK (OB_LIKELY(job_id != OB_INVALID_ID));
321
// NOTE(review): job_id is uint64_t but is formatted with %ld; %lu would match the type.
// The statement filters on `job` only; tenant scoping presumably comes from the
// tenant_id passed to read() below — confirm.
OZ (sql.append_fmt("select * from %s where job = %ld", OB_ALL_JOB_TNAME, job_id));
324
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
325
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
326
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_id));
327
} else if (OB_NOT_NULL(result.get_result())) {
328
if (OB_SUCCESS == (ret = result.get_result()->next())) {
329
OZ (extract_info(*(result.get_result()), allocator, job_info));
330
OX (job_info.tenant_id_ = tenant_id);
331
// A second next() must report OB_ITER_END; anything else means a duplicate job row.
if (OB_SUCC(ret) && (result.get_result()->next()) != OB_ITER_END) {
332
LOG_ERROR("got more than one row for dbms job!", K(ret), K(tenant_id), K(job_id));
333
ret = OB_ERR_UNEXPECTED;
335
} else if (OB_ITER_END == ret) {
336
LOG_INFO("job not exists, may delete alreay!", K(ret), K(tenant_id), K(job_id));
337
ret = OB_SUCCESS; // job not exist, do nothing ...
339
LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_id));
347
// Loads every row of OB_ALL_JOB_TNAME for the tenant and appends one ObDBMSJobInfo per
// row to `job_infos`.
//
// @param tenant_id  tenant the query is executed for; must not be OB_INVALID_ID. It is
//                   stored in each returned job_info.tenant_id_.
// @param allocator  forwarded to extract_info() for every row.
// @param job_infos  output array; rows are appended with push_back().
//
// Iteration stops when next() fails; OB_ITER_END is then translated to OB_SUCCESS.
// NOTE(review): the declaration of `sql`, the opening of the do-loop, some braces/else
// lines and the return statement are not visible in this view; `affected_rows` has no
// visible use.
int ObDBMSJobUtils::get_dbms_job_infos_in_tenant(
348
uint64_t tenant_id, ObIAllocator &allocator, ObIArray<ObDBMSJobInfo> &job_infos)
350
int ret = OB_SUCCESS;
352
int64_t affected_rows = 0;
354
CK (OB_NOT_NULL(sql_proxy_));
355
CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
357
OZ (sql.append_fmt("select * from %s", OB_ALL_JOB_TNAME));
360
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
361
if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
362
LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
363
} else if (OB_NOT_NULL(result.get_result())) {
365
// Advance to the next row; only errors other than end-of-iteration are logged.
if (OB_FAIL(result.get_result()->next())) {
366
if (ret != OB_ITER_END) {
367
LOG_WARN("failed to get result from result", K(ret));
370
// Convert the current row and append it to the output array.
ObDBMSJobInfo job_info;
371
OZ (extract_info(*(result.get_result()), allocator, job_info));
372
OX (job_info.tenant_id_ = tenant_id);
373
OZ (job_infos.push_back(job_info));
375
} while (OB_SUCC(ret));
376
// Reaching the end of the result set is the normal way out of the loop.
ret = OB_ITER_END == ret ? OB_SUCCESS : ret;
384
// Works out when a job should run next.
//
// @param job_info         job being scheduled; next_date_ is overwritten with the
//                         computed execute_at and written back via update_nextdate() in
//                         the interval branch below.
// @param execute_at       in/out; receives the chosen execution time.
// @param delay            in/out; receives execute_at minus the reference "now"
//                         (local clock in the first branch, the database's sysdate in
//                         the interval branch).
// @param ignore_nextdate  when true, the next_date-based branch is skipped if
//                         job_info.get_next_date() equals the incoming execute_at.
//
// NOTE(review): a large share of this function is not visible in this view (the
// declarations of `sql`, `col_idx` and `sysdate`, the bodies of several branches, braces
// and the return statement). The comments below describe only the visible statements.
int ObDBMSJobUtils::calc_execute_at(
385
ObDBMSJobInfo &job_info, int64_t &execute_at, int64_t &delay, bool ignore_nextdate)
387
int ret = OB_SUCCESS;
389
ObString &interval = job_info.get_interval();
391
const int64_t now = ObTimeUtility::current_time();
// Difference between last-modify time and next_date after dividing both by 1,000,000
// (presumably microseconds to seconds — confirm the unit of these timestamps).
392
int64_t last_sub_next =
393
(job_info.get_last_modify() / 1000 / 1000) - (job_info.get_next_date() / 1000/ 1000);
394
if (job_info.get_next_date() != 0 && (!ignore_nextdate || job_info.get_next_date() != execute_at)) {
395
// next_date still lies in the future: run at next_date.
if (job_info.get_next_date() > now) {
396
execute_at = job_info.get_next_date();
397
delay = job_info.get_next_date() - now;
398
// next_date has passed, but the row was modified within about 5 units of it (see the
// unit note above); the body of this branch is not visible in this view.
} else if (last_sub_next < 5 && last_sub_next >= -5) {
408
// No usable delay yet and the job has an interval expression: let the database
// evaluate it through an Oracle-mode proxy.
if (delay < 0 && !interval.empty()) {
410
ObOracleSqlProxy oracle_proxy(*(static_cast<ObMySQLProxy *>(sql_proxy_)));
411
// NOTE: we need utc timestamp.
// The query selects two values, both passed through sys_extract_utc: sysdate and the
// job's interval expression. NOTE(review): the interval text is spliced into the SQL
// verbatim via %.*s; confirm it is validated where jobs are created.
413
"select to_date(to_char(sys_extract_utc(to_timestamp(to_char(sysdate, 'YYYY-MM-DD HH24:MI:SS'),"
414
"'YYYY-MM-DD HH24:MI:SS')), 'YYYY-MM-DD HH24:MI:SS'), 'YYYY-MM-DD HH24:MI:SS'),"
415
" to_date(to_char(sys_extract_utc(to_timestamp(to_char(%.*s, 'YYYY-MM-DD HH24:MI:SS'),"
416
"'YYYY-MM-DD HH24:MI:SS')), 'YYYY-MM-DD HH24:MI:SS'), 'YYYY-MM-DD HH24:MI:SS') from dual;",
417
interval.length(), interval.ptr()));
419
SMART_VAR(ObMySQLProxy::MySQLResult, result) {
420
if (OB_FAIL(oracle_proxy.read(result, job_info.get_tenant_id(), sql.ptr()))) {
421
LOG_WARN("execute query failed", K(ret), K(sql), K(job_info));
422
} else if (OB_NOT_NULL(result.get_result())) {
423
if (OB_FAIL(result.get_result()->next())) {
424
LOG_WARN("failed to get result", K(ret));
// First column -> sysdate, second column -> execute_at.
428
OZ (result.get_result()->get_datetime(col_idx, sysdate));
430
&& OB_FAIL(result.get_result()->get_datetime(col_idx + 1, execute_at))) {
431
// Handling of a NULL interval result; the body is not visible in this view.
if (OB_ERR_NULL_VALUE == ret) {
437
// Never schedule earlier than the stored next_date.
} else if (job_info.get_next_date() > execute_at) {
438
execute_at = job_info.get_next_date();
439
delay = execute_at - sysdate;
441
delay = execute_at - sysdate;
444
// Persist the newly computed time as the job's next_date.
OX (job_info.next_date_ = execute_at);
445
OZ (update_nextdate(job_info.get_tenant_id(), job_info));
456
} // end for namespace dbms_job
457
} // end for namespace oceanbase