2
* Copyright (c) 2021 OceanBase
3
* OceanBase CE is licensed under Mulan PubL v2.
4
* You can use this software according to the terms and conditions of the Mulan PubL v2.
5
* You may obtain a copy of Mulan PubL v2 at:
6
* http://license.coscl.org.cn/MulanPubL-2.0
7
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
8
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
9
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
10
* See the Mulan PubL v2 for more details.
13
#include "ob_lua_api.h"
14
#include "ob_lua_handler.h"
21
#include <sys/socket.h>
25
#include "lib/oblog/ob_log.h"
26
#include "lib/atomic/ob_atomic.h"
27
#include "lib/signal/ob_signal_utils.h"
28
#include "lib/thread/ob_thread_name.h"
29
#include "lib/thread/protected_stack_allocator.h"
30
#include "lib/utility/utility.h"
31
#include "lib/thread/thread.h"
37
extern int ob_epoll_wait(int __epfd, struct epoll_event *__events,
38
int __maxevents, int __timeout);
41
using namespace oceanbase;
42
using namespace oceanbase::common;
43
using namespace oceanbase::diagnose;
45
void ObLuaHandler::memory_update(const int size)
48
OB_LOG_RET(ERROR, OB_INVALID_ARGUMENT, "bad param", K(size));
49
} else if (size > 0) {
58
void *ObLuaHandler::realloc_functor(void *userdata, void *ptr, size_t osize, size_t nsize)
65
} else if (OB_NOT_NULL(ret = diagnose::alloc(nsize))) {
66
if (OB_NOT_NULL(ptr)) {
67
memmove(ret, ptr, std::min(osize, nsize));
70
if (OB_NOT_NULL(ptr)) {
76
int ObLuaHandler::process(const char* lua_code)
78
bool has_segv = false;
80
LuaExec(const char* lua_code) : code_(lua_code) {}
82
lua_State* L = lua_newstate(realloc_functor, nullptr);
84
OB_LOG_RET(ERROR, OB_INVALID_ARGUMENT, "luastate is NULL");
87
APIRegister::get_instance().register_api(L);
89
luaL_dostring(L, code_);
90
} catch (std::exception& e) {
91
_OB_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "exception during lua code execution, reason %s", e.what());
98
OB_LOG(INFO, "Lua code was executed", K(alloc_count_), K(free_count_), K(alloc_size_), K(free_size_));
99
LuaExec func(lua_code);
100
do_with_crash_restore(func, has_segv);
102
_OB_LOG(INFO, "restore from sigsegv, coredump during lua code execution at %s\n", crash_restore_buffer);
104
OB_LOG(INFO, "Lua code was executed successfully", K(alloc_count_), K(free_count_), K(alloc_size_), K(free_size_));
108
LuaGC(common::ObVector<Function>& destructors) : destructors_(destructors) {}
110
for (int64_t i = destructors_.size() - 1; i >= 0 ; --i) {
113
destructors_.clear();
115
common::ObVector<Function>& destructors_;
117
LuaGC gc(destructors_);
118
do_with_crash_restore(gc, has_segv);
120
_OB_LOG(INFO, "restore from sigsegv, coredump during lua gc at %s\n", crash_restore_buffer);
122
OB_LOG(INFO, "Lua gc successfully", K(alloc_count_), K(free_count_), K(alloc_size_), K(free_size_));
127
void ObUnixDomainListener::run1()
129
int ret = OB_SUCCESS;
130
if ((listen_fd_ = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0)) < 0) {
131
OB_LOG(ERROR, "ObUnixDomainListener socket init failed", K(errno));
132
ret = OB_ERR_UNEXPECTED;
134
struct sockaddr_un s;
135
struct epoll_event listen_ev;
136
int epoll_fd = epoll_create(256);
137
s.sun_family = AF_UNIX;
138
strncpy(s.sun_path, addr, sizeof(s.sun_path) - 1);
140
listen_ev.events = EPOLLIN;
141
listen_ev.data.fd = listen_fd_;
142
if (bind(listen_fd_, (struct sockaddr *)&s, sizeof(s)) < 0) {
143
OB_LOG(ERROR, "ObUnixDomainListener bind failed", K(listen_fd_), K(errno));
144
ret = OB_ERR_UNEXPECTED;
145
} else if (listen(listen_fd_, MAX_CONNECTION_QUEUE_LENGTH) < 0) {
146
OB_LOG(ERROR, "ObUnixDomainListener listen failed", K(listen_fd_), K(errno));
147
ret = OB_ERR_UNEXPECTED;
148
} else if (epoll_fd < 0) {
149
OB_LOG(ERROR, "ObUnixDomainListener create epoll failed", K(errno));
150
ret = OB_ERR_UNEXPECTED;
151
} else if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd_, &listen_ev) < 0) {
152
OB_LOG(ERROR, "ObUnixDomainListener add listen to epoll failed", K(errno));
153
ret = OB_ERR_UNEXPECTED;
155
lib::set_thread_name("LuaHandler");
156
constexpr int64_t EPOLL_EVENT_BUFFER_SIZE = 32;
157
constexpr int64_t TIMEOUT = 1000;
158
struct epoll_event events[EPOLL_EVENT_BUFFER_SIZE];
159
struct epoll_event conn_ev;
160
char *code_buffer = (char *)diagnose::alloc(CODE_BUFFER_SIZE);
161
while (OB_LIKELY(!has_set_stop())) {
163
int ret = OB_SUCCESS;
164
int64_t event_cnt = ob_epoll_wait(epoll_fd, events, EPOLL_EVENT_BUFFER_SIZE, TIMEOUT);
166
if (EINTR == errno) {
169
OB_LOG(ERROR, "ObUnixDomainListener epoll wait failed", K(epoll_fd), K(errno));
172
for (int64_t i = 0; i < event_cnt; ++i) {
173
if (events[i].data.fd == listen_fd_) {
174
if ((conn_fd = accept(listen_fd_, NULL, NULL)) < 0) {
175
if (EAGAIN != errno) {
176
ret = OB_ERR_UNEXPECTED;
177
OB_LOG(ERROR, "ObUnixDomainListener accept failed", K(listen_fd_), K(errno));
180
conn_ev.events = EPOLLIN;
181
conn_ev.data.fd = conn_fd;
182
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &conn_ev) < 0) {
183
OB_LOG(ERROR, "ObUnixDomainListener add event to epoll failed", K(epoll_fd), K(conn_fd), K(errno));
186
} else if (events[i].events & EPOLLIN) {
188
char *buffer = code_buffer;
189
int64_t size = CODE_BUFFER_SIZE;
190
memset(code_buffer, 0, CODE_BUFFER_SIZE);
191
while ((rbytes = read(events[i].data.fd, buffer, size)) > 0) {
196
OB_LOG(ERROR, "ObUnixDomainListener read from socket failed", K(errno));
197
} else if (FALSE_IT(APIRegister::get_instance().set_fd(events[i].data.fd))) {
199
} else if (OB_FAIL(ObLuaHandler::get_instance().process(code_buffer))) {
200
OB_LOG(ERROR, "ObUnixDomainListener process failed", K(ret));
201
} else if (OB_FAIL(APIRegister::get_instance().flush())) {
202
OB_LOG(ERROR, "ObUnixDomainListener flush failed", K(ret));
206
if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, nullptr) < 0) {
207
OB_LOG(ERROR, "ObUnixDomainListener del failed", K(errno));
209
close(events[i].data.fd);
210
APIRegister::get_instance().set_fd(-1);
212
OB_LOG(ERROR, "unexpected type");
216
diagnose::free(code_buffer);
220
OB_LOG(INFO, "ObUnixDomainListener running");