Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update protection info on recovered logs data #9875

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,9 @@ Status DBImpl::RecoverLogFiles(std::vector<uint64_t>& wal_numbers,
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
// We create a new batch and make sure it has a valid prot_info_ to store
// the data checksums
WriteBatch batch(0, 0, 8, 0);

TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
/*arg=*/nullptr);
Expand Down
89 changes: 80 additions & 9 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,6 @@ struct SavePoints {
std::stack<SavePoint, autovector<SavePoint>> stack;
};

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
: content_flags_(0), max_bytes_(max_bytes), rep_() {
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
? reserved_bytes
: WriteBatchInternal::kHeader);
rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
size_t protection_bytes_per_key, size_t default_cf_ts_sz)
: content_flags_(0),
Expand Down Expand Up @@ -2613,11 +2605,90 @@ Status WriteBatchInternal::InsertInto(
return s;
}

// This class adds checksums for a WriteBatch.
class ChecksumAdder : public WriteBatch::Handler {
akomurav marked this conversation as resolved.
Show resolved Hide resolved
public:
explicit ChecksumAdder(WriteBatch::ProtectionInfo* prot_info)
: prot_info_(prot_info) {}

~ChecksumAdder() override {}

Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
return add_checksum(cf, key, val, kTypeValue);
}

Status DeleteCF(uint32_t cf, const Slice& key) override {
return add_checksum(cf, key, "", kTypeDeletion);
}

Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return add_checksum(cf, key, "", kTypeSingleDeletion);
}

Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
const Slice& end_key) override {
return add_checksum(cf, begin_key, end_key, kTypeRangeDeletion);
}

Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
return add_checksum(cf, key, val, kTypeMerge);
}

Status PutBlobIndexCF(uint32_t cf, const Slice& key,
const Slice& val) override {
return add_checksum(cf, key, val, kTypeValue);
}

Status MarkBeginPrepare(bool /* unprepare */) override {
return Status::OK();
}

Status MarkEndPrepare(const Slice& /* xid */) override {
return Status::OK();
}

Status MarkCommit(const Slice& /* xid */) override { return Status::OK(); }

Status MarkCommitWithTimestamp(const Slice& /* xid */,
const Slice& /* ts */) override {
return Status::OK();
}

Status MarkRollback(const Slice& /* xid */) override { return Status::OK(); }

Status MarkNoop(bool /* empty_batch */) override { return Status::OK(); }

private:
Status add_checksum(uint32_t cf, const Slice& key, const Slice& val,
const ValueType op_type) {
if (prot_info_) {
prot_info_->entries_.emplace_back(
ProtectionInfo64().ProtectKVO(key, val, op_type).ProtectC(cf));
}
return Status::OK();
}

// No copy or move.
ChecksumAdder(const ChecksumAdder&) = delete;
ChecksumAdder(ChecksumAdder&&) = delete;
ChecksumAdder& operator=(const ChecksumAdder&) = delete;
ChecksumAdder& operator=(ChecksumAdder&&) = delete;

WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
};

Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= WriteBatchInternal::kHeader);
assert(b->prot_info_ == nullptr);

b->rep_.assign(contents.data(), contents.size());
b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);

// If we have a prot_info_, compute checksums for the batch.
if (b->prot_info_) {
ChecksumAdder cs_adder(b->prot_info_.get());
akomurav marked this conversation as resolved.
Show resolved Hide resolved
return b->Iterate(&cs_adder);
}

return Status::OK();
}

Expand Down
4 changes: 3 additions & 1 deletion include/rocksdb/write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ struct SavePoint {

class WriteBatch : public WriteBatchBase {
public:
explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0);
explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0)
: WriteBatch(reserved_bytes, max_bytes, 0, 0) {}

// `protection_bytes_per_key` is the number of bytes used to store
// protection information for each key entry. Currently supported values are
// zero (disabled) and eight.
Expand Down