Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update protection info on recovered logs data #9875

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;

TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
/*arg=*/nullptr);
Expand All @@ -961,7 +960,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
continue;
}

// We create a new batch and initialize with a valid prot_info_ to store
// the data checksums; prot_info_ might be reset below
WriteBatch batch(0, 0, 8, 0);
status = WriteBatchInternal::SetContents(&batch, record);
// If no prot_info_ entries end up being created for `record`, reset
// prot_info_
batch.ClearProtectionInfoIfEmpty();
akomurav marked this conversation as resolved.
Show resolved Hide resolved
if (!status.ok()) {
return status;
}
Expand Down
194 changes: 170 additions & 24 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,6 @@ struct SavePoints {
std::stack<SavePoint, autovector<SavePoint>> stack;
};

// Constructs a batch whose serialized representation contains only the
// fixed-size header (WriteBatchInternal::kHeader bytes).
//
// reserved_bytes: capacity hint for the representation; the header size is
//                 used instead when the hint is smaller.
// max_bytes:      stored in max_bytes_.
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
    : content_flags_(0), max_bytes_(max_bytes), rep_() {
  // Never reserve less than the header that is written right below.
  size_t initial_capacity = WriteBatchInternal::kHeader;
  if (reserved_bytes > initial_capacity) {
    initial_capacity = reserved_bytes;
  }
  rep_.reserve(initial_capacity);
  rep_.resize(WriteBatchInternal::kHeader);
}

WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
size_t protection_bytes_per_key, size_t default_cf_ts_sz)
: content_flags_(0),
Expand Down Expand Up @@ -311,6 +303,12 @@ bool WriteBatch::HasMerge() const {
return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}

// Releases prot_info_ when it holds no entries, leaving the batch with a null
// prot_info_. Does nothing when prot_info_ is already null or when it contains
// at least one entry.
void WriteBatch::ClearProtectionInfoIfEmpty() {
  // Check for null first: dereferencing a null prot_info_ would be undefined
  // behavior for batches created without protection info.
  if (prot_info_ != nullptr && prot_info_->entries_.empty()) {
    prot_info_.reset();
  }
}

bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
assert(input != nullptr && key != nullptr);
// Skip tag byte
Expand Down Expand Up @@ -580,14 +578,16 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
s = handler->MarkBeginPrepare();
assert(s.ok());
empty_batch = false;
if (!handler->WriteAfterCommit()) {
if (handler->WriteAfterCommit() ==
WriteBatch::Handler::OptionState::kDisabled) {
s = Status::NotSupported(
"WriteCommitted txn tag when write_after_commit_ is disabled (in "
"WritePrepared/WriteUnprepared mode). If it is not due to "
"corruption, the WAL must be emptied before changing the "
"WritePolicy.");
}
if (handler->WriteBeforePrepare()) {
if (handler->WriteBeforePrepare() ==
WriteBatch::Handler::OptionState::kEnabled) {
s = Status::NotSupported(
"WriteCommitted txn tag when write_before_prepare_ is enabled "
"(in WriteUnprepared mode). If it is not due to corruption, the "
Expand All @@ -600,7 +600,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
s = handler->MarkBeginPrepare();
assert(s.ok());
empty_batch = false;
if (handler->WriteAfterCommit()) {
if (handler->WriteAfterCommit() ==
WriteBatch::Handler::OptionState::kEnabled) {
s = Status::NotSupported(
"WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
"is enabled (in default WriteCommitted mode). If it is not due "
Expand All @@ -614,13 +615,15 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
s = handler->MarkBeginPrepare(true /* unprepared */);
assert(s.ok());
empty_batch = false;
if (handler->WriteAfterCommit()) {
if (handler->WriteAfterCommit() ==
WriteBatch::Handler::OptionState::kEnabled) {
s = Status::NotSupported(
"WriteUnprepared txn tag when write_after_commit_ is enabled (in "
"default WriteCommitted mode). If it is not due to corruption, "
"the WAL must be emptied before changing the WritePolicy.");
}
if (!handler->WriteBeforePrepare()) {
if (handler->WriteBeforePrepare() ==
WriteBatch::Handler::OptionState::kDisabled) {
s = Status::NotSupported(
"WriteUnprepared txn tag when write_before_prepare_ is disabled "
"(in WriteCommitted/WritePrepared mode). If it is not due to "
Expand Down Expand Up @@ -1581,9 +1584,24 @@ class MemTableInserter : public WriteBatch::Handler {
return res;
}

// Steps prot_info_idx_ back by one entry. Intended to be called after an
// operation reported TryAgain, on the assumption that the caller will retry
// that operation. No-op when there is no protection info.
void DecrementProtectionInfoIdxForTryAgain() {
  if (prot_info_ == nullptr) {
    return;
  }
  --prot_info_idx_;
}

// Detaches this handler from any protection info: clears the pointer and
// rewinds the entry index to zero.
void ResetProtectionInfo() {
  prot_info_ = nullptr;
  prot_info_idx_ = 0;
}

protected:
bool WriteBeforePrepare() const override { return write_before_prepare_; }
bool WriteAfterCommit() const override { return write_after_commit_; }
// Reports the write_before_prepare_ flag as a Handler::OptionState:
// kEnabled when the flag is set, kDisabled otherwise.
Handler::OptionState WriteBeforePrepare() const override {
  if (write_before_prepare_) {
    return Handler::OptionState::kEnabled;
  }
  return Handler::OptionState::kDisabled;
}
// Reports the write_after_commit_ flag as a Handler::OptionState:
// kEnabled when the flag is set, kDisabled otherwise.
Handler::OptionState WriteAfterCommit() const override {
  if (write_after_commit_) {
    return Handler::OptionState::kEnabled;
  }
  return Handler::OptionState::kDisabled;
}

public:
// cf_mems should not be shared with concurrent inserters
Expand Down Expand Up @@ -1871,15 +1889,25 @@ class MemTableInserter : public WriteBatch::Handler {
// Applies a Put of `key`/`value` for `column_family_id` through PutCFImpl
// with value type kTypeValue.
//
// When NextProtectionInfo() yields an entry for this key, it is converted for
// memtable use (column family ID stripped, sequence number added) and passed
// to PutCFImpl; otherwise PutCFImpl is called without protection info.
//
// Returns the status of PutCFImpl. On TryAgain the protection info index is
// stepped back so the same entry is consumed again on retry.
Status PutCF(uint32_t column_family_id, const Slice& key,
             const Slice& value) override {
  const auto* kv_prot_info = NextProtectionInfo();
  Status ret_status;
  if (kv_prot_info != nullptr) {
    // Memtable needs seqno, doesn't need CF ID
    auto mem_kv_prot_info =
        kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
    ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
                           &mem_kv_prot_info);
  } else {
    ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
                           nullptr /* kv_prot_info */);
  }
  // TODO: this assumes that if TryAgain status is returned to the caller,
  // the operation is actually tried again. The proper way to do this is to
  // pass a `try_again` parameter to the operation itself and decrement
  // prot_info_idx_ based on that
  if (UNLIKELY(ret_status.IsTryAgain())) {
    DecrementProtectionInfoIdxForTryAgain();
  }
  return ret_status;
}

Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
Expand Down Expand Up @@ -1926,6 +1954,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}

Expand Down Expand Up @@ -1957,6 +1988,9 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status =
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}

Expand Down Expand Up @@ -1985,6 +2019,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
assert(ret_status.ok());
Expand All @@ -2009,6 +2046,9 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
column_family_id, key);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}

Expand Down Expand Up @@ -2038,6 +2078,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
assert(ret_status.ok());
Expand Down Expand Up @@ -2092,6 +2135,9 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = WriteBatchInternal::DeleteRange(
rebuilding_trx_, column_family_id, begin_key, end_key);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}

Expand Down Expand Up @@ -2121,6 +2167,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
assert(ret_status.ok());
Expand Down Expand Up @@ -2242,23 +2291,31 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
key, value);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}

// Applies a blob-index Put of `key`/`value` for `column_family_id`. Same as
// PutCF except that the value type passed to PutCFImpl is kTypeBlobIndex.
//
// When NextProtectionInfo() yields an entry for this key, it is converted for
// memtable use (column family ID stripped, sequence number added) and passed
// to PutCFImpl; otherwise PutCFImpl is called without protection info.
//
// Returns the status of PutCFImpl. On TryAgain the protection info index is
// stepped back, on the assumption that the caller retries the operation.
Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
                      const Slice& value) override {
  const auto* kv_prot_info = NextProtectionInfo();
  Status ret_status;
  if (kv_prot_info != nullptr) {
    // Memtable needs seqno, doesn't need CF ID
    auto mem_kv_prot_info =
        kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
    // Same as PutCF except for value type.
    ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                           &mem_kv_prot_info);
  } else {
    ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
                           nullptr /* kv_prot_info */);
  }
  if (UNLIKELY(ret_status.IsTryAgain())) {
    DecrementProtectionInfoIdxForTryAgain();
  }
  return ret_status;
}

