oceanbase

Форк
0
/
ob_async_plan_driver.cpp 
168 строк · 7.3 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#define USING_LOG_PREFIX SERVER
14

15
#include "ob_async_plan_driver.h"
16

17
#include "rpc/obmysql/packet/ompk_eof.h"
18
#include "rpc/obmysql/packet/ompk_resheader.h"
19
#include "rpc/obmysql/packet/ompk_field.h"
20
#include "rpc/obmysql/packet/ompk_row.h"
21
#include "rpc/obmysql/ob_mysql_field.h"
22
#include "rpc/obmysql/ob_mysql_packet.h"
23
#include "lib/profile/ob_perf_event.h"
24
#include "obsm_row.h"
25
#include "observer/mysql/obmp_query.h"
26
#include "observer/mysql/ob_mysql_end_trans_cb.h"
27
#include "observer/mysql/obmp_stmt_prexecute.h"
28

29
namespace oceanbase
30
{
31
using namespace common;
32
using namespace sql;
33
using namespace obmysql;
34
namespace observer
35
{
36

37
ObAsyncPlanDriver::ObAsyncPlanDriver(const ObGlobalContext &gctx,
38
                                     const ObSqlCtx &ctx,
39
                                     sql::ObSQLSessionInfo &session,
40
                                     ObQueryRetryCtrl &retry_ctrl,
41
                                     ObIMPPacketSender &sender,
42
                                     bool is_prexecute)
43
    : ObQueryDriver(gctx, ctx, session, retry_ctrl, sender, is_prexecute)
44
{
45
}
46

47
ObAsyncPlanDriver::~ObAsyncPlanDriver()
48
{
49
}
50

51
int ObAsyncPlanDriver::response_result(ObMySQLResultSet &result)
52
{
53
  ACTIVE_SESSION_FLAG_SETTER_GUARD(in_sql_execution);
54
  int ret = OB_SUCCESS;
55
  // result.open 后 pkt_param 所需的 last insert id 等各项参数都已经计算完毕
56
  // 对于异步增删改的情况,需要提前update last insert id,以确保回调pkt_param参数正确
57
  // 后续result set close的时候,还会进行一次store_last_insert_id的调用
58
  ObCurTraceId::TraceId *cur_trace_id = NULL;
59
  if (OB_ISNULL(cur_trace_id = ObCurTraceId::get_trace_id())) {
60
    ret = OB_ERR_UNEXPECTED;
61
    LOG_ERROR("current trace id is NULL", K(ret));
62
  } else if (OB_FAIL(result.open())) {
63
    LOG_WARN("failed to do result set open", K(ret));
64
  } else if (OB_FAIL(result.update_last_insert_id_to_client())) {
65
    LOG_WARN("failed to update last insert id after open", K(ret));
66
  } else {
67
    // open 成功,允许异步回包
68
    result.set_end_trans_async(true);
69
  }
70

71
  if (OB_SUCCESS != ret) {
72
    // 如果try_again为true,说明这条SQL需要重做。考虑到重做之前我们需要回滚整个事务,会调用EndTransCb
73
    // 所以这里设置一个标记,告诉EndTransCb这种情况下不要给客户端回包。
74
    int cli_ret = OB_SUCCESS;
75
    retry_ctrl_.test_and_save_retry_state(gctx_, ctx_, result, ret, cli_ret);
76
    if (retry_ctrl_.need_retry()) {
77
      result.set_will_retry();
78
      result.set_end_trans_async(false);
79
    }
80
    // close背后的故事:
81
    // if (try_again) {
82
    //   result.close()结束后返回到这里,然后运行重试逻辑
83
    // } else {
84
    //   result.close()结束后,process流程应该干净利落地结束,有什么事留到callback里面做
85
    // }
86
    int cret = result.close();
87
    if (retry_ctrl_.need_retry()) {
88
      LOG_WARN("result set open failed, will retry",
89
               K(ret), K(cli_ret), K(cret), K(retry_ctrl_.need_retry()));
90
    } else {
91
      LOG_WARN("result set open failed, let's leave process(). EndTransCb will clean this mess",
92
               K(ret), K(cli_ret), K(cret), K(retry_ctrl_.need_retry()));
93
    }
94
    ret = cli_ret;
95
  } else if (is_prexecute_ && OB_FAIL(response_query_header(result, false, false, true))) {
96
    /*
97
    * prexecute 仅在执行成功时候发送 header 包
98
    * 执行失败时候有两种表现
99
    *  1. 只返回一个 error 包, 这个时候需要注意 ps stmt 的泄漏问题
100
    *  2. 重试, local 重试直接交给上层做, package 重试需要 注意 ps stmt 的泄漏问题
101
    *  3. response_query_header & flush_buffer 不会产生需要重试的报错,此位置是 ObAsyncPlanDriver 重试前的一步,中间不会有别的可能重试的操作
102
    *  4. ps stmt 清理要注意异步回包的情况,可能需要在异步包里面做清理
103
    */
104
    // need close result set
105
    int close_ret = OB_SUCCESS;
106
    if (OB_SUCCESS != (close_ret = result.close())) {
107
      LOG_WARN("close result failed", K(close_ret));
108
    }
109
    LOG_WARN("prexecute response query head fail. ", K(ret));
110
  } else if (result.is_with_rows()) {
111
    ret = OB_ERR_UNEXPECTED;
112
    LOG_ERROR("SELECT should not use async method. wrong code!!!", K(ret));
113
  } else if (OB_FAIL(result.close())) {
114
    LOG_WARN("result close failed, let's leave process(). EndTransCb will clean this mess", K(ret));
115
  } else {
116
    // async 并没有调用 ObSqlEndTransCb 回复  OK 包
117
    // 所以 二合一协议的 OK 包也要放在这里处理
118
    if (is_prexecute_) {
119
      if (stmt::T_SELECT == result.get_stmt_type()) {
120
        ret = OB_ERR_UNEXPECTED;
121
        LOG_WARN("select stmt do not use async plan in prexecute.", K(ret));
122
      } else if (!result.is_async_end_trans_submitted()) {
123
        // is_async_end_trans_submitted 表示异步回包准备好了
124
        ObOKPParam ok_param;
125
        ok_param.affected_rows_ = result.get_affected_rows();
126
        ok_param.is_partition_hit_ = session_.partition_hit().get_bool();
127
        ok_param.has_more_result_ = result.has_more_result();
128
        ok_param.send_last_row_ = true;
129
        if (OB_FAIL(sender_.send_ok_packet(session_, ok_param))) {
130
          LOG_WARN("fail to send ok packt", K(ok_param), K(ret));
131
        }
132
      }
133
    }
134
  }
135

136
  // 仅在end_trans执行后(无论成功与否)才会设置,意味着一定会回调
137
  // 之所以设计这么一个值,是因为open/close可能“半途而废”,根本没走到
138
  // end_trans()接口。这个变量相当于一个最终的确认。
139
  bool async_resp_used = result.is_async_end_trans_submitted();
140
  if (async_resp_used && retry_ctrl_.need_retry()) {
141
    LOG_ERROR("the async request is ok, couldn't send request again");
142
  }
143
  LOG_DEBUG("test if async end trans submitted",
144
            K(ret), K(async_resp_used), K(retry_ctrl_.need_retry()));
145

146
  //if the error code is ob_timeout, we add more error info msg for dml query.
147
  if (OB_TIMEOUT == ret) {
148
    LOG_USER_ERROR(OB_TIMEOUT, THIS_WORKER.get_timeout_ts() - session_.get_query_start_time());
149
  }
150

151
  // 错误处理,没有走异步的时候负责回错误包
152
  if (!OB_SUCC(ret) && !async_resp_used && !retry_ctrl_.need_retry()) {
153
    int sret = OB_SUCCESS;
154
    bool is_partition_hit = session_.get_err_final_partition_hit(ret);
155
    if (OB_SUCCESS != (sret = sender_.send_error_packet(ret, NULL, is_partition_hit))) {
156
      LOG_WARN("send error packet fail", K(sret), K(ret));
157
    }
158
    //根据与事务层的约定,无论end_participant和end_stmt是否成功,
159
    //判断事务commit或回滚是否成功都只看最后的end_trans是否成功,
160
    //而SQL是要保证一定要调到end_trans的,调end_trans的时候判断了是否需要断连接,
161
    //所以这里不需要判断是否需要断连接了
162
  }
163
  return ret;
164
}
165

166

167
}/* ns observer*/
168
}/* ns oceanbase */
169

170

171

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.