address review comments
jowlyzhang committed Jul 25, 2023
1 parent a139f22 commit 445ebb2
Showing 8 changed files with 173 additions and 72 deletions.
128 changes: 115 additions & 13 deletions db/column_family_test.cc
@@ -3440,49 +3440,150 @@ TEST_F(ColumnFamilyRetainUDTTest, SanityCheck) {
}

TEST_F(ColumnFamilyRetainUDTTest, FullHistoryTsLowNotSet) {
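// The flush should run on the first attempt: `reschedule_count` starts at 1
// (see FlushRequest in db_impl.h) and should still be 1 when the request is
// executed, i.e. no reschedule happened.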
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
// Flush() defaults to wait, so OK status indicates flushing success.
// No `full_history_ts_low` is explicitly set by the user, so the flush
// proceeds without checking whether its UDTs have expired.
ASSERT_OK(Flush(0));

// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 2);
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(ColumnFamilyRetainUDTTest, AllKeysExpired) {
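// All keys will be expired w.r.t. the cutoff, so the flush should run on the
// first attempt: `reschedule_count` stays at its initial value of 1.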
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});

SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 2);
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
// Flush() defaults to wait, so OK status indicates flushing success.
// All keys have expired w.r.t. the configured `full_history_ts_low`, so the
// flush proceeds without the need for a reschedule.
ASSERT_OK(Flush(0));

// `full_history_ts_low` stays unchanged after flush.
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushToAvoidWriteStall) {
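// Not all keys will be expired, but the risk of a write stall should make
// the flush proceed anyway: no reschedule, so `reschedule_count` stays at 1.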
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(1, reschedule_count);
});

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredUserTryAgain) {
SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string cutoff_ts;
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string cutoff_ts;
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
// Flush() defaults to wait, but the wait cannot succeed: not all UDTs are
// expired w.r.t. `full_history_ts_low`, so the flush is postponed to retain
// them.
ASSERT_TRUE(Flush(0).IsTryAgain());
ASSERT_OK(db_->SetOptions(handles_[0], {{"max_write_buffer_number", "1"}}));
// Not all keys have expired, but the flush proceeds without a reschedule
// because of the risk of a write stall.
ASSERT_OK(Flush(0));

// After flush, `full_history_ts_low` should be automatically advanced to
// the effective cutoff timestamp: write_ts + 1
std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));

// Flushing again after increasing full_history_ts_low succeeds.
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushAndWait) {
std::string cutoff_ts;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AfterRetainUDTReschedule:cb", [&](void* /*arg*/) {
// Increase full_history_ts_low so that all keys are expired after the
// initial FlushRequest is rescheduled.
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
});
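// The request should be rescheduled exactly once before the actual flush,
// so `reschedule_count` is expected to be 2 when the flush finally runs.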
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(2, reschedule_count);
});
SyncPoint::GetInstance()->EnableProcessing();

Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
// Not all keys have expired and there is no risk of a write stall, so the
// flush is rescheduled. The actual flush happens after `full_history_ts_low`
// is increased to mark all keys expired.
ASSERT_OK(Flush(0));

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
// `full_history_ts_low` stays unchanged after the flush.
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushAndNoWait) {
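// As in the wait=true case, one reschedule is expected before the actual
// flush, so `reschedule_count` should be 2.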
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundFlush:CheckFlushRequest:cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto reschedule_count = *static_cast<int*>(arg);
ASSERT_EQ(2, reschedule_count);
});
SyncPoint::GetInstance()->EnableProcessing();
Open();
std::string write_ts;
PutFixed64(&write_ts, 1);
@@ -3491,8 +3592,6 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
PutFixed64(&cutoff_ts, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
FlushOptions flush_options;
// Setting wait to false, to test the FlushRequest reschedule code path, which
// is shared between manual flush and automatic flush.
flush_options.wait = false;
ASSERT_OK(db_->Flush(flush_options, handles_[0]));

Expand All @@ -3506,7 +3605,7 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
// Increase full_history_ts_low so that the rescheduled flush can now proceed
// with the actual flush work.
cutoff_ts.clear();
PutFixed64(&cutoff_ts, 2);
PutFixed64(&cutoff_ts, 3);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], cutoff_ts));
WaitForFlush(0);
db_->GetLiveFilesMetaData(&metadata);
Expand All @@ -3519,6 +3618,9 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
// result, full_history_ts_low stays unchanged.
ASSERT_EQ(cutoff_ts, effective_full_history_ts_low);
Close();

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace ROCKSDB_NAMESPACE
4 changes: 4 additions & 0 deletions db/db_impl/db_impl.h
@@ -2063,6 +2063,10 @@ class DBImpl : public DB {
// flush is considered complete.
std::unordered_map<ColumnFamilyData*, uint64_t>
cfd_to_max_mem_id_to_persist;

#ifndef NDEBUG
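// Test-only: the number of times this flush request has been scheduled;
// starts at 1 and is incremented each time the request is rescheduled to
// retain UDTs.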
int reschedule_count = 1;
#endif /* !NDEBUG */
};

void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
25 changes: 7 additions & 18 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -93,8 +93,10 @@ bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
// alleviated if we continue with the flush instead of postponing it.
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();

int mem_to_flush = cfd->mem()->ApproximateMemoryUsage() >=
mutable_cf_options.write_buffer_size * 0.5
// Take the status of the active memtable into consideration so that we are
// not just checking whether the DB is already in write stall mode.
int mem_to_flush = cfd->mem()->ApproximateMemoryUsageFast() >=
cfd->mem()->write_buffer_size() / 2
? 1
: 0;
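// A memtable at least half of the write buffer size counts as one more
// memtable to flush when evaluating the write stall condition below.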
WriteStallCondition write_stall =
@@ -2242,22 +2244,6 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
if (s.ok()) {
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
if (flush_options.wait && !cfd->IsDropped() &&
cfd->ShouldPostponeFlushToRetainUDT(flush_memtable_id)) {
std::ostringstream oss;
oss << "Flush cannot continue until all contained user-defined "
"timestamps are above full_history_ts_low. Increase"
"full_history_ts_low and try again. Or set FlushOptions.wait"
"to false and let background flush jobs automatically"
"continue when it detects full_history_ts_low increase.";
if (needs_to_join_write_thread) {
write_thread_.ExitUnbatched(&w);
if (two_write_queues_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
}
return Status::TryAgain(oss.str());
}
FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}};
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
@@ -3079,6 +3065,8 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
// `unscheduled_flushes_`. So it's sufficient to make each `BackgroundFlush`
// handle one `FlushRequest` and each have a Status returned.
if (!bg_flush_args.empty() || !column_families_not_to_flush.empty()) {
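// Surface the request's reschedule count to tests before this request is
// handed off for the actual flush work.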
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundFlush:CheckFlushRequest:cb",
const_cast<int*>(&flush_req.reschedule_count));
break;
}
}
@@ -3147,6 +3135,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
if (s.IsTryAgain() && flush_rescheduled_to_retain_udt) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
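// Let tests react right after a reschedule (e.g. increase
// `full_history_ts_low`) before sleeping and retrying below.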
TEST_SYNC_POINT_CALLBACK("DBImpl::AfterRetainUDTReschedule:cb", nullptr);
immutable_db_options_.clock->SleepForMicroseconds(
100000); // prevent hot loop
mutex_.Lock();
4 changes: 2 additions & 2 deletions db/flush_job.cc
@@ -191,7 +191,7 @@ void FlushJob::PickMemTable() {

// Track effective cutoff user-defined timestamp during flush if
// user-defined timestamps can be stripped.
PopulateEffectiveCutoffUDTForPickedMemTables();
GetEffectiveCutoffUDTForPickedMemTables();

ReportFlushInputSize(mems_);

@@ -1105,7 +1105,7 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
return info;
}

void FlushJob::PopulateEffectiveCutoffUDTForPickedMemTables() {
void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
db_mutex_->AssertHeld();
assert(pick_memtable_called);
const auto* ucmp = cfd_->internal_comparator().user_comparator();
2 changes: 1 addition & 1 deletion db/flush_job.h
@@ -137,7 +137,7 @@ class FlushJob {
// removed because of flush, and use it to increase `full_history_ts_low` if
// the effective cutoff timestamp is newer. See
// `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details.
void PopulateEffectiveCutoffUDTForPickedMemTables();
void GetEffectiveCutoffUDTForPickedMemTables();

Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
