oceanbase

Форк
0
/
ob_lua_handler.cpp 
223 строки · 7.7 Кб
1
/**
2
 * Copyright (c) 2021 OceanBase
3
 * OceanBase CE is licensed under Mulan PubL v2.
4
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
5
 * You may obtain a copy of Mulan PubL v2 at:
6
 *          http://license.coscl.org.cn/MulanPubL-2.0
7
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
 * See the Mulan PubL v2 for more details.
11
 */
12

13
#include "ob_lua_api.h"
14
#include "ob_lua_handler.h"
15

16
#include <algorithm>
17
#include <functional>
18
#include <thread>
19

20
#include <sys/epoll.h>
21
#include <sys/socket.h>
22
#include <sys/un.h>
23
#include <unistd.h>
24

25
#include "lib/oblog/ob_log.h"
26
#include "lib/atomic/ob_atomic.h"
27
#include "lib/signal/ob_signal_utils.h"
28
#include "lib/thread/ob_thread_name.h"
29
#include "lib/thread/protected_stack_allocator.h"
30
#include "lib/utility/utility.h"
31
#include "lib/thread/thread.h"
32

33
extern "C" {
34
  #include <lua.h>
35
  #include <lauxlib.h>
36
  #include <lualib.h>
37
extern int ob_epoll_wait(int __epfd, struct epoll_event *__events,
38
		                     int __maxevents, int __timeout);
39
}
40

41
using namespace oceanbase;
42
using namespace oceanbase::common;
43
using namespace oceanbase::diagnose;
44

45
void ObLuaHandler::memory_update(const int size)
46
{
47
  if (0 == size) {
48
    OB_LOG_RET(ERROR, OB_INVALID_ARGUMENT, "bad param", K(size));
49
  } else if (size > 0) {
50
    ++alloc_count_;
51
    alloc_size_ += size;
52
  } else {
53
    ++free_count_;
54
    free_size_ -= size;
55
  }
56
}
57

58
void *ObLuaHandler::realloc_functor(void *userdata, void *ptr, size_t osize, size_t nsize)
59
{
60
  void *ret = nullptr;
61
  UNUSED(userdata);
62
  UNUSED(osize);
63
  if (0 == nsize) {
64
    // do nothing
65
  } else if (OB_NOT_NULL(ret = diagnose::alloc(nsize))) {
66
    if (OB_NOT_NULL(ptr)) {
67
      memmove(ret, ptr, std::min(osize, nsize));
68
    }
69
  }
70
  if (OB_NOT_NULL(ptr)) {
71
    diagnose::free(ptr);
72
  }
73
  return ret;
74
}
75

76
int ObLuaHandler::process(const char* lua_code)
77
{
78
  bool has_segv = false;
79
  struct LuaExec {
80
    LuaExec(const char* lua_code) : code_(lua_code) {}
81
    void operator()() {
82
      lua_State* L = lua_newstate(realloc_functor, nullptr);
83
      if (OB_ISNULL(L)) {
84
        OB_LOG_RET(ERROR, OB_INVALID_ARGUMENT, "luastate is NULL");
85
      } else {
86
        luaL_openlibs(L);
87
        APIRegister::get_instance().register_api(L);
88
        try {
89
          luaL_dostring(L, code_);
90
        } catch (std::exception& e) {
91
          _OB_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "exception during lua code execution, reason %s", e.what());
92
        }
93
        lua_close(L);
94
      }
95
    }
96
    const char* code_;
97
  };
98
  OB_LOG(INFO, "Lua code was executed", K(alloc_count_), K(free_count_), K(alloc_size_), K(free_size_));
99
  LuaExec func(lua_code);
100
  do_with_crash_restore(func, has_segv);
101
  if (has_segv) {
102
    _OB_LOG(INFO, "restore from sigsegv, coredump during lua code execution at %s\n", crash_restore_buffer);
103
  } else {
104
    OB_LOG(INFO, "Lua code was executed successfully", K(alloc_count_), K(free_count_), K(alloc_size_), K(free_size_));
105
  }
106

107
  struct LuaGC {
108
    LuaGC(common::ObVector<Function>& destructors) : destructors_(destructors) {}
109
    void operator()() {
110
      for (int64_t i = destructors_.size() - 1; i >= 0 ; --i) {
111
        destructors_[i]();
112
      }
113
      destructors_.clear();
114
    }
115
    common::ObVector<Function>& destructors_;
116
  };
117
  LuaGC gc(destructors_);
118
  do_with_crash_restore(gc, has_segv);
119
  if (has_segv) {
120
    _OB_LOG(INFO, "restore from sigsegv, coredump during lua gc at %s\n", crash_restore_buffer);
121
  } else {
122
    OB_LOG(INFO, "Lua gc successfully", K(alloc_count_), K(free_count_), K(alloc_size_), K(free_size_));
123
  }
124
  return OB_SUCCESS;
125
}
126

