Ton

Форк
0
/
RocksDb.cpp 
274 строки · 8.1 Кб
1
/*
2
    This file is part of TON Blockchain Library.
3

4
    TON Blockchain Library is free software: you can redistribute it and/or modify
5
    it under the terms of the GNU Lesser General Public License as published by
6
    the Free Software Foundation, either version 2 of the License, or
7
    (at your option) any later version.
8

9
    TON Blockchain Library is distributed in the hope that it will be useful,
10
    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
    GNU Lesser General Public License for more details.
13

14
    You should have received a copy of the GNU Lesser General Public License
15
    along with TON Blockchain Library.  If not, see <http://www.gnu.org/licenses/>.
16

17
    Copyright 2017-2020 Telegram Systems LLP
18
*/
19
#include "td/db/RocksDb.h"
20

21
#include "rocksdb/db.h"
22
#include "rocksdb/table.h"
23
#include "rocksdb/statistics.h"
24
#include "rocksdb/write_batch.h"
25
#include "rocksdb/utilities/optimistic_transaction_db.h"
26
#include "rocksdb/utilities/transaction.h"
27

28
namespace td {
29
namespace {
30
// Converts a rocksdb::Status into a td::Status. A successful status maps to
// Status::OK(); anything else becomes an error whose message is the text
// returned by rocksdb::Status::ToString().
static Status from_rocksdb(rocksdb::Status status) {
  return status.ok() ? Status::OK() : Status::Error(status.ToString());
}
36
// Builds a td::Slice from the data pointer and size of a rocksdb::Slice.
static Slice from_rocksdb(rocksdb::Slice slice) {
  const auto length = slice.size();
  return Slice(slice.data(), length);
}
39
// Builds a rocksdb::Slice from the data pointer and size of a td::Slice.
static rocksdb::Slice to_rocksdb(Slice slice) {
  const auto length = slice.size();
  return rocksdb::Slice(slice.data(), length);
}
42
}  // namespace
43

44
// Destroys the database located at `path` via rocksdb::DestroyDB, using
// default-constructed options, and reports the outcome as a td::Status.
Status RocksDb::destroy(Slice path) {
  const auto destroy_status = rocksdb::DestroyDB(path.str(), {});
  return from_rocksdb(destroy_status);
}
47

48
// Move construction and move assignment are compiler-generated; they are
// defined here, out of line, rather than in the header.
RocksDb::RocksDb(RocksDb &&) = default;
RocksDb &RocksDb::operator=(RocksDb &&) = default;
50

51
// Releases the snapshot held by this instance, if any. An instance without a
// database handle (db_ is empty) has nothing to release.
RocksDb::~RocksDb() {
  if (db_) {
    end_snapshot().ensure();
  }
}
57

58
// Returns a new RocksDb that shares the same underlying database handle and
// carries a copy of the options. The snapshot, transaction and write batch
// of this instance are not passed to the new object.
RocksDb RocksDb::clone() const {
  return RocksDb(db_, options_);
}
61

62
// Opens the RocksDB database at `path` (creating it if it does not exist) as
// an OptimisticTransactionDB and wraps it in a RocksDb.
//
// `options` supplies the statistics object, the direct-reads flag and an
// optional block cache. When no block cache is given, a function-local static
// LRU cache of 1 GiB is used instead. Returns an error when RocksDB fails to
// open the database.
Result<RocksDb> RocksDb::open(std::string path, RocksDbOptions options) {
  rocksdb::OptimisticTransactionDB *raw_db;
  {
    rocksdb::Options db_options;

    // Created once, on the first call, even if that call provides its own cache.
    static auto default_cache = rocksdb::NewLRUCache(1 << 30);
    if (!options.block_cache) {
      options.block_cache = default_cache;
    }

    rocksdb::BlockBasedTableOptions table_options;
    table_options.block_cache = options.block_cache;
    db_options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));

    db_options.use_direct_reads = options.use_direct_reads;
    db_options.manual_wal_flush = true;
    db_options.create_if_missing = true;
    db_options.max_background_compactions = 4;
    db_options.max_background_flushes = 2;
    db_options.bytes_per_sync = 1 << 20;                 // 1 MiB
    db_options.writable_file_max_buffer_size = 2 << 14;  // 32 KiB
    db_options.statistics = options.statistics;

    rocksdb::OptimisticTransactionDBOptions occ_options;
    occ_options.validate_policy = rocksdb::OccValidationPolicy::kValidateSerial;

    // Only the default column family is opened.
    rocksdb::ColumnFamilyOptions cf_options(db_options);
    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
    column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, cf_options);

    std::vector<rocksdb::ColumnFamilyHandle *> handles;
    TRY_STATUS(from_rocksdb(rocksdb::OptimisticTransactionDB::Open(db_options, occ_options, std::move(path),
                                                                   column_families, &handles, &raw_db)));
    CHECK(handles.size() == 1);
    // The handle can be deleted since DBImpl is always holding a reference to
    // the default column family.
    delete handles[0];
  }
  return RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB>(raw_db), std::move(options));
}
99

