ClickHouse

Форк
0
/
ExternalTable.cpp 
209 строк · 8.0 Кб
1
#include <boost/program_options.hpp>
2
#include <DataTypes/DataTypeFactory.h>
3
#include <Storages/IStorage.h>
4
#include <Storages/ColumnsDescription.h>
5
#include <Storages/ConstraintsDescription.h>
6
#include <Interpreters/Context.h>
7
#include <Interpreters/DatabaseCatalog.h>
8
#include <IO/ReadBufferFromFile.h>
9
#include <IO/LimitReadBuffer.h>
10

11
#include <QueryPipeline/Pipe.h>
12
#include <Processors/Executors/PipelineExecutor.h>
13
#include <Processors/Sinks/SinkToStorage.h>
14
#include <Processors/Executors/CompletedPipelineExecutor.h>
15
#include <Processors/Formats/IInputFormat.h>
16
#include <QueryPipeline/QueryPipelineBuilder.h>
17

18
#include <Core/ExternalTable.h>
19
#include <Poco/Net/MessageHeader.h>
20
#include <Parsers/ASTNameTypePair.h>
21
#include <Parsers/ParserCreateQuery.h>
22
#include <Parsers/parseQuery.h>
23
#include <base/scope_guard.h>
24

25

26
namespace DB
27
{
28

29
namespace ErrorCodes
{
    /// Used below for every malformed or missing external-table description.
    extern const int BAD_ARGUMENTS;
}
33

34
/// Parsing a list of types with `,` as separator. For example, `Int, Enum('foo'=1,'bar'=2), Double`
35
/// Used in `parseStructureFromTypesField`
36
class ParserTypeList : public IParserBase
37
{
38
protected:
39
    const char * getName() const override { return "type pair list"; }
40
    bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
41
    {
42
        return ParserList(std::make_unique<ParserDataType>(), std::make_unique<ParserToken>(TokenType::Comma), false)
43
        .parse(pos, node, expected);
44
    }
45
};
46

47
/// Builds the data source for this external table.
/// Calls `initReadBuffer()` so the concrete table can prepare `read_buffer`, and builds `sample_block` from `structure`.
/// Then wraps an input format named by `format`, reading from that buffer, into a single-pipe pipeline builder.
/// The returned object also carries the table name.
ExternalTableDataPtr BaseExternalTable::getData(ContextPtr context)
{
    initReadBuffer();
    initSampleBlock();

    /// The block size is looked up from the context settings by setting name.
    const UInt64 max_block_size = context->getSettingsRef().get("max_block_size").get<UInt64>();
    auto input_format = context->getInputFormat(format, *read_buffer, sample_block, max_block_size);

    auto result = std::make_unique<ExternalTableData>();
    result->table_name = name;
    result->pipe = std::make_unique<QueryPipelineBuilder>();
    result->pipe->init(Pipe(std::move(input_format)));

    return result;
}
60

61
void BaseExternalTable::clear()
62
{
63
    name.clear();
64
    file.clear();
65
    format.clear();
66
    structure.clear();
67
    sample_block.clear();
68
    read_buffer.reset();
69
}
70

71
void BaseExternalTable::parseStructureFromStructureField(const std::string & argument)
72
{
73
    ParserNameTypePairList parser;
74
    const auto * pos = argument.data();
75
    String error;
76
    ASTPtr columns_list_raw = tryParseQuery(parser, pos, pos + argument.size(), error, false, "", false, DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
77

78
    if (!columns_list_raw)
79
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing table structure: {}", error);
80

81
    for (auto & child : columns_list_raw->children)
82
    {
83
        auto * column = child->as<ASTNameTypePair>();
84
        /// We use `formatWithPossiblyHidingSensitiveData` instead of `getColumnNameWithoutAlias` because `column->type` is an ASTFunction.
85
        /// `getColumnNameWithoutAlias` will return name of the function with `(arguments)` even if arguments is empty.
86
        if (column)
87
            structure.emplace_back(column->name, column->type->formatWithPossiblyHidingSensitiveData(0, true, true));
88
        else
89
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing table structure: expected column definition, got {}", child->formatForErrorMessage());
90
    }
91
}
92

93
void BaseExternalTable::parseStructureFromTypesField(const std::string & argument)
94
{
95
    ParserTypeList parser;
96
    const auto * pos = argument.data();
97
    String error;
98
    ASTPtr type_list_raw = tryParseQuery(parser, pos, pos+argument.size(), error, false, "", false, DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
99

100
    if (!type_list_raw)
101
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing table structure: {}", error);
102

103
    for (size_t i = 0; i < type_list_raw->children.size(); ++i)
104
        structure.emplace_back("_" + toString(i + 1), type_list_raw->children[i]->formatWithPossiblyHidingSensitiveData(0, true, true));
105
}
106

107
void BaseExternalTable::initSampleBlock()
108
{
109
    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
110

111
    for (const auto & elem : structure)
112
    {
113
        ColumnWithTypeAndName column;
114
        column.name = elem.first;
115
        column.type = data_type_factory.get(elem.second);
116
        column.column = column.type->createColumn();
117
        sample_block.insert(std::move(column));
118
    }
119
}
120

121

122
void ExternalTable::initReadBuffer()
123
{
124
    if (file == "-")
125
        read_buffer = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
126
    else
127
        read_buffer = std::make_unique<ReadBufferFromFile>(file);
128
}
129

130
/// Builds an external table description from command-line options.
/// `file`, `name` and `format` are mandatory.
/// The columns come from `structure` if present, otherwise from `types`.
/// Throws BAD_ARGUMENTS on the first missing mandatory option.
ExternalTable::ExternalTable(const boost::program_options::variables_map & external_options)
{
    if (!external_options.count("file"))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "--file field have not been provided for external table");
    file = external_options["file"].as<std::string>();

    if (!external_options.count("name"))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "--name field have not been provided for external table");
    name = external_options["name"].as<std::string>();

    if (!external_options.count("format"))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "--format field have not been provided for external table");
    format = external_options["format"].as<std::string>();

    /// `structure` is checked first, so it wins when both options are given.
    if (external_options.count("structure"))
        parseStructureFromStructureField(external_options["structure"].as<std::string>());
    else if (external_options.count("types"))
        parseStructureFromTypesField(external_options["types"].as<std::string>());
    else
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Neither --structure nor --types have not been provided for external table");
}
154

