oceanbase

Форк
0
/
ob_rs_reentrant_thread.cpp 
227 строк · 6.3 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX RS
14

15
#include "ob_rs_reentrant_thread.h"
16
#include "share/ob_errno.h"
17

18
namespace oceanbase
19
{
20
using namespace common;
21

22
namespace rootserver
23
{
24

25
CheckThreadSet ObRsReentrantThread::check_thread_set_;
26

27
ObRsReentrantThread::ObRsReentrantThread()
28
  :last_run_timestamp_(-1), thread_id_(-1)
29
{}
30

31
ObRsReentrantThread::ObRsReentrantThread(bool need_check)
32
  :last_run_timestamp_(need_check ? 0 : -1), thread_id_(-1)
33
{}
34

35
ObRsReentrantThread::~ObRsReentrantThread()
36
{}
37

38
void ObRsReentrantThread::update_last_run_timestamp() 
39
{
40
  int64_t time = ObTimeUtility::current_time();
41
  IGNORE_RETURN lib::Thread::update_loop_ts(time);
42
  if (ATOMIC_LOAD(&last_run_timestamp_) != -1) {
43
    ATOMIC_STORE(&last_run_timestamp_, time);
44
  }
45
}
46

47
bool ObRsReentrantThread::need_monitor_check() const 
48
{
49
  bool ret = false;
50
  int64_t last_run_timestamp = get_last_run_timestamp();
51
  int64_t schedule_interval = get_schedule_interval();
52
  if (schedule_interval >= 0 && last_run_timestamp > 0 
53
      && last_run_timestamp + schedule_interval + MAX_THREAD_SCHEDULE_OVERRUN_TIME 
54
      < ObTimeUtility::current_time()) {
55
    ret = true;
56
  }
57
  return ret;
58
}
59
int ObRsReentrantThread::start()
60
{
61
  return logical_start();
62
}
63
void ObRsReentrantThread::stop() 
64
{
65
  logical_stop();
66
}
67

68
void ObRsReentrantThread::wait() 
69
{
70
  logical_wait();
71
  if (get_last_run_timestamp() != -1) {
72
    ATOMIC_STORE(&last_run_timestamp_, 0);
73
  }
74
}
75

76
int ObRsReentrantThread::create(const int64_t thread_cnt, const char* thread_name)
77
{
78
  int ret = OB_SUCCESS;
79
  bool added = false;
80
  if (last_run_timestamp_ != -1) {
81
    if (OB_FAIL(ObRsReentrantThread::check_thread_set_.add(this))) {
82
      LOG_WARN("rs_monitor_check : fail to add check thread set", KR(ret));
83
    } else {
84
      added = true;
85
    }
86
  }
87

88
  if (FAILEDx(share::ObReentrantThread::create(thread_cnt, thread_name))) {
89
    LOG_WARN("fail to create reentraint thread", KR(ret), K(thread_name));
90
  } else if (last_run_timestamp_ != -1) {
91
    LOG_INFO("rs_monitor_check : reentrant thread check register success", K(thread_name));
92
  }
93

94
  // ensure create atomic, if failed, should remove it from check_thread_set
95
  if (OB_FAIL(ret) && added) {
96
    int tmp_ret = OB_SUCCESS;
97
    if (OB_TMP_FAIL(ObRsReentrantThread::check_thread_set_.remove(this))) {
98
      LOG_WARN("rs_monitor_check : fail to remove check thread set", KR(tmp_ret), K(thread_name));
99
    }
100
  }
101
  
102
  return ret;
103
}
104

105
int ObRsReentrantThread::destroy()
106
{
107
  int ret = OB_SUCCESS;
108
  const char *thread_name = get_thread_name();
109
  if (last_run_timestamp_ != -1 && OB_FAIL(ObRsReentrantThread::check_thread_set_.remove(this))) {
110
    LOG_WARN("rs_monitor_check : fail to remove check thread set", KR(ret), K(thread_name));
111
  } else if (OB_FAIL(share::ObReentrantThread::destroy())) {
112
    LOG_INFO("fail to destroy reentraint thread", KR(ret), K(thread_name));
113
  }  else if (last_run_timestamp_ != -1) {
114
    LOG_INFO("rs_monitor_check : reentrant thread check unregister success", 
115
        K(thread_name), K_(last_run_timestamp));
116
  }
117
  return ret;
118
}
119

120

121
int64_t ObRsReentrantThread::get_last_run_timestamp() const
122
{ 
123
  return ATOMIC_LOAD(&last_run_timestamp_); 
124
}
125

126
void ObRsReentrantThread::check_alert(const ObRsReentrantThread &thread)
127
{ 
128
  if (thread.need_monitor_check()) {
129
    const pid_t thread_id = thread.get_thread_id();
130
    const char *thread_name = thread.get_thread_name();
131
    int64_t last_run_timestamp = thread.get_last_run_timestamp();
132
    int64_t last_run_interval = ObTimeUtility::current_time() - last_run_timestamp;
133
    int64_t schedule_interval = thread.get_schedule_interval();
134
    int64_t check_interval = schedule_interval + MAX_THREAD_SCHEDULE_OVERRUN_TIME;
135
    LOG_ERROR_RET(common::OB_ERR_UNEXPECTED, "rs_monitor_check : thread hang", K(thread_id), K(thread_name), K(last_run_timestamp),
136
        KTIME(last_run_timestamp), K(last_run_interval), K(check_interval), K(schedule_interval));
137
    LOG_DBA_WARN(OB_ERR_ROOTSERVICE_THREAD_HUNG, "msg", "rootservice backgroud thread may be hung",
138
                 K(thread_id), K(thread_name), K(last_run_timestamp), KTIME(last_run_timestamp),
139
                 K(last_run_interval), K(check_interval), K(schedule_interval));
140
  }
141
}
142

143
CheckThreadSet::CheckThreadSet() 
144
  : arr_(), rwlock_(ObLatchIds::THREAD_HANG_CHECKER_LOCK)
145
{
146
}
147

148
CheckThreadSet::~CheckThreadSet() 
149
{
150
  arr_.reset();
151
}
152

153
void CheckThreadSet::reset() 
154
{
155
  SpinWLockGuard guard(rwlock_);
156
  arr_.reset();
157
}
158

159
int CheckThreadSet::remove(ObRsReentrantThread *thread) 
160
{
161
  int ret = OB_SUCCESS;
162
  int64_t idx = -1;
163
  SpinWLockGuard guard(rwlock_);
164
  if (OB_ISNULL(thread)) {
165
    ret = OB_INVALID_ARGUMENT;
166
  } else {
167
    for (int i = 0; (i < arr_.count()) && (-1 == idx); i++) {
168
      if (arr_[i] == thread) {
169
        idx = i;
170
      }
171
    }
172

173
    if (-1 != idx) {
174
      if (OB_FAIL(arr_.remove(idx))) {
175
        LOG_WARN("fail to remove", KR(ret), KP(thread), K(idx));
176
      }
177
    }
178
  }
179
  return ret;
180
}
181

182
int CheckThreadSet::add(ObRsReentrantThread *thread) 
183
{
184
  int ret = OB_SUCCESS;
185
  SpinWLockGuard guard(rwlock_);
186
  if (OB_ISNULL(thread)) {
187
    ret = OB_INVALID_ARGUMENT;
188
  } else {
189
    for (int i = 0; i < arr_.count(); i++) {
190
      if (arr_[i] == thread) {
191
        ret = OB_ENTRY_EXIST;
192
        break;
193
      }
194
    }
195

196
    if (FAILEDx(arr_.push_back(thread))) {
197
      LOG_WARN("fail to push back", KP(thread), KR(ret));
198
    }
199

200
     if (OB_UNLIKELY(ret == OB_ENTRY_EXIST)) {
201
      // There are existing items when adding elements, which should not be an exception
202
      ret = OB_SUCCESS;
203
    }
204
  }
205
  return ret;
206
}
207

208
void CheckThreadSet::loop_operation(void (*func)(const ObRsReentrantThread&))
209
{
210
  SpinRLockGuard guard(rwlock_);
211
  for (int i = 0; i < arr_.count(); i++) {
212
    if (OB_ISNULL(arr_[i])) {
213
      LOG_WARN_RET(common::OB_ERR_UNEXPECTED, "rs_monitor_check : arr[i] is NULL", K(i));
214
      continue;
215
    }
216
    func(*(arr_[i]));
217
  }
218
}
219

220
int64_t CheckThreadSet::get_thread_count()
221
{
222
  SpinRLockGuard guard(rwlock_);
223
  return arr_.count();
224
} 
225

226
}
227
}
228

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.