Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use manifest to persist pre-allocated seqnos #11995

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -796,10 +796,8 @@ Status DBImpl::StartPeriodicTaskScheduler() {
return s;
}

Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
if (!from_db_open) {
options_mutex_.AssertHeld();
}
Status DBImpl::RegisterRecordSeqnoTimeWorker(bool is_new_db) {
options_mutex_.AssertHeld();

uint64_t min_preserve_seconds = std::numeric_limits<uint64_t>::max();
uint64_t max_preserve_seconds = std::numeric_limits<uint64_t>::min();
Expand Down Expand Up @@ -853,7 +851,17 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
// 2) In any DB, any data written after setting preserve/preclude options
// must have a reasonable time estimate (so that we can accurately place
// the data), which means at least one entry in seqno_to_time_mapping_.
if (from_db_open && GetLatestSequenceNumber() == 0) {
//
// FIXME: We don't currently guarantee that if the first column family with
// that setting is added or configured after initial DB::Open but before
// the first user Write. Fixing this causes complications with the crash
// test because if DB starts without preserve/preclude option, does some
// user writes but all those writes are lost in crash, then re-opens with
// preserve/preclude option, it sees seqno==1 which looks like one of the
// user writes was recovered, when actually it was not.
bool last_seqno_zero = GetLatestSequenceNumber() == 0;
assert(!is_new_db || last_seqno_zero);
if (is_new_db && last_seqno_zero) {
// Pre-allocate seqnos and pre-populate historical mapping
assert(mapping_was_empty);

Expand All @@ -862,16 +870,31 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
versions_->SetLastAllocatedSequence(kMax);
versions_->SetLastPublishedSequence(kMax);
versions_->SetLastSequence(kMax);

// And record in manifest, to avoid going backwards in seqno on re-open
// (potentially with different options). Concurrency is simple because we
// are in DB::Open
{
InstrumentedMutexLock l(&mutex_);
VersionEdit edit;
edit.SetLastSequence(kMax);
s = versions_->LogAndApplyToDefaultColumnFamily(
{}, &edit, &mutex_, directories_.GetDbDir());
if (!s.ok() && versions_->io_status().IsIOError()) {
s = error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
}

// Pre-populate mappings for reserved sequence numbers.
RecordSeqnoToTimeMapping(max_preserve_seconds);
} else if (mapping_was_empty) {
// To ensure there is at least one mapping, we need a non-zero sequence
// number. Outside of DB::Open, we have to be careful.
versions_->EnsureNonZeroSequence();
assert(GetLatestSequenceNumber() > 0);

// Ensure at least one mapping (or log a warning)
RecordSeqnoToTimeMapping(/*populate_historical_seconds=*/0);
if (!last_seqno_zero) {
// Ensure at least one mapping (or log a warning)
RecordSeqnoToTimeMapping(/*populate_historical_seconds=*/0);
} else {
// FIXME (see limitation described above)
}
}

s = periodic_task_scheduler_.Register(
Expand Down Expand Up @@ -6493,7 +6516,7 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) {
assert(unix_time > populate_historical_seconds);
}
} else {
assert(seqno > 0);
// FIXME: assert(seqno > 0);
appended = seqno_to_time_mapping_.Append(seqno, unix_time);
}
}
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,7 @@ class DBImpl : public DB {
autovector<autovector<VersionEdit*>> edit_lists_;
// files_to_delete_ contains sst files
std::unordered_set<std::string> files_to_delete_;
bool is_new_db_ = false;
};

// Persist options to options file. Must be holding options_mutex_.
Expand Down Expand Up @@ -2168,7 +2169,7 @@ class DBImpl : public DB {
// Cancel scheduled periodic tasks
Status CancelPeriodicTaskScheduler();

Status RegisterRecordSeqnoTimeWorker(bool from_db_open);
Status RegisterRecordSeqnoTimeWorker(bool is_new_db);

void PrintStatistics();

Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ Status DBImpl::Recover(
uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();

bool is_new_db = false;
bool tmp_is_new_db = false;
bool& is_new_db = recovery_ctx ? recovery_ctx->is_new_db_ : tmp_is_new_db;
assert(db_lock_ == nullptr);
std::vector<std::string> files_in_dbname;
if (!read_only) {
Expand Down Expand Up @@ -2247,7 +2248,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
s = impl->StartPeriodicTaskScheduler();
}
if (s.ok()) {
s = impl->RegisterRecordSeqnoTimeWorker(/*from_db_open=*/true);
s = impl->RegisterRecordSeqnoTimeWorker(recovery_ctx.is_new_db_);
}
impl->options_mutex_.Unlock();
if (!s.ok()) {
Expand Down
24 changes: 24 additions & 0 deletions db/seqno_time_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,10 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
// interfere with the seqno-to-time mapping getting a starting entry.
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
} else {
// FIXME: currently, starting entry after CreateColumnFamily requires
// non-zero seqno
ASSERT_OK(Delete("blah"));
}

// Unfortunately, if we add a CF with preserve/preclude option after
Expand Down Expand Up @@ -899,13 +903,26 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
}
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 0);
if (!with_write) {
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
}

ASSERT_OK(ReadOnlyReopen(track_options));
if (with_write) {
ASSERT_EQ(Get("foo"), "bar");
}
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 0);
if (!with_write) {
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);

// And even if we re-open read-write, we do not get pre-population,
// because that's only for new DBs.
Reopen(track_options);
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 0);
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
}
}

// #### DB#5: Destroy and open with preserve/preclude option ####
Expand Down Expand Up @@ -987,6 +1004,13 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
// Oldest tracking time maps to first pre-allocated seqno
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - kPreserveSecs), 1);

// Even after no writes and DB re-open without tracking options, sequence
// numbers should not go backward into those that were pre-allocated.
// (Future work: persist the mapping)
ReopenWithColumnFamilies({"default", "one"},
List({base_options, base_options}));
ASSERT_EQ(latest_seqno, db_->GetLatestSequenceNumber());

Close();
}

Expand Down
14 changes: 0 additions & 14 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7234,20 +7234,6 @@ Status VersionSet::VerifyFileMetadata(const ReadOptions& read_options,
return status;
}

void VersionSet::EnsureNonZeroSequence() {
uint64_t expected = 0;
// Update each from 0->1, in order, or abort if any becomes non-zero in
// parallel
if (last_allocated_sequence_.compare_exchange_strong(expected, 1)) {
if (last_published_sequence_.compare_exchange_strong(expected, 1)) {
(void)last_sequence_.compare_exchange_strong(expected, 1);
}
}
assert(last_allocated_sequence_.load() > 0);
assert(last_published_sequence_.load() > 0);
assert(last_sequence_.load() > 0);
}

ReactiveVersionSet::ReactiveVersionSet(
const std::string& dbname, const ImmutableDBOptions* _db_options,
const FileOptions& _file_options, Cache* table_cache,
Expand Down
3 changes: 0 additions & 3 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1344,9 +1344,6 @@ class VersionSet {
last_allocated_sequence_.store(s, std::memory_order_seq_cst);
}

// Allocate a dummy sequence number as needed to ensure last is non-zero.
void EnsureNonZeroSequence();

// Note: memory_order_release must be sufficient
uint64_t FetchAddLastAllocatedSequence(uint64_t s) {
return last_allocated_sequence_.fetch_add(s, std::memory_order_seq_cst);
Expand Down