From 201ba83dca6c5ba5bd0e2323578f9be4eb7f5979 Mon Sep 17 00:00:00 2001 From: Myth Date: Fri, 26 Jan 2024 10:04:56 +0800 Subject: [PATCH] Implement the unify WAL iterator (#2040) Implement the unified WAL iterator for kvrocks, similar to the DBIterator. It will wrap rocksdb WAL Iterator and can return different item types. It is possible to implement their processing logic depending on the concrete type. --- src/storage/iterator.cc | 129 +++++++- src/storage/iterator.h | 82 ++++++ src/storage/redis_metadata.cc | 11 + src/storage/redis_metadata.h | 1 + tests/cppunit/iterator_test.cc | 519 ++++++++++++++++++++++++++++++++- 5 files changed, 725 insertions(+), 17 deletions(-) diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc index 3717afb00a5..6514207b35d 100644 --- a/src/storage/iterator.cc +++ b/src/storage/iterator.cc @@ -25,7 +25,7 @@ #include "db_util.h" namespace engine { -DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot) +DBIterator::DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot) : storage_(storage), read_options_(std::move(read_options)), slot_(slot) { metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName); metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_)); @@ -80,7 +80,7 @@ void DBIterator::Reset() { if (metadata_iter_) metadata_iter_.reset(); } -void DBIterator::Seek(const std::string& target) { +void DBIterator::Seek(const std::string &target) { if (!metadata_iter_) return; // Iterate with the slot id but storage didn't enable slot id encoding @@ -112,7 +112,7 @@ std::unique_ptr DBIterator::GetSubKeyIterator() const { return std::make_unique(storage_, read_options_, type, std::move(prefix)); } -SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix) +SubKeyIterator::SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix) : storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) { if (type_ == kRedisStream) { cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName); @@ -145,7 +145,7 @@ Slice SubKeyIterator::UserKey() const { return internal_key.GetSubKey(); } -rocksdb::ColumnFamilyHandle* SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; } +rocksdb::ColumnFamilyHandle *SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; } Slice SubKeyIterator::Value() const { return Valid() ? iter_->value() : Slice(); } @@ -164,4 +164,125 @@ void SubKeyIterator::Reset() { if (iter_) iter_.reset(); } +rocksdb::Status WALBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) { + if (slot_ != -1 && slot_ != ExtractSlotId(key)) { + return rocksdb::Status::OK(); + } + items_.emplace_back(WALItem::Type::kTypePut, column_family_id, key.ToString(), value.ToString()); + return rocksdb::Status::OK(); +} + +rocksdb::Status WALBatchExtractor::DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) { + if (slot_ != -1 && slot_ != ExtractSlotId(key)) { + return rocksdb::Status::OK(); + } + items_.emplace_back(WALItem::Type::kTypeDelete, column_family_id, key.ToString(), std::string{}); + return rocksdb::Status::OK(); +} + +rocksdb::Status WALBatchExtractor::DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key, + const rocksdb::Slice &end_key) { + items_.emplace_back(WALItem::Type::kTypeDeleteRange, column_family_id, begin_key.ToString(), end_key.ToString()); + return rocksdb::Status::OK(); +} + +void WALBatchExtractor::LogData(const rocksdb::Slice &blob) { + items_.emplace_back(WALItem::Type::kTypeLogData, 0, blob.ToString(), std::string{}); +}; + +void WALBatchExtractor::Clear() { items_.clear(); } + +WALBatchExtractor::Iter WALBatchExtractor::GetIter() { return Iter(&items_); } + +bool WALBatchExtractor::Iter::Valid() { return items_ && cur_ < items_->size(); } + +void WALBatchExtractor::Iter::Next() { cur_++; } + +WALItem WALBatchExtractor::Iter::Value() { + if (!Valid()) { + return {}; + } + return (*items_)[cur_]; +} + +void WALIterator::Reset() { + if (iter_) { + iter_.reset(); + } + if (batch_iter_) { + batch_iter_.reset(); + } + extractor_.Clear(); + next_batch_seq_ = 0; +} + +bool WALIterator::Valid() const { return (batch_iter_ && batch_iter_->Valid()) || (iter_ && iter_->Valid()); } + +void WALIterator::nextBatch() { + if (!iter_ || !iter_->Valid()) { + Reset(); + return; + } + + auto batch = iter_->GetBatch(); + if (batch.sequence != next_batch_seq_ || !batch.writeBatchPtr) { + Reset(); + return; + } + + extractor_.Clear(); + + auto s = batch.writeBatchPtr->Iterate(&extractor_); + if (!s.ok()) { + Reset(); + return; + } + + next_batch_seq_ += batch.writeBatchPtr->Count(); + batch_iter_ = std::make_unique(extractor_.GetIter()); +} + +void WALIterator::Seek(rocksdb::SequenceNumber seq) { + if (slot_ != -1 && !storage_->IsSlotIdEncoded()) { + Reset(); + return; + } + + auto s = storage_->GetWALIter(seq, &iter_); + if (!s.IsOK()) { + Reset(); + return; + } + + next_batch_seq_ = seq; + + nextBatch(); +} + +WALItem WALIterator::Item() { + if (batch_iter_ && batch_iter_->Valid()) { + return batch_iter_->Value(); + } + return {}; +} + +rocksdb::SequenceNumber WALIterator::NextSequenceNumber() const { return next_batch_seq_; } + +void WALIterator::Next() { + if (!Valid()) { + Reset(); + return; + } + + if (batch_iter_ && batch_iter_->Valid()) { + batch_iter_->Next(); + if (batch_iter_->Valid()) { + return; + } + } + + iter_->Next(); + nextBatch(); +} + } // namespace engine diff --git a/src/storage/iterator.h b/src/storage/iterator.h index c257df6fdc5..2f123630c3e 100644 --- a/src/storage/iterator.h +++ b/src/storage/iterator.h @@ -80,4 +80,86 @@ class DBIterator { std::unique_ptr subkey_iter_; }; +struct WALItem { + enum class Type : uint8_t { + kTypeInvalid = 0, + kTypeLogData = 1, + kTypePut = 2, + kTypeDelete = 3, + kTypeDeleteRange = 4, + }; + + WALItem() = default; + WALItem(WALItem::Type t, uint32_t cf_id, std::string k, std::string v) + : type(t), column_family_id(cf_id), key(std::move(k)), value(std::move(v)) {} + + WALItem::Type type = WALItem::Type::kTypeInvalid; + uint32_t column_family_id = 0; + std::string key; + std::string value; +}; + +class WALBatchExtractor : public rocksdb::WriteBatch::Handler { + public: + // If set slot, storage must enable slot id encoding + explicit WALBatchExtractor(int slot = -1) : slot_(slot) {} + + rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) override; + + rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) override; + + rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key, + const rocksdb::Slice &end_key) override; + + void LogData(const rocksdb::Slice &blob) override; + + void Clear(); + + class Iter { + friend class WALBatchExtractor; + + public: + bool Valid(); + void Next(); + WALItem Value(); + + private: + explicit Iter(std::vector *items) : items_(items), cur_(0) {} + std::vector *items_; + size_t cur_; + }; + + WALBatchExtractor::Iter GetIter(); + + private: + std::vector items_; + int slot_; +}; + +class WALIterator { + public: + explicit WALIterator(engine::Storage *storage, int slot = -1) + : storage_(storage), slot_(slot), extractor_(slot), next_batch_seq_(0){}; + ~WALIterator() = default; + + bool Valid() const; + void Seek(rocksdb::SequenceNumber seq); + void Next(); + WALItem Item(); + + rocksdb::SequenceNumber NextSequenceNumber() const; + void Reset(); + + private: + void nextBatch(); + + engine::Storage *storage_; + int slot_; + + std::unique_ptr iter_; + WALBatchExtractor extractor_; + std::unique_ptr batch_iter_; + rocksdb::SequenceNumber next_batch_seq_; +}; + } // namespace engine diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index 458ab4e880e..1bca93d77dc 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -101,6 +101,17 @@ bool InternalKey::operator==(const InternalKey &that) const { return version_ == that.version_; } +// Must slot encoded +uint16_t ExtractSlotId(Slice ns_key) { + uint8_t namespace_size = 0; + GetFixed8(&ns_key, &namespace_size); + ns_key.remove_prefix(namespace_size); + + uint16_t slot_id = HASH_SLOTS_SIZE; + GetFixed16(&ns_key, &slot_id); + return slot_id; +} + template std::tuple ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded) { uint8_t namespace_size = 0; diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 8fe52f3910e..ce1026443af 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -105,6 +105,7 @@ struct KeyNumStats { uint64_t avg_ttl = 0; }; +[[nodiscard]] uint16_t ExtractSlotId(Slice ns_key); template [[nodiscard]] std::tuple ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded); [[nodiscard]] std::string ComposeNamespaceKey(const Slice &ns, const Slice &key, bool slot_id_encoded); diff --git a/tests/cppunit/iterator_test.cc b/tests/cppunit/iterator_test.cc index 4bbd24089ea..6a2437a126b 100644 --- a/tests/cppunit/iterator_test.cc +++ b/tests/cppunit/iterator_test.cc @@ -19,6 +19,7 @@ */ #include +#include #include #include #include @@ -32,10 +33,10 @@ #include "test_base.h" #include "types/redis_string.h" -class IteratorTest : public TestBase { +class DBIteratorTest : public TestBase { protected: - explicit IteratorTest() = default; - ~IteratorTest() override = default; + explicit DBIteratorTest() = default; + ~DBIteratorTest() override = default; void SetUp() override { { // string @@ -112,7 +113,7 @@ class IteratorTest : public TestBase { } }; -TEST_F(IteratorTest, AllKeys) { +TEST_F(DBIteratorTest, AllKeys) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); std::vector live_keys = {"a", "b", "d", "hash-1", "set-1", "zset-1", "list-1", "stream-1", "bitmap-1", "json-1", "json-2", "json-3", "sortedint-1"}; @@ -126,7 +127,7 @@ TEST_F(IteratorTest, AllKeys) { ASSERT_TRUE(live_keys.empty()); } -TEST_F(IteratorTest, BasicString) { +TEST_F(DBIteratorTest, BasicString) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); std::vector expected_keys = {"a", "b", "d"}; @@ -148,7 +149,7 @@ TEST_F(IteratorTest, BasicString) { ASSERT_TRUE(expected_keys.empty()); } -TEST_F(IteratorTest, BasicHash) { +TEST_F(DBIteratorTest, BasicHash) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns1", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -171,7 +172,7 @@ TEST_F(IteratorTest, BasicHash) { } } -TEST_F(IteratorTest, BasicSet) { +TEST_F(DBIteratorTest, BasicSet) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns2", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -194,7 +195,7 @@ TEST_F(IteratorTest, BasicSet) { } } -TEST_F(IteratorTest, BasicZSet) { +TEST_F(DBIteratorTest, BasicZSet) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns3", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -217,7 +218,7 @@ TEST_F(IteratorTest, BasicZSet) { } } -TEST_F(IteratorTest, BasicList) { +TEST_F(DBIteratorTest, BasicList) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns4", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -240,7 +241,7 @@ TEST_F(IteratorTest, BasicList) { } } -TEST_F(IteratorTest, BasicStream) { +TEST_F(DBIteratorTest, BasicStream) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns5", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -266,7 +267,7 @@ TEST_F(IteratorTest, BasicStream) { } } -TEST_F(IteratorTest, BasicBitmap) { +TEST_F(DBIteratorTest, BasicBitmap) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns6", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -288,7 +289,7 @@ TEST_F(IteratorTest, BasicBitmap) { } } -TEST_F(IteratorTest, BasicJSON) { +TEST_F(DBIteratorTest, BasicJSON) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); std::vector expected_keys = {"json-1", "json-2", "json-3"}; @@ -310,7 +311,7 @@ TEST_F(IteratorTest, BasicJSON) { ASSERT_TRUE(expected_keys.empty()); } -TEST_F(IteratorTest, BasicSortedInt) { +TEST_F(DBIteratorTest, BasicSortedInt) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns8", "", storage_->IsSlotIdEncoded()); @@ -343,6 +344,7 @@ class SlotIteratorTest : public TestBase { TEST_F(SlotIteratorTest, LiveKeys) { redis::String string(storage_, kDefaultNamespace); + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); std::vector keys = {"{x}a", "{x}b", "{y}c", "{y}d", "{x}e"}; for (const auto &key : keys) { string.Set(key, "1"); @@ -363,4 +365,495 @@ TEST_F(SlotIteratorTest, LiveKeys) { count++; } ASSERT_EQ(count, same_slot_keys.size()); + + engine::WALIterator wal_iter(storage_, slot_id); + count = 0; + for (wal_iter.Seek(start_seq + 1); wal_iter.Valid(); wal_iter.Next()) { + auto item = wal_iter.Item(); + if (item.type == engine::WALItem::Type::kTypePut) { + auto [_, user_key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(slot_id, GetSlotIdFromKey(user_key.ToString())) << user_key.ToString(); + count++; + } + } + ASSERT_EQ(count, same_slot_keys.size()); +} + +class WALIteratorTest : public TestBase { + protected: + explicit WALIteratorTest() = default; + ~WALIteratorTest() override = default; + void SetUp() override {} +}; + +TEST_F(WALIteratorTest, BasicString) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::String string(storage_, "test_ns0"); + string.Set("a", "1"); + string.MSet({{"b", "2"}, {"c", "3"}}); + ASSERT_TRUE(string.Del("b").ok()); + + std::vector put_keys, delete_keys; + auto expected_put_keys = {"a", "b", "c"}; + auto expected_delete_keys = {"b"}; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns0"); + put_keys.emplace_back(key.ToString()); + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisString); + break; + } + case engine::WALItem::Type::kTypeDelete: { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns0"); + delete_keys.emplace_back(key.ToString()); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_delete_keys.size(), delete_keys.size()); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_delete_keys.begin(), expected_delete_keys.end(), delete_keys.begin())); +} + +TEST_F(WALIteratorTest, BasicHash) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::Hash hash(storage_, "test_ns1"); + uint64_t ret = 0; + hash.MSet("hash-1", {{"f0", "v0"}, {"f1", "v1"}, {"f2", "v2"}, {"f3", "v3"}}, false, &ret); + uint64_t deleted_cnt = 0; + hash.Delete("hash-1", {"f0"}, &deleted_cnt); + + // Delete will put meta key again + auto expected_put_keys = {"hash-1", "hash-1"}; + // Sub key will be putted in reverse order + auto expected_put_fields = {"f3", "f2", "f1", "f0"}; + auto expected_delete_fields = {"f0"}; + std::vector put_keys, put_fields, delete_fields; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + put_fields.emplace_back(internal_key.GetSubKey().ToString()); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns1"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisHash); + break; + } + case engine::WALItem::Type::kTypeDelete: { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + delete_fields.emplace_back(internal_key.GetSubKey().ToString()); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_fields.size(), put_fields.size()); + ASSERT_EQ(expected_delete_fields.size(), delete_fields.size()); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_fields.begin(), expected_put_fields.end(), put_fields.begin())); + ASSERT_TRUE(std::equal(expected_delete_fields.begin(), expected_delete_fields.end(), delete_fields.begin())); +} + +TEST_F(WALIteratorTest, BasicSet) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + + uint64_t ret = 0; + redis::Set set(storage_, "test_ns2"); + set.Add("set-1", {"e0", "e1", "e2"}, &ret); + uint64_t removed_cnt = 0; + set.Remove("set-1", {"e0", "e1"}, &removed_cnt); + + auto expected_put_keys = {"set-1", "set-1"}; + auto expected_put_members = {"e0", "e1", "e2"}; + auto expected_delete_members = {"e0", "e1"}; + std::vector put_keys, put_members, delete_members; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + put_members.emplace_back(internal_key.GetSubKey().ToString()); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns2"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisSet); + break; + } + case engine::WALItem::Type::kTypeDelete: { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + delete_members.emplace_back(internal_key.GetSubKey().ToString()); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_members.size(), put_members.size()); + ASSERT_EQ(expected_delete_members.size(), delete_members.size()); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_members.begin(), expected_put_members.end(), put_members.begin())); + ASSERT_TRUE(std::equal(expected_delete_members.begin(), expected_delete_members.end(), delete_members.begin())); +} + +TEST_F(WALIteratorTest, BasicZSet) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + uint64_t ret = 0; + redis::ZSet zset(storage_, "test_ns3"); + auto mscores = std::vector{{"z0", 0}, {"z1", 1}, {"z2", 2}}; + zset.Add("zset-1", ZAddFlags(), &mscores, &ret); + uint64_t removed_cnt = 0; + zset.Remove("zset-1", {"z0"}, &removed_cnt); + + auto expected_put_keys = {"zset-1", "zset-1"}; + auto expected_put_members = {"z2", "z1", "z0"}; + // member and score + int expected_delete_count = 2, delete_count = 0; + std::vector put_keys, put_members; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + put_members.emplace_back(internal_key.GetSubKey().ToString()); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns3"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisZSet); + break; + } + case engine::WALItem::Type::kTypeDelete: { + delete_count++; + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_members.size(), put_members.size()); + ASSERT_EQ(expected_delete_count, delete_count); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_members.begin(), expected_put_members.end(), put_members.begin())); +} + +TEST_F(WALIteratorTest, BasicList) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + uint64_t ret = 0; + redis::List list(storage_, "test_ns4"); + list.Push("list-1", {"l0", "l1", "l2", "l3", "l4"}, false, &ret); + ASSERT_TRUE(list.Trim("list-1", 2, 4).ok()); + + auto expected_put_keys = {"list-1", "list-1"}; + auto expected_put_values = {"l0", "l1", "l2", "l3", "l4"}; + auto expected_delete_count = 2, delete_count = 0; + std::vector put_keys, put_values; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + put_values.emplace_back(item.value); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns4"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisList); + break; + } + case engine::WALItem::Type::kTypeDelete: { + delete_count++; + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_values.size(), put_values.size()); + ASSERT_EQ(expected_delete_count, delete_count); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_values.begin(), expected_put_values.end(), put_values.begin())); +} + +TEST_F(WALIteratorTest, BasicStream) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::Stream stream(storage_, "test_ns5"); + redis::StreamEntryID ret; + redis::StreamAddOptions options; + options.next_id_strategy = std::make_unique(); + stream.Add("stream-1", options, {"x0"}, &ret); + stream.Add("stream-1", options, {"x1"}, &ret); + stream.Add("stream-1", options, {"x2"}, &ret); + uint64_t deleted = 0; + ASSERT_TRUE(stream.DeleteEntries("stream-1", {ret}, &deleted).ok()); + + auto expected_put_keys = {"stream-1", "stream-1", "stream-1", "stream-1"}; + auto expected_put_values = {"x0", "x1", "x2"}; + int delete_count = 0; + std::vector put_keys, put_values; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDStream) { + std::vector elems; + auto s = redis::DecodeRawStreamEntryValue(item.value, &elems); + ASSERT_TRUE(s.IsOK() && !elems.empty()); + put_values.emplace_back(elems[0]); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns5"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisStream); + break; + } + case engine::WALItem::Type::kTypeDelete: { + delete_count++; + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_values.size(), put_values.size()); + ASSERT_EQ(deleted, delete_count); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_values.begin(), expected_put_values.end(), put_values.begin())); +} + +TEST_F(WALIteratorTest, BasicBitmap) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + + redis::Bitmap bitmap(storage_, "test_ns6"); + bool ret = false; + bitmap.SetBit("bitmap-1", 0, true, &ret); + bitmap.SetBit("bitmap-1", 8 * 1024, true, &ret); + bitmap.SetBit("bitmap-1", 2 * 8 * 1024, true, &ret); + + auto expected_put_values = {"\x1", "\x1", "\x1"}; + std::vector put_values; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + put_values.emplace_back(item.value); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisBitmap); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + ASSERT_EQ(expected_put_values.size(), put_values.size()); + ASSERT_TRUE(std::equal(expected_put_values.begin(), expected_put_values.end(), put_values.begin())); +} + +TEST_F(WALIteratorTest, BasicJSON) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::Json json(storage_, "test_ns7"); + json.Set("json-1", "$", "{\"a\": 1, \"b\": 2}"); + json.Set("json-2", "$", "{\"a\": 1, \"b\": 2}"); + json.Set("json-3", "$", "{\"a\": 1, \"b\": 2}"); + + size_t result = 0; + ASSERT_TRUE(json.Del("json-3", "$", &result).ok()); + + auto expected_put_keys = {"json-1", "json-2", "json-3"}; + auto expected_delete_keys = {"json-3"}; + std::vector put_keys, delete_keys; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata); + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns7"); + put_keys.emplace_back(key.ToString()); + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisJson); + break; + } + case engine::WALItem::Type::kTypeDelete: { + ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata); + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns7"); + delete_keys.emplace_back(key.ToString()); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_delete_keys.size(), delete_keys.size()); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_delete_keys.begin(), expected_delete_keys.end(), delete_keys.begin())); +} + +TEST_F(WALIteratorTest, BasicSortedInt) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::Sortedint sortedint(storage_, "test_ns8"); + uint64_t ret = 0; + sortedint.Add("sortedint-1", {1, 2, 3}, &ret); + uint64_t removed_cnt = 0; + sortedint.Remove("sortedint-1", {2}, &removed_cnt); + + std::vector expected_values = {1, 2, 3}, put_values; + std::vector expected_delete_values = {2}, delete_values; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + const InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + auto value = DecodeFixed64(internal_key.GetSubKey().data()); + put_values.emplace_back(value); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisSortedint); + break; + } + case engine::WALItem::Type::kTypeDelete: { + const InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + auto value = DecodeFixed64(internal_key.GetSubKey().data()); + delete_values.emplace_back(value); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + ASSERT_EQ(expected_values.size(), put_values.size()); + ASSERT_EQ(expected_delete_values.size(), delete_values.size()); + ASSERT_TRUE(std::equal(expected_values.begin(), expected_values.end(), put_values.begin())); + ASSERT_TRUE(std::equal(expected_delete_values.begin(), expected_delete_values.end(), delete_values.begin())); +} + +TEST_F(WALIteratorTest, NextSequence) { + std::vector expected_next_sequences; + std::set next_sequences_set; + + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + uint64_t ret = 0; + redis::List list(storage_, "test_ns2"); + list.Push("list-1", {"l0", "l1", "l2", "l3", "l4"}, false, &ret); + expected_next_sequences.emplace_back(storage_->GetDB()->GetLatestSequenceNumber() + 1); + list.Push("list-2", {"l0", "l1", "l2"}, false, &ret); + expected_next_sequences.emplace_back(storage_->GetDB()->GetLatestSequenceNumber() + 1); + ASSERT_TRUE(list.Trim("list-1", 2, 4).ok()); + expected_next_sequences.emplace_back(storage_->GetDB()->GetLatestSequenceNumber() + 1); + + engine::WALIterator iter(storage_); + + ASSERT_EQ(iter.NextSequenceNumber(), 0); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + next_sequences_set.emplace(iter.NextSequenceNumber()); + } + + std::vector next_sequences(next_sequences_set.begin(), next_sequences_set.end()); + + ASSERT_EQ(expected_next_sequences.size(), next_sequences.size()); + ASSERT_TRUE(std::equal(expected_next_sequences.begin(), expected_next_sequences.end(), next_sequences.begin())); }