Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block per key-value checksum #11287

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Rocksdb Change Log
## Unreleased
### New Features
* Introduced a new option `block_protection_bytes_per_key`, which can be used to enable per key-value integrity protection for in-memory blocks in block cache (#11287).

## 8.2.0 (04/24/2023)
### Public API Changes
Expand Down
3 changes: 2 additions & 1 deletion db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ Status BuildTable(
MaxFileSizeForL0MetaPin(mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key*/ nullptr,
/*allow_unprepared_value*/ false));
/*allow_unprepared_value*/ false,
mutable_cf_options.block_protection_bytes_per_key));
s = it->status();
if (s.ok() && paranoid_file_checks) {
OutputValidator file_validator(tboptions.internal_comparator,
Expand Down
6 changes: 6 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,12 @@ Status ColumnFamilyData::ValidateOptions(
"Memtable per key-value checksum protection only supports 0, 1, 2, 4 "
"or 8 bytes per key.");
}
if (std::find(supported.begin(), supported.end(),
cf_options.block_protection_bytes_per_key) == supported.end()) {
return Status::NotSupported(
"Block per key-value checksum protection only supports 0, 1, 2, 4 "
"or 8 bytes per key.");
}
return s;
}

Expand Down
8 changes: 6 additions & 2 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,9 @@ void CompactionJob::GenSubcompactionBoundaries() {
FileMetaData* f = flevel->files[i].file_metadata;
std::vector<TableReader::Anchor> my_anchors;
Status s = cfd->table_cache()->ApproximateKeyAnchors(
read_options, icomp, *f, my_anchors);
read_options, icomp, *f,
c->mutable_cf_options()->block_protection_bytes_per_key,
my_anchors);
if (!s.ok() || my_anchors.empty()) {
my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
}
Expand Down Expand Up @@ -735,7 +737,9 @@ Status CompactionJob::Run() {
*compact_->compaction->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
/*allow_unprepared_value=*/false);
/*allow_unprepared_value=*/false,
compact_->compaction->mutable_cf_options()
->block_protection_bytes_per_key);
auto s = iter->status();

if (s.ok() && paranoid_file_checks_) {
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ class CompactionJobTestBase : public testing::Test {
Status s = cf_options_.table_factory->NewTableReader(
read_opts,
TableReaderOptions(*cfd->ioptions(), nullptr, FileOptions(),
cfd_->internal_comparator()),
cfd_->internal_comparator(),
0 /* block_protection_bytes_per_key */),
std::move(freader), file_size, &table_reader, false);
ASSERT_OK(s);
assert(table_reader);
Expand Down
4 changes: 2 additions & 2 deletions db/convenience.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ Status VerifySstFileChecksum(const Options& options,
const bool kImmortal = true;
auto reader_options = TableReaderOptions(
ioptions, options.prefix_extractor, env_options, internal_comparator,
false /* skip_filters */, !kImmortal, false /* force_direct_prefetch */,
-1 /* level */);
options.block_protection_bytes_per_key, false /* skip_filters */,
!kImmortal, false /* force_direct_prefetch */, -1 /* level */);
reader_options.largest_seqno = largest_seqno;
s = ioptions.table_factory->NewTableReader(
reader_options, std::move(file_reader), file_size, &table_reader,
Expand Down
1 change: 1 addition & 0 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
sv->mutable_cf_options.block_protection_bytes_per_key,
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*block_cache_tracer*/ nullptr,
Expand Down
21 changes: 14 additions & 7 deletions db/forward_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ForwardLevelIterator : public InternalIterator {
const ColumnFamilyData* const cfd, const ReadOptions& read_options,
const std::vector<FileMetaData*>& files,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
bool allow_unprepared_value)
bool allow_unprepared_value, uint8_t block_protection_bytes_per_key)
: cfd_(cfd),
read_options_(read_options),
files_(files),
Expand All @@ -45,7 +45,8 @@ class ForwardLevelIterator : public InternalIterator {
file_iter_(nullptr),
pinned_iters_mgr_(nullptr),
prefix_extractor_(prefix_extractor),
allow_unprepared_value_(allow_unprepared_value) {
allow_unprepared_value_(allow_unprepared_value),
block_protection_bytes_per_key_(block_protection_bytes_per_key) {
status_.PermitUncheckedError(); // Allow uninitialized status through
}

Expand Down Expand Up @@ -87,7 +88,8 @@ class ForwardLevelIterator : public InternalIterator {
/*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
/*max_file_size_for_l0_meta_pin=*/0,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_);
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
block_protection_bytes_per_key_);
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
valid_ = false;
if (!range_del_agg.IsEmpty()) {
Expand Down Expand Up @@ -211,6 +213,7 @@ class ForwardLevelIterator : public InternalIterator {
// Kept alive by ForwardIterator::sv_->mutable_cf_options
const std::shared_ptr<const SliceTransform>& prefix_extractor_;
const bool allow_unprepared_value_;
const uint8_t block_protection_bytes_per_key_;
};

ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
Expand Down Expand Up @@ -738,7 +741,8 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_));
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
sv_->mutable_cf_options.block_protection_bytes_per_key));
}
BuildLevelIterators(vstorage, sv_);
current_ = nullptr;
Expand Down Expand Up @@ -819,7 +823,8 @@ void ForwardIterator::RenewIterators() {
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_));
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
svnew->mutable_cf_options.block_protection_bytes_per_key));
}

