Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Compaction] Load latest options from OPTIONS file in Remote host #13025

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,7 @@ class CompactionJob {
// doesn't contain the LSM tree information, which is passed through the
// MANIFEST file.
struct CompactionServiceInput {
ColumnFamilyDescriptor column_family;

DBOptions db_options;
std::string cf_name;

std::vector<SequenceNumber> snapshots;

Expand All @@ -402,9 +400,6 @@ struct CompactionServiceInput {
static Status Read(const std::string& data_str, CompactionServiceInput* obj);
Status Write(std::string* output);

// Initialize a dummy ColumnFamilyDescriptor
CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}

#ifndef NDEBUG
bool TEST_Equals(CompactionServiceInput* other);
bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
Expand Down
16 changes: 3 additions & 13 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1568,17 +1568,7 @@ TEST_F(CompactionJobTest, InputSerialization) {
const int kStrMaxLen = 1000;
Random rnd(static_cast<uint32_t>(time(nullptr)));
Random64 rnd64(time(nullptr));
input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
input.column_family.options.comparator = ReverseBytewiseComparator();
input.column_family.options.max_bytes_for_level_base =
rnd64.Uniform(UINT64_MAX);
input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
input.column_family.options.compression = kZSTD;
input.column_family.options.compression_opts.level = 4;
input.db_options.max_background_flushes = 10;
input.db_options.paranoid_checks = rnd.OneIn(2);
input.db_options.statistics = CreateDBStatistics();
input.db_options.env = env_;
input.cf_name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
while (!rnd.OneIn(10)) {
input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
}
Expand Down Expand Up @@ -1606,10 +1596,10 @@ TEST_F(CompactionJobTest, InputSerialization) {
ASSERT_TRUE(deserialized1.TEST_Equals(&input));

// Test mismatch
deserialized1.db_options.max_background_flushes += 10;
deserialized1.output_level += 10;
std::string mismatch;
ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
ASSERT_EQ(mismatch, "db_options.max_background_flushes");
ASSERT_EQ(mismatch, "output_level");

// Test unknown field
CompactionServiceInput deserialized2;
Expand Down
70 changes: 18 additions & 52 deletions db/compaction/compaction_service_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/utilities/options_type.h"
#include "rocksdb/utilities/options_util.h"

namespace ROCKSDB_NAMESPACE {
class SubcompactionState;
Expand All @@ -28,6 +29,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(

const Compaction* compaction = sub_compact->compaction;
CompactionServiceInput compaction_input;

compaction_input.output_level = compaction->output_level();
compaction_input.db_id = db_id_;

Expand All @@ -39,12 +41,8 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
MakeTableFileName(file->fd.GetNumber()));
}
}
compaction_input.column_family.name =
compaction->column_family_data()->GetName();
compaction_input.column_family.options =
compaction->column_family_data()->GetLatestCFOptions();
compaction_input.db_options =
BuildDBOptions(db_options_, mutable_db_options_copy_);

compaction_input.cf_name = compaction->column_family_data()->GetName();
compaction_input.snapshots = existing_snapshots_;
compaction_input.has_begin = sub_compact->start.has_value();
compaction_input.begin =
Expand All @@ -70,7 +68,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_input.output_level, input_files_oss.str().c_str());
CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
GetCompactionId(sub_compact), thread_pri_);
Expand All @@ -84,13 +82,14 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"CompactionService failed to schedule a remote compaction job.");
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed to start.",
compaction_input.column_family.name.c_str(), job_id_);
compaction->column_family_data()->GetName().c_str(),
job_id_);
return response.status;
case CompactionServiceJobStatus::kUseLocal:
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
compaction_input.column_family.name.c_str(), job_id_);
compaction->column_family_data()->GetName().c_str(), job_id_);
return response.status;
default:
assert(false); // unknown status
Expand All @@ -99,7 +98,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(

ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Waiting for remote compaction...",
compaction_input.column_family.name.c_str(), job_id_);
compaction->column_family_data()->GetName().c_str(), job_id_);
std::string compaction_result_binary;
CompactionServiceJobStatus compaction_status =
db_options_.compaction_service->Wait(response.scheduled_job_id,
Expand All @@ -109,7 +108,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
ROCKS_LOG_INFO(
db_options_.info_log,
"[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
compaction_input.column_family.name.c_str(), job_id_);
compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}

