Skip to content

Commit

Permalink
Remove the db lock from the context to prevent blocking user requests
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Oct 14, 2024
1 parent 80b7d85 commit 8d1bab3
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 38 deletions.
3 changes: 2 additions & 1 deletion src/storage/batch_indexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class WriteBatchIndexer : public rocksdb::WriteBatch::Handler {
DCHECK_NOTNULL(dest_batch);
DCHECK_NOTNULL(snapshot);
}
explicit WriteBatchIndexer(engine::Context& ctx) : WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.snapshot) {}
explicit WriteBatchIndexer(engine::Context& ctx)
: WriteBatchIndexer(ctx.storage, ctx.batch.get(), ctx.GetSnapshot()) {}
rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice& key, const rocksdb::Slice& value) override {
return dest_batch_->Put(storage_->GetCFHandle(static_cast<ColumnFamilyID>(column_family_id)), key, value);
}
Expand Down
27 changes: 13 additions & 14 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
std::string *value) {
if (ctx.is_txn_mode) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
Expand All @@ -624,7 +624,7 @@ rocksdb::Status Storage::Get(engine::Context &ctx, const rocksdb::ReadOptions &o
rocksdb::PinnableSlice *value) {
if (ctx.is_txn_mode) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
rocksdb::Status s;
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
Expand Down Expand Up @@ -657,7 +657,7 @@ rocksdb::Iterator *Storage::NewIterator(engine::Context &ctx, const rocksdb::Rea
rocksdb::ColumnFamilyHandle *column_family) {
if (ctx.is_txn_mode) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
auto iter = db_->NewIterator(options, column_family);
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
Expand All @@ -673,7 +673,7 @@ void Storage::MultiGet(engine::Context &ctx, const rocksdb::ReadOptions &options
rocksdb::PinnableSlice *values, rocksdb::Status *statuses) {
if (ctx.is_txn_mode) {
DCHECK_NOTNULL(options.snapshot);
DCHECK_EQ(ctx.snapshot->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
DCHECK_EQ(ctx.GetSnapshot()->GetSequenceNumber(), options.snapshot->GetSequenceNumber());
}
if (is_txn_mode_ && txn_write_batch_->GetWriteBatch()->Count() > 0) {
txn_write_batch_->MultiGetFromBatchAndDB(db_.get(), options, column_family, num_keys, keys, values, statuses,
Expand Down Expand Up @@ -1274,30 +1274,29 @@ bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &d
return crc == tmp_crc;
}

[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() const {
[[nodiscard]] rocksdb::ReadOptions Context::GetReadOptions() {
rocksdb::ReadOptions read_options;
if (is_txn_mode) read_options.snapshot = snapshot;
if (is_txn_mode) read_options.snapshot = GetSnapshot();
return read_options;
}

[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() const {
[[nodiscard]] rocksdb::ReadOptions Context::DefaultScanOptions() {
rocksdb::ReadOptions read_options = storage->DefaultScanOptions();
if (is_txn_mode) read_options.snapshot = snapshot;
if (is_txn_mode) read_options.snapshot = GetSnapshot();
return read_options;
}

[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() const {
[[nodiscard]] rocksdb::ReadOptions Context::DefaultMultiGetOptions() {
rocksdb::ReadOptions read_options = storage->DefaultMultiGetOptions();
if (is_txn_mode) read_options.snapshot = snapshot;
if (is_txn_mode) read_options.snapshot = GetSnapshot();
return read_options;
}

void Context::RefreshLatestSnapshot() {
auto guard = storage->WriteLockGuard();
if (snapshot) {
storage->GetDB()->ReleaseSnapshot(snapshot);
if (snapshot_) {
storage->GetDB()->ReleaseSnapshot(snapshot_);
}
snapshot = storage->GetDB()->GetSnapshot();
snapshot_ = storage->GetDB()->GetSnapshot();
if (batch) {
batch->Clear();
}
Expand Down
48 changes: 25 additions & 23 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,7 @@ class Storage {
/// Context does not provide thread safety guarantees and is generally only passed as a parameter between APIs.
struct Context {
engine::Storage *storage = nullptr;
/// If is_txn_mode is true, snapshot should be specified instead of nullptr when used,
/// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
/// Normally it will be fixed to the latest Snapshot when the Context is constructed.
/// If is_txn_mode is false, the snapshot is nullptr.
const rocksdb::Snapshot *snapshot = nullptr;

std::unique_ptr<rocksdb::WriteBatchWithIndex> batch = nullptr;

/// is_txn_mode is used to determine whether the current Context is in transactional mode,
Expand All @@ -398,30 +394,23 @@ struct Context {

/// GetReadOptions returns a default ReadOptions, and if is_txn_mode = true, then its snapshot is specified by the
/// Context
[[nodiscard]] rocksdb::ReadOptions GetReadOptions() const;
[[nodiscard]] rocksdb::ReadOptions GetReadOptions();
/// DefaultScanOptions returns a DefaultScanOptions, and if is_txn_mode = true, then its snapshot is specified by the
/// Context. Otherwise it is the same as Storage::DefaultScanOptions
[[nodiscard]] rocksdb::ReadOptions DefaultScanOptions() const;
[[nodiscard]] rocksdb::ReadOptions DefaultScanOptions();
/// DefaultMultiGetOptions returns a DefaultMultiGetOptions, and if is_txn_mode = true, then its snapshot is specified
/// by the Context. Otherwise it is the same as Storage::DefaultMultiGetOptions
[[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions() const;
[[nodiscard]] rocksdb::ReadOptions DefaultMultiGetOptions();

void RefreshLatestSnapshot();

/// TODO: Change it to defer getting the context, and the snapshot is pinned after the first read operation
explicit Context(engine::Storage *storage) : storage(storage) {
auto guard = storage->ReadLockGuard();
if (!storage->GetConfig()->txn_context_enabled) {
is_txn_mode = false;
return;
}
snapshot = storage->GetDB()->GetSnapshot(); // NOLINT
}
explicit Context(engine::Storage *storage)
: storage(storage), is_txn_mode(storage->GetConfig()->txn_context_enabled) {}
~Context() {
if (storage) {
auto guard = storage->WriteLockGuard();
if (storage->GetDB() && snapshot) {
storage->GetDB()->ReleaseSnapshot(snapshot);
if (snapshot_ && storage->GetDB()) {
storage->GetDB()->ReleaseSnapshot(snapshot_);
}
}
}
Expand All @@ -430,22 +419,35 @@ struct Context {
Context &operator=(Context &&ctx) noexcept {
if (this != &ctx) {
storage = ctx.storage;
snapshot = ctx.snapshot;
snapshot_ = ctx.snapshot_;
batch = std::move(ctx.batch);

ctx.storage = nullptr;
ctx.snapshot = nullptr;
ctx.snapshot_ = nullptr;
}
return *this;
}
Context(Context &&ctx) noexcept : storage(ctx.storage), snapshot(ctx.snapshot), batch(std::move(ctx.batch)) {
Context(Context &&ctx) noexcept : storage(ctx.storage), batch(std::move(ctx.batch)), snapshot_(ctx.snapshot_) {
ctx.storage = nullptr;
ctx.snapshot = nullptr;
ctx.snapshot_ = nullptr;
}

const rocksdb::Snapshot *GetSnapshot() {
if (snapshot_ == nullptr) {
snapshot_ = storage->GetDB()->GetSnapshot(); // NOLINT
}
return snapshot_;
}

private:
/// It is only used by NonTransactionContext
explicit Context(engine::Storage *storage, bool txn_mode) : storage(storage), is_txn_mode(txn_mode) {}

/// If is_txn_mode is true, snapshot should be specified instead of nullptr when used,
/// and should be consistent with snapshot in ReadOptions to avoid ambiguity.
/// Normally it will be fixed to the latest Snapshot when the Context is constructed.
/// If is_txn_mode is false, the snapshot is nullptr.
const rocksdb::Snapshot *snapshot_ = nullptr;
};

} // namespace engine

0 comments on commit 8d1bab3

Please sign in to comment.