oceanbase

Форк
0
/
ob_srv_xlator.cpp 
420 строк · 16.1 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "observer/ob_srv_xlator.h"
16

17
#include "share/ob_tenant_mgr.h"
18
#include "share/schema/ob_schema_service_rpc_proxy.h"
19
#include "rpc/ob_request.h"
20
#include "rpc/obmysql/ob_mysql_packet.h"
21
#include "rpc/obmysql/ob_sql_sock_session.h"
22
#include "share/rpc/ob_batch_processor.h"
23
#include "share/rpc/ob_blacklist_req_processor.h"
24
#include "share/rpc/ob_blacklist_resp_processor.h"
25
#include "sql/executor/ob_executor_rpc_processor.h"
26
#include "sql/engine/cmd/ob_kill_executor.h"
27
#include "sql/engine/cmd/ob_load_data_rpc.h"
28
#include "sql/engine/px/ob_px_rpc_processor.h"
29
#include "sql/dtl/ob_dtl_rpc_processor.h"
30
#include "sql/ob_sql_task.h"
31
#include "share/interrupt/ob_interrupt_rpc_proxy.h"
32
#include "storage/tx/ob_trans_rpc.h"
33
#include "storage/tx/ob_gts_rpc.h"
34
#include "storage/tx/ob_dup_table_rpc.h"
35
#include "storage/tx/ob_ts_response_handler.h"
36
#include "storage/tx/wrs/ob_weak_read_service_rpc_define.h"  // weak_read_service
37
#include "observer/ob_rpc_processor_simple.h"
38
#include "observer/ob_srv_task.h"
39
#include "observer/mysql/obmp_query.h"
40
#include "observer/mysql/obmp_ping.h"
41
#include "observer/mysql/obmp_quit.h"
42
#include "observer/mysql/obmp_connect.h"
43
#include "observer/mysql/obmp_init_db.h"
44
#include "observer/mysql/obmp_default.h"
45
#include "observer/mysql/obmp_change_user.h"
46
#include "observer/mysql/obmp_error.h"
47
#include "observer/mysql/obmp_statistic.h"
48
#include "observer/mysql/obmp_stmt_prepare.h"
49
#include "observer/mysql/obmp_stmt_execute.h"
50
#include "observer/mysql/obmp_stmt_fetch.h"
51
#include "observer/mysql/obmp_stmt_close.h"
52
#include "observer/mysql/obmp_stmt_prexecute.h"
53
#include "observer/mysql/obmp_stmt_send_piece_data.h"
54
#include "observer/mysql/obmp_stmt_get_piece_data.h"
55
#include "observer/mysql/obmp_stmt_send_long_data.h"
56
#include "observer/mysql/obmp_stmt_reset.h"
57
#include "observer/mysql/obmp_reset_connection.h"
58

59
#include "observer/table/ob_table_rpc_processor.h"
60
#include "observer/table/ob_table_execute_processor.h"
61
#include "observer/table/ob_table_batch_execute_processor.h"
62
#include "observer/table/ob_table_query_processor.h"
63
#include "observer/table/ob_table_query_and_mutate_processor.h"
64
#include "logservice/palf/log_rpc_processor.h"
65

66
using namespace oceanbase::observer;
67
using namespace oceanbase::lib;
68
using namespace oceanbase::rpc;
69
using namespace oceanbase::sql;
70
using namespace oceanbase::common;
71
using namespace oceanbase::transaction;
72
using namespace oceanbase::obrpc;
73
using namespace oceanbase::obmysql;
74

75
// Opens a `switch` statement over `pcode`. The switch body must be terminated
// with PROCESSOR_END().
#define PROCESSOR_BEGIN(pcode) switch (pcode) {
77

78
// Closes a switch opened with PROCESSOR_BEGIN(): a value that matched no
// `case` stores OB_NOT_SUPPORTED into the `ret` variable of the expanding scope.
#define PROCESSOR_END()       \
    default:                  \
      ret = OB_NOT_SUPPORTED; \
  }
