Fix missing stream increment batch in cluster migration #1345

Merged · 5 commits · Mar 24, 2023
Changes from 3 commits
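
In short, per the diffs below: during slot migration the incremental phase replays WAL write batches through WriteBatchExtractor, which previously had no handling for the stream column family, so XADD, XDEL, and XSETID commands issued while a slot was migrating were never forwarded to the destination node. The extractor now rebuilds all three commands from the write batch, and the snapshot path in SlotMigrate::MigrateStream reuses the same helper instead of duplicating the entry-decoding logic.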
17 changes: 2 additions & 15 deletions src/cluster/slot_migrate.cc
@@ -764,23 +764,10 @@ Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metada
       break;
     }
 
-    // Parse values of the complex key
-    // InternalKey is adopted to get complex key's value from the formatted key returned by iterator of rocksdb
-    InternalKey inkey(iter->key(), true);
-    std::vector<std::string> values;
-    auto s = Redis::DecodeRawStreamEntryValue(iter->value().ToString(), &values);
+    auto s = WriteBatchExtractor::ExtractStreamAddCommand(true, iter->key(), iter->value(), &user_cmd);
     if (!s.IsOK()) {
-      return {Status::NotOK, fmt::format("failed to decode stream values: {}", s.Msg())};
+      return s;
     }
-
-    Slice encoded_id = inkey.GetSubKey();
-    Redis::StreamEntryID entry_id;
-    GetFixed64(&encoded_id, &entry_id.ms);
-    GetFixed64(&encoded_id, &entry_id.seq);
-
-    user_cmd.emplace_back(entry_id.ToString());
-    user_cmd.insert(user_cmd.end(), values.begin(), values.end());
 
     *restore_cmds += Redis::MultiBulkString(user_cmd, false);
     current_pipeline_size_++;
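
For illustration (not part of the diff): assuming a hypothetical stream key mystream holding entry 5-0 with the single field pair key5/value5, ExtractStreamAddCommand fills user_cmd with {"XADD", "mystream", "5-0", "key5", "value5"}, and Redis::MultiBulkString serializes that into the RESP array appended to restore_cmds, roughly:

    *5
    $4
    XADD
    $8
    mystream
    $3
    5-0
    $4
    key5
    $6
    value5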
55 changes: 52 additions & 3 deletions src/storage/batch_extractor.cc
@@ -80,10 +80,26 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
       }
     }
 
+    if (metadata.Type() == kRedisStream) {
+      auto args = log_data_.GetArguments();
+      bool isSetID = args && args->size() > 0 && (*args)[0] == "XSETID";
+      if (!isSetID) {
+        return rocksdb::Status::OK();
+      }
+      StreamMetadata stream_metadata(kRedisStream);
+      auto s = stream_metadata.Decode(value.ToString());
+      if (!s.ok()) return s;
+      command_args = {"XSETID",
+                      user_key,
+                      stream_metadata.last_entry_id.ToString(),
+                      "ENTRIESADDED",
+                      std::to_string(stream_metadata.entries_added),
+                      "MAXDELETEDID",
+                      stream_metadata.max_deleted_entry_id.ToString()};
+      resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
+    }
     return rocksdb::Status::OK();
-  }
-
-  if (column_family_id == kColumnFamilyIDDefault) {
+  } else if (column_family_id == kColumnFamilyIDDefault) {
     InternalKey ikey(key, is_slotid_encoded_);
     user_key = ikey.GetKey().ToString();
     if (slot_ >= 0) {

@@ -193,6 +209,12 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
       default:
         break;
     }
+  } else if (column_family_id == kColumnFamilyIDStream) {
+    auto s = ExtractStreamAddCommand(is_slotid_encoded_, key, value, &command_args);
+    if (!s.IsOK()) {
+      LOG(ERROR) << "Fail to parse write_batch for the stream type: " << s.Msg();
+      return rocksdb::Status::OK();
+    }
   }
 
   if (!command_args.empty()) {

@@ -278,6 +300,13 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
      default:
        break;
    }
+  } else if (column_family_id == kColumnFamilyIDStream) {
+    InternalKey ikey(key, is_slotid_encoded_);
+    Slice encoded_id = ikey.GetSubKey();
+    Redis::StreamEntryID entry_id;
+    GetFixed64(&encoded_id, &entry_id.ms);
+    GetFixed64(&encoded_id, &entry_id.seq);
+    command_args = {"XDEL", ikey.GetKey().ToString(), entry_id.ToString()};
   }
 
   if (!command_args.empty()) {

@@ -291,3 +320,23 @@ rocksdb::Status WriteBatchExtractor::DeleteRangeCF(uint32_t column_family_id, co
   // Do nothing about DeleteRange operations
   return rocksdb::Status::OK();
 }
+
+Status WriteBatchExtractor::ExtractStreamAddCommand(bool is_slotid_encoded, const Slice &subkey, const Slice &value,
+                                                    std::vector<std::string> *command_args) {
+  InternalKey ikey(subkey, is_slotid_encoded);
+  std::string user_key = ikey.GetKey().ToString();
+  *command_args = {"XADD", user_key};
+  std::vector<std::string> values;
+  auto s = Redis::DecodeRawStreamEntryValue(value.ToString(), &values);
+  if (!s.IsOK()) {
+    return s.Prefixed("failed to decode stream values");
+  }
+  Slice encoded_id = ikey.GetSubKey();
+  Redis::StreamEntryID entry_id;
+  GetFixed64(&encoded_id, &entry_id.ms);
+  GetFixed64(&encoded_id, &entry_id.seq);
+
+  command_args->emplace_back(entry_id.ToString());
+  command_args->insert(command_args->end(), values.begin(), values.end());
+  return Status::OK();
+}
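
A minimal call-site sketch of the new helper, mirroring the slot_migrate.cc hunk above (iter and restore_cmds are assumed to come from the surrounding migration loop; the true flag matches the slot-id-encoded keyspace used during migration):

    // Rebuild {"XADD", <key>, <entry-id>, <field>, <value>, ...} from one raw
    // stream entry; the helper overwrites user_cmd, so no pre-population is needed.
    std::vector<std::string> user_cmd;
    auto s = WriteBatchExtractor::ExtractStreamAddCommand(true, iter->key(), iter->value(), &user_cmd);
    if (!s.IsOK()) return s;
    // Serialize the rebuilt command as RESP and append it to the restore payload.
    *restore_cmds += Redis::MultiBulkString(user_cmd, false);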
3 changes: 3 additions & 0 deletions src/storage/batch_extractor.h
@@ -40,6 +40,9 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
   rocksdb::Status DeleteRangeCF(uint32_t column_family_id, const Slice &begin_key, const Slice &end_key) override;
   std::map<std::string, std::vector<std::string>> *GetRESPCommands() { return &resp_commands_; }
 
+  static Status ExtractStreamAddCommand(bool is_slotid_encoded, const Slice &subkey, const Slice &value,
+                                        std::vector<std::string> *command_args);
+
  private:
   std::map<std::string, std::vector<std::string>> resp_commands_;
   Redis::WriteBatchLogData log_data_;
2 changes: 1 addition & 1 deletion src/types/redis_stream.cc
@@ -719,7 +719,7 @@ rocksdb::Status Stream::SetId(const Slice &stream_name, const StreamEntryID &las
   }
 
   auto batch = storage_->GetWriteBatchBase();
-  WriteBatchLogData log_data(kRedisStream);
+  WriteBatchLogData log_data(kRedisStream, {"XSETID"});
   batch->PutLogData(log_data.Encode());
 
   std::string bytes;
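
This one-line change is what makes the XSETID branch in batch_extractor.cc reachable: PutCF only emits an XSETID command for a stream metadata write when the batch's log data carries "XSETID" as its first argument, while ordinary metadata updates (e.g. from XADD, whose command is instead rebuilt from the stream column family) are skipped so they are not double-applied. For a hypothetical stream mystream with the values used in the test below, the extractor would reconstruct:

    XSETID mystream 10-0 ENTRIESADDED 10 MAXDELETEDID 2-0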
39 changes: 39 additions & 0 deletions tests/gocase/integration/slotmigrate/slotmigrate_test.go
@@ -542,6 +542,45 @@ func TestSlotMigrateDataType(t *testing.T) {
 		}
 	})
 
+	t.Run("MIGRATE - increment sync stream from WAL", func(t *testing.T) {
+		slot := 3
+		keys := make(map[string]string, 0)
+		for _, typ := range []string{"stream"} {
+			keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[slot])
+			require.NoError(t, rdb0.Del(ctx, keys[typ]).Err())
+		}
+		for i := 1; i < 10; i++ {
+			idxStr := strconv.FormatInt(int64(i), 10)
+			require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
+				Stream: keys["stream"],
+				ID:     idxStr + "-0",
+				Values: []string{"key" + idxStr, "value" + idxStr},
+			}).Err())
+		}
+		streamInfo := rdb0.XInfoStream(ctx, keys["stream"]).Val()
+		require.EqualValues(t, "9-0", streamInfo.LastGeneratedID)
+		require.EqualValues(t, 9, streamInfo.EntriesAdded)
+		require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID)
+		require.EqualValues(t, 9, streamInfo.Length)
+
+		require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
+		newStreamID := "10"
+		require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{
+			Stream: keys["stream"],
+			ID:     newStreamID + "-0",
+			Values: []string{"key" + newStreamID, "value" + newStreamID},
+		}).Err())
+		require.NoError(t, rdb0.XDel(ctx, keys["stream"], "1-0").Err())
+		require.NoError(t, rdb0.Do(ctx, "XSETID", keys["stream"], "10-0", "MAXDELETEDID", "2-0").Err())
+		waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess)
+
+		streamInfo = rdb1.XInfoStream(ctx, keys["stream"]).Val()
+		require.EqualValues(t, "10-0", streamInfo.LastGeneratedID)
+		require.EqualValues(t, 10, streamInfo.EntriesAdded)
+		require.EqualValues(t, "2-0", streamInfo.MaxDeletedEntryID)
+		require.EqualValues(t, 9, streamInfo.Length)
+	})
+
 	t.Run("MIGRATE - Migrating empty stream", func(t *testing.T) {
 		slot := 31
 		key := fmt.Sprintf("stream_{%s}", util.SlotTable[slot])