2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER_OMT
14
#include "share/ob_define.h"
15
#include "lib/time/ob_time_utility.h"
16
#include "ob_retry_queue.h"
19
using namespace oceanbase::common;
20
using namespace oceanbase::omt;
21
using namespace oceanbase::rpc;
24
int ObRetryQueue::push(ObRequest &req, const uint64_t timestamp)
26
uint64_t idx = max(timestamp / RETRY_QUEUE_TIMESTEP, last_timestamp_ / RETRY_QUEUE_TIMESTEP + 2);
27
int queue_idx = idx & (RETRY_QUEUE_SIZE - 1);
28
return queue_[queue_idx].push(&req);
31
int ObRetryQueue::pop(ObLink *&task, bool need_clear)
33
int ret = OB_ENTRY_NOT_EXIST;
34
uint64_t curr_timestamp = ObTimeUtility::current_time();
35
uint64_t idx = last_timestamp_ / RETRY_QUEUE_TIMESTEP;
37
int queue_idx = idx & (RETRY_QUEUE_SIZE - 1);
38
while (last_timestamp_ <= curr_timestamp && OB_FAIL(queue_[queue_idx].pop(task))) {
39
ATOMIC_FAA(&last_timestamp_, RETRY_QUEUE_TIMESTEP);
40
queue_idx = (++idx) & (RETRY_QUEUE_SIZE - 1);
44
while (queue_idx < RETRY_QUEUE_SIZE && OB_FAIL(queue_[queue_idx].pop(task))) {
51
uint64_t ObRetryQueue::get_last_timestamp() const
53
return last_timestamp_;