82

83
// Creates an ObMySQLP with the current worker's SQL arena allocator, passing
// the remaining macro arguments to its constructor, and then calls init().
//
// The expanding scope must provide `ret` and `processor`:
//   - allocation failed: ret = OB_ALLOCATE_MEMORY_FAILED, `processor` untouched;
//   - init() failed:     the failure is logged, the object is released with
//                        worker_allocator_delete(), `processor` untouched
//                        (ret is expected to be set by OB_FAIL);
//   - otherwise:         `processor` points at the new object.
//
// The log text is built by literal concatenation around the stringified class
// name, so the separating blanks have to be part of the neighbouring literals.
#define NEW_MYSQL_PROCESSOR(ObMySQLP, ...)                              \
  do {                                                                  \
    ObIAllocator *alloc = &THIS_WORKER.get_sql_arena_allocator();       \
    ObMySQLP *p = OB_NEWx(ObMySQLP, alloc, __VA_ARGS__);                \
    if (OB_ISNULL(p)) {                                                 \
      ret = OB_ALLOCATE_MEMORY_FAILED;                                  \
    } else if (OB_FAIL(p->init())) {                                    \
      SERVER_LOG(ERROR, "Init " #ObMySQLP " fail", K(ret));             \
      worker_allocator_delete(p);                                       \
      p = NULL;                                                         \
    } else {                                                            \
      processor = p;                                                    \
    }                                                                   \
  } while (0)
97

98
// Expands to the `case ObMySQLP::COM` arm of a switch over the MySQL command:
// the processor is built through NEW_MYSQL_PROCESSOR and the arm then breaks.
#define MYSQL_PROCESSOR(ObMySQLP, ...)            \
  case ObMySQLP::COM:                             \
    {                                             \
      NEW_MYSQL_PROCESSOR(ObMySQLP, __VA_ARGS__); \
    }                                             \
    break;
103

104
void ObSrvRpcXlator::register_rpc_process_function(int pcode, RPCProcessFunc func) {
105
  if(pcode >= MAX_PCODE || pcode < 0) {
106
    SERVER_LOG_RET(ERROR, OB_ERROR, "(SHOULD NEVER HAPPEN) input pcode is out of range in server rpc xlator", K(pcode));
107
    ob_abort();
108
  } else if (funcs_[pcode] != nullptr) {
109
    SERVER_LOG_RET(ERROR, OB_ERROR, "(SHOULD NEVER HAPPEN) duplicate pcode in server rpc xlator", K(pcode));
110
    ob_abort();
111
  } else {
112
    funcs_[pcode] = func;
113
  }
114
}
115

116
// Free-function accessor that forwards to THIS_WORKER.get_sql_arena_allocator().
ObIAllocator &oceanbase::observer::get_sql_arena_allocator()
{
  ObIAllocator &arena = THIS_WORKER.get_sql_arena_allocator();
  return arena;
}
119

120
// Produces the processor for an RPC request by calling the factory registered
// for the packet's pcode.
//
// @param req        request whose packet is read as an ObRpcPacket
// @param processor  [out] created processor; NULL whenever an error is returned
// @return OB_NOT_SUPPORTED when the pcode is out of range, has no factory, or
//         the factory reported success without producing a processor;
//         otherwise the factory's return code.
int ObSrvRpcXlator::translate(rpc::ObRequest &req, ObReqProcessor *&processor)
{
  int ret = OB_SUCCESS;
  processor = NULL;
  const ObRpcPacket &pkt = reinterpret_cast<const ObRpcPacket&>(req.get_packet());
  const int pcode = pkt.get_pcode();
  // funcs_ is only indexed after the range check passed (short-circuit &&).
  const bool has_factory = (pcode >= 0 && pcode < MAX_PCODE && funcs_[pcode] != nullptr);

  if (OB_UNLIKELY(!has_factory)) {
    ret = OB_NOT_SUPPORTED;
    LOG_WARN("not support packet", K(pkt), K(ret), K(MAX_PCODE));
  } else {
    ret = funcs_[pcode](gctx_, processor, session_handler_);
  }

  if (OB_SUCC(ret)) {
    // A factory that "succeeds" without a processor is treated as unsupported.
    if (NULL == processor) {
      ret = OB_NOT_SUPPORTED;
    }
  } else if (NULL != processor) {
    // Never hand a processor back together with an error code.
    ob_delete(processor);
    processor = NULL;
  }

  return ret;
}
146

147
int ObSrvXlator::th_init()
148
{
149
  int ret = common::OB_SUCCESS;
150

151
  if (OB_FAIL(rpc_xlator_.th_init())) {
152
    LOG_ERROR("init rpc translator for thread fail", K(ret));
153
  } else if (OB_FAIL(mysql_xlator_.th_init())) {
154
    LOG_ERROR("init mysql translator for thread fail", K(ret));
155
  }
156
  return ret;
157
}
158

159
int ObSrvXlator::th_destroy()
160
{
161
  int ret = common::OB_SUCCESS;
162
  if (OB_FAIL(rpc_xlator_.th_destroy())) {
163
    LOG_ERROR("destroy rpc translator for thread fail", K(ret));
164
  } else if (OB_FAIL(mysql_xlator_.th_destroy())) {
165
    LOG_ERROR("destroy mysql translator for thread fail", K(ret));
166
  }
167
  return ret;
168
}
169

170
typedef union EP_RPCP_BUF {
171
  char rpcp_buffer_[RPCP_BUF_SIZE]; // reserve memory for rpc processor
172
  char ep_buffer_[sizeof (ObErrorP) + sizeof (ObMPError)];
173
} EP_RPCP_BUF;
174
// Make sure election rpc processor allocated successfully when OOM occurs
175
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionPrepareRequestMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
176
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionPrepareResponseMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
177
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionAcceptRequestMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
178
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionAcceptResponseMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
179
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionChangeLeaderMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
180

181
typedef struct {
182
  char buffer_[sizeof (ObMPStmtClose)];
183
} CLOSEPBUF;
184

185
// Reserved processor buffers, declared through _RLOCAL and aligned to 64 bytes.
// NOTE(review): _RLOCAL presumably gives each thread/routine its own instance;
// confirm against the macro definition.
// Storage for the COM_STMT_CLOSE processor.
_RLOCAL(CLOSEPBUF, co_closepbuf)  __attribute__((aligned(64)));
// Storage for the error processors / reserved rpc processor.
_RLOCAL(EP_RPCP_BUF, co_ep_rpcp_buf) __attribute__((aligned(64)));
187

188
// Creates the processor that handles a MySQL-protocol request.
//
// A request that is still in the connected phase gets an ObMPConnect
// processor. Otherwise the processor is chosen by the command of the raw
// MySQL packet; commands without a dedicated processor get ObMPDefault.
//
// @param req        request to translate; must be of type ObRequest::OB_MYSQL
// @param processor  [out] created processor; NULL whenever an error is returned
// @return OB_SUCCESS, OB_ERR_UNEXPECTED for a non-mysql request or a missing
//         session/processor, or the error from allocating/initializing the
//         processor.
int ObSrvMySQLXlator::translate(rpc::ObRequest &req, ObReqProcessor *&processor)
{
  int ret = OB_SUCCESS;
  processor = NULL;

  if (ObRequest::OB_MYSQL != req.get_type()) {
    LOG_ERROR("can't translate non-mysql request");
    ret = OB_ERR_UNEXPECTED;
  } else {
    if (req.is_in_connected_phase()) {
      ret = get_mp_connect_processor(processor);
    } else {
      const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket &>(req.get_packet());
      switch (pkt.get_cmd()) {
        MYSQL_PROCESSOR(ObMPQuery, gctx_);
        MYSQL_PROCESSOR(ObMPQuit, gctx_);
        MYSQL_PROCESSOR(ObMPPing, gctx_);
        MYSQL_PROCESSOR(ObMPInitDB, gctx_);
        MYSQL_PROCESSOR(ObMPChangeUser, gctx_);
        MYSQL_PROCESSOR(ObMPStatistic, gctx_);
        MYSQL_PROCESSOR(ObMPStmtPrepare, gctx_);
        MYSQL_PROCESSOR(ObMPStmtExecute, gctx_);
        MYSQL_PROCESSOR(ObMPStmtFetch, gctx_);
        MYSQL_PROCESSOR(ObMPStmtReset, gctx_);
        MYSQL_PROCESSOR(ObMPStmtPrexecute, gctx_);
        MYSQL_PROCESSOR(ObMPStmtSendPieceData, gctx_);
        MYSQL_PROCESSOR(ObMPStmtGetPieceData, gctx_);
        MYSQL_PROCESSOR(ObMPStmtSendLongData, gctx_);
        MYSQL_PROCESSOR(ObMPResetConnection, gctx_);
        // A ps stmt close request may not send any response packet. However,
        // obtaining the processor could fail for lack of memory, and that
        // failure would be answered with an error packet. To avoid this, the
        // stmt close processor is constructed in the pre-reserved co_closepbuf
        // buffer instead of being allocated.
        case obmysql::COM_STMT_CLOSE: {
          char *closepbuf = (&co_closepbuf)->buffer_;
          ObMPStmtClose* p = new (&closepbuf[0]) ObMPStmtClose(gctx_);
          if (OB_FAIL(p->init())) {
            SERVER_LOG(ERROR, "Init ObMPStmtClose fail", K(ret));
            // Placement-constructed: run the destructor only, nothing to free.
            p->~ObMPStmtClose();
          } else {
            processor = p;
          }
          break;
        }
        case obmysql::COM_FIELD_LIST: {
          // To stay compatible with the proxy, COM_FIELD_LIST is supported
          // according to the following rules:
          // 1. Non-proxy mode: return the normal query result packet.
          // 2. Proxy mode:
          //   2.1. With a version number: below 1.7.6 return a "not supported"
          //        error packet; 1.7.6 and above return the normal query
          //        result; an invalid version returns a "not supported" error
          //        packet.
          //   2.2. Without a version number: return a "not supported" error
          //        packet.
          ObSMConnection *conn = reinterpret_cast<ObSMConnection* >(
              SQL_REQ_OP.get_sql_session(&req));
          if (OB_ISNULL(conn)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("get unexpected null", K(conn), K(ret));
          } else if (conn->is_proxy_) {
            const char *sup_proxy_min_version = "1.7.6";
            uint64_t min_proxy_version = 0;
            if (OB_FAIL(ObClusterVersion::get_version(sup_proxy_min_version, min_proxy_version))) {
              LOG_WARN("failed to get version", K(ret));
            } else if (conn->proxy_version_ < min_proxy_version) {
              NEW_MYSQL_PROCESSOR(ObMPDefault, gctx_);
            } else {
              NEW_MYSQL_PROCESSOR(ObMPQuery, gctx_);
            }
          } else {
            NEW_MYSQL_PROCESSOR(ObMPQuery, gctx_);
          }
          break;
        }
        default:
          NEW_MYSQL_PROCESSOR(ObMPDefault, gctx_);
          break;
      }
      if (OB_SUCC(ret) && pkt.get_cmd() == obmysql::COM_FIELD_LIST) {
        if (OB_ISNULL(processor)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("get unexpected null", K(processor));
        } else {
          // The COM_FIELD_LIST arm creates an ObMPDefault for proxies older
          // than the minimum version, so the processor is not necessarily an
          // ObMPQuery. Use a checked cast and only flag real query processors;
          // an unchecked static_cast here would call an ObMPQuery member on an
          // object of a different type.
          ObMPQuery *query_processor = dynamic_cast<ObMPQuery *>(processor);
          if (NULL != query_processor) {
            query_processor->set_is_com_filed_list();
          }
        }
      }
      if (OB_SUCC(ret) && (pkt.get_cmd() == obmysql::COM_STMT_PREPARE
                            || pkt.get_cmd() == obmysql::COM_STMT_PREXECUTE)) {
        ObSMConnection *conn = reinterpret_cast<ObSMConnection* >(
            SQL_REQ_OP.get_sql_session(&req));
        ObMPBase *mp_processor = dynamic_cast<ObMPBase *>(processor);
        if (OB_ISNULL(conn) || OB_ISNULL(mp_processor)) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("get unexpected null", K(conn), K(mp_processor));
        } else {
          // Connections that do not come through a proxy report version 0.
          uint64_t proxy_version = conn->is_proxy_ ? conn->proxy_version_ : 0;
          mp_processor->set_proxy_version(proxy_version);
        }
      }
    }
    // Never hand a processor back together with an error code.
    if (OB_FAIL(ret) && NULL != processor) {
      worker_allocator_delete(processor);
      processor = NULL;
    }
  }

  return ret;
}
295

