diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index bb6c30de752..686b63a0566 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -318,6 +318,10 @@ Status Cluster::ImportSlot(redis::Connection *conn, int slot, int state) { if (!IsValidSlot(slot)) { return {Status::NotOK, errSlotOutOfRange}; } + auto source_node = srv_->cluster->slots_nodes_[slot]; + if (source_node && source_node->id == myid_) { + return {Status::NotOK, "Can't import slot which belongs to me"}; + } Status s; switch (state) { diff --git a/src/cluster/slot_import.cc b/src/cluster/slot_import.cc index a3988e76840..dcc97d66351 100644 --- a/src/cluster/slot_import.cc +++ b/src/cluster/slot_import.cc @@ -31,6 +31,10 @@ SlotImport::SlotImport(Server *srv) Status SlotImport::Start(int slot) { std::lock_guard guard(mutex_); if (import_status_ == kImportStart) { + // return ok if the same slot is importing + if (import_slot_ == slot) { + return Status::OK(); + } return {Status::NotOK, fmt::format("only one importing slot is allowed, current slot is: {}", import_slot_)}; } diff --git a/tests/gocase/integration/slotimport/slotimport_test.go b/tests/gocase/integration/slotimport/slotimport_test.go index 9a1830faf58..1d427b807fd 100644 --- a/tests/gocase/integration/slotimport/slotimport_test.go +++ b/tests/gocase/integration/slotimport/slotimport_test.go @@ -22,6 +22,7 @@ package slotimport import ( "context" "fmt" + "strings" "testing" "time" @@ -61,23 +62,39 @@ func TestImportSlaveServer(t *testing.T) { func TestImportedServer(t *testing.T) { ctx := context.Background() - srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) - defer func() { srv.Close() }() - rdb := srv.NewClient() - defer func() { require.NoError(t, rdb.Close()) }() - srvID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" - clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", srvID, srv.Port()) - require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODEID", srvID).Err()) - require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + srvA := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { srvA.Close() }() + rdbA := srvA.NewClient() + defer func() { require.NoError(t, rdbA.Close()) }() + srvAID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, rdbA.Do(ctx, "clusterx", "SETNODEID", srvAID).Err()) + + srvB := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { srvB.Close() }() + rdbB := srvB.NewClient() + defer func() { require.NoError(t, rdbB.Close()) }() + srvBID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODEID", srvBID).Err()) + + clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-8191", srvAID, srvA.Port()) + clusterNodes = fmt.Sprintf("%s\n%s 127.0.0.1 %d master - 8192-16383", clusterNodes, srvBID, srvB.Port()) + + require.NoError(t, rdbA.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, rdbB.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) t.Run("IMPORT - error slot", func(t *testing.T) { - require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", -1, 0).Err(), "Slot is out of range") - require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 16384, 0).Err(), "Slot is out of range") + require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", -1, 0).Err(), "Slot is out of range") + require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 16384, 0).Err(), "Slot is out of range") }) t.Run("IMPORT - slot with error state", func(t *testing.T) { - require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 1, 4).Err(), "Invalid import state") - require.ErrorContains(t, rdb.Do(ctx, "cluster", "import", 1, -3).Err(), "Invalid import state") + require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 1, 4).Err(), "Invalid import state") + require.ErrorContains(t, rdbA.Do(ctx, "cluster", "import", 1, -3).Err(), "Invalid import state") + }) + + t.Run("IMPORT - slot with wrong state", func(t *testing.T) { + require.Contains(t, rdbA.Do(ctx, "cluster", "import", 1, 0).Err(), + "Can't import slot which belongs to me") }) t.Run("IMPORT - slot states in right order", func(t *testing.T) { @@ -85,66 +102,73 @@ func TestImportedServer(t *testing.T) { slotKey := util.SlotTable[slotNum] // import start - require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 0).Val()) - require.NoError(t, rdb.Set(ctx, slotKey, "slot1", 0).Err()) - require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val()) - clusterInfo := rdb.ClusterInfo(ctx).Val() + require.NoError(t, rdbA.Set(ctx, slotKey, "slot1", 0).Err()) + require.Equal(t, "slot1", rdbA.Get(ctx, slotKey).Val()) + require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 0).Val()) + clusterInfo := rdbB.ClusterInfo(ctx).Val() require.Contains(t, clusterInfo, "importing_slot: 1") require.Contains(t, clusterInfo, "import_state: start") + clusterNodes := rdbB.ClusterNodes(ctx).Val() + require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", slotNum, srvAID)) + + require.NoError(t, rdbA.Do(ctx, "clusterx", "migrate", slotNum, srvBID).Err()) + require.Eventually(t, func() bool { + clusterInfo := rdbA.ClusterInfo(context.Background()).Val() + return strings.Contains(clusterInfo, fmt.Sprintf("migrating_slot: %d", slotNum)) && + strings.Contains(clusterInfo, fmt.Sprintf("migrating_state: %s", "success")) + }, 5*time.Second, 100*time.Millisecond) - clusterNodes := rdb.ClusterNodes(ctx).Val() - require.Contains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", slotNum, srvID)) // import success - require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 1).Val()) - clusterInfo = rdb.ClusterInfo(ctx).Val() + require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 1).Val()) + clusterInfo = rdbB.ClusterInfo(ctx).Val() require.Contains(t, clusterInfo, "importing_slot: 1") require.Contains(t, clusterInfo, "import_state: success") // import finish and should not contain the import section - clusterNodes = rdb.ClusterNodes(ctx).Val() - require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", slotNum, srvID)) + clusterNodes = rdbB.ClusterNodes(ctx).Val() + require.NotContains(t, clusterNodes, fmt.Sprintf("[%d-<-%s]", slotNum, srvAID)) time.Sleep(50 * time.Millisecond) - require.Equal(t, "slot1", rdb.Get(ctx, slotKey).Val()) + require.Equal(t, "slot1", rdbB.Get(ctx, slotKey).Val()) }) t.Run("IMPORT - slot state 'error'", func(t *testing.T) { slotNum := 10 slotKey := util.SlotTable[slotNum] - require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 0).Val()) - require.NoError(t, rdb.Set(ctx, slotKey, "slot10_again", 0).Err()) - require.Equal(t, "slot10_again", rdb.Get(ctx, slotKey).Val()) + require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 0).Val()) + require.NoError(t, rdbB.Set(ctx, slotKey, "slot10_again", 0).Err()) + require.Equal(t, "slot10_again", rdbB.Get(ctx, slotKey).Val()) // import error - require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 2).Val()) + require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 2).Val()) time.Sleep(50 * time.Millisecond) - clusterInfo := rdb.ClusterInfo(ctx).Val() + clusterInfo := rdbB.ClusterInfo(ctx).Val() require.Contains(t, clusterInfo, "importing_slot: 10") require.Contains(t, clusterInfo, "import_state: error") // get empty - require.Zero(t, rdb.Exists(ctx, slotKey).Val()) + require.Zero(t, rdbB.Exists(ctx, slotKey).Val()) }) t.Run("IMPORT - connection broken", func(t *testing.T) { slotNum := 11 slotKey := util.SlotTable[slotNum] - require.Equal(t, "OK", rdb.Do(ctx, "cluster", "import", slotNum, 0).Val()) - require.NoError(t, rdb.Set(ctx, slotKey, "slot11", 0).Err()) - require.Equal(t, "slot11", rdb.Get(ctx, slotKey).Val()) + require.Equal(t, "OK", rdbB.Do(ctx, "cluster", "import", slotNum, 0).Val()) + require.NoError(t, rdbB.Set(ctx, slotKey, "slot11", 0).Err()) + require.Equal(t, "slot11", rdbB.Get(ctx, slotKey).Val()) // close connection, server will stop importing - require.NoError(t, rdb.Close()) - rdb = srv.NewClient() + require.NoError(t, rdbB.Close()) + rdbB = srvB.NewClient() time.Sleep(50 * time.Millisecond) - clusterInfo := rdb.ClusterInfo(ctx).Val() + clusterInfo := rdbB.ClusterInfo(ctx).Val() require.Contains(t, clusterInfo, "importing_slot: 11") require.Contains(t, clusterInfo, "import_state: error") // get empty - require.Zero(t, rdb.Exists(ctx, slotKey).Val()) + require.Zero(t, rdbB.Exists(ctx, slotKey).Val()) }) }