ClickHouse

Форк
0
/
ShellCommandSource.cpp 
697 строк · 21.8 Кб
1
#include <Processors/Sources/ShellCommandSource.h>

#include <cerrno>
#include <fcntl.h>
#include <poll.h>

#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>

#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>

#include <QueryPipeline/Pipe.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Interpreters/Context.h>
#include <boost/circular_buffer.hpp>
17

18

19
namespace DB
20
{
21

22
/// Error codes referenced in this file. They are only declared here (extern); the values are defined elsewhere.
namespace ErrorCodes
{
    extern const int UNSUPPORTED_METHOD;
    extern const int TIMEOUT_EXCEEDED;
    extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
    extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR;
    extern const int CANNOT_FCNTL;
    extern const int CANNOT_POLL;
}
31

32
/// Adds O_NONBLOCK to the file status flags of `fd`, keeping the other flags as they are.
/// Returns false if reading or updating the flags with fcntl fails, true otherwise.
static bool tryMakeFdNonBlocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (-1 == flags)
        return false;
    if (-1 == fcntl(fd, F_SETFL, flags | O_NONBLOCK))
        return false;

    return true;
}
42

43
static void makeFdNonBlocking(int fd)
44
{
45
    bool result = tryMakeFdNonBlocking(fd);
46
    if (!result)
47
        throw ErrnoException(ErrorCodes::CANNOT_FCNTL, "Cannot set non-blocking mode of pipe");
48
}
49

50
/// Clears O_NONBLOCK from the file status flags of `fd`, keeping the other flags as they are.
/// Returns false if reading or updating the flags with fcntl fails, true otherwise.
static bool tryMakeFdBlocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (-1 == flags)
        return false;

    if (-1 == fcntl(fd, F_SETFL, flags & (~O_NONBLOCK)))
        return false;

    return true;
}
61

62
static void makeFdBlocking(int fd)
63
{
64
    bool result = tryMakeFdBlocking(fd);
65
    if (!result)
66
        throw ErrnoException(ErrorCodes::CANNOT_FCNTL, "Cannot set blocking mode of pipe");
67
}
68

69
static int pollWithTimeout(pollfd * pfds, size_t num, size_t timeout_milliseconds)
70
{
71
    int res;
72

73
    while (true)
74
    {
75
        Stopwatch watch;
76
        res = poll(pfds, static_cast<nfds_t>(num), static_cast<int>(timeout_milliseconds));
77

78
        if (res < 0)
79
        {
80
            if (errno != EINTR)
81
                throw ErrnoException(ErrorCodes::CANNOT_POLL, "Cannot poll");
82

83
            const auto elapsed = watch.elapsedMilliseconds();
84
            if (timeout_milliseconds <= elapsed)
85
                break;
86
            timeout_milliseconds -= elapsed;
87
        }
88
        else
89
        {
90
            break;
91
        }
92
    }
93

94
    return res;
95
}
96

97
static bool pollFd(int fd, size_t timeout_milliseconds, int events)
98
{
99
    pollfd pfd;
100
    pfd.fd = fd;
101
    pfd.events = events;
102
    pfd.revents = 0;
103

104
    return pollWithTimeout(&pfd, 1, timeout_milliseconds) > 0;
105
}
106

107
/** Read buffer over the stdout descriptor of a child process with a per-read timeout.
  * The stderr descriptor is polled together with stdout (unless the reaction is NONE) and whatever
  * arrives there is handled according to `stderr_reaction`: thrown, logged immediately, or collected
  * (first or last BUFFER_SIZE bytes) and logged once in the destructor.
  *
  * Both descriptors are switched to non-blocking mode in the constructor and switched back
  * by reset() (throwing) or by the destructor (best effort).
  */
class TimeoutReadBufferFromFileDescriptor : public BufferWithOwnMemory<ReadBuffer>
{
public:
    /// Throws ErrnoException (CANNOT_FCNTL) if a descriptor cannot be made non-blocking.
    explicit TimeoutReadBufferFromFileDescriptor(
        int stdout_fd_,
        int stderr_fd_,
        size_t timeout_milliseconds_,
        ExternalCommandStderrReaction stderr_reaction_)
        : stdout_fd(stdout_fd_)
        , stderr_fd(stderr_fd_)
        , timeout_milliseconds(timeout_milliseconds_)
        , stderr_reaction(stderr_reaction_)
    {
        makeFdNonBlocking(stdout_fd);
        makeFdNonBlocking(stderr_fd);

        pfds[0].fd = stdout_fd;
        pfds[0].events = POLLIN;
        pfds[1].fd = stderr_fd;
        pfds[1].events = POLLIN;

        /// With reaction NONE only stdout is passed to poll, so stderr is never read.
        if (stderr_reaction == ExternalCommandStderrReaction::NONE)
            num_pfds = 1;
        else
            num_pfds = 2;
    }

