oceanbase

Форк
0
/
ob_sync_cmd_driver.cpp 
377 строк · 14.6 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "ob_sync_cmd_driver.h"
16

17
#include "lib/profile/ob_perf_event.h"
18
#include "obsm_row.h"
19
#include "sql/resolver/cmd/ob_variable_set_stmt.h"
20
#include "observer/mysql/obmp_query.h"
21
#include "rpc/obmysql/packet/ompk_row.h"
22
#include "rpc/obmysql/packet/ompk_eof.h"
23
#include "share/ob_lob_access_utils.h"
24
#include "observer/mysql/obmp_stmt_prexecute.h"
25
#include "src/pl/ob_pl_user_type.h"
26
#ifdef OB_BUILD_ORACLE_XML
27
#include "sql/engine/expr/ob_expr_xml_func_helper.h"
28
#endif
29

30
namespace oceanbase
31
{
32
using namespace common;
33
using namespace sql;
34
using namespace obmysql;
35
using namespace share;
36
namespace observer
37
{
38

39
/**
 * Constructs a synchronous command driver.
 *
 * All arguments are handed to the ObQueryDriver base class unchanged; the
 * constructor body itself does nothing.
 */
ObSyncCmdDriver::ObSyncCmdDriver(
    const ObGlobalContext &gctx,
    const ObSqlCtx &ctx,
    sql::ObSQLSessionInfo &session,
    ObQueryRetryCtrl &retry_ctrl,
    ObIMPPacketSender &sender,
    bool is_prexecute)
  : ObQueryDriver(gctx, ctx, session, retry_ctrl, sender, is_prexecute)
{}
48

49
// The destructor body is empty; this class performs no cleanup of its own here.
ObSyncCmdDriver::~ObSyncCmdDriver() {}
52

53
int ObSyncCmdDriver::send_eof_packet(bool has_more_result)
54
{
55
  int ret = OB_SUCCESS;
56
  OMPKEOF eofp;
57

58
  if (OB_FAIL(seal_eof_packet(has_more_result, eofp))) {
59
    LOG_WARN("failed to seal eof packet", K(ret), K(has_more_result));
60
  } else if (OB_FAIL(sender_.response_packet(eofp, &session_))) {
61
    LOG_WARN("response packet fail", K(ret), K(has_more_result));
62
  }
63
  return ret;
64
}
65

66
int ObSyncCmdDriver::seal_eof_packet(bool has_more_result, OMPKEOF& eofp)
67
{
68
  int ret = OB_SUCCESS;
69
  const ObWarningBuffer *warnings_buf = common::ob_get_tsi_warning_buffer();
70
  uint16_t warning_count = 0;
71
  if (OB_ISNULL(warnings_buf)) {
72
    LOG_WARN("can not get thread warnings buffer", K(warnings_buf));
73
  } else {
74
    warning_count = static_cast<uint16_t>(warnings_buf->get_readable_warning_count());
75
  }
76
  eofp.set_warning_count(warning_count);
77
  ObServerStatusFlags flags = eofp.get_server_status();
78
  flags.status_flags_.OB_SERVER_STATUS_IN_TRANS
79
    = (session_.is_server_status_in_transaction() ? 1 : 0);
80
  flags.status_flags_.OB_SERVER_STATUS_AUTOCOMMIT = (session_.get_local_autocommit() ? 1 : 0);
81
  flags.status_flags_.OB_SERVER_MORE_RESULTS_EXISTS = has_more_result;
82
  // flags.status_flags_.OB_SERVER_PS_OUT_PARAMS = 1;
83
  if (!session_.is_obproxy_mode()) {
84
    // in java client or others, use slow query bit to indicate partition hit or not
85
    flags.status_flags_.OB_SERVER_QUERY_WAS_SLOW = !session_.partition_hit().get_bool();
86
  }
87

88
  eofp.set_server_status(flags);
89

90
  // for proxy
91
  // in multi-stmt, send extra ok packet in the last stmt(has no more result)
92
  if (!is_prexecute_ && !has_more_result
93
        && OB_FAIL(sender_.update_last_pkt_pos())) {
94
    LOG_WARN("failed to update last packet pos", K(ret));
95
  }
96
  return ret;
97
}
98

99
int ObSyncCmdDriver::response_query_result(sql::ObResultSet &result,
100
                                           bool is_ps_protocol,
101
                                           bool has_more_result,
102
                                           bool &can_retry,
103
                                           int64_t fetch_limit)
104
{
105
  return ObQueryDriver::response_query_result(
106
    result, is_ps_protocol, has_more_result, can_retry, fetch_limit);
107
}
108

109

110
/**
 * Destructs every PL-extend cell of the execution context's output row.
 *
 * Does nothing when the execution context has no output row. The return value
 * of destruct_obj() is deliberately discarded.
 */
void ObSyncCmdDriver::free_output_row(ObMySQLResultSet &result)
{
  const ObNewRow *output_row = result.get_exec_context().get_output_row();
  if (OB_NOT_NULL(output_row)) {
    for (int64_t idx = 0; idx < output_row->get_count(); ++idx) {
      ObObj &cell = output_row->cells_[idx];
      if (!cell.is_pl_extend()) {
        continue;  // only PL-extend cells are destructed here
      }
      (void)pl::ObUserDefinedType::destruct_obj(cell, &session_);
    }
  }
}
122

123
/**
 * Opens the result set, sends the response for it and closes it again.
 *
 * Flow:
 *  - open() fails: schema version changes are still processed for DDL, the
 *    result is closed, the retry controller decides whether to retry, and the
 *    client-facing error code it produced is returned.
 *  - result has rows: the single row is sent via response_query_result() and an
 *    EOF packet is sealed (sent later, see below).
 *  - prexecute without rows: only the query header is sent.
 *  - on success: schema version changes are processed (best-effort), the output
 *    row is freed, the result is closed and either an OK packet (optionally
 *    carrying the sealed EOF) or the EOF packet alone is sent.
 *  - on failure without retry and without an OK packet having been processed,
 *    an error packet is sent.
 *
 * @param result  result set to execute and answer.
 * @return OB_SUCCESS or the first error encountered; after a failed open() this
 *         is the client error code filled in by the retry controller.
 */
int ObSyncCmdDriver::response_result(ObMySQLResultSet &result)
{
  ACTIVE_SESSION_FLAG_SETTER_GUARD(in_sql_execution);
  int ret = OB_SUCCESS;
  bool process_ok = false;
  // for select SQL
  OMPKEOF eofp;
  bool need_send_eof = false;
  if (OB_FAIL(result.open())) {
    // A retry is only possible when open() fails: open() is where the
    // transaction/statement is started, and nothing has been returned to the
    // user yet.
    int cret = OB_SUCCESS;
    int cli_ret = OB_SUCCESS;
    if (ObStmt::is_ddl_stmt(result.get_stmt_type(), result.has_global_variable())) {
      // even failed, still need update lsv, as drop multi tables are not in one trx.
      cret = process_schema_version_changes(result);
      if (OB_SUCCESS != cret) {
        LOG_WARN("failed to set schema version changes", K(cret));
      }
    }

    cret = result.close();
    if (cret != OB_SUCCESS) {
      LOG_WARN("close result set fail", K(cret));
    }

    // open() failed: decide whether a retry is needed.
    retry_ctrl_.test_and_save_retry_state(gctx_, ctx_, result, ret, cli_ret);
    LOG_WARN("result set open failed, check if need retry",
             K(ret), K(cli_ret), K(retry_ctrl_.need_retry()));
    ret = cli_ret;
  } else if (result.is_with_rows()) {
    if (!result.is_pl_stmt(result.get_stmt_type())) {
      // NOTE(review): the result set stays open on this error path (and when
      // seal_eof_packet() fails below) — confirm the caller closes it.
      ret = OB_ERR_UNEXPECTED;
      LOG_ERROR("Not SELECT, should not have any row!!!", K(ret));
    } else if (is_mysql_mode() && session_.client_non_standard()) {
      // do nothing
    } else if (OB_FAIL(response_query_result(result))) {
      LOG_WARN("response query result fail", K(ret));
      free_output_row(result);
      int cret = result.close();
      if (cret != OB_SUCCESS) {
        LOG_WARN("close result set fail", K(cret));
      }
    } else {
      // The EOF packet is only sealed here; it is sent after result.close().
      if (OB_FAIL(seal_eof_packet(result.has_more_result(), eofp))) {
        LOG_WARN("failed to seal eof packet", K(ret), K(result.has_more_result()));
      } else {
        need_send_eof = true;
      }
    }
  } else if (is_prexecute_) {
    if (OB_FAIL(response_query_header(result, false, false , // in prexecute , has_more_result and has_ps out is no matter, it will be recalc
                                      true))) {
      // need close result set
      int close_ret = OB_SUCCESS;
      if (OB_SUCCESS != (close_ret = result.close())) {
        LOG_WARN("close result failed", K(close_ret));
      }
      LOG_WARN("prexecute response query head fail. ", K(ret));
    }
  }

  if (OB_SUCC(ret)) {
    // for CRUD SQL
    // must be called before result.close()
    // Best-effort: a failure here is logged but does not fail the statement.
    int cret = process_schema_version_changes(result);
    if (OB_SUCCESS != cret) {
      LOG_WARN("failed to set schema version changes", K(cret));
    }
    free_output_row(result);
    if (OB_FAIL(result.close())) {
      LOG_WARN("close result set fail", K(ret));
    } else if (!result.is_with_rows()
                || (sender_.need_send_extra_ok_packet() && !result.has_more_result())
                || is_prexecute_
                || (is_mysql_mode() && session_.client_non_standard())) {
      process_ok = true;
      ObOKPParam ok_param;
      ok_param.message_ = const_cast<char*>(result.get_message());
      ok_param.affected_rows_ = result.get_affected_rows();
      ok_param.lii_ = result.get_last_insert_id_to_client();
      const ObWarningBuffer *warnings_buf = common::ob_get_tsi_warning_buffer();
      if (OB_ISNULL(warnings_buf)) {
        LOG_WARN("can not get thread warnings buffer");
      } else {
        ok_param.warnings_count_ =
            static_cast<uint16_t>(warnings_buf->get_readable_warning_count());
      }
      ok_param.is_partition_hit_ = session_.partition_hit().get_bool();
      ok_param.has_more_result_ = result.has_more_result();
      ok_param.has_pl_out_ = is_prexecute_ && result.is_with_rows();
      if (need_send_eof) {
        // The sealed EOF packet is handed to the sender together with the OK packet.
        if (OB_FAIL(sender_.send_ok_packet(session_, ok_param, &eofp))) {
          LOG_WARN("send ok packet fail", K(ok_param), K(ret));
        }
      } else {
        if (OB_FAIL(sender_.send_ok_packet(session_, ok_param))) {
          LOG_WARN("send ok packet fail", K(ok_param), K(ret));
        }
      }
    } else {
      if (need_send_eof && OB_FAIL(sender_.response_packet(eofp, &session_))) {
        LOG_WARN("response packet fail", K(ret));
      }
    }
  } else { /*do nothing*/ }

  // Report the failure to the client unless a retry is pending or the OK
  // packet path was already entered.
  if (!OB_SUCC(ret) && !process_ok && !retry_ctrl_.need_retry()) {
    int sret = OB_SUCCESS;
    bool is_partition_hit = session_.partition_hit().get_bool();
    if (OB_SUCCESS != (sret = sender_.send_error_packet(ret, NULL, is_partition_hit))) {
      LOG_WARN("send error packet fail", K(sret), K(ret));
    }
  }
  return ret;
}
236

237
// must be called before result.close()
238
// two aspects:
239
// - set session last_schema_version to proxy for part DDL
240
// - promote local schema up to target version if last_schema_version is set
241
int ObSyncCmdDriver::process_schema_version_changes(
242
    const ObMySQLResultSet &result)
243
{
244
  int ret = OB_SUCCESS;
245

246
  if (OB_ISNULL(gctx_.schema_service_)) {
247
    ret = OB_INVALID_ARGUMENT;
248
    LOG_ERROR("invalid schema service", K(ret));
249
  } else {
250
    uint64_t tenant_id = session_.get_effective_tenant_id();
251
    // - set session last_schema_version to proxy for DDL
252
    if (ObStmt::is_ddl_stmt(result.get_stmt_type(), result.has_global_variable())) {
253
      if (OB_FAIL(ObSQLUtils::update_session_last_schema_version(*gctx_.schema_service_,
254
                                                                 session_))) {
255
        LOG_WARN("fail to update session last schema_version", K(ret));
256
      }
257
    }
258

259
    // TODO: (xiaochu.yh) 和xiyu沟通结论:这段逻辑可以下移
260
    //  > 应该是当时没有细想吧, 可以放到下层的result set中
261
    if (OB_SUCC(ret)) {
262
      // - promote local schema up to target version if last_schema_version is set
263
      if (result.get_stmt_type() == stmt::T_VARIABLE_SET) {
264
        const ObVariableSetStmt *set_stmt = static_cast<const ObVariableSetStmt*>(result.get_cmd());
265
        if (NULL != set_stmt) {
266
          ObVariableSetStmt::VariableSetNode tmp_node;//just for init node
267
          for (int64_t i = 0; OB_SUCC(ret) && i < set_stmt->get_variables_size(); ++i) {
268
            ObVariableSetStmt::VariableSetNode &var_node = tmp_node;
269
            ObString set_var_name(OB_SV_LAST_SCHEMA_VERSION);
270
            if (OB_FAIL(set_stmt->get_variable_node(i, var_node))) {
271
              LOG_WARN("fail to get_variable_node", K(i), K(ret));
272
            } else {
273
              if (ObCharset::case_insensitive_equal(var_node.variable_name_,
274
                                                    set_var_name)) {
275
                if (OB_FAIL(check_and_refresh_schema(tenant_id))) {
276
                  LOG_WARN("failed to check_and_refresh_schema", K(ret), K(tenant_id));
277
                }
278
                break;
279
              }
280
            }
281
          }
282
        }
283
      }
284
    }
285
  }
286
  return ret;
287
}
288

289
// FIXME: 在目标租户执行 set @@ob_last_schema_version = 123456;
290
//        后是否需要触发刷 schema?
291
//        当前的行为是,只要通过 sql 主动设置,则按照设置的来。
292
int ObSyncCmdDriver::check_and_refresh_schema(uint64_t tenant_id)
293
{
294
  int ret = OB_SUCCESS;
295
  int64_t local_version = 0;
296
  int64_t last_version = 0;
297

298
  if (OB_ISNULL(gctx_.schema_service_)) {
299
    ret = OB_INVALID_ARGUMENT;
300
    LOG_WARN("null schema service", K(ret), K(gctx_));
301
  } else {
302
    if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(tenant_id, local_version))) {
303
      LOG_WARN("fail to get tenant refreshed schema version", K(ret));
304
    } else if (OB_FAIL(session_.get_ob_last_schema_version(last_version))) {
305
      LOG_WARN("failed to get_sys_variable", K(OB_SV_LAST_SCHEMA_VERSION));
306
    } else if (local_version >= last_version) {
307
      // skip
308
    } else if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(tenant_id, last_version))) {
309
      LOG_WARN("failed to refresh schema", K(ret), K(tenant_id), K(last_version));
310
    }
