Skip to content

Commit

Permalink
Fix should forbid importing the slot which belongs to itself in clust…
Browse files Browse the repository at this point in the history
…er mode (#2209)
  • Loading branch information
git-hulk authored Mar 30, 2024
1 parent ded67ca commit bb665f6
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 36 deletions.
4 changes: 4 additions & 0 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ Status Cluster::ImportSlot(redis::Connection *conn, int slot, int state) {
if (!IsValidSlot(slot)) {
return {Status::NotOK, errSlotOutOfRange};
}
auto source_node = srv_->cluster->slots_nodes_[slot];
if (source_node && source_node->id == myid_) {
return {Status::NotOK, "Can't import slot which belongs to me"};
}

Status s;
switch (state) {
Expand Down
4 changes: 4 additions & 0 deletions src/cluster/slot_import.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ SlotImport::SlotImport(Server *srv)
Status SlotImport::Start(int slot) {
std::lock_guard<std::mutex> guard(mutex_);
if (import_status_ == kImportStart) {
// return ok if the same slot is importing
if (import_slot_ == slot) {
return Status::OK();
}
return {Status::NotOK, fmt::format("only one importing slot is allowed, current slot is: {}", import_slot_)};
}

Expand Down
96 changes: 60 additions & 36 deletions tests/gocase/integration/slotimport/slotimport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package slotimport
import (
"context"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -61,90 +62,113 @@ func TestImportSlaveServer(t *testing.T) {
func TestImportedServer(t *testing.T) {
ctx := context.Background()

srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer func() { srv.Close() }()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
srvID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", srvID, srv.Port())
require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODEID", srvID).Err())
require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
srvA := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer func() { srvA.Close() }()
rdbA := srvA.NewClient()
defer func() { require.NoError(t, rdbA.Close()) }()
srvAID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
require.NoError(t, rdbA.Do(ctx, "clusterx", "SETNODEID", srvAID).Err())

srvB := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer func() { srvB.Close() }()
rdbB := srvB.NewClient()
defer func() { require.NoError(t, rdbB.Close()) }()
srvBID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODEID", srvBID).Err())

clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-8191", srvAID, srvA.Port())
clusterNodes = fmt.Sprintf("%s\n%s 127.0.0.1 %d master - 8192-16383", clusterNodes, srvBID, srvB.Port())

require.NoError(t, rdbA.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())

t.Run("IMPORT - error slot", func(t *testing.T) {
require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", -1, 0).Err(), "Slot is out of range")
require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 16384, 0).Err(), "Slot is out of range")
require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", -1, 0).Err(), "Slot is out of range")
require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 16384, 0).Err(), "Slot is out of range")
})

t.Run("IMPORT - slot with error state", func(t *testing.T) {
require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 1, 4).Err(), "Invalid import state")
require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 1, -3).Err(), "Invalid import state")
require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 1, 4).Err(), "Invalid import state")
require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 1, -3).Err(), "Invalid import state")
})

t.Run("IMPORT - slot with wrong state", func(t *testing.T) {
require.Contains(t, rdbA.Do(ctx, "cluster", "import", 1, 0).Err(),
"Can't import slot which belongs to me")
})

t.Run("IMPORT - slot states in right order", func(t *testing.T) {
slotNum := 1
slotKey := util.SlotTable[slotNum]

// import start
require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 0).Val())
require.NoError(t, rdb.Set(ctx, slotKey, "slot1", 0).Err())
require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val())
clusterInfo := rdb.ClusterInfo(ctx).Val()
require.NoError(t, rdbA.Set(ctx, slotKey, "slot1", 0).Err())
require.Equal(t, "slot1", rdbA.Get(ctx, slotKey).Val())
require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 0).Val())
clusterInfo := rdbB.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 1")
require.Contains(t, clusterInfo, "import_state: start")
clusterNodes := rdbB.ClusterNodes(ctx).Val()
require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", slotNum, srvAID))

require.NoError(t, rdbA.Do(ctx, "clusterx", "migrate", slotNum, srvBID).Err())
require.Eventually(t, func() bool {
clusterInfo := rdbA.ClusterInfo(context.Background()).Val()
return strings.Contains(clusterInfo, fmt.Sprintf("migrating_slot: %d", slotNum)) &&
strings.Contains(clusterInfo, fmt.Sprintf("migrating_state: %s", "success"))
}, 5*time.Second, 100*time.Millisecond)

clusterNodes := rdb.ClusterNodes(ctx).Val()
require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", slotNum, srvID))
// import success
require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 1).Val())
clusterInfo = rdb.ClusterInfo(ctx).Val()
require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 1).Val())
clusterInfo = rdbB.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 1")
require.Contains(t, clusterInfo, "import_state: success")

// import finish and should not contain the import section
clusterNodes = rdb.ClusterNodes(ctx).Val()
require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", slotNum, srvID))
clusterNodes = rdbB.ClusterNodes(ctx).Val()
require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", slotNum, srvAID))

time.Sleep(50 * time.Millisecond)
require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val())
require.Equal(t, "slot1", rdbB.Get(ctx, slotKey).Val())
})

t.Run("IMPORT - slot state 'error'", func(t *testing.T) {
slotNum := 10
slotKey := util.SlotTable[slotNum]

require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 0).Val())
require.NoError(t, rdb.Set(ctx, slotKey, "slot10_again", 0).Err())
require.Equal(t, "slot10_again", rdb.Get(ctx, slotKey).Val())
require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 0).Val())
require.NoError(t, rdbB.Set(ctx, slotKey, "slot10_again", 0).Err())
require.Equal(t, "slot10_again", rdbB.Get(ctx, slotKey).Val())

// import error
require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 2).Val())
require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 2).Val())
time.Sleep(50 * time.Millisecond)

clusterInfo := rdb.ClusterInfo(ctx).Val()
clusterInfo := rdbB.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 10")
require.Contains(t, clusterInfo, "import_state: error")

// get empty
require.Zero(t, rdb.Exists(ctx, slotKey).Val())
require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
})

t.Run("IMPORT - connection broken", func(t *testing.T) {
slotNum := 11
slotKey := util.SlotTable[slotNum]
require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 0).Val())
require.NoError(t, rdb.Set(ctx, slotKey, "slot11", 0).Err())
require.Equal(t, "slot11", rdb.Get(ctx, slotKey).Val())
require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 0).Val())
require.NoError(t, rdbB.Set(ctx, slotKey, "slot11", 0).Err())
require.Equal(t, "slot11", rdbB.Get(ctx, slotKey).Val())

// close connection, server will stop importing
require.NoError(t, rdb.Close())
rdb = srv.NewClient()
require.NoError(t, rdbB.Close())
rdbB = srvB.NewClient()
time.Sleep(50 * time.Millisecond)

clusterInfo := rdb.ClusterInfo(ctx).Val()
clusterInfo := rdbB.ClusterInfo(ctx).Val()
require.Contains(t, clusterInfo, "importing_slot: 11")
require.Contains(t, clusterInfo, "import_state: error")

// get empty
require.Zero(t, rdb.Exists(ctx, slotKey).Val())
require.Zero(t, rdbB.Exists(ctx, slotKey).Val())
})
}

0 comments on commit bb665f6

Please sign in to comment.