    /// Waits for data on stdout (handling stderr on the way) and reads it into the internal buffer.
    /// Returns false on end of stdout, true when at least one byte was read.
    /// Throws TIMEOUT_EXCEEDED if nothing becomes ready within the timeout,
    /// CANNOT_READ_FROM_FILE_DESCRIPTOR on a stdout read error and
    /// UNSUPPORTED_METHOD if stderr produced data while the reaction is THROW.
    bool nextImpl() override
    {
        size_t bytes_read = 0;

        while (!bytes_read)
        {
            pfds[0].revents = 0;
            pfds[1].revents = 0;

            /// The result is kept signed and any non-positive value is treated as a timeout,
            /// so that a negative return value cannot be mistaken for a huge number of events.
            const int num_events = pollWithTimeout(pfds, num_pfds, timeout_milliseconds);
            if (num_events <= 0)
                throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Pipe read timeout exceeded {} milliseconds", timeout_milliseconds);

            bool has_stdout = pfds[0].revents > 0;
            bool has_stderr = pfds[1].revents > 0;

            if (has_stderr)
            {
                if (stderr_read_buf == nullptr)
                    stderr_read_buf.reset(new char[BUFFER_SIZE]);
                ssize_t res = ::read(stderr_fd, stderr_read_buf.get(), BUFFER_SIZE);
                if (res > 0)
                {
                    std::string_view str(stderr_read_buf.get(), res);
                    if (stderr_reaction == ExternalCommandStderrReaction::THROW)
                        throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Executable generates stderr: {}", str);
                    else if (stderr_reaction == ExternalCommandStderrReaction::LOG)
                        LOG_WARNING(
                            getLogger("TimeoutReadBufferFromFileDescriptor"), "Executable generates stderr: {}", str);
                    else if (stderr_reaction == ExternalCommandStderrReaction::LOG_FIRST)
                    {
                        /// Keep only what still fits: once the buffer is full further stderr output is dropped.
                        res = std::min(static_cast<ssize_t>(stderr_result_buf.reserve()), res);
                        if (res > 0)
                            stderr_result_buf.insert(stderr_result_buf.end(), str.begin(), str.begin() + res);
                    }
                    else if (stderr_reaction == ExternalCommandStderrReaction::LOG_LAST)
                    {
                        stderr_result_buf.insert(stderr_result_buf.end(), str.begin(), str.begin() + res);
                    }
                }
                else if (0 == res || (errno != EINTR && errno != EAGAIN))
                {
                    /// stderr reached EOF or failed permanently. poll would keep reporting it as ready,
                    /// which turns this loop into a busy spin and prevents the read timeout from ever firing.
                    /// From now on wait for stdout only.
                    num_pfds = 1;
                }
            }

            if (has_stdout)
            {
                ssize_t res = ::read(stdout_fd, internal_buffer.begin(), internal_buffer.size());

                if (-1 == res && errno != EINTR)
                    throw ErrnoException(ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR, "Cannot read from pipe");

                /// End of stdout.
                if (res == 0)
                    break;

                if (res > 0)
                    bytes_read += res;
            }
        }

        if (bytes_read > 0)
        {
            working_buffer = internal_buffer;
            working_buffer.resize(bytes_read);
        }
        else
        {
            return false;
        }

        return true;
    }

    /// Switches both descriptors back to blocking mode. Throws ErrnoException (CANNOT_FCNTL) on failure.
    void reset() const
    {
        makeFdBlocking(stdout_fd);
        makeFdBlocking(stderr_fd);
    }

    /// Restores blocking mode (failures are ignored) and logs the stderr output collected
    /// for the LOG_FIRST / LOG_LAST reactions, if any.
    ~TimeoutReadBufferFromFileDescriptor() override
    {
        tryMakeFdBlocking(stdout_fd);
        tryMakeFdBlocking(stderr_fd);

        if (!stderr_result_buf.empty())
        {
            String stderr_result;
            stderr_result.reserve(stderr_result_buf.size());
            stderr_result.append(stderr_result_buf.begin(), stderr_result_buf.end());
            LOG_WARNING(
                getLogger("ShellCommandSource"),
                "Executable generates stderr at the {}: {}",
                stderr_reaction == ExternalCommandStderrReaction::LOG_FIRST ? "beginning" : "end",
                stderr_result);
        }
    }

private:
    int stdout_fd;
    int stderr_fd;
    size_t timeout_milliseconds;
    ExternalCommandStderrReaction stderr_reaction;

