Skip to content

Commit

Permalink
fix(cluster): should remove the master if it's not a node in the cluster
Browse files Browse the repository at this point in the history
Currently, the replica won't remove the master replication while it's
not a node in the cluster, which is unexpected behavior for the living
master node in the cluster.
  • Loading branch information
git-hulk committed Nov 3, 2024
1 parent 2f520f9 commit de20bb7
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 24 deletions.
19 changes: 10 additions & 9 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ Status Cluster::SetNodeId(const std::string &node_id) {
}

// Set replication relationship
if (myself_) return SetMasterSlaveRepl();

return Status::OK();
return SetMasterSlaveRepl();
}

// The reason why the new version MUST be +1 of current version is that,
Expand Down Expand Up @@ -204,11 +202,8 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
}

// Set replication relationship
if (myself_) {
s = SetMasterSlaveRepl();
if (!s.IsOK()) {
return s.Prefixed("failed to set master-replica replication");
}
if (auto s = SetMasterSlaveRepl(); !s.IsOK()) {
return s.Prefixed("failed to set master-replica replication");
}

// Clear data of migrated slots
Expand All @@ -234,7 +229,13 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
Status Cluster::SetMasterSlaveRepl() {
if (!srv_) return Status::OK();

if (!myself_) return Status::OK();
// If the node is not in the cluster topology, remove the master replication if it's a replica.
if (!myself_) {
if (auto s = srv_->RemoveMaster(); !s.IsOK()) {
return s.Prefixed("failed to remove master");
}
return Status::OK();
}

if (myself_->role == kClusterMaster) {
// Master mode
Expand Down
51 changes: 36 additions & 15 deletions tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,58 +154,79 @@ func TestClusterNodes(t *testing.T) {
}

func TestClusterReplicas(t *testing.T) {
srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
rdb1 := srv1.NewClient()
srv2 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
rdb2 := srv2.NewClient()

defer func() {
srv1.Close()
srv2.Close()
require.NoError(t, rdb1.Close())
require.NoError(t, rdb2.Close())
}()

nodes := ""

master1ID := "bb2e5b3c5282086df51eff6b3e35519aede96fa6"
master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv.Host(), srv.Port())
master1Node := fmt.Sprintf("%s %s %d master - 0-8191", master1ID, srv1.Host(), srv1.Port())
nodes += master1Node + "\n"

master2ID := "159dde1194ebf5bfc5a293dff839c3d1476f2a49"
master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv.Host(), srv.Port())
master2Node := fmt.Sprintf("%s %s %d master - 8192-16383", master2ID, srv1.Host(), srv1.Port())
nodes += master2Node + "\n"

replica2ID := "7dbee3d628f04cc5d763b36e92b10533e627a1d0"
replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv.Host(), srv.Port(), master2ID)
replica2Node := fmt.Sprintf("%s %s %d slave %s", replica2ID, srv2.Host(), srv2.Port(), master2ID)
nodes += replica2Node

require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())
require.EqualValues(t, "2", rdb.Do(ctx, "clusterx", "version").Val())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())
require.EqualValues(t, "2", rdb1.Do(ctx, "clusterx", "version").Val())
require.NoError(t, rdb2.Do(ctx, "clusterx", "SETNODES", nodes, "2").Err())

t.Run("with replicas", func(t *testing.T) {
replicas, err := rdb.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
replicas, err := rdb1.Do(ctx, "cluster", "replicas", "159dde1194ebf5bfc5a293dff839c3d1476f2a49").Text()
require.NoError(t, err)
fields := strings.Split(replicas, " ")
require.Len(t, fields, 8)
require.Equal(t, fmt.Sprintf("%s@%d", srv.HostPort(), srv.Port()+10000), fields[1])
require.Equal(t, fmt.Sprintf("%s@%d", srv2.HostPort(), srv2.Port()+10000), fields[1])
require.Equal(t, "slave", fields[2])
require.Equal(t, master2ID, fields[3])
require.Equal(t, "connected\n", fields[7])
})

t.Run("without replicas", func(t *testing.T) {
replicas, err := rdb.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
replicas, err := rdb1.Do(ctx, "cluster", "replicas", "bb2e5b3c5282086df51eff6b3e35519aede96fa6").Text()
require.NoError(t, err)
require.Empty(t, replicas)
})

t.Run("send command to replica", func(t *testing.T) {
err := rdb.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
err := rdb1.Do(ctx, "cluster", "replicas", "7dbee3d628f04cc5d763b36e92b10533e627a1d0").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "The node isn't a master")
})

t.Run("unknown node", func(t *testing.T) {
err := rdb.Do(ctx, "cluster", "replicas", "unknown").Err()
err := rdb1.Do(ctx, "cluster", "replicas", "unknown").Err()
require.Error(t, err)
require.Contains(t, err.Error(), "Invalid cluster node id")
})

t.Run("remove the replication if the node is not in the cluster", func(t *testing.T) {
require.Equal(t, "slave", util.FindInfoEntry(rdb2, "role"))
// remove the cluster replica node
clusterNode := fmt.Sprintf("%s\n%s", master1Node, master2Node)
err := rdb1.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err()
require.NoError(t, err)
err = rdb2.Do(ctx, "clusterx", "SETNODES", clusterNode, "3").Err()
require.NoError(t, err)

require.Eventually(t, func() bool {
return util.FindInfoEntry(rdb2, "role") == "master"
}, 5*time.Second, 100*time.Millisecond)
})
}

func TestClusterDumpAndLoadClusterNodesInfo(t *testing.T) {
Expand Down

0 comments on commit de20bb7

Please sign in to comment.