Skip to content

Commit

Permalink
Avoid manually releasing DB pointer via unique_ptr (#1487)
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored Jun 7, 2023
1 parent c471fa5 commit f557255
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 33 deletions.
63 changes: 63 additions & 0 deletions src/common/db_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/utilities/backup_engine.h"
#include "storage/storage.h"

namespace util {
Expand All @@ -39,4 +40,66 @@ struct UniqueIterator : std::unique_ptr<rocksdb::Iterator> {
: BaseType(storage->NewIterator(options)) {}
};

namespace details {

template <typename T, auto* F, Status::Code C = Status::NotOK, typename... Args>
StatusOr<std::unique_ptr<T>> WrapOutPtrToUnique(Args&&... args) {
T* ptr = nullptr;
auto s = (*F)(std::forward<Args>(args)..., &ptr);

if (!s.ok()) {
return {C, s.ToString()};
}

return ptr;
}

inline rocksdb::Status DBOpenForReadOnly(const rocksdb::DBOptions& db_options, const std::string& dbname,
const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
std::vector<rocksdb::ColumnFamilyHandle*>* handles, rocksdb::DB** dbptr) {
return rocksdb::DB::OpenForReadOnly(db_options, dbname, column_families, handles, dbptr);
}

} // namespace details

inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(const rocksdb::Options& options, const std::string& dbname) {
return details::WrapOutPtrToUnique<
rocksdb::DB,
static_cast<rocksdb::Status (*)(const rocksdb::Options&, const std::string&, rocksdb::DB**)>(rocksdb::DB::Open),
Status::DBOpenErr>(options, dbname);
}

inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpen(
const rocksdb::DBOptions& db_options, const std::string& dbname,
const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
return details::WrapOutPtrToUnique<
rocksdb::DB,
static_cast<rocksdb::Status (*)(const rocksdb::DBOptions&, const std::string&,
const std::vector<rocksdb::ColumnFamilyDescriptor>&,
std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**)>(rocksdb::DB::Open),
Status::DBOpenErr>(db_options, dbname, column_families, handles);
}

inline StatusOr<std::unique_ptr<rocksdb::DB>> DBOpenForReadOnly(
const rocksdb::DBOptions& db_options, const std::string& dbname,
const std::vector<rocksdb::ColumnFamilyDescriptor>& column_families,
std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
return details::WrapOutPtrToUnique<
rocksdb::DB,
static_cast<rocksdb::Status (*)(
const rocksdb::DBOptions&, const std::string&, const std::vector<rocksdb::ColumnFamilyDescriptor>&,
std::vector<rocksdb::ColumnFamilyHandle*>*, rocksdb::DB**)>(details::DBOpenForReadOnly),
Status::DBOpenErr>(db_options, dbname, column_families, handles);
}

inline StatusOr<std::unique_ptr<rocksdb::BackupEngine>> BackupEngineOpen(rocksdb::Env* db_env,
const rocksdb::BackupEngineOptions& options) {
return details::WrapOutPtrToUnique<
rocksdb::BackupEngine,
static_cast<rocksdb::IOStatus (*)(rocksdb::Env*, const rocksdb::BackupEngineOptions&, rocksdb::BackupEngine**)>(
rocksdb::BackupEngine::Open),
Status::DBBackupErr>(db_env, options);
}

} // namespace util
54 changes: 24 additions & 30 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <random>

#include "compact_filter.h"
#include "db_util.h"
#include "event_listener.h"
#include "event_util.h"
#include "redis_db.h"
Expand Down Expand Up @@ -72,9 +73,8 @@ void Storage::CloseDB() {

db_closing_ = true;
db_->SyncWAL();
rocksdb::CancelAllBackgroundWork(db_, true);
rocksdb::CancelAllBackgroundWork(db_.get(), true);
for (auto handle : cf_handles_) db_->DestroyColumnFamilyHandle(handle);
delete db_;
db_ = nullptr;
}

Expand Down Expand Up @@ -200,36 +200,31 @@ Status Storage::SetDBOption(const std::string &key, const std::string &value) {
}

Status Storage::CreateColumnFamilies(const rocksdb::Options &options) {
rocksdb::DB *tmp_db = nullptr;
rocksdb::ColumnFamilyOptions cf_options(options);
rocksdb::Status s = rocksdb::DB::Open(options, config_->db_dir, &tmp_db);
if (s.ok()) {
auto res = util::DBOpen(options, config_->db_dir);
if (res) {
std::vector<std::string> cf_names = {kMetadataColumnFamilyName, kZSetScoreColumnFamilyName, kPubSubColumnFamilyName,
kPropagateColumnFamilyName, kStreamColumnFamilyName};
std::vector<rocksdb::ColumnFamilyHandle *> cf_handles;
s = tmp_db->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
auto s = (*res)->CreateColumnFamilies(cf_options, cf_names, &cf_handles);
if (!s.ok()) {
delete tmp_db;
return {Status::DBOpenErr, s.ToString()};
}

for (auto handle : cf_handles) tmp_db->DestroyColumnFamilyHandle(handle);
tmp_db->Close();
delete tmp_db;
}

if (!s.ok()) {
for (auto handle : cf_handles) (*res)->DestroyColumnFamilyHandle(handle);
(*res)->Close();
} else {
// We try to create column families by opening the database without column families.
// If it's ok means we didn't create column families (cannot open without column families if created).
// When goes wrong, we need to check whether it's caused by column families NOT being opened or not.
// If the status message contains `Column families not opened` means that we have created the column
// families, let's ignore the error.
std::string not_opened_prefix = "Column families not opened";
if (s.IsInvalidArgument() && s.ToString().find(not_opened_prefix) != std::string::npos) {
const char *not_opened_prefix = "Column families not opened";
if (res.Msg().find(not_opened_prefix) != std::string::npos) {
return Status::OK();
}

return {Status::NotOK, s.ToString()};
return res;
}

return Status::OK();
Expand Down Expand Up @@ -314,18 +309,16 @@ Status Storage::Open(bool read_only) {
if (!s.ok()) return {Status::NotOK, s.ToString()};

auto start = std::chrono::high_resolution_clock::now();
if (read_only) {
s = rocksdb::DB::OpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_, &db_);
} else {
s = rocksdb::DB::Open(options, config_->db_dir, column_families, &cf_handles_, &db_);
}
auto dbs = read_only ? util::DBOpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_)
: util::DBOpen(options, config_->db_dir, column_families, &cf_handles_);
auto end = std::chrono::high_resolution_clock::now();
int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
if (!s.ok()) {
LOG(INFO) << "[storage] Failed to load the data from disk: " << duration << " ms";
return {Status::DBOpenErr, s.ToString()};
}

db_ = std::move(*dbs);
LOG(INFO) << "[storage] Success to load the data from disk: " << duration << " ms";
return Status::OK();
}
Expand All @@ -341,7 +334,7 @@ Status Storage::CreateBackup() {

// 1) Create checkpoint of rocksdb for backup
rocksdb::Checkpoint *checkpoint = nullptr;
rocksdb::Status s = rocksdb::Checkpoint::Create(db_, &checkpoint);
rocksdb::Status s = rocksdb::Checkpoint::Create(db_.get(), &checkpoint);
if (!s.ok()) {
LOG(WARNING) << "Failed to create checkpoint object for backup. Error: " << s.ToString();
return {Status::NotOK, s.ToString()};
Expand Down Expand Up @@ -382,18 +375,18 @@ void Storage::DestroyBackup() {
return;
}
backup_->StopBackup();
delete backup_;
backup_ = nullptr;
}

Status Storage::RestoreFromBackup() {
// TODO(@ruoshan): assert role to be slave
// We must reopen the backup engine every time, as the files is changed
rocksdb::BackupEngineOptions bk_option(config_->backup_sync_dir);
auto s = rocksdb::BackupEngine::Open(db_->GetEnv(), bk_option, &backup_);
if (!s.ok()) return {Status::DBBackupErr, s.ToString()};
auto bes = util::BackupEngineOpen(db_->GetEnv(), bk_option);
if (!bes) return bes;
backup_ = std::move(*bes);

s = backup_->RestoreDBFromLatestBackup(config_->db_dir, config_->db_dir);
auto s = backup_->RestoreDBFromLatestBackup(config_->db_dir, config_->db_dir);
if (!s.ok()) {
LOG(ERROR) << "[storage] Failed to restore database from the latest backup. Error: " << s.ToString();
} else {
Expand Down Expand Up @@ -520,7 +513,7 @@ rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, const rocksdb:
rocksdb::Status Storage::Get(const rocksdb::ReadOptions &options, rocksdb::ColumnFamilyHandle *column_family,
const rocksdb::Slice &key, std::string *value) {
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
return txn_write_batch_->GetFromBatchAndDB(db_, options, column_family, key, value);
return txn_write_batch_->GetFromBatchAndDB(db_.get(), options, column_family, key, value);
}
return db_->Get(options, column_family, key, value);
}
Expand All @@ -542,7 +535,8 @@ void Storage::MultiGet(const rocksdb::ReadOptions &options, rocksdb::ColumnFamil
const size_t num_keys, const rocksdb::Slice *keys, rocksdb::PinnableSlice *values,
rocksdb::Status *statuses) {
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
txn_write_batch_->MultiGetFromBatchAndDB(db_, options, column_family, num_keys, keys, values, statuses, false);
txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses,
false);
} else {
db_->MultiGet(options, column_family, num_keys, keys, values, statuses, false);
}
Expand Down Expand Up @@ -701,7 +695,7 @@ void Storage::SetIORateLimit(int64_t max_io_mb) {
rate_limiter_->SetBytesPerSecond(max_io_mb * static_cast<int64_t>(MiB));
}

rocksdb::DB *Storage::GetDB() { return db_; }
rocksdb::DB *Storage::GetDB() { return db_.get(); }

Status Storage::BeginTxn() {
if (is_txn_mode_) {
Expand Down Expand Up @@ -834,7 +828,7 @@ Status Storage::ReplDataManager::GetFullReplDataInfo(Storage *storage, std::stri
// Create checkpoint if not exist
if (!storage->env_->FileExists(data_files_dir).ok()) {
rocksdb::Checkpoint *checkpoint = nullptr;
rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_, &checkpoint);
rocksdb::Status s = rocksdb::Checkpoint::Create(storage->db_.get(), &checkpoint);
if (!s.ok()) {
LOG(WARNING) << "Failed to create checkpoint object. Error: " << s.ToString();
return {Status::NotOK, s.ToString()};
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ class Storage {
std::string GetReplIdFromDbEngine();

private:
rocksdb::DB *db_ = nullptr;
std::unique_ptr<rocksdb::DB> db_ = nullptr;
std::string replid_;
time_t backup_creating_time_;
rocksdb::BackupEngine *backup_ = nullptr;
std::unique_ptr<rocksdb::BackupEngine> backup_ = nullptr;
rocksdb::Env *env_;
std::shared_ptr<rocksdb::SstFileManager> sst_file_manager_;
std::shared_ptr<rocksdb::RateLimiter> rate_limiter_;
Expand Down
2 changes: 1 addition & 1 deletion tests/cppunit/types/set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ TEST_F(RedisSetTest, Move) {
uint64_t ret = 0;
bool flag = false;
rocksdb::Status s = set_->Add(key_, fields_, &ret);
EXPECT_TRUE(s.ok() && static_cast<int>(fields_.size()) == ret);
EXPECT_TRUE(s.ok() && fields_.size() == ret);
Slice dst("set-test-move-key");
for (auto &field : fields_) {
s = set_->Move(key_, dst, field, &flag);
Expand Down

0 comments on commit f557255

Please sign in to comment.