2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
15
#include "observer/ob_srv_xlator.h"
17
#include "share/ob_tenant_mgr.h"
18
#include "share/schema/ob_schema_service_rpc_proxy.h"
19
#include "rpc/ob_request.h"
20
#include "rpc/obmysql/ob_mysql_packet.h"
21
#include "rpc/obmysql/ob_sql_sock_session.h"
22
#include "share/rpc/ob_batch_processor.h"
23
#include "share/rpc/ob_blacklist_req_processor.h"
24
#include "share/rpc/ob_blacklist_resp_processor.h"
25
#include "sql/executor/ob_executor_rpc_processor.h"
26
#include "sql/engine/cmd/ob_kill_executor.h"
27
#include "sql/engine/cmd/ob_load_data_rpc.h"
28
#include "sql/engine/px/ob_px_rpc_processor.h"
29
#include "sql/dtl/ob_dtl_rpc_processor.h"
30
#include "sql/ob_sql_task.h"
31
#include "share/interrupt/ob_interrupt_rpc_proxy.h"
32
#include "storage/tx/ob_trans_rpc.h"
33
#include "storage/tx/ob_gts_rpc.h"
34
#include "storage/tx/ob_dup_table_rpc.h"
35
#include "storage/tx/ob_ts_response_handler.h"
36
#include "storage/tx/wrs/ob_weak_read_service_rpc_define.h" // weak_read_service
37
#include "observer/ob_rpc_processor_simple.h"
38
#include "observer/ob_srv_task.h"
39
#include "observer/mysql/obmp_query.h"
40
#include "observer/mysql/obmp_ping.h"
41
#include "observer/mysql/obmp_quit.h"
42
#include "observer/mysql/obmp_connect.h"
43
#include "observer/mysql/obmp_init_db.h"
44
#include "observer/mysql/obmp_default.h"
45
#include "observer/mysql/obmp_change_user.h"
46
#include "observer/mysql/obmp_error.h"
47
#include "observer/mysql/obmp_statistic.h"
48
#include "observer/mysql/obmp_stmt_prepare.h"
49
#include "observer/mysql/obmp_stmt_execute.h"
50
#include "observer/mysql/obmp_stmt_fetch.h"
51
#include "observer/mysql/obmp_stmt_close.h"
52
#include "observer/mysql/obmp_stmt_prexecute.h"
53
#include "observer/mysql/obmp_stmt_send_piece_data.h"
54
#include "observer/mysql/obmp_stmt_get_piece_data.h"
55
#include "observer/mysql/obmp_stmt_send_long_data.h"
56
#include "observer/mysql/obmp_stmt_reset.h"
57
#include "observer/mysql/obmp_reset_connection.h"
59
#include "observer/table/ob_table_rpc_processor.h"
60
#include "observer/table/ob_table_execute_processor.h"
61
#include "observer/table/ob_table_batch_execute_processor.h"
62
#include "observer/table/ob_table_query_processor.h"
63
#include "observer/table/ob_table_query_and_mutate_processor.h"
64
#include "logservice/palf/log_rpc_processor.h"
66
using namespace oceanbase::observer;
67
using namespace oceanbase::lib;
68
using namespace oceanbase::rpc;
69
using namespace oceanbase::sql;
70
using namespace oceanbase::common;
71
using namespace oceanbase::transaction;
72
using namespace oceanbase::obrpc;
73
using namespace oceanbase::obmysql;
75
// PROCESSOR_BEGIN(pcode) / PROCESSOR_END(): a pair of helper macros.
// NOTE(review): most of both bodies is not visible in this view; the only visible statement is
// the PROCESSOR_END fallback that sets ret to OB_NOT_SUPPORTED. Presumably they open and close a
// dispatch on pcode - confirm against the full file.
#define PROCESSOR_BEGIN(pcode) \
78
#define PROCESSOR_END() \
80
ret = OB_NOT_SUPPORTED; \
83
// NEW_MYSQL_PROCESSOR(ObMySQLP, ...): constructs an ObMySQLP via OB_NEWx using the current
// worker's SQL arena allocator, forwarding the variadic arguments to OB_NEWx.
// Visible error paths:
//   - ret is set to OB_ALLOCATE_MEMORY_FAILED (the guarding condition is not visible here;
//     presumably a null check on p - confirm);
//   - when p->init() fails, the failure is logged and p is released with
//     worker_allocator_delete(p).
// NOTE(review): the success path is not visible in this view.
#define NEW_MYSQL_PROCESSOR(ObMySQLP, ...) \
85
ObIAllocator *alloc = &THIS_WORKER.get_sql_arena_allocator() ; \
86
ObMySQLP *p = OB_NEWx(ObMySQLP, alloc, __VA_ARGS__); \
88
ret = OB_ALLOCATE_MEMORY_FAILED; \
89
} else if (OB_FAIL(p->init())) { \
90
SERVER_LOG(ERROR, "Init " #ObMySQLP "fail", K(ret)); \
91
worker_allocator_delete(p); \
98
// MYSQL_PROCESSOR(ObMySQLP, ...): emits a switch case labelled ObMySQLP::COM whose body creates
// the processor through NEW_MYSQL_PROCESSOR with the same arguments.
// NOTE(review): the end of the case body is not visible in this view.
#define MYSQL_PROCESSOR(ObMySQLP, ...) \
99
case ObMySQLP::COM: { \
100
NEW_MYSQL_PROCESSOR(ObMySQLP, __VA_ARGS__); \
104
// Stores `func` in funcs_[pcode] so that translate() can later look it up by packet code.
//   pcode: packet code used as the index into funcs_; must satisfy 0 <= pcode < MAX_PCODE.
//   func:  function stored for that packet code.
// An out-of-range pcode and an already-occupied slot are both logged as errors.
// NOTE(review): whatever follows each error log is not visible in this view.
void ObSrvRpcXlator::register_rpc_process_function(int pcode, RPCProcessFunc func) {
105
if(pcode >= MAX_PCODE || pcode < 0) {
106
SERVER_LOG_RET(ERROR, OB_ERROR, "(SHOULD NEVER HAPPEN) input pcode is out of range in server rpc xlator", K(pcode));
108
// A non-null entry means something was already registered for this pcode.
} else if (funcs_[pcode] != nullptr) {
109
SERVER_LOG_RET(ERROR, OB_ERROR, "(SHOULD NEVER HAPPEN) duplicate pcode in server rpc xlator", K(pcode));
112
funcs_[pcode] = func;
116
// Returns a reference to the SQL arena allocator of the current worker (THIS_WORKER).
ObIAllocator &oceanbase::observer::get_sql_arena_allocator() {
117
return THIS_WORKER.get_sql_arena_allocator();
120
// Creates the processor for an RPC request by dispatching on the packet's pcode.
//   req:       incoming request; its packet is reinterpreted as an ObRpcPacket.
//   processor: output; set by the function registered for the pcode.
// ret becomes OB_NOT_SUPPORTED when the pcode is out of range, when nothing is registered for
// it, or when the registered function succeeds but yields a NULL processor.
int ObSrvRpcXlator::translate(rpc::ObRequest &req, ObReqProcessor *&processor)
122
int ret = OB_SUCCESS;
124
const ObRpcPacket &pkt
125
= reinterpret_cast<const ObRpcPacket&>(req.get_packet());
126
int pcode = pkt.get_pcode();
128
// Range check comes first so funcs_[pcode] is only indexed with a valid pcode.
if (OB_UNLIKELY(pcode < 0 || pcode >= MAX_PCODE || funcs_[pcode] == nullptr)) {
129
ret = OB_NOT_SUPPORTED;
130
LOG_WARN("not support packet", K(pkt), K(ret), K(MAX_PCODE));
132
// Invoke the registered function; it receives gctx_ and session_handler_ and fills `processor`.
ret = funcs_[pcode](gctx_, processor, session_handler_);
135
// Success without a processor is reported as not supported.
if (OB_SUCC(ret) && NULL == processor) {
136
ret = OB_NOT_SUPPORTED;
139
// On failure, delete any processor that was already created.
if (!OB_SUCC(ret) && NULL != processor) {
140
ob_delete(processor);
147
// Per-thread initialization: calls th_init() on the RPC translator first, then on the MySQL
// translator. The MySQL translator is only initialized if the RPC one succeeded; each failure
// is logged with its error code.
int ObSrvXlator::th_init()
149
int ret = common::OB_SUCCESS;
151
if (OB_FAIL(rpc_xlator_.th_init())) {
152
LOG_ERROR("init rpc translator for thread fail", K(ret));
153
} else if (OB_FAIL(mysql_xlator_.th_init())) {
154
LOG_ERROR("init mysql translator for thread fail", K(ret));
159
// Per-thread teardown: calls th_destroy() on the RPC translator first, then on the MySQL
// translator. Because of the else-if, the MySQL translator's th_destroy() is skipped when the
// RPC translator's th_destroy() fails; each failure is logged with its error code.
int ObSrvXlator::th_destroy()
161
int ret = common::OB_SUCCESS;
162
if (OB_FAIL(rpc_xlator_.th_destroy())) {
163
LOG_ERROR("destroy rpc translator for thread fail", K(ret));
164
} else if (OB_FAIL(mysql_xlator_.th_destroy())) {
165
LOG_ERROR("destroy mysql translator for thread fail", K(ret));
170
// Pre-reserved storage used for placement-new of processors, so that creating them does not
// need a fresh allocation. Being a union, rpcp_buffer_ and ep_buffer_ share the same storage
// and start at the same address.
typedef union EP_RPCP_BUF {
171
char rpcp_buffer_[RPCP_BUF_SIZE]; // reserve memory for rpc processor
172
// Large enough for an ObErrorP plus an ObMPError; the error-processor getters in this file
// construct one of them at the start of this buffer.
char ep_buffer_[sizeof (ObErrorP) + sizeof (ObMPError)];
174
// Make sure election rpc processor allocated successfully when OOM occurs
175
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionPrepareRequestMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
176
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionPrepareResponseMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
177
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionAcceptRequestMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
178
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionAcceptResponseMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
179
STATIC_ASSERT(RPCP_BUF_SIZE >= sizeof(oceanbase::palf::ElectionChangeLeaderMsgP), "RPCP_BUF_SIZE should be big enough to allocate election processer");
182
// Storage sized for exactly one ObMPStmtClose; accessed below as (&co_closepbuf)->buffer_.
// NOTE(review): the enclosing type declaration (CLOSEPBUF) is not visible in this view.
char buffer_[sizeof (ObMPStmtClose)];
185
// One instance of each buffer type, 64-byte aligned.
// NOTE(review): _RLOCAL presumably declares routine/thread-local storage - confirm against the
// macro definition.
_RLOCAL(EP_RPCP_BUF, co_ep_rpcp_buf) __attribute__((aligned(64)));
186
_RLOCAL(CLOSEPBUF, co_closepbuf) __attribute__((aligned(64)));
188
// Creates the processor for a MySQL-protocol request.
//   req:       incoming request; must be of type ObRequest::OB_MYSQL.
//   processor: output processor.
// A request still in the connected phase gets the connect processor; otherwise the processor is
// chosen by the command of the raw MySQL packet. On failure, a processor that was already
// created is released with worker_allocator_delete().
// NOTE(review): several branch/brace lines of this function are not visible in this view, so
// the comments below describe only the visible statements.
int ObSrvMySQLXlator::translate(rpc::ObRequest &req, ObReqProcessor *&processor)
190
int ret = OB_SUCCESS;
193
// Only MySQL requests can be translated here.
if (ObRequest::OB_MYSQL != req.get_type()) {
194
LOG_ERROR("can't translate non-mysql request");
195
ret = OB_ERR_UNEXPECTED;
197
// Connected phase: use the connect processor.
if (req.is_in_connected_phase()) {
198
ret = get_mp_connect_processor(processor);
200
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket &>(req.get_packet());
201
// Each MYSQL_PROCESSOR line expands to a case for that processor's COM value.
switch (pkt.get_cmd()) {
202
MYSQL_PROCESSOR(ObMPQuery, gctx_);
203
MYSQL_PROCESSOR(ObMPQuit, gctx_);
204
MYSQL_PROCESSOR(ObMPPing, gctx_);
205
MYSQL_PROCESSOR(ObMPInitDB, gctx_);
206
MYSQL_PROCESSOR(ObMPChangeUser, gctx_);
207
MYSQL_PROCESSOR(ObMPStatistic, gctx_);
208
MYSQL_PROCESSOR(ObMPStmtPrepare, gctx_);
209
MYSQL_PROCESSOR(ObMPStmtExecute, gctx_);
210
MYSQL_PROCESSOR(ObMPStmtFetch, gctx_);
211
MYSQL_PROCESSOR(ObMPStmtReset, gctx_);
212
MYSQL_PROCESSOR(ObMPStmtPrexecute, gctx_);
213
MYSQL_PROCESSOR(ObMPStmtSendPieceData, gctx_);
214
MYSQL_PROCESSOR(ObMPStmtGetPieceData, gctx_);
215
MYSQL_PROCESSOR(ObMPStmtSendLongData, gctx_);
216
MYSQL_PROCESSOR(ObMPResetConnection, gctx_);
217
// A PS stmt close request may not send a response packet.
218
// However, in the get-processor phase, it may report
219
// an error due to lack of memory, which would send an error response packet.
220
// To avoid this situation, we make the stmt close processor
222
case obmysql::COM_STMT_CLOSE: {
223
// Construct ObMPStmtClose in the pre-reserved co_closepbuf buffer via placement new.
char *closepbuf = (&co_closepbuf)->buffer_;
224
ObMPStmtClose* p = new (&closepbuf[0]) ObMPStmtClose(gctx_);
225
if (OB_FAIL(p->init())) {
226
SERVER_LOG(ERROR, "Init ObMPStmtClose fail", K(ret));
233
case obmysql::COM_FIELD_LIST: {
234
/* To stay compatible with the proxy, COM_FIELD_LIST is supported according to these rules:
235
* 1. In non-proxy mode, return the normal query result packet.
237
* 2.1. If a proxy version is present: below 1.7.6, return a not-supported error packet;
238
* 1.7.6 and above, return the normal query result;
240
* 2.2. If no proxy version is present, return a not-supported error packet.
242
ObSMConnection *conn = reinterpret_cast<ObSMConnection* >(
243
SQL_REQ_OP.get_sql_session(&req));
244
if (OB_ISNULL(conn)) {
245
ret = OB_ERR_UNEXPECTED;
246
LOG_WARN("get unexpected null", K(conn), K(ret));
247
} else if (conn->is_proxy_) {
248
// Minimum proxy version that gets a real query result for COM_FIELD_LIST.
const char *sup_proxy_min_version = "1.7.6";
249
uint64_t min_proxy_version = 0;
250
if (OB_FAIL(ObClusterVersion::get_version(sup_proxy_min_version, min_proxy_version))) {
251
LOG_WARN("failed to get version", K(ret));
252
} else if (conn->proxy_version_ < min_proxy_version) {
253
// Proxy older than the minimum version: use the default processor.
NEW_MYSQL_PROCESSOR(ObMPDefault, gctx_);
255
// NOTE(review): the conditions guarding the next three processor creations are not visible
// in this view.
NEW_MYSQL_PROCESSOR(ObMPQuery, gctx_);
258
NEW_MYSQL_PROCESSOR(ObMPQuery, gctx_);
263
NEW_MYSQL_PROCESSOR(ObMPDefault, gctx_);
266
// For COM_FIELD_LIST, flag the processor (used as an ObMPQuery) as a field-list request.
if (OB_SUCC(ret) && pkt.get_cmd() == obmysql::COM_FIELD_LIST) {
267
if (OB_ISNULL(static_cast<ObMPQuery *>(processor))) {
268
ret = OB_ERR_UNEXPECTED;
269
LOG_WARN("get unexpected null", K(static_cast<ObMPQuery *>(processor)));
271
static_cast<ObMPQuery *>(processor)->set_is_com_filed_list();
274
// For prepare / prexecute, pass the proxy version (0 when not behind a proxy) to the processor.
if (OB_SUCC(ret) && (pkt.get_cmd() == obmysql::COM_STMT_PREPARE
275
|| pkt.get_cmd() == obmysql::COM_STMT_PREXECUTE)) {
276
ObSMConnection *conn = reinterpret_cast<ObSMConnection* >(
277
SQL_REQ_OP.get_sql_session(&req));
278
if (OB_ISNULL(conn) || OB_ISNULL(dynamic_cast<ObMPBase *>(processor))) {
279
ret = OB_ERR_UNEXPECTED;
280
LOG_WARN("get unexpected null", K(dynamic_cast<ObMPBase *>(processor)));
282
uint64_t proxy_version = conn->is_proxy_ ? conn->proxy_version_ : 0;
283
static_cast<ObMPBase *>(processor)->set_proxy_version(proxy_version);
287
// Translation failed after a processor was created: release it.
if (OB_FAIL(ret) && NULL != processor) {
288
worker_allocator_delete(processor);
296
// Selects or creates the processor for `req` based on the request type.
//   - discarded request: ret = OB_WAITQUEUE_TIMEOUT, no translation is attempted;
//   - OB_MYSQL: delegated to mysql_xlator_.translate();
//   - OB_RPC: delegated to rpc_xlator_.translate();
//   - OB_TASK / OB_TS_TASK / OB_SQL_TASK: the processor embedded in the ObSrvTask is used;
//   - the visible OB_UNKNOWN_PACKET path logs a warning (presumably the fallback for any other
//     type - its guarding line is not visible here).
// When no processor is available afterwards, an error processor is produced for RPC and MySQL
// requests.
// NOTE(review): the return statement and some branch lines are not visible in this view.
ObReqProcessor *ObSrvXlator::get_processor(ObRequest &req)
298
int ret = OB_SUCCESS;
299
ObReqProcessor *processor = NULL;
301
// 1. create processor by request type.
302
if (req.get_discard_flag() == true) {
303
ret = OB_WAITQUEUE_TIMEOUT;
304
} else if (ObRequest::OB_MYSQL == req.get_type()) {
305
ret = mysql_xlator_.translate(req, processor);
306
} else if (ObRequest::OB_RPC == req.get_type()) {
307
const obrpc::ObRpcPacket &pkt
308
= reinterpret_cast<const obrpc::ObRpcPacket &>(req.get_packet());
309
ret = rpc_xlator_.translate(req, processor);
311
// Worker timeout = receive timestamp + packet timeout; NTP offset = receive - send timestamp.
// NOTE(review): the condition guarding these two calls is not visible in this view.
THIS_WORKER.set_timeout_ts(req.get_receive_timestamp() + pkt.get_timeout());
312
THIS_WORKER.set_ntp_offset(req.get_receive_timestamp() - req.get_send_timestamp());
314
} else if (ObRequest::OB_TASK == req.get_type() ||
315
ObRequest::OB_TS_TASK == req.get_type() ||
316
ObRequest::OB_SQL_TASK == req.get_type()) {
317
// Task requests carry their own processor; nothing is allocated here.
processor = &static_cast<ObSrvTask&>(req).get_processor();
319
LOG_WARN("can't translate packet", "type", req.get_type());
320
ret = OB_UNKNOWN_PACKET;
323
// destroy processor if alloc before but translate fail.
324
if (OB_FAIL(ret) && NULL != processor) {
325
worker_allocator_delete(processor);
329
// No usable processor: fall back to an error processor that carries `ret`.
if (OB_ISNULL(processor)) {
330
if (ObRequest::OB_RPC == req.get_type()) {
331
processor = get_error_rpc_processor(ret);
332
} else if (ObRequest::OB_MYSQL == req.get_type()) {
333
processor = get_error_mysql_processor(ret);
334
// The MySQL error processor is told to disconnect.
(static_cast<ObMPError*>(processor))->set_need_disconnect(true);
342
// Destroys `processor` and releases its storage according to where it lives:
//   - at the start of co_ep_rpcp_buf (ep_buffer_ / rpcp_buffer_): destroy() plus an explicit
//     destructor call; the memory itself is not freed;
//   - at the start of co_closepbuf: same, again without freeing the memory;
//   - otherwise: destroy(), release of the task request according to its type, then
//     worker_allocator_delete(processor).
// A NULL processor yields OB_INVALID_ARGUMENT.
// NOTE(review): some branch lines and the return statement are not visible in this view.
int ObSrvXlator::release(ObReqProcessor *processor)
344
int ret = OB_SUCCESS;
345
// Start addresses of the pre-reserved buffers, compared against `processor` below.
const char *epbuf = (&co_ep_rpcp_buf)->ep_buffer_;
346
const char *cpbuf = (&co_closepbuf)->buffer_;
347
const char *rpcpbuf = (&co_ep_rpcp_buf)->rpcp_buffer_;
348
if (NULL == processor) {
349
ret = OB_INVALID_ARGUMENT;
350
LOG_ERROR("invalid argument", K(processor), K(ret));
351
} else if (reinterpret_cast<char*>(processor) == epbuf || reinterpret_cast<char*>(processor) == rpcpbuf) {
352
// Placement-new'ed object in the reserved buffer: run destroy() and the destructor only.
processor->destroy();
353
processor->~ObReqProcessor();
354
} else if (reinterpret_cast<char*>(processor) == cpbuf) {
355
processor->destroy();
356
// The protocol is read before the destructor runs.
ObRequest::TransportProto nio_protocol = (ObRequest::TransportProto)processor->get_nio_protocol();
357
processor->~ObReqProcessor();
359
processor->destroy();
361
// task request is allocated when new task composed, then delete
363
// Everything needed from the processor is read here, before it is deleted further down.
ObRequest *req = const_cast<ObRequest*>(processor->get_ob_request());
364
ObRequest::Type req_type = (ObRequest::Type)processor->get_req_type();
365
ObRequest::TransportProto nio_protocol = (ObRequest::TransportProto)processor->get_nio_protocol();
366
bool need_retry = processor->get_need_retry();
367
bool async_resp_used = processor->get_async_resp_used();
368
if (ObRequest::OB_TASK == req_type) {
369
//Deal with sqltask memory release
372
} else if (ObRequest::OB_TS_TASK == req_type) {
373
//Deal with the memory release of the transaction task
374
ObTsResponseTaskFactory::free(static_cast<ObTsResponseTask *>(req));
375
//op_reclaim_free(req);
377
} else if (ObRequest::OB_SQL_TASK == req_type) {
378
ObSqlTaskFactory::get_instance().free(static_cast<ObSqlTask *>(req));
381
worker_allocator_delete(processor);
388
// Builds an ObErrorP carrying `ret` by placement new at the start of the pre-reserved
// co_ep_rpcp_buf.ep_buffer_; no allocator is called here.
// NOTE(review): the return statement is not visible in this view.
ObReqProcessor *ObSrvXlator::get_error_rpc_processor(const int ret)
390
char *epbuf = (&co_ep_rpcp_buf)->ep_buffer_;
391
ObErrorP *p = new (&epbuf[0]) ObErrorP(ret);
395
// Builds an ObMPError carrying `ret` by placement new at the start of the pre-reserved
// co_ep_rpcp_buf.ep_buffer_; no allocator is called here.
// NOTE(review): the return statement is not visible in this view.
ObReqProcessor *ObSrvXlator::get_error_mysql_processor(const int ret)
397
char *epbuf = (&co_ep_rpcp_buf)->ep_buffer_;
398
ObMPError *p = new (&epbuf[0]) ObMPError(ret);
402
// Creates the ObMPConnect processor used for a request in the connected phase.
//   ret_proc: output processor (its assignment is not visible in this view).
// Memory comes from the current worker's SQL arena allocator; allocation failure yields
// OB_ALLOCATE_MEMORY_FAILED, and a failed init() releases the object with
// worker_allocator_delete().
// NOTE(review): the tail of this function is not visible in this view.
int ObSrvMySQLXlator::get_mp_connect_processor(ObReqProcessor *&ret_proc)
404
int ret = OB_SUCCESS;
405
ObMPConnect *proc = NULL;
406
void *buf = THIS_WORKER.get_sql_arena_allocator().alloc(sizeof(ObMPConnect));
407
if (OB_ISNULL(buf)) {
408
ret = OB_ALLOCATE_MEMORY_FAILED;
409
LOG_ERROR("failed to allocate memory for ObMPConnect", K(ret));
411
// Construct the processor in the freshly allocated buffer.
proc = new(buf) ObMPConnect(gctx_);
412
if (OB_FAIL(proc->init())) {
413
LOG_ERROR("init ObMPConnect fail", K(ret));
414
worker_allocator_delete(proc);