diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 4484e72e264..7e558a118a5 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -764,23 +764,10 @@ Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metada
       break;
     }
 
-    // Parse values of the complex key
-    // InternalKey is adopted to get complex key's value from the formatted key returned by iterator of rocksdb
-    InternalKey inkey(iter->key(), true);
-    std::vector<std::string> values;
-    auto s = Redis::DecodeRawStreamEntryValue(iter->value().ToString(), &values);
+    auto s = WriteBatchExtractor::ExtractStreamAddCommand(true, iter->key(), iter->value(), &user_cmd);
     if (!s.IsOK()) {
-      return {Status::NotOK, fmt::format("failed to decode stream values: {}", s.Msg())};
+      return s;
     }
-
-    Slice encoded_id = inkey.GetSubKey();
-    Redis::StreamEntryID entry_id;
-    GetFixed64(&encoded_id, &entry_id.ms);
-    GetFixed64(&encoded_id, &entry_id.seq);
-
-    user_cmd.emplace_back(entry_id.ToString());
-    user_cmd.insert(user_cmd.end(), values.begin(), values.end());
-
     *restore_cmds += Redis::MultiBulkString(user_cmd, false);
     current_pipeline_size_++;
 
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index cd7453ab13c..611c3c2830e 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -80,10 +80,26 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
       }
     }
 
+    if (metadata.Type() == kRedisStream) {
+      auto args = log_data_.GetArguments();
+      bool isSetID = args && args->size() > 0 && (*args)[0] == "XSETID";
+      if (!isSetID) {
+        return rocksdb::Status::OK();
+      }
+      StreamMetadata stream_metadata(kRedisStream);
+      auto s = stream_metadata.Decode(value.ToString());
+      if (!s.ok()) return s;
+      command_args = {"XSETID",
+                      user_key,
+                      stream_metadata.last_entry_id.ToString(),
+                      "ENTRIESADDED",
+                      std::to_string(stream_metadata.entries_added),
+                      "MAXDELETEDID",
+                      stream_metadata.max_deleted_entry_id.ToString()};
+      resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
+    }
     return rocksdb::Status::OK();
-  }
-
-  if (column_family_id == kColumnFamilyIDDefault) {
+  } else if (column_family_id == kColumnFamilyIDDefault) {
     InternalKey ikey(key, is_slotid_encoded_);
     user_key = ikey.GetKey().ToString();
     if (slot_ >= 0) {
@@ -193,6 +209,12 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
       default:
         break;
     }
+  } else if (column_family_id == kColumnFamilyIDStream) {
+    auto s = ExtractStreamAddCommand(is_slotid_encoded_, key, value, &command_args);
+    if (!s.IsOK()) {
+      LOG(ERROR) << "Fail to parse write_batch for the stream type: " << s.Msg();
+      return rocksdb::Status::OK();
+    }
   }
 
   if (!command_args.empty()) {
@@ -278,6 +300,13 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
       default:
         break;
     }
+  } else if (column_family_id == kColumnFamilyIDStream) {
+    InternalKey ikey(key, is_slotid_encoded_);
+    Slice encoded_id = ikey.GetSubKey();
+    Redis::StreamEntryID entry_id;
+    GetFixed64(&encoded_id, &entry_id.ms);
+    GetFixed64(&encoded_id, &entry_id.seq);
+    command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
   }
 
   if (!command_args.empty()) {
@@ -291,3 +320,23 @@ rocksdb::Status WriteBatchExtractor::DeleteRangeCF(uint32_t column_family_id, co
   // Do nothing about DeleteRange operations
   return rocksdb::Status::OK();
 }
+
+Status WriteBatchExtractor::ExtractStreamAddCommand(bool is_slotid_encoded, const Slice &subkey, const Slice &value,
+                                                    std::vector<std::string> *command_args) {
+  InternalKey ikey(subkey, is_slotid_encoded);
+  std::string user_key = ikey.GetKey().ToString();
+  *command_args = {"XADD", user_key};
+  std::vector<std::string> values;
+  auto s = Redis::DecodeRawStreamEntryValue(value.ToString(), &values);
+  if (!s.IsOK()) {
+    return s.Prefixed("failed to decode stream values");
+  }
+  Slice encoded_id = ikey.GetSubKey();
+  Redis::StreamEntryID entry_id;
+  GetFixed64(&encoded_id, &entry_id.ms);
+  GetFixed64(&encoded_id, &entry_id.seq);
+
+  command_args->emplace_back(entry_id.ToString());
+  command_args->insert(command_args->end(), values.begin(), values.end());
+  return Status::OK();
+}
diff --git a/src/storage/batch_extractor.h b/src/storage/batch_extractor.h
index a787e65a084..31c56fc45e3 100644
--- a/src/storage/batch_extractor.h
+++ b/src/storage/batch_extractor.h
@@ -40,6 +40,9 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
   rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const Slice &begin_key, const Slice &end_key) override;
   std::map<std::string, std::vector<std::string>> *GetRESPCommands() { return &resp_commands_; }
 
