Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect cutoff timestamp during flush #11599

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,33 @@ Status ColumnFamilyData::ValidateOptions(
}
}

const auto* ucmp = cf_options.comparator;
assert(ucmp);
if (ucmp->timestamp_size() > 0 &&
!cf_options.persist_user_defined_timestamps) {
if (db_options.atomic_flush) {
return Status::NotSupported(
"Not persisting user-defined timestamps feature is not supported"
"in combination with atomic flush.");
}
if (db_options.allow_concurrent_memtable_write) {
return Status::NotSupported(
"Not persisting user-defined timestamps feature is not supported"
" in combination with concurrent memtable write.");
}
const char* comparator_name = cf_options.comparator->Name();
size_t name_size = strlen(comparator_name);
const char* suffix = ".u64ts";
size_t suffix_size = strlen(suffix);
if (name_size <= suffix_size ||
strcmp(comparator_name + name_size - suffix_size, suffix) != 0) {
return Status::NotSupported(
"Not persisting user-defined timestamps"
"feature only support user-defined timestamps formatted as "
"uint64_t.");
}
}

if (cf_options.enable_blob_garbage_collection) {
if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
cf_options.blob_garbage_collection_age_cutoff > 1.0) {
Expand Down Expand Up @@ -1515,6 +1542,43 @@ FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
return data_dirs_[path_id].get();
}

bool ColumnFamilyData::ShouldPostponeFlushToRetainUDT(
uint64_t max_memtable_id) {
const Comparator* ucmp = user_comparator();
const size_t ts_sz = ucmp->timestamp_size();
if (ts_sz == 0 || ioptions_.persist_user_defined_timestamps) {
return false;
}
// If users set the `persist_user_defined_timestamps` flag to false, they
// should also set the `full_history_ts_low` flag to indicate the range of
// user-defined timestamps to retain in memory. Otherwise, we do not
// explicitly postpone flush to retain UDTs.
const std::string& full_history_ts_low = GetFullHistoryTsLow();
if (full_history_ts_low.empty()) {
return false;
}
#ifndef NDEBUG
Slice last_table_newest_udt;
#endif /* !NDEBUG */
for (const Slice& table_newest_udt :
imm()->GetTablesNewestUDT(max_memtable_id)) {
assert(table_newest_udt.size() == full_history_ts_low.size());
assert(last_table_newest_udt.empty() ||
ucmp->CompareTimestamp(table_newest_udt, last_table_newest_udt) >=
0);
// Checking the newest UDT contained in MemTable with ascending ID up to
// `max_memtable_id`. MemTable with bigger ID will have newer UDT, return
// immediately on finding the first MemTable that needs postponing.
if (ucmp->CompareTimestamp(table_newest_udt, full_history_ts_low) >= 0) {
return true;
}
#ifndef NDEBUG
last_table_newest_udt = table_newest_udt;
#endif /* !NDEBUG */
}
return false;
}

void ColumnFamilyData::RecoverEpochNumbers() {
assert(current_);
auto* vstorage = current_->storage_info();
Expand Down
6 changes: 6 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,12 @@ class ColumnFamilyData {
return full_history_ts_low_;
}

// REQUIRES: DB mutex held.
// Return true if flushing up to MemTables with ID `max_memtable_id`
// should be postponed to retain user-defined timestamps according to the
// user's setting. Called by background flush job.
bool ShouldPostponeFlushToRetainUDT(uint64_t max_memtable_id);

ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; }
std::shared_ptr<CacheReservationManager>
Expand Down
203 changes: 203 additions & 0 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "options/options_parser.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/comparator.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
Expand Down Expand Up @@ -63,6 +64,9 @@ class ColumnFamilyTestBase : public testing::Test {
db_options_.create_if_missing = true;
db_options_.fail_if_options_file_error = true;
db_options_.env = env_;
}

void SetUp() override {
EXPECT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_)));
}

Expand Down Expand Up @@ -3380,6 +3384,205 @@ TEST(ColumnFamilyTest, ValidateMemtableKVChecksumOption) {
ASSERT_OK(ColumnFamilyData::ValidateOptions(db_options, cf_options));
}

// Tests the flushing behavior of a column family to retain user-defined
// timestamp when `persist_user_defined_timestamp` is false.
class ColumnFamilyRetainUDTTest : public ColumnFamilyTestBase {
public:
ColumnFamilyRetainUDTTest() : ColumnFamilyTestBase(kLatestFormatVersion) {}

void SetUp() override {
db_options_.allow_concurrent_memtable_write = false;
column_family_options_.comparator =
test::BytewiseComparatorWithU64TsWrapper();
column_family_options_.persist_user_defined_timestamps = false;
ColumnFamilyTestBase::SetUp();
}

Status Put(int cf, const std::string& key, const std::string& ts,
const std::string& value) {
return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(ts),
Slice(value));
}
};

