oceanbase
227 строк · 6.3 Кб
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
12
13#define USING_LOG_PREFIX RS
14
15#include "ob_rs_reentrant_thread.h"
16#include "share/ob_errno.h"
17
18namespace oceanbase
19{
20using namespace common;
21
22namespace rootserver
23{
24
25CheckThreadSet ObRsReentrantThread::check_thread_set_;
26
27ObRsReentrantThread::ObRsReentrantThread()
28:last_run_timestamp_(-1), thread_id_(-1)
29{}
30
31ObRsReentrantThread::ObRsReentrantThread(bool need_check)
32:last_run_timestamp_(need_check ? 0 : -1), thread_id_(-1)
33{}
34
35ObRsReentrantThread::~ObRsReentrantThread()
36{}
37
38void ObRsReentrantThread::update_last_run_timestamp()
39{
40int64_t time = ObTimeUtility::current_time();
41IGNORE_RETURN lib::Thread::update_loop_ts(time);
42if (ATOMIC_LOAD(&last_run_timestamp_) != -1) {
43ATOMIC_STORE(&last_run_timestamp_, time);
44}
45}
46
47bool ObRsReentrantThread::need_monitor_check() const
48{
49bool ret = false;
50int64_t last_run_timestamp = get_last_run_timestamp();
51int64_t schedule_interval = get_schedule_interval();
52if (schedule_interval >= 0 && last_run_timestamp > 0
53&& last_run_timestamp + schedule_interval + MAX_THREAD_SCHEDULE_OVERRUN_TIME
54< ObTimeUtility::current_time()) {
55ret = true;
56}
57return ret;
58}
59int ObRsReentrantThread::start()
60{
61return logical_start();
62}
63void ObRsReentrantThread::stop()
64{
65logical_stop();
66}
67
68void ObRsReentrantThread::wait()
69{
70logical_wait();
71if (get_last_run_timestamp() != -1) {
72ATOMIC_STORE(&last_run_timestamp_, 0);
73}
74}
75
76int ObRsReentrantThread::create(const int64_t thread_cnt, const char* thread_name)
77{
78int ret = OB_SUCCESS;
79bool added = false;
80if (last_run_timestamp_ != -1) {
81if (OB_FAIL(ObRsReentrantThread::check_thread_set_.add(this))) {
82LOG_WARN("rs_monitor_check : fail to add check thread set", KR(ret));
83} else {
84added = true;
85}
86}
87
88if (FAILEDx(share::ObReentrantThread::create(thread_cnt, thread_name))) {
89LOG_WARN("fail to create reentraint thread", KR(ret), K(thread_name));
90} else if (last_run_timestamp_ != -1) {
91LOG_INFO("rs_monitor_check : reentrant thread check register success", K(thread_name));
92}
93
94// ensure create atomic, if failed, should remove it from check_thread_set
95if (OB_FAIL(ret) && added) {
96int tmp_ret = OB_SUCCESS;
97if (OB_TMP_FAIL(ObRsReentrantThread::check_thread_set_.remove(this))) {
98LOG_WARN("rs_monitor_check : fail to remove check thread set", KR(tmp_ret), K(thread_name));
99}
100}
101
102return ret;
103}
104
105int ObRsReentrantThread::destroy()
106{
107int ret = OB_SUCCESS;
108const char *thread_name = get_thread_name();
109if (last_run_timestamp_ != -1 && OB_FAIL(ObRsReentrantThread::check_thread_set_.remove(this))) {
110LOG_WARN("rs_monitor_check : fail to remove check thread set", KR(ret), K(thread_name));
111} else if (OB_FAIL(share::ObReentrantThread::destroy())) {
112LOG_INFO("fail to destroy reentraint thread", KR(ret), K(thread_name));
113} else if (last_run_timestamp_ != -1) {
114LOG_INFO("rs_monitor_check : reentrant thread check unregister success",
115K(thread_name), K_(last_run_timestamp));
116}
117return ret;
118}
119
120
121int64_t ObRsReentrantThread::get_last_run_timestamp() const
122{
123return ATOMIC_LOAD(&last_run_timestamp_);
124}
125
126void ObRsReentrantThread::check_alert(const ObRsReentrantThread &thread)
127{
128if (thread.need_monitor_check()) {
129const pid_t thread_id = thread.get_thread_id();
130const char *thread_name = thread.get_thread_name();
131int64_t last_run_timestamp = thread.get_last_run_timestamp();
132int64_t last_run_interval = ObTimeUtility::current_time() - last_run_timestamp;
133int64_t schedule_interval = thread.get_schedule_interval();
134int64_t check_interval = schedule_interval + MAX_THREAD_SCHEDULE_OVERRUN_TIME;
135LOG_ERROR_RET(common::OB_ERR_UNEXPECTED, "rs_monitor_check : thread hang", K(thread_id), K(thread_name), K(last_run_timestamp),
136KTIME(last_run_timestamp), K(last_run_interval), K(check_interval), K(schedule_interval));
137LOG_DBA_WARN(OB_ERR_ROOTSERVICE_THREAD_HUNG, "msg", "rootservice backgroud thread may be hung",
138K(thread_id), K(thread_name), K(last_run_timestamp), KTIME(last_run_timestamp),
139K(last_run_interval), K(check_interval), K(schedule_interval));
140}
141}
142
143CheckThreadSet::CheckThreadSet()
144: arr_(), rwlock_(ObLatchIds::THREAD_HANG_CHECKER_LOCK)
145{
146}
147
148CheckThreadSet::~CheckThreadSet()
149{
150arr_.reset();
151}
152
153void CheckThreadSet::reset()
154{
155SpinWLockGuard guard(rwlock_);
156arr_.reset();
157}
158
159int CheckThreadSet::remove(ObRsReentrantThread *thread)
160{
161int ret = OB_SUCCESS;
162int64_t idx = -1;
163SpinWLockGuard guard(rwlock_);
164if (OB_ISNULL(thread)) {
165ret = OB_INVALID_ARGUMENT;
166} else {
167for (int i = 0; (i < arr_.count()) && (-1 == idx); i++) {
168if (arr_[i] == thread) {
169idx = i;
170}
171}
172
173if (-1 != idx) {
174if (OB_FAIL(arr_.remove(idx))) {
175LOG_WARN("fail to remove", KR(ret), KP(thread), K(idx));
176}
177}
178}
179return ret;
180}
181
182int CheckThreadSet::add(ObRsReentrantThread *thread)
183{
184int ret = OB_SUCCESS;
185SpinWLockGuard guard(rwlock_);
186if (OB_ISNULL(thread)) {
187ret = OB_INVALID_ARGUMENT;
188} else {
189for (int i = 0; i < arr_.count(); i++) {
190if (arr_[i] == thread) {
191ret = OB_ENTRY_EXIST;
192break;
193}
194}
195
196if (FAILEDx(arr_.push_back(thread))) {
197LOG_WARN("fail to push back", KP(thread), KR(ret));
198}
199
200if (OB_UNLIKELY(ret == OB_ENTRY_EXIST)) {
201// There are existing items when adding elements, which should not be an exception
202ret = OB_SUCCESS;
203}
204}
205return ret;
206}
207
208void CheckThreadSet::loop_operation(void (*func)(const ObRsReentrantThread&))
209{
210SpinRLockGuard guard(rwlock_);
211for (int i = 0; i < arr_.count(); i++) {
212if (OB_ISNULL(arr_[i])) {
213LOG_WARN_RET(common::OB_ERR_UNEXPECTED, "rs_monitor_check : arr[i] is NULL", K(i));
214continue;
215}
216func(*(arr_[i]));
217}
218}
219
220int64_t CheckThreadSet::get_thread_count()
221{
222SpinRLockGuard guard(rwlock_);
223return arr_.count();
224}
225
226}
227}
228