for (auto* f : l0_iters_) {
Expand Down Expand Up @@ -863,7 +868,8 @@ void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage,
} else {
level_iters_.push_back(new ForwardLevelIterator(
cfd_, read_options_, level_files,
sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_));
sv->mutable_cf_options.prefix_extractor, allow_unprepared_value_,
sv->mutable_cf_options.block_protection_bytes_per_key));
}
}
}
Expand All @@ -885,7 +891,8 @@ void ForwardIterator::ResetIncompleteIterators() {
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr, allow_unprepared_value_);
/*largest_compaction_key=*/nullptr, allow_unprepared_value_,
sv_->mutable_cf_options.block_protection_bytes_per_key);
l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
}

Expand Down
1 change: 1 addition & 0 deletions db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
TableReaderOptions(
*cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
env_options_, cfd_->internal_comparator(),
sv->mutable_cf_options.block_protection_bytes_per_key,
/*skip_filters*/ false, /*immortal*/ false,
/*force_direct_prefetch*/ false, /*level*/ -1,
/*block_cache_tracer*/ nullptr,
Expand Down
98 changes: 92 additions & 6 deletions db/kv_checksum.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ template <typename T>
class ProtectionInfoKVOC;
template <typename T>
class ProtectionInfoKVOS;
template <typename T>
class ProtectionInfoKV;

// Aliases for 64-bit protection infos.
using ProtectionInfo64 = ProtectionInfo<uint64_t>;
Expand All @@ -64,13 +66,13 @@ class ProtectionInfo {
ProtectionInfoKVO<T> ProtectKVO(const SliceParts& key,
const SliceParts& value,
ValueType op_type) const;

T GetVal() const { return val_; }
ProtectionInfoKV<T> ProtectKV(const Slice& key, const Slice& value) const;

private:
friend class ProtectionInfoKVO<T>;
friend class ProtectionInfoKVOS<T>;
friend class ProtectionInfoKVOC<T>;
friend class ProtectionInfoKV<T>;

// Each field is hashed with an independent value so we can catch fields being
// swapped. Per the `NPHash64()` docs, using consecutive seeds is a pitfall,
Expand All @@ -89,8 +91,47 @@ class ProtectionInfo {
static_assert(sizeof(ProtectionInfo<T>) == sizeof(T), "");
}

T GetVal() const { return val_; }
void SetVal(T val) { val_ = val; }

void Encode(uint8_t len, char* dst) const {
assert(sizeof(val_) >= len);
switch (len) {
case 1:
dst[0] = static_cast<uint8_t>(val_);
break;
case 2:
EncodeFixed16(dst, static_cast<uint16_t>(val_));
break;
case 4:
EncodeFixed32(dst, static_cast<uint32_t>(val_));
break;
case 8:
EncodeFixed64(dst, static_cast<uint64_t>(val_));
break;
default:
assert(false);
}
}

bool Verify(uint8_t len, const char* checksum_ptr) const {
assert(sizeof(val_) >= len);
switch (len) {
case 1:
return static_cast<uint8_t>(checksum_ptr[0]) ==
static_cast<uint8_t>(val_);
case 2:
return DecodeFixed16(checksum_ptr) == static_cast<uint16_t>(val_);
case 4:
return DecodeFixed32(checksum_ptr) == static_cast<uint32_t>(val_);
case 8:
return DecodeFixed64(checksum_ptr) == static_cast<uint64_t>(val_);
default:
assert(false);
return false;
}
}

T val_ = 0;
};

Expand All @@ -113,7 +154,14 @@ class ProtectionInfoKVO {
void UpdateV(const SliceParts& old_value, const SliceParts& new_value);
void UpdateO(ValueType old_op_type, ValueType new_op_type);

T GetVal() const { return info_.GetVal(); }
// Encode this protection info into `len` bytes and stores them in `dst`.
void Encode(uint8_t len, char* dst) const { info_.Encode(len, dst); }
// Verify this protection info against the protection info encoded by Encode()
// at the first `len` bytes of `checksum_ptr`.
// Returns true iff the verification is successful.
bool Verify(uint8_t len, const char* checksum_ptr) const {
return info_.Verify(len, checksum_ptr);
}

private:
friend class ProtectionInfo<T>;
Expand All @@ -124,6 +172,7 @@ class ProtectionInfoKVO {
static_assert(sizeof(ProtectionInfoKVO<T>) == sizeof(T), "");
}

T GetVal() const { return info_.GetVal(); }
void SetVal(T val) { info_.SetVal(val); }

ProtectionInfo<T> info_;
Expand Down Expand Up @@ -154,7 +203,10 @@ class ProtectionInfoKVOC {
void UpdateC(ColumnFamilyId old_column_family_id,
ColumnFamilyId new_column_family_id);

T GetVal() const { return kvo_.GetVal(); }
void Encode(uint8_t len, char* dst) const { kvo_.Encode(len, dst); }
bool Verify(uint8_t len, const char* checksum_ptr) const {
return kvo_.Verify(len, checksum_ptr);
}

private:
friend class ProtectionInfoKVO<T>;
Expand All @@ -163,6 +215,7 @@ class ProtectionInfoKVOC {
static_assert(sizeof(ProtectionInfoKVOC<T>) == sizeof(T), "");
}

T GetVal() const { return kvo_.GetVal(); }
void SetVal(T val) { kvo_.SetVal(val); }

ProtectionInfoKVO<T> kvo_;
Expand Down Expand Up @@ -193,7 +246,10 @@ class ProtectionInfoKVOS {
void UpdateS(SequenceNumber old_sequence_number,
SequenceNumber new_sequence_number);

T GetVal() const { return kvo_.GetVal(); }
void Encode(uint8_t len, char* dst) const { kvo_.Encode(len, dst); }
bool Verify(uint8_t len, const char* checksum_ptr) const {
return kvo_.Verify(len, checksum_ptr);
}

private:
friend class ProtectionInfoKVO<T>;
Expand All @@ -202,11 +258,32 @@ class ProtectionInfoKVOS {
static_assert(sizeof(ProtectionInfoKVOS<T>) == sizeof(T), "");
}

T GetVal() const { return kvo_.GetVal(); }
void SetVal(T val) { kvo_.SetVal(val); }

ProtectionInfoKVO<T> kvo_;
};

template <typename T>
class ProtectionInfoKV {
public:
ProtectionInfoKV() = default;

void Encode(uint8_t len, char* dst) const { info_.Encode(len, dst); }
bool Verify(uint8_t len, const char* checksum_ptr) const {
return info_.Verify(len, checksum_ptr);
}

private:
friend class ProtectionInfo<T>;

explicit ProtectionInfoKV(T val) : info_(val) {
static_assert(sizeof(ProtectionInfoKV<T>) == sizeof(T));
}

ProtectionInfo<T> info_;
};

template <typename T>
Status ProtectionInfo<T>::GetStatus() const {
if (val_ != 0) {
Expand Down Expand Up @@ -244,6 +321,16 @@ ProtectionInfoKVO<T> ProtectionInfo<T>::ProtectKVO(const SliceParts& key,
return ProtectionInfoKVO<T>(val);
}

template <typename T>
ProtectionInfoKV<T> ProtectionInfo<T>::ProtectKV(const Slice& key,
const Slice& value) const {
T val = GetVal();
val = val ^ static_cast<T>(GetSliceNPHash64(key, ProtectionInfo<T>::kSeedK));
val =
val ^ static_cast<T>(GetSliceNPHash64(value, ProtectionInfo<T>::kSeedV));
return ProtectionInfoKV<T>(val);
}

template <typename T>
void ProtectionInfoKVO<T>::UpdateK(const Slice& old_key, const Slice& new_key) {
T val = GetVal();
Expand Down Expand Up @@ -394,5 +481,4 @@ void ProtectionInfoKVOS<T>::UpdateS(SequenceNumber old_sequence_number,
sizeof(new_sequence_number), ProtectionInfo<T>::kSeedS));
SetVal(val);
}

} // namespace ROCKSDB_NAMESPACE
Loading