Expand All @@ -134,9 +133,9 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
"result is returned).");
compaction_result.status.PermitUncheckedError();
}
ROCKS_LOG_WARN(db_options_.info_log,
"[%s] [JOB %d] Remote compaction failed.",
compaction_input.column_family.name.c_str(), job_id_);
ROCKS_LOG_WARN(
db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
compaction->column_family_data()->GetName().c_str(), job_id_);
return compaction_status;
}

Expand All @@ -162,7 +161,7 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
db_options_.info_log,
"[%s] [JOB %d] Received remote compaction result, output path: "
"%s, files: %s",
compaction_input.column_family.name.c_str(), job_id_,
compaction->column_family_data()->GetName().c_str(), job_id_,
compaction_result.output_path.c_str(), output_files_oss.str().c_str());

// Installation Starts
Expand Down Expand Up @@ -264,8 +263,8 @@ Status CompactionServiceCompactionJob::Run() {
const VersionStorageInfo* storage_info = c->input_version()->storage_info();
assert(storage_info);
assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);

write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level());

bottommost_level_ = c->bottommost_level();

Slice begin = compaction_input_.begin;
Expand Down Expand Up @@ -404,42 +403,9 @@ static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
};

static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
{"column_family",
OptionTypeInfo::Struct(
"column_family", &cfd_type_info,
offsetof(struct CompactionServiceInput, column_family),
OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
{"db_options",
{offsetof(struct CompactionServiceInput, db_options),
OptionType::kConfigurable, OptionVerificationType::kNormal,
OptionTypeFlags::kNone,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto options = static_cast<DBOptions*>(addr);
return GetDBOptionsFromString(opts, DBOptions(), value, options);
},
[](const ConfigOptions& opts, const std::string& /*name*/,
const void* addr, std::string* value) {
const auto options = static_cast<const DBOptions*>(addr);
std::string result;
auto status = GetStringFromDBOptions(opts, *options, &result);
*value = "{" + result + "}";
return status;
},
[](const ConfigOptions& opts, const std::string& name, const void* addr1,
const void* addr2, std::string* mismatch) {
const auto this_one = static_cast<const DBOptions*>(addr1);
const auto that_one = static_cast<const DBOptions*>(addr2);
auto this_conf = DBOptionsAsConfigurable(*this_one);
auto that_conf = DBOptionsAsConfigurable(*that_one);
std::string mismatch_opt;
bool result =
this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
if (!result) {
*mismatch = name + "." + mismatch_opt;
}
return result;
}}},
{"cf_name",
{offsetof(struct CompactionServiceInput, cf_name),
OptionType::kEncodedString}},
{"snapshots", OptionTypeInfo::Vector<uint64_t>(
offsetof(struct CompactionServiceInput, snapshots),
OptionVerificationType::kNormal, OptionTypeFlags::kNone,
Expand Down
113 changes: 74 additions & 39 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
#include "logging/auto_roll_logger.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/configurable.h"
#include "rocksdb/convenience.h"
#include "rocksdb/utilities/options_util.h"
#include "util/cast_util.h"
#include "util/write_batch_util.h"

Expand Down Expand Up @@ -938,69 +939,103 @@ Status DB::OpenAndCompact(
const std::string& output_directory, const std::string& input,
std::string* output,
const CompactionServiceOptionsOverride& override_options) {
// Check for cancellation
if (options.canceled && options.canceled->load(std::memory_order_acquire)) {
return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}

// 1. Deserialize Compaction Input
CompactionServiceInput compaction_input;
Status s = CompactionServiceInput::Read(input, &compaction_input);
if (!s.ok()) {
return s;
}

compaction_input.db_options.max_open_files = -1;
compaction_input.db_options.compaction_service = nullptr;
if (compaction_input.db_options.statistics) {
compaction_input.db_options.statistics.reset();
// 2. Load the options from latest OPTIONS file
DBOptions db_options;
ConfigOptions config_options;
config_options.env = override_options.env;
std::vector<ColumnFamilyDescriptor> all_column_families;
s = LoadLatestOptions(config_options, name, &db_options,
&all_column_families);
// In a very rare scenario, loading options may fail if the options are changed
// by the primary host at the same time. Just retry once for now.
if (!s.ok()) {
s = LoadLatestOptions(config_options, name, &db_options,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe log the error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the Logger is not available at this point, before the options are loaded, so I didn't bother creating a temporary logger just for this. We can consider adding it later if really needed.

&all_column_families);
if (!s.ok()) {
return s;
}
}
compaction_input.db_options.env = override_options.env;
compaction_input.db_options.file_checksum_gen_factory =
override_options.file_checksum_gen_factory;
compaction_input.db_options.statistics = override_options.statistics;
compaction_input.column_family.options.comparator =
override_options.comparator;
compaction_input.column_family.options.merge_operator =
override_options.merge_operator;
compaction_input.column_family.options.compaction_filter =
override_options.compaction_filter;
compaction_input.column_family.options.compaction_filter_factory =
override_options.compaction_filter_factory;
compaction_input.column_family.options.prefix_extractor =
override_options.prefix_extractor;
compaction_input.column_family.options.table_factory =
override_options.table_factory;
compaction_input.column_family.options.sst_partitioner_factory =
override_options.sst_partitioner_factory;
compaction_input.column_family.options.table_properties_collector_factories =
override_options.table_properties_collector_factories;
compaction_input.db_options.listeners = override_options.listeners;

// 3. Override pointer configurations in DBOptions with
// CompactionServiceOptionsOverride
db_options.env = override_options.env;
db_options.file_checksum_gen_factory =
override_options.file_checksum_gen_factory;
db_options.statistics = override_options.statistics;
db_options.listeners = override_options.listeners;
db_options.compaction_service = nullptr;
// We will close the DB after the compaction anyway.
// Open as many files as needed for the compaction.
db_options.max_open_files = -1;

// 4. Filter CFs that are needed for OpenAndCompact()
// We do not need to open all column families for the remote compaction.
// Only open default CF + target CF. If target CF == default CF, we will open
// just the default CF (Due to current limitation, DB cannot open without the
// default CF)
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(compaction_input.column_family);
// TODO: we have to open the default CF because of an implementation
// limitation. Currently we just reuse the CF options from the input, which is
// not correct, so the open may fail.
if (compaction_input.column_family.name != kDefaultColumnFamilyName) {
column_families.emplace_back(kDefaultColumnFamilyName,
compaction_input.column_family.options);
for (auto& cf : all_column_families) {
if (cf.name == compaction_input.cf_name) {
cf.options.comparator = override_options.comparator;
cf.options.merge_operator = override_options.merge_operator;
cf.options.compaction_filter = override_options.compaction_filter;
cf.options.compaction_filter_factory =
override_options.compaction_filter_factory;
cf.options.prefix_extractor = override_options.prefix_extractor;
cf.options.table_factory = override_options.table_factory;
cf.options.sst_partitioner_factory =
override_options.sst_partitioner_factory;
cf.options.table_properties_collector_factories =
override_options.table_properties_collector_factories;
column_families.emplace_back(cf);
} else if (cf.name == kDefaultColumnFamilyName) {
column_families.emplace_back(cf);
}
}

// 5. Open db As Secondary
DB* db;
std::vector<ColumnFamilyHandle*> handles;

s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory,
column_families, &handles, &db);
s = DB::OpenAsSecondary(db_options, name, output_directory, column_families,
&handles, &db);
if (!s.ok()) {
return s;
}
assert(db);

// 6. Find the handle of the Column Family that this will compact
ColumnFamilyHandle* cfh = nullptr;
for (auto* handle : handles) {
if (compaction_input.cf_name == handle->GetName()) {
cfh = handle;
break;
}
}
assert(cfh);

// 7. Run the compaction without installation.
// Output will be stored in the directory specified by output_directory
CompactionServiceResult compaction_result;
DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
assert(handles.size() > 0);
s = db_secondary->CompactWithoutInstallation(
options, handles[0], compaction_input, &compaction_result);
s = db_secondary->CompactWithoutInstallation(options, cfh, compaction_input,
&compaction_result);

// 8. Serialize the result
Status serialization_status = compaction_result.Write(output);

// 9. Close the db and return
for (auto& handle : handles) {
delete handle;
}
Expand Down
3 changes: 0 additions & 3 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -2295,9 +2295,6 @@ struct SizeApproximationOptions {
};

struct CompactionServiceOptionsOverride {
// Currently pointer configurations are not passed to compaction service
// compaction so the user needs to set it. It will be removed once pointer
// configuration passing is supported.
Env* env = Env::Default();
std::shared_ptr<FileChecksumGenFactory> file_checksum_gen_factory = nullptr;

Expand Down
Loading