Skip to content

Commit

Permalink
Respect cutoff timestamp during flush
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Jul 11, 2023
1 parent 854eb76 commit a62b324
Show file tree
Hide file tree
Showing 17 changed files with 663 additions and 59 deletions.
58 changes: 58 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,27 @@ Status ColumnFamilyData::ValidateOptions(
}
}

const auto* ucmp = cf_options.comparator;
assert(ucmp);
if (ucmp->timestamp_size() > 0 &&
!cf_options.persist_user_defined_timestamps) {
if (db_options.atomic_flush) {
return Status::NotSupported(
"Not persisting user-defined timestamps feature is not supported"
"in combination with atomic flush.");
}
const char* comparator_name = cf_options.comparator->Name();
size_t name_size = strlen(comparator_name);
const char* suffix = ".u64ts";
size_t suffix_size = strlen(suffix);
if (strcmp(comparator_name + name_size - suffix_size, suffix) != 0) {
return Status::NotSupported(
"Not persisting user-defined timestamps"
"feature only support user-defined timestamps formatted as "
"uint64_t.");
}
}

if (cf_options.enable_blob_garbage_collection) {
if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
cf_options.blob_garbage_collection_age_cutoff > 1.0) {
Expand Down Expand Up @@ -1515,6 +1536,43 @@ FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
return data_dirs_[path_id].get();
}

bool ColumnFamilyData::ShouldPostponeFlushToRetainUDT(
uint64_t max_memtable_id) {
const Comparator* ucmp = user_comparator();
const size_t ts_sz = ucmp->timestamp_size();
if (ts_sz == 0 || ioptions_.persist_user_defined_timestamps) {
return false;
}
// If users set the `persist_user_defined_timestamps` flag to false, they
// should also set the `full_history_ts_low` flag to indicate the range of
// user-defined timestamps to retain in memory. Otherwise, we do not
// explicitly postpone flush to retain UDTs.
const std::string& full_history_ts_low = GetFullHistoryTsLow();
if (full_history_ts_low.empty()) {
return false;
}
#ifndef NDEBUG
Slice last_table_newest_udt;
#endif /* !NDEBUG */
for (const Slice& table_newest_udt :
imm()->GetTablesNewestUDT(max_memtable_id)) {
assert(table_newest_udt.size() == full_history_ts_low.size());
assert(last_table_newest_udt.empty() ||
ucmp->CompareTimestamp(table_newest_udt, last_table_newest_udt) >=
0);
// Checking the newest UDT contained in MemTable with ascending ID up to
// `max_memtable_id`. MemTable with bigger ID will have newer UDT, return
// immediately on finding the first MemTable that needs postponing.
if (ucmp->CompareTimestamp(table_newest_udt, full_history_ts_low) >= 0) {
return true;
}
#ifndef NDEBUG
last_table_newest_udt = table_newest_udt;
#endif /* !NDEBUG */
}
return false;
}

void ColumnFamilyData::RecoverEpochNumbers() {
assert(current_);
auto* vstorage = current_->storage_info();
Expand Down
6 changes: 6 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,12 @@ class ColumnFamilyData {
return full_history_ts_low_;
}

// REQUIRES: DB mutex held.
// Return true if flushing up to MemTables with ID `max_memtable_id`
// should be postponed to retain user-defined timestamps according to the
// user's setting. Called by background flush job.
bool ShouldPostponeFlushToRetainUDT(uint64_t max_memtable_id);

ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; }
std::shared_ptr<CacheReservationManager>
Expand Down
169 changes: 169 additions & 0 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "options/options_parser.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/comparator.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
Expand Down Expand Up @@ -63,6 +64,9 @@ class ColumnFamilyTestBase : public testing::Test {
db_options_.create_if_missing = true;
db_options_.fail_if_options_file_error = true;
db_options_.env = env_;
}

void SetUp() override {
EXPECT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_)));
}

Expand Down Expand Up @@ -3380,6 +3384,171 @@ TEST(ColumnFamilyTest, ValidateMemtableKVChecksumOption) {
ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options));
}

// Tests the flushing behavior of a column family to retain user-defined
// timestamp when `persist_user_defined_timestamp` is false.
class ColumnFamilyRetainUDTTest : public ColumnFamilyTestBase {
public:
ColumnFamilyRetainUDTTest() : ColumnFamilyTestBase(kLatestFormatVersion) {}

void SetUp() override {
column_family_options_.comparator =
test::BytewiseComparatorWithU64TsWrapper();
column_family_options_.persist_user_defined_timestamps = false;
ColumnFamilyTestBase::SetUp();
}

Status Put(int cf, const std::string& key, const std::string& ts,
const std::string& value) {
return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(ts),
Slice(value));
}
};

