Skip to content

Commit

Permalink
Snapshot Optimization (#35)
Browse files Browse the repository at this point in the history
Motivation:
The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not.
The sequence number is being changed when modification happens in the db.
This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one.
In transactional db with mostly read operations, it should improve performance when used with multithreaded environment and as well other scenarios of taking large amount of snapshots with mostly read operations.

This Feature must have folly library installed.

In order to cache the snapshots, there is last_snapshot_
(folly::atomic_shared_ptr, lock free atomic_shared_ptr) in order to
access the last_snapshot_ created and point to it.
For every GetSnapshotImpl call (where snapshots are being created), the
function checks if the sequence number is different than last_snapshot_,
if no, it creates new snapshot and inside this snapshot it adds a
reference to last_snapshot_ (the reference is cached_snapshot), so this sequence number will remain inside
SnapshotList (SnapshotList is the list of the snapshots in the system and used in compaction to show which snapshots are being used), if there are still snapshots holding this sequence number. If the sequence number as changed or the last_snapshot_ is nullptr it will create the snapshot while acquiring db_mutex.

For ReleaseSnapshotImpl (deleting a snapshot).
We will unref the last_snapshot_ (using comapre_exchange_weak) and if the refcount becomes 0, it will
call Deleter and remove this snapshot entirely from the SnapshotList and
continue with taking the db mutex.
If there are still references, it will return without taking it out from
the SnapshotList nor taking the db mutex
  • Loading branch information
Or Friedmann committed Jul 6, 2023
1 parent f66ae81 commit 523a6fa
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 6 deletions.
12 changes: 12 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,16 @@ if(WITH_TBB)
list(APPEND THIRDPARTY_LIBS TBB::TBB)
endif()

option(WITH_SNAP_OPTIMIZATION "Optimize Snapshot performance for read mostly workload" OFF)
if(WITH_SNAP_OPTIMIZATION)
find_package(folly REQUIRED)
add_definitions(-DSPEEDB_SNAP_OPTIMIZATION)
list(APPEND THIRDPARTY_LIBS folly)
message(STATUS "Enabling RTTI in all builds - part of folly requirements")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI")
endif()

# Stall notifications eat some performance from inserts
option(DISABLE_STALL_NOTIF "Build with stall notifications" OFF)
if(DISABLE_STALL_NOTIF)
Expand All @@ -446,6 +456,7 @@ endif()


# RTTI is by default AUTO which enables it in debug and disables it in release.
if(NOT WITH_SNAP_OPTIMIZATION)
set(USE_RTTI AUTO CACHE STRING "Enable RTTI in builds")
set_property(CACHE USE_RTTI PROPERTY STRINGS AUTO ON OFF)
if(USE_RTTI STREQUAL "AUTO")
Expand All @@ -471,6 +482,7 @@ else()
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif()
endif()
endif()

# Used to run CI build and tests so we can run faster
option(OPTDBG "Build optimized debug build with MSVC" OFF)
Expand Down
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ we still write it with its value to SST file.
This feature keeps only the delete record and reduce SST size for later compaction.
(#411)

* Snapshot optimization - The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not. The sequence number is being changed when modification happens in the db. This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one. In transactional db with mostly read operations, it should improve performance when used with multithreaded environment and as well other scenarios of taking large amount of snapshots with mostly read operations.

### Enhancements
* CI: add a workflow for building and publishing jar to maven central (#507)
* LOG: Compaction job traces - report cf name and job id (#511)
Expand Down
31 changes: 31 additions & 0 deletions cmake/modules/FindFolly.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
find_path(FOLLY_ROOT_DIR
NAMES include/folly/folly-config.h
)

find_library(FOLLY_LIBRARIES
NAMES folly
HINTS ${FOLLY_ROOT_DIR}/lib
)

find_library(FOLLY_BENCHMARK_LIBRARIES
NAMES follybenchmark
HINTS ${FOLLY_ROOT_DIR}/lib
)

find_path(FOLLY_INCLUDE_DIR
NAMES folly/folly-config.h
HINTS ${FOLLY_ROOT_DIR}/include
)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Folly DEFAULT_MSG
FOLLY_LIBRARIES
FOLLY_INCLUDE_DIR
)

mark_as_advanced(
FOLLY_ROOT_DIR
FOLLY_LIBRARIES
FOLLY_BENCHMARK_LIBRARIES
FOLLY_INCLUDE_DIR
)
70 changes: 68 additions & 2 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3714,10 +3714,16 @@ Status DBImpl::GetTimestampedSnapshots(

SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock) {
if (!is_snapshot_supported_) {
return nullptr;
}
int64_t unix_time = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
#ifdef SPEEDB_SNAP_OPTIMIZATION
if (RefSnapshot(unix_time, is_write_conflict_boundary, s)) {
return s;
}
#endif

if (lock) {
mutex_.Lock();
Expand All @@ -3732,6 +3738,8 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
delete s;
return nullptr;
}
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
auto snapshot_seq = GetLastPublishedSequence();
SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
Expand Down Expand Up @@ -3862,6 +3870,47 @@ bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
}
} // namespace

#ifdef SPEEDB_SNAP_OPTIMIZATION
bool DBImpl::UnRefSnapshot(const SnapshotImpl* snapshot,
bool& is_cached_snapshot) {
SnapshotImpl* snap = const_cast<SnapshotImpl*>(snapshot);
if (snap->cached_snapshot) {
snapshots_.logical_count_.fetch_sub(1);
is_cached_snapshot = true;
size_t cnt = snap->cached_snapshot->refcount.fetch_sub(1);
if (cnt < 2) {
snapshots_.last_snapshot_.compare_exchange_weak(snap->cached_snapshot,
nullptr);
}
delete snap;
}
if (!snapshots_.deleteitem && is_cached_snapshot) {
return true;
}
return false;
}

bool DBImpl::RefSnapshot(int64_t unix_time, bool is_write_conflict_boundary,
SnapshotImpl* snapshot) {
std::shared_ptr<SnapshotImpl> shared_snap = snapshots_.last_snapshot_;
if (shared_snap &&
shared_snap->GetSequenceNumber() == GetLastPublishedSequence() &&
shared_snap->is_write_conflict_boundary_ == is_write_conflict_boundary) {
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
snapshot->cached_snapshot = shared_snap;
snapshots_.logical_count_.fetch_add(1);
shared_snap->refcount.fetch_add(1);
snapshot->number_ = shared_snap->GetSequenceNumber();
snapshot->unix_time_ = unix_time;
snapshot->is_write_conflict_boundary_ = is_write_conflict_boundary;
return true;
}
return false;
}

#endif

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
if (s == nullptr) {
// DBImpl::GetSnapshot() can return nullptr when snapshot
Expand All @@ -3870,9 +3919,24 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
return;
}
const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
#ifdef SPEEDB_SNAP_OPTIMIZATION
bool is_cached_snapshot = false;
if (UnRefSnapshot(casted_s, is_cached_snapshot)) {
return;
}
#endif
{
InstrumentedMutexLock l(&mutex_);
#ifdef SPEEDB_SNAP_OPTIMIZATION
std::scoped_lock<std::mutex> snaplock(snapshots_.lock);
snapshots_.deleteitem = false;
if (!is_cached_snapshot) {
snapshots_.Delete(casted_s);
delete casted_s;
}
#else
snapshots_.Delete(casted_s);
#endif
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
oldest_snapshot = GetLastPublishedSequence();
Expand Down Expand Up @@ -3913,7 +3977,9 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
}
}
#ifndef SPEEDB_SNAP_OPTIMIZATION
delete casted_s;
#endif
}

Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
Expand Down
6 changes: 6 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ class DBImpl : public DB {
std::vector<Iterator*>* iterators) override;