296
// Returns the processor that should handle `req`.
//
// - A request with the discard flag set is not translated (OB_WAITQUEUE_TIMEOUT).
// - MySQL and RPC requests go through the matching translator; for RPC the
//   worker's timeout timestamp and ntp offset are set after a successful
//   translation.
// - Task requests (OB_TASK / OB_TS_TASK / OB_SQL_TASK) carry their own processor.
// - Any other type is logged and reported as OB_UNKNOWN_PACKET.
//
// If no processor is available afterwards, RPC and MySQL requests get an error
// processor carrying the error code (the MySQL one is marked to disconnect);
// for other request types NULL is returned.
ObReqProcessor *ObSrvXlator::get_processor(ObRequest &req)
{
  int ret = OB_SUCCESS;
  ObReqProcessor *processor = NULL;
  const auto req_type = req.get_type();

  // 1. create processor by request type.
  if (true == req.get_discard_flag()) {
    ret = OB_WAITQUEUE_TIMEOUT;
  } else {
    switch (req_type) {
      case ObRequest::OB_MYSQL: {
        ret = mysql_xlator_.translate(req, processor);
        break;
      }
      case ObRequest::OB_RPC: {
        const obrpc::ObRpcPacket &pkt
            = reinterpret_cast<const obrpc::ObRpcPacket &>(req.get_packet());
        ret = rpc_xlator_.translate(req, processor);
        if (OB_SUCC(ret)) {
          THIS_WORKER.set_timeout_ts(req.get_receive_timestamp() + pkt.get_timeout());
          THIS_WORKER.set_ntp_offset(req.get_receive_timestamp() - req.get_send_timestamp());
        }
        break;
      }
      case ObRequest::OB_TASK:
      case ObRequest::OB_TS_TASK:
      case ObRequest::OB_SQL_TASK: {
        processor = &static_cast<ObSrvTask&>(req).get_processor();
        break;
      }
      default: {
        LOG_WARN("can't translate packet", "type", req_type);
        ret = OB_UNKNOWN_PACKET;
        break;
      }
    }
  }

  // 2. destroy processor if alloc before but translate fail.
  if (OB_FAIL(ret) && NULL != processor) {
    worker_allocator_delete(processor);
    processor = NULL;
  }

  // 3. fall back to an error processor for the protocols that can answer.
  if (OB_ISNULL(processor)) {
    if (ObRequest::OB_RPC == req_type) {
      processor = get_error_rpc_processor(ret);
    } else if (ObRequest::OB_MYSQL == req_type) {
      ObReqProcessor *error_processor = get_error_mysql_processor(ret);
      (static_cast<ObMPError*>(error_processor))->set_need_disconnect(true);
      processor = error_processor;
    }
  }

  return processor;
}
340

