diff --git a/src/storage/iterator.cc b/src/storage/iterator.cc index 3717afb00a5..6514207b35d 100644 --- a/src/storage/iterator.cc +++ b/src/storage/iterator.cc @@ -25,7 +25,7 @@ #include "db_util.h" namespace engine { -DBIterator::DBIterator(Storage* storage, rocksdb::ReadOptions read_options, int slot) +DBIterator::DBIterator(Storage *storage, rocksdb::ReadOptions read_options, int slot) : storage_(storage), read_options_(std::move(read_options)), slot_(slot) { metadata_cf_handle_ = storage_->GetCFHandle(kMetadataColumnFamilyName); metadata_iter_ = util::UniqueIterator(storage_->NewIterator(read_options_, metadata_cf_handle_)); @@ -80,7 +80,7 @@ void DBIterator::Reset() { if (metadata_iter_) metadata_iter_.reset(); } -void DBIterator::Seek(const std::string& target) { +void DBIterator::Seek(const std::string &target) { if (!metadata_iter_) return; // Iterate with the slot id but storage didn't enable slot id encoding @@ -112,7 +112,7 @@ std::unique_ptr DBIterator::GetSubKeyIterator() const { return std::make_unique(storage_, read_options_, type, std::move(prefix)); } -SubKeyIterator::SubKeyIterator(Storage* storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix) +SubKeyIterator::SubKeyIterator(Storage *storage, rocksdb::ReadOptions read_options, RedisType type, std::string prefix) : storage_(storage), read_options_(std::move(read_options)), type_(type), prefix_(std::move(prefix)) { if (type_ == kRedisStream) { cf_handle_ = storage_->GetCFHandle(kStreamColumnFamilyName); @@ -145,7 +145,7 @@ Slice SubKeyIterator::UserKey() const { return internal_key.GetSubKey(); } -rocksdb::ColumnFamilyHandle* SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; } +rocksdb::ColumnFamilyHandle *SubKeyIterator::ColumnFamilyHandle() const { return Valid() ? this->cf_handle_ : nullptr; } Slice SubKeyIterator::Value() const { return Valid() ? iter_->value() : Slice(); } @@ -164,4 +164,125 @@ void SubKeyIterator::Reset() { if (iter_) iter_.reset(); } +rocksdb::Status WALBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) { + if (slot_ != -1 && slot_ != ExtractSlotId(key)) { + return rocksdb::Status::OK(); + } + items_.emplace_back(WALItem::Type::kTypePut, column_family_id, key.ToString(), value.ToString()); + return rocksdb::Status::OK(); +} + +rocksdb::Status WALBatchExtractor::DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) { + if (slot_ != -1 && slot_ != ExtractSlotId(key)) { + return rocksdb::Status::OK(); + } + items_.emplace_back(WALItem::Type::kTypeDelete, column_family_id, key.ToString(), std::string{}); + return rocksdb::Status::OK(); +} + +rocksdb::Status WALBatchExtractor::DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key, + const rocksdb::Slice &end_key) { + items_.emplace_back(WALItem::Type::kTypeDeleteRange, column_family_id, begin_key.ToString(), end_key.ToString()); + return rocksdb::Status::OK(); +} + +void WALBatchExtractor::LogData(const rocksdb::Slice &blob) { + items_.emplace_back(WALItem::Type::kTypeLogData, 0, blob.ToString(), std::string{}); +}; + +void WALBatchExtractor::Clear() { items_.clear(); } + +WALBatchExtractor::Iter WALBatchExtractor::GetIter() { return Iter(&items_); } + +bool WALBatchExtractor::Iter::Valid() { return items_ && cur_ < items_->size(); } + +void WALBatchExtractor::Iter::Next() { cur_++; } + +WALItem WALBatchExtractor::Iter::Value() { + if (!Valid()) { + return {}; + } + return (*items_)[cur_]; +} + +void WALIterator::Reset() { + if (iter_) { + iter_.reset(); + } + if (batch_iter_) { + batch_iter_.reset(); + } + extractor_.Clear(); + next_batch_seq_ = 0; +} + +bool WALIterator::Valid() const { return (batch_iter_ && batch_iter_->Valid()) || (iter_ && iter_->Valid()); } + +void WALIterator::nextBatch() { + if (!iter_ || !iter_->Valid()) { + Reset(); + return; + } + + auto batch = iter_->GetBatch(); + if (batch.sequence != next_batch_seq_ || !batch.writeBatchPtr) { + Reset(); + return; + } + + extractor_.Clear(); + + auto s = batch.writeBatchPtr->Iterate(&extractor_); + if (!s.ok()) { + Reset(); + return; + } + + next_batch_seq_ += batch.writeBatchPtr->Count(); + batch_iter_ = std::make_unique(extractor_.GetIter()); +} + +void WALIterator::Seek(rocksdb::SequenceNumber seq) { + if (slot_ != -1 && !storage_->IsSlotIdEncoded()) { + Reset(); + return; + } + + auto s = storage_->GetWALIter(seq, &iter_); + if (!s.IsOK()) { + Reset(); + return; + } + + next_batch_seq_ = seq; + + nextBatch(); +} + +WALItem WALIterator::Item() { + if (batch_iter_ && batch_iter_->Valid()) { + return batch_iter_->Value(); + } + return {}; +} + +rocksdb::SequenceNumber WALIterator::NextSequenceNumber() const { return next_batch_seq_; } + +void WALIterator::Next() { + if (!Valid()) { + Reset(); + return; + } + + if (batch_iter_ && batch_iter_->Valid()) { + batch_iter_->Next(); + if (batch_iter_->Valid()) { + return; + } + } + + iter_->Next(); + nextBatch(); +} + } // namespace engine diff --git a/src/storage/iterator.h b/src/storage/iterator.h index c257df6fdc5..2f123630c3e 100644 --- a/src/storage/iterator.h +++ b/src/storage/iterator.h @@ -80,4 +80,86 @@ class DBIterator { std::unique_ptr subkey_iter_; }; +struct WALItem { + enum class Type : uint8_t { + kTypeInvalid = 0, + kTypeLogData = 1, + kTypePut = 2, + kTypeDelete = 3, + kTypeDeleteRange = 4, + }; + + WALItem() = default; + WALItem(WALItem::Type t, uint32_t cf_id, std::string k, std::string v) + : type(t), column_family_id(cf_id), key(std::move(k)), value(std::move(v)) {} + + WALItem::Type type = WALItem::Type::kTypeInvalid; + uint32_t column_family_id = 0; + std::string key; + std::string value; +}; + +class WALBatchExtractor : public rocksdb::WriteBatch::Handler { + public: + // If set slot, storage must enable slot id encoding + explicit WALBatchExtractor(int slot = -1) : slot_(slot) {} + + rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) override; + + rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) override; + + rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const rocksdb::Slice &begin_key, + const rocksdb::Slice &end_key) override; + + void LogData(const rocksdb::Slice &blob) override; + + void Clear(); + + class Iter { + friend class WALBatchExtractor; + + public: + bool Valid(); + void Next(); + WALItem Value(); + + private: + explicit Iter(std::vector *items) : items_(items), cur_(0) {} + std::vector *items_; + size_t cur_; + }; + + WALBatchExtractor::Iter GetIter(); + + private: + std::vector items_; + int slot_; +}; + +class WALIterator { + public: + explicit WALIterator(engine::Storage *storage, int slot = -1) + : storage_(storage), slot_(slot), extractor_(slot), next_batch_seq_(0){}; + ~WALIterator() = default; + + bool Valid() const; + void Seek(rocksdb::SequenceNumber seq); + void Next(); + WALItem Item(); + + rocksdb::SequenceNumber NextSequenceNumber() const; + void Reset(); + + private: + void nextBatch(); + + engine::Storage *storage_; + int slot_; + + std::unique_ptr iter_; + WALBatchExtractor extractor_; + std::unique_ptr batch_iter_; + rocksdb::SequenceNumber next_batch_seq_; +}; + } // namespace engine diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index 458ab4e880e..1bca93d77dc 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -101,6 +101,17 @@ bool InternalKey::operator==(const InternalKey &that) const { return version_ == that.version_; } +// Must slot encoded +uint16_t ExtractSlotId(Slice ns_key) { + uint8_t namespace_size = 0; + GetFixed8(&ns_key, &namespace_size); + ns_key.remove_prefix(namespace_size); + + uint16_t slot_id = HASH_SLOTS_SIZE; + GetFixed16(&ns_key, &slot_id); + return slot_id; +} + template std::tuple ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded) { uint8_t namespace_size = 0; diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 8fe52f3910e..ce1026443af 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -105,6 +105,7 @@ struct KeyNumStats { uint64_t avg_ttl = 0; }; +[[nodiscard]] uint16_t ExtractSlotId(Slice ns_key); template [[nodiscard]] std::tuple ExtractNamespaceKey(Slice ns_key, bool slot_id_encoded); [[nodiscard]] std::string ComposeNamespaceKey(const Slice &ns, const Slice &key, bool slot_id_encoded); diff --git a/tests/cppunit/iterator_test.cc b/tests/cppunit/iterator_test.cc index 4bbd24089ea..6a2437a126b 100644 --- a/tests/cppunit/iterator_test.cc +++ b/tests/cppunit/iterator_test.cc @@ -19,6 +19,7 @@ */ #include +#include #include #include #include @@ -32,10 +33,10 @@ #include "test_base.h" #include "types/redis_string.h" -class IteratorTest : public TestBase { +class DBIteratorTest : public TestBase { protected: - explicit IteratorTest() = default; - ~IteratorTest() override = default; + explicit DBIteratorTest() = default; + ~DBIteratorTest() override = default; void SetUp() override { { // string @@ -112,7 +113,7 @@ class IteratorTest : public TestBase { } }; -TEST_F(IteratorTest, AllKeys) { +TEST_F(DBIteratorTest, AllKeys) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); std::vector live_keys = {"a", "b", "d", "hash-1", "set-1", "zset-1", "list-1", "stream-1", "bitmap-1", "json-1", "json-2", "json-3", "sortedint-1"}; @@ -126,7 +127,7 @@ TEST_F(IteratorTest, AllKeys) { ASSERT_TRUE(live_keys.empty()); } -TEST_F(IteratorTest, BasicString) { +TEST_F(DBIteratorTest, BasicString) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); std::vector expected_keys = {"a", "b", "d"}; @@ -148,7 +149,7 @@ TEST_F(IteratorTest, BasicString) { ASSERT_TRUE(expected_keys.empty()); } -TEST_F(IteratorTest, BasicHash) { +TEST_F(DBIteratorTest, BasicHash) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns1", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -171,7 +172,7 @@ TEST_F(IteratorTest, BasicHash) { } } -TEST_F(IteratorTest, BasicSet) { +TEST_F(DBIteratorTest, BasicSet) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns2", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -194,7 +195,7 @@ TEST_F(IteratorTest, BasicSet) { } } -TEST_F(IteratorTest, BasicZSet) { +TEST_F(DBIteratorTest, BasicZSet) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns3", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -217,7 +218,7 @@ TEST_F(IteratorTest, BasicZSet) { } } -TEST_F(IteratorTest, BasicList) { +TEST_F(DBIteratorTest, BasicList) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns4", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -240,7 +241,7 @@ TEST_F(IteratorTest, BasicList) { } } -TEST_F(IteratorTest, BasicStream) { +TEST_F(DBIteratorTest, BasicStream) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns5", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -266,7 +267,7 @@ TEST_F(IteratorTest, BasicStream) { } } -TEST_F(IteratorTest, BasicBitmap) { +TEST_F(DBIteratorTest, BasicBitmap) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns6", "", storage_->IsSlotIdEncoded()); for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { @@ -288,7 +289,7 @@ TEST_F(IteratorTest, BasicBitmap) { } } -TEST_F(IteratorTest, BasicJSON) { +TEST_F(DBIteratorTest, BasicJSON) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); std::vector expected_keys = {"json-1", "json-2", "json-3"}; @@ -310,7 +311,7 @@ TEST_F(IteratorTest, BasicJSON) { ASSERT_TRUE(expected_keys.empty()); } -TEST_F(IteratorTest, BasicSortedInt) { +TEST_F(DBIteratorTest, BasicSortedInt) { engine::DBIterator iter(storage_, rocksdb::ReadOptions()); auto prefix = ComposeNamespaceKey("test_ns8", "", storage_->IsSlotIdEncoded()); @@ -343,6 +344,7 @@ class SlotIteratorTest : public TestBase { TEST_F(SlotIteratorTest, LiveKeys) { redis::String string(storage_, kDefaultNamespace); + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); std::vector keys = {"{x}a", "{x}b", "{y}c", "{y}d", "{x}e"}; for (const auto &key : keys) { string.Set(key, "1"); @@ -363,4 +365,495 @@ TEST_F(SlotIteratorTest, LiveKeys) { count++; } ASSERT_EQ(count, same_slot_keys.size()); + + engine::WALIterator wal_iter(storage_, slot_id); + count = 0; + for (wal_iter.Seek(start_seq + 1); wal_iter.Valid(); wal_iter.Next()) { + auto item = wal_iter.Item(); + if (item.type == engine::WALItem::Type::kTypePut) { + auto [_, user_key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(slot_id, GetSlotIdFromKey(user_key.ToString())) << user_key.ToString(); + count++; + } + } + ASSERT_EQ(count, same_slot_keys.size()); +} + +class WALIteratorTest : public TestBase { + protected: + explicit WALIteratorTest() = default; + ~WALIteratorTest() override = default; + void SetUp() override {} +}; + +TEST_F(WALIteratorTest, BasicString) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::String string(storage_, "test_ns0"); + string.Set("a", "1"); + string.MSet({{"b", "2"}, {"c", "3"}}); + ASSERT_TRUE(string.Del("b").ok()); + + std::vector put_keys, delete_keys; + auto expected_put_keys = {"a", "b", "c"}; + auto expected_delete_keys = {"b"}; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns0"); + put_keys.emplace_back(key.ToString()); + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisString); + break; + } + case engine::WALItem::Type::kTypeDelete: { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns0"); + delete_keys.emplace_back(key.ToString()); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_delete_keys.size(), delete_keys.size()); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_delete_keys.begin(), expected_delete_keys.end(), delete_keys.begin())); +} + +TEST_F(WALIteratorTest, BasicHash) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::Hash hash(storage_, "test_ns1"); + uint64_t ret = 0; + hash.MSet("hash-1", {{"f0", "v0"}, {"f1", "v1"}, {"f2", "v2"}, {"f3", "v3"}}, false, &ret); + uint64_t deleted_cnt = 0; + hash.Delete("hash-1", {"f0"}, &deleted_cnt); + + // Delete will put meta key again + auto expected_put_keys = {"hash-1", "hash-1"}; + // Sub key will be putted in reverse order + auto expected_put_fields = {"f3", "f2", "f1", "f0"}; + auto expected_delete_fields = {"f0"}; + std::vector put_keys, put_fields, delete_fields; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + put_fields.emplace_back(internal_key.GetSubKey().ToString()); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns1"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisHash); + break; + } + case engine::WALItem::Type::kTypeDelete: { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + delete_fields.emplace_back(internal_key.GetSubKey().ToString()); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_fields.size(), put_fields.size()); + ASSERT_EQ(expected_delete_fields.size(), delete_fields.size()); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_fields.begin(), expected_put_fields.end(), put_fields.begin())); + ASSERT_TRUE(std::equal(expected_delete_fields.begin(), expected_delete_fields.end(), delete_fields.begin())); +} + +TEST_F(WALIteratorTest, BasicSet) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + + uint64_t ret = 0; + redis::Set set(storage_, "test_ns2"); + set.Add("set-1", {"e0", "e1", "e2"}, &ret); + uint64_t removed_cnt = 0; + set.Remove("set-1", {"e0", "e1"}, &removed_cnt); + + auto expected_put_keys = {"set-1", "set-1"}; + auto expected_put_members = {"e0", "e1", "e2"}; + auto expected_delete_members = {"e0", "e1"}; + std::vector put_keys, put_members, delete_members; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + put_members.emplace_back(internal_key.GetSubKey().ToString()); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns2"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisSet); + break; + } + case engine::WALItem::Type::kTypeDelete: { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + delete_members.emplace_back(internal_key.GetSubKey().ToString()); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_members.size(), put_members.size()); + ASSERT_EQ(expected_delete_members.size(), delete_members.size()); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_members.begin(), expected_put_members.end(), put_members.begin())); + ASSERT_TRUE(std::equal(expected_delete_members.begin(), expected_delete_members.end(), delete_members.begin())); +} + +TEST_F(WALIteratorTest, BasicZSet) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + uint64_t ret = 0; + redis::ZSet zset(storage_, "test_ns3"); + auto mscores = std::vector{{"z0", 0}, {"z1", 1}, {"z2", 2}}; + zset.Add("zset-1", ZAddFlags(), &mscores, &ret); + uint64_t removed_cnt = 0; + zset.Remove("zset-1", {"z0"}, &removed_cnt); + + auto expected_put_keys = {"zset-1", "zset-1"}; + auto expected_put_members = {"z2", "z1", "z0"}; + // member and score + int expected_delete_count = 2, delete_count = 0; + std::vector put_keys, put_members; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + put_members.emplace_back(internal_key.GetSubKey().ToString()); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns3"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisZSet); + break; + } + case engine::WALItem::Type::kTypeDelete: { + delete_count++; + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_members.size(), put_members.size()); + ASSERT_EQ(expected_delete_count, delete_count); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_members.begin(), expected_put_members.end(), put_members.begin())); +} + +TEST_F(WALIteratorTest, BasicList) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + uint64_t ret = 0; + redis::List list(storage_, "test_ns4"); + list.Push("list-1", {"l0", "l1", "l2", "l3", "l4"}, false, &ret); + ASSERT_TRUE(list.Trim("list-1", 2, 4).ok()); + + auto expected_put_keys = {"list-1", "list-1"}; + auto expected_put_values = {"l0", "l1", "l2", "l3", "l4"}; + auto expected_delete_count = 2, delete_count = 0; + std::vector put_keys, put_values; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + put_values.emplace_back(item.value); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns4"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisList); + break; + } + case engine::WALItem::Type::kTypeDelete: { + delete_count++; + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_values.size(), put_values.size()); + ASSERT_EQ(expected_delete_count, delete_count); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_values.begin(), expected_put_values.end(), put_values.begin())); +} + +TEST_F(WALIteratorTest, BasicStream) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::Stream stream(storage_, "test_ns5"); + redis::StreamEntryID ret; + redis::StreamAddOptions options; + options.next_id_strategy = std::make_unique(); + stream.Add("stream-1", options, {"x0"}, &ret); + stream.Add("stream-1", options, {"x1"}, &ret); + stream.Add("stream-1", options, {"x2"}, &ret); + uint64_t deleted = 0; + ASSERT_TRUE(stream.DeleteEntries("stream-1", {ret}, &deleted).ok()); + + auto expected_put_keys = {"stream-1", "stream-1", "stream-1", "stream-1"}; + auto expected_put_values = {"x0", "x1", "x2"}; + int delete_count = 0; + std::vector put_keys, put_values; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDStream) { + std::vector elems; + auto s = redis::DecodeRawStreamEntryValue(item.value, &elems); + ASSERT_TRUE(s.IsOK() && !elems.empty()); + put_values.emplace_back(elems[0]); + } else if (item.column_family_id == kColumnFamilyIDMetadata) { + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns5"); + put_keys.emplace_back(key.ToString()); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisStream); + break; + } + case engine::WALItem::Type::kTypeDelete: { + delete_count++; + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_put_values.size(), put_values.size()); + ASSERT_EQ(deleted, delete_count); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_put_values.begin(), expected_put_values.end(), put_values.begin())); +} + +TEST_F(WALIteratorTest, BasicBitmap) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + + redis::Bitmap bitmap(storage_, "test_ns6"); + bool ret = false; + bitmap.SetBit("bitmap-1", 0, true, &ret); + bitmap.SetBit("bitmap-1", 8 * 1024, true, &ret); + bitmap.SetBit("bitmap-1", 2 * 8 * 1024, true, &ret); + + auto expected_put_values = {"\x1", "\x1", "\x1"}; + std::vector put_values; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + put_values.emplace_back(item.value); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisBitmap); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + ASSERT_EQ(expected_put_values.size(), put_values.size()); + ASSERT_TRUE(std::equal(expected_put_values.begin(), expected_put_values.end(), put_values.begin())); +} + +TEST_F(WALIteratorTest, BasicJSON) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::Json json(storage_, "test_ns7"); + json.Set("json-1", "$", "{\"a\": 1, \"b\": 2}"); + json.Set("json-2", "$", "{\"a\": 1, \"b\": 2}"); + json.Set("json-3", "$", "{\"a\": 1, \"b\": 2}"); + + size_t result = 0; + ASSERT_TRUE(json.Del("json-3", "$", &result).ok()); + + auto expected_put_keys = {"json-1", "json-2", "json-3"}; + auto expected_delete_keys = {"json-3"}; + std::vector put_keys, delete_keys; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata); + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns7"); + put_keys.emplace_back(key.ToString()); + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisJson); + break; + } + case engine::WALItem::Type::kTypeDelete: { + ASSERT_EQ(item.column_family_id, kColumnFamilyIDMetadata); + auto [ns, key] = ExtractNamespaceKey(item.key, storage_->IsSlotIdEncoded()); + ASSERT_EQ(ns.ToString(), "test_ns7"); + delete_keys.emplace_back(key.ToString()); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + + ASSERT_EQ(expected_put_keys.size(), put_keys.size()); + ASSERT_EQ(expected_delete_keys.size(), delete_keys.size()); + ASSERT_TRUE(std::equal(expected_put_keys.begin(), expected_put_keys.end(), put_keys.begin())); + ASSERT_TRUE(std::equal(expected_delete_keys.begin(), expected_delete_keys.end(), delete_keys.begin())); +} + +TEST_F(WALIteratorTest, BasicSortedInt) { + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + redis::Sortedint sortedint(storage_, "test_ns8"); + uint64_t ret = 0; + sortedint.Add("sortedint-1", {1, 2, 3}, &ret); + uint64_t removed_cnt = 0; + sortedint.Remove("sortedint-1", {2}, &removed_cnt); + + std::vector expected_values = {1, 2, 3}, put_values; + std::vector expected_delete_values = {2}, delete_values; + + engine::WALIterator iter(storage_); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + auto item = iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypePut: { + if (item.column_family_id == kColumnFamilyIDDefault) { + const InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + auto value = DecodeFixed64(internal_key.GetSubKey().data()); + put_values.emplace_back(value); + } + break; + } + case engine::WALItem::Type::kTypeLogData: { + redis::WriteBatchLogData log_data; + ASSERT_TRUE(log_data.Decode(item.key).IsOK()); + ASSERT_EQ(log_data.GetRedisType(), kRedisSortedint); + break; + } + case engine::WALItem::Type::kTypeDelete: { + const InternalKey internal_key(item.key, storage_->IsSlotIdEncoded()); + auto value = DecodeFixed64(internal_key.GetSubKey().data()); + delete_values.emplace_back(value); + break; + } + default: + FAIL() << "Unexpected wal item type" << uint8_t(item.type); + } + } + ASSERT_EQ(expected_values.size(), put_values.size()); + ASSERT_EQ(expected_delete_values.size(), delete_values.size()); + ASSERT_TRUE(std::equal(expected_values.begin(), expected_values.end(), put_values.begin())); + ASSERT_TRUE(std::equal(expected_delete_values.begin(), expected_delete_values.end(), delete_values.begin())); +} + +TEST_F(WALIteratorTest, NextSequence) { + std::vector expected_next_sequences; + std::set next_sequences_set; + + auto start_seq = storage_->GetDB()->GetLatestSequenceNumber(); + uint64_t ret = 0; + redis::List list(storage_, "test_ns2"); + list.Push("list-1", {"l0", "l1", "l2", "l3", "l4"}, false, &ret); + expected_next_sequences.emplace_back(storage_->GetDB()->GetLatestSequenceNumber() + 1); + list.Push("list-2", {"l0", "l1", "l2"}, false, &ret); + expected_next_sequences.emplace_back(storage_->GetDB()->GetLatestSequenceNumber() + 1); + ASSERT_TRUE(list.Trim("list-1", 2, 4).ok()); + expected_next_sequences.emplace_back(storage_->GetDB()->GetLatestSequenceNumber() + 1); + + engine::WALIterator iter(storage_); + + ASSERT_EQ(iter.NextSequenceNumber(), 0); + + for (iter.Seek(start_seq + 1); iter.Valid(); iter.Next()) { + next_sequences_set.emplace(iter.NextSequenceNumber()); + } + + std::vector next_sequences(next_sequences_set.begin(), next_sequences_set.end()); + + ASSERT_EQ(expected_next_sequences.size(), next_sequences.size()); + ASSERT_TRUE(std::equal(expected_next_sequences.begin(), expected_next_sequences.end(), next_sequences.begin())); }