oceanbase

Форк
0
/
ob_async_cmd_driver.cpp 
115 строк · 4.4 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "ob_async_cmd_driver.h"
16

17
#include "lib/profile/ob_perf_event.h"
18
#include "obsm_row.h"
19
#include "sql/resolver/cmd/ob_variable_set_stmt.h"
20
#include "observer/mysql/obmp_query.h"
21
#include "observer/mysql/obmp_stmt_prexecute.h"
22

23
namespace oceanbase
24
{
25
using namespace common;
26
using namespace sql;
27
using namespace obmysql;
28
namespace observer
29
{
30

31
ObAsyncCmdDriver::ObAsyncCmdDriver(const ObGlobalContext &gctx,
32
                                 const ObSqlCtx &ctx,
33
                                 sql::ObSQLSessionInfo &session,
34
                                 ObQueryRetryCtrl &retry_ctrl,
35
                                 ObIMPPacketSender &sender,
36
                                 bool is_prexecute)
37
    : ObQueryDriver(gctx, ctx, session, retry_ctrl, sender, is_prexecute)
38
{
39
}
40

41
ObAsyncCmdDriver::~ObAsyncCmdDriver()
42
{
43
}
44

45
/*
46
 * for now, ObAsyncCmdDriver is just an implementation of async end trans
47
 */
48
int ObAsyncCmdDriver::response_result(ObMySQLResultSet &result)
49
{
50
  ACTIVE_SESSION_FLAG_SETTER_GUARD(in_sql_execution);
51
  int ret = OB_SUCCESS;
52
  ObSqlEndTransCb &sql_end_cb = session_.get_mysql_end_trans_cb();
53
  ObEndTransCbPacketParam pkt_param;
54
  result.set_end_trans_async(true);
55
  ObCurTraceId::TraceId *cur_trace_id = NULL;
56
  if (OB_ISNULL(cur_trace_id = ObCurTraceId::get_trace_id())) {
57
    ret = OB_ERR_UNEXPECTED;
58
    LOG_ERROR("current trace id is NULL", K(ret));
59
  } else if (is_prexecute_
60
    && OB_FAIL(response_query_header(result, false, false, true))) {
61
    LOG_WARN("flush buffer fail before send async ok packet.", K(ret));
62
  } else if (OB_FAIL(sql_end_cb.set_packet_param(pkt_param.fill(result, session_, *cur_trace_id)))) {
63
    LOG_ERROR("fail to set packet param", K(ret));
64
  } else if (OB_FAIL(result.open())) {
65
    //once open failed, nothing will be responded to clients
66
    //so, we need to decide to retry or not here.
67
    int cli_ret = OB_SUCCESS;
68
    int close_ret = OB_SUCCESS;
69
    if ((close_ret = result.close()) != OB_SUCCESS) { //should not use OB_FAIL which will overwrite the ret
70
      LOG_WARN("close result set fail", K(close_ret));
71
    }
72
    if (!result.is_async_end_trans_submitted()) {
73
      retry_ctrl_.test_and_save_retry_state(gctx_, ctx_, result, ret, cli_ret, is_prexecute_);
74
      LOG_WARN("result set open failed, check if need retry",
75
               K(ret), K(cli_ret), K(retry_ctrl_.need_retry()));
76
      ret = cli_ret;
77
    } else {
78
      LOG_WARN("result set open failed, async end trans submmited, don't retry", K(ret));
79
    }
80
    //send error packet in sql thread
81
    if (!OB_SUCC(ret) && !retry_ctrl_.need_retry() && (!result.is_async_end_trans_submitted())) {
82
      int sret = OB_SUCCESS;
83
      bool is_partition_hit = session_.partition_hit().get_bool();
84
      if (OB_SUCCESS != (sret = sender_.send_error_packet(ret, NULL, is_partition_hit))) {
85
        LOG_WARN("send error packet fail", K(sret), K(ret));
86
      }
87
    }
88
  } else {
89
    //what if begin;select 1; select 2; commit; commit;
90
    //we should still have to respond a packet to client in terms of the last commit
91
    if (OB_UNLIKELY(!result.is_async_end_trans_submitted())) {
92
      ObOKPParam ok_param;
93
      ok_param.affected_rows_ = 0;
94
      // The commit asynchronous callback logic needs
95
      // to trigger the update logic of affected row first.
96
      if (session_.is_session_sync_support()) {
97
        session_.set_affected_rows_is_changed(ok_param.affected_rows_);
98
      }
99
      session_.set_affected_rows(ok_param.affected_rows_);
100
      ok_param.is_partition_hit_ = session_.partition_hit().get_bool();
101
      ok_param.has_more_result_ = result.has_more_result();
102
      if (OB_FAIL(sender_.send_ok_packet(session_, ok_param))) {
103
        LOG_WARN("fail to send ok packt", K(ok_param), K(ret));
104
      }
105
    }
106
    int close_ret = OB_SUCCESS;
107
    if (OB_SUCCESS != (close_ret = result.close())) {
108
      LOG_WARN("close result failed", K(close_ret));
109
    }
110
  }
111
  return ret;
112
}
113

114
}/* ns observer*/
115
}/* ns oceanbase */
116

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.