Skip to content

Commit

Permalink
Use manifest to persist pre-allocated seqnos (#11995)
Browse files Browse the repository at this point in the history
Summary:
... and other fixes for crash test after #11922.
* When pre-allocating sequence numbers for establishing a time history, record that last sequence number in the manifest so that it is (most likely) restored on recovery even if no user writes were made or were recovered (e.g. no WAL).
* When pre-allocating sequence numbers for establishing a time history, only do this for actually new DBs.
* Remove the feature that ensures non-zero sequence number on creating the first column family with preserve/preclude option after initial DB::Open. Until fixed in a way compatible with the crash test, this creates a gap where some data written with active preserve/preclude option won't have a known associated time.

Together, these ensure we don't upset the crash test by manipulating sequence numbers after initial DB creation (esp when re-opening with different options). (The crash test expects that the seqno after re-open corresponds to a known point in time from previous crash test operation, matching an expected DB state.)

Follow-up work:
* Re-fill the gap to ensure all data written under preserve/preclude settings have a known time estimate.

Pull Request resolved: #11995

Test Plan:
Added to unit test SeqnoTimeTablePropTest.PrePopulateInDB

Verified fixes two crash test scenarios:
## 1st reproducer
First apply
```
 diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc
index b483e154c..ef63b8d6c 100644
 --- a/db_stress_tool/expected_state.cc
+++ b/db_stress_tool/expected_state.cc
@@ -333,6 +333,7 @@ Status FileExpectedStateManager::SaveAtAndAfter(DB* db) {
     s = NewFileTraceWriter(Env::Default(), soptions, trace_file_path,
                            &trace_writer);
   }
+  if (getenv("CRASH")) assert(false);
   if (s.ok()) {
     TraceOptions trace_opts;
     trace_opts.filter |= kTraceFilterGet;
```

Then
```
mkdir -p /dev/shm/rocksdb_test/rocksdb_crashtest_expected
mkdir -p /dev/shm/rocksdb_test/rocksdb_crashtest_whitebox
rm -rf /dev/shm/rocksdb_test/rocksdb_crashtest_*/*
CRASH=1 ./db_stress --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --destroy_db_initially=1 --manual_wal_flush_one_in=1000000 --clear_column_family_one_in=0 --preserve_internal_time_seconds=36000
./db_stress --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --destroy_db_initially=0 --manual_wal_flush_one_in=1000000 --clear_column_family_one_in=0 --preserve_internal_time_seconds=0
```

Without the fix you get
```
...
DB path: [/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox]
(Re-)verified 34 unique IDs
Error restoring historical expected values: Corruption: DB is older than any restorable expected state
```

## 2nd reproducer
First apply
```
 diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index 62ddead7b..f2654980f 100644
 --- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -1126,6 +1126,7 @@ void StressTest::OperateDb(ThreadState* thread) {
         // OPERATION write
         TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys,
                 value);
+        if (getenv("CRASH")) assert(false);
       } else if (prob_op < del_bound) {
         assert(write_bound <= prob_op);
         // OPERATION delete
```

Then
```
rm -rf /dev/shm/rocksdb_test/rocksdb_crashtest_*/*
CRASH=1 ./db_stress --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --destroy_db_initially=1 --manual_wal_flush_one_in=1000000 --clear_column_family_one_in=0 --disable_wal=1 --reopen=0 --preserve_internal_time_seconds=0
./db_stress --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --destroy_db_initially=0 --manual_wal_flush_one_in=1000000 --clear_column_family_one_in=0 --disable_wal=1 --reopen=0 --preserve_internal_time_seconds=3600
```

Without the fix you get
```
DB path: [/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox]
(Re-)verified 34 unique IDs
db_stress: db_stress_tool/expected_state.cc:380: virtual rocksdb::{anonymous}::ExpectedStateTraceRecordHandler::~
ExpectedStateTraceRecordHandler(): Assertion `IsDone()' failed.
```

Reviewed By: jowlyzhang

Differential Revision: D50533346

Pulled By: pdillinger

fbshipit-source-id: 1056be45c5b9e537c8c601b28c4b27431a782477
  • Loading branch information
pdillinger authored and facebook-github-bot committed Oct 23, 2023
1 parent 543191f commit 4155087
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 33 deletions.
49 changes: 36 additions & 13 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -796,10 +796,8 @@ Status DBImpl::StartPeriodicTaskScheduler() {
return s;
}

Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
if (!from_db_open) {
options_mutex_.AssertHeld();
}
Status DBImpl::RegisterRecordSeqnoTimeWorker(bool is_new_db) {
options_mutex_.AssertHeld();

uint64_t min_preserve_seconds = std::numeric_limits<uint64_t>::max();
uint64_t max_preserve_seconds = std::numeric_limits<uint64_t>::min();
Expand Down Expand Up @@ -853,7 +851,17 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
// 2) In any DB, any data written after setting preserve/preclude options
// must have a reasonable time estimate (so that we can accurately place
// the data), which means at least one entry in seqno_to_time_mapping_.
if (from_db_open && GetLatestSequenceNumber() == 0) {
//
// FIXME: We don't currently guarantee that if the first column family with
// that setting is added or configured after initial DB::Open but before
// the first user Write. Fixing this causes complications with the crash
// test because if DB starts without preserve/preclude option, does some
// user writes but all those writes are lost in crash, then re-opens with
// preserve/preclude option, it sees seqno==1 which looks like one of the
// user writes was recovered, when actually it was not.
bool last_seqno_zero = GetLatestSequenceNumber() == 0;
assert(!is_new_db || last_seqno_zero);
if (is_new_db && last_seqno_zero) {
// Pre-allocate seqnos and pre-populate historical mapping
assert(mapping_was_empty);

Expand All @@ -862,16 +870,31 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker(bool from_db_open) {
versions_->SetLastAllocatedSequence(kMax);
versions_->SetLastPublishedSequence(kMax);
versions_->SetLastSequence(kMax);

// And record in manifest, to avoid going backwards in seqno on re-open
// (potentially with different options). Concurrency is simple because we
// are in DB::Open
{
InstrumentedMutexLock l(&mutex_);
VersionEdit edit;
edit.SetLastSequence(kMax);
s = versions_->LogAndApplyToDefaultColumnFamily(
{}, &edit, &mutex_, directories_.GetDbDir());
if (!s.ok() && versions_->io_status().IsIOError()) {
s = error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
}

// Pre-populate mappings for reserved sequence numbers.
RecordSeqnoToTimeMapping(max_preserve_seconds);
} else if (mapping_was_empty) {
// To ensure there is at least one mapping, we need a non-zero sequence
// number. Outside of DB::Open, we have to be careful.
versions_->EnsureNonZeroSequence();
assert(GetLatestSequenceNumber() > 0);

// Ensure at least one mapping (or log a warning)
RecordSeqnoToTimeMapping(/*populate_historical_seconds=*/0);
if (!last_seqno_zero) {
// Ensure at least one mapping (or log a warning)
RecordSeqnoToTimeMapping(/*populate_historical_seconds=*/0);
} else {
// FIXME (see limitation described above)
}
}

s = periodic_task_scheduler_.Register(
Expand Down Expand Up @@ -6493,7 +6516,7 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) {
assert(unix_time > populate_historical_seconds);
}
} else {
assert(seqno > 0);
// FIXME: assert(seqno > 0);
appended = seqno_to_time_mapping_.Append(seqno, unix_time);
}
}
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,7 @@ class DBImpl : public DB {
autovector<autovector<VersionEdit*>> edit_lists_;
// files_to_delete_ contains sst files
std::unordered_set<std::string> files_to_delete_;
bool is_new_db_ = false;
};

// Persist options to options file. Must be holding options_mutex_.
Expand Down Expand Up @@ -2168,7 +2169,7 @@ class DBImpl : public DB {
// Cancel scheduled periodic tasks
Status CancelPeriodicTaskScheduler();

Status RegisterRecordSeqnoTimeWorker(bool from_db_open);
Status RegisterRecordSeqnoTimeWorker(bool is_new_db);

void PrintStatistics();

Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ Status DBImpl::Recover(
uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();

bool is_new_db = false;
bool tmp_is_new_db = false;
bool& is_new_db = recovery_ctx ? recovery_ctx->is_new_db_ : tmp_is_new_db;
assert(db_lock_ == nullptr);
std::vector<std::string> files_in_dbname;
if (!read_only) {
Expand Down Expand Up @@ -2247,7 +2248,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
s = impl->StartPeriodicTaskScheduler();
}
if (s.ok()) {
s = impl->RegisterRecordSeqnoTimeWorker(/*from_db_open=*/true);
s = impl->RegisterRecordSeqnoTimeWorker(recovery_ctx.is_new_db_);
}
impl->options_mutex_.Unlock();
if (!s.ok()) {
Expand Down
24 changes: 24 additions & 0 deletions db/seqno_time_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,10 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
// interfere with the seqno-to-time mapping getting a starting entry.
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
} else {
// FIXME: currently, starting entry after CreateColumnFamily requires
// non-zero seqno
ASSERT_OK(Delete("blah"));
}

// Unfortunately, if we add a CF with preserve/preclude option after
Expand Down Expand Up @@ -899,13 +903,26 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
}
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 0);
if (!with_write) {
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
}

ASSERT_OK(ReadOnlyReopen(track_options));
if (with_write) {
ASSERT_EQ(Get("foo"), "bar");
}
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 0);
if (!with_write) {
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);

// And even if we re-open read-write, we do not get pre-population,
// because that's only for new DBs.
Reopen(track_options);
sttm = dbfull()->TEST_GetSeqnoToTimeMapping();
ASSERT_EQ(sttm.Size(), 0);
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0);
}
}

// #### DB#5: Destroy and open with preserve/preclude option ####
Expand Down Expand Up @@ -987,6 +1004,13 @@ TEST_P(SeqnoTimeTablePropTest, PrePopulateInDB) {
// Oldest tracking time maps to first pre-allocated seqno
ASSERT_EQ(sttm.GetProximalSeqnoBeforeTime(start_time - kPreserveSecs), 1);

// Even after no writes and DB re-open without tracking options, sequence
// numbers should not go backward into those that were pre-allocated.
// (Future work: persist the mapping)
ReopenWithColumnFamilies({"default", "one"},
List({base_options, base_options}));
ASSERT_EQ(latest_seqno, db_->GetLatestSequenceNumber());

Close();
}

Expand Down
14 changes: 0 additions & 14 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7234,20 +7234,6 @@ Status VersionSet::VerifyFileMetadata(const ReadOptions& read_options,
return status;
}

void VersionSet::EnsureNonZeroSequence() {
uint64_t expected = 0;
// Update each from 0->1, in order, or abort if any becomes non-zero in
// parallel
if (last_allocated_sequence_.compare_exchange_strong(expected, 1)) {
if (last_published_sequence_.compare_exchange_strong(expected, 1)) {
(void)last_sequence_.compare_exchange_strong(expected, 1);
}
}
assert(last_allocated_sequence_.load() > 0);
assert(last_published_sequence_.load() > 0);
assert(last_sequence_.load() > 0);
}

ReactiveVersionSet::ReactiveVersionSet(
const std::string& dbname, const ImmutableDBOptions* _db_options,
const FileOptions& _file_options, Cache* table_cache,
Expand Down
3 changes: 0 additions & 3 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1344,9 +1344,6 @@ class VersionSet {
last_allocated_sequence_.store(s, std::memory_order_seq_cst);
}

// Allocate a dummy sequence number as needed to ensure last is non-zero.
void EnsureNonZeroSequence();

// Note: memory_order_release must be sufficient
uint64_t FetchAddLastAllocatedSequence(uint64_t s) {
return last_allocated_sequence_.fetch_add(s, std::memory_order_seq_cst);
Expand Down

0 comments on commit 4155087

Please sign in to comment.