    /// Size of the scratch buffer for one stderr read and capacity of the collected stderr output.
    static constexpr size_t BUFFER_SIZE = 4_KiB;
    /// pfds[0] is stdout, pfds[1] is stderr; only the first num_pfds entries are passed to poll.
    pollfd pfds[2];
    size_t num_pfds;
    /// Allocated lazily on the first stderr event.
    std::unique_ptr<char[]> stderr_read_buf;
    boost::circular_buffer_space_optimized<char> stderr_result_buf{BUFFER_SIZE};
};
239

240
/** Write buffer over a file descriptor with a per-write timeout.
  * The descriptor is switched to non-blocking mode in the constructor and switched back
  * by reset() (throwing) or by the destructor (best effort).
  */
class TimeoutWriteBufferFromFileDescriptor : public BufferWithOwnMemory<WriteBuffer>
{
public:
    /// Throws ErrnoException (CANNOT_FCNTL) if the descriptor cannot be made non-blocking.
    explicit TimeoutWriteBufferFromFileDescriptor(int fd_, size_t timeout_milliseconds_)
        : fd(fd_), timeout_milliseconds(timeout_milliseconds_)
    {
        makeFdNonBlocking(fd);
    }

    /// Writes the whole pending part of the buffer, waiting for the descriptor to become writable
    /// before every write.
    /// Throws TIMEOUT_EXCEEDED if the descriptor does not become writable within the timeout and
    /// CANNOT_WRITE_TO_FILE_DESCRIPTOR if the write fails or makes no progress.
    void nextImpl() override
    {
        if (!offset())
            return;

        size_t bytes_written = 0;

        while (bytes_written != offset())
        {
            if (!pollFd(fd, timeout_milliseconds, POLLOUT))
                throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Pipe write timeout exceeded {} milliseconds", timeout_milliseconds);

            ssize_t res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);

            if (-1 == res)
            {
                /// errno is meaningful only when write reported a failure.
                /// An interrupted call or a descriptor that is not ready after all is retried
                /// through another (timed) poll.
                if (errno == EINTR || errno == EAGAIN)
                    continue;

                throw ErrnoException(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Cannot write into pipe");
            }

            /// Nothing was written although data was requested; errno is not set in this case,
            /// so report it without an errno description.
            if (0 == res)
                throw Exception(ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR, "Cannot write into pipe");

            bytes_written += res;
        }
    }

    /// Switches the descriptor back to blocking mode. Throws ErrnoException (CANNOT_FCNTL) on failure.
    void reset() const
    {
        makeFdBlocking(fd);
    }

    /// Restores blocking mode; failures are ignored.
    ~TimeoutWriteBufferFromFileDescriptor() override
    {
        tryMakeFdBlocking(fd);
    }

private:
    int fd;
    size_t timeout_milliseconds;
};
285

286
class ShellCommandHolder
287
{
288
public:
289
    using ShellCommandBuilderFunc = std::function<std::unique_ptr<ShellCommand>()>;
290

291
    explicit ShellCommandHolder(ShellCommandBuilderFunc && func_)
292
        : func(std::move(func_))
293
    {}
294

295
    std::unique_ptr<ShellCommand> buildCommand()
296
    {
297
        if (returned_command)
298
            return std::move(returned_command);
299

300
        return func();
301
    }
302

303
    void returnCommand(std::unique_ptr<ShellCommand> command)
304
    {
305
        returned_command = std::move(command);
306
    }
307

308
private:
309
    std::unique_ptr<ShellCommand> returned_command;
310
    ShellCommandBuilderFunc func;
311
};
312