341

342
// Destroys a processor handed out by get_processor() and releases what it owns.
//
// How the processor is released depends on where it lives:
//   - constructed inside one of the reserved buffers (co_ep_rpcp_buf /
//     co_closepbuf): only the destructor is run, there is nothing to free;
//   - belonging to a task request (OB_TASK / OB_TS_TASK / OB_SQL_TASK): the
//     request is released through the facility matching its type;
//   - anything else: released with worker_allocator_delete().
//
// @param processor  processor to release
// @return OB_INVALID_ARGUMENT if `processor` is NULL, OB_SUCCESS otherwise
int ObSrvXlator::release(ObReqProcessor *processor)
{
  int ret = OB_SUCCESS;
  if (NULL == processor) {
    ret = OB_INVALID_ARGUMENT;
    LOG_ERROR("invalid argument", K(processor), K(ret));
  } else {
    // Decide by address whether the processor was placement-constructed in a
    // reserved buffer.
    const char *processor_addr = reinterpret_cast<const char *>(processor);
    const bool in_reserved_buffer =
        processor_addr == (&co_ep_rpcp_buf)->ep_buffer_
        || processor_addr == (&co_ep_rpcp_buf)->rpcp_buffer_
        || processor_addr == (&co_closepbuf)->buffer_;

    processor->destroy();

    if (in_reserved_buffer) {
      processor->~ObReqProcessor();
    } else {
      // task request is allocated when new task composed, then delete
      // here.
      ObRequest *req = const_cast<ObRequest*>(processor->get_ob_request());
      const ObRequest::Type req_type = static_cast<ObRequest::Type>(processor->get_req_type());
      if (ObRequest::OB_TASK == req_type) {
        // Deal with sqltask memory release
        ob_delete(req);
        req = NULL;
      } else if (ObRequest::OB_TS_TASK == req_type) {
        // Deal with the memory release of the transaction task
        ObTsResponseTaskFactory::free(static_cast<ObTsResponseTask *>(req));
        req = NULL;
      } else if (ObRequest::OB_SQL_TASK == req_type) {
        ObSqlTaskFactory::get_instance().free(static_cast<ObSqlTask *>(req));
        req = NULL;
      } else {
        worker_allocator_delete(processor);
        processor = NULL;
      }
    }
  }
  return ret;
}
387