155

156
/// Handles one multipart/form-data part as an external table.
///
/// How the part is described:
/// - The `name` parameter of the part's Content-Disposition header names the table (default `_data`).
/// - The form parameter `<name>_format` gives the input format (default `TabSeparated`).
/// - The columns come from `<name>_structure` or, failing that, `<name>_types`.
///
/// What happens to the data:
/// - The part's data is read, optionally capped by `http_max_multipart_form_data_size`.
/// - It is written into a temporary table registered in the query context under that name.
///
/// Throws BAD_ARGUMENTS if neither a structure nor a types parameter is present.
void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream)
{
    /// After finishing this function we will be ready to receive the next file, so all received information is cleared.
    /// SCOPE_EXIT is used so that read_buffer is reset correctly even if an exception is thrown.
    SCOPE_EXIT(clear());

    /// Hold one strong reference to the context for the whole call. The `settings` reference below,
    /// and every later use of the context, is then tied to an owner that provably outlives it.
    /// It also avoids re-acquiring the context several times.
    auto query_context = getContext();
    const Settings & settings = query_context->getSettingsRef();

    if (settings.http_max_multipart_form_data_size)
        read_buffer = std::make_unique<LimitReadBuffer>(
            stream, settings.http_max_multipart_form_data_size,
            /* throw_exception */ true, /* exact_limit */ std::optional<size_t>(),
            "the maximum size of multipart/form-data. "
            "This limit can be tuned by 'http_max_multipart_form_data_size' setting");
    else
        read_buffer = wrapReadBufferReference(stream);

    /// Retrieve the parameters of the part's Content-Disposition header.
    Poco::Net::NameValueCollection content;
    std::string label;
    Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), label, content);

    /// Table name from the header; format and structure from the form parameters.
    name = content.get("name", "_data");
    format = params.get(name + "_format", "TabSeparated");

    const std::string structure_key = name + "_structure";
    const std::string types_key = name + "_types";

    if (params.has(structure_key))
        parseStructureFromStructureField(params.get(structure_key));
    else if (params.has(types_key))
        parseStructureFromTypesField(params.get(types_key));
    else
        throw Exception(ErrorCodes::BAD_ARGUMENTS,
                        "Neither structure nor types have not been provided for external table {}. "
                        "Use fields {}_structure or {}_types to do so.", name, name, name);

    ExternalTableDataPtr data = getData(query_context);

    /// Create the temporary table and register it in the context before filling it.
    NamesAndTypesList columns = sample_block.getNamesAndTypesList();
    auto temporary_table = TemporaryTableHolder(query_context, ColumnsDescription{columns}, {});
    auto storage = temporary_table.getTable();
    query_context->addExternalTable(data->table_name, std::move(temporary_table));
    auto sink = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), query_context, /*async_insert=*/false);

    /// Write the data: input format -> sink, executed on a single thread.
    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*data->pipe));
    pipeline.complete(std::move(sink));
    pipeline.setNumThreads(1);

    CompletedPipelineExecutor executor(pipeline);
    executor.execute();
}
208

209
}
210

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.