Skip to content

Commit

Permalink
Fallback to use the redis command migration type if the target don't …
Browse files Browse the repository at this point in the history
…support the ApplyBatch command (#2117)
  • Loading branch information
caipengbo authored Feb 28, 2024
1 parent 350816c commit 019faed
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 4 deletions.
47 changes: 43 additions & 4 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -290,24 +290,36 @@ Status SlotMigrator::startMigration() {
return s.Prefixed(errFailedToSetImportStatus);
}

migration_type_ = srv_->GetConfig()->migrate_type;

// If the APPLYBATCH command is not supported on the destination,
// we will fall back to the redis-command migration type.
if (migration_type_ == MigrationType::kRawKeyValue) {
bool supported = GET_OR_RET(supportedApplyBatchCommandOnDstNode(*dst_fd_));
if (!supported) {
LOG(INFO) << "APPLYBATCH command is not supported, use redis command for migration";
migration_type_ = MigrationType::kRedisCommand;
}
}

LOG(INFO) << "[migrate] Start migrating slot " << migrating_slot_ << ", connect destination fd " << *dst_fd_;

return Status::OK();
}

Status SlotMigrator::sendSnapshot() {
if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) {
if (migration_type_ == MigrationType::kRedisCommand) {
return sendSnapshotByCmd();
} else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) {
} else if (migration_type_ == MigrationType::kRawKeyValue) {
return sendSnapshotByRawKV();
}
return {Status::NotOK, errUnsupportedMigrationType};
}

Status SlotMigrator::syncWAL() {
if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) {
if (migration_type_ == MigrationType::kRedisCommand) {
return syncWALByCmd();
} else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) {
} else if (migration_type_ == MigrationType::kRawKeyValue) {
return syncWALByRawKV();
}
return {Status::NotOK, errUnsupportedMigrationType};
Expand Down Expand Up @@ -485,6 +497,33 @@ Status SlotMigrator::setImportStatusOnDstNode(int sock_fd, int status) {
return Status::OK();
}

StatusOr<bool> SlotMigrator::supportedApplyBatchCommandOnDstNode(int sock_fd) {
std::string cmd = redis::ArrayOfBulkStrings({"command", "info", "applybatch"});
auto s = util::SockSend(sock_fd, cmd);
if (!s.IsOK()) {
return s.Prefixed("failed to send command info to the destination node");
}

UniqueEvbuf evbuf;
if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) {
return Status::FromErrno("read response error");
}

UniqueEvbufReadln line(evbuf.get(), EVBUFFER_EOL_CRLF_STRICT);
if (!line) {
return Status::FromErrno("read empty response");
}

if (line[0] == '*') {
line = UniqueEvbufReadln(evbuf.get(), EVBUFFER_EOL_LF);
if (line && line[0] == '*') {
return true;
}
}

return false;
}

Status SlotMigrator::checkSingleResponse(int sock_fd) { return checkMultipleResponses(sock_fd, 1); }

// Commands | Response | Instance
Expand Down
2 changes: 2 additions & 0 deletions src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class SlotMigrator : public redis::Database {

Status authOnDstNode(int sock_fd, const std::string &password);
Status setImportStatusOnDstNode(int sock_fd, int status);
static StatusOr<bool> supportedApplyBatchCommandOnDstNode(int sock_fd);

Status sendSnapshotByCmd();
Status syncWALByCmd();
Expand Down Expand Up @@ -187,6 +188,7 @@ class SlotMigrator : public redis::Database {
int dst_port_ = -1;
UniqueFD dst_fd_;

MigrationType migration_type_ = MigrationType::kRedisCommand;
std::atomic<int16_t> forbidden_slot_ = -1;
std::atomic<int16_t> migrating_slot_ = -1;
int16_t migrate_failed_slot_ = -1;
Expand Down
44 changes: 44 additions & 0 deletions tests/gocase/integration/slotmigrate/slotmigrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,50 @@ func TestSlotMigrateDataType(t *testing.T) {
})
}

func TestSlotMigrateTypeFallback(t *testing.T) {
ctx := context.Background()

srv0 := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
"migrate-type": "raw-key-value",
})

defer srv0.Close()
rdb0 := srv0.NewClient()
defer func() { require.NoError(t, rdb0.Close()) }()
id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
require.NoError(t, rdb0.Do(ctx, "clusterx", "setnodeid", id0).Err())

srv1 := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
"rename-command": "APPLYBATCH APPLYBATCH_RENAMED",
})
defer srv1.Close()
rdb1 := srv1.NewClient()
defer func() { require.NoError(t, rdb1.Close()) }()
id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, rdb1.Do(ctx, "clusterx", "setnodeid", id1).Err())

clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", id0, srv0.Host(), srv0.Port())
clusterNodes += fmt.Sprintf("%s %s %d master -", id1, srv1.Host(), srv1.Port())
require.NoError(t, rdb0.Do(ctx, "clusterx", "setnodes", clusterNodes, "1").Err())
require.NoError(t, rdb1.Do(ctx, "clusterx", "setnodes", clusterNodes, "1").Err())

t.Run("MIGRATE - Fall back to redis-command migration type when the destination does not support APPLYBATCH", func(t *testing.T) {
info, err := rdb1.Do(ctx, "command", "info", "applybatch").Slice()
require.NoError(t, err)
require.Len(t, info, 1)
require.Nil(t, info[0])
testSlot += 1
key := util.SlotTable[testSlot]
value := "value"
require.NoError(t, rdb0.Set(ctx, key, value, 0).Err())
require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val())
waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess)
require.Equal(t, value, rdb1.Get(ctx, key).Val())
})
}

func waitForMigrateState(t testing.TB, client *redis.Client, slot int, state SlotMigrationState) {
waitForMigrateStateInDuration(t, client, slot, state, 5*time.Second)
}
Expand Down

0 comments on commit 019faed

Please sign in to comment.