Skip to content

Commit

Permalink
Reconstruct GC stats on reopen (#130)
Browse files Browse the repository at this point in the history
Summary:
* On DB reopen, iterate all SST files' properties and sum up the live data size of each blob file to use as GC stats.
* Remove previous logic to set GC mark on blob files on reopen.
* Refactor `OnFlushCompleted` and `OnCompactionCompleted`. Check that the file state is `kPendingLSM` before transiting the file state.
* Refactor `BlobFileMeta`.

Test Plan:
Updated unit tests

Signed-off-by: Yi Wu <yiwu@pingcap.com>
  • Loading branch information
yiwu-arbug authored Feb 4, 2020
1 parent 3b267dc commit ffaa9d1
Show file tree
Hide file tree
Showing 20 changed files with 721 additions and 372 deletions.
1 change: 1 addition & 0 deletions src/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ void BlobFileBuilder::Add(const BlobRecord& record, BlobHandle* handle) {
encoder_.EncodeRecord(record);
handle->offset = file_->GetFileSize();
handle->size = encoder_.GetEncodedSize();
live_data_size_ += handle->size;

status_ = file_->Append(encoder_.GetHeader());
if (ok()) {
Expand Down
5 changes: 4 additions & 1 deletion src/blob_file_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class BlobFileBuilder {
const std::string& GetSmallestKey() { return smallest_key_; }
const std::string& GetLargestKey() { return largest_key_; }

uint64_t live_data_size() const { return live_data_size_; }

private:
bool ok() const { return status().ok(); }

Expand All @@ -69,9 +71,10 @@ class BlobFileBuilder {
Status status_;
BlobEncoder encoder_;

uint64_t num_entries_{0};
uint64_t num_entries_ = 0;
std::string smallest_key_;
std::string largest_key_;
uint64_t live_data_size_ = 0;
};

} // namespace titandb
Expand Down
6 changes: 0 additions & 6 deletions src/blob_file_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,6 @@ Status BlobFileSet::Recover() {
"Next blob file number is %" PRIu64 ".", next_file_number);
}

// Make sure perform gc on all files at the beginning
MarkAllFilesForGC();
for (auto& cf : column_families_) {
cf.second->ComputeGCScore();
}

auto new_manifest_file_number = NewFileNumber();
s = OpenManifest(new_manifest_file_number);
if (!s.ok()) return s;
Expand Down
7 changes: 0 additions & 7 deletions src/blob_file_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,6 @@ class BlobFileSet {
void GetObsoleteFiles(std::vector<std::string>* obsolete_files,
SequenceNumber oldest_sequence);

// REQUIRES: mutex is held
void MarkAllFilesForGC() {
for (auto& cf : column_families_) {
cf.second->MarkAllFilesForGC();
}
}

// REQUIRES: mutex is held
bool IsColumnFamilyObsolete(uint32_t cf_id) {
return obsolete_columns_.count(cf_id) > 0;
Expand Down
11 changes: 0 additions & 11 deletions src/blob_format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,6 @@ void BlobFileMeta::FileStateTransit(const FileEvent& event) {
}
}

// Accumulates `_discardable_size` bytes of dead (no longer referenced) blob
// data onto this file's running discardable total, used to decide when the
// file is worth garbage collecting.
void BlobFileMeta::AddDiscardableSize(uint64_t _discardable_size) {
  // A single record must be strictly smaller than the whole file (the file
  // also contains header/footer metadata).
  assert(_discardable_size < file_size_);
  discardable_size_ += _discardable_size;
  // The accumulated discardable bytes must likewise stay strictly below the
  // file size; equality would mean metadata was counted as discardable.
  assert(discardable_size_ < file_size_);
}

TitanInternalStats::StatsType BlobFileMeta::GetDiscardableRatioLevel() const {
auto ratio = GetDiscardableRatio();
TitanInternalStats::StatsType type;
Expand All @@ -260,11 +254,6 @@ TitanInternalStats::StatsType BlobFileMeta::GetDiscardableRatioLevel() const {
return type;
}

// Returns the fraction of this file's bytes recorded as discardable, in
// [0, 1]. NOTE(review): file_size_ includes the blob file header/footer, so
// this slightly underestimates the discardable fraction of the record data.
double BlobFileMeta::GetDiscardableRatio() const {
  // Guard against division by zero for a meta whose size was never set
  // (e.g. default-constructed during recovery).
  if (file_size_ == 0) {
    return 0.0;
  }
  return static_cast<double>(discardable_size_) /
         static_cast<double>(file_size_);
}

void BlobFileHeader::EncodeTo(std::string* dst) const {
PutFixed32(dst, kHeaderMagicNumber);
PutFixed32(dst, version);
Expand Down
35 changes: 19 additions & 16 deletions src/blob_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ struct BlobIndex {
//
class BlobFileMeta {
public:
enum class FileEvent {
enum class FileEvent : int {
kInit,
kFlushCompleted,
kCompactionCompleted,
Expand All @@ -185,7 +185,7 @@ class BlobFileMeta {
kReset, // reset file to normal for test
};

enum class FileState {
enum class FileState : int {
kInit, // file never at this state
kNormal,
kPendingLSM, // waiting keys adding to LSM
Expand Down Expand Up @@ -216,32 +216,40 @@ class BlobFileMeta {

uint64_t file_number() const { return file_number_; }
uint64_t file_size() const { return file_size_; }
uint64_t live_data_size() const { return live_data_size_; }
void set_live_data_size(uint64_t size) { live_data_size_ = size; }
uint64_t file_entries() const { return file_entries_; }
uint32_t file_level() const { return file_level_; }
const std::string& smallest_key() const { return smallest_key_; }
const std::string& largest_key() const { return largest_key_; }

FileState file_state() const { return state_; }
bool is_obsolete() const { return state_ == FileState::kObsolete; }
uint64_t discardable_size() const { return discardable_size_; }

bool gc_mark() const { return gc_mark_; }
void set_gc_mark(bool mark) { gc_mark_ = mark; }

void FileStateTransit(const FileEvent& event);

void AddDiscardableSize(uint64_t _discardable_size);
double GetDiscardableRatio() const;
bool NoLiveData() {
return discardable_size_ ==
file_size_ - kBlobMaxHeaderSize - kBlobFooterSize;
// Applies a (possibly negative) `delta` to the live data size. If the
// adjustment would drive the counter below zero, the counter is clamped to
// zero and false is returned to signal the accounting underflow; otherwise
// the new value is stored and true is returned.
bool UpdateLiveDataSize(int64_t delta) {
  const int64_t updated = delta + static_cast<int64_t>(live_data_size_);
  if (updated >= 0) {
    live_data_size_ = static_cast<uint64_t>(updated);
    return true;
  }
  // Underflow: more bytes were marked dead than were accounted as live.
  live_data_size_ = 0;
  return false;
}
// True when no live data remains in this blob file, i.e. every record it
// holds is dead and the file is a pure space-reclamation candidate.
bool NoLiveData() { return live_data_size_ == 0; }
// Returns the fraction of this file's bytes that are no longer live, in
// [0, 1]; higher values make the file a better GC candidate.
double GetDiscardableRatio() const {
  // TODO: Exclude metadata size from file size.
  // Guard against division by zero for a meta whose size was never set.
  if (file_size_ == 0) {
    return 0.0;
  }
  return 1 - (static_cast<double>(live_data_size_) / file_size_);
}
TitanInternalStats::StatsType GetDiscardableRatioLevel() const;

private:
// Persistent field
uint64_t file_number_{0};
uint64_t file_size_{0};
// Size of data with reference from SST files.
uint64_t live_data_size_{0};
uint64_t file_entries_;
// Target level of compaction/flush which generates this blob file
uint32_t file_level_;
Expand All @@ -252,11 +260,6 @@ class BlobFileMeta {

// Not persistent field
FileState state_{FileState::kInit};

uint64_t discardable_size_{0};
// gc_mark is set to true when this file is recovered from re-opening the DB
// that means this file needs to be checked for GC
bool gc_mark_{false};
};

// Format of blob file header for version 1 (8 bytes):
Expand Down
1 change: 0 additions & 1 deletion src/blob_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ void BlobGC::MarkFilesBeingGC() {

void BlobGC::ReleaseGcFiles() {
for (auto& f : inputs_) {
f->set_gc_mark(false);
f->FileStateTransit(BlobFileMeta::FileEvent::kGCCompleted);
}

Expand Down
7 changes: 0 additions & 7 deletions src/blob_gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ class BlobGC {

const std::vector<BlobFileMeta*>& inputs() { return inputs_; }

// Takes ownership of the subset of input files that passed GC sampling.
void set_sampled_inputs(std::vector<BlobFileMeta*>&& files) {
  sampled_inputs_ = std::move(files);
}

// The files actually chosen for collection after sampling; a subset of inputs().
const std::vector<BlobFileMeta*>& sampled_inputs() { return sampled_inputs_; }

const TitanCFOptions& titan_cf_options() { return titan_cf_options_; }

void SetColumnFamily(ColumnFamilyHandle* cfh);
Expand All @@ -47,7 +41,6 @@ class BlobGC {

private:
std::vector<BlobFileMeta*> inputs_;
std::vector<BlobFileMeta*> sampled_inputs_;
std::vector<BlobFileMeta*> outputs_;
TitanCFOptions titan_cf_options_;
ColumnFamilyHandle* cfh_{nullptr};
Expand Down
125 changes: 5 additions & 120 deletions src/blob_gc_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,135 +119,19 @@ Status BlobGCJob::Prepare() {
}

Status BlobGCJob::Run() {
Status s = SampleCandidateFiles();
if (!s.ok()) {
return s;
}

std::string tmp;
for (const auto& f : blob_gc_->inputs()) {
if (!tmp.empty()) {
tmp.append(" ");
}
tmp.append(std::to_string(f->file_number()));
}

std::string tmp2;
for (const auto& f : blob_gc_->sampled_inputs()) {
if (!tmp2.empty()) {
tmp2.append(" ");
}
tmp2.append(std::to_string(f->file_number()));
}

ROCKS_LOG_BUFFER(log_buffer_, "[%s] Titan GC candidates[%s] selected[%s]",
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Titan GC candidates[%s]",
blob_gc_->column_family_handle()->GetName().c_str(),
tmp.c_str(), tmp2.c_str());

if (blob_gc_->sampled_inputs().empty()) {
return Status::OK();
}

tmp.c_str());
return DoRunGC();
}

// Runs the sampling pass over every GC input file and records on the BlobGC
// the subset that qualifies for collection. Elapsed time is charged to the
// gc_sampling_micros metric. Returns the first non-OK status from DoSample.
Status BlobGCJob::SampleCandidateFiles() {
  TitanStopWatch sw(env_, metrics_.gc_sampling_micros);
  std::vector<BlobFileMeta*> picked;
  for (const auto& candidate : blob_gc_->inputs()) {
    bool keep = false;
    Status s = DoSample(candidate, &keep);
    if (!s.ok()) {
      return s;
    }
    if (keep) {
      picked.push_back(candidate);
    }
  }
  // Leave sampled_inputs untouched when nothing qualified.
  if (!picked.empty()) {
    blob_gc_->set_sampled_inputs(std::move(picked));
  }
  return Status::OK();
}

// Decides whether `file` should be collected in this GC round.
// Fast paths: a file is selected outright if it is small enough to be worth
// merging, or if its recorded discardable ratio already exceeds the
// configured threshold. Otherwise a random window of the file is read and
// each entry is checked against the LSM; the file is selected when the
// discardable bytes found in the window exceed the threshold fraction.
// On success `*selected` holds the verdict; any I/O error is returned as-is.
Status BlobGCJob::DoSample(const BlobFileMeta* file, bool* selected) {
  assert(selected != nullptr);
  // Cheap checks first: tiny files and files already known to be mostly
  // garbage skip the sampling I/O entirely.
  if (file->file_size() <=
      blob_gc_->titan_cf_options().merge_small_file_threshold) {
    metrics_.gc_small_file += 1;
    *selected = true;
  } else if (file->GetDiscardableRatio() >=
             blob_gc_->titan_cf_options().blob_file_discardable_ratio) {
    metrics_.gc_discardable += 1;
    *selected = true;
  }
  if (*selected) return Status::OK();

  // TODO: add do sample count metrics
  // `records_size` won't be accurate if the file is version 1, but this method
  // is planned to be removed soon.
  auto records_size = file->file_size() - BlobFileHeader::kMaxEncodedLength -
                      BlobFileFooter::kEncodedLength;
  Status s;
  // Sample a window of sample_file_size_ratio * records_size bytes starting
  // at a pseudo-random offset within the record area. The RNG is seeded from
  // records_size, so the same file always yields the same window.
  uint64_t sample_size_window = static_cast<uint64_t>(
      records_size * blob_gc_->titan_cf_options().sample_file_size_ratio);
  uint64_t sample_begin_offset = BlobFileHeader::kMaxEncodedLength;
  if (records_size != sample_size_window) {
    Random64 random64(records_size);
    sample_begin_offset += random64.Uniform(records_size - sample_size_window);
  }
  std::unique_ptr<RandomAccessFileReader> file_reader;
  // 256 KiB readahead: the sample is a sequential scan over the window.
  const int readahead = 256 << 10;
  s = NewBlobFileReader(file->file_number(), readahead, db_options_,
                        env_options_, env_, &file_reader);
  if (!s.ok()) {
    return s;
  }
  BlobFileIterator iter(std::move(file_reader), file->file_number(),
                        file->file_size(), blob_gc_->titan_cf_options());
  iter.IterateForPrev(sample_begin_offset);
  // TODO(@DorianZheng) sample_begin_offset maybe out of data block size, need
  // more elegant solution
  if (iter.status().IsInvalidArgument()) {
    // Retry from the first record when the random offset landed outside a
    // valid block boundary.
    iter.IterateForPrev(BlobFileHeader::kMaxEncodedLength);
  }
  if (!iter.status().ok()) {
    s = iter.status();
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "IterateForPrev failed, file number[%" PRIu64
                    "] size[%" PRIu64 "] status[%s]",
                    file->file_number(), file->file_size(),
                    s.ToString().c_str());
    return s;
  }

  // Walk the window, tallying how many of the visited bytes belong to
  // entries the LSM no longer references.
  uint64_t iterated_size{0};
  uint64_t discardable_size{0};
  for (iter.Next();
       iterated_size < sample_size_window && iter.status().ok() && iter.Valid();
       iter.Next()) {
    BlobIndex blob_index = iter.GetBlobIndex();
    uint64_t total_length = blob_index.blob_handle.size;
    iterated_size += total_length;
    bool discardable = false;
    s = DiscardEntry(iter.key(), blob_index, &discardable);
    if (!s.ok()) {
      return s;
    }
    if (discardable) {
      discardable_size += total_length;
    }
  }
  metrics_.bytes_read += iterated_size;
  assert(iter.status().ok());

  // Select the file when the sampled discardable bytes reach the configured
  // fraction of the window (rounded up).
  *selected =
      discardable_size >=
      std::ceil(sample_size_window *
                blob_gc_->titan_cf_options().blob_file_discardable_ratio);
  return s;
}

Status BlobGCJob::DoRunGC() {
Status s;

Expand Down Expand Up @@ -382,7 +266,7 @@ Status BlobGCJob::DoRunGC() {
Status BlobGCJob::BuildIterator(
std::unique_ptr<BlobFileMergeIterator>* result) {
Status s;
const auto& inputs = blob_gc_->sampled_inputs();
const auto& inputs = blob_gc_->inputs();
assert(!inputs.empty());
std::vector<std::unique_ptr<BlobFileIterator>> list;
for (std::size_t i = 0; i < inputs.size(); ++i) {
Expand Down Expand Up @@ -493,6 +377,7 @@ Status BlobGCJob::InstallOutputBlobFiles() {
builder.first->GetNumber(), builder.first->GetFile()->GetFileSize(),
0, 0, builder.second->GetSmallestKey(),
builder.second->GetLargestKey());
file->set_live_data_size(builder.second->live_data_size());
file->FileStateTransit(BlobFileMeta::FileEvent::kGCOutput);
RecordInHistogram(stats_, TitanStats::GC_OUTPUT_FILE_SIZE,
file->file_size());
Expand Down Expand Up @@ -593,7 +478,7 @@ Status BlobGCJob::DeleteInputBlobFiles() {
Status s;
VersionEdit edit;
edit.SetColumnFamilyID(blob_gc_->column_family_handle()->GetID());
for (const auto& file : blob_gc_->sampled_inputs()) {
for (const auto& file : blob_gc_->inputs()) {
ROCKS_LOG_INFO(db_options_.info_log,
"Titan add obsolete file [%" PRIu64 "] range [%s, %s]",
file->file_number(),
Expand Down
2 changes: 0 additions & 2 deletions src/blob_gc_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ class BlobGCJob {
uint64_t io_bytes_read_ = 0;
uint64_t io_bytes_written_ = 0;

Status SampleCandidateFiles();
Status DoSample(const BlobFileMeta* file, bool* selected);
Status DoRunGC();
Status BuildIterator(std::unique_ptr<BlobFileMergeIterator>* result);
Status DiscardEntry(const Slice& key, const BlobIndex& blob_index,
Expand Down
Loading

0 comments on commit ffaa9d1

Please sign in to comment.