diff --git a/server/consumer.go b/server/consumer.go
index 57acc91d4f..552f7f457b 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -829,6 +829,12 @@ func checkConsumerCfg(
 			}
 		}
 	}
+
+	// For now don't allow preferred server in placement.
+	if cfg.Placement != nil && cfg.Placement.Preferred != _EMPTY_ {
+		return NewJSStreamInvalidConfigError(fmt.Errorf("preferred server not permitted in placement"))
+	}
+
 	return nil
 }
 
diff --git a/server/jetstream_api.go b/server/jetstream_api.go
index 3ce8a632b6..9d29776fb9 100644
--- a/server/jetstream_api.go
+++ b/server/jetstream_api.go
@@ -20,7 +20,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"math/rand"
 	"os"
 	"path/filepath"
 	"runtime"
@@ -2987,34 +2986,9 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
 		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
 		return
 	}
-	if req.Placement != nil {
-		if len(req.Placement.Tags) > 0 {
-			// Tags currently not supported.
-			resp.Error = NewJSClusterTagsError()
-			s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
-			return
-		}
-		cn := req.Placement.Cluster
-		var peers []string
-		ourID := cc.meta.ID()
-		for _, p := range cc.meta.Peers() {
-			if si, ok := s.nodeToInfo.Load(p.ID); ok && si != nil {
-				if ni := si.(nodeInfo); ni.offline || ni.cluster != cn || p.ID == ourID {
-					continue
-				}
-				peers = append(peers, p.ID)
-			}
-		}
-		if len(peers) == 0 {
-			resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
-			s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
-			return
-		}
-		// Randomize and select.
-		if len(peers) > 1 {
-			rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
-		}
-		preferredLeader = peers[0]
+	if preferredLeader, resp.Error = s.getStepDownPreferredPlacement(cc.meta, req.Placement); resp.Error != nil {
+		s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
+		return
+	}
 	}
 
@@ -3066,6 +3040,84 @@ func isEmptyRequest(req []byte) bool {
 	return len(vm) == 0
 }
 
