ClickHouse

Форк
0
/
SQLiteSource.cpp 
161 строка · 5.3 Кб
1
#include "SQLiteSource.h"
2

3
#if USE_SQLITE
4
#include <base/range.h>
5
#include <Common/logger_useful.h>
6
#include <Common/assert_cast.h>
7

8
#include <Columns/ColumnArray.h>
9
#include <Columns/ColumnDecimal.h>
10
#include <Columns/ColumnNullable.h>
11
#include <Columns/ColumnString.h>
12
#include <Columns/ColumnsNumber.h>
13

14
#include <DataTypes/DataTypeNullable.h>
15

16

17
namespace DB
18
{
19

20
namespace ErrorCodes
21
{
22
    extern const int SQLITE_ENGINE_ERROR;
23
}
24

25
SQLiteSource::SQLiteSource(
26
    SQLitePtr sqlite_db_,
27
    const String & query_str_,
28
    const Block & sample_block,
29
    const UInt64 max_block_size_)
30
    : ISource(sample_block.cloneEmpty())
31
    , query_str(query_str_)
32
    , max_block_size(max_block_size_)
33
    , sqlite_db(std::move(sqlite_db_))
34
{
35
    description.init(sample_block);
36

37
    sqlite3_stmt * compiled_stmt = nullptr;
38
    int status = sqlite3_prepare_v2(
39
        sqlite_db.get(),
40
        query_str.c_str(),
41
        static_cast<int>(query_str.size() + 1),
42
        &compiled_stmt, nullptr);
43

44
    if (status != SQLITE_OK)
45
        throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
46
                        "Cannot prepare sqlite statement. Status: {}. Message: {}",
47
                        status, sqlite3_errstr(status));
48

49
    compiled_statement = std::unique_ptr<sqlite3_stmt, StatementDeleter>(compiled_stmt, StatementDeleter());
50
}
51

52
Chunk SQLiteSource::generate()
53
{
54
    if (!compiled_statement)
55
        return {};
56

57
    MutableColumns columns = description.sample_block.cloneEmptyColumns();
58
    size_t num_rows = 0;
59

60
    while (true)
61
    {
62
        int status = sqlite3_step(compiled_statement.get());
63

64
        if (status == SQLITE_BUSY)
65
        {
66
            continue;
67
        }
68
        else if (status == SQLITE_DONE)
69
        {
70
            compiled_statement.reset();
71
            break;
72
        }
73
        else if (status != SQLITE_ROW)
74
        {
75
            throw Exception(ErrorCodes::SQLITE_ENGINE_ERROR,
76
                "Expected SQLITE_ROW status, but got status {}. Error: {}, Message: {}",
77
                status, sqlite3_errstr(status), sqlite3_errmsg(sqlite_db.get()));
78
        }
79

80
        int column_count = sqlite3_column_count(compiled_statement.get());
81

82
        for (int column_index = 0; column_index < column_count; ++column_index)
83
        {
84
            if (sqlite3_column_type(compiled_statement.get(), column_index) == SQLITE_NULL)
85
            {
86
                columns[column_index]->insertDefault();
87
                continue;
88
            }
89

90
            auto & [type, is_nullable] = description.types[column_index];
91
            if (is_nullable)
92
            {
93
                ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[column_index]);
94
                insertValue(column_nullable.getNestedColumn(), type, column_index);
95
                column_nullable.getNullMapData().emplace_back(0);
96
            }
97
            else
98
            {
99
                insertValue(*columns[column_index], type, column_index);
100
            }
101
        }
102

103
        if (++num_rows == max_block_size)
104
            break;
105
    }
106

107
    if (num_rows == 0)
108
    {
109
        compiled_statement.reset();
110
        return {};
111
    }
112

113
    return Chunk(std::move(columns), num_rows);
114
}
115

116
void SQLiteSource::insertValue(IColumn & column, ExternalResultDescription::ValueType type, int idx)
117
{
118
    switch (type)
119
    {
120
        case ValueType::vtUInt8:
121
            assert_cast<ColumnUInt8 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
122
            break;
123
        case ValueType::vtUInt16:
124
            assert_cast<ColumnUInt16 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
125
            break;
126
        case ValueType::vtUInt32:
127
            assert_cast<ColumnUInt32 &>(column).insertValue(static_cast<UInt32>(sqlite3_column_int64(compiled_statement.get(), idx)));
128
            break;
129
        case ValueType::vtUInt64:
130
            /// There is no uint64 in sqlite3, only int and int64
131
            assert_cast<ColumnUInt64 &>(column).insertValue(sqlite3_column_int64(compiled_statement.get(), idx));
132
            break;
133
        case ValueType::vtInt8:
134
            assert_cast<ColumnInt8 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
135
            break;
136
        case ValueType::vtInt16:
137
            assert_cast<ColumnInt16 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
138
            break;
139
        case ValueType::vtInt32:
140
            assert_cast<ColumnInt32 &>(column).insertValue(sqlite3_column_int(compiled_statement.get(), idx));
141
            break;
142
        case ValueType::vtInt64:
143
            assert_cast<ColumnInt64 &>(column).insertValue(sqlite3_column_int64(compiled_statement.get(), idx));
144
            break;
145
        case ValueType::vtFloat32:
146
            assert_cast<ColumnFloat32 &>(column).insertValue(static_cast<Float32>(sqlite3_column_double(compiled_statement.get(), idx)));
147
            break;
148
        case ValueType::vtFloat64:
149
            assert_cast<ColumnFloat64 &>(column).insertValue(sqlite3_column_double(compiled_statement.get(), idx));
150
            break;
151
        default:
152
            const char * data = reinterpret_cast<const char *>(sqlite3_column_text(compiled_statement.get(), idx));
153
            int len = sqlite3_column_bytes(compiled_statement.get(), idx);
154
            assert_cast<ColumnString &>(column).insertData(data, len);
155
            break;
156
    }
157
}
158

159
}
160

161
#endif
162

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.