127
void ObUnixDomainListener::run1()
128
{
129
  int ret = OB_SUCCESS;
130
  if ((listen_fd_ = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0)) < 0) {
131
    OB_LOG(ERROR, "ObUnixDomainListener socket init failed", K(errno));
132
    ret = OB_ERR_UNEXPECTED;
133
  } else {
134
    struct sockaddr_un s;
135
    struct epoll_event listen_ev;
136
    int epoll_fd = epoll_create(256);
137
    s.sun_family = AF_UNIX;
138
    strncpy(s.sun_path, addr, sizeof(s.sun_path) - 1);
139
    unlink(addr);
140
    listen_ev.events = EPOLLIN;    
141
    listen_ev.data.fd = listen_fd_;
142
    if (bind(listen_fd_, (struct sockaddr *)&s, sizeof(s)) < 0) {
143
      OB_LOG(ERROR, "ObUnixDomainListener bind failed", K(listen_fd_), K(errno));
144
      ret = OB_ERR_UNEXPECTED;
145
    } else if (listen(listen_fd_, MAX_CONNECTION_QUEUE_LENGTH) < 0) {
146
      OB_LOG(ERROR, "ObUnixDomainListener listen failed", K(listen_fd_), K(errno));
147
      ret = OB_ERR_UNEXPECTED;
148
    } else if (epoll_fd < 0) {
149
      OB_LOG(ERROR, "ObUnixDomainListener create epoll failed", K(errno));
150
      ret = OB_ERR_UNEXPECTED;
151
    } else if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd_, &listen_ev) < 0) {
152
      OB_LOG(ERROR, "ObUnixDomainListener add listen to epoll failed", K(errno));
153
      ret = OB_ERR_UNEXPECTED;
154
    } else {
155
      lib::set_thread_name("LuaHandler");
156
      constexpr int64_t EPOLL_EVENT_BUFFER_SIZE = 32;
157
      constexpr int64_t TIMEOUT = 1000;
158
      struct epoll_event events[EPOLL_EVENT_BUFFER_SIZE];
159
      struct epoll_event conn_ev;
160
      char *code_buffer = (char *)diagnose::alloc(CODE_BUFFER_SIZE);
161
      while (OB_LIKELY(!has_set_stop())) {
162
        int conn_fd = -1;
163
        int ret = OB_SUCCESS;
164
        int64_t event_cnt = ob_epoll_wait(epoll_fd, events, EPOLL_EVENT_BUFFER_SIZE, TIMEOUT);
165
        if (event_cnt < 0) {
166
          if (EINTR == errno) {
167
            // timeout, ignore
168
          } else {
169
            OB_LOG(ERROR, "ObUnixDomainListener epoll wait failed", K(epoll_fd), K(errno));
170
          }
171
        }
172
        for (int64_t i = 0; i < event_cnt; ++i) {
173
          if (events[i].data.fd == listen_fd_) {
174
            if ((conn_fd = accept(listen_fd_, NULL, NULL)) < 0) {
175
              if (EAGAIN != errno) {
176
                ret = OB_ERR_UNEXPECTED;
177
                OB_LOG(ERROR, "ObUnixDomainListener accept failed", K(listen_fd_), K(errno));
178
              }
179
            } else {
180
              conn_ev.events = EPOLLIN;
181
              conn_ev.data.fd = conn_fd;
182
              if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &conn_ev) < 0) {
183
                OB_LOG(ERROR, "ObUnixDomainListener add event to epoll failed", K(epoll_fd), K(conn_fd), K(errno));
184
              }
185
            }
186
          } else if (events[i].events & EPOLLIN) {
187
            int rbytes = 0;
188
            char *buffer = code_buffer;
189
            int64_t size = CODE_BUFFER_SIZE;
190
            memset(code_buffer, 0, CODE_BUFFER_SIZE);
191
            while ((rbytes = read(events[i].data.fd, buffer, size)) > 0) {
192
              buffer += rbytes;
193
              size -= rbytes;
194
            }
195
            if (rbytes < 0) {
196
              OB_LOG(ERROR, "ObUnixDomainListener read from socket failed", K(errno));
197
            } else if (FALSE_IT(APIRegister::get_instance().set_fd(events[i].data.fd))) {
198
              // do nothing
199
            } else if (OB_FAIL(ObLuaHandler::get_instance().process(code_buffer))) {
200
              OB_LOG(ERROR, "ObUnixDomainListener process failed", K(ret));
201
            } else if (OB_FAIL(APIRegister::get_instance().flush())) {
202
              OB_LOG(ERROR, "ObUnixDomainListener flush failed", K(ret));
203
            } else {
204
              // do nothing
205
            }
206
            if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, nullptr) < 0) {
207
              OB_LOG(ERROR, "ObUnixDomainListener del failed", K(errno));
208
            }
209
            close(events[i].data.fd);
210
            APIRegister::get_instance().set_fd(-1);
211
          } else {
212
            OB_LOG(ERROR, "unexpected type");
213
          }
214
        }
215
      }
216
      diagnose::free(code_buffer);
217
      close(listen_fd_);
218
      listen_fd_ = -1;
219
      close(epoll_fd);
220
      OB_LOG(INFO, "ObUnixDomainListener running");
221
    }
222
  }
223
}
224

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.