oceanbase
601 строка · 24.6 Кб
1/**
2* Copyright (c) 2021 OceanBase
3* OceanBase CE is licensed under Mulan PubL v2.
4* You can use this software according to the terms and conditions of the Mulan PubL v2.
5* You may obtain a copy of Mulan PubL v2 at:
6* http://license.coscl.org.cn/MulanPubL-2.0
7* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10* See the Mulan PubL v2 for more details.
11*/
12
13#define USING_LOG_PREFIX RS
14
15#include "ob_dbms_sched_table_operator.h"
16
17#include "lib/ob_errno.h"
18#include "lib/oblog/ob_log_module.h"
19#include "lib/time/ob_time_utility.h"
20#include "lib/string/ob_string.h"
21#include "lib/string/ob_sql_string.h"
22#include "lib/mysqlclient/ob_mysql_proxy.h"
23#include "lib/mysqlclient/ob_mysql_result.h"
24#include "lib/mysqlclient/ob_isql_client.h"
25#include "lib/mysqlclient/ob_mysql_transaction.h"
26#include "rpc/obrpc/ob_rpc_packet.h"
27#include "lib/worker.h"
28#include "share/ob_dml_sql_splicer.h"
29#include "share/inner_table/ob_inner_table_schema_constants.h"
30#include "share/schema/ob_schema_utils.h"
31#include "observer/omt/ob_tenant_config_mgr.h"
32#include "observer/ob_server_struct.h"
33#include "observer/dbms_scheduler/ob_dbms_sched_job_utils.h"
34#include "share/schema/ob_multi_version_schema_service.h"
35
36
37namespace oceanbase
38{
39
40using namespace common;
41using namespace share;
42using namespace share::schema;
43using namespace sqlclient;
44using namespace storage;
45
46namespace dbms_scheduler
47{
48
49int ObDBMSSchedTableOperator::update_next_date(
50uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date)
51{
52int ret = OB_SUCCESS;
53
54ObDMLSqlSplicer dml;
55ObSqlString sql;
56int64_t affected_rows = 0;
57const int64_t now = ObTimeUtility::current_time();
58
59CK (OB_NOT_NULL(sql_proxy_));
60CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
61CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
62
63OZ (dml.add_gmt_modified(now));
64OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
65OZ (dml.add_pk_column("job", job_info.job_));
66OZ (dml.add_pk_column("job_name", job_info.job_name_));
67OZ (dml.add_time_column("next_date", next_date));
68OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
69OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
70return ret;
71}
72
73
74int ObDBMSSchedTableOperator::update_for_start(
75uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int64_t next_date)
76{
77int ret = OB_SUCCESS;
78
79ObDMLSqlSplicer dml;
80ObSqlString sql;
81int64_t affected_rows = 0;
82const int64_t now = ObTimeUtility::current_time();
83
84CK (OB_NOT_NULL(sql_proxy_));
85CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
86CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
87
88OX (job_info.this_date_ = now);
89OZ (dml.add_gmt_modified(now));
90OZ (dml.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
91OZ (dml.add_pk_column("job", job_info.job_));
92OZ (dml.add_pk_column("job_name", job_info.job_name_));
93OZ (dml.add_time_column("this_date", job_info.this_date_));
94OZ (dml.add_time_column("next_date", next_date));
95OZ (dml.add_column("state", "SCHEDULED"));
96OZ (dml.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql));
97OZ (sql.append_fmt(" and this_date is null"));
98OZ (sql_proxy_->write(tenant_id, sql.ptr(), affected_rows));
99CK (affected_rows == 1);
100return ret;
101}
102
103int ObDBMSSchedTableOperator::seperate_job_id_from_name(ObString &job_name, int64_t &job_id)
104{
105int ret = OB_SUCCESS;
106const char* prefix = "JOB$_";
107job_id = 0;
108if (job_name.prefix_match(prefix)) {
109char nptr[JOB_NAME_MAX_SIZE];
110char *endptr = NULL;
111snprintf(nptr, JOB_NAME_MAX_SIZE, "%.*s", job_name.length(), job_name.ptr());
112job_id = strtoll(nptr + strlen(prefix), &endptr, 10);
113if (job_id <= 0) {
114LOG_WARN("job_id is not right", K(job_name), K(nptr), K(job_id));
115} else if (*endptr != '\0' || job_id <= JOB_ID_OFFSET) {
116job_id = 0; // use job_info.job_ when job_id is not formal
117}
118}
119return ret;
120}
121
122
123int ObDBMSSchedTableOperator::update_for_end(
124uint64_t tenant_id, ObDBMSSchedJobInfo &job_info, int err, const ObString &errmsg)
125{
126int ret = OB_SUCCESS;
127
128ObMySQLTransaction trans;
129ObDMLSqlSplicer dml1;
130ObSqlString sql1;
131ObDMLSqlSplicer dml2;
132ObSqlString sql2;
133int64_t affected_rows = 0;
134const int64_t now = ObTimeUtility::current_time();
135
136UNUSED(errmsg);
137
138CK (OB_NOT_NULL(sql_proxy_));
139CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
140CK (OB_LIKELY(job_info.job_ != OB_INVALID_ID));
141
142uint64_t data_version = 0;
143if (OB_SUCC(ret)) {
144if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id, data_version))) {
145LOG_WARN("fail to get tenant data version", KR(ret), K(data_version));
146} else if (MOCK_DATA_VERSION <= data_version) {
147CK (OB_LIKELY(!job_info.job_class_.empty()));
148}
149}
150
151ObDBMSSchedJobClassInfo job_class_info;
152ObArenaAllocator allocator("DBMSSchedTmp");
153if (MOCK_DATA_VERSION <= data_version) {
154OZ (get_dbms_sched_job_class_info(tenant_id, job_info.is_oracle_tenant(), job_info.get_job_class(), allocator, job_class_info));
155}
156// when if failures > 16 then set broken flag.
157OX (job_info.failures_ = errmsg.empty() ? 0 : (job_info.failures_ + 1));
158OX (job_info.flag_ = job_info.failures_ > 15 ? (job_info.flag_ | 0x1) : (job_info.flag_ & 0xfffffffffffffffE));
159if ((now >= job_info.end_date_ || job_info.get_interval_ts() == 0) && (true == job_info.auto_drop_)) {
160// when end_date is reach or no interval set, and auto_drop is set true, drop job.
161OZ (dml1.add_gmt_modified(now));
162OZ (dml1.add_pk_column("tenant_id",
163ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
164OZ (dml1.add_pk_column("job_name", job_info.job_name_));
165OZ (dml1.splice_delete_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql1));
166} else {
167if (OB_SUCC(ret) && ((job_info.flag_ & 0x1) != 0)) {
168// when if failures > 16 then set broken state.
169job_info.next_date_ = 64060560000000000; // 4000-01-01
170OZ (dml1.add_column("state", "BROKEN"));
171} else if (now >= job_info.end_date_) {
172// when end_date is reach and auto_drop is set false, disable set completed state.
173job_info.enabled_ = false;
174OZ (dml1.add_column("state", "COMPLETED"));
175OZ (dml1.add_column("enabled", job_info.enabled_));
176}
177CK (job_info.this_date_ > 0 || !errmsg.empty());
178OX (job_info.total_ += (job_info.this_date_ > 0 ? now - job_info.this_date_ : 0));
179OZ (dml1.add_gmt_modified(now));
180OZ (dml1.add_pk_column("tenant_id",
181ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
182OZ (dml1.add_pk_column("job", job_info.job_));
183OZ (dml1.add_pk_column("job_name", job_info.job_name_));
184OZ (dml1.add_column(true, "this_date"));
185OZ (dml1.add_time_column("last_date", job_info.this_date_));
186OZ (dml1.add_time_column("next_date", job_info.next_date_));
187OZ (dml1.add_column("failures", job_info.failures_));
188OZ (dml1.add_column("flag", job_info.failures_ > 16 ? 1 : job_info.flag_));
189OZ (dml1.add_column("total", job_info.total_));
190OZ (dml1.splice_update_sql(OB_ALL_TENANT_SCHEDULER_JOB_TNAME, sql1));
191}
192
193//If a non-existent JOB CLASS is entered when creating a JOB,
194//job_run_detail still needs to be recorded.
195bool need_write_job_run_detail = true;
196if (MOCK_DATA_VERSION <= data_version) {
197ObString logging_level = job_class_info.get_logging_level();
198if (logging_level.empty()) {
199LOG_WARN("logging_level may not assigned");
200} else if (0 == logging_level.case_compare("OFF")) {
201need_write_job_run_detail = false;
202} else if (0 == logging_level.case_compare("RUNS")) {
203need_write_job_run_detail = true;
204} else if (0 == logging_level.case_compare("FAILED RUNS") && !errmsg.empty()) {
205need_write_job_run_detail = true;
206}
207}
208
209if (need_write_job_run_detail) {
210OZ (dml2.add_gmt_create(now));
211OZ (dml2.add_gmt_modified(now));
212OZ (dml2.add_pk_column("tenant_id", ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
213int64_t job_id = 0;
214OZ (seperate_job_id_from_name(job_info.get_job_name(), job_id));
215if (job_id <= 0) {
216job_id = job_info.get_job_id();
217}
218OZ (dml2.add_pk_column("job", job_id));
219OZ (dml2.add_time_column("time", now));
220OZ (dml2.add_column("code", err));
221OZ (dml2.add_column(
222"message", ObHexEscapeSqlStr(errmsg.empty() ? ObString("SUCCESS") : errmsg)));
223if (MOCK_DATA_VERSION <= data_version) {
224OZ (dml2.add_column("job_class", job_info.job_class_));
225}
226OZ (dml2.splice_insert_sql(OB_ALL_TENANT_SCHEDULER_JOB_RUN_DETAIL_TNAME, sql2));
227}
228
229OZ (trans.start(sql_proxy_, tenant_id, true));
230
231OZ (trans.write(tenant_id, sql1.ptr(), affected_rows));
232if (need_write_job_run_detail) {
233OZ (trans.write(tenant_id, sql2.ptr(), affected_rows));
234}
235
236int tmp_ret = OB_SUCCESS;
237if (trans.is_started()) {
238if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
239LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
240ret = OB_SUCC(ret) ? tmp_ret : ret;
241}
242}
243
244return ret;
245}
246
247int ObDBMSSchedTableOperator::check_job_can_running(int64_t tenant_id, int64_t alive_job_count, bool &can_running)
248{
249int ret = OB_SUCCESS;
250uint64_t job_queue_processor = 0;
251uint64_t job_running_cnt = 0;
252ObSqlString sql;
253omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
254share::schema::ObSchemaGetterGuard guard;
255bool is_restore = false;
256OX (can_running = false);
257CK (tenant_config.is_valid());
258OX (job_queue_processor = tenant_config->job_queue_processes);
259// found current running job count
260if (OB_FAIL(ret)) {
261} else if (alive_job_count <= job_queue_processor) {
262can_running = true;
263} else {
264OZ (sql.append_fmt("select count(*) from %s where this_date is not null", OB_ALL_TENANT_SCHEDULER_JOB_TNAME));
265
266CK (OB_NOT_NULL(GCTX.schema_service_));
267OZ (GCTX.schema_service_->get_tenant_schema_guard(tenant_id, guard));
268OZ (guard.check_tenant_is_restore(tenant_id, is_restore));
269
270// job can not run in standy cluster and restore.
271if (OB_SUCC(ret) && job_queue_processor > 0
272&& !is_restore) {
273SMART_VAR(ObMySQLProxy::MySQLResult, result) {
274if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
275LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
276} else if (OB_ISNULL(result.get_result())) {
277ret = OB_ERR_UNEXPECTED;
278LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id));
279} else {
280if (OB_SUCCESS == (ret = result.get_result()->next())) {
281int64_t int_value = 0;
282if (OB_FAIL(result.get_result()->get_int(static_cast<const int64_t>(0), int_value))) {
283LOG_WARN("failed to get column in row. ", K(ret));
284} else {
285job_running_cnt = static_cast<uint64_t>(int_value);
286}
287} else {
288LOG_WARN("failed to calc all running job, no row return", K(ret));
289}
290}
291}
292OX (can_running = (job_queue_processor > job_running_cnt));
293}
294}
295return ret;
296}
297
298int ObDBMSSchedTableOperator::extract_info(
299sqlclient::ObMySQLResult &result, int64_t tenant_id, bool is_oracle_tenant,
300ObIAllocator &allocator, ObDBMSSchedJobInfo &job_info)
301{
302int ret = OB_SUCCESS;
303ObDBMSSchedJobInfo job_info_local;
304
305job_info_local.tenant_id_ = tenant_id;
306job_info_local.is_oracle_tenant_ = is_oracle_tenant;
307EXTRACT_INT_FIELD_MYSQL(result, "job", job_info_local.job_, uint64_t);
308EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "lowner", job_info_local.lowner_);
309EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "powner", job_info_local.powner_);
310EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "cowner", job_info_local.cowner_);
311
312#define EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, col_name, v) \
313do { \
314ObObj obj; \
315OZ ((result).get_obj(col_name, obj)); \
316if (OB_SUCC(ret)) { \
317if (obj.is_null()) { \
318v = static_cast<int64_t>(0); \
319} else { \
320OZ (obj.get_timestamp(v)); \
321} \
322} else if (OB_ERR_COLUMN_NOT_FOUND == ret) { \
323ret = OB_SUCCESS; \
324v = static_cast<int64_t>(0); \
325} \
326} while (false)
327
328#define EXTRACT_NUMBER_FIELD_MYSQL_SKIP_RET(result, col_name, v) \
329do { \
330common::number::ObNumber nmb_val; \
331OZ ((result).get_number(col_name, nmb_val)); \
332if (OB_ERR_NULL_VALUE == ret || OB_ERR_COLUMN_NOT_FOUND == ret) { \
333ret = OB_SUCCESS; \
334v = static_cast<int64_t>(0); \
335} else if (OB_SUCCESS == ret) { \
336OZ (nmb_val.extract_valid_int64_with_trunc(v)); \
337} \
338} while (false)
339
340EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "gmt_modified", job_info_local.last_modify_);
341EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "last_date", job_info_local.last_date_);
342EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "this_date", job_info_local.this_date_);
343EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "next_date", job_info_local.next_date_);
344EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "total", job_info_local.total_, uint64_t);
345EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "start_date", job_info_local.start_date_);
346EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET(result, "end_date", job_info_local.end_date_);
347
348#undef EXTRACT_NUMBER_FIELD_MYSQL_SKIP_RET
349#undef EXTRACT_TIMESTAMP_FIELD_MYSQL_SKIP_RET
350
351EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "interval#", job_info_local.interval_);
352EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "failures", job_info_local.failures_, uint64_t);
353EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "flag", job_info_local.flag_, uint64_t);
354EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "what", job_info_local.what_);
355EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "nlsenv", job_info_local.nlsenv_);
356EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "charenv", job_info_local.charenv_);
357EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "field1", job_info_local.field1_);
358EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "scheduler_flags", job_info_local.scheduler_flags_, uint64_t);
359EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "exec_env", job_info_local.exec_env_);
360EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_name", job_info_local.job_name_);
361EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_class", job_info_local.job_class_);
362EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "program_name", job_info_local.program_name_);
363EXTRACT_BOOL_FIELD_MYSQL_SKIP_RET(result, "enabled", job_info_local.enabled_);
364EXTRACT_BOOL_FIELD_MYSQL_SKIP_RET(result, "auto_drop", job_info_local.auto_drop_);
365EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "interval_ts", job_info_local.interval_ts_, uint64_t);
366EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "max_run_duration", job_info_local.max_run_duration_, uint64_t);
367
368OZ (job_info.deep_copy(allocator, job_info_local));
369
370return ret;
371}
372
373int ObDBMSSchedTableOperator::get_dbms_sched_job_info(
374uint64_t tenant_id, bool is_oracle_tenant, uint64_t job_id, const common::ObString &job_name,
375ObIAllocator &allocator, ObDBMSSchedJobInfo &job_info)
376{
377int ret = OB_SUCCESS;
378ObSqlString sql;
379int64_t affected_rows = 0;
380
381CK (OB_NOT_NULL(sql_proxy_));
382CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
383CK (OB_LIKELY(job_id != OB_INVALID_ID));
384
385if (!job_name.empty()) {
386OZ (sql.append_fmt("select * from %s where tenant_id = %lu and job_name = \'%.*s\' and job = %ld",
387OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
388ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
389job_name.length(),
390job_name.ptr(),
391job_id));
392} else {
393OZ (sql.append_fmt("select * from %s where tenant_id = %lu and job = %ld",
394OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
395ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
396job_id));
397}
398
399
400if (OB_SUCC(ret)) {
401SMART_VAR(ObMySQLProxy::MySQLResult, result) {
402if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
403LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_id));
404} else if (OB_ISNULL(result.get_result())) {
405ret = OB_ERR_UNEXPECTED;
406LOG_WARN("failed to get result", K(ret), K(tenant_id), K(job_id));
407} else {
408if (OB_SUCCESS == (ret = result.get_result()->next())) {
409OZ (extract_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_info));
410if (OB_SUCC(ret)) {
411int tmp_ret = result.get_result()->next();
412if (OB_SUCCESS == tmp_ret) {
413ret = OB_ERR_UNEXPECTED;
414LOG_ERROR("got more than one row for dbms sched job!", K(ret), K(tenant_id), K(job_id));
415} else if (tmp_ret != OB_ITER_END) {
416ret = tmp_ret;
417LOG_ERROR("got next row for dbms sched job failed", K(ret), K(tenant_id), K(job_id));
418}
419}
420} else if (OB_ITER_END == ret) {
421LOG_WARN("job not exists, may delete alreay!", K(ret), K(tenant_id), K(job_id));
422} else {
423LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_id));
424}
425}
426}
427}
428return ret;
429}
430
431int ObDBMSSchedTableOperator::get_dbms_sched_job_infos_in_tenant(
432uint64_t tenant_id, bool is_oracle_tenant, ObIAllocator &allocator, ObIArray<ObDBMSSchedJobInfo> &job_infos)
433{
434int ret = OB_SUCCESS;
435ObSqlString sql;
436int64_t affected_rows = 0;
437
438CK (OB_NOT_NULL(sql_proxy_));
439CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
440
441OZ (sql.append_fmt("select * from %s where job > 0 and job_name != \'%s\' and (state is NULL or state != \'%s\')",
442OB_ALL_TENANT_SCHEDULER_JOB_TNAME,
443"__dummy_guard",
444"COMPLETED"));
445
446if (OB_SUCC(ret)) {
447SMART_VAR(ObMySQLProxy::MySQLResult, result) {
448if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
449LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id));
450} else if (OB_ISNULL(result.get_result())) {
451ret = OB_ERR_UNEXPECTED;
452LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id));
453} else {
454do {
455if (OB_FAIL(result.get_result()->next())) {
456LOG_INFO("failed to get result", K(ret));
457} else {
458ObDBMSSchedJobInfo job_info;
459OZ (extract_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_info));
460OZ (job_infos.push_back(job_info));
461}
462} while (OB_SUCC(ret));
463ret = OB_ITER_END == ret ? OB_SUCCESS : ret;
464}
465}
466}
467
468return ret;
469}
470
471int ObDBMSSchedTableOperator::extract_job_class_info(
472sqlclient::ObMySQLResult &result, int64_t tenant_id, bool is_oracle_tenant,
473ObIAllocator &allocator, ObDBMSSchedJobClassInfo &job_class_info)
474{
475int ret = OB_SUCCESS;
476ObDBMSSchedJobClassInfo job_class_info_local;
477
478job_class_info_local.tenant_id_ = tenant_id;
479job_class_info_local.is_oracle_tenant_ = is_oracle_tenant;
480// EXTRACT_INT_FIELD_MYSQL_SKIP_RET(result, "log_history", job_class_info_local.log_history_, uint64_t);
481EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "job_class_name", job_class_info_local.job_class_name_);
482EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "resource_consumer_group", job_class_info_local.resource_consumer_group_);
483EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "service", job_class_info_local.service_);
484EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "logging_level", job_class_info_local.logging_level_);
485EXTRACT_VARCHAR_FIELD_MYSQL_SKIP_RET(result, "comments", job_class_info_local.comments_);
486
487OZ (job_class_info.deep_copy(allocator, job_class_info_local));
488
489return ret;
490}
491
492int ObDBMSSchedTableOperator::get_dbms_sched_job_class_info(
493uint64_t tenant_id, bool is_oracle_tenant, const common::ObString job_class_name,
494common::ObIAllocator &allocator, ObDBMSSchedJobClassInfo &job_class_info) {
495int ret = OB_SUCCESS;
496ObSqlString sql;
497int64_t affected_rows = 0;
498
499CK (OB_NOT_NULL(sql_proxy_));
500CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
501CK (OB_LIKELY(!job_class_name.empty()));
502OZ (sql.append_fmt("select * from %s where tenant_id = %lu and job_class_name = \'%.*s\'",
503OB_ALL_TENANT_SCHEDULER_JOB_CLASS_TNAME, ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id), job_class_name.length(), job_class_name.ptr()));
504if (OB_SUCC(ret)) {
505SMART_VAR(ObMySQLProxy::MySQLResult, result) {
506if (OB_FAIL(sql_proxy_->read(result, tenant_id, sql.ptr()))) {
507LOG_WARN("execute query failed", K(ret), K(sql), K(tenant_id), K(job_class_name));
508} else if (OB_ISNULL(result.get_result())) {
509ret = OB_ERR_UNEXPECTED;
510LOG_WARN("get result failed", K(ret), K(sql), K(tenant_id), K(job_class_name));
511} else {
512if (OB_SUCCESS == (ret = result.get_result()->next())) {
513OZ (extract_job_class_info(*(result.get_result()), tenant_id, is_oracle_tenant, allocator, job_class_info));
514if (OB_SUCC(ret)) {
515int tmp_ret = result.get_result()->next();
516if (OB_SUCCESS == tmp_ret) {
517ret = OB_ERR_UNEXPECTED;
518LOG_ERROR("got more than one row for dbms sched job class!", K(ret), K(tenant_id), K(job_class_name));
519} else if (tmp_ret != OB_ITER_END) {
520ret = tmp_ret;
521LOG_ERROR("got next row for dbms sched job class failed", K(ret), K(tenant_id), K(job_class_name));
522}
523}
524} else if (OB_ITER_END == ret) {
525LOG_INFO("job_class_name not exists, may delete alreay!", K(ret), K(tenant_id), K(job_class_name));
526ret = OB_SUCCESS; // job not exist, do nothing ...
527} else {
528LOG_WARN("failed to get next", K(ret), K(tenant_id), K(job_class_name));
529}
530}
531}
532}
533return ret;
534}
535
536int ObDBMSSchedTableOperator::register_default_job_class(uint64_t tenant_id)
537{
538int ret = OB_SUCCESS;
539ObMySQLTransaction trans;
540ObSqlString sql;
541ObDMLSqlSplicer dml;
542int64_t affected_rows = 0;
543OZ (dml.add_pk_column("tenant_id",
544share::schema::ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id)));
545OZ (dml.add_pk_column("job_class_name", ObHexEscapeSqlStr("DEFAULT_JOB_CLASS")));
546OZ (dml.add_column("log_history", 0));
547OZ (dml.add_column("logging_level", ObHexEscapeSqlStr("RUNS")));
548OZ (dml.add_column("comments", "DEFAULT_JOB_CLASS"));
549OZ (dml.splice_insert_sql(OB_ALL_TENANT_SCHEDULER_JOB_CLASS_TNAME, sql));
550
551OZ (trans.start(sql_proxy_, tenant_id, true));
552OZ (trans.write(tenant_id, sql.ptr(), affected_rows));
553CK (1 == affected_rows);
554int tmp_ret = OB_SUCCESS;
555if (trans.is_started()) {
556if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
557LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
558ret = OB_SUCC(ret) ? tmp_ret : ret;
559}
560}
561LOG_INFO("register default job class", K(ret), K(tenant_id));
562return ret;
563}
564
565int ObDBMSSchedTableOperator::purge_run_detail_histroy(uint64_t tenant_id)
566{
567int ret = OB_SUCCESS;
568int64_t DAY_INTERVAL_USEC = 24 * 60 * 60 * 1000000LL;
569const int64_t now = ObTimeUtility::current_time();
570
571ObMySQLTransaction trans;
572ObSqlString sql;
573int64_t affected_rows = 0;
574
575CK (OB_NOT_NULL(sql_proxy_));
576CK (OB_LIKELY(tenant_id != OB_INVALID_ID));
577
578OZ (sql.append_fmt("delete from %s where tenant_id = %ld and time < usec_to_time(%ld - NVL((select log_history from %s where %s.job_class = %s.job_class_name),0) * %ld)",
579OB_ALL_TENANT_SCHEDULER_JOB_RUN_DETAIL_TNAME,
580share::schema::ObSchemaUtils::get_extract_tenant_id(tenant_id, tenant_id),
581now,
582OB_ALL_TENANT_SCHEDULER_JOB_CLASS_TNAME,
583OB_ALL_TENANT_SCHEDULER_JOB_RUN_DETAIL_TNAME,
584OB_ALL_TENANT_SCHEDULER_JOB_CLASS_TNAME,
585DAY_INTERVAL_USEC
586));
587OZ (trans.start(sql_proxy_, tenant_id, true));
588OZ (trans.write(tenant_id, sql.ptr(), affected_rows));
589int tmp_ret = OB_SUCCESS;
590if (trans.is_started()) {
591if (OB_SUCCESS != (tmp_ret = trans.end(OB_SUCC(ret)))) {
592LOG_WARN("failed to commit trans", KR(ret), KR(tmp_ret));
593ret = OB_SUCC(ret) ? tmp_ret : ret;
594}
595}
596LOG_INFO("purge run detail history", K(ret), K(tenant_id), K(sql.ptr()));
597return ret;
598}
599
600} // end for namespace dbms_scheduler
601} // end for namespace oceanbase
602