Skip to content

Commit

Permalink
Make max_subcompactions dynamically changeable (#7159)
Browse files Browse the repository at this point in the history
Summary:
Make `max-subcompactions` dynamically changeable by passing the `DBOption` to Compaction.

Pull Request resolved: #7159

Reviewed By: siying

Differential Revision: D22671238

Pulled By: jay-zhuang

fbshipit-source-id: 311ca9f6bb606965544d8708616d358cfed5be42
  • Loading branch information
jay-zhuang authored and facebook-github-bot committed Jul 23, 2020
1 parent 0d04a84 commit b0c5ecd
Show file tree
Hide file tree
Showing 23 changed files with 331 additions and 136 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* Added experimental option BlockBasedTableOptions::optimize_filters_for_memory for reducing allocated memory size of Bloom filters (~10% savings with Jemalloc) while preserving the same general accuracy. To have an effect, the option requires format_version=5 and malloc_usable_size. Enabling this option is forward and backward compatible with existing format_version=5.
* `BackupTableNameOption BackupableDBOptions::share_files_with_checksum_naming` is added, where `BackupTableNameOption` is an `enum` type with two enumerators `kChecksumAndFileSize` and `kOptionalChecksumAndDbSessionId`. By default, `BackupableDBOptions::share_files_with_checksum_naming` is set to `kOptionalChecksumAndDbSessionId`. In the default case, backup table filenames generated by this version of RocksDB are of the form either `<file_number>_<crc32c>_<db_session_id>.sst` or `<file_number>_<db_session_id>.sst` as opposed to `<file_number>_<crc32c>_<file_size>.sst`. Specifically, table filenames are of the form `<file_number>_<crc32c>_<db_session_id>.sst` if `DBOptions::file_checksum_gen_factory` is set to `GetFileChecksumGenCrc32cFactory()`. Futhermore, the checksum value `<crc32c>` appeared in the filenames is hexadecimal-encoded, instead of being decimal-encoded `uint32_t` value. If `DBOptions::file_checksum_gen_factory` is `nullptr`, the table filenames are of the form `<file_number>_<db_session_id>.sst`. The new default behavior fixes the backup file name collision problem, which might be possible at large scale, but the option `kChecksumAndFileSize` is added to allow use of old naming in case it is needed. Moreover, for table files generated prior to this version of RocksDB, using `kOptionalChecksumAndDbSessionId` will fall back on `kChecksumAndFileSize`. In these cases, the checksum value `<crc32c>` in the filenames `<file_number>_<crc32c>_<file_size>.sst` is decimal-encoded `uint32_t` value as before. This default behavior change is not an upgrade issue, because previous versions of RocksDB can read, restore, and delete backups using new names, and it's OK for a backup directory to use a mixture of table file naming schemes. Note that `share_files_with_checksum_naming` comes into effect only when both `share_files_with_checksum` and `share_table_files` are true.
* Added auto resume function to automatically recover the DB from background Retryable IO Error. When retryable IOError happens during flush and WAL write, the error is mapped to Hard Error and DB will be in read mode. When retryable IO Error happens during compaction, the error will be mapped to Soft Error. DB is still in write/read mode. Autoresume function will create a thread for a DB to call DB->ResumeImpl() to try the recover for Retryable IO Error during flush and WAL write. Compaction will be rescheduled by itself if retryable IO Error happens. Auto resume may also cause other Retryable IO Error during the recovery, so the recovery will fail. Retry the auto resume may solve the issue, so we use max_bgerror_resume_count to decide how many resume cycles will be tried in total. If it is <=0, auto resume retryable IO Error is disabled. Default is INT_MAX, which will lead to a infinit auto resume. bgerror_resume_retry_interval decides the time interval between two auto resumes.
* Option `max_subcompactions` can be set dynamically using DB::SetDBOptions().

### Bug Fixes
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.
Expand Down
15 changes: 9 additions & 6 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1047,13 +1047,14 @@ bool ColumnFamilyData::NeedsCompaction() const {
}

Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
SequenceNumber earliest_mem_seqno =
std::min(mem_->GetEarliestSequenceNumber(),
imm_.current()->GetEarliestSequenceNumber(false));
auto* result = compaction_picker_->PickCompaction(
GetName(), mutable_options, current_->storage_info(), log_buffer,
earliest_mem_seqno);
GetName(), mutable_options, mutable_db_options, current_->storage_info(),
log_buffer, earliest_mem_seqno);
if (result != nullptr) {
result->SetInputVersion(current_);
}
Expand Down Expand Up @@ -1123,14 +1124,16 @@ const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
const MutableCFOptions& mutable_cf_options, int input_level,
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options, int input_level,
int output_level, const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* conflict,
uint64_t max_file_num_to_ignore) {
auto* result = compaction_picker_->CompactRange(
GetName(), mutable_cf_options, current_->storage_info(), input_level,
output_level, compact_range_options, begin, end, compaction_end, conflict,
GetName(), mutable_cf_options, mutable_db_options,
current_->storage_info(), input_level, output_level,
compact_range_options, begin, end, compaction_end, conflict,
max_file_num_to_ignore);
if (result != nullptr) {
result->SetInputVersion(current_);
Expand Down
2 changes: 2 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ class ColumnFamilyData {
bool NeedsCompaction() const;
// REQUIRES: DB mutex held
Compaction* PickCompaction(const MutableCFOptions& mutable_options,
const MutableDBOptions& mutable_db_options,
LogBuffer* log_buffer);

// Check if the passed range overlap with any running compactions.
Expand All @@ -412,6 +413,7 @@ class ColumnFamilyData {
static const int kCompactToBaseLevel;
// REQUIRES: DB mutex held
Compaction* CompactRange(const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
int input_level, int output_level,
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ bool Compaction::IsFullCompaction(
Compaction::Compaction(VersionStorageInfo* vstorage,
const ImmutableCFOptions& _immutable_cf_options,
const MutableCFOptions& _mutable_cf_options,
const MutableDBOptions& _mutable_db_options,
std::vector<CompactionInputFiles> _inputs,
int _output_level, uint64_t _target_file_size,
uint64_t _max_compaction_bytes, uint32_t _output_path_id,
Expand Down Expand Up @@ -243,7 +244,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
compaction_reason_ = CompactionReason::kManualCompaction;
}
if (max_subcompactions_ == 0) {
max_subcompactions_ = immutable_cf_options_.max_subcompactions;
max_subcompactions_ = _mutable_db_options.max_subcompactions;
}
if (!bottommost_level_) {
// Currently we only enable dictionary compression during compaction to the
Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class Compaction {
Compaction(VersionStorageInfo* input_version,
const ImmutableCFOptions& immutable_cf_options,
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
std::vector<CompactionInputFiles> inputs, int output_level,
uint64_t target_file_size, uint64_t max_compaction_bytes,
uint32_t output_path_id, CompressionType compression,
Expand Down
9 changes: 6 additions & 3 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class CompactionJobTest : public testing::Test {
dbname_(test::PerThreadDBPath("compaction_job_test")),
db_options_(),
mutable_cf_options_(cf_options_),
mutable_db_options_(),
table_cache_(NewLRUCache(50000, 16)),
write_buffer_manager_(db_options_.db_write_buffer_size),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
Expand Down Expand Up @@ -316,9 +317,10 @@ class CompactionJobTest : public testing::Test {

Compaction compaction(
cfd->current()->storage_info(), *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(), compaction_input_files, output_level,
1024 * 1024, 10 * 1024 * 1024, 0, kNoCompression,
cfd->GetLatestMutableCFOptions()->compression_opts, 0, {}, true);
*cfd->GetLatestMutableCFOptions(), mutable_db_options_,
compaction_input_files, output_level, 1024 * 1024, 10 * 1024 * 1024, 0,
kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts, 0,
{}, true);
compaction.SetInputVersion(cfd->current());

LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
Expand Down Expand Up @@ -370,6 +372,7 @@ class CompactionJobTest : public testing::Test {
ImmutableDBOptions db_options_;
ColumnFamilyOptions cf_options_;
MutableCFOptions mutable_cf_options_;
MutableDBOptions mutable_db_options_;
std::shared_ptr<Cache> table_cache_;
WriteController write_controller_;
WriteBufferManager write_buffer_manager_;
Expand Down
17 changes: 9 additions & 8 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ Compaction* CompactionPicker::CompactFiles(
const CompactionOptions& compact_options,
const std::vector<CompactionInputFiles>& input_files, int output_level,
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
uint32_t output_path_id) {
const MutableDBOptions& mutable_db_options, uint32_t output_path_id) {
assert(input_files.size());
// This compaction output should not overlap with a running compaction as
// `SanitizeCompactionInputFiles` should've checked earlier and db mutex
Expand All @@ -356,8 +356,8 @@ Compaction* CompactionPicker::CompactFiles(
compression_type = compact_options.compression;
}
auto c = new Compaction(
vstorage, ioptions_, mutable_cf_options, input_files, output_level,
compact_options.output_file_size_limit,
vstorage, ioptions_, mutable_cf_options, mutable_db_options, input_files,
output_level, compact_options.output_file_size_limit,
mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
GetCompressionOptions(mutable_cf_options, vstorage, output_level),
compact_options.max_subcompactions,
Expand Down Expand Up @@ -563,7 +563,8 @@ void CompactionPicker::GetGrandparents(

Compaction* CompactionPicker::CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
int input_level, int output_level,
const CompactRangeOptions& compact_range_options, const InternalKey* begin,
const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict,
uint64_t max_file_num_to_ignore) {
Expand Down Expand Up @@ -626,8 +627,8 @@ Compaction* CompactionPicker::CompactRange(
}

Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs),
output_level,
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options, output_level,
ioptions_.compaction_style),
/* max_compaction_bytes */ LLONG_MAX,
Expand Down Expand Up @@ -778,8 +779,8 @@ Compaction* CompactionPicker::CompactRange(
std::vector<FileMetaData*> grandparents;
GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
Compaction* compaction = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(compaction_inputs),
output_level,
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
std::move(compaction_inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options, output_level,
ioptions_.compaction_style, vstorage->base_level(),
ioptions_.level_compaction_dynamic_level_bytes),
Expand Down
9 changes: 7 additions & 2 deletions db/compaction/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class CompactionPicker {
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer,
SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) = 0;

// Return a compaction object for compacting the range [begin,end] in
Expand All @@ -72,7 +73,8 @@ class CompactionPicker {
// *compaction_end should point to valid InternalKey!
virtual Compaction* CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
int input_level, int output_level,
const CompactRangeOptions& compact_range_options,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end, bool* manual_conflict,
Expand Down Expand Up @@ -113,6 +115,7 @@ class CompactionPicker {
const std::vector<CompactionInputFiles>& input_files,
int output_level, VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,
const MutableDBOptions& mutable_db_options,
uint32_t output_path_id);

// Converts a set of compaction input file numbers into
Expand Down Expand Up @@ -250,6 +253,7 @@ class NullCompactionPicker : public CompactionPicker {
Compaction* PickCompaction(
const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
const MutableDBOptions& /*mutable_db_options*/,
VersionStorageInfo* /*vstorage*/, LogBuffer* /* log_buffer */,
SequenceNumber /* earliest_memtable_seqno */) override {
return nullptr;
Expand All @@ -258,6 +262,7 @@ class NullCompactionPicker : public CompactionPicker {
// Always return "nullptr"
Compaction* CompactRange(const std::string& /*cf_name*/,
const MutableCFOptions& /*mutable_cf_options*/,
const MutableDBOptions& /*mutable_db_options*/,
VersionStorageInfo* /*vstorage*/,
int /*input_level*/, int /*output_level*/,
const CompactRangeOptions& /*compact_range_options*/,
Expand Down
37 changes: 22 additions & 15 deletions db/compaction/compaction_picker_fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ bool FIFOCompactionPicker::NeedsCompaction(

Compaction* FIFOCompactionPicker::PickTTLCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer) {
assert(mutable_cf_options.ttl > 0);

const int kLevel0 = 0;
Expand Down Expand Up @@ -108,8 +109,9 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
}

Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, mutable_cf_options.compression_opts,
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
std::move(inputs), 0, 0, 0, 0, kNoCompression,
mutable_cf_options.compression_opts,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
Expand All @@ -118,7 +120,8 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(

Compaction* FIFOCompactionPicker::PickSizeCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer) {
const int kLevel0 = 0;
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
uint64_t total_size = GetTotalFilesSize(level_files);
Expand Down Expand Up @@ -147,8 +150,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
max_compact_bytes_per_del_file,
mutable_cf_options.max_compaction_bytes, &comp_inputs)) {
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0,
16 * 1024 * 1024 /* output file size limit */,
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
{comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */,
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compression,
mutable_cf_options.compression_opts, 0 /* max_subcompactions */, {},
Expand Down Expand Up @@ -198,8 +201,9 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
}

Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, mutable_cf_options.compression_opts,
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
std::move(inputs), 0, 0, 0, 0, kNoCompression,
mutable_cf_options.compression_opts,
/* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
Expand All @@ -208,24 +212,27 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(

Compaction* FIFOCompactionPicker::PickCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer,
SequenceNumber /*earliest_memtable_seqno*/) {
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
LogBuffer* log_buffer, SequenceNumber /*earliest_memtable_seqno*/) {
assert(vstorage->num_levels() == 1);

Compaction* c = nullptr;
if (mutable_cf_options.ttl > 0) {
c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
c = PickTTLCompaction(cf_name, mutable_cf_options, mutable_db_options,
vstorage, log_buffer);
}
if (c == nullptr) {
c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
c = PickSizeCompaction(cf_name, mutable_cf_options, mutable_db_options,
vstorage, log_buffer);
}
RegisterCompaction(c);
return c;
}

Compaction* FIFOCompactionPicker::CompactRange(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, int input_level, int output_level,
const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
int input_level, int output_level,
const CompactRangeOptions& /*compact_range_options*/,
const InternalKey* /*begin*/, const InternalKey* /*end*/,
InternalKey** compaction_end, bool* /*manual_conflict*/,
Expand All @@ -238,8 +245,8 @@ Compaction* FIFOCompactionPicker::CompactRange(
assert(output_level == 0);
*compaction_end = nullptr;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
Compaction* c =
PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
Compaction* c = PickCompaction(cf_name, mutable_cf_options,
mutable_db_options, vstorage, &log_buffer);
log_buffer.FlushBufferToLog();
return c;
}
Expand Down
Loading

0 comments on commit b0c5ecd

Please sign in to comment.