+// getStepDownPreferredPlacement attempts to work out the best placement for a
+// stepdown request. The preferred server name always takes precedence but, if not
+// specified, the placement is used to filter candidates by cluster and tags. The
+// caller should check for a returned API error and forward it to the requestor.
+func (s *Server) getStepDownPreferredPlacement(group RaftNode, placement *Placement) (string, *ApiError) {
+	if placement == nil {
+		return _EMPTY_, nil
+	}
+	var preferredLeader string
+	if placement.Preferred != _EMPTY_ {
+		for _, p := range group.Peers() {
+			si, ok := s.nodeToInfo.Load(p.ID)
+			if !ok || si == nil {
+				continue
+			}
+			if si.(nodeInfo).name == placement.Preferred {
+				preferredLeader = p.ID
+				break
+			}
+		}
+		if preferredLeader == group.ID() {
+			return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q is already leader", placement.Preferred))
+		}
+		if preferredLeader == _EMPTY_ {
+			return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", placement.Preferred))
+		}
+	} else {
+		possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
+		ourID := group.ID()
+		for _, p := range group.Peers() {
+			if p == nil {
+				continue // ... shouldn't happen.
+			}
+			si, ok := s.nodeToInfo.Load(p.ID)
+			if !ok || si == nil {
+				continue
+			}
+			ni := si.(nodeInfo)
+			if ni.offline || p.ID == ourID {
+				continue
+			}
+			possiblePeers[p] = ni
+		}
+		// If cluster is specified, filter out anything not matching the cluster name.
+		if placement.Cluster != _EMPTY_ {
+			for p, si := range possiblePeers {
+				if si.cluster != placement.Cluster {
+					delete(possiblePeers, p)
+				}
+			}
+		}
+		// If tags are specified, filter out anything not matching all supplied tags.
+		if len(placement.Tags) > 0 {
+			for p, si := range possiblePeers {
+				matchesAll := true
+				for _, tag := range placement.Tags {
+					if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
+						break
+					}
+				}
+				if !matchesAll {
+					delete(possiblePeers, p)
+				}
+			}
+		}
+		// If there are no possible peers, return an error.
+		if len(possiblePeers) == 0 {
+			return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
+		}
+		// Take advantage of random map iteration order to select the preferred.
+		for p := range possiblePeers {
+			preferredLeader = p.ID
+			break
+		}
+	}
+	return preferredLeader, nil
+}
+
 // Request to delete a stream.
 func (s *Server) jsStreamDeleteRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
 	if c == nil || !s.JetStreamEnabled() {
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 5a176b0efb..e5de479eb9 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -79,8 +79,9 @@ type inflightInfo struct {
 
 // Used to guide placement of streams and meta controllers in clustered JetStream.
 type Placement struct {
-	Cluster string   `json:"cluster,omitempty"`
-	Tags    []string `json:"tags,omitempty"`
+	Cluster   string   `json:"cluster,omitempty"`
+	Tags      []string `json:"tags,omitempty"`
+	Preferred string   `json:"preferred,omitempty"`
 }
 
 // Define types of the entry.
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index bfd0590ff8..2e3e63a80f 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -5153,3 +5153,79 @@ func TestJetStreamClusterExpectedPerSubjectConsistency(t *testing.T) {
 	require_Len(t, len(mset.expectedPerSubjectSequence), 0)
 	require_Len(t, len(mset.expectedPerSubjectInProcess), 0)
 }
+
+func TestJetStreamClusterMetaStepdownPreferred(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
+	defer nc.Close()
+
+	// We know of the preferred server and will successfully hand over to it.
+	t.Run("KnownPreferred", func(t *testing.T) {
+		leader := c.leader()
+		var preferred *Server
+		for _, s := range c.servers {
+			if s == leader {
+				continue
+			}
+			preferred = s
+			break
+		}
+
+		body, err := json.Marshal(JSApiLeaderStepdownRequest{
+			Placement: &Placement{
+				Preferred: preferred.Name(),
+			},
+		})
+		require_NoError(t, err)
+
+		resp, err := nc.Request(JSApiLeaderStepDown, body, time.Second)
+		require_NoError(t, err)
+
+		var apiresp JSApiLeaderStepDownResponse
+		require_NoError(t, json.Unmarshal(resp.Data, &apiresp))
+		require_True(t, apiresp.Success)
+		require_Equal(t, apiresp.Error, nil)
+
+		c.waitOnLeader()
+		require_Equal(t, preferred, c.leader())
+	})
+
+	// We don't know of a server that matches that name so the stepdown fails.
+ t.Run("UnknownPreferred", func(t *testing.T) { + body, err := json.Marshal(JSApiLeaderStepdownRequest{ + Placement: &Placement{ + Preferred: "i_dont_exist", + }, + }) + require_NoError(t, err) + + resp, err := nc.Request(JSApiLeaderStepDown, body, time.Second) + require_NoError(t, err) + + var apiresp JSApiLeaderStepDownResponse + require_NoError(t, json.Unmarshal(resp.Data, &apiresp)) + require_False(t, apiresp.Success) + require_NotNil(t, apiresp.Error) + require_Equal(t, ErrorIdentifier(apiresp.Error.ErrCode), JSClusterNoPeersErrF) + }) + + // The preferred server happens to already be the leader so the stepdown fails. + t.Run("SamePreferred", func(t *testing.T) { + body, err := json.Marshal(JSApiLeaderStepdownRequest{ + Placement: &Placement{ + Preferred: c.leader().Name(), + }, + }) + require_NoError(t, err) + + resp, err := nc.Request(JSApiLeaderStepDown, body, time.Second) + require_NoError(t, err) + + var apiresp ApiResponse + require_NoError(t, json.Unmarshal(resp.Data, &apiresp)) + require_NotNil(t, apiresp.Error) + require_Equal(t, ErrorIdentifier(apiresp.Error.ErrCode), JSClusterNoPeersErrF) + }) +} diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 1512012a0b..fccb2646da 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -326,26 +326,29 @@ func createJetStreamTaggedSuperClusterWithGWProxy(t *testing.T, gwm gwProxyMap) } // Make first cluster AWS, US country code. - for _, s := range sc.clusterForName("C1").servers { + for i, s := range sc.clusterForName("C1").servers { s.optsMu.Lock() s.opts.Tags.Add("cloud:aws") s.opts.Tags.Add("country:us") + s.opts.Tags.Add(fmt.Sprintf("node:%d", i+1)) s.optsMu.Unlock() reset(s) } // Make second cluster GCP, UK country code. - for _, s := range sc.clusterForName("C2").servers { + for i, s := range sc.clusterForName("C2").servers { s.optsMu.Lock() s.opts.Tags.Add("cloud:gcp") s.opts.Tags.Add("country:uk") + s.opts.Tags.Add(fmt.Sprintf("node:%d", i+1)) s.optsMu.Unlock() reset(s) } // Make third cluster AZ, JP country code. - for _, s := range sc.clusterForName("C3").servers { + for i, s := range sc.clusterForName("C3").servers { s.optsMu.Lock() s.opts.Tags.Add("cloud:az") s.opts.Tags.Add("country:jp") + s.opts.Tags.Add(fmt.Sprintf("node:%d", i+1)) s.optsMu.Unlock() reset(s) } diff --git a/server/jetstream_super_cluster_test.go b/server/jetstream_super_cluster_test.go index 3f6928457c..c8edd7151b 100644 --- a/server/jetstream_super_cluster_test.go +++ b/server/jetstream_super_cluster_test.go @@ -35,21 +35,10 @@ import ( "github.com/nats-io/nkeys" ) -func TestJetStreamSuperClusterMetaPlacement(t *testing.T) { - sc := createJetStreamSuperCluster(t, 3, 3) +func TestJetStreamSuperClusterMetaStepDown(t *testing.T) { + sc := createJetStreamTaggedSuperCluster(t) defer sc.shutdown() - - // We want to influence where the meta leader will place itself when we ask the - // current leader to stepdown. 
-	ml := sc.leader()
-	cn := ml.ClusterName()
-	var pcn string
-	for _, c := range sc.clusters {
-		if c.name != cn {
-			pcn = c.name
-			break
-		}
-	}
+	sc.waitOnLeader()
 
 	// Client based API
 	s := sc.randomServer()
@@ -59,42 +48,195 @@
 	}
 	defer nc.Close()
 
-	stepdown := func(cn string) *JSApiLeaderStepDownResponse {
-		req := &JSApiLeaderStepdownRequest{Placement: &Placement{Cluster: cn}}
-		jreq, err := json.Marshal(req)
-		if err != nil {
-			t.Fatalf("Unexpected error: %v", err)
-		}
+	stepdown := func(preferred, cn string, tags []string) *JSApiLeaderStepDownResponse {
+		jreq, err := json.Marshal(&JSApiLeaderStepdownRequest{
+			Placement: &Placement{
+				Cluster:   cn,
+				Tags:      tags,
+				Preferred: preferred,
+			},
+		})
+		require_NoError(t, err)
 		resp, err := nc.Request(JSApiLeaderStepDown, jreq, time.Second)
-		if err != nil {
-			t.Fatalf("Error on stepdown request: %v", err)
-		}
+		require_NoError(t, err)
+
 		var sdr JSApiLeaderStepDownResponse
-		if err := json.Unmarshal(resp.Data, &sdr); err != nil {
-			t.Fatalf("Unexpected error: %v", err)
-		}
+		require_NoError(t, json.Unmarshal(resp.Data, &sdr))
 		return &sdr
 	}
 
+	// Make sure we get correct errors for clusters we don't know about.
+	t.Run("UnknownCluster", func(t *testing.T) {
+		sdr := stepdown(_EMPTY_, "ThisClusterDoesntExist", nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for servers we don't know about.
+	t.Run("UnknownPreferredServer", func(t *testing.T) {
+		sdr := stepdown("ThisServerDoesntExist", _EMPTY_, nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors for tags that don't match any servers.
+	t.Run("UnknownTag", func(t *testing.T) {
+		sdr := stepdown(_EMPTY_, _EMPTY_, []string{"thistag:doesntexist"})
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
+	// Make sure we get correct errors when the preferred server is already the leader.
+	t.Run("PreferredServerAlreadyLeader", func(t *testing.T) {
+		ml := sc.leader()
+		require_NotNil(t, ml)
+
+		sdr := stepdown(ml.Name(), _EMPTY_, nil)
+		require_NotNil(t, sdr.Error)
+		require_Equal(t, sdr.Error.Code, 400)
+		require_Equal(t, ErrorIdentifier(sdr.Error.ErrCode), JSClusterNoPeersErrF)
+	})
+
-	// Make sure we get correct errors for tags and bad or unavailable cluster placement.
-	sdr := stepdown("C22")
-	if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no replacement peer connected") {
-		t.Fatalf("Got incorrect error result: %+v", sdr.Error)
-	}
-	// Should work.
-	sdr = stepdown(pcn)
-	if sdr.Error != nil {
-		t.Fatalf("Got an error on stepdown: %+v", sdr.Error)
-	}
+	// Influence the placement by using the preferred server name.
+	t.Run("PlacementByPreferredServer", func(t *testing.T) {
+		ml := sc.leader()
+		require_NotNil(t, ml)
 
-	sc.waitOnLeader()
-	ml = sc.leader()
-	cn = ml.ClusterName()
+		var preferredServer string
+	clusters:
+		for _, c := range sc.clusters {
+			for _, s := range c.servers {
+				if s == ml {
+					continue
+				}
+				preferredServer = s.Name()
+				break clusters
+			}
+		}
 
-	if cn != pcn {
-		t.Fatalf("Expected new metaleader to be in cluster %q, got %q", pcn, cn)
-	}
+		sdr := stepdown(preferredServer, _EMPTY_, nil)
+		require_Equal(t, sdr.Error, nil)
+
+		sc.waitOnLeader()
+		ml = sc.leader()
+		require_Equal(t, ml.Name(), preferredServer)
+	})
+
+	// Influence the placement by using the cluster name.
+	t.Run("PlacementByCluster", func(t *testing.T) {
+		ml := sc.leader()
+		require_NotNil(t, ml)
+
+		cn := ml.ClusterName()
+		var pcn string
+		for _, c := range sc.clusters {
+			if c.name != cn {
+				pcn = c.name
+				break
+			}
+		}
+
+		sdr := stepdown(_EMPTY_, pcn, nil)
+		require_Equal(t, sdr.Error, nil)
+
+		sc.waitOnLeader()
+		ml = sc.leader()
+		require_NotNil(t, ml)
+		require_Equal(t, ml.ClusterName(), pcn)
+	})
+
+	// Influence the placement by using tag names.
+	t.Run("PlacementByTag", func(t *testing.T) {
+		ml := sc.leader()
+		require_NotNil(t, ml)
+
+		// Work out what the current leader's tags are, so we can pick
+		// different ones.
+		possibleTags := map[string]struct{}{
+			"cloud:aws": {},
+			"cloud:gcp": {},
+			"cloud:az":  {},
+		}
+		// Remove the current leader's tags from the list.
+		for _, tag := range ml.getOpts().Tags {
+			delete(possibleTags, tag)
+		}
+		// Now pick one of the remaining tags as our chosen tag.
+		var chosenTag string
+		for tag := range possibleTags {
+			chosenTag = tag
+			break
+		}
+
+		sdr := stepdown(_EMPTY_, _EMPTY_, []string{chosenTag})
+		require_Equal(t, sdr.Error, nil)
+
+		sc.waitOnLeader()
+		ml = sc.leader()
+		require_NotNil(t, ml)
+		require_True(t, ml.getOpts().Tags.Contains(chosenTag))
+	})
+
+	// Influence the placement by using tag names; the new leader must match all of them.
+	t.Run("PlacementByMultipleTags", func(t *testing.T) {
+		ml := sc.leader()
+		require_NotNil(t, ml)
+
+		// Work out what the current leader's tags are, so we can pick
+		// different ones.
+		possibleTags := map[string]struct{}{
+			"cloud:aws": {},
+			"cloud:gcp": {},
+			"cloud:az":  {},
+		}
+		// Remove the current leader's tags from the list.
+		for _, tag := range ml.getOpts().Tags {
+			delete(possibleTags, tag)
+		}
+		// Now pick one of the remaining tags as our chosen tag.
+		var chosenTag string
+		for tag := range possibleTags {
+			chosenTag = tag
+			break
+		}
+
+		sdr := stepdown(_EMPTY_, _EMPTY_, []string{chosenTag, "node:1"})
+		require_Equal(t, sdr.Error, nil)
+
+		sc.waitOnLeader()
+		ml = sc.leader()
+		require_NotNil(t, ml)
+		require_True(t, ml.getOpts().Tags.Contains(chosenTag))
+		require_True(t, ml.getOpts().Tags.Contains("node:1"))
+	})
+
+	// Influence the placement by using the cluster name and a tag.
+ t.Run("PlacementByClusterAndTag", func(t *testing.T) { + ml := sc.leader() + require_NotNil(t, ml) + + cn := ml.ClusterName() + var pcn string + for _, c := range sc.clusters { + if c.name != cn { + pcn = c.name + break + } + } + + sdr := stepdown(_EMPTY_, pcn, []string{"node:1"}) + require_Equal(t, sdr.Error, nil) + + sc.waitOnLeader() + ml = sc.leader() + require_NotNil(t, ml) + require_Equal(t, ml.ClusterName(), pcn) + require_True(t, ml.getOpts().Tags.Contains("node:1")) + }) } func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) { diff --git a/server/stream.go b/server/stream.go index 84a8a05a42..995d129363 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1691,6 +1691,11 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account, pedantic boo } } + // For now don't allow preferred server in placement. + if cfg.Placement != nil && cfg.Placement.Preferred != _EMPTY_ { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("preferred server not permitted in placement")) + } + return cfg, nil }