// ClickHouse · 697 lines · 21.8 KB
1#include <Processors/Sources/ShellCommandSource.h>
2
3#include <poll.h>
4
5#include <Common/Stopwatch.h>
6#include <Common/logger_useful.h>
7
8#include <IO/WriteHelpers.h>
9#include <IO/ReadHelpers.h>
10
11#include <QueryPipeline/Pipe.h>
12#include <Processors/ISimpleTransform.h>
13#include <Processors/Formats/IOutputFormat.h>
14#include <Processors/Executors/CompletedPipelineExecutor.h>
15#include <Interpreters/Context.h>
16#include <boost/circular_buffer.hpp>
17
18
19namespace DB
20{
21
/// Error codes referenced in this file. They are only declared here (extern);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_FCNTL;
    extern const int CANNOT_POLL;
    extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
    extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR;
    extern const int TIMEOUT_EXCEEDED;
    extern const int UNSUPPORTED_METHOD;
}
31
/// Adds O_NONBLOCK to the status flags of `fd`.
/// Returns false if either fcntl call fails (errno is left as set by fcntl), true otherwise.
static bool tryMakeFdNonBlocking(int fd)
{
    const int current_flags = fcntl(fd, F_GETFL, 0);
    if (current_flags == -1)
        return false;

    const int set_result = fcntl(fd, F_SETFL, current_flags | O_NONBLOCK);
    return set_result != -1;
}
42
43static void makeFdNonBlocking(int fd)
44{
45bool result = tryMakeFdNonBlocking(fd);
46if (!result)
47throw ErrnoException(ErrorCodes::CANNOT_FCNTL, "Cannot set non-blocking mode of pipe");
48}
49
/// Clears O_NONBLOCK from the status flags of `fd`.
/// Returns false if either fcntl call fails (errno is left as set by fcntl), true otherwise.
static bool tryMakeFdBlocking(int fd)
{
    const int current_flags = fcntl(fd, F_GETFL, 0);
    if (current_flags == -1)
        return false;

    const int set_result = fcntl(fd, F_SETFL, current_flags & (~O_NONBLOCK));
    return set_result != -1;
}
61
62static void makeFdBlocking(int fd)
63{
64bool result = tryMakeFdBlocking(fd);
65if (!result)
66throw ErrnoException(ErrorCodes::CANNOT_FCNTL, "Cannot set blocking mode of pipe");
67}
68
69static int pollWithTimeout(pollfd * pfds, size_t num, size_t timeout_milliseconds)
70{
71int res;
72
73while (true)
74{
75Stopwatch watch;
76res = poll(pfds, static_cast<nfds_t>(num), static_cast<int>(timeout_milliseconds));
77
78if (res < 0)
79{
80if (errno != EINTR)
81throw ErrnoException(ErrorCodes::CANNOT_POLL, "Cannot poll");
82
83const auto elapsed = watch.elapsedMilliseconds();
84if (timeout_milliseconds <= elapsed)
85break;
86timeout_milliseconds -= elapsed;
87}
88else
89{
90break;
91}
92}
93
94return res;
95}
96
97static bool pollFd(int fd, size_t timeout_milliseconds, int events)
98{
99pollfd pfd;
100pfd.fd = fd;
101pfd.events = events;
102pfd.revents = 0;
103
104return pollWithTimeout(&pfd, 1, timeout_milliseconds) > 0;
105}
106
107class TimeoutReadBufferFromFileDescriptor : public BufferWithOwnMemory<ReadBuffer>
108{
109public:
110explicit TimeoutReadBufferFromFileDescriptor(
111int stdout_fd_,
112int stderr_fd_,
113size_t timeout_milliseconds_,
114ExternalCommandStderrReaction stderr_reaction_)
115: stdout_fd(stdout_fd_)
116, stderr_fd(stderr_fd_)
117, timeout_milliseconds(timeout_milliseconds_)
118, stderr_reaction(stderr_reaction_)
119{
120makeFdNonBlocking(stdout_fd);
121makeFdNonBlocking(stderr_fd);
122
123pfds[0].fd = stdout_fd;
124pfds[0].events = POLLIN;
125pfds[1].fd = stderr_fd;
126pfds[1].events = POLLIN;
127
128if (stderr_reaction == ExternalCommandStderrReaction::NONE)
129num_pfds = 1;
130else
131num_pfds = 2;
132}
133
134bool nextImpl() override
135{
136size_t bytes_read = 0;
137
138while (!bytes_read)
139{
140pfds[0].revents = 0;
141pfds[1].revents = 0;
142size_t num_events = pollWithTimeout(pfds, num_pfds, timeout_milliseconds);
143if (0 == num_events)
144throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Pipe read timeout exceeded {} milliseconds", timeout_milliseconds);
145
146bool has_stdout = pfds[0].revents > 0;
147bool has_stderr = pfds[1].revents > 0;
148
149if (has_stderr)
150{
151if (stderr_read_buf == nullptr)
152stderr_read_buf.reset(new char[BUFFER_SIZE]);
153ssize_t res = ::read(stderr_fd, stderr_read_buf.get(), BUFFER_SIZE);
154if (res > 0)
155{
156std::string_view str(stderr_read_buf.get(), res);
157if (stderr_reaction == ExternalCommandStderrReaction::THROW)
158throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Executable generates stderr: {}", str);
159else if (stderr_reaction == ExternalCommandStderrReaction::LOG)
160LOG_WARNING(
161getLogger("TimeoutReadBufferFromFileDescriptor"), "Executable generates stderr: {}", str);
162else if (stderr_reaction == ExternalCommandStderrReaction::LOG_FIRST)
163{
164res = std::min(ssize_t(stderr_result_buf.reserve()), res);
165if (res > 0)
166stderr_result_buf.insert(stderr_result_buf.end(), str.begin(), str.begin() + res);
167}
168else if (stderr_reaction == ExternalCommandStderrReaction::LOG_LAST)
169{
170stderr_result_buf.insert(stderr_result_buf.end(), str.begin(), str.begin() + res);
171}
172}
173}
174
175if (has_stdout)
176{
177ssize_t res = ::read(stdout_fd, internal_buffer.begin(), internal_buffer.size());
178
179if (-1 == res && errno != EINTR)
180throw ErrnoException(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, "Cannot read from pipe");
181
182if (res == 0)
183break;
184
185if (res > 0)
186bytes_read += res;
187}
188}
189
190if (bytes_read > 0)
191{
192working_buffer = internal_buffer;
193working_buffer.resize(bytes_read);
194}
195else
196{
197return false;
198}
199
200return true;
201}
202
203void reset() const
204{
205makeFdBlocking(stdout_fd);
206makeFdBlocking(stderr_fd);
207}
208
209~TimeoutReadBufferFromFileDescriptor() override
210{
211tryMakeFdBlocking(stdout_fd);
212tryMakeFdBlocking(stderr_fd);
213
214if (!stderr_result_buf.empty())
215{
216String stderr_result;
217stderr_result.reserve(stderr_result_buf.size());
218stderr_result.append(stderr_result_buf.begin(), stderr_result_buf.end());
219LOG_WARNING(
220getLogger("ShellCommandSource"),
221"Executable generates stderr at the {}: {}",
222stderr_reaction == ExternalCommandStderrReaction::LOG_FIRST ? "beginning" : "end",
223stderr_result);
224}
225}
226
227private:
228int stdout_fd;
229int stderr_fd;
230size_t timeout_milliseconds;
231ExternalCommandStderrReaction stderr_reaction;
232
233static constexpr size_t BUFFER_SIZE = 4_KiB;
234pollfd pfds[2];
235size_t num_pfds;
236std::unique_ptr<char[]> stderr_read_buf;
237boost::circular_buffer_space_optimized<char> stderr_result_buf{BUFFER_SIZE};
238};
239
240class TimeoutWriteBufferFromFileDescriptor : public BufferWithOwnMemory<WriteBuffer>
241{
242public:
243explicit TimeoutWriteBufferFromFileDescriptor(int fd_, size_t timeout_milliseconds_)
244: fd(fd_), timeout_milliseconds(timeout_milliseconds_)
245{
246makeFdNonBlocking(fd);
247}
248
249void nextImpl() override
250{
251if (!offset())
252return;
253
254size_t bytes_written = 0;
255
256while (bytes_written != offset())
257{
258if (!pollFd(fd, timeout_milliseconds, POLLOUT))
259throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Pipe write timeout exceeded {} milliseconds", timeout_milliseconds);
260
261ssize_t res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
262
263if ((-1 == res || 0 == res) && errno != EINTR)
264throw ErrnoException(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Cannot write into pipe");
265
266if (res > 0)
267bytes_written += res;
268}
269}
270
271void reset() const
272{
273makeFdBlocking(fd);
274}
275
276~TimeoutWriteBufferFromFileDescriptor() override
277{
278tryMakeFdBlocking(fd);
279}
280
281private:
282int fd;
283size_t timeout_milliseconds;
284};
285
286class ShellCommandHolder
287{
288public:
289using ShellCommandBuilderFunc = std::function<std::unique_ptr<ShellCommand>()>;
290
291explicit ShellCommandHolder(ShellCommandBuilderFunc && func_)
292: func(std::move(func_))
293{}
294
295std::unique_ptr<ShellCommand> buildCommand()
296{
297if (returned_command)
298return std::move(returned_command);
299
300return func();
301}
302
303void returnCommand(std::unique_ptr<ShellCommand> command)
304{
305returned_command = std::move(command);
306}
307
308private:
309std::unique_ptr<ShellCommand> returned_command;
310ShellCommandBuilderFunc func;
311};
312
313namespace
314{
315/** A stream, that get child process and sends data using tasks in background threads.
316* For each send data task background thread is created. Send data task must send data to process input pipes.
317* ShellCommandPoolSource receives data from process stdout.
318*
319* If process_pool is passed in constructor then after source is destroyed process is returned to pool.
320*/
321class ShellCommandSource final : public ISource
322{
323public:
324
325using SendDataTask = std::function<void(void)>;
326
327ShellCommandSource(
328ContextPtr context_,
329const std::string & format_,
330size_t command_read_timeout_milliseconds,
331ExternalCommandStderrReaction stderr_reaction,
332bool check_exit_code_,
333const Block & sample_block_,
334std::unique_ptr<ShellCommand> && command_,
335std::vector<SendDataTask> && send_data_tasks = {},
336const ShellCommandSourceConfiguration & configuration_ = {},
337std::unique_ptr<ShellCommandHolder> && command_holder_ = nullptr,
338std::shared_ptr<ProcessPool> process_pool_ = nullptr)
339: ISource(sample_block_)
340, context(context_)
341, format(format_)
342, sample_block(sample_block_)
343, command(std::move(command_))
344, configuration(configuration_)
345, timeout_command_out(command->out.getFD(), command->err.getFD(), command_read_timeout_milliseconds, stderr_reaction)
346, command_holder(std::move(command_holder_))
347, process_pool(process_pool_)
348, check_exit_code(check_exit_code_)
349{
350try
351{
352for (auto && send_data_task : send_data_tasks)
353{
354send_data_threads.emplace_back([task = std::move(send_data_task), this]() mutable
355{
356try
357{
358task();
359}
360catch (...)
361{
362std::lock_guard lock(send_data_lock);
363exception_during_send_data = std::current_exception();
364
365/// task should be reset inside catch block or else it breaks d'tor
366/// invariants such as in ~WriteBuffer.
367task = {};
368}
369});
370}
371size_t max_block_size = configuration.max_block_size;
372
373if (configuration.read_fixed_number_of_rows)
374{
375/** Currently parallel parsing input format cannot read exactly max_block_size rows from input,
376* so it will be blocked on ReadBufferFromFileDescriptor because this file descriptor represent pipe that does not have eof.
377*/
378auto context_for_reading = Context::createCopy(context);
379context_for_reading->setSetting("input_format_parallel_parsing", false);
380context = context_for_reading;
381
382if (configuration.read_number_of_rows_from_process_output)
383{
384/// Initialize executor in generate
385return;
386}
387
388max_block_size = configuration.number_of_rows_to_read;
389}
390
391pipeline = QueryPipeline(Pipe(context->getInputFormat(format, timeout_command_out, sample_block, max_block_size)));
392executor = std::make_unique<PullingPipelineExecutor>(pipeline);
393}
394catch (...)
395{
396cleanup();
397throw;
398}
399}
400
401~ShellCommandSource() override
402{
403cleanup();
404}
405
406protected:
407void cleanup()
408{
409for (auto & thread : send_data_threads)
410if (thread.joinable())
411thread.join();
412
413if (command_is_invalid)
414command = nullptr;
415
416if (command_holder && process_pool)
417{
418bool valid_command = configuration.read_fixed_number_of_rows && current_read_rows >= configuration.number_of_rows_to_read;
419
420if (command && valid_command)
421command_holder->returnCommand(std::move(command));
422
423process_pool->returnObject(std::move(command_holder));
424}
425}
426
427Chunk generate() override
428{
429rethrowExceptionDuringSendDataIfNeeded();
430
431Chunk chunk;
432
433try
434{
435if (configuration.read_fixed_number_of_rows)
436{
437if (!executor && configuration.read_number_of_rows_from_process_output)
438{
439readText(configuration.number_of_rows_to_read, timeout_command_out);
440char dummy;
441readChar(dummy, timeout_command_out);
442
443size_t max_block_size = configuration.number_of_rows_to_read;
444pipeline = QueryPipeline(Pipe(context->getInputFormat(format, timeout_command_out, sample_block, max_block_size)));
445executor = std::make_unique<PullingPipelineExecutor>(pipeline);
446}
447
448if (current_read_rows >= configuration.number_of_rows_to_read)
449return {};
450}
451
452if (!executor->pull(chunk))
453return {};
454
455current_read_rows += chunk.getNumRows();
456}
457catch (...)
458{
459command_is_invalid = true;
460throw;
461}
462
463return chunk;
464}
465
466Status prepare() override
467{
468auto status = ISource::prepare();
469
470if (status == Status::Finished)
471{
472for (auto & thread : send_data_threads)
473if (thread.joinable())
474thread.join();
475
476if (check_exit_code)
477{
478if (process_pool)
479{
480bool valid_command
481= configuration.read_fixed_number_of_rows && current_read_rows >= configuration.number_of_rows_to_read;
482
483// We can only wait for pooled commands when they are invalid.
484if (!valid_command)
485command->wait();
486}
487else
488command->wait();
489}
490
491rethrowExceptionDuringSendDataIfNeeded();
492}
493
494return status;
495}
496
497String getName() const override { return "ShellCommandSource"; }
498
499private:
500
501void rethrowExceptionDuringSendDataIfNeeded()
502{
503std::lock_guard lock(send_data_lock);
504if (exception_during_send_data)
505{
506command_is_invalid = true;
507std::rethrow_exception(exception_during_send_data);
508}
509}
510
511ContextPtr context;
512std::string format;
513Block sample_block;
514
515std::unique_ptr<ShellCommand> command;
516ShellCommandSourceConfiguration configuration;
517
518TimeoutReadBufferFromFileDescriptor timeout_command_out;
519
520size_t current_read_rows = 0;
521
522ShellCommandHolderPtr command_holder;
523std::shared_ptr<ProcessPool> process_pool;
524
525bool check_exit_code = false;
526
527QueryPipeline pipeline;
528std::unique_ptr<PullingPipelineExecutor> executor;
529
530std::vector<ThreadFromGlobalPool> send_data_threads;
531
532std::mutex send_data_lock;
533std::exception_ptr exception_during_send_data;
534
535std::atomic<bool> command_is_invalid {false};
536};
537
538class SendingChunkHeaderTransform final : public ISimpleTransform
539{
540public:
541SendingChunkHeaderTransform(const Block & header, std::shared_ptr<TimeoutWriteBufferFromFileDescriptor> buffer_)
542: ISimpleTransform(header, header, false)
543, buffer(buffer_)
544{
545}
546
547String getName() const override { return "SendingChunkHeaderTransform"; }
548
549protected:
550
551void transform(Chunk & chunk) override
552{
553writeText(chunk.getNumRows(), *buffer);
554writeChar('\n', *buffer);
555}
556
557private:
558std::shared_ptr<TimeoutWriteBufferFromFileDescriptor> buffer;
559};
560
561}
562
563ShellCommandSourceCoordinator::ShellCommandSourceCoordinator(const Configuration & configuration_)
564: configuration(configuration_)
565{
566if (configuration.is_executable_pool)
567process_pool = std::make_shared<ProcessPool>(configuration.pool_size ? configuration.pool_size : std::numeric_limits<size_t>::max());
568}
569
570Pipe ShellCommandSourceCoordinator::createPipe(
571const std::string & command,
572const std::vector<std::string> & arguments,
573std::vector<Pipe> && input_pipes,
574Block sample_block,
575ContextPtr context,
576const ShellCommandSourceConfiguration & source_configuration)
577{
578ShellCommand::Config command_config(command);
579command_config.arguments = arguments;
580for (size_t i = 1; i < input_pipes.size(); ++i)
581command_config.write_fds.emplace_back(i + 2);
582
583std::unique_ptr<ShellCommand> process;
584std::unique_ptr<ShellCommandHolder> process_holder;
585
586auto destructor_strategy = ShellCommand::DestructorStrategy{true /*terminate_in_destructor*/, SIGTERM, configuration.command_termination_timeout_seconds};
587command_config.terminate_in_destructor_strategy = destructor_strategy;
588
589bool is_executable_pool = (process_pool != nullptr);
590if (is_executable_pool)
591{
592bool execute_direct = configuration.execute_direct;
593
594bool result = process_pool->tryBorrowObject(
595process_holder,
596[command_config, execute_direct]()
597{
598ShellCommandHolder::ShellCommandBuilderFunc func = [command_config, execute_direct]() mutable
599{
600if (execute_direct)
601return ShellCommand::executeDirect(command_config);
602else
603return ShellCommand::execute(command_config);
604};
605
606return std::make_unique<ShellCommandHolder>(std::move(func));
607},
608configuration.max_command_execution_time_seconds * 10000);
609
610if (!result)
611throw Exception(
612ErrorCodes::TIMEOUT_EXCEEDED,
613"Could not get process from pool, max command execution timeout exceeded {} seconds",
614configuration.max_command_execution_time_seconds);
615
616process = process_holder->buildCommand();
617}
618else
619{
620if (configuration.execute_direct)
621process = ShellCommand::executeDirect(command_config);
622else
623process = ShellCommand::execute(command_config);
624}
625
626std::vector<ShellCommandSource::SendDataTask> tasks;
627tasks.reserve(input_pipes.size());
628
629for (size_t i = 0; i < input_pipes.size(); ++i)
630{
631WriteBufferFromFile * write_buffer = nullptr;
632
633if (i == 0)
634{
635write_buffer = &process->in;
636}
637else
638{
639int descriptor = static_cast<int>(i) + 2;
640auto it = process->write_fds.find(descriptor);
641if (it == process->write_fds.end())
642throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Process does not contain descriptor to write {}", descriptor);
643
644write_buffer = &it->second;
645}
646
647int write_buffer_fd = write_buffer->getFD();
648auto timeout_write_buffer
649= std::make_shared<TimeoutWriteBufferFromFileDescriptor>(write_buffer_fd, configuration.command_write_timeout_milliseconds);
650
651input_pipes[i].resize(1);
652
653if (configuration.send_chunk_header)
654{
655auto transform = std::make_shared<SendingChunkHeaderTransform>(input_pipes[i].getHeader(), timeout_write_buffer);
656input_pipes[i].addTransform(std::move(transform));
657}
658
659auto pipeline = std::make_shared<QueryPipeline>(std::move(input_pipes[i]));
660auto out = context->getOutputFormat(configuration.format, *timeout_write_buffer, materializeBlock(pipeline->getHeader()));
661out->setAutoFlush();
662pipeline->complete(std::move(out));
663
664ShellCommandSource::SendDataTask task = [pipeline, timeout_write_buffer, write_buffer, is_executable_pool]()
665{
666CompletedPipelineExecutor executor(*pipeline);
667executor.execute();
668
669timeout_write_buffer->finalize();
670timeout_write_buffer->reset();
671
672if (!is_executable_pool)
673{
674write_buffer->close();
675}
676};
677
678tasks.emplace_back(std::move(task));
679}
680
681auto source = std::make_unique<ShellCommandSource>(
682context,
683configuration.format,
684configuration.command_read_timeout_milliseconds,
685configuration.stderr_reaction,
686configuration.check_exit_code,
687std::move(sample_block),
688std::move(process),
689std::move(tasks),
690source_configuration,
691std::move(process_holder),
692process_pool);
693
694return Pipe(std::move(source));
695}
696
697}
698