void CheckMemtableFull() {
Expand Down Expand Up @@ -2401,6 +2458,7 @@ class MemTableInserter : public WriteBatch::Handler {
const auto& batch_info = trx->batches_.begin()->second;
// all inserts must reference this trx log number
log_number_ref_ = batch_info.log_number_;
ResetProtectionInfo();
s = batch_info.batch_->Iterate(this);
log_number_ref_ = 0;
}
Expand All @@ -2422,6 +2480,10 @@ class MemTableInserter : public WriteBatch::Handler {
const bool batch_boundry = true;
MaybeAdvanceSeq(batch_boundry);

if (UNLIKELY(s.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}

return s;
}

Expand Down Expand Up @@ -2466,6 +2528,7 @@ class MemTableInserter : public WriteBatch::Handler {
return ucmp->timestamp_size();
});
if (s.ok()) {
ResetProtectionInfo();
s = batch_info.batch_->Iterate(this);
log_number_ref_ = 0;
}
Expand All @@ -2488,6 +2551,10 @@ class MemTableInserter : public WriteBatch::Handler {
constexpr bool batch_boundary = true;
MaybeAdvanceSeq(batch_boundary);

if (UNLIKELY(s.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}

return s;
}

Expand Down Expand Up @@ -2613,11 +2680,90 @@ Status WriteBatchInternal::InsertInto(
return s;
}

// Handler that populates protection info for a WriteBatch while the batch is
// iterated. Each Put / Delete / SingleDelete / DeleteRange / Merge /
// PutBlobIndex record appends one entry, computed from the record's key,
// value, operation type and column family ID, to the ProtectionInfo supplied
// at construction. Transaction markers and no-ops are accepted with OK status
// and add no entries.
class ProtectionInfoUpdater : public WriteBatch::Handler {
 public:
  // `prot_info` is not owned. It may be null, in which case every callback
  // succeeds without recording anything.
  explicit ProtectionInfoUpdater(WriteBatch::ProtectionInfo* prot_info)
      : prot_info_(prot_info) {}

  ~ProtectionInfoUpdater() override = default;

  Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
    return UpdateProtectionInfo(cf, key, val, kTypeValue);
  }

  // Deletions carry no value; an empty value is protected instead.
  Status DeleteCF(uint32_t cf, const Slice& key) override {
    return UpdateProtectionInfo(cf, key, "", kTypeDeletion);
  }

  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
    return UpdateProtectionInfo(cf, key, "", kTypeSingleDeletion);
  }

  // For range deletions the begin key takes the key slot and the end key
  // takes the value slot.
  Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
                       const Slice& end_key) override {
    return UpdateProtectionInfo(cf, begin_key, end_key, kTypeRangeDeletion);
  }

  Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
    return UpdateProtectionInfo(cf, key, val, kTypeMerge);
  }

  Status PutBlobIndexCF(uint32_t cf, const Slice& key,
                        const Slice& val) override {
    return UpdateProtectionInfo(cf, key, val, kTypeBlobIndex);
  }

  // The callbacks below do not correspond to key/value records, so they only
  // report success.
  Status MarkBeginPrepare(bool /* unprepare */) override {
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice& /* xid */) override {
    return Status::OK();
  }

  Status MarkCommit(const Slice& /* xid */) override { return Status::OK(); }

  Status MarkCommitWithTimestamp(const Slice& /* xid */,
                                 const Slice& /* ts */) override {
    return Status::OK();
  }

  Status MarkRollback(const Slice& /* xid */) override { return Status::OK(); }

  Status MarkNoop(bool /* empty_batch */) override { return Status::OK(); }

 private:
  // Appends the protection entry for one record, if a ProtectionInfo was
  // supplied. Always returns OK.
  Status UpdateProtectionInfo(uint32_t cf, const Slice& key, const Slice& val,
                              const ValueType op_type) {
    if (prot_info_ != nullptr) {
      prot_info_->entries_.emplace_back(
          ProtectionInfo64().ProtectKVO(key, val, op_type).ProtectC(cf));
    }
    return Status::OK();
  }

  // No copy or move.
  ProtectionInfoUpdater(const ProtectionInfoUpdater&) = delete;
  ProtectionInfoUpdater(ProtectionInfoUpdater&&) = delete;
  ProtectionInfoUpdater& operator=(const ProtectionInfoUpdater&) = delete;
  ProtectionInfoUpdater& operator=(ProtectionInfoUpdater&&) = delete;

  // Destination for the generated entries; not owned, may be null.
  WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
};

// Replaces the serialized contents of `b` with `contents` and marks the
// batch's content flags as deferred.
//
// `contents` must be at least WriteBatchInternal::kHeader bytes long.
//
// If `b` has a prot_info_, its entries are rebuilt by iterating the new
// contents, and the status of that iteration is returned. Otherwise returns
// OK.
Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);

  b->rep_.assign(contents.data(), contents.size());
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);

  // If we have a prot_info_, update protection info entries for the batch.
  if (b->prot_info_ != nullptr) {
    // Entries describing the previous contents no longer match rep_; drop
    // them before regenerating one entry per record.
    b->prot_info_->entries_.clear();
    ProtectionInfoUpdater prot_info_updater(b->prot_info_.get());
    return b->Iterate(&prot_info_updater);
  }

  return Status::OK();
}

Expand Down
Loading