313
namespace
314
{
315
    /** A stream, that get child process and sends data using tasks in background threads.
316
    * For each send data task background thread is created. Send data task must send data to process input pipes.
317
    * ShellCommandPoolSource receives data from process stdout.
318
    *
319
    * If process_pool is passed in constructor then after source is destroyed process is returned to pool.
320
    */
321
    class ShellCommandSource final : public ISource
322
    {
323
    public:
324

325
        using SendDataTask = std::function<void(void)>;
326

327
        ShellCommandSource(
328
            ContextPtr context_,
329
            const std::string & format_,
330
            size_t command_read_timeout_milliseconds,
331
            ExternalCommandStderrReaction stderr_reaction,
332
            bool check_exit_code_,
333
            const Block & sample_block_,
334
            std::unique_ptr<ShellCommand> && command_,
335
            std::vector<SendDataTask> && send_data_tasks = {},
336
            const ShellCommandSourceConfiguration & configuration_ = {},
337
            std::unique_ptr<ShellCommandHolder> && command_holder_ = nullptr,
338
            std::shared_ptr<ProcessPool> process_pool_ = nullptr)
339
            : ISource(sample_block_)
340
            , context(context_)
341
            , format(format_)
342
            , sample_block(sample_block_)
343
            , command(std::move(command_))
344
            , configuration(configuration_)
345
            , timeout_command_out(command->out.getFD(), command->err.getFD(), command_read_timeout_milliseconds, stderr_reaction)
346
            , command_holder(std::move(command_holder_))
347
            , process_pool(process_pool_)
348
            , check_exit_code(check_exit_code_)
349
        {
350
            try
351
            {
352
                for (auto && send_data_task : send_data_tasks)
353
                {
354
                    send_data_threads.emplace_back([task = std::move(send_data_task), this]() mutable
355
                    {
356
                        try
357
                        {
358
                            task();
359
                        }
360
                        catch (...)
361
                        {
362
                            std::lock_guard lock(send_data_lock);
363
                            exception_during_send_data = std::current_exception();
364

365
                            /// task should be reset inside catch block or else it breaks d'tor
366
                            /// invariants such as in ~WriteBuffer.
367
                            task = {};
368
                        }
369
                    });
370
                }
371
                size_t max_block_size = configuration.max_block_size;
372

373
                if (configuration.read_fixed_number_of_rows)
374
                {
375
                    /** Currently parallel parsing input format cannot read exactly max_block_size rows from input,
376
                    * so it will be blocked on ReadBufferFromFileDescriptor because this file descriptor represent pipe that does not have eof.
377
                    */
378
                    auto context_for_reading = Context::createCopy(context);
379
                    context_for_reading->setSetting("input_format_parallel_parsing", false);
380
                    context = context_for_reading;
381

382
                    if (configuration.read_number_of_rows_from_process_output)
383
                    {
384
                        /// Initialize executor in generate
385
                        return;
386
                    }
387

388
                    max_block_size = configuration.number_of_rows_to_read;
389
                }
390

391
                pipeline = QueryPipeline(Pipe(context->getInputFormat(format, timeout_command_out, sample_block, max_block_size)));
392
                executor = std::make_unique<PullingPipelineExecutor>(pipeline);
393
            }
394
            catch (...)
395
            {
396
                cleanup();
397
                throw;
398
            }
399
        }
400

401
        ~ShellCommandSource() override
402
        {
403
            cleanup();
404
        }
405

406
    protected:
407
        void cleanup()
408
        {
409
            for (auto & thread : send_data_threads)
410
                if (thread.joinable())
411
                    thread.join();
412

413
            if (command_is_invalid)
414
                command = nullptr;
415

416
            if (command_holder && process_pool)
417
            {
418
                bool valid_command = configuration.read_fixed_number_of_rows && current_read_rows >= configuration.number_of_rows_to_read;
419

420
                if (command && valid_command)
421
                    command_holder->returnCommand(std::move(command));
422

423
                process_pool->returnObject(std::move(command_holder));
424
            }
425
        }
426

427
        Chunk generate() override
428
        {
429
            rethrowExceptionDuringSendDataIfNeeded();
430

431
            Chunk chunk;
432

433
            try
434
            {
435
                if (configuration.read_fixed_number_of_rows)
436
                {
437
                    if (!executor && configuration.read_number_of_rows_from_process_output)
438
                    {
439
                        readText(configuration.number_of_rows_to_read, timeout_command_out);
440
                        char dummy;
441
                        readChar(dummy, timeout_command_out);
442

443
                        size_t max_block_size = configuration.number_of_rows_to_read;
444
                        pipeline = QueryPipeline(Pipe(context->getInputFormat(format, timeout_command_out, sample_block, max_block_size)));
445
                        executor = std::make_unique<PullingPipelineExecutor>(pipeline);
446
                    }
447

448
                    if (current_read_rows >= configuration.number_of_rows_to_read)
449
                        return {};
450
                }
451

452
                if (!executor->pull(chunk))
453
                    return {};
454

455
                current_read_rows += chunk.getNumRows();
456
            }
457
            catch (...)
458
            {
459
                command_is_invalid = true;
460
                throw;
461
            }
462

463
            return chunk;
464
        }
465

466
        Status prepare() override
467
        {
468
            auto status = ISource::prepare();
469

470
            if (status == Status::Finished)
471
            {
472
                for (auto & thread : send_data_threads)
473
                    if (thread.joinable())
474
                        thread.join();
475

476
                if (check_exit_code)
477
                {
478
                    if (process_pool)
479
                    {
480
                        bool valid_command
481
                            = configuration.read_fixed_number_of_rows && current_read_rows >= configuration.number_of_rows_to_read;
482

483
                        // We can only wait for pooled commands when they are invalid.
484
                        if (!valid_command)
485
                            command->wait();
486
                    }
487
                    else
488
                        command->wait();
489
                }
490

491
                rethrowExceptionDuringSendDataIfNeeded();
492
            }
493

494
            return status;
495
        }
496

497
        String getName() const override { return "ShellCommandSource"; }
498

499
    private:
500

501
        void rethrowExceptionDuringSendDataIfNeeded()
502
        {
503
            std::lock_guard lock(send_data_lock);
504
            if (exception_during_send_data)
505
            {
506
                command_is_invalid = true;
507
                std::rethrow_exception(exception_during_send_data);
508
            }
509
        }
510

511
        ContextPtr context;
512
        std::string format;
513
        Block sample_block;
514

515
        std::unique_ptr<ShellCommand> command;
516
        ShellCommandSourceConfiguration configuration;
517

518
        TimeoutReadBufferFromFileDescriptor timeout_command_out;
519

520
        size_t current_read_rows = 0;
521

522
        ShellCommandHolderPtr command_holder;
523
        std::shared_ptr<ProcessPool> process_pool;
524

525
        bool check_exit_code = false;
526

527
        QueryPipeline pipeline;
528
        std::unique_ptr<PullingPipelineExecutor> executor;
529

530
        std::vector<ThreadFromGlobalPool> send_data_threads;
531

532
        std::mutex send_data_lock;
533
        std::exception_ptr exception_during_send_data;
534

535
        std::atomic<bool> command_is_invalid {false};
536
    };
537

538
    class SendingChunkHeaderTransform final : public ISimpleTransform
539
    {
540
    public:
541
        SendingChunkHeaderTransform(const Block & header, std::shared_ptr<TimeoutWriteBufferFromFileDescriptor> buffer_)
542
            : ISimpleTransform(header, header, false)
543
            , buffer(buffer_)
544
        {
545
        }
546

547
        String getName() const override { return "SendingChunkHeaderTransform"; }
548

549
    protected:
550

551
        void transform(Chunk & chunk) override
552
        {
553
            writeText(chunk.getNumRows(), *buffer);
554
            writeChar('\n', *buffer);
555
        }
556

557
    private:
558
        std::shared_ptr<TimeoutWriteBufferFromFileDescriptor> buffer;
559
    };
560

561
}
562