class TestTsComparator : public Comparator {
public:
TestTsComparator() : Comparator(8 /*ts_sz*/) {}

int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/,
const ROCKSDB_NAMESPACE::Slice& /*b*/) const override {
return 0;
}
const char* Name() const override { return "TestTs"; }
void FindShortestSeparator(
std::string* /*start*/,
const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {}
void FindShortSuccessor(std::string* /*key*/) const override {}
};

TEST_F(ColumnFamilyRetainUDTTest, SanityCheck) {
Open();
ColumnFamilyOptions cf_options;
cf_options.persist_user_defined_timestamps = false;
TestTsComparator test_comparator;
cf_options.comparator = &test_comparator;
ColumnFamilyHandle* handle;
// Not persisting user-defined timestamps feature only supports user-defined
// timestamps formatted as uint64_t.
ASSERT_TRUE(
db_->CreateColumnFamily(cf_options, "pikachu", &handle).IsNotSupported());

Destroy();
// Not persisting user-defined timestamps feature doesn't work in combination
// with atomic flush.
db_options_.atomic_flush = true;
ASSERT_TRUE(TryOpen({"default"}).IsNotSupported());
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
// Flush() defaults to wait, so OK status indicates flushing success.
ASSERT_OK(Flush(0));
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 2);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
// Flush() defaults to wait, so OK status indicates success flushing.
ASSERT_OK(Flush(0));
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredUserTryAgain) {
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
// Flush() defaults to wait, wait cannot succeed because `full_history_ts_low`
// treats the Memtable as still worth postponing its flush to retain UDT.
ASSERT_TRUE(Flush(0).IsTryAgain());

// Try flush again after increasing full_history_ts_low succeeds.
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(Flush(0));
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredContinueFlush) {
Open();
std::string write_ts;
uint64_t cutoff_ts = 2;
PutFixed64(&write_ts, cutoff_ts);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], full_history_ts_low));
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
ASSERT_OK(db_->SetOptions(cfh, {{"max_write_buffer_number", "1"}}));
// Although some user-defined timestamps haven't expired w.r.t
// full_history_ts_low, flush is continued to avoid write stall because
// max_write_buffer_number is 1.
ASSERT_OK(Flush(0));

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
std::string expected_new_full_history_ts_low;
// The effective new full_history_ts_low should
// be the newest UDT in the flushed Memtables, plus 1 (the next immediately
// larger) udt.
PutFixed64(&expected_new_full_history_ts_low, cutoff_ts + 1);
ASSERT_EQ(expected_new_full_history_ts_low, effective_full_history_ts_low);
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
FlushOptions flush_options;
// Setting wait to false, to test the FlushRequest reschedule code path, which
// is shared between manual flush and automatic flush.
flush_options.wait = false;
ASSERT_OK(db_->Flush(flush_options, handles_[0]));

// No sst files created because flush is rescheduled and won't continue
// until full_history_ts_low has a value that marks all user-defined
// timestamps in the Memtable as expired.
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_GE(metadata.size(), 0);

// Increase full_history_ts_low so that the rescheduled flush can continue
// with the actual flushing now.
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
WaitForFlush(0);
db_->GetLiveFilesMetaData(&metadata);
ASSERT_GE(metadata.size(), 1);

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// Original flush request rescheduled to respect full_histor_ts_low, as a
// result, full_history_ts_low stays unchanged.
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
6 changes: 6 additions & 0 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5231,6 +5231,12 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
}

auto manual_compaction_thread = port::Thread([this]() {
// Write something to make the current Memtable non-empty, so an extra
// immutable Memtable will be created upon manual flush requested by
// CompactRange, triggering a write stall mode to be entered because of
// accumulation of write buffers due to manual flush.
Random compact_rnd(301);
ASSERT_OK(Put(Key(0), compact_rnd.RandomString(1024)));
CompactRangeOptions cro;
cro.allow_write_stall = false;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
Expand Down
7 changes: 7 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,7 @@ class DBImpl : public DB {
Env::Priority thread_pri);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer, FlushReason* reason,
bool* flush_rescheduled_to_retain_udt,
Env::Priority thread_pri);

bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
Expand All @@ -2103,6 +2104,12 @@ class DBImpl : public DB {
std::unique_ptr<TaskLimiterToken>* token,
LogBuffer* log_buffer);

// Return true if the `FlushRequest` can be rescheduled to retain the UDT.
// Only true if there are user-defined timestamps in the involved MemTables
// with newer than cutoff timestamp `full_history_ts_low` and not flushing
// immediately will not cause entering write stall mode.
bool ShouldRescheduleFlushRequestToRetainUDT(const FlushRequest& flush_req);

// Schedule background tasks
Status StartPeriodicTaskScheduler();

Expand Down
Loading

0 comments on commit a62b324

Please sign in to comment.