Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot Optimization (https://github.com/speedb-io/speedb/issues/35) #547

Merged
merged 3 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,16 @@ if(WITH_TBB)
list(APPEND THIRDPARTY_LIBS TBB::TBB)
endif()

option(WITH_SNAP_OPTIMIZATION "Optimize Snapshot performance for read mostly workload" OFF)
if(WITH_SNAP_OPTIMIZATION)
find_package(folly REQUIRED)
add_definitions(-DSPEEDB_SNAP_OPTIMIZATION)
list(APPEND THIRDPARTY_LIBS folly)
message(STATUS "Enabling RTTI in all builds - part of folly requirements")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI")
endif()

# Stall notifications eat some performance from inserts
option(DISABLE_STALL_NOTIF "Build with stall notifications" OFF)
if(DISABLE_STALL_NOTIF)
Expand All @@ -446,6 +456,7 @@ endif()


# RTTI is by default AUTO which enables it in debug and disables it in release.
if(NOT WITH_SNAP_OPTIMIZATION)
set(USE_RTTI AUTO CACHE STRING "Enable RTTI in builds")
set_property(CACHE USE_RTTI PROPERTY STRINGS AUTO ON OFF)
if(USE_RTTI STREQUAL "AUTO")
Expand All @@ -471,6 +482,7 @@ else()
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif()
endif()
endif()

# Used to run CI build and tests so we can run faster
option(OPTDBG "Build optimized debug build with MSVC" OFF)
Expand Down
4 changes: 4 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# Speedb Change Log

## Unreleased

### New Features
* Snapshot optimization - The most important information inside a snapshot is its Sequence number, which allows the compaction to know if the key-value should be deleted or not. The sequence number is being changed when modification happens in the db. This feature allows the db to take a snapshot without acquiring db mutex when the last snapshot has the same sequence number as a new one. In transactional db with mostly read operations, it should improve performance when used with multithreaded environment and as well other scenarios of taking large amount of snapshots with mostly read operations.
* Add a TablePinningPolicy to the BlockBasedTableOptions. This class controls when blocks should be pinned in memory for a block based table. The default behavior uses the MetadataCacheOptions to control pinning and behaves identical to the previous releases.

### Enhancements
* db_bench: add estimate-table-readers-mem benchmark which prints these stats.

Expand Down
31 changes: 31 additions & 0 deletions cmake/modules/FindFolly.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
find_path(FOLLY_ROOT_DIR
NAMES include/folly/folly-config.h
)

find_library(FOLLY_LIBRARIES
NAMES folly
HINTS ${FOLLY_ROOT_DIR}/lib
)

find_library(FOLLY_BENCHMARK_LIBRARIES
NAMES follybenchmark
HINTS ${FOLLY_ROOT_DIR}/lib
)

find_path(FOLLY_INCLUDE_DIR
NAMES folly/folly-config.h
HINTS ${FOLLY_ROOT_DIR}/include
)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Folly DEFAULT_MSG
FOLLY_LIBRARIES
FOLLY_INCLUDE_DIR
)

mark_as_advanced(
FOLLY_ROOT_DIR
FOLLY_LIBRARIES
FOLLY_BENCHMARK_LIBRARIES
FOLLY_INCLUDE_DIR
)
53 changes: 21 additions & 32 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
nonmem_write_thread_(immutable_db_options_),
write_controller_(immutable_db_options_.write_controller),
last_batch_group_size_(0),
snapshots_(immutable_db_options_.clock),
unscheduled_flushes_(0),
unscheduled_compactions_(0),
bg_bottom_compaction_scheduled_(0),
Expand Down Expand Up @@ -3714,27 +3715,22 @@ Status DBImpl::GetTimestampedSnapshots(

SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to check if snapshot supported first... and if not exit immediately

bool lock) {
int64_t unix_time = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
if (!is_snapshot_supported_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you didnt remove this check in line 3734

return nullptr;
}
SnapshotImpl* snapshot = snapshots_.RefSnapshot(is_write_conflict_boundary,
GetLastPublishedSequence());
if (snapshot) {
return snapshot;
}

if (lock) {
mutex_.Lock();
} else {
mutex_.AssertHeld();
}
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
if (lock) {
mutex_.Unlock();
}
delete s;
return nullptr;
}
auto snapshot_seq = GetLastPublishedSequence();
SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
snapshot =
snapshots_.New(GetLastPublishedSequence(), is_write_conflict_boundary);
if (lock) {
mutex_.Unlock();
}
Expand All @@ -3744,10 +3740,11 @@ SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
std::pair<Status, std::shared_ptr<const SnapshotImpl>>
DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
bool lock) {
int64_t unix_time = 0;
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
return std::make_pair(
Status::NotSupported("Memtable does not support snapshot"), nullptr);
}

