oceanbase

Форк
0
/
easy_server_worker.cpp 
282 строки · 6.9 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#include "lib/thread/ob_thread_name.h"
14
#define USING_LOG_PREFIX SERVER
15

16
#include <string>
17
#include <functional>
18

19
#include "easy_define.h"
20
#include "io/easy_io_struct.h"
21
#include "io/easy_connection.h"
22
#include "rpc/frame/ob_net_easy.h"
23
#include "observer/mysql/obsm_handler.h"
24
#include "lib/queue/ob_link_queue.h"
25
#include "lib/lock/ob_scond.h"
26

27

28
using namespace oceanbase;
29
using namespace oceanbase::observer;
30
using namespace oceanbase::common;
31

32
class MyReq :public common::ObLink
33
{
34
public:
35
  int len_ = 0;
36
  char *data_ = nullptr;
37
  easy_request_t *ez_r_ = nullptr;
38
};
39

40
class MyReqHandler : public rpc::frame::ObReqHandler
41
{
42
public:
43
  MyReqHandler(std::function<int (MyReq *req)> &&do_process) {
44
    EZ_ADD_CB(decode);
45
    EZ_ADD_CB(encode);
46
    EZ_ADD_CB(process);
47
    EZ_ADD_CB(on_connect);
48
    EZ_ADD_CB(on_disconnect);
49
    EZ_ADD_CB(cleanup);
50
    do_process_ = std::move(do_process);
51
  }
52
  void *decode(easy_message_t *m) override;
53
  int process(easy_request_t *r) override;
54
  int encode(easy_request_t */* r */, void */* packet */) override;
55

56
  std::function<int (MyReq *req)> do_process_;
57
};
58

59
void *MyReqHandler::decode(easy_message_t *m)
60
{
61
  // alloc packet
62
  MyReq *req;
63
  if ((req = (MyReq *)easy_pool_calloc(m->pool,
64
                  sizeof(MyReq))) == NULL) {
65
    m->status = EASY_ERROR;
66
    return NULL;
67
  }
68
  int len = m->input->last - m->input->pos;
69
  //std::string msg(m->input->pos, len);
70
  LOG_INFO("decode", K(req), K(len), K(m->c->doing_request_count));
71
  req->data_ = m->input->pos;
72
  req->len_ = len;
73
  m->input->last = m->input->pos;
74
  return req;
75
}
76

77
void gen_http_resp(std::string &ret, std::string &output)
78
{
79
  ret.append("HTTP/1.1");
80
  ret.append(" 200 OK\r\n",9);
81
  ret.append("Content-Length: ",16);
82
  ret.append(std::to_string(output.size()));
83
  ret.append("\r\n",2);
84
  //ret.append("Content-Type: text/plain; charset=utf-8\r\n",41);
85
  ret.append("Connection: keep-alive\r\n\r\n",26);
86
  ret.append(output);
87
}
88

89
int MyReqHandler::process(easy_request_t *r)
90
{
91
  MyReq *req = (MyReq*)r->ipacket;
92
  req->ez_r_ = r;
93
  LOG_INFO("easy IO process", K(r));
94
  int eret = EASY_OK;
95
  int ret = OB_SUCCESS;
96
  easy_request_sleeping(r);
97
  if (OB_FAIL(do_process_(req))) {
98
    LOG_ERROR("do_process fail", K(ret));
99
    eret = EASY_ABORT;
100
  } else {
101
    eret = EASY_AGAIN;
102
  }
103
  return eret;
104
}
105

106
int MyReqHandler::encode(easy_request_t *r, void *packet)
107
{
108
  LOG_INFO("encode", K(r), K(packet), K(lbt()));
109
  int ret = OB_SUCCESS;
110
  easy_buf_t *buf = reinterpret_cast<easy_buf_t *>(packet);
111
  easy_request_addbuf(r, buf);
112
  return ret;
113
}
114

115
void thread_start(void *args)
116
{
117
  UNUSED(args);
118
  lib::set_thread_name("MysqlIO");
119
}
120

