12
#include <gtest/gtest.h>
13
#include <gmock/gmock.h>
15
#include "observer/table/ob_table_connection_mgr.h"
16
#include "lib/time/ob_time_utility.h"
17
#include "rpc/ob_request.h"
18
#include "rpc/frame/ob_net_easy.cpp"
19
#include "share/ob_thread_pool.h"
20
#include "lib/container/ob_array.h"
21
using namespace oceanbase::table;
22
using namespace oceanbase::common;
23
using namespace oceanbase::rpc;
24
using namespace oceanbase::share;
28
// Test fixture used by every TEST_F in this file. Its constructor, destructor,
// SetUp and TearDown are all empty, so the fixture itself prepares no state.
// NOTE(review): the bare integers between statements look like line-number
// residue from extraction, and brace / access-specifier lines are not visible
// in this view — confirm against the original file.
class TestTableConnection : public::testing::Test
31
TestTableConnection() {}
32
virtual ~TestTableConnection() {}
33
virtual void SetUp() {}
34
virtual void TearDown() {}
37
// CONS_FAKE_REQUEST(ob_req, ob_addr): declares, in the expanding scope, an
// ObRequest named `ob_req` of type ObRequest::OB_RPC together with the helper
// locals `ez_req`, `ms` and `ez_addr` (the latter converted from `ob_addr` via
// __to_ez_addr), and finally points ob_req.ez_req_ at `ez_req`.
// The helper locals have fixed names, so the macro can be expanded only once
// per scope.
// NOTE(review): the statements that attach `ms` and `ez_addr` to `ez_req` are
// not visible in this view — confirm how the address reaches the request.
#define CONS_FAKE_REQUEST(ob_req, ob_addr)\
38
ObRequest ob_req(ObRequest::OB_RPC);\
39
easy_request_t ez_req;\
40
easy_message_session_t ms;\
42
easy_addr_t ez_addr = __to_ez_addr(ob_addr);\
46
ob_req.ez_req_ = &ez_req;
48
// CONS_FAKE_EZ_CONN(ez_conn, ob_addr): declares, in the expanding scope, an
// easy_connection_t named `ez_conn` and a fixed-name local `ez_conn_addr`
// (converted from `ob_addr` via __to_ez_addr), then stores that address in
// ez_conn.addr. The fixed-name local means one expansion per scope.
// NOTE(review): the last visible line still ends with a continuation
// backslash, so the macro body may extend past what is shown here.
#define CONS_FAKE_EZ_CONN(ez_conn, ob_addr)\
49
easy_connection_t ez_conn;\
50
easy_addr_t ez_conn_addr = __to_ez_addr(ob_addr);\
51
ez_conn.addr = ez_conn_addr;\
54
// Thread pool (derives from ObThreadPool) whose worker body, run1(), repeatedly
// reports a fake table request for one client address to ObTableConnectionMgr.
// It carries the client address, the thread count, the tenant / database / user
// ids passed along with each request, and the number of requests per thread.
class MockObTableRequestSender : public ObThreadPool
57
// Default constructor: performs no initialization in its body.
MockObTableRequestSender() {}
58
// Stores the given parameters and sets the pool's thread count to thread_cnt.
// NOTE(review): the initializer for user_id_ is not visible in this view —
// confirm that it is initialized from `user_id`.
MockObTableRequestSender(const ObAddr &client_addr, int64_t thread_cnt, int64_t tenant_id,
59
int64_t database_id, int64_t user_id, int64_t request_times)
60
: client_addr_(client_addr),
61
thread_cnt_(thread_cnt),
62
tenant_id_(tenant_id),
63
database_id_(database_id),
65
request_times_(request_times)
67
set_thread_count(thread_cnt_);
70
virtual ~MockObTableRequestSender() {};
72
// Makes the listed members printable through the project's to_string machinery.
TO_STRING_KV(K_(client_addr), K_(thread_cnt), K_(tenant_id), K_(database_id), K_(user_id), K_(request_times));
79
// Number of update_table_connection() calls each worker thread issues in run1().
int64_t request_times_;
82
// Worker body: builds one fake request for client_addr_ and then calls
// ObTableConnectionMgr::update_table_connection() request_times_ times with the
// stored tenant / database / user ids, asserting OB_SUCCESS on every call.
// ASSERT_EQ returns from this function on the first failure, ending the loop.
void MockObTableRequestSender::run1()
84
CONS_FAKE_REQUEST(req, client_addr_);
85
for(int64_t i = 0; i < request_times_; i++) {
86
ASSERT_EQ(OB_SUCCESS, ObTableConnectionMgr::get_instance().update_table_connection(&req, tenant_id_, database_id_, user_id_));
91
// Thread pool (derives from ObThreadPool) whose worker body, run1(), notifies
// ObTableConnectionMgr that the connection for one client address was closed.
class MockObTableConnCloser : public ObThreadPool
94
// Remembers the client address whose connection will be reported as closed.
explicit MockObTableConnCloser(const ObAddr &client_addr)
95
: client_addr_(client_addr)
99
virtual ~MockObTableConnCloser() {};
101
TO_STRING_KV(K_(client_addr));
103
// Address of the simulated client; const, so it is fixed at construction.
const ObAddr client_addr_;
106
// Worker body: builds a fake easy_connection_t carrying client_addr_ and passes
// it to ObTableConnectionMgr::on_conn_close().
void MockObTableConnCloser::run1()
108
// No trailing semicolon here, unlike the other use of this macro in the file.
CONS_FAKE_EZ_CONN(ez_conn, client_addr_)
109
ObTableConnectionMgr::get_instance().on_conn_close(&ez_conn);
113
// Exercises ObTableConnection on its own: init() with a valid address must
// succeed and the getters must return the values passed in; the first and last
// active times start out equal; update_last_active_time() changes only the last
// active time; init() with `invalid_addr` must return OB_INVALID_ARGUMENT.
TEST_F(TestTableConnection, test_table_connection)
116
ObTableConnection conn;
117
// 2130706433 == 0x7F000001 — presumably IPv4 127.0.0.1 with port 6001; confirm
// against the ObAddr constructor.
ObAddr addr(2130706433, 6001);
118
int64_t tenant_id = 1001;
119
int64_t database_id = 99999;
120
int64_t user_id = 99999;
121
ASSERT_EQ(OB_SUCCESS, conn.init(addr, tenant_id, database_id, user_id));
122
ASSERT_EQ(addr, conn.get_addr());
123
ASSERT_EQ(tenant_id, conn.get_tenant_id());
124
ASSERT_EQ(database_id, conn.get_database_id());
125
ASSERT_EQ(user_id, conn.get_user_id());
127
// Right after init() both timestamps are expected to be identical.
int64_t first_active_time = conn.get_first_active_time();
128
ASSERT_EQ(first_active_time, conn.get_last_active_time());
129
int64_t new_active_time = ObTimeUtility::current_time();
130
conn.update_last_active_time(new_active_time);
131
// Updating the last active time must leave the first active time untouched.
ASSERT_EQ(first_active_time, conn.get_first_active_time());
132
ASSERT_EQ(new_active_time, conn.get_last_active_time());
135
// NOTE(review): the declaration of `invalid_addr` is not visible in this view —
// presumably a default-constructed ObAddr; confirm.
ASSERT_EQ(OB_INVALID_ARGUMENT, conn.init(invalid_addr, tenant_id, database_id, user_id));
139
// Single-threaded round trip through ObTableConnectionMgr: one fake request
// must leave exactly one entry in connection_map_, and closing a fake
// connection built from the same address must leave the map empty again.
// The test reads the manager's connection_map_ member directly.
TEST_F(TestTableConnection, test_connection_mgr)
141
ObAddr addr(2130706433, 6001);
142
int64_t tenant_id = 1001;
143
int64_t database_id = 99999;
144
int64_t user_id = 99999;
145
CONS_FAKE_REQUEST(req, addr);
146
ASSERT_EQ(OB_SUCCESS, ObTableConnectionMgr::get_instance().update_table_connection(&req, tenant_id, database_id, user_id));
147
ASSERT_EQ(1, ObTableConnectionMgr::get_instance().connection_map_.size());
148
CONS_FAKE_EZ_CONN(ez_conn, addr);
149
ObTableConnectionMgr::get_instance().on_conn_close(&ez_conn);
150
ASSERT_EQ(0, ObTableConnectionMgr::get_instance().connection_map_.size());
156
// Many threads, one client address: a sender pool of 100 threads, each issuing
// 100 requests from the same address, must result in exactly one entry in
// connection_map_; a closer for that address must then empty the map. The
// elapsed time is printed at the end.
// NOTE(review): no wait() on the sender or closer is visible between start()
// and the size assertions in this view — confirm the pools are joined before
// the map is inspected.
TEST_F(TestTableConnection, test_multi_request)
158
const int64_t start = ObTimeUtility::current_time();
159
const int64_t thread_cnt = 100;
160
int64_t tenant_id = 1001;
161
int64_t database_id = 99999;
162
int64_t user_id = 99999;
163
int64_t request_times = 100;
164
const ObAddr addr(2130706433, 1);
165
MockObTableRequestSender req_sender(addr, thread_cnt, tenant_id, database_id, user_id, request_times);
166
ASSERT_EQ(OB_SUCCESS, req_sender.start());
168
ASSERT_EQ(1, ObTableConnectionMgr::get_instance().connection_map_.size());
169
MockObTableConnCloser closer(addr);
170
ASSERT_EQ(OB_SUCCESS, closer.start());
172
ASSERT_EQ(0, ObTableConnectionMgr::get_instance().connection_map_.size());
173
const int64_t duration = ObTimeUtility::current_time() - start;
174
// Dividing by 1000 before printing as "ms" presumes current_time() is in
// microseconds — confirm.
printf("time elapsed: %ldms\n", duration/1000);
180
// Many client addresses at once: creates 200 sender/closer pairs (same ip,
// second ObAddr argument 1..200), each sender with 8 threads issuing 100
// requests. After all senders finish, connection_map_ must hold conn_cnt
// entries; after all closers finish, it must be empty. The memory held by
// OB_SERVER_TENANT_ID and the elapsed times are printed for information only.
TEST_F(TestTableConnection, test_multi_connection)
182
const int64_t start = ObTimeUtility::current_time();
183
ObArenaAllocator alloc;
184
const int64_t thread_cnt = 8;
185
int64_t tenant_id = 1001;
186
int64_t database_id = 99999;
187
int64_t user_id = 99999;
188
int64_t request_times = 100;
189
int64_t conn_cnt = 200;
190
const int32_t ip = 2130706433;
191
ObArray<MockObTableRequestSender *> senders;
192
ObArray<MockObTableConnCloser *> closers;
193
// Build one sender and one closer per simulated client; both are allocated
// from the arena `alloc` through OB_NEWx and tracked by pointer in the arrays.
for (int64_t i = 1; i <= conn_cnt; i++) {
194
MockObTableRequestSender *sender = OB_NEWx(MockObTableRequestSender, (&alloc), ObAddr(ip, i), thread_cnt,
195
tenant_id, database_id, user_id, request_times);
196
ASSERT_NE(nullptr, sender);
197
ASSERT_EQ(OB_SUCCESS, senders.push_back(sender));
199
MockObTableConnCloser *closer = OB_NEWx(MockObTableConnCloser, (&alloc), ObAddr(ip, i));
200
ASSERT_NE(nullptr, closer);
201
ASSERT_EQ(OB_SUCCESS, closers.push_back(closer));
203
int64_t curr_mem = ObMallocAllocator::get_instance()->get_tenant_hold(OB_SERVER_TENANT_ID);
204
printf("current server tenant mem before sending request: %ld\n", curr_mem);
206
const int64_t req_start = ObTimeUtility::current_time();
207
// Start every sender first, then wait for all of them, so the pools run
// concurrently rather than one after another.
for (int i = 0; i < conn_cnt; i++) {
208
ASSERT_EQ(OB_SUCCESS, senders.at(i)->start());
210
for (int i = 0; i < conn_cnt; i++) {
211
senders.at(i)->wait();
213
// One map entry is expected per distinct client address.
ASSERT_EQ(conn_cnt, ObTableConnectionMgr::get_instance().connection_map_.size());
215
curr_mem = ObMallocAllocator::get_instance()->get_tenant_hold(OB_SERVER_TENANT_ID);
216
printf("current server tenant mem after sending request: %ld\n", curr_mem);
218
// Same start-all-then-wait-all pattern for the closers.
for (int i = 0; i < conn_cnt; i++) {
219
ASSERT_EQ(OB_SUCCESS, closers.at(i)->start());
221
for (int i = 0; i < conn_cnt; i++) {
222
closers.at(i)->wait();
224
ASSERT_EQ(0, ObTableConnectionMgr::get_instance().connection_map_.size());
226
const int64_t end = ObTimeUtility::current_time();
227
// "request time" covers sending and closing (from req_start); "all time" also
// includes the setup loop above (from start).
printf("request time elapsed: %ldms\n", (end - req_start)/1000);
228
printf("all time elapsed: %ldms\n", (end - start)/1000);
234
// Test entry point: sets the log level to "INFO", directs logging to
// test_table_connection.log, initializes GoogleTest from the command line and
// returns the result of RUN_ALL_TESTS().
int main(int argc, char** argv)
236
// NOTE(review): the level is set twice, once via ObLogger::get_logger() and
// once via OB_LOGGER — presumably the same logger instance; confirm whether
// one of the two calls is redundant.
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
237
OB_LOGGER.set_log_level("INFO");
238
OB_LOGGER.set_file_name("test_table_connection.log", true);
239
::testing::InitGoogleTest(&argc, argv);
240
return RUN_ALL_TESTS();