const bool need_update_seq = (snapshot_seq != kMaxSequenceNumber);

Expand All @@ -3756,16 +3753,6 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
} else {
mutex_.AssertHeld();
}
// returns null if the underlying memtable does not support snapshot.
if (!is_snapshot_supported_) {
if (lock) {
mutex_.Unlock();
}
delete s;
return std::make_pair(
Status::NotSupported("Memtable does not support snapshot"), nullptr);
}

// Caller is not write thread, thus didn't provide a valid snapshot_seq.
// Obtain seq from db.
if (!need_update_seq) {
Expand Down Expand Up @@ -3815,15 +3802,14 @@ DBImpl::CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
if (lock) {
mutex_.Unlock();
}
delete s;
return std::make_pair(status, ret);
} else {
status.PermitUncheckedError();
}
}

SnapshotImpl* snapshot =
snapshots_.New(s, snapshot_seq, unix_time,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the signature of the function shouldn't contains unix_time

snapshots_.New(snapshot_seq,
/*is_write_conflict_boundary=*/true, ts);

std::shared_ptr<const SnapshotImpl> ret(
Expand Down Expand Up @@ -3870,9 +3856,13 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
return;
}
const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
if (snapshots_.UnRefSnapshot(casted_s)) {
return;
}
{
InstrumentedMutexLock l(&mutex_);
snapshots_.Delete(casted_s);
std::unique_lock<std::mutex> snapshotlist_lock(snapshots_.lock_);
uint64_t oldest_snapshot;
if (snapshots_.empty()) {
oldest_snapshot = GetLastPublishedSequence();
Expand Down Expand Up @@ -3913,7 +3903,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
}
}
delete casted_s;
}

Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ class DBImpl : public DB {
std::vector<Iterator*>* iterators) override;

virtual const Snapshot* GetSnapshot() override;
// Will unref a snapshot copy
// Returns true if the snapshot has not been deleted from SnapshotList
bool UnRefSnapshot(const SnapshotImpl* snapshot, bool& is_cached_snapshot);
// true if the snapshot provided has been referenced, otherwise false
bool RefSnapshot(bool is_write_conflict_boundary, SnapshotImpl* snapshot);
virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
// Create a timestamped snapshot. This snapshot can be shared by multiple
// readers. If any of them uses it for write conflict checking, then
Expand Down
40 changes: 40 additions & 0 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2048,6 +2048,46 @@ TEST_F(DBTest2, DuplicateSnapshot) {
}
}

#ifdef SPEEDB_SNAP_OPTIMIZATION
// This test should run only if there is snapshot optimization enabled
TEST_F(DBTest2, RefSnapshot) {
Options options;
options = CurrentOptions(options);
std::vector<const Snapshot*> snapshots;
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
SequenceNumber oldest_ww_snap, first_ww_snap;

ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(db_->GetSnapshot());
ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
first_ww_snap = snapshots.back()->GetSequenceNumber();
ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(dbi->GetSnapshotForWriteConflictBoundary());
snapshots.push_back(db_->GetSnapshot());
ASSERT_OK(Put("k", "v")); // inc seq
snapshots.push_back(db_->GetSnapshot());
snapshots.push_back(db_->GetSnapshot()); // this should create a reference

{
InstrumentedMutexLock l(dbi->mutex());
auto seqs = dbi->snapshots().GetAll(&oldest_ww_snap);
ASSERT_EQ(seqs.size(), 4); // duplicates are not counted
ASSERT_EQ(oldest_ww_snap, first_ww_snap);
ASSERT_EQ(dbi->snapshots().count(),
6); // how many snapshots stored in SnapshotList
ASSERT_EQ(dbi->snapshots().logical_count(),
8); // how many snapshots in the system
}

for (auto s : snapshots) {
db_->ReleaseSnapshot(s);
}
}
#endif

class PinL0IndexAndFilterBlocksTest
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool, bool>> {
Expand Down
Loading