ClickHouse
150 строк · 4.2 Кб
1#include <Planner/ActionsChain.h>
2
3#include <boost/algorithm/string/split.hpp>
4#include <boost/algorithm/string/join.hpp>
5
6#include <IO/WriteBuffer.h>
7#include <IO/WriteHelpers.h>
8#include <IO/Operators.h>
9#include <IO/WriteBufferFromString.h>
10
11namespace DB
12{
13
14ActionsChainStep::ActionsChainStep(ActionsDAGPtr actions_,
15bool use_actions_nodes_as_output_columns_,
16ColumnsWithTypeAndName additional_output_columns_)
17: actions(std::move(actions_))
18, use_actions_nodes_as_output_columns(use_actions_nodes_as_output_columns_)
19, additional_output_columns(std::move(additional_output_columns_))
20{
21initialize();
22}
23
24void ActionsChainStep::finalizeInputAndOutputColumns(const NameSet & child_input_columns)
25{
26child_required_output_columns_names.clear();
27
28auto child_input_columns_copy = child_input_columns;
29
30std::unordered_set<std::string_view> output_nodes_names;
31output_nodes_names.reserve(actions->getOutputs().size());
32
33for (auto & output_node : actions->getOutputs())
34output_nodes_names.insert(output_node->result_name);
35
36for (const auto & node : actions->getNodes())
37{
38auto it = child_input_columns_copy.find(node.result_name);
39if (it == child_input_columns_copy.end())
40continue;
41
42child_input_columns_copy.erase(it);
43child_required_output_columns_names.insert(node.result_name);
44
45if (output_nodes_names.contains(node.result_name))
46continue;
47
48actions->getOutputs().push_back(&node);
49output_nodes_names.insert(node.result_name);
50}
51
52actions->removeUnusedActions();
53/// TODO: Analyzer fix ActionsDAG input and constant nodes with same name
54actions->projectInput();
55initialize();
56}
57
58void ActionsChainStep::dump(WriteBuffer & buffer) const
59{
60buffer << "DAG" << '\n';
61buffer << actions->dumpDAG();
62
63if (!available_output_columns.empty())
64{
65buffer << "Available output columns " << available_output_columns.size() << '\n';
66for (const auto & column : available_output_columns)
67buffer << "Name " << column.name << " type " << column.type->getName() << '\n';
68}
69
70if (!child_required_output_columns_names.empty())
71{
72buffer << "Child required output columns " << boost::join(child_required_output_columns_names, ", ");
73buffer << '\n';
74}
75}
76
77String ActionsChainStep::dump() const
78{
79WriteBufferFromOwnString buffer;
80dump(buffer);
81
82return buffer.str();
83}
84
85void ActionsChainStep::initialize()
86{
87auto required_columns_names = actions->getRequiredColumnsNames();
88input_columns_names = NameSet(required_columns_names.begin(), required_columns_names.end());
89
90available_output_columns.clear();
91
92if (use_actions_nodes_as_output_columns)
93{
94std::unordered_set<std::string_view> available_output_columns_names;
95
96for (const auto & node : actions->getNodes())
97{
98if (available_output_columns_names.contains(node.result_name))
99continue;
100
101available_output_columns.emplace_back(node.column, node.result_type, node.result_name);
102available_output_columns_names.insert(node.result_name);
103}
104}
105
106available_output_columns.insert(available_output_columns.end(), additional_output_columns.begin(), additional_output_columns.end());
107}
108
109void ActionsChain::finalize()
110{
111if (steps.empty())
112return;
113
114/// For last chain step there are no columns required in child nodes
115NameSet empty_child_input_columns;
116steps.back().get()->finalizeInputAndOutputColumns(empty_child_input_columns);
117
118Int64 steps_last_index = steps.size() - 1;
119for (Int64 i = steps_last_index; i >= 1; --i)
120{
121auto & current_step = steps[i];
122auto & previous_step = steps[i - 1];
123
124previous_step->finalizeInputAndOutputColumns(current_step->getInputColumnNames());
125}
126}
127
128void ActionsChain::dump(WriteBuffer & buffer) const
129{
130size_t steps_size = steps.size();
131
132for (size_t i = 0; i < steps_size; ++i)
133{
134const auto & step = steps[i];
135buffer << "Step " << i << '\n';
136step->dump(buffer);
137
138buffer << '\n';
139}
140}
141
142String ActionsChain::dump() const
143{
144WriteBufferFromOwnString buffer;
145dump(buffer);
146
147return buffer.str();
148}
149
150}
151