From 44f38d5946449a88b61e4e9c5734e19335a06630 Mon Sep 17 00:00:00 2001
From: git-hulk
Date: Thu, 23 Mar 2023 00:15:57 +0800
Subject: [PATCH 1/5] Fix stream column family not being parsed when syncing
 WAL in cluster migration

Currently, cluster slot migration is split into two stages: sending a
snapshot, then syncing the WAL once the snapshot has been sent. For the
stream type, we only parsed the KVs from the snapshot and missed the WAL
part.
---
 src/cluster/slot_migrate.cc                   | 17 +--------
 src/storage/batch_extractor.cc                | 37 +++++++++++++++++--
 src/storage/batch_extractor.h                 |  3 ++
 .../slotmigrate/slotmigrate_test.go           | 19 +++++++---
 4 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 4484e72e264..7e558a118a5 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -764,23 +764,10 @@ Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metada
       break;
     }
 
-    // Parse values of the complex key
-    // InternalKey is adopted to get complex key's value from the formatted key returned by iterator of rocksdb
-    InternalKey inkey(iter->key(), true);
-    std::vector<std::string> values;
-    auto s = Redis::DecodeRawStreamEntryValue(iter->value().ToString(), &values);
+    auto s = WriteBatchExtractor::ExtractStreamAddCommand(true, iter->key(), iter->value(), &user_cmd);
     if (!s.IsOK()) {
-      return {Status::NotOK, fmt::format("failed to decode stream values: {}", s.Msg())};
+      return s;
     }
-
-    Slice encoded_id = inkey.GetSubKey();
-    Redis::StreamEntryID entry_id;
-    GetFixed64(&encoded_id, &entry_id.ms);
-    GetFixed64(&encoded_id, &entry_id.seq);
-
-    user_cmd.emplace_back(entry_id.ToString());
-    user_cmd.insert(user_cmd.end(), values.begin(), values.end());
-
     *restore_cmds += Redis::MultiBulkString(user_cmd, false);
     current_pipeline_size_++;
 
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index cd7453ab13c..47ac0d37dbb 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -81,9 +81,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
     }
 
     return rocksdb::Status::OK();
-  }
-
-  if (column_family_id == kColumnFamilyIDDefault) {
+  } else if (column_family_id == kColumnFamilyIDDefault) {
     InternalKey ikey(key, is_slotid_encoded_);
     user_key = ikey.GetKey().ToString();
     if (slot_ >= 0) {
@@ -193,6 +191,12 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
       default:
         break;
     }
+  } else if (column_family_id == kColumnFamilyIDStream) {
+    auto s = ExtractStreamAddCommand(is_slotid_encoded_, key, value, &command_args);
+    if (!s.IsOK()) {
+      LOG(ERROR) << "Fail to parse write_batch for the stream type: " << s.Msg();
+      return rocksdb::Status::OK();
+    }
   }
 
   if (!command_args.empty()) {
@@ -278,6 +282,13 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
       default:
         break;
     }
+  } else if (column_family_id == kColumnFamilyIDStream) {
+    InternalKey ikey(key, is_slotid_encoded_);
+    Slice encoded_id = ikey.GetSubKey();
+    Redis::StreamEntryID entry_id;
+    GetFixed64(&encoded_id, &entry_id.ms);
+    GetFixed64(&encoded_id, &entry_id.seq);
+    command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
   }
 
   if (!command_args.empty()) {
@@ -291,3 +302,23 @@ rocksdb::Status WriteBatchExtractor::DeleteRangeCF(uint32_t column_family_id, co
   // Do nothing about DeleteRange operations
   return rocksdb::Status::OK();
 }
+
+Status WriteBatchExtractor::ExtractStreamAddCommand(bool is_slotid_encoded, const Slice &subkey, const Slice &value,
+                                                    std::vector<std::string> *command_args) {
+  InternalKey ikey(subkey, is_slotid_encoded);
+  std::string user_key = ikey.GetKey().ToString();
+  *command_args = {"XADD", user_key};
+  std::vector<std::string> values;
+  auto s = Redis::DecodeRawStreamEntryValue(value.ToString(), &values);
+  if (!s.IsOK()) {
+    return {Status::NotOK, fmt::format("failed to decode stream values: {}", s.Msg())};
+  }
+  Slice encoded_id = ikey.GetSubKey();
+  Redis::StreamEntryID entry_id;
+  GetFixed64(&encoded_id, &entry_id.ms);
+  GetFixed64(&encoded_id, &entry_id.seq);
+
+  command_args->emplace_back(entry_id.ToString());
+  command_args->insert(command_args->end(), values.begin(), values.end());
+  return Status::OK();
+}
diff --git a/src/storage/batch_extractor.h b/src/storage/batch_extractor.h
index a787e65a084..31c56fc45e3 100644
--- a/src/storage/batch_extractor.h
+++ b/src/storage/batch_extractor.h
@@ -40,6 +40,9 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
   rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const Slice &begin_key, const Slice &end_key) override;
   std::map<std::string, std::vector<std::string>> *GetRESPCommands() { return &resp_commands_; }
 
+  static Status ExtractStreamAddCommand(bool is_slotid_encoded, const Slice &subkey, const Slice &value,
+                                        std::vector<std::string> *command_args);
+
  private:
   std::map<std::string, std::vector<std::string>> resp_commands_;
   Redis::WriteBatchLogData log_data_;
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index e6deb71e8eb..b4b97ecd38c 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -486,7 +486,7 @@ func TestSlotMigrateDataType(t *testing.T) {
 		sv := rdb0.SMembers(ctx, keys["set"]).Val()
 		zv := rdb0.ZRangeWithScores(ctx, keys["zset"], 0, -1).Val()
 		siv := rdb0.Do(ctx, "SIRANGE", keys["sortint"], 0, -1).Val()
-		stV := rdb0.XRange(ctx, keys["stream"], "-", "+").Val()
+		stV := rdb0.XRange(ctx, keys["stream"], "2", "+").Val()
 		streamInfo := rdb0.XInfoStream(ctx, keys["stream"]).Val()
 		require.EqualValues(t, "19-0", streamInfo.LastGeneratedID)
 		require.EqualValues(t, 19, streamInfo.EntriesAdded)
@@ -495,6 +495,15 @@ func TestSlotMigrateDataType(t *testing.T) {
 
 		// migrate slot 1, all keys above are belong to slot 1
 		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
+
+		// incremental WAL migration
+		newStreamID := "20"
+		require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
+			Stream: keys["stream"],
+			ID:     newStreamID + "-0",
+			Values: []string{"key" + newStreamID, "value" + newStreamID},
+		}).Err())
+		require.NoError(t, rdb0.XDel(ctx, keys["stream"], "1-0").Err())
 		waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess)
 
 		// check destination data
@@ -529,12 +538,12 @@ func TestSlotMigrateDataType(t *testing.T) {
 		require.EqualValues(t, siv, rdb1.Do(ctx, "SIRANGE", keys["sortint"], 0, -1).Val())
 		util.BetweenValues(t, rdb1.TTL(ctx, keys["sortint"]).Val(), time.Second, 10*time.Second)
 		// type stream
-		require.EqualValues(t, stV, rdb1.XRange(ctx, keys["stream"], "-", "+").Val())
+		require.EqualValues(t, stV, rdb1.XRange(ctx, keys["stream"], "-", "19").Val())
 		util.BetweenValues(t, rdb1.TTL(ctx, keys["stream"]).Val(), time.Second, 10*time.Second)
 		streamInfo = rdb1.XInfoStream(ctx, keys["stream"]).Val()
-		require.EqualValues(t, "19-0", streamInfo.LastGeneratedID)
-		require.EqualValues(t, 19, streamInfo.EntriesAdded)
-		require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID)
+		require.EqualValues(t, "20-0", streamInfo.LastGeneratedID)
+		require.EqualValues(t, 20, streamInfo.EntriesAdded)
+		require.EqualValues(t, "1-0", streamInfo.MaxDeletedEntryID)
 		require.EqualValues(t, 19, streamInfo.Length)
 		// topology is changed on source server
 		for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} {
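Note on the layout the new helper relies on: each stream entry is stored in the
stream column family under a subkey whose trailing 16 bytes are the entry ID,
written as two fixed-width 64-bit integers (milliseconds, then sequence number)
so that RocksDB's bytewise iteration returns entries in ID order; the value is
the packed field/value list that Redis::DecodeRawStreamEntryValue unpacks. The
standalone C++ sketch below illustrates only the ID-decoding step; it assumes
the big-endian fixed64 layout needed for ordered iteration, and its names are
illustrative rather than the actual kvrocks helpers.

    #include <cstdint>
    #include <string>

    // Illustrative stand-in for Redis::StreamEntryID.
    struct EntryID {
      uint64_t ms = 0;
      uint64_t seq = 0;
      std::string ToString() const { return std::to_string(ms) + "-" + std::to_string(seq); }
    };

    // Reads one big-endian fixed64, mirroring what GetFixed64() does in the patch.
    static uint64_t DecodeFixed64BE(const unsigned char *p) {
      uint64_t v = 0;
      for (int i = 0; i < 8; ++i) v = (v << 8) | p[i];
      return v;
    }

    // encoded_id points at the 16-byte suffix of the stream subkey,
    // i.e. what InternalKey::GetSubKey() returns for a stream entry.
    EntryID DecodeStreamEntryID(const unsigned char *encoded_id) {
      EntryID id;
      id.ms = DecodeFixed64BE(encoded_id);       // milliseconds part
      id.seq = DecodeFixed64BE(encoded_id + 8);  // sequence part
      return id;                                 // "1-0", "19-0", ... via ToString()
    }

With the decoded ID and field/value list in hand, rebuilding the command is
just {"XADD", key, id.ToString(), field1, value1, ...}, which is what
ExtractStreamAddCommand assembles for both the snapshot path and the WAL path.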
From 27fde6e6135e0b61820f807ba0ae4d74fce2d1e1 Mon Sep 17 00:00:00 2001
From: hulk
Date: Thu, 23 Mar 2023 13:19:46 +0800
Subject: [PATCH 2/5] Update src/storage/batch_extractor.cc

Co-authored-by: Twice
---
 src/storage/batch_extractor.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 47ac0d37dbb..0d2a3811a2d 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -311,7 +311,7 @@ Status WriteBatchExtractor::ExtractStreamAddCommand(bool is_slotid_encoded, cons
   std::vector<std::string> values;
   auto s = Redis::DecodeRawStreamEntryValue(value.ToString(), &values);
   if (!s.IsOK()) {
-    return {Status::NotOK, fmt::format("failed to decode stream values: {}", s.Msg())};
+    return s.Prefixed("failed to decode stream values");
   }
   Slice encoded_id = ikey.GetSubKey();
   Redis::StreamEntryID entry_id;

From dd37f9e1180248edfe0bf184db9b86bd14a22bb2 Mon Sep 17 00:00:00 2001
From: git-hulk
Date: Fri, 24 Mar 2023 00:23:00 +0800
Subject: [PATCH 3/5] Support migrating XSETID from the rocksdb WAL

---
 src/storage/batch_extractor.cc                | 18 ++++++
 src/types/redis_stream.cc                     |  2 +-
 .../slotmigrate/slotmigrate_test.go           | 58 ++++++++++++++-----
 3 files changed, 63 insertions(+), 15 deletions(-)

diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 0d2a3811a2d..611c3c2830e 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -80,6 +80,24 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
       }
     }
 
+    if (metadata.Type() == kRedisStream) {
+      auto args = log_data_.GetArguments();
+      bool isSetID = args && args->size() > 0 && (*args)[0] == "XSETID";
+      if (!isSetID) {
+        return rocksdb::Status::OK();
+      }
+      StreamMetadata stream_metadata(kRedisStream);
+      auto s = stream_metadata.Decode(value.ToString());
+      if (!s.ok()) return s;
+      command_args = {"XSETID",
+                      user_key,
+                      stream_metadata.last_entry_id.ToString(),
+                      "ENTRIESADDED",
+                      std::to_string(stream_metadata.entries_added),
+                      "MAXDELETEDID",
+                      stream_metadata.max_deleted_entry_id.ToString()};
+      resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
+    }
     return rocksdb::Status::OK();
   } else if (column_family_id == kColumnFamilyIDDefault) {
     InternalKey ikey(key, is_slotid_encoded_);
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index f9aafafd3a8..2b122a166a7 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -719,7 +719,7 @@ rocksdb::Status Stream::SetId(const Slice &stream_name, const StreamEntryID &las
   }
 
   auto batch = storage_->GetWriteBatchBase();
-  WriteBatchLogData log_data(kRedisStream);
+  WriteBatchLogData log_data(kRedisStream, {"XSETID"});
   batch->PutLogData(log_data.Encode());
 
   std::string bytes;
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index b4b97ecd38c..2416341cdbe 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -486,7 +486,7 @@ func TestSlotMigrateDataType(t *testing.T) {
 		sv := rdb0.SMembers(ctx, keys["set"]).Val()
 		zv := rdb0.ZRangeWithScores(ctx, keys["zset"], 0, -1).Val()
 		siv := rdb0.Do(ctx, "SIRANGE", keys["sortint"], 0, -1).Val()
-		stV := rdb0.XRange(ctx, keys["stream"], "2", "+").Val()
+		stV := rdb0.XRange(ctx, keys["stream"], "-", "+").Val()
 		streamInfo := rdb0.XInfoStream(ctx, keys["stream"]).Val()
 		require.EqualValues(t, "19-0", streamInfo.LastGeneratedID)
 		require.EqualValues(t, 19, streamInfo.EntriesAdded)
@@ -495,15 +495,6 @@ func TestSlotMigrateDataType(t *testing.T) {
 
 		// migrate slot 1, all keys above are belong to slot 1
 		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
-
-		// incremental WAL migration
-		newStreamID := "20"
-		require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
-			Stream: keys["stream"],
-			ID:     newStreamID + "-0",
-			Values: []string{"key" + newStreamID, "value" + newStreamID},
-		}).Err())
-		require.NoError(t, rdb0.XDel(ctx, keys["stream"], "1-0").Err())
 		waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess)
 
 		// check destination data
@@ -529,12 +529,12 @@ func TestSlotMigrateDataType(t *testing.T) {
 		require.EqualValues(t, siv, rdb1.Do(ctx, "SIRANGE", keys["sortint"], 0, -1).Val())
 		util.BetweenValues(t, rdb1.TTL(ctx, keys["sortint"]).Val(), time.Second, 10*time.Second)
 		// type stream
-		require.EqualValues(t, stV, rdb1.XRange(ctx, keys["stream"], "-", "19").Val())
+		require.EqualValues(t, stV, rdb1.XRange(ctx, keys["stream"], "-", "+").Val())
 		util.BetweenValues(t, rdb1.TTL(ctx, keys["stream"]).Val(), time.Second, 10*time.Second)
 		streamInfo = rdb1.XInfoStream(ctx, keys["stream"]).Val()
-		require.EqualValues(t, "20-0", streamInfo.LastGeneratedID)
-		require.EqualValues(t, 20, streamInfo.EntriesAdded)
-		require.EqualValues(t, "1-0", streamInfo.MaxDeletedEntryID)
+		require.EqualValues(t, "19-0", streamInfo.LastGeneratedID)
+		require.EqualValues(t, 19, streamInfo.EntriesAdded)
+		require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID)
 		require.EqualValues(t, 19, streamInfo.Length)
 		// topology is changed on source server
 		for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} {
@@ -551,6 +542,45 @@ func TestSlotMigrateDataType(t *testing.T) {
 		}
 	})
 
+	t.Run("MIGRATE - increment sync stream from WAL", func(t *testing.T) {
+		slot := 3
+		keys := make(map[string]string, 0)
+		for _, typ := range []string{"stream"} {
+			keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[slot])
+			require.NoError(t, rdb0.Del(ctx, keys[typ]).Err())
+		}
+		for i := 1; i < 10; i++ {
+			idxStr := strconv.FormatInt(int64(i), 10)
+			require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
+				Stream: keys["stream"],
+				ID:     idxStr + "-0",
+				Values: []string{"key" + idxStr, "value" + idxStr},
+			}).Err())
+		}
+		streamInfo := rdb0.XInfoStream(ctx, keys["stream"]).Val()
+		require.EqualValues(t, "9-0", streamInfo.LastGeneratedID)
+		require.EqualValues(t, 9, streamInfo.EntriesAdded)
+		require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID)
+		require.EqualValues(t, 9, streamInfo.Length)
+
+		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
+		newStreamID := "10"
+		require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
+			Stream: keys["stream"],
+			ID:     newStreamID + "-0",
+			Values: []string{"key" + newStreamID, "value" + newStreamID},
+		}).Err())
+		require.NoError(t, rdb0.XDel(ctx, keys["stream"], "1-0").Err())
+		require.NoError(t, rdb0.Do(ctx, "XSETID", keys["stream"], "10-0", "MAXDELETEDID", "2-0").Err())
+		waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess)
+
+		streamInfo = rdb1.XInfoStream(ctx, keys["stream"]).Val()
+		require.EqualValues(t, "10-0", streamInfo.LastGeneratedID)
+		require.EqualValues(t, 10, streamInfo.EntriesAdded)
+		require.EqualValues(t, "2-0", streamInfo.MaxDeletedEntryID)
+		require.EqualValues(t, 9, streamInfo.Length)
+	})
+
 	t.Run("MIGRATE - Migrating empty stream", func(t *testing.T) {
 		slot := 31
 		key := fmt.Sprintf("stream_{%s}", util.SlotTable[slot])
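A metadata-column-family write does not say which command produced it: an XADD
and an XSETID both end up as a Put of re-encoded stream metadata. The patch
disambiguates by having Stream::SetId attach {"XSETID"} as write-batch log
data, which the extractor checks via log_data_.GetArguments() before emitting
an XSETID (with ENTRIESADDED and MAXDELETEDID filled from the decoded
metadata). The standalone sketch below shows the underlying RocksDB mechanism;
the handler and the raw string tag are illustrative, since kvrocks actually
encodes and decodes a WriteBatchLogData structure rather than a bare tag.

    #include <rocksdb/write_batch.h>

    #include <iostream>
    #include <string>

    // Remembers the most recent LogData blob and uses it to classify the
    // next Put, the same idea as WriteBatchExtractor's log_data_ member.
    class TagAwareHandler : public rocksdb::WriteBatch::Handler {
     public:
      void LogData(const rocksdb::Slice &blob) override { last_tag_ = blob.ToString(); }

      rocksdb::Status PutCF(uint32_t /*column_family_id*/, const rocksdb::Slice &key,
                            const rocksdb::Slice &value) override {
        if (last_tag_ == "XSETID") {
          // Tagged metadata write: replay it as XSETID instead of treating
          // it like the metadata update of an ordinary XADD.
          std::cout << "XSETID write on key: " << key.ToString() << "\n";
        }
        return rocksdb::Status::OK();
      }

     private:
      std::string last_tag_;
    };

    int main() {
      rocksdb::WriteBatch batch;
      batch.PutLogData("XSETID");  // tag first, as Stream::SetId() now does
      batch.Put("mystream", "encoded-stream-metadata");

      TagAwareHandler handler;
      // Iterate() replays records in insertion order: LogData, then PutCF.
      auto s = batch.Iterate(&handler);
      return s.ok() ? 0 : 1;
    }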
From f0fe7df314346f474564a67c159d7795fa1318bd Mon Sep 17 00:00:00 2001
From: hulk
Date: Fri, 24 Mar 2023 10:13:29 +0800
Subject: [PATCH 4/5] Update tests/gocase/integration/slotmigrate/slotmigrate_test.go

---
 tests/gocase/integration/slotmigrate/slotmigrate_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index 2416341cdbe..8afb8fb2b5a 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -543,7 +543,7 @@ func TestSlotMigrateDataType(t *testing.T) {
 	})
 
 	t.Run("MIGRATE - increment sync stream from WAL", func(t *testing.T) {
-		slot := 3
+		slot := 40
 		keys := make(map[string]string, 0)
 		for _, typ := range []string{"stream"} {
 			keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[slot])
"2-0").Err()) + waitForMigrateStateInDuration(t, rdb0, slot, SlotMigrationStateSuccess, time.Minute) streamInfo = rdb1.XInfoStream(ctx, keys["stream"]).Val() - require.EqualValues(t, "10-0", streamInfo.LastGeneratedID) - require.EqualValues(t, 10, streamInfo.EntriesAdded) + require.EqualValues(t, "1001-0", streamInfo.LastGeneratedID) + require.EqualValues(t, 1000, streamInfo.EntriesAdded) require.EqualValues(t, "2-0", streamInfo.MaxDeletedEntryID) - require.EqualValues(t, 9, streamInfo.Length) + require.EqualValues(t, 999, streamInfo.Length) }) t.Run("MIGRATE - Migrating empty stream", func(t *testing.T) {