Skip to content

Commit

Permalink
Preserve Options File (#13074)
Browse files Browse the repository at this point in the history
Summary:
In #13025, we made a change to load the latest OPTIONS file in the remote worker instead of serializing the entire set of options.

That was done under the assumption that OPTIONS files do not get purged often. While testing, we learned that this happens more often than we would like, so we want to prevent the OPTIONS file from getting purged at any time between when the remote compaction is scheduled and when the options are loaded in the remote worker.

Just as we protect new SST files from getting purged using `min_pending_output`, we protect OPTIONS files by keeping track of `min_options_file_number`. Any OPTIONS file with a number greater than or equal to `min_options_file_number` is protected from getting purged. Just like `min_pending_output`, `min_options_file_number` gets bumped when the compaction is done. This is only applicable when `options.compaction_service` is set.

Pull Request resolved: #13074

Test Plan:
```
./compaction_service_test --gtest_filter="*PreservedOptionsLocalCompaction*"
./compaction_service_test --gtest_filter="*PreservedOptionsRemoteCompaction*"
```

Reviewed By: anand1976

Differential Revision: D64433795

Pulled By: jaykorean

fbshipit-source-id: 0d902773f0909d9481dec40abf0b4c54ce5e86b2
  • Loading branch information
jaykorean committed Oct 17, 2024
1 parent eca4f10 commit 11f21cf
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 12 deletions.
2 changes: 2 additions & 0 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ struct CompactionServiceInput {
bool has_end = false;
std::string end;

// Number of the OPTIONS file the remote worker should load its options from.
// Defaults to 0 so that a freshly constructed input never carries an
// indeterminate value into serialization.
uint64_t options_file_number = 0;

// serialization interface to read and write the object
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
Status Write(std::string* output);
Expand Down
12 changes: 12 additions & 0 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
compaction_input.has_end = sub_compact->end.has_value();
compaction_input.end =
compaction_input.has_end ? sub_compact->end->ToString() : "";
compaction_input.options_file_number =
sub_compact->compaction->input_version()
->version_set()
->options_file_number();

TEST_SYNC_POINT_CALLBACK(
"CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
&compaction_input);

std::string compaction_input_binary;
Status s = compaction_input.Write(&compaction_input_binary);
Expand Down Expand Up @@ -438,6 +446,10 @@ static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
{"end",
{offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
{"options_file_number",
{offsetof(struct CompactionServiceInput, options_file_number),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
};

static std::unordered_map<std::string, OptionTypeInfo>
Expand Down
133 changes: 132 additions & 1 deletion db/compaction/compaction_service_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).


#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/utilities/options_util.h"
#include "table/unique_id_impl.h"

namespace ROCKSDB_NAMESPACE {
Expand Down Expand Up @@ -396,6 +396,137 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
ASSERT_TRUE(result.stats.is_remote_compaction);
}

// Without a compaction service, the OPTIONS file that was current when the
// compaction started is not pinned: it may be purged while the compaction is
// running, and the compaction keeps using the option values it started with.
TEST_F(CompactionServiceTest, PreservedOptionsLocalCompaction) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  // Two flushed files of ten 1KB values each, so there is something to compact.
  Random rnd(301);
  for (auto i = 0; i < 2; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(
          Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
    }
    ASSERT_OK(Flush());
  }

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
        auto compaction = static_cast<Compaction*>(arg);
        std::string options_file_name = OptionsFileName(
            dbname_,
            compaction->input_version()->version_set()->options_file_number());

        // Change option twice to make sure the very first OPTIONS file gets
        // purged
        ASSERT_OK(dbfull()->SetOptions(
            {{"level0_file_num_compaction_trigger", "4"}}));
        ASSERT_EQ(4, dbfull()->GetOptions().level0_file_num_compaction_trigger);
        ASSERT_OK(dbfull()->SetOptions(
            {{"level0_file_num_compaction_trigger", "6"}}));
        ASSERT_EQ(6, dbfull()->GetOptions().level0_file_num_compaction_trigger);
        dbfull()->TEST_DeleteObsoleteFiles();

        // For non-remote compactions, the OPTIONS file can be deleted while
        // the compaction is still using the options it captured at its start
        Status s = env_->FileExists(options_file_name);
        ASSERT_NOK(s);
        ASSERT_TRUE(s.IsNotFound());
        // Should be old value
        ASSERT_EQ(2, compaction->mutable_cf_options()
                         ->level0_file_num_compaction_trigger);
        // Nothing is pinned when no compaction service is configured
        ASSERT_TRUE(dbfull()->min_options_file_numbers_.empty());
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  // Unregister the callback before asserting so that it cannot fire again
  // once this test body has returned, even when the assertion below fails.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  // ASSERT_OK reports the failing Status, unlike ASSERT_TRUE(s.ok())
  ASSERT_OK(s);
}

// With a compaction service configured, the OPTIONS file number handed to the
// remote worker must be pinned in min_options_file_numbers_ and must still
// exist on disk with the option values from when the compaction was scheduled.
TEST_F(CompactionServiceTest, PreservedOptionsRemoteCompaction) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto my_cs = GetCompactionService();

  // Two more flushed files of ten 1KB values each.
  Random rnd(301);
  for (auto i = 0; i < 2; ++i) {
    for (auto j = 0; j < 10; ++j) {
      ASSERT_OK(
          Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
    }
    ASSERT_OK(Flush());
  }

  bool is_primary_called = false;
  // This will be called twice: once from the primary and once from the
  // remote. Only change the option on the second (remote) call; changing it
  // on the first call would make the compaction pick up the new value.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", [&](void* /*arg*/) {
        if (!is_primary_called) {
          is_primary_called = true;
          return;
        }
        // Change the option right before the compaction run
        ASSERT_OK(dbfull()->SetOptions(
            {{"level0_file_num_compaction_trigger", "4"}}));
        ASSERT_EQ(4, dbfull()->GetOptions().level0_file_num_compaction_trigger);
        dbfull()->TEST_DeleteObsoleteFiles();
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
      [&](void* arg) {
        auto input = static_cast<CompactionServiceInput*>(arg);
        std::string options_file_name =
            OptionsFileName(dbname_, input->options_file_number);

        // The OPTIONS file referenced by the input exists and is the one
        // pinned at the front of min_options_file_numbers_
        ASSERT_OK(env_->FileExists(options_file_name));
        ASSERT_FALSE(dbfull()->min_options_file_numbers_.empty());
        ASSERT_EQ(dbfull()->min_options_file_numbers_.front(),
                  input->options_file_number);

        DBOptions db_options;
        ConfigOptions config_options;
        std::vector<ColumnFamilyDescriptor> all_column_families;
        config_options.env = env_;
        ASSERT_OK(LoadOptionsFromFile(config_options, options_file_name,
                                      &db_options, &all_column_families));
        bool has_cf = false;
        for (auto& cf : all_column_families) {
          if (cf.name == input->cf_name) {
            // Should be old value
            ASSERT_EQ(2, cf.options.level0_file_num_compaction_trigger);
            has_cf = true;
          }
        }
        ASSERT_TRUE(has_cf);
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "CompactionJob::ProcessKeyValueCompaction()::Processing", [&](void* arg) {
        auto compaction = static_cast<Compaction*>(arg);
        // Should be old value
        ASSERT_EQ(2, compaction->mutable_cf_options()
                         ->level0_file_num_compaction_trigger);
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  // The callbacks capture is_primary_called by reference. Unregister them
  // before asserting so that none can run after this test body has returned,
  // even when an assertion below fails.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
  // ASSERT_OK reports the failing Status, unlike ASSERT_TRUE(s.ok())
  ASSERT_OK(s);

  CompactionServiceResult result;
  my_cs->GetResult(&result);
  ASSERT_OK(result.status);
  ASSERT_TRUE(result.stats.is_manual_compaction);
  ASSERT_TRUE(result.stats.is_remote_compaction);
}

TEST_F(CompactionServiceTest, CorruptedOutput) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Expand Down
18 changes: 18 additions & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4779,6 +4779,24 @@ void DBImpl::ReleaseFileNumberFromPendingOutputs(
}
}

// Appends the current OPTIONS file number (as reported by versions_) to
// min_options_file_numbers_ and returns an iterator to the new entry. The
// caller must keep that iterator and pass it to ReleaseOptionsFileNumber()
// once the compaction is done so the entry can be removed again.
// NOTE(review): the callers visible in this change invoke this before
// releasing the DB mutex -- confirm that this is a requirement.
std::list<uint64_t>::iterator DBImpl::CaptureOptionsFileNumber() {
  // insert() at end() appends and returns an iterator to the inserted element.
  return min_options_file_numbers_.insert(min_options_file_numbers_.end(),
                                          versions_->options_file_number());
}

// Removes the entry previously returned by CaptureOptionsFileNumber() from
// min_options_file_numbers_ and resets `v`. Does nothing when `v` is empty,
// so it is safe to call when nothing was captured.
void DBImpl::ReleaseOptionsFileNumber(
    std::unique_ptr<std::list<uint64_t>::iterator>& v) {
  if (!v) {
    return;
  }
  min_options_file_numbers_.erase(*v);
  v.reset();
}

Status DBImpl::GetUpdatesSince(
SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
const TransactionLogIterator::ReadOptions& read_options) {
Expand Down
15 changes: 15 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,8 @@ class DBImpl : public DB {

uint64_t GetObsoleteSstFilesSize();

uint64_t MinOptionsFileNumberToKeep();

// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than
Expand Down Expand Up @@ -1694,6 +1696,8 @@ class DBImpl : public DB {
friend class XFTransactionWriteHandler;
friend class DBBlobIndexTest;
friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
friend class CompactionServiceTest_PreservedOptionsLocalCompaction_Test;
friend class CompactionServiceTest_PreservedOptionsRemoteCompaction_Test;
#endif

struct CompactionState;
Expand Down Expand Up @@ -1965,6 +1969,12 @@ class DBImpl : public DB {
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);

// Similar to pending_outputs, preserve OPTIONS file. Used for remote
// compaction.
std::list<uint64_t>::iterator CaptureOptionsFileNumber();
void ReleaseOptionsFileNumber(
std::unique_ptr<std::list<uint64_t>::iterator>& v);

// Sets bg error if there is an error writing to WAL.
IOStatus SyncClosedWals(const WriteOptions& write_options,
JobContext* job_context, VersionEdit* synced_wals,
Expand Down Expand Up @@ -2755,6 +2765,11 @@ class DBImpl : public DB {
// State is protected with db mutex.
std::list<uint64_t> pending_outputs_;

// Similar to pending_outputs_, FindObsoleteFiles()/PurgeObsoleteFiles() never
// delete any OPTIONS file whose number is greater than or equal to the front
// (oldest) file number in min_options_file_numbers_.
std::list<uint64_t> min_options_file_numbers_;

// flush_queue_ and compaction_queue_ hold column families that we need to
// flush and compact, respectively.
// A column family is inserted into flush_queue_ when it satisfies condition
Expand Down
21 changes: 21 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1561,6 +1561,12 @@ Status DBImpl::CompactFilesImpl(

compaction_job.Prepare();

std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
if (immutable_db_options().compaction_service != nullptr) {
min_options_file_number_elem.reset(
new std::list<uint64_t>::iterator(CaptureOptionsFileNumber()));
}

mutex_.Unlock();
TEST_SYNC_POINT("CompactFilesImpl:0");
TEST_SYNC_POINT("CompactFilesImpl:1");
Expand All @@ -1570,6 +1576,10 @@ Status DBImpl::CompactFilesImpl(
TEST_SYNC_POINT("CompactFilesImpl:3");
mutex_.Lock();

if (immutable_db_options().compaction_service != nullptr) {
ReleaseOptionsFileNumber(min_options_file_number_elem);
}

bool compaction_released = false;
Status status =
compaction_job.Install(*c->mutable_cf_options(), &compaction_released);
Expand Down Expand Up @@ -3901,6 +3911,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
&bg_bottom_compaction_scheduled_);
compaction_job.Prepare();

std::unique_ptr<std::list<uint64_t>::iterator> min_options_file_number_elem;
if (immutable_db_options().compaction_service != nullptr) {
min_options_file_number_elem.reset(
new std::list<uint64_t>::iterator(CaptureOptionsFileNumber()));
}

NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
mutex_.Unlock();
Expand All @@ -3910,6 +3926,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
compaction_job.Run().PermitUncheckedError();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();

if (immutable_db_options().compaction_service != nullptr) {
ReleaseOptionsFileNumber(min_options_file_number_elem);
}

status =
compaction_job.Install(*c->mutable_cf_options(), &compaction_released);
io_s = compaction_job.io_status();
Expand Down
16 changes: 15 additions & 1 deletion db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ uint64_t DBImpl::GetObsoleteSstFilesSize() {
return versions_->GetObsoleteSstFilesSize();
}

// Returns the front entry of min_options_file_numbers_, or the maximum
// uint64_t value when no OPTIONS file number is currently captured.
// Requires the DB mutex to be held.
// NOTE(review): entries are appended in capture order, so the front is the
// oldest capture; presumably it is also the smallest number -- confirm.
uint64_t DBImpl::MinOptionsFileNumberToKeep() {
  mutex_.AssertHeld();
  return min_options_file_numbers_.empty()
             ? std::numeric_limits<uint64_t>::max()
             : min_options_file_numbers_.front();
}

Status DBImpl::DisableFileDeletions() {
Status s;
int my_disable_delete_obsolete_files;
Expand Down Expand Up @@ -147,6 +155,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// here but later find newer generated unfinalized files while scanning.
job_context->min_pending_output = MinObsoleteSstNumberToKeep();
job_context->files_to_quarantine = error_handler_.GetFilesToQuarantine();
job_context->min_options_file_number = MinOptionsFileNumberToKeep();

// Get obsolete files. This function will also update the list of
// pending files in VersionSet().
Expand Down Expand Up @@ -498,7 +507,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
dbname_);

// File numbers of most recent two OPTIONS file in candidate_files (found in
// previos FindObsoleteFiles(full_scan=true))
// previous FindObsoleteFiles(full_scan=true))
// At this point, there must not be any duplicate file numbers in
// candidate_files.
uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
Expand All @@ -519,6 +528,11 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
}
}

// For remote compactions, we need to keep OPTIONS file that may get
// referenced by the remote worker

optsfile_num2 = std::min(optsfile_num2, state.min_options_file_number);

// Close WALs before trying to delete them.
for (const auto w : state.logs_to_free) {
// TODO: maybe check the return value of Close.
Expand Down
18 changes: 8 additions & 10 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -951,21 +951,19 @@ Status DB::OpenAndCompact(
return s;
}

// 2. Load the options from latest OPTIONS file
// 2. Load the options
DBOptions db_options;
ConfigOptions config_options;
config_options.env = override_options.env;
std::vector<ColumnFamilyDescriptor> all_column_families;
s = LoadLatestOptions(config_options, name, &db_options,
&all_column_families);
// In a very rare scenario, loading options may fail if the options changed by
// the primary host at the same time. Just retry once for now.
if (!s.ok()) {
s = LoadLatestOptions(config_options, name, &db_options,

std::string options_file_name =
OptionsFileName(name, compaction_input.options_file_number);

s = LoadOptionsFromFile(config_options, options_file_name, &db_options,
&all_column_families);
if (!s.ok()) {
return s;
}
if (!s.ok()) {
return s;
}

// 3. Override pointer configurations in DBOptions with
Expand Down
4 changes: 4 additions & 0 deletions db/job_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ struct JobContext {
// that corresponds to the set of files in 'live'.
uint64_t manifest_file_number;
uint64_t pending_manifest_file_number;

// Used for remote compaction. To prevent OPTIONS files from getting
// purged by PurgeObsoleteFiles() of the primary host
uint64_t min_options_file_number;
uint64_t log_number;
uint64_t prev_log_number;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The OPTIONS file to be loaded by the remote worker is now preserved so that it does not get purged by the primary host. This uses a technique similar to the one that protects new SST files from being purged: min_options_file_numbers_ is tracked the same way pending_outputs_ is tracked.

0 comments on commit 11f21cf

Please sign in to comment.