From 96574bd09c1cd03c19c0743df98e2e34ae3df2a8 Mon Sep 17 00:00:00 2001 From: Yang Zhang Date: Fri, 27 Sep 2024 00:40:04 -0700 Subject: [PATCH] Enrich write batch (#379) Signed-off-by: Little-Wallace Signed-off-by: tabokie Signed-off-by: Yang Zhang Co-authored-by: Wallace --- db/write_batch.cc | 36 +++++++++++++++++++++++++++++++ db/write_batch_internal.h | 2 ++ include/rocksdb/write_batch.h | 40 +++++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+) diff --git a/db/write_batch.cc b/db/write_batch.cc index 75f6e1eb481..e734bb3f617 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -3100,6 +3100,16 @@ Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { return Status::OK(); } +Status WriteBatchInternal::AppendContents(WriteBatch* dst, + const Slice& content) { + size_t src_len = content.size() - WriteBatchInternal::kHeader; + SetCount(dst, Count(dst) + DecodeFixed32(content.data() + 8)); + assert(content.size() >= WriteBatchInternal::kHeader); + dst->rep_.append(content.data() + WriteBatchInternal::kHeader, src_len); + dst->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed); + return Status::OK(); +} + Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src, const bool wal_only) { assert(dst->Count() == 0 || @@ -3191,4 +3201,30 @@ Status WriteBatchInternal::UpdateProtectionInfo(WriteBatch* wb, "WriteBatch protection info must be zero or eight bytes/key"); } +void WriteBatch::Iterator::SeekToFirst() { + input_ = rep_; + if (input_.size() < WriteBatchInternal::kHeader) { + valid_ = false; + return; + } + input_.remove_prefix(WriteBatchInternal::kHeader); + valid_ = true; + Next(); +} + +void WriteBatch::Iterator::Next() { + if (input_.empty() || !valid_) { + valid_ = false; + return; + } + Slice blob, xid; + Status s = ReadRecordFromWriteBatch(&input_, &tag_, &column_family_, &key_, + &value_, &blob, &xid); + valid_ = s.ok(); +} + +int WriteBatch::WriteBatchRef::Count() const { + return DecodeFixed32(rep_.data() + 8); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 52bbe4545b4..36e7f71f4c1 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -211,6 +211,8 @@ class WriteBatchInternal { static Status Append(WriteBatch* dst, const WriteBatch* src, const bool WAL_only = false); + static Status AppendContents(WriteBatch* dst, const Slice& content); + // Returns the byte size of appending a WriteBatch with ByteSize // leftByteSize and a WriteBatch with ByteSize rightByteSize static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 5c87f940581..7740a65968b 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -368,6 +368,8 @@ class WriteBatch : public WriteBatchBase { } }; Status Iterate(Handler* handler) const; + class Iterator; + Iterator* NewIterator() const { return new Iterator(rep_); } // Retrieve the serialized version of this batch. const std::string& Data() const { return rep_; } @@ -506,6 +508,44 @@ class WriteBatch : public WriteBatchBase { protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ + public: + class Iterator { + private: + Slice rep_; + Slice input_; + Slice key_; + Slice value_; + uint32_t column_family_; + char tag_; + bool valid_; + + public: + explicit Iterator(const Slice& rep) : rep_(rep), valid_(false) {} + + bool Valid() const { return valid_; } + + Slice Key() const { return key_; } + + Slice Value() const { return value_; } + + uint32_t GetColumnFamilyId() const { return column_family_; } + + char GetValueType() const { return tag_; }; + + void SeekToFirst(); + + void Next(); + }; + class WriteBatchRef { + public: + explicit WriteBatchRef(const Slice& rep) : rep_(rep) {} + Iterator* NewIterator() const { return new Iterator(rep_); } + + int Count() const; + + private: + const Slice& rep_; + }; }; } // namespace ROCKSDB_NAMESPACE