Skip to content

Commit

Permalink
Add support for tag placement of metaleader
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Dec 19, 2024
1 parent f607366 commit 63bc8d5
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 59 deletions.
55 changes: 39 additions & 16 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -3066,29 +3065,53 @@ func (s *Server) getStepDownPreferredPlacement(group RaftNode, preferredServer s
return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("preferred server %q not known", preferredServer))
}
} else if placement != nil {
if len(placement.Tags) > 0 {
// Tags currently not supported.
return _EMPTY_, NewJSClusterTagsError()
}
cn := placement.Cluster
var peers []string
possiblePeers := make(map[*Peer]nodeInfo, len(group.Peers()))
ourID := group.ID()
for _, p := range group.Peers() {
if si, ok := s.nodeToInfo.Load(p.ID); ok && si != nil {
if ni := si.(nodeInfo); ni.offline || ni.cluster != cn || p.ID == ourID {
continue
if p == nil {
continue // ... shouldn't happen.
}
si, ok := s.nodeToInfo.Load(p.ID)
if !ok || si == nil {
continue
}
ni := si.(nodeInfo)
if ni.offline || p.ID == ourID {
continue
}
possiblePeers[p] = ni
}
// If cluster is specified, filter out anything not matching the cluster name.
if placement.Cluster != _EMPTY_ {
for p, si := range possiblePeers {
if si.cluster != placement.Cluster {
delete(possiblePeers, p)
}
}
}
// If tags are specified, filter out anything not matching all supplied tags.
if len(placement.Tags) > 0 {
for p, si := range possiblePeers {
matchesAll := true
for _, tag := range placement.Tags {
if matchesAll = matchesAll && si.tags.Contains(tag); !matchesAll {
break
}
}
if !matchesAll {
delete(possiblePeers, p)
}
peers = append(peers, p.ID)
}
}
if len(peers) == 0 {
// If there are no possible peers, return an error.
if len(possiblePeers) == 0 {
return _EMPTY_, NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
}
// Randomize and select.
if len(peers) > 1 {
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
// Take advantage of random map iteration order to select the preferred.
for p := range possiblePeers {
preferredLeader = p.ID
break
}
preferredLeader = peers[0]
}
return preferredLeader, nil
}
Expand Down
9 changes: 6 additions & 3 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,26 +326,29 @@ func createJetStreamTaggedSuperClusterWithGWProxy(t *testing.T, gwm gwProxyMap)
}

// Make first cluster AWS, US country code.
for _, s := range sc.clusterForName("C1").servers {
for i, s := range sc.clusterForName("C1").servers {
s.optsMu.Lock()
s.opts.Tags.Add("cloud:aws")
s.opts.Tags.Add("country:us")
s.opts.Tags.Add(fmt.Sprintf("node:%d", i+1))
s.optsMu.Unlock()
reset(s)
}
// Make second cluster GCP, UK country code.
for _, s := range sc.clusterForName("C2").servers {
for i, s := range sc.clusterForName("C2").servers {
s.optsMu.Lock()
s.opts.Tags.Add("cloud:gcp")
s.opts.Tags.Add("country:uk")
s.opts.Tags.Add(fmt.Sprintf("node:%d", i+1))
s.optsMu.Unlock()
reset(s)
}
// Make third cluster AZ, JP country code.
for _, s := range sc.clusterForName("C3").servers {
for i, s := range sc.clusterForName("C3").servers {
s.optsMu.Lock()
s.opts.Tags.Add("cloud:az")
s.opts.Tags.Add("country:jp")
s.opts.Tags.Add(fmt.Sprintf("node:%d", i+1))
s.optsMu.Unlock()
reset(s)
}
Expand Down
169 changes: 129 additions & 40 deletions server/jetstream_super_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,9 @@ import (
)

func TestJetStreamSuperClusterMetaPlacement(t *testing.T) {
sc := createJetStreamSuperCluster(t, 3, 3)
sc := createJetStreamTaggedSuperCluster(t)
defer sc.shutdown()

// We want to influence where the meta leader will place itself when we ask the
// current leader to stepdown.
ml := sc.leader()
cn := ml.ClusterName()
var pcn string
for _, c := range sc.clusters {
if c.name != cn {
pcn = c.name
break
}
}
sc.waitOnLeader()

// Client based API
s := sc.randomServer()
Expand All @@ -59,42 +48,142 @@ func TestJetStreamSuperClusterMetaPlacement(t *testing.T) {
}
defer nc.Close()

stepdown := func(cn string) *JSApiLeaderStepDownResponse {
req := &JSApiLeaderStepdownRequest{Placement: &Placement{Cluster: cn}}
jreq, err := json.Marshal(req)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
stepdown := func(cn string, tags []string) *JSApiLeaderStepDownResponse {
jreq, err := json.Marshal(&JSApiLeaderStepdownRequest{
Placement: &Placement{
Cluster: cn,
Tags: tags,
},
})
require_NoError(t, err)

resp, err := nc.Request(JSApiLeaderStepDown, jreq, time.Second)
if err != nil {
t.Fatalf("Error on stepdown request: %v", err)
}
require_NoError(t, err)

var sdr JSApiLeaderStepDownResponse
if err := json.Unmarshal(resp.Data, &sdr); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
require_NoError(t, json.Unmarshal(resp.Data, &sdr))
return &sdr
}

// Make sure we get correct errors for tags and bad or unavailable cluster placement.
sdr := stepdown("C22")
if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no replacement peer connected") {
t.Fatalf("Got incorrect error result: %+v", sdr.Error)
}
// Should work.
sdr = stepdown(pcn)
if sdr.Error != nil {
t.Fatalf("Got an error on stepdown: %+v", sdr.Error)
}
t.Run("Unavailable", func(t *testing.T) {
sdr := stepdown("C22", nil)
if sdr.Error == nil || !strings.Contains(sdr.Error.Description, "no replacement peer connected") {
t.Fatalf("Got incorrect error result: %+v", sdr.Error)
}
})

sc.waitOnLeader()
ml = sc.leader()
cn = ml.ClusterName()
// Influence the placement by using the cluster name.
t.Run("ByCluster", func(t *testing.T) {
ml := sc.leader()
require_NotNil(t, ml)

if cn != pcn {
t.Fatalf("Expected new metaleader to be in cluster %q, got %q", pcn, cn)
}
cn := ml.ClusterName()
var pcn string
for _, c := range sc.clusters {
if c.name != cn {
pcn = c.name
break
}
}

sdr := stepdown(pcn, nil)
require_Equal(t, sdr.Error, nil)

sc.waitOnLeader()
ml = sc.leader()
require_NotNil(t, ml)
require_Equal(t, ml.ClusterName(), pcn)
})

// Influence the placement by using tag names.
t.Run("ByTag", func(t *testing.T) {
ml := sc.leader()
require_NotNil(t, ml)

// Work out what the tags are of the current leader, so we can pick
// different ones.
possibleTags := map[string]struct{}{
"cloud:aws": {},
"cloud:gcp": {},
"cloud:az": {},
}
// Remove the current leader's tags from the list.
for _, tag := range ml.getOpts().Tags {
delete(possibleTags, tag)
}
// Now pick the first tag as our new chosen tag.
var chosenTag string
for tag := range possibleTags {
chosenTag = tag
break
}

sdr := stepdown(_EMPTY_, []string{chosenTag})
require_Equal(t, sdr.Error, nil)

sc.waitOnLeader()
ml = sc.leader()
require_NotNil(t, ml)
require_True(t, ml.getOpts().Tags.Contains(chosenTag))
})

// Influence the placement by using tag names, we need to match all of them.
t.Run("ByMultipleTags", func(t *testing.T) {
ml := sc.leader()
require_NotNil(t, ml)

// Work out what the tags are of the current leader, so we can pick
// different ones.
possibleTags := map[string]struct{}{
"cloud:aws": {},
"cloud:gcp": {},
"cloud:az": {},
}
// Remove the current leader's tags from the list.
for _, tag := range ml.getOpts().Tags {
delete(possibleTags, tag)
}
// Now pick the first tag as our new chosen tag.
var chosenTag string
for tag := range possibleTags {
chosenTag = tag
break
}

sdr := stepdown(_EMPTY_, []string{chosenTag, "node:1"})
require_Equal(t, sdr.Error, nil)

sc.waitOnLeader()
ml = sc.leader()
require_NotNil(t, ml)
require_True(t, ml.getOpts().Tags.Contains(chosenTag))
require_True(t, ml.getOpts().Tags.Contains("node:1"))
})

// Influence the placement by using the cluster name and a tag.
t.Run("ByClusterAndTag", func(t *testing.T) {
ml := sc.leader()
require_NotNil(t, ml)

cn := ml.ClusterName()
var pcn string
for _, c := range sc.clusters {
if c.name != cn {
pcn = c.name
break
}
}

sdr := stepdown(pcn, []string{"node:1"})
require_Equal(t, sdr.Error, nil)

sc.waitOnLeader()
ml = sc.leader()
require_NotNil(t, ml)
require_Equal(t, ml.ClusterName(), pcn)
require_True(t, ml.getOpts().Tags.Contains("node:1"))
})
}

func TestJetStreamSuperClusterUniquePlacementTag(t *testing.T) {
Expand Down

0 comments on commit 63bc8d5

Please sign in to comment.