388
// Constructs an ObErrorP carrying `ret` at the start of the reserved
// co_ep_rpcp_buf.ep_buffer_ (placement new, no allocation) and returns it.
ObReqProcessor *ObSrvXlator::get_error_rpc_processor(const int ret)
{
  void *slot = (&co_ep_rpcp_buf)->ep_buffer_;
  return new (slot) ObErrorP(ret);
}
394

395
// Constructs an ObMPError carrying `ret` at the start of the reserved
// co_ep_rpcp_buf.ep_buffer_ (placement new, no allocation) and returns it.
ObReqProcessor *ObSrvXlator::get_error_mysql_processor(const int ret)
{
  void *slot = (&co_ep_rpcp_buf)->ep_buffer_;
  return new (slot) ObMPError(ret);
}
401

402
// Allocates an ObMPConnect from the current worker's SQL arena allocator and
// initializes it.
//
// @param ret_proc  [out] the initialized processor, or NULL on any failure
// @return OB_ALLOCATE_MEMORY_FAILED if the allocation failed, the error from
//         init() if initialization failed, OB_SUCCESS otherwise
int ObSrvMySQLXlator::get_mp_connect_processor(ObReqProcessor *&ret_proc)
{
  int ret = OB_SUCCESS;
  ObMPConnect *connect_processor = NULL;
  void *mem = THIS_WORKER.get_sql_arena_allocator().alloc(sizeof(ObMPConnect));
  if (NULL != mem) {
    connect_processor = new(mem) ObMPConnect(gctx_);
    if (OB_FAIL(connect_processor->init())) {
      LOG_ERROR("init ObMPConnect fail", K(ret));
      // A processor that failed to initialize is released right away.
      worker_allocator_delete(connect_processor);
      connect_processor = NULL;
    }
  } else {
    ret = OB_ALLOCATE_MEMORY_FAILED;
    LOG_ERROR("failed to allocate memory for ObMPConnect", K(ret));
  }
  ret_proc = connect_processor;
  return ret;
}
421

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.