2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#include "lib/thread/ob_thread_name.h"
14
#define USING_LOG_PREFIX SERVER
19
#include "easy_define.h"
20
#include "io/easy_io_struct.h"
21
#include "io/easy_connection.h"
22
#include "rpc/frame/ob_net_easy.h"
23
#include "observer/mysql/obsm_handler.h"
24
#include "lib/queue/ob_link_queue.h"
25
#include "lib/lock/ob_scond.h"
28
using namespace oceanbase;
29
using namespace oceanbase::observer;
30
using namespace oceanbase::common;
32
class MyReq :public common::ObLink
36
char *data_ = nullptr;
37
easy_request_t *ez_r_ = nullptr;
40
class MyReqHandler : public rpc::frame::ObReqHandler
43
MyReqHandler(std::function<int (MyReq *req)> &&do_process) {
47
EZ_ADD_CB(on_connect);
48
EZ_ADD_CB(on_disconnect);
50
do_process_ = std::move(do_process);
52
void *decode(easy_message_t *m) override;
53
int process(easy_request_t *r) override;
54
int encode(easy_request_t */* r */, void */* packet */) override;
56
std::function<int (MyReq *req)> do_process_;
59
void *MyReqHandler::decode(easy_message_t *m)
63
if ((req = (MyReq *)easy_pool_calloc(m->pool,
64
sizeof(MyReq))) == NULL) {
65
m->status = EASY_ERROR;
68
int len = m->input->last - m->input->pos;
69
//std::string msg(m->input->pos, len);
70
LOG_INFO("decode", K(req), K(len), K(m->c->doing_request_count));
71
req->data_ = m->input->pos;
73
m->input->last = m->input->pos;
77
void gen_http_resp(std::string &ret, std::string &output)
79
ret.append("HTTP/1.1");
80
ret.append(" 200 OK\r\n",9);
81
ret.append("Content-Length: ",16);
82
ret.append(std::to_string(output.size()));
84
//ret.append("Content-Type: text/plain; charset=utf-8\r\n",41);
85
ret.append("Connection: keep-alive\r\n\r\n",26);
89
int MyReqHandler::process(easy_request_t *r)
91
MyReq *req = (MyReq*)r->ipacket;
93
LOG_INFO("easy IO process", K(r));
96
easy_request_sleeping(r);
97
if (OB_FAIL(do_process_(req))) {
98
LOG_ERROR("do_process fail", K(ret));
106
int MyReqHandler::encode(easy_request_t *r, void *packet)
108
LOG_INFO("encode", K(r), K(packet), K(lbt()));
109
int ret = OB_SUCCESS;
110
easy_buf_t *buf = reinterpret_cast<easy_buf_t *>(packet);
111
easy_request_addbuf(r, buf);
115
void thread_start(void *args)
118
lib::set_thread_name("MysqlIO");
124
SimpleSqlServer() : eio_(nullptr), req_handler_([this] (MyReq *req) { push(req);return 0; }), req_(0) {}
125
~SimpleSqlServer() {}
127
int init(int port = 8080, int io_cnt = 1) {
128
int ret = OB_SUCCESS;
129
if (OB_ISNULL(eio_ = easy_eio_create(eio_, io_cnt))) {
130
ret = OB_LIBEASY_ERROR;
131
} else if (OB_ISNULL(easy_connection_add_listen(eio_, NULL, port, req_handler_.ez_handler()))) {
132
ret = OB_SERVER_LISTEN_ERROR;
133
LOG_ERROR("easy_connection_add_listen error", K(port), KERRMSG, K(ret));
134
} else if (OB_FAIL(q_.init(256))) {
135
LOG_ERROR("init queue fail", K(ret));
137
easy_eio_set_uthread_start(eio_, thread_start, nullptr);
138
int eret = easy_eio_start(eio_);
139
if (EASY_OK == eret) {
140
LOG_INFO("start mysql easy io");
142
ret = OB_LIBEASY_ERROR;
143
LOG_ERROR("start mysql easy io fail", K(ret));
149
ObLink *data = nullptr;
150
int ret = OB_SUCCESS;
152
if (OB_FAIL(q2_.pop(data))) {
158
return static_cast<MyReq*>(data);
160
void push2(MyReq *req) {
165
void *data = nullptr;
166
int ret = OB_SUCCESS;
168
if (OB_FAIL(q_.pop(data, 10000))) {
173
return static_cast<MyReq*>(data);
175
void push(MyReq *req) {
183
MyReqHandler req_handler_;
191
int main(int argc, char **argv)
198
char *log_level = (char*)"DEBUG";
199
while(EOF != (c = getopt(argc,argv,"c:w:p:t:l:"))) {
202
io_cnt = atoi(optarg);
205
worker_cnt = atoi(optarg);
211
time_sec = atoi(optarg);
221
system("rm -rf simple_sql_server.log*");
222
OB_LOGGER.set_file_name("simple_sql_server.log", true);
223
OB_LOGGER.set_log_level(log_level);
224
OB_LOGGER.set_enable_async_log(true);
226
int ret = OB_SUCCESS;
227
SimpleSqlServer server;
228
if (OB_FAIL(server.init(port, io_cnt))) {
229
LOG_ERROR("server init fail", K(ret));
233
std::vector<std::thread> ths;
234
for (int i = 0; i < worker_cnt; i++) {
235
std::thread th([&, i] () {
236
std::string thread_name("Worker");
237
thread_name.append(std::to_string(i));
238
lib::set_thread_name(thread_name.c_str());
240
MyReq *req = server.pop();
241
easy_request_t *r = req->ez_r_;
243
b = easy_buf_create(r->ms->pool, 1024);
245
LOG_ERROR("easy_buf_create fail");
249
std::string resp, output;
251
gen_http_resp(resp, output);
252
LOG_INFO("worker process", K(resp.size()));
253
b->last = easy_memcpy(b->last, resp.c_str(), resp.size());
256
r->opacket_size = resp.size();
257
easy_request_wakeup(r);
258
ATOMIC_INC(&server.get_req());
261
ths.push_back(std::move(th));
264
int64_t last_req_sum = 0;
266
for (int i = 0; i < time_sec; i++) {
267
int64_t curr_req_sum = ATOMIC_LOAD(&server.get_req());
268
int64_t new_req = curr_req_sum - last_req_sum;
274
if (new_req > 0 || silent < 3) {
275
std::cout << curr_req_sum << " " << new_req << std::endl;
277
last_req_sum = curr_req_sum;