Fix missing stream increment batch in cluster migration (apache#1345)
Currently, cluster slot migration is split into two stages: first send a snapshot of the slot, then sync the WAL once the snapshot has been sent. For the stream type we only parsed the KVs from the snapshot and missed the WAL part, so stream writes made while the migration was in progress were not forwarded to the destination node.

Co-authored-by: Twice <twice@apache.org>
git-hulk and PragmaTwice authored Mar 24, 2023
1 parent dbc8a24 commit 76a80b5
Showing 5 changed files with 102 additions and 19 deletions.
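
As background for the diffs below: the incremental sync stage replays RocksDB WAL write batches through a rocksdb::WriteBatch::Handler that turns each Put/Delete back into a Redis command, and this fix extends that handler (WriteBatchExtractor) to cover the stream column family. The following is a minimal, self-contained sketch of that general pattern, not the kvrocks implementation; the column-family id, the placeholder command strings, and the ReplayWAL helper are illustrative assumptions.

// Sketch only: walk RocksDB write batches and translate stream Puts/Deletes
// into Redis commands, the same shape of work WriteBatchExtractor does below.
#include <rocksdb/db.h>
#include <rocksdb/transaction_log.h>
#include <rocksdb/write_batch.h>

#include <iostream>
#include <memory>
#include <string>
#include <vector>

class CommandCollector : public rocksdb::WriteBatch::Handler {
 public:
  rocksdb::Status PutCF(uint32_t column_family_id, const rocksdb::Slice &key,
                        const rocksdb::Slice &value) override {
    if (column_family_id == kStreamColumnFamilyId) {
      // A stream Put would be rebuilt into an XADD command here, analogous to
      // WriteBatchExtractor::ExtractStreamAddCommand in the diff below.
      commands_.emplace_back("XADD <stream> <id> <field> <value>  (decoded from key/value)");
    }
    return rocksdb::Status::OK();
  }

  rocksdb::Status DeleteCF(uint32_t column_family_id, const rocksdb::Slice &key) override {
    if (column_family_id == kStreamColumnFamilyId) {
      // A stream Delete maps to an XDEL of the entry id decoded from the subkey.
      commands_.emplace_back("XDEL <stream> <id>  (decoded from key)");
    }
    return rocksdb::Status::OK();
  }

  const std::vector<std::string> &commands() const { return commands_; }

 private:
  static constexpr uint32_t kStreamColumnFamilyId = 5;  // placeholder, not the real id
  std::vector<std::string> commands_;
};

// Replay every WAL batch written after `since` through the collector; this is
// roughly what the incremental sync stage does before forwarding the rebuilt
// commands to the destination node.
void ReplayWAL(rocksdb::DB *db, rocksdb::SequenceNumber since) {
  std::unique_ptr<rocksdb::TransactionLogIterator> wal_iter;
  if (!db->GetUpdatesSince(since, &wal_iter).ok()) return;
  for (; wal_iter->Valid(); wal_iter->Next()) {
    CommandCollector collector;
    auto batch = wal_iter->GetBatch();
    if (batch.writeBatchPtr->Iterate(&collector).ok()) {
      for (const auto &cmd : collector.commands()) std::cout << cmd << "\n";
    }
  }
}

The real extractor additionally buffers the rebuilt commands per namespace and handles the metadata column family, as the batch_extractor.cc diff shows.
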
17 changes: 2 additions & 15 deletions src/cluster/slot_migrate.cc
@@ -764,23 +764,10 @@ Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metada
break;
}

- // Parse values of the complex key
- // InternalKey is adopted to get complex key's value from the formatted key returned by iterator of rocksdb
- InternalKey inkey(iter->key(), true);
- std::vector<std::string> values;
- auto s = Redis::DecodeRawStreamEntryValue(iter->value().ToString(), &values);
+ auto s = WriteBatchExtractor::ExtractStreamAddCommand(true, iter->key(), iter->value(), &user_cmd);
if (!s.IsOK()) {
- return {Status::NotOK, fmt::format("failed to decode stream values: {}", s.Msg())};
+ return s;
}

- Slice encoded_id = inkey.GetSubKey();
- Redis::StreamEntryID entry_id;
- GetFixed64(&encoded_id, &entry_id.ms);
- GetFixed64(&encoded_id, &entry_id.seq);

- user_cmd.emplace_back(entry_id.ToString());
- user_cmd.insert(user_cmd.end(), values.begin(), values.end());

*restore_cmds += Redis::MultiBulkString(user_cmd, false);
current_pipeline_size_++;

55 changes: 52 additions & 3 deletions src/storage/batch_extractor.cc
@@ -80,10 +80,26 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
}
}

if (metadata.Type() == kRedisStream) {
auto args = log_data_.GetArguments();
bool isSetID = args && args->size() > 0 && (*args)[0] == "XSETID";
if (!isSetID) {
return rocksdb::Status::OK();
}
StreamMetadata stream_metadata(kRedisStream);
auto s = stream_metadata.Decode(value.ToString());
if (!s.ok()) return s;
command_args = {"XSETID",
user_key,
stream_metadata.last_entry_id.ToString(),
"ENTRIESADDED",
std::to_string(stream_metadata.entries_added),
"MAXDELETEDID",
stream_metadata.max_deleted_entry_id.ToString()};
resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
}
return rocksdb::Status::OK();
- }
-
- if (column_family_id == kColumnFamilyIDDefault) {
+ } else if (column_family_id == kColumnFamilyIDDefault) {
InternalKey ikey(key, is_slotid_encoded_);
user_key = ikey.GetKey().ToString();
if (slot_ >= 0) {
@@ -193,6 +209,12 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
default:
break;
}
} else if (column_family_id == kColumnFamilyIDStream) {
auto s = ExtractStreamAddCommand(is_slotid_encoded_, key, value, &command_args);
if (!s.IsOK()) {
LOG(ERROR) << "Fail to parse write_batch for the stream type: " << s.Msg();
return rocksdb::Status::OK();
}
}

if (!command_args.empty()) {
@@ -278,6 +300,13 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
default:
break;
}
} else if (column_family_id == kColumnFamilyIDStream) {
InternalKey ikey(key, is_slotid_encoded_);
Slice encoded_id = ikey.GetSubKey();
Redis::StreamEntryID entry_id;
GetFixed64(&encoded_id, &entry_id.ms);
GetFixed64(&encoded_id, &entry_id.seq);
command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
}

if (!command_args.empty()) {
@@ -291,3 +320,23 @@ rocksdb::Status WriteBatchExtractor::DeleteRangeCF(uint32_t column_family_id, co
// Do nothing about DeleteRange operations
return rocksdb::Status::OK();
}

Status WriteBatchExtractor::ExtractStreamAddCommand(bool is_slotid_encoded, const Slice &subkey, const Slice &value,
std::vector<std::string> *command_args) {
InternalKey ikey(subkey, is_slotid_encoded);
std::string user_key = ikey.GetKey().ToString();
*command_args = {"XADD", user_key};
std::vector<std::string> values;
auto s = Redis::DecodeRawStreamEntryValue(value.ToString(), &values);
if (!s.IsOK()) {
return s.Prefixed("failed to decode stream values");
}
Slice encoded_id = ikey.GetSubKey();
Redis::StreamEntryID entry_id;
GetFixed64(&encoded_id, &entry_id.ms);
GetFixed64(&encoded_id, &entry_id.seq);

command_args->emplace_back(entry_id.ToString());
command_args->insert(command_args->end(), values.begin(), values.end());
return Status::OK();
}
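
For reference, ExtractStreamAddCommand and the XDEL branch above decode the same subkey layout: the stream entry id is appended to the internal key as two fixed-width 64-bit integers, milliseconds then sequence number. A small self-contained sketch of that decoding follows; big-endian byte order is assumed here so that raw keys sort by entry id, and the exact encoding used by the real GetFixed64 helper may differ.

// Sketch only: rebuild the textual stream entry id ("ms-seq") from the
// 16-byte id suffix of a stream subkey.
#include <cstdint>
#include <string>

// Fixed-width 64-bit decode, most significant byte first (assumed byte order).
static uint64_t DecodeFixed64BE(const unsigned char *p) {
  uint64_t v = 0;
  for (int i = 0; i < 8; ++i) v = (v << 8) | p[i];
  return v;
}

// E.g. the encoding of (999, 0) becomes "999-0", the id used by XADD/XDEL.
std::string StreamEntryIdFromSubkey(const std::string &encoded_id) {
  if (encoded_id.size() < 16) return "";
  const auto *p = reinterpret_cast<const unsigned char *>(encoded_id.data());
  uint64_t ms = DecodeFixed64BE(p);
  uint64_t seq = DecodeFixed64BE(p + 8);
  return std::to_string(ms) + "-" + std::to_string(seq);
}
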
3 changes: 3 additions & 0 deletions src/storage/batch_extractor.h
@@ -40,6 +40,9 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const Slice &begin_key, const Slice &end_key) override;
std::map<std::string, std::vector<std::string>> *GetRESPCommands() { return &resp_commands_; }

static Status ExtractStreamAddCommand(bool is_slotid_encoded, const Slice &subkey, const Slice &value,
std::vector<std::string> *command_args);

private:
std::map<std::string, std::vector<std::string>> resp_commands_;
Redis::WriteBatchLogData log_data_;
2 changes: 1 addition & 1 deletion src/types/redis_stream.cc
@@ -719,7 +719,7 @@ rocksdb::Status Stream::SetId(const Slice &stream_name, const StreamEntryID &las
}

auto batch = storage_->GetWriteBatchBase();
- WriteBatchLogData log_data(kRedisStream);
+ WriteBatchLogData log_data(kRedisStream, {"XSETID"});
batch->PutLogData(log_data.Encode());

std::string bytes;
44 changes: 44 additions & 0 deletions tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -542,6 +542,50 @@ func TestSlotMigrateDataType(t *testing.T) {
}
})

t.Run("MIGRATE - increment sync stream from WAL", func(t *testing.T) {
slot := 40
keys := make(map[string]string, 0)
for _, typ := range []string{"stream"} {
keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[slot])
require.NoError(t, rdb0.Del(ctx, keys[typ]).Err())
}
for i := 1; i < 1000; i++ {
idxStr := strconv.FormatInt(int64(i), 10)
require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
Stream: keys["stream"],
ID: idxStr + "-0",
Values: []string{"key" + idxStr, "value" + idxStr},
}).Err())
}
streamInfo := rdb0.XInfoStream(ctx, keys["stream"]).Val()
require.EqualValues(t, "999-0", streamInfo.LastGeneratedID)
require.EqualValues(t, 999, streamInfo.EntriesAdded)
require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID)
require.EqualValues(t, 999, streamInfo.Length)

// Slowdown the migration speed to prevent running before next increment commands
require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "256").Err())
defer func() {
require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "4096").Err())
}()
require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
newStreamID := "1001"
require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
Stream: keys["stream"],
ID: newStreamID + "-0",
Values: []string{"key" + newStreamID, "value" + newStreamID},
}).Err())
require.NoError(t, rdb0.XDel(ctx, keys["stream"], "1-0").Err())
require.NoError(t, rdb0.Do(ctx, "XSETID", keys["stream"], "1001-0", "MAXDELETEDID", "2-0").Err())
waitForMigrateStateInDuration(t, rdb0, slot, SlotMigrationStateSuccess, time.Minute)

streamInfo = rdb1.XInfoStream(ctx, keys["stream"]).Val()
require.EqualValues(t, "1001-0", streamInfo.LastGeneratedID)
require.EqualValues(t, 1000, streamInfo.EntriesAdded)
require.EqualValues(t, "2-0", streamInfo.MaxDeletedEntryID)
require.EqualValues(t, 999, streamInfo.Length)
})

t.Run("MIGRATE - Migrating empty stream", func(t *testing.T) {
slot := 31
key := fmt.Sprintf("stream_{%s}", util.SlotTable[slot])