121
class SimpleSqlServer
122
{
123
public:
124
  SimpleSqlServer() : eio_(nullptr), req_handler_([this] (MyReq *req) { push(req);return 0; }), req_(0) {}
125
  ~SimpleSqlServer() {}
126

127
  int init(int port = 8080, int io_cnt = 1) {
128
    int ret = OB_SUCCESS;
129
    if (OB_ISNULL(eio_ = easy_eio_create(eio_, io_cnt))) {
130
      ret = OB_LIBEASY_ERROR;
131
    } else if (OB_ISNULL(easy_connection_add_listen(eio_, NULL, port, req_handler_.ez_handler()))) {
132
      ret = OB_SERVER_LISTEN_ERROR;
133
      LOG_ERROR("easy_connection_add_listen error", K(port), KERRMSG, K(ret));
134
    } else if (OB_FAIL(q_.init(256))) {
135
      LOG_ERROR("init queue fail", K(ret));
136
    } else {
137
      easy_eio_set_uthread_start(eio_, thread_start, nullptr);
138
      int eret = easy_eio_start(eio_);
139
      if (EASY_OK == eret) {
140
        LOG_INFO("start mysql easy io");
141
      } else {
142
        ret = OB_LIBEASY_ERROR;
143
        LOG_ERROR("start mysql easy io fail", K(ret));
144
      }
145
    }
146
    return ret;
147
  }
148
  MyReq *pop2() {
149
    ObLink *data = nullptr;
150
    int ret = OB_SUCCESS;
151
    while (true) {
152
      if (OB_FAIL(q2_.pop(data))) {
153
        cond_.wait(1000);
154
      } else {
155
        break;
156
      }
157
    }
158
    return static_cast<MyReq*>(data);
159
  }
160
  void push2(MyReq *req) {
161
    q2_.push(req);
162
    cond_.signal();
163
  }
164
  MyReq *pop() {
165
    void *data = nullptr;
166
    int ret = OB_SUCCESS;
167
    while (true) {
168
      if (OB_FAIL(q_.pop(data, 10000))) {
169
      } else {
170
        break;
171
      }
172
    }
173
    return static_cast<MyReq*>(data);
174
  }
175
  void push(MyReq *req) {
176
    q_.push(req);
177
  }
178
  int64_t &get_req() {
179
    return req_;
180
  }
181
private:
182
  easy_io_t *eio_;
183
  MyReqHandler req_handler_;
184
  ObLinkQueue q2_;
185
  SCond cond_;
186

187
  ObLightyQueue q_;
188
  int64_t req_;
189
};
190

191
int main(int argc, char **argv)
192
{
193
  int c = 0;
194
  int port = 8080;
195
  int io_cnt = 40;
196
  int worker_cnt = 40;
197
  int time_sec = 3600;
198
  char *log_level = (char*)"DEBUG";
199
  while(EOF != (c = getopt(argc,argv,"c:w:p:t:l:"))) {
200
    switch(c) {
201
    case 'c':
202
      io_cnt = atoi(optarg);
203
      break;
204
    case 'w':
205
      worker_cnt = atoi(optarg);
206
      break;
207
    case 'p':
208
      port = atoi(optarg);
209
      break;
210
    case 't':
211
      time_sec = atoi(optarg);
212
      break;
213
    case 'l':
214
     log_level = optarg;
215
     break;
216
    default:
217
      break;
218
    }
219
  }
220

221
  system("rm -rf simple_sql_server.log*");
222
  OB_LOGGER.set_file_name("simple_sql_server.log", true);
223
  OB_LOGGER.set_log_level(log_level);
224
  OB_LOGGER.set_enable_async_log(true);
225

226
  int ret = OB_SUCCESS;
227
  SimpleSqlServer server;
228
  if (OB_FAIL(server.init(port, io_cnt))) {
229
    LOG_ERROR("server init fail", K(ret));
230
    return -1;
231
  }
232

233
  std::vector<std::thread> ths;
234
  for (int i = 0; i < worker_cnt; i++) {
235
    std::thread th([&, i] () {
236
      std::string thread_name("Worker");
237
      thread_name.append(std::to_string(i));
238
      lib::set_thread_name(thread_name.c_str());
239
      while (true) {
240
        MyReq *req = server.pop();
241
        easy_request_t *r = req->ez_r_;
242
        easy_buf_t *b;
243
        b = easy_buf_create(r->ms->pool, 1024);
244
        if (b == nullptr) {
245
          LOG_ERROR("easy_buf_create fail");
246
          break;
247
        }
248

249
        std::string resp, output;
250
        resp.reserve(1024);
251
        gen_http_resp(resp, output);
252
        LOG_INFO("worker process", K(resp.size()));
253
        b->last = easy_memcpy(b->last, resp.c_str(), resp.size());
254

255
        r->opacket = b;
256
        r->opacket_size = resp.size();
257
        easy_request_wakeup(r);
258
        ATOMIC_INC(&server.get_req());
259
      }
260
    });
261
    ths.push_back(std::move(th));
262
  }
263

264
  int64_t last_req_sum = 0;
265
  int silent = 0;
266
  for (int i = 0; i < time_sec; i++) {
267
    int64_t curr_req_sum = ATOMIC_LOAD(&server.get_req());
268
    int64_t new_req = curr_req_sum - last_req_sum;
269
    if (new_req == 0) {
270
      silent++;
271
    } else {
272
      silent = 0;
273
    }
274
    if (new_req > 0 || silent < 3) {
275
      std::cout << curr_req_sum << " " << new_req << std::endl;
276
    }
277
    last_req_sum = curr_req_sum;
278
    ::sleep(1);
279
  }
280

281
  return 0;
282
}
283

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.