+  static Status ExtractStreamAddCommand(bool is_slotid_encoded, const Slice &subkey, const Slice &value,
+                                        std::vector<std::string> *command_args);
+
  private:
   std::map<std::string, std::vector<std::string>> resp_commands_;
   Redis::WriteBatchLogData log_data_;
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index f9aafafd3a8..2b122a166a7 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -719,7 +719,7 @@ rocksdb::Status Stream::SetId(const Slice &stream_name, const StreamEntryID &las
   }
 
   auto batch = storage_->GetWriteBatchBase();
-  WriteBatchLogData log_data(kRedisStream);
+  WriteBatchLogData log_data(kRedisStream, {"XSETID"});
   batch->PutLogData(log_data.Encode());
 
   std::string bytes;
diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
index e6deb71e8eb..7040ae4e728 100644
--- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go
+++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -542,6 +542,50 @@ func TestSlotMigrateDataType(t *testing.T) {
 		}
 	})
 
+	t.Run("MIGRATE - increment sync stream from WAL", func(t *testing.T) {
+		slot := 40
+		keys := make(map[string]string, 0)
+		for _, typ := range []string{"stream"} {
+			keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[slot])
+			require.NoError(t, rdb0.Del(ctx, keys[typ]).Err())
+		}
+		for i := 1; i < 1000; i++ {
+			idxStr := strconv.FormatInt(int64(i), 10)
+			require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
+				Stream: keys["stream"],
+				ID:     idxStr + "-0",
+				Values: []string{"key" + idxStr, "value" + idxStr},
+			}).Err())
+		}
+		streamInfo := rdb0.XInfoStream(ctx, keys["stream"]).Val()
+		require.EqualValues(t, "999-0", streamInfo.LastGeneratedID)
+		require.EqualValues(t, 999, streamInfo.EntriesAdded)
+		require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID)
+		require.EqualValues(t, 999, streamInfo.Length)
+
+		// Slow down the migration so it is still running when the increment commands below arrive
+		require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "256").Err())
+		defer func() {
+			require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "4096").Err())
+		}()
+		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
+		newStreamID := "1001"
+		require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
+			Stream: keys["stream"],
+			ID:     newStreamID + "-0",
+			Values: []string{"key" + newStreamID, "value" + newStreamID},
+		}).Err())
+		require.NoError(t, rdb0.XDel(ctx, keys["stream"], "1-0").Err())
rdb0.Do(ctx, "XSETID", keys["stream"], "1001-0", "MAXDELETEDID", "2-0").Err()) + waitForMigrateStateInDuration(t, rdb0, slot, SlotMigrationStateSuccess, time.Minute) + + streamInfo = rdb1.XInfoStream(ctx, keys["stream"]).Val() + require.EqualValues(t, "1001-0", streamInfo.LastGeneratedID) + require.EqualValues(t, 1000, streamInfo.EntriesAdded) + require.EqualValues(t, "2-0", streamInfo.MaxDeletedEntryID) + require.EqualValues(t, 999, streamInfo.Length) + }) + t.Run("MIGRATE - Migrating empty stream", func(t *testing.T) { slot := 31 key := fmt.Sprintf("stream_{%s}", util.SlotTable[slot])