100
std::shared_ptr<rocksdb::Statistics> RocksDb::create_statistics() {
101
  return rocksdb::CreateDBStatistics();
102
}
103

104
// Returns the textual dump produced by Statistics::ToString().
// `statistics` must not be null; it is dereferenced unconditionally.
std::string RocksDb::statistics_to_string(const std::shared_ptr<rocksdb::Statistics> statistics) {
  std::string dump = statistics->ToString();
  return dump;
}
107

108
// Calls Statistics::Reset() on the given object.
// `statistics` must not be null; it is dereferenced unconditionally.
void RocksDb::reset_statistics(const std::shared_ptr<rocksdb::Statistics> statistics) {
  auto &stats_ref = *statistics;
  stats_ref.Reset();
}
111

112
std::shared_ptr<rocksdb::Cache> RocksDb::create_cache(size_t capacity) {
113
  return rocksdb::NewLRUCache(capacity);
114
}
115

116
std::unique_ptr<KeyValueReader> RocksDb::snapshot() {
117
  auto res = std::make_unique<RocksDb>(clone());
118
  res->begin_snapshot().ensure();
119
  return std::move(res);
120
}
121

122
std::string RocksDb::stats() const {
123
  std::string out;
124
  db_->GetProperty("rocksdb.stats", &out);
125
  //db_->GetProperty("rocksdb.cur-size-all-mem-tables", &out);
126
  return out;
127
}
128

129
// Looks up `key` and stores the result in `value`.
//
// The read source is chosen in this order: the active snapshot, then the
// active transaction, then the database itself.
// Returns GetStatus::Ok when found, GetStatus::NotFound when RocksDB reports
// kNotFound, and an error for any other failure.
Result<RocksDb::GetStatus> RocksDb::get(Slice key, std::string &value) {
  const auto rocks_key = to_rocksdb(key);

  rocksdb::Status lookup;
  if (snapshot_) {
    rocksdb::ReadOptions read_options;
    read_options.snapshot = snapshot_.get();
    lookup = db_->Get(read_options, rocks_key, &value);
  } else if (transaction_) {
    lookup = transaction_->Get({}, rocks_key, &value);
  } else {
    lookup = db_->Get({}, rocks_key, &value);
  }

  if (lookup.ok()) {
    return GetStatus::Ok;
  }
  if (lookup.code() == rocksdb::Status::kNotFound) {
    return GetStatus::NotFound;
  }
  return from_rocksdb(lookup);
}
149

150
// Stores `value` under `key`. The write goes to the active write batch if
// there is one, otherwise to the active transaction, otherwise straight to
// the database with default write options.
Status RocksDb::set(Slice key, Slice value) {
  const auto rocks_key = to_rocksdb(key);
  const auto rocks_value = to_rocksdb(value);

  if (write_batch_) {
    return from_rocksdb(write_batch_->Put(rocks_key, rocks_value));
  }
  if (transaction_) {
    return from_rocksdb(transaction_->Put(rocks_key, rocks_value));
  }
  return from_rocksdb(db_->Put({}, rocks_key, rocks_value));
}
159

160
// Deletes `key`. The delete goes to the active write batch if there is one,
// otherwise to the active transaction, otherwise straight to the database
// with default write options.
Status RocksDb::erase(Slice key) {
  const auto rocks_key = to_rocksdb(key);

  if (write_batch_) {
    return from_rocksdb(write_batch_->Delete(rocks_key));
  }
  if (transaction_) {
    return from_rocksdb(transaction_->Delete(rocks_key));
  }
  return from_rocksdb(db_->Delete({}, rocks_key));
}
169

170
// Counts the keys that start with `prefix`.
//
// Iterates over the database (honouring the active snapshot, if any) unless a
// transaction is active without a snapshot, in which case the transaction's
// iterator is used. Returns an error if the iterator reports a failure.
Result<size_t> RocksDb::count(Slice prefix) {
  rocksdb::ReadOptions read_options;
  read_options.snapshot = snapshot_.get();

  const bool read_from_db = snapshot_ || !transaction_;
  std::unique_ptr<rocksdb::Iterator> it(read_from_db ? db_->NewIterator(read_options)
                                                     : transaction_->GetIterator(read_options));

  size_t matched = 0;
  it->Seek(to_rocksdb(prefix));
  while (it->Valid()) {
    // Keys are visited in order from the seek position, so the first key
    // without the prefix ends the scan.
    if (from_rocksdb(it->key()).truncate(prefix.size()) != prefix) {
      break;
    }
    ++matched;
    it->Next();
  }

  const auto scan_status = it->status();
  if (!scan_status.ok()) {
    return from_rocksdb(scan_status);
  }
  return matched;
}
192