563
/// Stores the configuration and, for an executable pool, creates the process pool.
/// A pool_size of 0 means the pool size is not limited (the maximum value of size_t is used).
ShellCommandSourceCoordinator::ShellCommandSourceCoordinator(const Configuration & configuration_)
    : configuration(configuration_)
{
    if (configuration.is_executable_pool)
        process_pool = std::make_shared<ProcessPool>(configuration.pool_size ? configuration.pool_size : std::numeric_limits<size_t>::max());
}
569

570
/// Creates a pipe that runs `command` with `arguments` and reads its output in configuration.format.
///
/// The process is either started directly or, if a process pool exists, taken from a holder borrowed from the pool.
/// For every input pipe a send data task is built that formats the pipe's data and writes it to the process:
/// input pipe 0 goes to `process->in`, input pipe i > 0 goes to the write descriptor i + 2 of the process.
///
/// Throws TIMEOUT_EXCEEDED if no process could be borrowed from the pool in time and
/// UNSUPPORTED_METHOD if the process does not have the write descriptor expected for an input pipe.
Pipe ShellCommandSourceCoordinator::createPipe(
    const std::string & command,
    const std::vector<std::string> & arguments,
    std::vector<Pipe> && input_pipes,
    Block sample_block,
    ContextPtr context,
    const ShellCommandSourceConfiguration & source_configuration)
{
    ShellCommand::Config command_config(command);
    command_config.arguments = arguments;
    /// The first input pipe uses `process->in`; additional ones get descriptors 3, 4, ...
    for (size_t i = 1; i < input_pipes.size(); ++i)
        command_config.write_fds.emplace_back(i + 2);

    std::unique_ptr<ShellCommand> process;
    std::unique_ptr<ShellCommandHolder> process_holder;

    auto destructor_strategy = ShellCommand::DestructorStrategy{true /*terminate_in_destructor*/, SIGTERM, configuration.command_termination_timeout_seconds};
    command_config.terminate_in_destructor_strategy = destructor_strategy;

    bool is_executable_pool = (process_pool != nullptr);
    if (is_executable_pool)
    {
        bool execute_direct = configuration.execute_direct;

        /// Seconds -> milliseconds.
        /// NOTE(review): assumes tryBorrowObject takes its timeout in milliseconds — confirm against ProcessPool.
        /// The previous factor of 10000 made the wait ten times longer than the value reported in the error below.
        const size_t borrow_timeout_milliseconds = configuration.max_command_execution_time_seconds * 1000;

        bool result = process_pool->tryBorrowObject(
            process_holder,
            [command_config, execute_direct]()
            {
                ShellCommandHolder::ShellCommandBuilderFunc func = [command_config, execute_direct]() mutable
                {
                    if (execute_direct)
                        return ShellCommand::executeDirect(command_config);
                    else
                        return ShellCommand::execute(command_config);
                };

                return std::make_unique<ShellCommandHolder>(std::move(func));
            },
            borrow_timeout_milliseconds);

        if (!result)
            throw Exception(
                ErrorCodes::TIMEOUT_EXCEEDED,
                "Could not get process from pool, max command execution timeout exceeded {} seconds",
                configuration.max_command_execution_time_seconds);

        process = process_holder->buildCommand();
    }
    else
    {
        if (configuration.execute_direct)
            process = ShellCommand::executeDirect(command_config);
        else
            process = ShellCommand::execute(command_config);
    }

    std::vector<ShellCommandSource::SendDataTask> tasks;
    tasks.reserve(input_pipes.size());

    for (size_t i = 0; i < input_pipes.size(); ++i)
    {
        WriteBufferFromFile * write_buffer = nullptr;

        if (i == 0)
        {
            write_buffer = &process->in;
        }
        else
        {
            int descriptor = static_cast<int>(i) + 2;
            auto it = process->write_fds.find(descriptor);
            if (it == process->write_fds.end())
                throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Process does not contain descriptor to write {}", descriptor);

            write_buffer = &it->second;
        }

        int write_buffer_fd = write_buffer->getFD();
        auto timeout_write_buffer
            = std::make_shared<TimeoutWriteBufferFromFileDescriptor>(write_buffer_fd, configuration.command_write_timeout_milliseconds);

        input_pipes[i].resize(1);

        if (configuration.send_chunk_header)
        {
            auto transform = std::make_shared<SendingChunkHeaderTransform>(input_pipes[i].getHeader(), timeout_write_buffer);
            input_pipes[i].addTransform(std::move(transform));
        }

        auto pipeline = std::make_shared<QueryPipeline>(std::move(input_pipes[i]));
        auto out = context->getOutputFormat(configuration.format, *timeout_write_buffer, materializeBlock(pipeline->getHeader()));
        out->setAutoFlush();
        pipeline->complete(std::move(out));

        /// `write_buffer` points into `process`, which is moved into the source below.
        ShellCommandSource::SendDataTask task = [pipeline, timeout_write_buffer, write_buffer, is_executable_pool]()
        {
            CompletedPipelineExecutor executor(*pipeline);
            executor.execute();

            timeout_write_buffer->finalize();
            timeout_write_buffer->reset();

            /// The input of a pooled process is left open; otherwise it is closed after sending.
            if (!is_executable_pool)
            {
                write_buffer->close();
            }
        };

        tasks.emplace_back(std::move(task));
    }

    auto source = std::make_unique<ShellCommandSource>(
        context,
        configuration.format,
        configuration.command_read_timeout_milliseconds,
        configuration.stderr_reaction,
        configuration.check_exit_code,
        std::move(sample_block),
        std::move(process),
        std::move(tasks),
        source_configuration,
        std::move(process_holder),
        process_pool);

    return Pipe(std::move(source));
}
696

697
}
698

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.