oceanbase

Форк
0
/
obmp_stmt_reset.cpp 
163 строки · 5.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14
#include "obmp_stmt_reset.h"
15
#include "rpc/ob_request.h"
16
#include "rpc/obmysql/ob_mysql_util.h"
17
#include "rpc/obmysql/ob_mysql_packet.h"
18
#include "sql/plan_cache/ob_ps_cache.h"
19
#include "sql/plan_cache/ob_prepare_stmt_struct.h"
20
#include "sql/session/ob_sql_session_info.h"
21
#include "observer/mysql/obmp_utils.h"
22
#include "observer/mysql/obmp_stmt_send_piece_data.h"
23

24
namespace oceanbase
25
{
26
using namespace common;
27
using namespace rpc;
28
using namespace obmysql;
29
using namespace sql;
30

31
namespace observer
32
{
33

34
int ObMPStmtReset::deserialize()
35
{
36
  int ret = OB_SUCCESS;
37
  if (OB_ISNULL(req_)) {
38
    ret = OB_ERR_UNEXPECTED;
39
    LOG_WARN("invalid packet", K(ret), K_(req));
40
  } else if (OB_UNLIKELY(req_->get_type() != ObRequest::OB_MYSQL)) {
41
    ret = OB_ERR_UNEXPECTED;
42
    LOG_WARN("invalid packet", K(ret), K_(req), K(req_->get_type()));
43
  } else {
44
    const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
45
    const char* pos = pkt.get_cdata();
46
    uint32_t stmt_id = -1; //INVALID_STMT_ID
47
    ObMySQLUtil::get_uint4(pos, stmt_id);
48
    stmt_id_ = stmt_id;
49
  }
50
  return ret;
51
}
52

53
int ObMPStmtReset::process()
54
{
55
  int ret = OB_SUCCESS;
56
  bool need_disconnect = true;
57
  bool need_response_error = true;
58
  sql::ObSQLSessionInfo *session = NULL;
59
  const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
60
  if (OB_ISNULL(req_)) {
61
    ret = OB_INVALID_ARGUMENT;
62
    LOG_WARN("invalid packet", K(ret), KP(req_));
63
  } else if (OB_INVALID_STMT_ID == stmt_id_) {
64
    ret = OB_INVALID_ARGUMENT;
65
    LOG_WARN("stmt_id is invalid", K(ret));
66
  } else if (OB_FAIL(get_session(session))) {
67
    LOG_WARN("get session failed");
68
  } else if (OB_ISNULL(session)) {
69
    ret = OB_ERR_UNEXPECTED;
70
    LOG_WARN("session is NULL or invalid", K(ret), K(session));
71
  } else if (OB_FAIL(process_kill_client_session(*session))) {
72
    LOG_WARN("client session has been killed", K(ret));
73
  } else if (FALSE_IT(session->set_txn_free_route(pkt.txn_free_route()))) {
74
  } else if (OB_FAIL(process_extra_info(*session, pkt, need_response_error))) {
75
    LOG_WARN("fail get process extra info", K(ret));
76
  } else if (FALSE_IT(session->post_sync_session_info())) {
77
  } else if (FALSE_IT(need_disconnect = false)) {
78
  } else if (OB_FAIL(update_transmission_checksum_flag(*session))) {
79
    LOG_WARN("update transmisson checksum flag failed", K(ret));
80
  } else {
81
    ObPieceCache *piece_cache = session->get_piece_cache();
82
    int64_t param_num = 0;
83
    THIS_WORKER.set_session(session);
84
    ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
85
    LOG_TRACE("close ps stmt or cursor", K_(stmt_id), K(session->get_sessid()));
86
    session->init_use_rich_format();
87

88
    // get stmt info
89
    if (OB_NOT_NULL(session->get_ps_cache())) {
90
      ObPsStmtInfoGuard guard;
91
      ObPsStmtInfo *ps_info = NULL;
92
      ObPsStmtId inner_stmt_id = OB_INVALID_ID;
93
      OZ (session->get_inner_ps_stmt_id(stmt_id_, inner_stmt_id));
94
      if (OB_FAIL(session->get_ps_cache()->get_stmt_info_guard(inner_stmt_id, guard))) {
95
        LOG_WARN("get stmt info guard failed", K(ret), K(stmt_id_), K(inner_stmt_id));
96
      } else if (OB_ISNULL(ps_info = guard.get_stmt_info())) {
97
        ret = OB_ERR_UNEXPECTED;
98
        LOG_WARN("get stmt info is null", K(ret));
99
      } else {
100
        param_num= ps_info->get_num_of_param();
101
      }
102
    }
103

104
    // remove piece
105
    if (NULL == piece_cache) {
106
    } else {
107
      for (uint64_t i = 0; OB_SUCC(ret) && i < param_num; i++) {
108
        if (OB_FAIL(piece_cache->remove_piece(
109
                            piece_cache->get_piece_key(stmt_id_, i),
110
                            *session))) {
111
          if (OB_HASH_NOT_EXIST == ret) {
112
            ret = OB_SUCCESS;
113
          } else {
114
            LOG_WARN("remove piece fail", K(stmt_id_), K(i), K(ret));
115
          }
116
        }
117
      }
118
    }
119

120
    // close cursor
121
    if (OB_NOT_NULL(session->get_cursor(stmt_id_))) {
122
      if (OB_FAIL(session->close_cursor(stmt_id_))) {
123
        LOG_WARN("fail to close cursor", K(ret), K_(stmt_id), K(session->get_sessid()));
124
      }
125
    }
126

127
    if (OB_SUCC(ret)) {
128
      if (pkt.exist_trace_info()
129
          && OB_FAIL(session->update_sys_variable(share::SYS_VAR_OB_TRACE_INFO,
130
                                                  pkt.get_trace_info()))) {
131
        LOG_WARN("fail to update trace info", K(ret));
132
      }
133
    }
134
  }
135

136
  if (OB_SUCC(ret)) {
137
    ObOKPParam ok_param;
138
    ok_param.affected_rows_ = 0;
139
    ok_param.is_partition_hit_ = session->partition_hit().get_bool();
140
    ok_param.has_more_result_ = false;
141
    if (OB_FAIL(send_ok_packet(*session, ok_param))) {
142
      LOG_WARN("send ok packet fail.", K(ret), K(stmt_id_));
143
    }
144
  } else {
145
    if (need_response_error) {
146
      send_error_packet(ret, NULL);
147
    }
148
    if (OB_ERR_PREPARE_STMT_CHECKSUM == ret || need_disconnect) {
149
      force_disconnect();
150
      LOG_WARN("prepare stmt checksum error, disconnect connection", K(ret));
151
    }
152
  }
153
  flush_buffer(true);
154

155
  THIS_WORKER.set_session(NULL);
156
  if (NULL != session) {
157
    revert_session(session);
158
  }
159
  return ret;
160
}
161

162
} //end of namespace sql
163
} //end of namespace oceanbase
164

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.