(2.11) Don't send meta snapshot when becoming metaleader #5700

Merged · 3 commits · Oct 3, 2024
20 changes: 13 additions & 7 deletions server/consumer.go
@@ -1637,6 +1637,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
 	return false
 }
 
+const (
+	defaultConsumerNotActiveStartInterval = 30 * time.Second
+	defaultConsumerNotActiveMaxInterval   = 5 * time.Minute
+)
+
+var (
+	consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	consumerNotActiveMaxInterval   = defaultConsumerNotActiveMaxInterval
+)
+
 func (o *consumer) deleteNotActive() {
 	o.mu.Lock()
 	if o.mset == nil {
@@ -1702,12 +1712,8 @@ func (o *consumer) deleteNotActive() {
 	// Check to make sure we went away.
 	// Don't think this needs to be a monitored go routine.
 	go func() {
-		const (
-			startInterval = 30 * time.Second
-			maxInterval   = 5 * time.Minute
-		)
-		jitter := time.Duration(rand.Int63n(int64(startInterval)))
-		interval := startInterval + jitter
+		jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
+		interval := consumerNotActiveStartInterval + jitter
 		ticker := time.NewTicker(interval)
 		defer ticker.Stop()
 		for range ticker.C {
@@ -1722,7 +1728,7 @@
 			if nca != nil && nca == ca {
 				s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
 				meta.ForwardProposal(removeEntry)
-				if interval < maxInterval {
+				if interval < consumerNotActiveMaxInterval {
 					interval *= 2
 					ticker.Reset(interval)
 				}
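
For readers skimming the diff: the change above only lifts the retry intervals out of deleteNotActive so tests can shorten them; the jittered exponential backoff itself is unchanged. A minimal standalone sketch of that pattern, with illustrative names and deliberately short intervals (the server's defaults are 30s and 5m):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Illustrative stand-ins for the package-level knobs added above,
// shortened so the demo finishes quickly.
const (
	startInterval = 100 * time.Millisecond
	maxInterval   = time.Second
)

// retryWithBackoff starts with a jittered interval and doubles it after
// each failed attempt, capped at maxInterval — the same shape as the
// consumer-cleanup loop in deleteNotActive.
func retryWithBackoff(attempt func() bool) {
	jitter := time.Duration(rand.Int63n(int64(startInterval)))
	interval := startInterval + jitter
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		if attempt() {
			return // Succeeded; stop retrying.
		}
		if interval < maxInterval {
			interval *= 2
			ticker.Reset(interval)
		}
	}
}

func main() {
	tries := 0
	retryWithBackoff(func() bool {
		tries++
		fmt.Println("attempt", tries)
		return tries >= 4 // Pretend the fourth attempt succeeds.
	})
}

The jitter presumably spreads retries from many consumers so they do not all re-propose removals at once, and the doubling bounds the load when the meta layer stays unavailable for a long time.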
4 changes: 0 additions & 4 deletions server/jetstream_cluster.go
@@ -1406,10 +1406,6 @@ func (js *jetStream) monitorCluster() {
 			aq.recycle(&ces)
 
 		case isLeader = <-lch:
-			// For meta layer synchronize everyone to our state on becoming leader.
-			if isLeader && n.ApplyQ().len() == 0 {
-				n.SendSnapshot(js.metaSnapshot())
-			}
 			// Process the change.
 			js.processLeaderChange(isLeader)
 			if isLeader {
11 changes: 9 additions & 2 deletions server/jetstream_cluster_3_test.go
@@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
 }
 
 func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
+	consumerNotActiveStartInterval = time.Second * 5
+	defer func() {
+		consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	}()
+
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 	time.Sleep(2 * time.Second)
 
 	// Restart first and wait so that we know it will try cleanup without a metaleader.
+	// It will fail as there's no metaleader at that time; it should keep retrying on an interval.
 	c.restartServer(rs)
 	time.Sleep(time.Second)
 
@@ -1643,8 +1649,9 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 	defer nc.Close()
 
 	subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
-	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
-		m, err := nc.Request(subj, nil, time.Second)
+	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+		// Request will take at most 4 seconds if some consumers can't be found.
+		m, err := nc.Request(subj, nil, 5*time.Second)
 		if err != nil {
 			return err
 		}
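
Both tests touched by this PR shorten consumerNotActiveStartInterval the same way: assign the package-level variable, then restore the default in a defer so later tests in the package see the stock 30-second value again. Inside the server test package, a hypothetical helper capturing that idiom might look like this (withConsumerCleanupInterval does not exist in the codebase; it is a sketch only):

// withConsumerCleanupInterval is a hypothetical test helper, not part of
// the nats-server codebase: it overrides the package-level start interval
// for the duration of run and always restores the previous value.
func withConsumerCleanupInterval(d time.Duration, run func()) {
	old := consumerNotActiveStartInterval
	consumerNotActiveStartInterval = d
	defer func() { consumerNotActiveStartInterval = old }()
	run()
}

A test body would then run as withConsumerCleanupInterval(5*time.Second, func() { /* exercise consumer cleanup */ }).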
78 changes: 78 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -21,8 +21,11 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
+	"io/fs"
 	"math/rand"
 	"os"
+	"path"
 	"path/filepath"
 	"runtime"
 	"slices"
@@ -4159,3 +4162,78 @@ func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) {
 	newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
 	require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
 }
+
+func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	copyDir := func(dst, src string) error {
+		srcFS := os.DirFS(src)
+		return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error {
+			if err != nil {
+				return err
+			}
+			newPath := path.Join(dst, p)
+			if d.IsDir() {
+				return os.MkdirAll(newPath, defaultDirPerms)
+			}
+			r, err := srcFS.Open(p)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+
+			w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			_, err = io.Copy(w, r)
+			return err
+		})
+	}
+
+	// Simulate being hard killed by:
+	// 1. copy directories before shutdown
+	copyToSrcMap := make(map[string]string)
+	for _, s := range c.servers {
+		sd := s.StoreDir()
+		copySd := path.Join(t.TempDir(), JetStreamStoreDir)
+		err = copyDir(copySd, sd)
+		require_NoError(t, err)
+		copyToSrcMap[copySd] = sd
+	}
+
+	// 2. stop all
+	nc.Close()
+	c.stopAll()
+
+	// 3. revert directories to before shutdown
+	for cp, dest := range copyToSrcMap {
+		err = os.RemoveAll(dest)
+		require_NoError(t, err)
+		err = copyDir(dest, cp)
+		require_NoError(t, err)
+	}
+
+	// 4. restart
+	c.restartAll()
+	c.waitOnAllCurrent()
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Stream should still exist and not be removed after hard killing all servers, so expect no error.
+	_, err = js.StreamInfo("TEST")
+	require_NoError(t, err)
+}
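
The four numbered steps above amount to: snapshot each store directory while the servers are still running, shut down cleanly, throw away the post-shutdown state, and restore the mid-flight copies before restarting. That restore is what makes a clean stop look like a kill -9 that never ran the shutdown path. A condensed sketch of those steps as a helper, reusing identifiers from the diff (simulateHardKill itself is hypothetical, and c stands for the suite's test cluster):

// simulateHardKill is a hypothetical condensation of the test's four
// steps; copyDir is the helper shown in the diff above.
func simulateHardKill(t *testing.T, c *cluster, copyDir func(dst, src string) error) {
	// 1. Copy every store directory while servers are still running, so
	//    the copies capture pre-shutdown (possibly unflushed) state.
	backups := make(map[string]string) // backup dir -> original dir
	for _, s := range c.servers {
		backup := path.Join(t.TempDir(), JetStreamStoreDir)
		require_NoError(t, copyDir(backup, s.StoreDir()))
		backups[backup] = s.StoreDir()
	}
	// 2. Stop everything cleanly.
	c.stopAll()
	// 3. Discard the cleanly-shut-down state and restore the copies.
	for backup, orig := range backups {
		require_NoError(t, os.RemoveAll(orig))
		require_NoError(t, copyDir(orig, backup))
	}
	// 4. Restart on the restored ("hard-killed") state.
	c.restartAll()
	c.waitOnAllCurrent()
}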
26 changes: 13 additions & 13 deletions server/norace_test.go
@@ -6577,6 +6577,11 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) {
 }
 
 func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
+	consumerNotActiveStartInterval = time.Second * 5
+	defer func() {
+		consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	}()
+
 	c := createJetStreamClusterExplicit(t, "GHOST", 3)
 	defer c.shutdown()
 
@@ -6670,22 +6675,17 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 	time.Sleep(5 * time.Second)
 	cancel()
 
-	getMissing := func() []string {
-		m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second*10)
-		require_NoError(t, err)
-
+	checkFor(t, 30*time.Second, time.Second, func() error {
+		m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second)
+		if err != nil {
+			return err
+		}
 		var resp JSApiConsumerListResponse
-		err = json.Unmarshal(m.Data, &resp)
-		require_NoError(t, err)
-		return resp.Missing
-	}
-
-	checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
-		missing := getMissing()
-		if len(missing) == 0 {
+		require_NoError(t, json.Unmarshal(m.Data, &resp))
+		if len(resp.Missing) == 0 {
 			return nil
 		}
-		return fmt.Errorf("Still have missing: %+v", missing)
+		return fmt.Errorf("Still have missing: %+v", resp.Missing)
 	})
 }
 
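
The refactor above folds getMissing into the checkFor closure so that a failed CONSUMER.LIST request returns an error, and thus retries, instead of aborting the whole test through require_NoError. For reference, a minimal sketch of checkFor-style polling as used throughout these tests, assuming only the standard library (pollUntil is a hypothetical stand-in, not the suite's actual helper):

// pollUntil retries fn every sleepDur until it returns nil or totalWait
// elapses, then fails the test with the last error — the contract the
// tests above rely on from checkFor.
func pollUntil(t *testing.T, totalWait, sleepDur time.Duration, fn func() error) {
	t.Helper()
	deadline := time.Now().Add(totalWait)
	var err error
	for time.Now().Before(deadline) {
		if err = fn(); err == nil {
			return
		}
		time.Sleep(sleepDur)
	}
	t.Fatalf("timed out after %v: %v", totalWait, err)
}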