2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#define USING_LOG_PREFIX SERVER
15
#include "ob_async_plan_driver.h"
17
#include "rpc/obmysql/packet/ompk_eof.h"
18
#include "rpc/obmysql/packet/ompk_resheader.h"
19
#include "rpc/obmysql/packet/ompk_field.h"
20
#include "rpc/obmysql/packet/ompk_row.h"
21
#include "rpc/obmysql/ob_mysql_field.h"
22
#include "rpc/obmysql/ob_mysql_packet.h"
23
#include "lib/profile/ob_perf_event.h"
25
#include "observer/mysql/obmp_query.h"
26
#include "observer/mysql/ob_mysql_end_trans_cb.h"
27
#include "observer/mysql/obmp_stmt_prexecute.h"
31
using namespace common;
33
using namespace obmysql;
37
ObAsyncPlanDriver::ObAsyncPlanDriver(const ObGlobalContext &gctx,
39
sql::ObSQLSessionInfo &session,
40
ObQueryRetryCtrl &retry_ctrl,
41
ObIMPPacketSender &sender,
43
: ObQueryDriver(gctx, ctx, session, retry_ctrl, sender, is_prexecute)
47
ObAsyncPlanDriver::~ObAsyncPlanDriver()
51
int ObAsyncPlanDriver::response_result(ObMySQLResultSet &result)
53
ACTIVE_SESSION_FLAG_SETTER_GUARD(in_sql_execution);
55
// result.open 后 pkt_param 所需的 last insert id 等各项参数都已经计算完毕
56
// 对于异步增删改的情况,需要提前update last insert id,以确保回调pkt_param参数正确
57
// 后续result set close的时候,还会进行一次store_last_insert_id的调用
58
ObCurTraceId::TraceId *cur_trace_id = NULL;
59
if (OB_ISNULL(cur_trace_id = ObCurTraceId::get_trace_id())) {
60
ret = OB_ERR_UNEXPECTED;
61
LOG_ERROR("current trace id is NULL", K(ret));
62
} else if (OB_FAIL(result.open())) {
63
LOG_WARN("failed to do result set open", K(ret));
64
} else if (OB_FAIL(result.update_last_insert_id_to_client())) {
65
LOG_WARN("failed to update last insert id after open", K(ret));
68
result.set_end_trans_async(true);
71
if (OB_SUCCESS != ret) {
72
// 如果try_again为true,说明这条SQL需要重做。考虑到重做之前我们需要回滚整个事务,会调用EndTransCb
73
// 所以这里设置一个标记,告诉EndTransCb这种情况下不要给客户端回包。
74
int cli_ret = OB_SUCCESS;
75
retry_ctrl_.test_and_save_retry_state(gctx_, ctx_, result, ret, cli_ret);
76
if (retry_ctrl_.need_retry()) {
77
result.set_will_retry();
78
result.set_end_trans_async(false);
82
// result.close()结束后返回到这里,然后运行重试逻辑
84
// result.close()结束后,process流程应该干净利落地结束,有什么事留到callback里面做
86
int cret = result.close();
87
if (retry_ctrl_.need_retry()) {
88
LOG_WARN("result set open failed, will retry",
89
K(ret), K(cli_ret), K(cret), K(retry_ctrl_.need_retry()));
91
LOG_WARN("result set open failed, let's leave process(). EndTransCb will clean this mess",
92
K(ret), K(cli_ret), K(cret), K(retry_ctrl_.need_retry()));
95
} else if (is_prexecute_ && OB_FAIL(response_query_header(result, false, false, true))) {
97
* prexecute 仅在执行成功时候发送 header 包
99
* 1. 只返回一个 error 包, 这个时候需要注意 ps stmt 的泄漏问题
100
* 2. 重试, local 重试直接交给上层做, package 重试需要 注意 ps stmt 的泄漏问题
101
* 3. response_query_header & flush_buffer 不会产生需要重试的报错,此位置是 ObAsyncPlanDriver 重试前的一步,中间不会有别的可能重试的操作
102
* 4. ps stmt 清理要注意异步回包的情况,可能需要在异步包里面做清理
104
// need close result set
105
int close_ret = OB_SUCCESS;
106
if (OB_SUCCESS != (close_ret = result.close())) {
107
LOG_WARN("close result failed", K(close_ret));
109
LOG_WARN("prexecute response query head fail. ", K(ret));
110
} else if (result.is_with_rows()) {
111
ret = OB_ERR_UNEXPECTED;
112
LOG_ERROR("SELECT should not use async method. wrong code!!!", K(ret));
113
} else if (OB_FAIL(result.close())) {
114
LOG_WARN("result close failed, let's leave process(). EndTransCb will clean this mess", K(ret));
116
// async 并没有调用 ObSqlEndTransCb 回复 OK 包
117
// 所以 二合一协议的 OK 包也要放在这里处理
119
if (stmt::T_SELECT == result.get_stmt_type()) {
120
ret = OB_ERR_UNEXPECTED;
121
LOG_WARN("select stmt do not use async plan in prexecute.", K(ret));
122
} else if (!result.is_async_end_trans_submitted()) {
123
// is_async_end_trans_submitted 表示异步回包准备好了
125
ok_param.affected_rows_ = result.get_affected_rows();
126
ok_param.is_partition_hit_ = session_.partition_hit().get_bool();
127
ok_param.has_more_result_ = result.has_more_result();
128
ok_param.send_last_row_ = true;
129
if (OB_FAIL(sender_.send_ok_packet(session_, ok_param))) {
130
LOG_WARN("fail to send ok packt", K(ok_param), K(ret));
136
// 仅在end_trans执行后(无论成功与否)才会设置,意味着一定会回调
137
// 之所以设计这么一个值,是因为open/close可能“半途而废”,根本没走到
138
// end_trans()接口。这个变量相当于一个最终的确认。
139
bool async_resp_used = result.is_async_end_trans_submitted();
140
if (async_resp_used && retry_ctrl_.need_retry()) {
141
LOG_ERROR("the async request is ok, couldn't send request again");
143
LOG_DEBUG("test if async end trans submitted",
144
K(ret), K(async_resp_used), K(retry_ctrl_.need_retry()));
146
//if the error code is ob_timeout, we add more error info msg for dml query.
147
if (OB_TIMEOUT == ret) {
148
LOG_USER_ERROR(OB_TIMEOUT, THIS_WORKER.get_timeout_ts() - session_.get_query_start_time());
151
// 错误处理,没有走异步的时候负责回错误包
152
if (!OB_SUCC(ret) && !async_resp_used && !retry_ctrl_.need_retry()) {
153
int sret = OB_SUCCESS;
154
bool is_partition_hit = session_.get_err_final_partition_hit(ret);
155
if (OB_SUCCESS != (sret = sender_.send_error_packet(ret, NULL, is_partition_hit))) {
156
LOG_WARN("send error packet fail", K(sret), K(ret));
158
//根据与事务层的约定,无论end_participant和end_stmt是否成功,
159
//判断事务commit或回滚是否成功都只看最后的end_trans是否成功,
160
//而SQL是要保证一定要调到end_trans的,调end_trans的时候判断了是否需要断连接,