Skip to content

Commit

Permalink
Buffer info logs when picking compactions and write them out after re…
Browse files Browse the repository at this point in the history
…leasing the mutex

Summary: Now while the background thread is picking compactions, it writes out multiple info_logs, especially for universal compaction, which introduces a chance of waiting log writing in mutex, which is bad. To remove this risk, write all those info logs to a buffer and flush it after releasing the mutex.

Test Plan:
make all check
check the log lines while running some tests that trigger compactions.

Reviewers: haobo, igor, dhruba

Reviewed By: dhruba

CC: i.am.jin.lei, dhruba, yhchiang, leveldb, nkg-

Differential Revision: https://reviews.facebook.net/D16515
  • Loading branch information
siying committed Mar 5, 2014
1 parent 4405f3a commit ecb1ffa
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 120 deletions.
106 changes: 50 additions & 56 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
return c;
}

Compaction* LevelCompactionPicker::PickCompaction(Version* version) {
Compaction* LevelCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
Compaction* c = nullptr;
int level = -1;

Expand Down Expand Up @@ -544,40 +545,43 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
int level = 0;
double score = version->compaction_score_[0];

if ((version->files_[level].size() <
(unsigned int)options_->level0_file_num_compaction_trigger)) {
Log(options_->info_log, "Universal: nothing to do\n");
LogToBuffer(log_buffer, "Universal: nothing to do\n");
return nullptr;
}
Version::FileSummaryStorage tmp;
Log(options_->info_log, "Universal: candidate files(%zu): %s\n",
version->files_[level].size(),
version->LevelFileSummary(&tmp, 0));
LogToBuffer(log_buffer, "Universal: candidate files(%zu): %s\n",
version->files_[level].size(),
version->LevelFileSummary(&tmp, 0));

// Check for size amplification first.
Compaction* c;
if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) {
Log(options_->info_log, "Universal: compacting for size amp\n");
if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) !=
nullptr) {
LogToBuffer(log_buffer, "Universal: compacting for size amp\n");
} else {

// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio = options_->compaction_options_universal.size_ratio;

if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) {
Log(options_->info_log, "Universal: compacting for size ratio\n");
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX,
log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "Universal: compacting for size ratio\n");
} else {
// Size amplification and file size ratios are within configured limits.
// If max read amplification is exceeding configured limits, then force
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
unsigned int num_files = version->files_[level].size() -
options_->level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) {
if ((c = PickCompactionUniversalReadAmp(
version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
Log(options_->info_log, "Universal: compacting for file num\n");
}
}
Expand Down Expand Up @@ -631,7 +635,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) {
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
Version* version, double score, unsigned int ratio,
unsigned int max_number_of_files_to_compact) {
unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) {
int level = 0;

unsigned int min_merge_width =
Expand Down Expand Up @@ -666,18 +670,18 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
candidate_count = 1;
break;
}
Log(options_->info_log,
"Universal: file %lu[%d] being compacted, skipping",
(unsigned long)f->number, loop);
LogToBuffer(log_buffer,
"Universal: file %lu[%d] being compacted, skipping",
(unsigned long)f->number, loop);
f = nullptr;
}

// This file is not being compacted. Consider it as the
// first candidate to be compacted.
uint64_t candidate_size = f != nullptr? f->file_size : 0;
if (f != nullptr) {
Log(options_->info_log, "Universal: Possible candidate file %lu[%d].",
(unsigned long)f->number, loop);
LogToBuffer(log_buffer, "Universal: Possible candidate file %lu[%d].",
(unsigned long)f->number, loop);
}

