oceanbase

Форк
0
/
ob_worker_processor.cpp 
171 строка · 5.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER_OMT
14

15
#include "ob_worker_processor.h"
16
#include "share/ob_define.h"
17
#include "lib/utility/utility.h"
18
#include "lib/oblog/ob_trace_log.h"
19
#include "lib/profile/ob_perf_event.h"  // SET_PERF_EVENT
20
#include "lib/profile/ob_trace_id_adaptor.h"
21
#include "lib/oblog/ob_warning_buffer.h"
22
#include "rpc/ob_request.h"
23
#include "rpc/obrpc/ob_rpc_packet.h"
24
#include "rpc/frame/ob_req_translator.h"
25
#include "rpc/frame/ob_req_processor.h"
26
#include "share/config/ob_server_config.h"
27
#include "observer/omt/ob_th_worker.h"
28
#include "lib/utility/ob_hang_fatal_error.h"
29

30
using namespace oceanbase::common;
31
using namespace oceanbase::omt;
32
using namespace oceanbase::rpc;
33
using namespace oceanbase::rpc::frame;
34
using namespace oceanbase::obrpc;
35

36
ObWorkerProcessor::ObWorkerProcessor(
37
    ObReqTranslator &xlator,
38
    const common::ObAddr &myaddr)
39
    : translator_(xlator), myaddr_(myaddr)
40
{}
41

42
void ObWorkerProcessor::th_created()
43
{
44
  translator_.th_init();
45
}
46

47
void ObWorkerProcessor::th_destroy()
48
{
49
  translator_.th_destroy();
50
}
51

52
#ifdef ERRSIM
53
ERRSIM_POINT_DEF(EN_WORKER_PROCESS_REQUEST)
54
#endif
55

56
OB_NOINLINE int ObWorkerProcessor::process_err_test()
57
{
58
  int ret = OB_SUCCESS;
59

60
#ifdef ERRSIM
61
  ret = EN_WORKER_PROCESS_REQUEST;
62
  LOG_DEBUG("process err_test", K(ret));
63
#endif
64
  return ret;
65
}
66

67
inline int ObWorkerProcessor::process_one(rpc::ObRequest &req)
68
{
69
  int ret = OB_SUCCESS;
70
  ObReqProcessor *processor = NULL;
71

72
  if (OB_FAIL(process_err_test())) {
73
    LOG_WARN("ignore request with err_test", K(ret));
74
  } else if (OB_FAIL(translator_.translate(req, processor))) {
75
    LOG_WARN("translate request fail", K(ret));
76
    on_translate_fail(&req, ret);
77
  } else if (OB_ISNULL(processor)) {
78
    ret = OB_ERR_UNEXPECTED;
79
    LOG_ERROR("unexpected condition", K(ret));
80
  } else {
81
    NG_TRACE(before_processor_run);
82
    req.on_process_begin();
83
    req.set_trace_point(ObRequest::OB_EASY_REQUEST_WORKER_PROCESSOR_RUN);
84
    if (OB_FAIL(processor->run())) {
85
      LOG_WARN("process request fail", K(ret));
86
    }
87
    translator_.release(processor);
88
  }
89

90
  return ret;
91
}
92

93
int ObWorkerProcessor::process(rpc::ObRequest &req)
94
{
95
  int ret = OB_SUCCESS;
96

97
  if (THE_TRACE != nullptr) {
98
    THE_TRACE->reset();
99
  }
100
  OB_ATOMIC_EVENT_RESET_RECORDER();
101
  PERF_RESET_RECORDER();
102
  const bool enable_trace_log = lib::is_trace_log_enabled();
103
  const int64_t q_time = THIS_THWORKER.get_query_start_time() - req.get_receive_timestamp();
104
  NG_TRACE_EXT(process_begin,
105
               OB_ID(in_queue_time), q_time,
106
               OB_ID(receive_ts), req.get_receive_timestamp(),
107
               OB_ID(enqueue_ts), req.get_enqueue_timestamp());
108
  ObRequest::Type req_type = req.get_type(); // bugfix note: must be obtained in advance
109
  if (ObRequest::OB_RPC == req_type) {
110
    // internal RPC request
111
    const obrpc::ObRpcPacket &packet
112
        = static_cast<const obrpc::ObRpcPacket&>(req.get_packet());
113
    NG_TRACE_EXT(start_rpc, OB_ID(addr), RPC_REQ_OP.get_peer(&req), OB_ID(pcode), packet.get_pcode());
114
    ObCurTraceId::set(req.generate_trace_id(myaddr_));
115

116
#ifdef ERRSIM
117
    THIS_WORKER.set_module_type(packet.get_module_type());
118
#endif
119

120
    // Do not set thread local log level while log level upgrading (OB_LOGGER.is_info_as_wdiag)
121
    if (OB_LOGGER.is_info_as_wdiag()) {
122
      ObThreadLogLevelUtils::clear();
123
    } else {
124
      if (enable_trace_log && OB_LOG_LEVEL_NONE != packet.get_log_level()) {
125
        ObThreadLogLevelUtils::init(packet.get_log_level());
126
      }
127
    }
128
  } else if (ObRequest::OB_MYSQL == req_type) {
129
    NG_TRACE_EXT(start_sql, OB_ID(addr), SQL_REQ_OP.get_peer(&req));
130
    // mysql command request
131
    ObCurTraceId::set(req.generate_trace_id(myaddr_));
132
  }
133
  // record trace id
134
  ObTraceIdAdaptor trace_id_adaptor;
135
  trace_id_adaptor.set(ObCurTraceId::get());
136
  NG_TRACE_EXT(query_begin, OB_ID(trace_id), trace_id_adaptor);
137
  //NG_TRACE(query_begin);
138

139
  // setup and init warning buffer
140
  // For general SQL processing, the rpc processing function entry uses set_tsi_warning_buffer to set the session
141
  // The warning buffer is set to the thread part; but for the handler of the task remote execution, because
142
  // The error message needs to be used when serializing result_code until after the process() function
143
  // Therefore, the warning buffer member of the session cannot be used. Therefore, one is set by default.
144
  ob_setup_default_tsi_warning_buffer();
145
  ob_reset_tsi_warning_buffer();
146

147
  //Set the chid of the source package to the thread
148
  // int64_t st = ::oceanbase::common::ObTimeUtility::current_time();
149
  // PROFILE_LOG(DEBUG, HANDLE_PACKET_START_TIME PCODE, st, packet->get_pcode());
150
  // go!
151
  try {
152
    in_try_stmt = true;
153
    if (OB_FAIL(process_one(req))) {
154
      LOG_WARN("process request fail", K(ret));
155
    }
156
    in_try_stmt = false;
157
  } catch (OB_BASE_EXCEPTION &except) {
158
    _LOG_ERROR("Exception caught!!! errno = %d, exception info = %s", except.get_errno(), except.what());
159
    in_try_stmt = false;
160
  }
161

162
  // cleanup
163
  ObCurTraceId::reset();
164
  if (enable_trace_log) {
165
    ObThreadLogLevelUtils::clear();
166
  }
167
  PERF_GATHER_DATA();
168
  //LOG_INFO("yzf debug", "atomic_op", ATOMIC_EVENT_RECORDER);
169
  OB_ATOMIC_EVENT_GATHER_DATA();
170
  return ret;
171
}
172

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.