193
// Invokes `f(key, value)` for every entry, starting from the first key.
//
// Iterates over the database (honouring the active snapshot, if any) unless a
// transaction is active without a snapshot, in which case the transaction's
// iterator is used. Stops and returns the error as soon as `f` fails; also
// returns an error if the iterator itself reports a failure.
Status RocksDb::for_each(std::function<Status(Slice, Slice)> f) {
  rocksdb::ReadOptions read_options;
  read_options.snapshot = snapshot_.get();

  const bool read_from_db = snapshot_ || !transaction_;
  std::unique_ptr<rocksdb::Iterator> it(read_from_db ? db_->NewIterator(read_options)
                                                     : transaction_->GetIterator(read_options));

  it->SeekToFirst();
  while (it->Valid()) {
    auto entry_key = from_rocksdb(it->key());
    auto entry_value = from_rocksdb(it->value());
    TRY_STATUS(f(entry_key, entry_value));
    it->Next();
  }

  const auto scan_status = it->status();
  if (!scan_status.ok()) {
    return from_rocksdb(scan_status);
  }
  return Status::OK();
}
214

215
// Starts a new, empty write batch, replacing any batch already held.
// Must not be called while a transaction is active (enforced with CHECK).
Status RocksDb::begin_write_batch() {
  CHECK(!transaction_);
  auto fresh_batch = std::make_unique<rocksdb::WriteBatch>();
  write_batch_ = std::move(fresh_batch);
  return Status::OK();
}
220

221
// Starts a new optimistic transaction with `sync` enabled in its write
// options, replacing any transaction already held.
// Must not be called while a write batch is active (enforced with CHECK).
Status RocksDb::begin_transaction() {
  CHECK(!write_batch_);
  rocksdb::WriteOptions write_options;
  write_options.sync = true;
  transaction_.reset(db_->BeginTransaction(write_options, {}));
  return Status::OK();
}
228

229
// Writes the active batch to the database with `sync` enabled and returns the
// result. A batch must be active (enforced with CHECK).
Status RocksDb::commit_write_batch() {
  CHECK(write_batch_);
  // Take ownership locally so the member is cleared whatever the outcome.
  auto pending_batch = std::move(write_batch_);

  rocksdb::WriteOptions write_options;
  write_options.sync = true;
  return from_rocksdb(db_->Write(write_options, pending_batch.get()));
}
236

237
// Commits the active transaction and returns the result.
// A transaction must be active (enforced with CHECK).
Status RocksDb::commit_transaction() {
  CHECK(transaction_);
  // Take ownership locally so the member is cleared whatever the outcome.
  auto pending_transaction = std::move(transaction_);
  auto commit_status = pending_transaction->Commit();
  return from_rocksdb(commit_status);
}
242

243
// Discards the active write batch without writing it.
// A batch must be active (enforced with CHECK).
Status RocksDb::abort_write_batch() {
  CHECK(write_batch_);
  write_batch_ = nullptr;
  return Status::OK();
}
248

249
// Discards the active transaction without committing it.
// A transaction must be active (enforced with CHECK).
Status RocksDb::abort_transaction() {
  CHECK(transaction_);
  transaction_ = nullptr;
  return Status::OK();
}
254

255
// Calls DB::Flush() with default flush options and reports the outcome.
Status RocksDb::flush() {
  const auto flush_status = db_->Flush({});
  return from_rocksdb(flush_status);
}
258

259
// Acquires a new database snapshot; subsequent reads through this instance
// use it until it is released.
//
// Snapshots obtained from DB::GetSnapshot() have to be handed back with
// DB::ReleaseSnapshot(). If a snapshot is already held it is released first,
// so that overwriting snapshot_ never routes the old snapshot through the
// smart pointer's deleter.
// Always returns Status::OK().
Status RocksDb::begin_snapshot() {
  if (snapshot_) {
    db_->ReleaseSnapshot(snapshot_.release());
  }
  snapshot_.reset(db_->GetSnapshot());
  return td::Status::OK();
}
263

264
// Hands the held snapshot, if any, back to the database via
// DB::ReleaseSnapshot(). Does nothing when no snapshot is held.
// Always returns Status::OK().
Status RocksDb::end_snapshot() {
  if (!snapshot_) {
    return td::Status::OK();
  }
  db_->ReleaseSnapshot(snapshot_.release());
  return td::Status::OK();
}
270

271
// Wraps an already opened database handle together with the options it was
// opened with. Both arguments are taken by value and moved into the members,
// which avoids an extra copy of the options.
RocksDb::RocksDb(std::shared_ptr<rocksdb::OptimisticTransactionDB> db, RocksDbOptions options)
    : db_(std::move(db)), options_(std::move(options)) {
}
274
}  // namespace td
275

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.