virtual const Snapshot* GetSnapshot() override;
// Will unref a snapshot copy
// Returns true if the snapshot has not been deleted from SnapshotList
bool UnRefSnapshot(const SnapshotImpl* snapshot, bool& is_cached_snapshot);
// true if the snapshot provided has been referenced, otherwise false
bool RefSnapshot(int64_t unix_time, bool is_write_conflict_boundary,
SnapshotImpl* snapshot);
virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
// Create a timestamped snapshot. This snapshot can be shared by multiple
// readers. If any of them uses it for write conflict checking, then
Expand Down
40 changes: 40 additions & 0 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2048,6 +2048,46 @@ TEST_F(DBTest2, DuplicateSnapshot) {
}
}

#ifdef SPEEDB_SNAP_OPTIMIZATION
// This test should run only if there is snapshot optimization enabled
TEST_F(DBTest2, RefSnapshot) {
Options options;
options = CurrentOptions(options);
std::vector<const Snapshot*> snapshots;
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
SequenceNumber oldest_ww_snap, first_ww_snap;

ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(db_->GetSnapshot());
ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
first_ww_snap = snapshots.back()->GetSequenceNumber();
ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
snapshots.push_back(db_->GetSnapshot());
ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(db_->GetSnapshot()); // this should create a reference

{
InstrumentedMutexLock l(dbi->mutex());
auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap);
ASSERT_EQ(seqs.size(), 4); // duplicates are not counted
ASSERT_EQ(oldest_ww_snap, first_ww_snap);
ASSERT_EQ(dbi->snapshots().count(),
6); // how many snapshots stored in SnapshotList
ASSERT_EQ(dbi->snapshots().logical_count(),
8); // how many snapshots in the system
}

for (auto s : snapshots) {
db_->ReleaseSnapshot(s);
}
}
#endif

class PinL0IndexAndFilterBlocksTest
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool>> {
Expand Down
Loading

0 comments on commit 523a6fa

Please sign in to comment.