311
  }
312
  return ret;
313
}
314

315
/**
 * Fetches one row from the result set, sends the query header and then sends
 * that row to the client.
 *
 * Before the row is encoded, each cell is converted in place where needed:
 * string / clob-locator / text charset conversion, then lob-locator (lob,
 * json, geometry) processing and, when built with OB_BUILD_ORACLE_XML,
 * user-defined SQL type processing.
 *
 * @param result  opened result set to read the row from.
 * @return OB_SUCCESS, or the first error encountered. OB_ERR_UNEXPECTED is
 *         returned when the fetched row, ctx_.session_info_ or the execution
 *         context's session is null.
 */
int ObSyncCmdDriver::response_query_result(ObMySQLResultSet &result)
{
  int ret = OB_SUCCESS;
  const common::ObNewRow *row = NULL;
  if (OB_FAIL(result.next_row(row)) ) {
    LOG_WARN("fail to get next row", K(ret));
  } else if (OB_ISNULL(row)) {
    // Checked before the header is sent: the row is dereferenced below.
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("next row is null", K(ret));
  } else if (OB_FAIL(response_query_header(result, result.has_more_result(), true))) {
    LOG_WARN("fail to response query header", K(ret));
  } else if (OB_ISNULL(ctx_.session_info_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("session info is null", K(ret));
  } else {
    // Cells are converted in place, hence the const_cast on the fetched row.
    ObNewRow *tmp_row = const_cast<ObNewRow*>(row);
    for (int64_t i = 0; OB_SUCC(ret) && i < tmp_row->get_count(); i++) {
      ObObj& value = tmp_row->get_cell(i);
      if (ob_is_string_tc(value.get_type()) && CS_TYPE_INVALID != value.get_collation_type()) {
        OZ(convert_string_value_charset(value, result));
      } else if (value.is_clob_locator()
                && OB_FAIL(convert_lob_value_charset(value, result))) {
        LOG_WARN("convert lob value charset failed", K(ret));
      } else if (ob_is_text_tc(value.get_type())
                && OB_FAIL(convert_text_value_charset(value, result))) {
        LOG_WARN("convert text value charset failed", K(ret));
      }
      if (OB_FAIL(ret)) {
      } else if ((value.is_lob() || value.is_lob_locator() || value.is_json() || value.is_geometry())
                  && OB_FAIL(process_lob_locator_results(value, result))) {
        LOG_WARN("convert lob locator to longtext failed", K(ret));
#ifdef OB_BUILD_ORACLE_XML
      } else if (value.is_user_defined_sql_type() && OB_FAIL(ObXMLExprHelper::process_sql_udt_results(value, result))) {
        LOG_WARN("convert udt to client format failed", K(ret), K(value.get_udt_subschema_id()));
#endif
      }
    }

    if (OB_SUCC(ret)) {
      MYSQL_PROTOCOL_TYPE protocol_type = result.is_ps_protocol() ? MYSQL_PROTOCOL_TYPE::BINARY : MYSQL_PROTOCOL_TYPE::TEXT;
      const ObSQLSessionInfo *tmp_session = result.get_exec_context().get_my_session();
      if (OB_ISNULL(tmp_session)) {
        // The session is dereferenced below for the effective tenant id.
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("session of exec context is null", K(ret));
      } else {
        const ObDataTypeCastParams dtc_params = ObBasicSessionInfo::create_dtc_params(tmp_session);
        ObSMRow sm_row(protocol_type,
                       *row,
                       dtc_params,
                       result.get_field_columns(),
                       ctx_.schema_guard_,
                       tmp_session->get_effective_tenant_id());
        OMPKRow rp(sm_row);
        if (OB_FAIL(sender_.response_packet(rp, const_cast<ObSQLSessionInfo *>(tmp_session)))) {
          LOG_WARN("response packet fail", K(ret), KP(row));
        } else {
          // Once the row has been sent, reset the charset-conversion allocator.
          ObArenaAllocator *allocator = NULL;
          if (OB_FAIL(result.get_exec_context().get_convert_charset_allocator(allocator))) {
            LOG_WARN("fail to get lob fake allocator", K(ret));
          } else if (OB_NOT_NULL(allocator)) {
            allocator->reset();
          }
        }
      }
    }
  }
  return ret;
}
375

376
}/* ns observer*/
377
}/* ns oceanbase */
378

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.