class TestTsComparator : public Comparator {
public:
TestTsComparator() : Comparator(8 /*ts_sz*/) {}

int Compare(const ROCKSDB_NAMESPACE::Slice& /*a*/,
const ROCKSDB_NAMESPACE::Slice& /*b*/) const override {
return 0;
}
const char* Name() const override { return "TestTs"; }
void FindShortestSeparator(
std::string* /*start*/,
const ROCKSDB_NAMESPACE::Slice& /*limit*/) const override {}
void FindShortSuccessor(std::string* /*key*/) const override {}
};

TEST_F(ColumnFamilyRetainUDTTest, SanityCheck) {
Open();
ColumnFamilyOptions cf_options;
cf_options.persist_user_defined_timestamps = false;
TestTsComparator test_comparator;
cf_options.comparator = &test_comparator;
ColumnFamilyHandle* handle;
// Not persisting user-defined timestamps feature only supports user-defined
// timestamps formatted as uint64_t.
ASSERT_TRUE(
db_->CreateColumnFamily(cf_options, "pikachu", &handle).IsNotSupported());

Destroy();
// Not persisting user-defined timestamps feature doesn't work in combination
// with atomic flush.
db_options_.atomic_flush = true;
ASSERT_TRUE(TryOpen({"default"}).IsNotSupported());

// Not persisting user-defined timestamps feature doesn't work in combination
// with concurrent memtable write.
db_options_.atomic_flush = false;
db_options_.allow_concurrent_memtable_write = true;
ASSERT_TRUE(TryOpen({"default"}).IsNotSupported());
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
// No `full_history_ts_low` explicitly set by user, flush is continued
// without checking if its UDTs expired.
ASSERT_OK(Flush(0));

// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 2);
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
// All keys expired w.r.t the configured `full_history_ts_low`, flush continue
// without the need for a re-schedule.
ASSERT_OK(Flush(0));

// `full_history_ts_low` stays unchanged after flush.
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string cutoff_ts;
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
// Not all keys expired, but flush is continued without a re-schedule because
// of risk of write stall.
ASSERT_OK(Flush(0));

// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));

cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
std::string cutoff_ts;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increasing full_history_ts_low so all keys expired after the initial
// FlushRequest is rescheduled
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(2, reschedule_count);
});
SyncPoint::GetInstance()->EnableProcessing();

Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
// Not all keys expired, and there is no risk of write stall. Flush is
// rescheduled. The actual flush happens after `full_history_ts_low` is
// increased to mark all keys expired.
ASSERT_OK(Flush(0));

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` stays unchanged.
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
6 changes: 6 additions & 0 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5231,6 +5231,12 @@ TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
}

auto manual_compaction_thread = port::Thread([this]() {
// Write something to make the current Memtable non-empty, so an extra
// immutable Memtable will be created upon manual flush requested by
// CompactRange, triggering a write stall mode to be entered because of
// accumulation of write buffers due to manual flush.
Random compact_rnd(301);
ASSERT_OK(Put(Key(0), compact_rnd.RandomString(1024)));
CompactRangeOptions cro;
cro.allow_write_stall = false;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
Expand Down
11 changes: 11 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2063,6 +2063,10 @@ class DBImpl : public DB {
// flush is considered complete.
std::unordered_map<ColumnFamilyData*, uint64_t>
cfd_to_max_mem_id_to_persist;

#ifndef NDEBUG
int reschedule_count = 1;
#endif /* !NDEBUG */
};

void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
Expand Down Expand Up @@ -2091,6 +2095,7 @@ class DBImpl : public DB {
Env::Priority thread_pri);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer, FlushReason* reason,
bool* flush_rescheduled_to_retain_udt,
Env::Priority thread_pri);

bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
Expand All @@ -2103,6 +2108,12 @@ class DBImpl : public DB {
std::unique_ptr<TaskLimiterToken>* token,
LogBuffer* log_buffer);

// Return true if the `FlushRequest` can be rescheduled to retain the UDT.
// Only true if there are user-defined timestamps in the involved MemTables
// with newer than cutoff timestamp `full_history_ts_low` and not flushing
// immediately will not cause entering write stall mode.
bool ShouldRescheduleFlushRequestToRetainUDT(const FlushRequest& flush_req);

// Schedule background tasks
Status StartPeriodicTaskScheduler();

Expand Down
Loading
Loading