(2.11) Don't send meta snapshot when becoming metaleader (#5700)
Antithesis testing has found that late or out-of-order delivery of these
snapshots, likely due to latency or thread pauses, can cause stream
assignments to be reverted, which results in assets being deleted and
recreated. There may also be a race condition where the metalayer comes
up before network connectivity to all other nodes is fully established,
so we may end up generating snapshots that are missing assets we don't
yet know about.

We will want to audit all uses of `SendSnapshot` as it somewhat breaks
the consistency model, especially now that we have fixed a significant
number of Raft bugs that `SendSnapshot` usage may have been papering
over.

Further Antithesis runs without this code have run cleanly and have
eliminated a number of unexpected calls to `processStreamRemoval`.

We've also added a new unit test,
`TestJetStreamClusterHardKillAfterStreamAdd`, for a long-known issue, as
well as a couple of tweaks to the ghost consumer tests to make them
reliable.

Signed-off-by: Neil Twigg <neil@nats.io>

---------

Signed-off-by: Neil Twigg <neil@nats.io>
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
Co-authored-by: Maurice van Veen <github@mauricevanveen.com>
neilalexander and MauriceVanVeen committed Nov 19, 2024
1 parent b3098ea commit 74e8cfd
Showing 5 changed files with 106 additions and 18 deletions.
20 changes: 13 additions & 7 deletions server/consumer.go
@@ -1563,6 +1563,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
     return false
 }
 
+const (
+    defaultConsumerNotActiveStartInterval = 30 * time.Second
+    defaultConsumerNotActiveMaxInterval   = 5 * time.Minute
+)
+
+var (
+    consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+    consumerNotActiveMaxInterval   = defaultConsumerNotActiveMaxInterval
+)
+
 func (o *consumer) deleteNotActive() {
     o.mu.Lock()
     if o.mset == nil {
@@ -1628,12 +1638,8 @@ func (o *consumer) deleteNotActive() {
     // Check to make sure we went away.
     // Don't think this needs to be a monitored go routine.
     go func() {
-        const (
-            startInterval = 30 * time.Second
-            maxInterval   = 5 * time.Minute
-        )
-        jitter := time.Duration(rand.Int63n(int64(startInterval)))
-        interval := startInterval + jitter
+        jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
+        interval := consumerNotActiveStartInterval + jitter
         ticker := time.NewTicker(interval)
         defer ticker.Stop()
         for range ticker.C {
@@ -1648,7 +1654,7 @@ func (o *consumer) deleteNotActive() {
             if nca != nil && nca == ca {
                 s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
                 meta.ForwardProposal(removeEntry)
-                if interval < maxInterval {
+                if interval < consumerNotActiveMaxInterval {
                     interval *= 2
                     ticker.Reset(interval)
                 }
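The retry loop above backs off with a random initial jitter and then doubles the interval up to a cap. A minimal standalone sketch of that pattern follows; it is illustrative only, with a hypothetical tryCleanup callback standing in for the server's actual cleanup check:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// retryWithBackoff polls tryCleanup on a jittered, exponentially growing
// interval: start at startInterval plus a random jitter, then keep doubling
// the interval until it reaches maxInterval or tryCleanup reports success.
func retryWithBackoff(startInterval, maxInterval time.Duration, tryCleanup func() bool) {
    jitter := time.Duration(rand.Int63n(int64(startInterval)))
    interval := startInterval + jitter
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for range ticker.C {
        if tryCleanup() {
            return
        }
        if interval < maxInterval {
            interval *= 2
            ticker.Reset(interval)
        }
    }
}

func main() {
    attempts := 0
    retryWithBackoff(100*time.Millisecond, time.Second, func() bool {
        attempts++
        fmt.Println("attempt", attempts)
        return attempts >= 3 // pretend the cleanup finally succeeded
    })
}

The jitter spreads retries from many consumers apart in time, and the cap keeps the worst-case retry latency bounded.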
4 changes: 0 additions & 4 deletions server/jetstream_cluster.go
@@ -1431,10 +1431,6 @@ func (js *jetStream) monitorCluster() {
             aq.recycle(&ces)
 
         case isLeader = <-lch:
-            // For meta layer synchronize everyone to our state on becoming leader.
-            if isLeader && n.ApplyQ().len() == 0 {
-                n.SendSnapshot(js.metaSnapshot())
-            }
             // Process the change.
             js.processLeaderChange(isLeader)
             if isLeader {
11 changes: 9 additions & 2 deletions server/jetstream_cluster_3_test.go
@@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
 }
 
 func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
+    consumerNotActiveStartInterval = time.Second * 5
+    defer func() {
+        consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+    }()
+
     c := createJetStreamClusterExplicit(t, "R3S", 3)
     defer c.shutdown()
 
@@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
     time.Sleep(2 * time.Second)
 
     // Restart first and wait so that we know it will try cleanup without a metaleader.
+    // It will fail as there's no metaleader at that time, it should keep retrying on an interval.
     c.restartServer(rs)
     time.Sleep(time.Second)
 
@@ -1643,8 +1649,9 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
     defer nc.Close()
 
     subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
-    checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
-        m, err := nc.Request(subj, nil, time.Second)
+    checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+        // Request will take at most 4 seconds if some consumers can't be found.
+        m, err := nc.Request(subj, nil, 5*time.Second)
         if err != nil {
             return err
         }
77 changes: 77 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -21,6 +21,8 @@ import (
     "encoding/json"
     "errors"
     "fmt"
+    "io"
+    "io/fs"
     "math/rand"
     "os"
     "path"
@@ -4152,3 +4154,78 @@ func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T)
         }
     }
 }
+
+func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "R3S", 3)
+    defer c.shutdown()
+
+    nc, js := jsClientConnect(t, c.randomServer())
+    defer nc.Close()
+
+    _, err := js.AddStream(&nats.StreamConfig{
+        Name:     "TEST",
+        Subjects: []string{"foo"},
+        Replicas: 3,
+    })
+    require_NoError(t, err)
+
+    copyDir := func(dst, src string) error {
+        srcFS := os.DirFS(src)
+        return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error {
+            if err != nil {
+                return err
+            }
+            newPath := path.Join(dst, p)
+            if d.IsDir() {
+                return os.MkdirAll(newPath, defaultDirPerms)
+            }
+            r, err := srcFS.Open(p)
+            if err != nil {
+                return err
+            }
+            defer r.Close()
+
+            w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms)
+            if err != nil {
+                return err
+            }
+            defer w.Close()
+            _, err = io.Copy(w, r)
+            return err
+        })
+    }
+
+    // Simulate being hard killed by:
+    // 1. copy directories before shutdown
+    copyToSrcMap := make(map[string]string)
+    for _, s := range c.servers {
+        sd := s.StoreDir()
+        copySd := path.Join(t.TempDir(), JetStreamStoreDir)
+        err = copyDir(copySd, sd)
+        require_NoError(t, err)
+        copyToSrcMap[copySd] = sd
+    }
+
+    // 2. stop all
+    nc.Close()
+    c.stopAll()
+
+    // 3. revert directories to before shutdown
+    for cp, dest := range copyToSrcMap {
+        err = os.RemoveAll(dest)
+        require_NoError(t, err)
+        err = copyDir(dest, cp)
+        require_NoError(t, err)
+    }
+
+    // 4. restart
+    c.restartAll()
+    c.waitOnAllCurrent()
+
+    nc, js = jsClientConnect(t, c.randomServer())
+    defer nc.Close()
+
+    // Stream should exist still and not be removed after hard killing all servers, so expect no error.
+    _, err = js.StreamInfo("TEST")
+    require_NoError(t, err)
+}
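As an aside, on Go 1.23 or newer a helper like copyDir above could likely be replaced with the standard library's os.CopyFS. A minimal sketch, assuming the destination directory does not already contain the files (os.CopyFS refuses to overwrite existing files); the paths are hypothetical:

package main

import (
    "log"
    "os"
)

func main() {
    // Copy an existing store directory into a fresh destination.
    // os.CopyFS creates any missing directories and returns an error if a
    // destination file already exists, so the target should start out empty.
    src, dst := "/tmp/jetstream-src", "/tmp/jetstream-copy" // hypothetical paths
    if err := os.CopyFS(dst, os.DirFS(src)); err != nil {
        log.Fatal(err)
    }
}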
12 changes: 7 additions & 5 deletions server/norace_test.go
@@ -6577,6 +6577,11 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) {
 }
 
 func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
+    consumerNotActiveStartInterval = time.Second * 5
+    defer func() {
+        consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+    }()
+
     c := createJetStreamClusterExplicit(t, "GHOST", 3)
     defer c.shutdown()
 
@@ -6670,11 +6675,8 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
     time.Sleep(5 * time.Second)
     cancel()
 
-    // Check we don't report missing consumers.
-    subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
-    checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
-        // Request will take at most 4 seconds if some consumers can't be found.
-        m, err := nc.Request(subj, nil, 5*time.Second)
+    checkFor(t, 30*time.Second, time.Second, func() error {
+        m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second)
         if err != nil {
             return err
         }
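Both ghost-consumer tests rely on polling the consumer list until it settles or a deadline passes. A self-contained sketch of that general poll-until-deadline pattern follows; the pollFor helper is illustrative and not the repository's actual checkFor implementation:

package main

import (
    "errors"
    "fmt"
    "time"
)

// pollFor keeps calling check until it returns nil or totalWait elapses,
// sleeping between attempts, and returns the last error on timeout.
func pollFor(totalWait, sleep time.Duration, check func() error) error {
    deadline := time.Now().Add(totalWait)
    var err error
    for time.Now().Before(deadline) {
        if err = check(); err == nil {
            return nil
        }
        time.Sleep(sleep)
    }
    return fmt.Errorf("timed out after %v: %w", totalWait, err)
}

func main() {
    start := time.Now()
    err := pollFor(2*time.Second, 200*time.Millisecond, func() error {
        if time.Since(start) < 500*time.Millisecond {
            return errors.New("consumers still being cleaned up")
        }
        return nil
    })
    fmt.Println("result:", err)
}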
