From 9cd60c649aaa5db0abd33c7e94f8215c2adc61a0 Mon Sep 17 00:00:00 2001 From: ofriedma <48631098+ofriedma@users.noreply.github.com> Date: Wed, 2 Aug 2023 14:05:24 +0300 Subject: [PATCH] Snapshot Optimization (#547) * Snapshot Optimization (https://github.com/speedb-io/speedb/issues/35) Motivation: The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not. The sequence number is being changed when modification happens in the db. This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one. In a transactional db with mostly read operations, it should improve performance when used with a multithreaded environment and as well other scenarios of taking a large amount of snapshots with mostly read operations. This feature requires the folly library to be installed. In order to cache the snapshots, there is last_snapshot_ (folly::atomic_shared_ptr, lock free atomic_shared_ptr) in order to access the last_snapshot_ created and point to it. For every GetSnapshotImpl call (where snapshots are being created), the function checks if the sequence number is different than last_snapshot_; if not, it creates a new snapshot and inside this snapshot it adds a reference to last_snapshot_ (the reference is cached_snapshot), so this sequence number will remain inside SnapshotList (SnapshotList is the list of the snapshots in the system and used in compaction to show which snapshots are being used), if there are still snapshots holding this sequence number. If the sequence number has changed or the last_snapshot_ is nullptr it will create the snapshot while acquiring db_mutex. For ReleaseSnapshotImpl (deleting a snapshot): we will unref the last_snapshot_ (using compare_exchange_weak) and if the refcount becomes 0, it will call Deleter and remove this snapshot entirely from the SnapshotList and continue with taking the db mutex. 
If there are still references, it will return without taking it out from the SnapshotList nor taking the db mutex --- CMakeLists.txt | 12 +++ HISTORY.md | 4 + cmake/modules/FindFolly.cmake | 31 +++++++ db/db_impl/db_impl.cc | 53 +++++------- db/db_impl/db_impl.h | 5 ++ db/db_test2.cc | 40 +++++++++ db/snapshot_impl.h | 150 ++++++++++++++++++++++++++++++---- 7 files changed, 246 insertions(+), 49 deletions(-) create mode 100644 cmake/modules/FindFolly.cmake diff --git a/CMakeLists.txt b/CMakeLists.txt index de8e5aa7ad..987ca1e132 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -396,6 +396,16 @@ if(WITH_TBB) list(APPEND THIRDPARTY_LIBS TBB::TBB) endif() +option(WITH_SNAP_OPTIMIZATION "Optimize Snapshot performance for read mostly workload" OFF) +if(WITH_SNAP_OPTIMIZATION) + find_package(folly REQUIRED) + add_definitions(-DSPEEDB_SNAP_OPTIMIZATION) + list(APPEND THIRDPARTY_LIBS folly) + message(STATUS "Enabling RTTI in all builds - part of folly requirements") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI") +endif() + # Stall notifications eat some performance from inserts option(DISABLE_STALL_NOTIF "Build with stall notifications" OFF) if(DISABLE_STALL_NOTIF) @@ -415,6 +425,7 @@ endif() # RTTI is by default AUTO which enables it in debug and disables it in release. 
+if(NOT WITH_SNAP_OPTIMIZATION) set(USE_RTTI AUTO CACHE STRING "Enable RTTI in builds") set_property(CACHE USE_RTTI PROPERTY STRINGS AUTO ON OFF) if(USE_RTTI STREQUAL "AUTO") @@ -440,6 +451,7 @@ else() set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") endif() endif() +endif() # Used to run CI build and tests so we can run faster option(OPTDBG "Build optimized debug build with MSVC" OFF) diff --git a/HISTORY.md b/HISTORY.md index 1dddafc28e..abf3e3ab4c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,7 +1,11 @@ # Speedb Change Log ## Unreleased + +### New Features +* Snapshot optimization - The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not. The sequence number is being changed when modification happens in the db. This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one. In transactional db with mostly read operations, it should improve performance when used with multithreaded environment and as well other scenarios of taking large amount of snapshots with mostly read operations. * Add a TablePinningPolicy to the BlockBasedTableOptions. This class controls when blocks should be pinned in memory for a block based table. The default behavior uses the MetadataCacheOptions to control pinning and behaves identical to the previous releases. + ### Enhancements * db_bench: add estimate-table-readers-mem benchmark which prints these stats. 
diff --git a/cmake/modules/FindFolly.cmake b/cmake/modules/FindFolly.cmake new file mode 100644 index 0000000000..9b12b6730f --- /dev/null +++ b/cmake/modules/FindFolly.cmake @@ -0,0 +1,31 @@ +find_path(FOLLY_ROOT_DIR + NAMES include/folly/folly-config.h +) + +find_library(FOLLY_LIBRARIES + NAMES folly + HINTS ${FOLLY_ROOT_DIR}/lib +) + +find_library(FOLLY_BENCHMARK_LIBRARIES + NAMES follybenchmark + HINTS ${FOLLY_ROOT_DIR}/lib +) + +find_path(FOLLY_INCLUDE_DIR + NAMES folly/folly-config.h + HINTS ${FOLLY_ROOT_DIR}/include +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(Folly DEFAULT_MSG + FOLLY_LIBRARIES + FOLLY_INCLUDE_DIR +) + +mark_as_advanced( + FOLLY_ROOT_DIR + FOLLY_LIBRARIES + FOLLY_BENCHMARK_LIBRARIES + FOLLY_INCLUDE_DIR +) \ No newline at end of file diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 0467f63747..0c128ad95f 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -219,6 +219,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, nonmem_write_thread_(immutable_db_options_), write_controller_(immutable_db_options_.write_controller), last_batch_group_size_(0), + snapshots_(immutable_db_options_.clock), unscheduled_flushes_(0), unscheduled_compactions_(0), bg_bottom_compaction_scheduled_(0), @@ -3872,27 +3873,22 @@ Status DBImpl::GetTimestampedSnapshots( SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, bool lock) { - int64_t unix_time = 0; - immutable_db_options_.clock->GetCurrentTime(&unix_time) - .PermitUncheckedError(); // Ignore error - SnapshotImpl* s = new SnapshotImpl; + if (!is_snapshot_supported_) { + return nullptr; + } + SnapshotImpl* snapshot = snapshots_.RefSnapshot(is_write_conflict_boundary, + GetLastPublishedSequence()); + if (snapshot) { + return snapshot; + } if (lock) { mutex_.Lock(); } else { mutex_.AssertHeld(); } - // returns null if the underlying memtable does not support snapshot. 
- if (!is_snapshot_supported_) { - if (lock) { - mutex_.Unlock(); - } - delete s; - return nullptr; - } - auto snapshot_seq = GetLastPublishedSequence(); - SnapshotImpl* snapshot = - snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary); + snapshot = + snapshots_.New(GetLastPublishedSequence(), is_write_conflict_boundary); if (lock) { mutex_.Unlock(); } @@ -3902,10 +3898,11 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary, std::pair> DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, bool lock) { - int64_t unix_time = 0; - immutable_db_options_.clock->GetCurrentTime(&unix_time) - .PermitUncheckedError(); // Ignore error - SnapshotImpl* s = new SnapshotImpl; + // returns null if the underlying memtable does not support snapshot. + if (!is_snapshot_supported_) { + return std::make_pair( + Status::NotSupported("Memtable does not support snapshot"), nullptr); + } const bool need_update_seq = (snapshot_seq != kMaxSequenceNumber); @@ -3914,16 +3911,6 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, } else { mutex_.AssertHeld(); } - // returns null if the underlying memtable does not support snapshot. - if (!is_snapshot_supported_) { - if (lock) { - mutex_.Unlock(); - } - delete s; - return std::make_pair( - Status::NotSupported("Memtable does not support snapshot"), nullptr); - } - // Caller is not write thread, thus didn't provide a valid snapshot_seq. // Obtain seq from db. 
if (!need_update_seq) { @@ -3973,7 +3960,6 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, if (lock) { mutex_.Unlock(); } - delete s; return std::make_pair(status, ret); } else { status.PermitUncheckedError(); @@ -3981,7 +3967,7 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, } SnapshotImpl* snapshot = - snapshots_.New(s, snapshot_seq, unix_time, + snapshots_.New(snapshot_seq, /*is_write_conflict_boundary=*/true, ts); std::shared_ptr ret( @@ -4028,9 +4014,13 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { return; } const SnapshotImpl* casted_s = reinterpret_cast(s); + if (snapshots_.UnRefSnapshot(casted_s)) { + return; + } { InstrumentedMutexLock l(&mutex_); snapshots_.Delete(casted_s); + std::unique_lock snapshotlist_lock(snapshots_.lock_); uint64_t oldest_snapshot; if (snapshots_.empty()) { oldest_snapshot = GetLastPublishedSequence(); @@ -4071,7 +4061,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold; } } - delete casted_s; } Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 474a81be10..da0ad686be 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -346,6 +346,11 @@ class DBImpl : public DB { std::vector* iterators) override; virtual const Snapshot* GetSnapshot() override; + // Will unref a snapshot copy + // Returns true if the snapshot has not been deleted from SnapshotList + bool UnRefSnapshot(const SnapshotImpl* snapshot, bool& is_cached_snapshot); + // true if the snapshot provided has been referenced, otherwise false + bool RefSnapshot(bool is_write_conflict_boundary, SnapshotImpl* snapshot); virtual void ReleaseSnapshot(const Snapshot* snapshot) override; // Create a timestamped snapshot. This snapshot can be shared by multiple // readers. 
If any of them uses it for write conflict checking, then diff --git a/db/db_test2.cc b/db/db_test2.cc index 8d707aaffc..c48020c7c2 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2048,6 +2048,46 @@ TEST_F(DBTest2, DuplicateSnapshot) { } } +#ifdef SPEEDB_SNAP_OPTIMIZATION +// This test should run only if there is snapshot optimization enabled +TEST_F(DBTest2, RefSnapshot) { + Options options; + options = CurrentOptions(options); + std::vector snapshots; + DBImpl* dbi = static_cast_with_check(db_); + SequenceNumber oldest_ww_snap, first_ww_snap; + + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(db_->GetSnapshot()); + snapshots.push_back(db_->GetSnapshot()); + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(db_->GetSnapshot()); + snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary()); + first_ww_snap = snapshots.back()->GetSequenceNumber(); + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary()); + snapshots.push_back(db_->GetSnapshot()); + ASSERT_OK(Put("k", "v")); // inc seq + snapshots.push_back(db_->GetSnapshot()); + snapshots.push_back(db_->GetSnapshot()); // this should create a reference + + { + InstrumentedMutexLock l(dbi->mutex()); + auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap); + ASSERT_EQ(seqs.size(), 4); // duplicates are not counted + ASSERT_EQ(oldest_ww_snap, first_ww_snap); + ASSERT_EQ(dbi->snapshots().count(), + 6); // how many snapshots stored in SnapshotList + ASSERT_EQ(dbi->snapshots().logical_count(), + 8); // how many snapshots in the system + } + + for (auto s : snapshots) { + db_->ReleaseSnapshot(s); + } +} +#endif + class PinL0IndexAndFilterBlocksTest : public DBTestBase, public testing::WithParamInterface> { diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 23e5e98cd2..cb7f138be6 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -8,10 +8,15 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
#pragma once +#include #include #include "db/dbformat.h" +#ifdef SPEEDB_SNAP_OPTIMIZATION +#include "folly/concurrency/AtomicSharedPtr.h" +#endif #include "rocksdb/db.h" +#include "rocksdb/types.h" #include "util/autovector.h" namespace ROCKSDB_NAMESPACE { @@ -22,17 +27,39 @@ class SnapshotList; // Each SnapshotImpl corresponds to a particular sequence number. class SnapshotImpl : public Snapshot { public: + int64_t unix_time_; + uint64_t timestamp_; + // Will this snapshot be used by a Transaction to do write-conflict checking? + bool is_write_conflict_boundary_; + + SnapshotImpl() {} + + SnapshotImpl(SnapshotImpl* s) { + number_ = s->number_; + unix_time_ = s->unix_time_; + is_write_conflict_boundary_ = s->is_write_conflict_boundary_; + timestamp_ = s->timestamp_; + } + +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::atomic_uint64_t refcount = {1}; + std::shared_ptr cached_snapshot = nullptr; + + struct Deleter { + inline void operator()(SnapshotImpl* snap) const; + }; + // Will this snapshot be used by a Transaction to do write-conflict checking? +#endif SequenceNumber number_; // const after creation // It indicates the smallest uncommitted data at the time the snapshot was // taken. This is currently used by WritePrepared transactions to limit the // scope of queries to IsInSnapshot. SequenceNumber min_uncommitted_ = kMinUnCommittedSeq; - SequenceNumber GetSequenceNumber() const override { return number_; } - int64_t GetUnixTime() const override { return unix_time_; } uint64_t GetTimestamp() const override { return timestamp_; } + SequenceNumber GetSequenceNumber() const override { return number_; } private: friend class SnapshotList; @@ -41,19 +68,19 @@ class SnapshotImpl : public Snapshot { SnapshotImpl* prev_; SnapshotImpl* next_; - SnapshotList* list_; // just for sanity checks - - int64_t unix_time_; - - uint64_t timestamp_; - - // Will this snapshot be used by a Transaction to do write-conflict checking? 
- bool is_write_conflict_boundary_; + SnapshotList* list_; }; class SnapshotList { public: - SnapshotList() { + mutable std::mutex lock_; + SystemClock* clock_; +#ifdef SPEEDB_SNAP_OPTIMIZATION + bool deleteitem_ = false; + folly::atomic_shared_ptr last_snapshot_; +#endif + SnapshotList(SystemClock* clock) { + clock_ = clock; list_.prev_ = &list_; list_.next_ = &list_; list_.number_ = 0xFFFFFFFFL; // placeholder marker, for debugging @@ -63,6 +90,29 @@ class SnapshotList { list_.timestamp_ = 0; list_.is_write_conflict_boundary_ = false; count_ = 0; +#ifdef SPEEDB_SNAP_OPTIMIZATION + last_snapshot_ = nullptr; +#endif + } + SnapshotImpl* RefSnapshot([[maybe_unused]] bool is_write_conflict_boundary, + [[maybe_unused]] SequenceNumber last_seq) { +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::shared_ptr shared_snap = last_snapshot_; + if (shared_snap && shared_snap->GetSequenceNumber() == last_seq && + shared_snap->is_write_conflict_boundary_ == + is_write_conflict_boundary) { + SnapshotImpl* snapshot = new SnapshotImpl; + clock_->GetCurrentTime(&snapshot->unix_time_) + .PermitUncheckedError(); // Ignore error + snapshot->cached_snapshot = shared_snap; + logical_count_.fetch_add(1); + shared_snap->refcount.fetch_add(1); + snapshot->number_ = shared_snap->GetSequenceNumber(); + snapshot->is_write_conflict_boundary_ = is_write_conflict_boundary; + return snapshot; + } +#endif + return nullptr; } // No copy-construct. 
@@ -81,11 +131,48 @@ class SnapshotList { return list_.prev_; } - SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, uint64_t unix_time, - bool is_write_conflict_boundary, +#ifdef SPEEDB_SNAP_OPTIMIZATION + SnapshotImpl* NewSnapRef(SnapshotImpl* s) { + // user snapshot is a reference to the snapshot inside the SnapshotList + // Unfortunatly right now the snapshot api cannot return shared_ptr to the + // user so a deep copy should be created + // s is the original snapshot that is being stored in the SnapshotList + SnapshotImpl* user_snapshot = new SnapshotImpl(s); + auto new_last_snapshot = + std::shared_ptr(s, SnapshotImpl::Deleter{}); + // may call Deleter + last_snapshot_ = new_last_snapshot; + user_snapshot->cached_snapshot = last_snapshot_; + return user_snapshot; + } +#endif + bool UnRefSnapshot([[maybe_unused]] const SnapshotImpl* snapshot) { +#ifdef SPEEDB_SNAP_OPTIMIZATION + SnapshotImpl* snap = const_cast(snapshot); + logical_count_.fetch_sub(1); + size_t cnt = snap->cached_snapshot->refcount.fetch_sub(1); + if (cnt < 2) { + last_snapshot_.compare_exchange_weak(snap->cached_snapshot, nullptr); + } + delete snap; + if (!deleteitem_) { + // item has not been deleted from SnapshotList + return true; + } +#endif + return false; + } + + SnapshotImpl* New(SequenceNumber seq, bool is_write_conflict_boundary, uint64_t ts = std::numeric_limits::max()) { + SnapshotImpl* s = new SnapshotImpl; +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::unique_lock l(lock_); + logical_count_.fetch_add(1); +#endif + clock_->GetCurrentTime(&s->unix_time_) + .PermitUncheckedError(); // Ignore error s->number_ = seq; - s->unix_time_ = unix_time; s->timestamp_ = ts; s->is_write_conflict_boundary_ = is_write_conflict_boundary; s->list_ = this; @@ -94,15 +181,25 @@ class SnapshotList { s->prev_->next_ = s; s->next_->prev_ = s; count_++; +#ifdef SPEEDB_SNAP_OPTIMIZATION + l.unlock(); + return NewSnapRef(s); +#endif return s; } // Do not responsible to free the object. 
void Delete(const SnapshotImpl* s) { +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::unique_lock l(lock_); + deleteitem_ = false; +#else assert(s->list_ == this); + count_--; s->prev_->next_ = s->next_; s->next_->prev_ = s->prev_; - count_--; + delete s; +#endif } // retrieve all snapshot numbers up until max_seq. They are sorted in @@ -118,6 +215,9 @@ class SnapshotList { void GetAll(std::vector* snap_vector, SequenceNumber* oldest_write_conflict_snapshot = nullptr, const SequenceNumber& max_seq = kMaxSequenceNumber) const { +#ifdef SPEEDB_SNAP_OPTIMIZATION + std::scoped_lock l(lock_); +#endif std::vector& ret = *snap_vector; // So far we have no use case that would pass a non-empty vector assert(ret.size() == 0); @@ -176,12 +276,17 @@ class SnapshotList { } } + // How many snapshots in the SnapshotList uint64_t count() const { return count_; } + // How many snapshots in the system included those that created refcount + uint64_t logical_count() const { return logical_count_; } + + std::atomic_uint64_t logical_count_ = {0}; + uint64_t count_; private: // Dummy head of doubly-linked list of snapshots SnapshotImpl list_; - uint64_t count_; }; // All operations on TimestampedSnapshotList must be protected by db mutex. @@ -235,5 +340,16 @@ class TimestampedSnapshotList { private: std::map> snapshots_; }; - +#ifdef SPEEDB_SNAP_OPTIMIZATION +inline void SnapshotImpl::Deleter::operator()(SnapshotImpl* snap) const { + if (snap->cached_snapshot == nullptr) { + std::scoped_lock l(snap->list_->lock_); + snap->list_->count_--; + snap->prev_->next_ = snap->next_; + snap->next_->prev_ = snap->prev_; + snap->list_->deleteitem_ = true; + } + delete snap; +} +#endif } // namespace ROCKSDB_NAMESPACE