ClickHouse
161 строка · 5.3 Кб
1#include "SQLiteSource.h"
2
3#if USE_SQLITE
4#include <base/range.h>
5#include <Common/logger_useful.h>
6#include <Common/assert_cast.h>
7
8#include <Columns/ColumnArray.h>
9#include <Columns/ColumnDecimal.h>
10#include <Columns/ColumnNullable.h>
11#include <Columns/ColumnString.h>
12#include <Columns/ColumnsNumber.h>
13
14#include <DataTypes/DataTypeNullable.h>
15
16
17namespace DB
18{
19
20namespace ErrorCodes
21{
22extern const int SQLITE_ENGINE_ERROR;
23}
24
25SQLiteSource::SQLiteSource(
26SQLitePtr sqlite_db_,
27const String & query_str_,
28const Block & sample_block,
29const UInt64 max_block_size_)
30: ISource(sample_block.cloneEmpty())
31, query_str(query_str_)
32, max_block_size(max_block_size_)
33, sqlite_db(std::move(sqlite_db_))
34{
35description.init(sample_block);
36
37sqlite3_stmt * compiled_stmt = nullptr;
38int status = sqlite3_prepare_v2(
39sqlite_db.get(),
40query_str.c_str(),
41static_cast<int>(query_str.size() + 1),
42&compiled_stmt, nullptr);
43
44if (status != SQLITE_OK)
45throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
46"Cannot prepare sqlite statement. Status: {}. Message: {}",
47status, sqlite3_errstr(status));
48
49compiled_statement = std::unique_ptr<sqlite3_stmt, StatementDeleter>(compiled_stmt, StatementDeleter());
50}
51
52Chunk SQLiteSource::generate()
53{
54if (!compiled_statement)
55return {};
56
57MutableColumns columns = description.sample_block.cloneEmptyColumns();
58size_t num_rows = 0;
59
60while (true)
61{
62int status = sqlite3_step(compiled_statement.get());
63
64if (status == SQLITE_BUSY)
65{
66continue;
67}
68else if (status == SQLITE_DONE)
69{
70compiled_statement.reset();
71break;
72}
73else if (status != SQLITE_ROW)
74{
75throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
76"Expected SQLITE_ROW status, but got status {}. Error: {}, Message: {}",
77status, sqlite3_errstr(status), sqlite3_errmsg(sqlite_db.get()));
78}
79
80int column_count = sqlite3_column_count(compiled_statement.get());
81
82for (int column_index = 0; column_index < column_count; ++column_index)
83{
84if (sqlite3_column_type(compiled_statement.get(), column_index) == SQLITE_NULL)
85{
86columns[column_index]->insertDefault();
87continue;
88}
89
90auto & [type, is_nullable] = description.types[column_index];
91if (is_nullable)
92{
93ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[column_index]);
94insertValue(column_nullable.getNestedColumn(), type, column_index);
95column_nullable.getNullMapData().emplace_back(0);
96}
97else
98{
99insertValue(*columns[column_index], type, column_index);
100}
101}
102
103if (++num_rows == max_block_size)
104break;
105}
106
107if (num_rows == 0)
108{
109compiled_statement.reset();
110return {};
111}
112
113return Chunk(std::move(columns), num_rows);
114}
115
116void SQLiteSource::insertValue(IColumn & column, ExternalResultDescription::ValueType type, int idx)
117{
118switch (type)
119{
120case ValueType::vtUInt8:
121assert_cast<ColumnUInt8 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
122break;
123case ValueType::vtUInt16:
124assert_cast<ColumnUInt16 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
125break;
126case ValueType::vtUInt32:
127assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(sqlite3_column_int64(compiled_statement.get(), idx)));
128break;
129case ValueType::vtUInt64:
130/// There is no uint64 in sqlite3, only int and int64
131assert_cast<ColumnUInt64 &>(column).insertValue(sqlite3_column_int64(compiled_statement.get(), idx));
132break;
133case ValueType::vtInt8:
134assert_cast<ColumnInt8 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
135break;
136case ValueType::vtInt16:
137assert_cast<ColumnInt16 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
138break;
139case ValueType::vtInt32:
140assert_cast<ColumnInt32 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
141break;
142case ValueType::vtInt64:
143assert_cast<ColumnInt64 &>(column).insertValue(sqlite3_column_int64(compiled_statement.get(), idx));
144break;
145case ValueType::vtFloat32:
146assert_cast<ColumnFloat32 &>(column).insertValue(static_cast<Float32>(sqlite3_column_double(compiled_statement.get(), idx)));
147break;
148case ValueType::vtFloat64:
149assert_cast<ColumnFloat64 &>(column).insertValue(sqlite3_column_double(compiled_statement.get(), idx));
150break;
151default:
152const char * data = reinterpret_cast<const char *>(sqlite3_column_text(compiled_statement.get(), idx));
153int len = sqlite3_column_bytes(compiled_statement.get(), idx);
154assert_cast<ColumnString &>(column).insertData(data, len);
155break;
156}
157}
158
159}
160
161#endif
162