13
#define USING_LOG_PREFIX SERVER
14
#include "observer/mysql/obmp_change_user.h"
15
#include "observer/mysql/obmp_utils.h"
16
#include "lib/string/ob_sql_string.h"
17
#include "rpc/obmysql/ob_mysql_util.h"
18
#include "rpc/obmysql/packet/ompk_ok.h"
19
#include "share/schema/ob_schema_getter_guard.h"
20
#include "share/schema/ob_schema_getter_guard.h"
21
#include "share/schema/ob_schema_struct.h"
22
#include "share/schema/ob_multi_version_schema_service.h"
23
#include "sql/ob_sql.h"
24
#include "sql/ob_end_trans_callback.h"
25
#include "sql/session/ob_sql_session_mgr.h"
26
#include "sql/session/ob_sql_session_info.h"
27
#include "sql/session/ob_user_resource_mgr.h"
28
#include "sql/parser/ob_parser.h"
29
#include "sql/parser/ob_parser_utils.h"
30
#include "rpc/obmysql/obsm_struct.h"
33
using namespace oceanbase::common;
34
using namespace oceanbase::rpc;
35
using namespace oceanbase::obmysql;
36
using namespace oceanbase::share::schema;
41
int ObMPChangeUser::deserialize()
44
if (OB_ISNULL(req_) || OB_UNLIKELY(ObRequest::OB_MYSQL != req_->get_type())) {
45
ret = OB_ERR_UNEXPECTED;
46
LOG_ERROR("invalid request", K(req_));
48
ObSQLSessionInfo *session = NULL;
49
ObMySQLCapabilityFlags capability;
50
if (OB_FAIL(get_session(session))) {
51
LOG_WARN("get session fail", K(ret));
52
} else if (OB_ISNULL(session)) {
53
ret = OB_ERR_UNEXPECTED;
54
LOG_ERROR("fail to get session info", K(ret), K(session));
56
ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
57
session->update_last_active_time();
58
capability = session->get_capability();
60
if (NULL != session) {
61
revert_session(session);
64
pkt_ = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
65
const char *buf = pkt_.get_cdata();
66
const char *pos = pkt_.get_cdata();
68
const int64_t len = pkt_.get_clen() - 1;
69
const char *end = buf + len;
71
if (OB_LIKELY(pos < end)) {
72
username_.assign_ptr(pos, static_cast<int32_t>(STRLEN(pos)));
73
pos += username_.length() + 1;
76
if (OB_LIKELY(pos < end)) {
77
if (capability.cap_flags_.OB_CLIENT_SECURE_CONNECTION) {
78
uint8_t auth_response_len = 0;
79
ObMySQLUtil::get_uint1(pos, auth_response_len);
80
auth_response_.assign_ptr(pos, static_cast<int32_t>(auth_response_len));
81
pos += auth_response_len;
83
auth_response_.assign_ptr(pos, static_cast<int32_t>(STRLEN(pos)));
84
pos += auth_response_.length() + 1;
88
if (OB_LIKELY(pos < end)) {
89
database_.assign_ptr(pos, static_cast<int32_t>(STRLEN(pos)));
90
pos += database_.length() + 1;
93
if (OB_LIKELY(pos < end)) {
94
ObMySQLUtil::get_uint2(pos, charset_);
97
if (OB_LIKELY(pos < end)) {
98
if (capability.cap_flags_.OB_CLIENT_PLUGIN_AUTH) {
99
auth_plugin_name_.assign_ptr(pos, static_cast<int32_t>(STRLEN(pos)));
100
pos += auth_plugin_name_.length() + 1;
104
if (OB_LIKELY(pos < end)) {
105
if (capability.cap_flags_.OB_CLIENT_CONNECT_ATTRS) {
106
uint64_t all_attrs_len = 0;
107
const char *attrs_end = NULL;
108
if (OB_FAIL(ObMySQLUtil::get_length(pos, all_attrs_len))) {
109
LOG_WARN("fail to get all_attrs_len", K(ret));
111
attrs_end = pos + all_attrs_len;
114
while(OB_SUCC(ret) && OB_LIKELY(pos < attrs_end)) {
115
if (OB_FAIL(decode_string_kv(attrs_end, pos, str_kv))) {
116
OB_LOG(WARN, "fail to decode string kv", K(ret));
118
if (str_kv.key_ == OB_MYSQL_PROXY_SESSION_VARS) {
119
const char *vars_start = str_kv.value_.ptr();
120
if (OB_FAIL(decode_session_vars(vars_start, str_kv.value_.length()))) {
121
OB_LOG(WARN, "fail to decode session vars", K(ret));
135
int ObMPChangeUser::decode_string_kv(const char *attrs_end, const char *&pos, ObStringKV &kv)
137
int ret = OB_SUCCESS;
138
uint64_t key_len = 0;
139
uint64_t value_len = 0;
140
if (OB_ISNULL(pos)) {
141
ret = OB_INVALID_ARGUMENT;
142
LOG_WARN("invalie input value", K(pos), K(ret));
144
if (OB_FAIL(ObMySQLUtil::get_length(pos, key_len))) {
145
OB_LOG(WARN, "fail t get key len", K(pos), K(ret));
146
} else if (pos + key_len >= attrs_end) {
150
kv.key_.assign_ptr(pos, static_cast<uint32_t>(key_len));
152
if (OB_FAIL(ObMySQLUtil::get_length(pos, value_len))) {
153
OB_LOG(WARN, "fail t get value len", K(pos), K(ret));
155
kv.value_.assign_ptr(pos, static_cast<uint32_t>(value_len));
164
int ObMPChangeUser::decode_session_vars(const char *&pos, const int64_t session_vars_len)
166
int ret = OB_SUCCESS;
167
if (OB_ISNULL(pos) || OB_UNLIKELY(session_vars_len < 0)) {
168
ret = OB_INVALID_ARGUMENT;
169
OB_LOG(WARN, "invalie input value", K(pos), K(session_vars_len), K(ret));
171
const char *end = pos + session_vars_len;
172
bool found_separator = false;
174
while (OB_SUCC(ret) && OB_LIKELY(pos < end)) {
175
if (OB_FAIL(decode_string_kv(end, pos, tmp_kv))) {
176
OB_LOG(WARN, "fail to decode string kv", K(ret));
178
if (tmp_kv.key_ == ObMySQLPacket::get_separator_kv().key_
179
&& tmp_kv.value_ == ObMySQLPacket::get_separator_kv().value_) {
180
found_separator = true;
183
if (found_separator) {
184
if (OB_FAIL(user_vars_.push_back(tmp_kv))) {
185
OB_LOG(WARN, "fail to push back user_vars", K(tmp_kv), K(ret));
188
if (OB_FAIL(sys_vars_.push_back(tmp_kv))) {
189
OB_LOG(WARN, "fail to push back sys_vars", K(tmp_kv), K(ret));
200
int ObMPChangeUser::process()
202
int ret = OB_SUCCESS;
203
ObSQLSessionInfo *session = NULL;
204
bool is_proxy_mod = get_conn()->is_proxy_;
205
bool need_disconnect = true;
206
bool need_response_error = true;
207
const ObMySQLRawPacket &pkt = reinterpret_cast<const ObMySQLRawPacket&>(req_->get_packet());
208
if (OB_FAIL(get_session(session))) {
209
LOG_ERROR("get session fail", K(ret));
210
} else if (OB_ISNULL(session)) {
211
ret = OB_ERR_UNEXPECTED;
212
LOG_ERROR("fail to get session info", K(ret), K(session));
213
} else if (OB_FAIL(process_kill_client_session(*session))) {
214
LOG_WARN("client session has been killed", K(ret));
215
} else if (FALSE_IT(session->set_txn_free_route(pkt.txn_free_route()))) {
216
} else if (OB_FAIL(process_extra_info(*session, pkt, need_response_error))) {
217
LOG_WARN("fail get process extra info", K(ret));
218
} else if (FALSE_IT(session->post_sync_session_info())) {
220
need_disconnect = false;
221
ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());
222
session->update_last_active_time();
223
if (OB_FAIL(ObSqlTransControl::rollback_trans(session, need_disconnect))) {
224
OB_LOG(WARN, "fail to rollback trans for change user", K(ret), K(session));
226
session->clean_status();
227
if (OB_FAIL(load_privilege_info(session))) {
228
OB_LOG(WARN,"load privilige info failed", K(ret),K(session->get_sessid()));
231
if (!sys_vars_.empty()) {
232
for (int64_t i = 0; OB_SUCC(ret) && i < sys_vars_.count(); ++i) {
233
if (OB_FAIL(session->update_sys_variable(sys_vars_.at(i).key_, sys_vars_.at(i).value_))) {
234
OB_LOG(WARN, "fail to update session vars", "sys_var", sys_vars_.at(i), K(ret));
238
if (OB_SUCC(ret) && !user_vars_.empty()) {
239
if (OB_FAIL(replace_user_variables(*session))) {
240
OB_LOG(WARN, "fail to replace user variables", K(ret));
251
ok_param.is_on_change_user_ = true;
252
if (OB_FAIL(send_ok_packet(*session, ok_param))) {
253
OB_LOG(WARN, "response ok packet fail", K(ret));
255
} else if (need_response_error) {
256
if (OB_FAIL(send_error_packet(ret, NULL))) {
257
OB_LOG(WARN,"response fail packet fail", K(ret));
259
need_disconnect = true;
262
if (OB_UNLIKELY(need_disconnect) && is_conn_valid()) {
263
if (OB_ISNULL(session)) {
264
LOG_WARN("will disconnect connection", K(ret), K(session));
266
LOG_WARN("will disconnect connection", K(ret), KPC(session));
271
if (session != NULL) {
272
revert_session(session);
277
int ObMPChangeUser::load_privilege_info(ObSQLSessionInfo *session)
279
int ret = OB_SUCCESS;
281
ObSchemaGetterGuard schema_guard;
282
ObSMConnection *conn = NULL;
283
if (OB_ISNULL(session) || OB_ISNULL(gctx_.schema_service_)) {
284
ret = OB_INVALID_ARGUMENT;
285
OB_LOG(WARN,"invalid argument", K(session), K(gctx_.schema_service_));
286
} else if (OB_ISNULL(conn = get_conn())) {
287
ret = OB_ERR_UNEXPECTED;
288
LOG_ERROR("null conn", K(ret));
289
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
290
session->get_effective_tenant_id(), schema_guard))) {
291
OB_LOG(WARN,"fail get schema guard", K(ret));
293
share::schema::ObUserLoginInfo login_info;
294
const char *sep_pos = username_.find('@');
295
if (NULL != sep_pos) {
296
ObString username(sep_pos - username_.ptr(), username_.ptr());
297
login_info.user_name_ = username;
298
login_info.tenant_name_ = username_.after(sep_pos);
299
if (login_info.tenant_name_ != session->get_tenant_name()) {
300
ret = OB_OP_NOT_ALLOW;
301
OB_LOG(WARN, "failed to change user in different tenant", K(ret),
302
K(login_info.tenant_name_), K(session->get_tenant_name()));
303
LOG_USER_ERROR(OB_OP_NOT_ALLOW, "forbid! change user command in differernt tenant");
306
login_info.user_name_ = username_;
309
if (login_info.tenant_name_.empty()) {
310
login_info.tenant_name_ = session->get_tenant_name();
312
if (!database_.empty()) {
313
login_info.db_ = database_;
315
login_info.client_ip_ = session->get_client_ip();
316
OB_LOG(INFO, "com change user", "username", login_info.user_name_,
317
"tenant name", login_info.tenant_name_);
318
login_info.scramble_str_.assign_ptr(conn->scramble_buf_, sizeof(conn->scramble_buf_));
319
login_info.passwd_ = auth_response_;
322
SSL *ssl_st = SQL_REQ_OP.get_sql_ssl_st(req_);
324
share::schema::ObSessionPrivInfo session_priv;
327
} else if (OB_FAIL(session->on_user_disconnect())) {
328
LOG_WARN("user disconnect failed", K(ret));
330
const ObUserInfo *user_info = NULL;
332
} else if (OB_FAIL(schema_guard.check_user_access(login_info, session_priv,
333
ssl_st, user_info))) {
334
OB_LOG(WARN, "User access denied", K(login_info), K(ret));
335
} else if (OB_FAIL(session->on_user_connect(session_priv, user_info))) {
336
OB_LOG(WARN, "user connect failed", K(ret), K(session_priv));
338
uint64_t db_id = OB_INVALID_ID;
339
const ObSysVariableSchema *sys_variable_schema = NULL;
340
session->set_user(session_priv.user_name_, session_priv.host_name_, session_priv.user_id_);
341
session->set_user_priv_set(session_priv.user_priv_set_);
342
session->set_db_priv_set(session_priv.db_priv_set_);
343
session->set_enable_role_array(session_priv.enable_role_id_array_);
344
if (OB_FAIL(session->set_tenant(login_info.tenant_name_, session_priv.tenant_id_))) {
345
OB_LOG(WARN, "fail to set tenant", "tenant name", login_info.tenant_name_, K(ret));
346
} else if (OB_FAIL(session->set_default_database(database_))) {
347
OB_LOG(WARN, "failed to set default database", K(ret), K(database_));
348
} else if (OB_FAIL(session->set_real_client_ip_and_port(login_info.client_ip_, session->get_client_addr_port()))) {
349
LOG_WARN("failed to set_real_client_ip_and_port", K(ret));
350
} else if (OB_FAIL(schema_guard.get_sys_variable_schema(session_priv.tenant_id_, sys_variable_schema))) {
351
LOG_WARN("get sys variable schema failed", K(ret));
352
} else if (OB_ISNULL(sys_variable_schema)) {
353
ret = OB_ERR_UNEXPECTED;
354
LOG_WARN("sys variable schema is null", K(ret));
355
} else if (OB_FAIL(session->load_all_sys_vars(*sys_variable_schema, true))) {
356
LOG_WARN("load system variables failed", K(ret));
357
} else if (OB_FAIL(session->update_database_variables(&schema_guard))) {
358
OB_LOG(WARN, "failed to update database variables", K(ret));
359
} else if (!database_.empty() && OB_FAIL(schema_guard.get_database_id(session->get_effective_tenant_id(),
360
session->get_database_name(),
362
OB_LOG(WARN, "failed to get database id", K(ret));
363
} else if (OB_FAIL(update_transmission_checksum_flag(*session))) {
364
LOG_WARN("update transmisson checksum flag failed", K(ret));
365
} else if (OB_FAIL(update_proxy_sys_vars(*session))) {
366
LOG_WARN("update_proxy_sys_vars failed", K(ret));
367
} else if (OB_FAIL(update_charset_sys_vars(*conn, *session))) {
368
LOG_WARN("fail to update charset sys vars", K(ret));
370
session->set_database_id(db_id);
371
session->reset_user_var();
382
int ObMPChangeUser::replace_user_variables(ObBasicSessionInfo &session) const
384
int ret = OB_SUCCESS;
385
if (!user_vars_.empty()) {
388
if (OB_FAIL(sql.append_fmt("SET"))) {
389
OB_LOG(WARN, "fail to append_fmt 'SET'", K(ret));
392
for (int64_t i = 0; OB_SUCC(ret) && i < user_vars_.count(); ++i) {
393
kv = user_vars_.at(i);
394
if (OB_FAIL(sql.append_fmt(" @%.*s = %.*s,",
395
kv.key_.length(), kv.key_.ptr(),
396
kv.value_.length(), kv.value_.ptr()))) {
397
OB_LOG(WARN, "fail to append fmt user var", K(ret), K(kv));
402
*(sql.ptr() + sql.length() - 1) = ';';
404
stmt.assign_ptr(sql.ptr(), static_cast<int32_t>(sql.length()));
405
ObArenaAllocator allocator(ObModIds::OB_SQL_PARSER);
406
ObParser parser(allocator, session.get_sql_mode());
407
SMART_VAR(ParseResult, result) {
408
if (OB_FAIL(parser.parse(stmt, result))) {
409
OB_LOG(WARN, "fail to parse stmt", K(ret), K(stmt));
412
ParseNode *node = result.result_tree_;
413
ObArenaAllocator calc_buf(ObModIds::OB_SQL_SESSION);
414
ObCastCtx cast_ctx(&calc_buf, NULL, CM_NONE, ObCharset::get_system_collation());
415
if (OB_FAIL(parse_var_node(node, cast_ctx, session))) {
416
OB_LOG(WARN, "fail to parse user var node", K(ret));
425
int ObMPChangeUser::parse_var_node(const ParseNode *node, ObCastCtx &cast_ctx, ObBasicSessionInfo &session) const
427
int ret = OB_SUCCESS;
428
if (OB_ISNULL(node)) {
429
ret = OB_ERR_UNEXPECTED;
430
OB_LOG(WARN, "node is null", K(ret));
433
ParseNode *tmp_node = NULL;
434
ParseNode *val_node = NULL;
438
for (int64_t i = 0; OB_SUCC(ret) && !found && i < node->num_child_; ++i) {
439
if (NULL != (tmp_node = node->children_[i])) {
440
if (0 == tmp_node->num_child_) {
441
if (T_USER_VARIABLE_IDENTIFIER == tmp_node->type_) {
444
if (node->num_child_ != 2) {
445
ret = OB_ERR_UNEXPECTED;
446
OB_LOG(WARN, "node children num must be 2 if it is VAR SET", K(ret), K_(node->num_child));
447
} else if (OB_ISNULL(val_node = node->children_[1 - i])) {
448
ret = OB_ERR_UNEXPECTED;
449
OB_LOG(WARN, "val node is null", K(ret));
451
var.assign_ptr(tmp_node->str_value_, static_cast<int32_t>(tmp_node->str_len_));
452
val.assign_ptr(val_node->str_value_, static_cast<int32_t>(val_node->str_len_));
453
type = (static_cast<ObObjType>(val_node->type_));
454
if (OB_FAIL(handle_user_var(var, val, type, cast_ctx, session))) {
455
OB_LOG(WARN, "fail to handle user var", K(ret), K(var), K(val), K(type));
459
} else if (OB_FAIL(parse_var_node(tmp_node, cast_ctx, session))) {
460
OB_LOG(WARN, "fail to parse node", K(ret));
468
int ObMPChangeUser::handle_user_var(const ObString &var, const ObString &val,
469
const ObObjType type, ObCastCtx &cast_ctx,
470
ObBasicSessionInfo &session) const
472
int ret = OB_SUCCESS;
475
const ObObj *out_obj = NULL;
476
ObSessionVariable sess_var;
477
if (ObNullType == type) {
478
sess_var.value_.set_null();
479
sess_var.meta_.set_collation_level(CS_LEVEL_IMPLICIT);
480
sess_var.meta_.set_collation_type(CS_TYPE_BINARY);
483
in_obj.set_varchar(val);
484
in_obj.set_collation_type(ObCharset::get_system_collation());
485
if (OB_FAIL(ObObjCaster::to_type(type, cast_ctx, in_obj, buf_obj, out_obj))) {
486
OB_LOG(WARN, "fail to cast varchar to target type", K(ret), K(type), K(in_obj));
487
} else if (OB_ISNULL(out_obj)) {
488
ret = OB_ERR_UNEXPECTED;
489
OB_LOG(WARN, "out obj is null", K(ret));
491
sess_var.value_ = *out_obj;
492
sess_var.meta_.set_type(out_obj->get_type());
493
sess_var.meta_.set_scale(out_obj->get_scale());
494
sess_var.meta_.set_collation_level(CS_LEVEL_IMPLICIT);
495
sess_var.meta_.set_collation_type(out_obj->get_collation_type());
497
if (OB_SUCC(ret) && OB_FAIL(session.replace_user_variable(var, sess_var))) {
498
OB_LOG(WARN, "fail to replace user var", K(ret), K(var), K(sess_var));