Introduce options.check_flush_compaction_key_order #7467

Closed · wants to merge 8 commits
Changes from all commits
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -620,6 +620,7 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/output_validator.cc
db/range_del_aggregator.cc
db/range_tombstone_fragmenter.cc
db/repair.cc
1 change: 1 addition & 0 deletions HISTORY.md
@@ -14,6 +14,7 @@

### New Features
* Methods to configure, serialize, and compare objects -- such as TableFactory -- are exposed directly through the Configurable base class (from which these objects inherit). This change will allow for better and more thorough configuration management and retrieval in the future. The options for a Configurable object can be set via the ConfigureFromMap, ConfigureFromString, or ConfigureOption method. The serialized version of the options of an object can be retrieved via the GetOptionString, ToString, or GetOption methods. The list of options supported by an object can be obtained via the GetOptionNames method. The "raw" object (such as the BlockBasedTableOption) for an option may be retrieved via the GetOptions method. Configurable options can be compared via the AreEquivalent method. The settings within a Configurable object may be validated via the ValidateOptions method. The object may be initialized (at which point only mutable options may be updated) via the PrepareOptions method.
* Introduce options.check_flush_compaction_key_order with default value true. With this option enabled, key order is checked while writing each SST file during flush and compaction. If the order is violated, the flush or compaction will fail.

## 6.13 (09/12/2020)
### Bug fixes
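The two changelog entries above describe, respectively, the Configurable API and the new key-order check. First, a brief round trip through the Configurable methods the first entry names, using a TableFactory as that entry does. This is a sketch against the 6.14-era API; the exact headers and signatures are assumptions, not taken from this diff:

```cpp
#include <cassert>
#include <memory>
#include <string>

#include "rocksdb/convenience.h"  // ConfigOptions
#include "rocksdb/table.h"

int main() {
  rocksdb::ConfigOptions config_options;
  std::shared_ptr<rocksdb::TableFactory> factory(
      rocksdb::NewBlockBasedTableFactory());

  // Set options on the Configurable object from a serialized string...
  rocksdb::Status s =
      factory->ConfigureFromString(config_options, "block_size=8192");
  assert(s.ok());

  // ...and retrieve the serialized form back, as the entry describes.
  std::string serialized;
  s = factory->GetOptionString(config_options, &serialized);
  assert(s.ok());
  return 0;
}
```

Second, a minimal sketch of the new option itself. The option name comes from this PR; because the compaction code below reads the flag from MutableCFOptions, it should also be adjustable at runtime via SetOptions(), though that is an inference from the diff rather than something the changelog states:

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // New in this PR; defaults to true, shown explicitly for clarity.
  options.check_flush_compaction_key_order = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/key_order_demo", &db);
  assert(s.ok());

  // The flag lives in mutable CF options, so it should be settable per
  // column family without reopening the DB.
  s = db->SetOptions({{"check_flush_compaction_key_order", "false"}});
  assert(s.ok());

  delete db;
  return 0;
}
```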
1 change: 1 addition & 0 deletions TARGETS
@@ -184,6 +184,7 @@ cpp_library(
"db/memtable_list.cc",
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/output_validator.cc",
"db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc",
"db/repair.cc",
56 changes: 31 additions & 25 deletions db/builder.cc
@@ -19,6 +19,7 @@
#include "db/event_helpers.h"
#include "db/internal_stats.h"
#include "db/merge_helper.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
@@ -96,7 +97,11 @@ Status BuildTable(
column_family_name.empty());
// Report IOStats for flush once per kReportFlushIOStatsEvery bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
uint64_t paranoid_hash = 0;
OutputValidator output_validator(
internal_comparator,
/*enable_order_check=*/
mutable_cf_options.check_flush_compaction_key_order,
/*enable_hash=*/paranoid_file_checks);
Status s;
meta->fd.file_size = 0;
iter->SeekToFirst();
@@ -187,10 +192,10 @@
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
const ParsedInternalKey& ikey = c_iter.ikey();
if (paranoid_file_checks) {
// Generate a rolling 64-bit hash of the key and values
paranoid_hash = Hash64(key.data(), key.size(), paranoid_hash);
paranoid_hash = Hash64(value.data(), value.size(), paranoid_hash);
// Generate a rolling 64-bit hash of the key and values
s = output_validator.Add(key, value);
if (!s.ok()) {
break;
}
builder->Add(key, value);
meta->UpdateBoundaries(key, value, ikey.sequence, ikey.type);
@@ -202,23 +207,24 @@
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
}
}
if (s.ok()) {
auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
auto kv = tombstone.Serialize();
builder->Add(kv.first.Encode(), kv.second);
meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
tombstone.seq_, internal_comparator);
}

auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
auto kv = tombstone.Serialize();
builder->Add(kv.first.Encode(), kv.second);
meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
tombstone.seq_, internal_comparator);
}

// Finish and check for builder errors
s = c_iter.status();
// Finish and check for builder errors
s = c_iter.status();

if (blob_file_builder) {
if (s.ok()) {
s = blob_file_builder->Finish();
if (blob_file_builder) {
if (s.ok()) {
s = blob_file_builder->Finish();
}
}
}

@@ -291,15 +297,15 @@ Status BuildTable(
/*allow_unprepared_value*/ false));
s = it->status();
if (s.ok() && paranoid_file_checks) {
uint64_t check_hash = 0;
OutputValidator file_validator(internal_comparator,
/*enable_order_check=*/true,
/*enable_hash=*/true);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
// Generate a rolling 64-bit hash of the key and values
check_hash = Hash64(it->key().data(), it->key().size(), check_hash);
check_hash =
Hash64(it->value().data(), it->value().size(), check_hash);
file_validator.Add(it->key(), it->value()).PermitUncheckedError();
}
s = it->status();
if (s.ok() && check_hash != paranoid_hash) {
if (s.ok() && !output_validator.CompareValidator(file_validator)) {
s = Status::Corruption("Paranoid checksums do not match");
}
}
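db/output_validator.{h,cc} is added by this PR but its contents are not shown on this page. The call sites above pin down its shape: a constructor taking the internal key comparator plus enable_order_check and enable_hash flags, a Status-returning Add(key, value) fed once per emitted key, and CompareValidator() for matching two validators. A hypothetical sketch consistent with those call sites follows; the real header may differ in details:

```cpp
// Hypothetical reconstruction of OutputValidator, inferred from the call
// sites in db/builder.cc above; not the actual db/output_validator.h.
#include <string>

#include "db/dbformat.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/hash.h"

namespace ROCKSDB_NAMESPACE {

class OutputValidator {
 public:
  OutputValidator(const InternalKeyComparator& icmp, bool enable_order_check,
                  bool enable_hash)
      : icmp_(icmp),
        enable_order_check_(enable_order_check),
        enable_hash_(enable_hash) {}

  // Called once per key/value in the order they are written to the SST
  // file. Fails if order checking is on and the key does not sort strictly
  // after the previous key.
  Status Add(const Slice& key, const Slice& value) {
    if (enable_hash_) {
      // The same rolling 64-bit hash the old inline paranoid_hash code kept.
      paranoid_hash_ = Hash64(key.data(), key.size(), paranoid_hash_);
      paranoid_hash_ = Hash64(value.data(), value.size(), paranoid_hash_);
    }
    if (enable_order_check_ && !prev_key_.empty() &&
        icmp_.Compare(key, Slice(prev_key_)) <= 0) {
      return Status::Corruption("Key order violation in SST file output");
    }
    prev_key_.assign(key.data(), key.size());
    return Status::OK();
  }

  // Two validators agree if they accumulated the same rolling hash.
  bool CompareValidator(const OutputValidator& other) const {
    return paranoid_hash_ == other.paranoid_hash_;
  }

 private:
  const InternalKeyComparator& icmp_;
  bool enable_order_check_;
  bool enable_hash_;
  std::string prev_key_;
  uint64_t paranoid_hash_ = 0;
};

}  // namespace ROCKSDB_NAMESPACE
```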
65 changes: 41 additions & 24 deletions db/compaction/compaction_job.cc
@@ -32,6 +32,7 @@
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
#include "db/version_set.h"
#include "file/filename.h"
@@ -124,9 +125,14 @@ struct CompactionJob::SubcompactionState {

// Files produced by this subcompaction
struct Output {
Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
bool _enable_order_check, bool _enable_hash)
: meta(std::move(_meta)),
validator(_icmp, _enable_order_check, _enable_hash),
finished(false) {}
FileMetaData meta;
OutputValidator validator;
bool finished;
uint64_t paranoid_hash;
std::shared_ptr<const TableProperties> table_properties;
};

@@ -170,17 +176,16 @@

// Adds the key and value to the builder
// If paranoid is true, adds the key-value to the paranoid hash
void AddToBuilder(const Slice& key, const Slice& value, bool paranoid) {
Status AddToBuilder(const Slice& key, const Slice& value) {
auto curr = current_output();
assert(builder != nullptr);
assert(curr != nullptr);
if (paranoid) {
// Generate a rolling 64-bit hash of the key and values
curr->paranoid_hash = Hash64(key.data(), key.size(), curr->paranoid_hash);
curr->paranoid_hash =
Hash64(value.data(), value.size(), curr->paranoid_hash);
Status s = curr->validator.Add(key, value);
if (!s.ok()) {
return s;
}
builder->Add(key, value);
return Status::OK();
}

// Returns true iff we should stop building the current output
@@ -662,14 +667,20 @@ Status CompactionJob::Run() {
auto s = iter->status();

if (s.ok() && paranoid_file_checks_) {
uint64_t hash = 0;
OutputValidator validator(cfd->internal_comparator(),
/*_enable_order_check=*/true,
/*_enable_hash=*/true);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
// Generate a rolling 64-bit hash of the key and values
hash = Hash64(iter->key().data(), iter->key().size(), hash);
hash = Hash64(iter->value().data(), iter->value().size(), hash);
s = validator.Add(iter->key(), iter->value());
if (!s.ok()) {
break;
}
}
s = iter->status();
if (s.ok() && hash != files_output[file_idx]->paranoid_hash) {
if (s.ok()) {
s = iter->status();
}
if (s.ok() &&
!validator.CompareValidator(files_output[file_idx]->validator)) {
s = Status::Corruption("Paranoid checksums do not match");
}
}
@@ -961,7 +972,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
break;
}
}
sub_compact->AddToBuilder(key, value, paranoid_file_checks_);
status = sub_compact->AddToBuilder(key, value);
if (!status.ok()) {
break;
}

sub_compact->current_output_file_size =
sub_compact->builder->EstimatedFileSize();
@@ -1276,8 +1290,8 @@ Status CompactionJob::FinishCompactionOutputFile(
auto kv = tombstone.Serialize();
assert(lower_bound == nullptr ||
ucmp->Compare(*lower_bound, kv.second) < 0);
sub_compact->AddToBuilder(kv.first.Encode(), kv.second,
paranoid_file_checks_);
// Range tombstone is not supported by output validator yet.
sub_compact->builder->Add(kv.first.Encode(), kv.second);
Review comment (Contributor):
This fixes the problem with paranoid_file_checks + DeleteRange(), right? @mrambacher would it solve the problem to extract this into a separate PR that we can easily backport to 6.12 and 6.13?

InternalKey smallest_candidate = std::move(kv.first);
if (lower_bound != nullptr &&
ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
@@ -1594,14 +1608,17 @@ Status CompactionJob::OpenCompactionOutputFile(

// Initialize a SubcompactionState::Output and add it to sub_compact->outputs
{
SubcompactionState::Output out;
out.meta.fd = FileDescriptor(file_number,
sub_compact->compaction->output_path_id(), 0);
out.meta.oldest_ancester_time = oldest_ancester_time;
out.meta.file_creation_time = current_time;
out.finished = false;
out.paranoid_hash = 0;
sub_compact->outputs.push_back(out);
FileMetaData meta;
meta.fd = FileDescriptor(file_number,
sub_compact->compaction->output_path_id(), 0);
meta.oldest_ancester_time = oldest_ancester_time;
meta.file_creation_time = current_time;
sub_compact->outputs.emplace_back(
std::move(meta), cfd->internal_comparator(),
/*enable_order_check=*/
sub_compact->compaction->mutable_cf_options()
->check_flush_compaction_key_order,
/*enable_hash=*/paranoid_file_checks_);
}

writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
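The compaction changes mirror the flush path: one validator is fed while each output file is written (failing the job immediately on an out-of-order key), and when paranoid_file_checks is on, a second validator is fed from a re-read of the finished file and compared against the first. A self-contained toy illustration of that two-pass pattern, independent of RocksDB (RollHash and Validator here are simplified stand-ins; the real code hashes with Hash64 and compares keys with the internal key comparator):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Toy rolling 64-bit hash (seeded FNV-1a); stands in for RocksDB's Hash64().
uint64_t RollHash(const std::string& data, uint64_t seed) {
  uint64_t h = seed ^ 1469598103934665603ULL;
  for (unsigned char c : data) h = (h ^ c) * 1099511628211ULL;
  return h;
}

struct Validator {
  bool check_order;
  std::string prev_key;
  uint64_t hash = 0;
  bool ok = true;

  void Add(const std::string& key, const std::string& value) {
    // The real code compares with the internal key comparator, not operator<=.
    if (check_order && !prev_key.empty() && key <= prev_key) ok = false;
    prev_key = key;
    hash = RollHash(key, hash);
    hash = RollHash(value, hash);
  }
};

int main() {
  std::vector<std::pair<std::string, std::string>> file = {
      {"a", "1"}, {"b", "2"}, {"c", "3"}};

  // Pass 1: validate keys while "writing" the output file.
  Validator write_side{/*check_order=*/true};
  for (const auto& kv : file) write_side.Add(kv.first, kv.second);

  // Pass 2 (paranoid_file_checks): re-read the file and compare hashes.
  Validator read_side{/*check_order=*/true};
  for (const auto& kv : file) read_side.Add(kv.first, kv.second);

  bool valid = write_side.ok && read_side.ok &&
               write_side.hash == read_side.hash;
  std::cout << (valid ? "validated" : "corruption") << "\n";
  return 0;
}
```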
22 changes: 14 additions & 8 deletions db/compaction/compaction_job_test.cc
@@ -130,7 +130,7 @@ class CompactionJobTest : public testing::Test {
return blob_index;
}

void AddMockFile(const stl_wrappers::KVMap& contents, int level = 0) {
void AddMockFile(const mock::KVVector& contents, int level = 0) {
assert(contents.size() > 0);

bool first_key = true;
@@ -205,8 +205,8 @@ class CompactionJobTest : public testing::Test {
}

// returns expected result after compaction
stl_wrappers::KVMap CreateTwoFiles(bool gen_corrupted_keys) {
auto expected_results = mock::MakeMockFile();
mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) {
stl_wrappers::KVMap expected_results;
const int kKeysPerFile = 10000;
const int kCorruptKeysPerFile = 200;
const int kMatchingKeys = kKeysPerFile / 2;
@@ -232,19 +232,25 @@
test::CorruptKeyType(&internal_key);
test::CorruptKeyType(&bottommost_internal_key);
}
contents.insert({ internal_key.Encode().ToString(), value });
contents.push_back({internal_key.Encode().ToString(), value});
if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) {
expected_results.insert(
{ bottommost_internal_key.Encode().ToString(), value });
{bottommost_internal_key.Encode().ToString(), value});
}
}
mock::SortKVVector(&contents);

AddMockFile(contents);
}

SetLastSequence(sequence_number);

return expected_results;
mock::KVVector expected_results_kvvector;
for (auto& kv : expected_results) {
expected_results_kvvector.push_back({kv.first, kv.second});
}

return expected_results_kvvector;
}

void NewDB() {
@@ -299,7 +305,7 @@ class CompactionJobTest : public testing::Test {

void RunCompaction(
const std::vector<std::vector<FileMetaData*>>& input_files,
const stl_wrappers::KVMap& expected_results,
const mock::KVVector& expected_results,
const std::vector<SequenceNumber>& snapshots = {},
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
int output_level = 1, bool verify = true,
@@ -644,7 +650,7 @@ TEST_F(CompactionJobTest, FilterAllMergeOperands) {
SetLastSequence(11U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);

stl_wrappers::KVMap empty_map;
mock::KVVector empty_map;
RunCompaction({files}, empty_map);
}
