Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Jul 24, 2023
1 parent 126b916 commit a80d373
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 39 deletions.
3 changes: 2 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,8 @@ Status ColumnFamilyData::ValidateOptions(
size_t name_size = strlen(comparator_name);
const char* suffix = ".u64ts";
size_t suffix_size = strlen(suffix);
if (strcmp(comparator_name + name_size - suffix_size, suffix) != 0) {
if (name_size <= suffix_size ||
strcmp(comparator_name + name_size - suffix_size, suffix) != 0) {
return Status::NotSupported(
"Not persisting user-defined timestamps"
"feature only support user-defined timestamps formatted as "
Expand Down
28 changes: 0 additions & 28 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3482,34 +3482,6 @@ TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredUserTryAgain) {
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredContinueFlush) {
Open();
std::string write_ts;
uint64_t cutoff_ts = 2;
PutFixed64(&write_ts, cutoff_ts);
ASSERT_OK(Put(0, "foo", write_ts, "v1"));
std::string full_history_ts_low;
PutFixed64(&full_history_ts_low, 1);
ASSERT_OK(db_->IncreaseFullHistoryTsLow(handles_[0], full_history_ts_low));
ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
ASSERT_OK(db_->SetOptions(cfh, {{"max_write_buffer_number", "1"}}));
// Although some user-defined timestamps haven't expired w.r.t
// full_history_ts_low, flush is continued to avoid write stall because
// max_write_buffer_number is 1.
ASSERT_OK(Flush(0));

std::string effective_full_history_ts_low;
ASSERT_OK(
db_->GetFullHistoryTsLow(handles_[0], &effective_full_history_ts_low));
std::string expected_new_full_history_ts_low;
// The effective new full_history_ts_low should
// be the newest UDT in the flushed Memtables, plus 1 (the next immediately
// larger) udt.
PutFixed64(&expected_new_full_history_ts_low, cutoff_ts + 1);
ASSERT_EQ(expected_new_full_history_ts_low, effective_full_history_ts_low);
Close();
}

TEST_F(ColumnFamilyRetainUDTTest, NotAllKeysExpiredFlushRescheduled) {
Open();
std::string write_ts;
Expand Down
20 changes: 13 additions & 7 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,15 @@ bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
// Write stall entered because of the accumulation of write buffers can be
// alleviated if we continue with the flush instead of postponing it.
const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
const auto* vstorage = cfd->current()->storage_info();

int mem_to_flush = cfd->mem()->ApproximateMemoryUsage() >=
mutable_cf_options.write_buffer_size * 0.5
? 1
: 0;
WriteStallCondition write_stall =
ColumnFamilyData::GetWriteStallConditionAndCause(
cfd->imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
cfd->imm()->NumNotFlushed() + mem_to_flush, /*num_l0_files=*/0,
/*num_compaction_needed_bytes=*/0, mutable_cf_options,
*cfd->ioptions())
.first;
if (write_stall != WriteStallCondition::kNormal) {
Expand Down Expand Up @@ -2238,9 +2242,8 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
if (s.ok()) {
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}};
if (flush_options.wait &&
ShouldRescheduleFlushRequestToRetainUDT(req)) {
if (flush_options.wait && !cfd->IsDropped() &&
cfd->ShouldPostponeFlushToRetainUDT(flush_memtable_id)) {
std::ostringstream oss;
oss << "Flush cannot continue until all contained user-defined "
"timestamps are above full_history_ts_low. Increase"
Expand All @@ -2255,6 +2258,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
return Status::TryAgain(oss.str());
}
FlushRequest req{flush_reason, {{cfd, flush_memtable_id}}};
flush_reqs.emplace_back(std::move(req));
memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
}
Expand Down Expand Up @@ -3026,7 +3030,9 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
ColumnFamilyData* cfd =
flush_req.cfd_to_max_mem_id_to_persist.begin()->first;
cfd->UnrefAndTryDelete();
if (cfd->UnrefAndTryDelete()) {
return Status::OK();
}
ROCKS_LOG_BUFFER(log_buffer,
"FlushRequest for column family %s is re-scheduled to "
"retain user-defined timestamps.",
Expand Down
4 changes: 2 additions & 2 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void FlushJob::PickMemTable() {

// Track effective cutoff user-defined timestamp during flush if
// user-defined timestamps can be stripped.
GetEffectiveCutoffUDTForPickedMemTables();
PopulateEffectiveCutoffUDTForPickedMemTables();

ReportFlushInputSize(mems_);

Expand Down Expand Up @@ -1105,7 +1105,7 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
return info;
}

void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
void FlushJob::PopulateEffectiveCutoffUDTForPickedMemTables() {
db_mutex_->AssertHeld();
assert(pick_memtable_called);
const auto* ucmp = cfd_->internal_comparator().user_comparator();
Expand Down
2 changes: 1 addition & 1 deletion db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class FlushJob {
// removed because of flush, and use it to increase `full_history_ts_low` if
// the effective cutoff timestamp is newer. See
// `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details.
void GetEffectiveCutoffUDTForPickedMemTables();
void PopulateEffectiveCutoffUDTForPickedMemTables();

Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();

Expand Down

0 comments on commit a80d373

Please sign in to comment.