// ClickHouse
// 209 lines · 8.0 KB
1#include <boost/program_options.hpp>
2#include <DataTypes/DataTypeFactory.h>
3#include <Storages/IStorage.h>
4#include <Storages/ColumnsDescription.h>
5#include <Storages/ConstraintsDescription.h>
6#include <Interpreters/Context.h>
7#include <Interpreters/DatabaseCatalog.h>
8#include <IO/ReadBufferFromFile.h>
9#include <IO/LimitReadBuffer.h>
10
11#include <QueryPipeline/Pipe.h>
12#include <Processors/Executors/PipelineExecutor.h>
13#include <Processors/Sinks/SinkToStorage.h>
14#include <Processors/Executors/CompletedPipelineExecutor.h>
15#include <Processors/Formats/IInputFormat.h>
16#include <QueryPipeline/QueryPipelineBuilder.h>
17
18#include <Core/ExternalTable.h>
19#include <Poco/Net/MessageHeader.h>
20#include <Parsers/ASTNameTypePair.h>
21#include <Parsers/ParserCreateQuery.h>
22#include <Parsers/parseQuery.h>
23#include <base/scope_guard.h>
24
25
26namespace DB
27{
28
29namespace ErrorCodes
30{
31extern const int BAD_ARGUMENTS;
32}
33
34/// Parsing a list of types with `,` as separator. For example, `Int, Enum('foo'=1,'bar'=2), Double`
35/// Used in `parseStructureFromTypesField`
36class ParserTypeList : public IParserBase
37{
38protected:
39const char * getName() const override { return "type pair list"; }
40bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override
41{
42return ParserList(std::make_unique<ParserDataType>(), std::make_unique<ParserToken>(TokenType::Comma), false)
43.parse(pos, node, expected);
44}
45};
46
47ExternalTableDataPtr BaseExternalTable::getData(ContextPtr context)
48{
49initReadBuffer();
50initSampleBlock();
51auto input = context->getInputFormat(format, *read_buffer, sample_block, context->getSettingsRef().get("max_block_size").get<UInt64>());
52
53auto data = std::make_unique<ExternalTableData>();
54data->pipe = std::make_unique<QueryPipelineBuilder>();
55data->pipe->init(Pipe(std::move(input)));
56data->table_name = name;
57
58return data;
59}
60
61void BaseExternalTable::clear()
62{
63name.clear();
64file.clear();
65format.clear();
66structure.clear();
67sample_block.clear();
68read_buffer.reset();
69}
70
71void BaseExternalTable::parseStructureFromStructureField(const std::string & argument)
72{
73ParserNameTypePairList parser;
74const auto * pos = argument.data();
75String error;
76ASTPtr columns_list_raw = tryParseQuery(parser, pos, pos + argument.size(), error, false, "", false, DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
77
78if (!columns_list_raw)
79throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing table structure: {}", error);
80
81for (auto & child : columns_list_raw->children)
82{
83auto * column = child->as<ASTNameTypePair>();
84/// We use `formatWithPossiblyHidingSensitiveData` instead of `getColumnNameWithoutAlias` because `column->type` is an ASTFunction.
85/// `getColumnNameWithoutAlias` will return name of the function with `(arguments)` even if arguments is empty.
86if (column)
87structure.emplace_back(column->name, column->type->formatWithPossiblyHidingSensitiveData(0, true, true));
88else
89throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing table structure: expected column definition, got {}", child->formatForErrorMessage());
90}
91}
92
93void BaseExternalTable::parseStructureFromTypesField(const std::string & argument)
94{
95ParserTypeList parser;
96const auto * pos = argument.data();
97String error;
98ASTPtr type_list_raw = tryParseQuery(parser, pos, pos+argument.size(), error, false, "", false, DBMS_DEFAULT_MAX_QUERY_SIZE, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS, true);
99
100if (!type_list_raw)
101throw Exception(ErrorCodes::BAD_ARGUMENTS, "Error while parsing table structure: {}", error);
102
103for (size_t i = 0; i < type_list_raw->children.size(); ++i)
104structure.emplace_back("_" + toString(i + 1), type_list_raw->children[i]->formatWithPossiblyHidingSensitiveData(0, true, true));
105}
106
107void BaseExternalTable::initSampleBlock()
108{
109const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
110
111for (const auto & elem : structure)
112{
113ColumnWithTypeAndName column;
114column.name = elem.first;
115column.type = data_type_factory.get(elem.second);
116column.column = column.type->createColumn();
117sample_block.insert(std::move(column));
118}
119}
120
121
122void ExternalTable::initReadBuffer()
123{
124if (file == "-")
125read_buffer = std::make_unique<ReadBufferFromFileDescriptor>(STDIN_FILENO);
126else
127read_buffer = std::make_unique<ReadBufferFromFile>(file);
128}
129
130ExternalTable::ExternalTable(const boost::program_options::variables_map & external_options)
131{
132if (external_options.count("file"))
133file = external_options["file"].as<std::string>();
134else
135throw Exception(ErrorCodes::BAD_ARGUMENTS, "--file field have not been provided for external table");
136
137if (external_options.count("name"))
138name = external_options["name"].as<std::string>();
139else
140throw Exception(ErrorCodes::BAD_ARGUMENTS, "--name field have not been provided for external table");
141
142if (external_options.count("format"))
143format = external_options["format"].as<std::string>();
144else
145throw Exception(ErrorCodes::BAD_ARGUMENTS, "--format field have not been provided for external table");
146
147if (external_options.count("structure"))
148parseStructureFromStructureField(external_options["structure"].as<std::string>());
149else if (external_options.count("types"))
150parseStructureFromTypesField(external_options["types"].as<std::string>());
151else
152throw Exception(ErrorCodes::BAD_ARGUMENTS, "Neither --structure nor --types have not been provided for external table");
153}
154
155
156void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, ReadBuffer & stream)
157{
158/// After finishing this function we will be ready to receive the next file, for this we clear all the information received.
159/// We should use SCOPE_EXIT because read_buffer should be reset correctly if there will be an exception.
160SCOPE_EXIT(clear());
161
162const Settings & settings = getContext()->getSettingsRef();
163
164if (settings.http_max_multipart_form_data_size)
165read_buffer = std::make_unique<LimitReadBuffer>(
166stream, settings.http_max_multipart_form_data_size,
167/* trow_exception */ true, /* exact_limit */ std::optional<size_t>(),
168"the maximum size of multipart/form-data. "
169"This limit can be tuned by 'http_max_multipart_form_data_size' setting");
170else
171read_buffer = wrapReadBufferReference(stream);
172
173/// Retrieve a collection of parameters from MessageHeader
174Poco::Net::NameValueCollection content;
175std::string label;
176Poco::Net::MessageHeader::splitParameters(header.get("Content-Disposition"), label, content);
177
178/// Get parameters
179name = content.get("name", "_data");
180format = params.get(name + "_format", "TabSeparated");
181
182if (params.has(name + "_structure"))
183parseStructureFromStructureField(params.get(name + "_structure"));
184else if (params.has(name + "_types"))
185parseStructureFromTypesField(params.get(name + "_types"));
186else
187throw Exception(ErrorCodes::BAD_ARGUMENTS,
188"Neither structure nor types have not been provided for external table {}. "
189"Use fields {}_structure or {}_types to do so.", name, name, name);
190
191ExternalTableDataPtr data = getData(getContext());
192
193/// Create table
194NamesAndTypesList columns = sample_block.getNamesAndTypesList();
195auto temporary_table = TemporaryTableHolder(getContext(), ColumnsDescription{columns}, {});
196auto storage = temporary_table.getTable();
197getContext()->addExternalTable(data->table_name, std::move(temporary_table));
198auto sink = storage->write(ASTPtr(), storage->getInMemoryMetadataPtr(), getContext(), /*async_insert=*/false);
199
200/// Write data
201auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*data->pipe));
202pipeline.complete(std::move(sink));
203pipeline.setNumThreads(1);
204
205CompletedPipelineExecutor executor(pipeline);
206executor.execute();
207}
208
209}
210