// Check if the suceeding files need compaction.
Expand Down Expand Up @@ -727,12 +731,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
i < loop + candidate_count && i < file_by_time.size(); i++) {
int index = file_by_time[i];
FileMetaData* f = version->files_[level][index];
Log(options_->info_log,
"Universal: Skipping file %lu[%d] with size %lu %d\n",
(unsigned long)f->number,
i,
(unsigned long)f->file_size,
f->being_compacted);
LogToBuffer(log_buffer,
"Universal: Skipping file %lu[%d] with size %lu %d\n",
(unsigned long)f->number, i, (unsigned long)f->file_size,
f->being_compacted);
}
}
}
Expand Down Expand Up @@ -766,10 +768,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
int index = file_by_time[i];
FileMetaData* f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n",
(unsigned long)f->number,
i,
(unsigned long)f->file_size);
LogToBuffer(log_buffer, "Universal: Picking file %lu[%d] with size %lu\n",
(unsigned long)f->number, i, (unsigned long)f->file_size);
}
return c;
}
Expand All @@ -781,7 +781,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// min_merge_width and max_merge_width).
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
Version* version, double score) {
Version* version, double score, LogBuffer* log_buffer) {
int level = 0;

// percentage flexibilty while reducing size amplification
Expand All @@ -805,31 +805,27 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
start_index = loop; // Consider this as the first candidate.
break;
}
Log(options_->info_log, "Universal: skipping file %lu[%d] compacted %s",
(unsigned long)f->number,
loop,
" cannot be a candidate to reduce size amp.\n");
LogToBuffer(log_buffer, "Universal: skipping file %lu[%d] compacted %s",
(unsigned long)f->number, loop,
" cannot be a candidate to reduce size amp.\n");
f = nullptr;
}
if (f == nullptr) {
return nullptr; // no candidate files
}

Log(options_->info_log, "Universal: First candidate file %lu[%d] %s",
(unsigned long)f->number,
start_index,
" to reduce size amp.\n");
LogToBuffer(log_buffer, "Universal: First candidate file %lu[%d] %s",
(unsigned long)f->number, start_index, " to reduce size amp.\n");

// keep adding up all the remaining files
for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
loop++) {
int index = file_by_time[loop];
f = version->files_[level][index];
if (f->being_compacted) {
Log(options_->info_log,
"Universal: Possible candidate file %lu[%d] %s.",
(unsigned long)f->number,
loop,
LogToBuffer(
log_buffer, "Universal: Possible candidate file %lu[%d] %s.",
(unsigned long)f->number, loop,
" is already being compacted. No size amp reduction possible.\n");
return nullptr;
}
Expand All @@ -846,18 +842,18 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(

// size amplification = percentage of additional size
if (candidate_size * 100 < ratio * earliest_file_size) {
Log(options_->info_log,
"Universal: size amp not needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
(unsigned long)earliest_file_size);
LogToBuffer(log_buffer,
"Universal: size amp not needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
(unsigned long)earliest_file_size);
return nullptr;
} else {
Log(options_->info_log,
"Universal: size amp needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
(unsigned long)earliest_file_size);
LogToBuffer(log_buffer,
"Universal: size amp needed. newer-files-total-size %lu "
"earliest-file-size %lu",
(unsigned long)candidate_size,
(unsigned long)earliest_file_size);
}
assert(start_index >= 0 && start_index < file_by_time.size() - 1);

Expand All @@ -871,11 +867,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
int index = file_by_time[loop];
f = c->input_version_->files_[level][index];
c->inputs_[0].push_back(f);
Log(options_->info_log,
"Universal: size amp picking file %lu[%d] with size %lu",
(unsigned long)f->number,
index,
(unsigned long)f->file_size);
LogToBuffer(log_buffer,
"Universal: size amp picking file %lu[%d] with size %lu",
(unsigned long)f->number, index, (unsigned long)f->file_size);
}
return c;
}
Expand Down
16 changes: 11 additions & 5 deletions db/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

namespace rocksdb {

class LogBuffer;
class Compaction;
class Version;

Expand All @@ -31,7 +32,8 @@ class CompactionPicker {
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(Version* version) = 0;
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) = 0;

// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
Expand Down Expand Up @@ -127,24 +129,28 @@ class UniversalCompactionPicker : public CompactionPicker {
UniversalCompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version) override;
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;

private:
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
unsigned int ratio,
unsigned int num_files);
unsigned int num_files,
LogBuffer* log_buffer);

// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score);
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score,
LogBuffer* log_buffer);
};

class LevelCompactionPicker : public CompactionPicker {
public:
LevelCompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version) override;
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;

private:
// For the specfied level, pick a compaction.
Expand Down
Loading

0 comments on commit ecb1ffa

Please sign in to comment.