(2.11) Don't send meta snapshot when becoming metaleader (#5700)
Antithesis testing has found that late or out-of-order delivery of these
snapshots, likely due to latency or thread pauses, can cause stream
assignments to be reverted, which results in assets being deleted and
recreated. There may also be a race condition where the metalayer comes
up before network connectivity to all other nodes is fully established,
in which case we may generate snapshots that are missing assets we don't
yet know about.
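
To make the failure mode concrete, here is a minimal toy sketch (illustrative only, not NATS code; the map of stream assignments is just a stand-in for the real metalayer state) of how installing a stale snapshot after newer log entries reverts state:

```go
package main

import "fmt"

func main() {
	// Toy replicated state: stream name -> assigned.
	assignments := map[string]bool{}

	// Log entries applied in order: stream "ORDERS" gets assigned.
	assignments["ORDERS"] = true

	// A snapshot captured before "ORDERS" existed arrives late (e.g. delayed
	// by latency or a thread pause) and is installed wholesale.
	staleSnapshot := map[string]bool{} // no "ORDERS" entry
	assignments = staleSnapshot

	// The assignment is now gone, so the underlying asset would be deleted
	// and then recreated once the log catches up again.
	fmt.Println("ORDERS assigned:", assignments["ORDERS"]) // false
}
```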

We will want to audit all uses of `SendSnapshot` as it somewhat breaks
the consistency model, especially now that we have fixed a significant
number of Raft bugs that `SendSnapshot` usage may have been papering
over.

Further Antithesis runs without this code have completed without issue and
have eliminated a number of unexpected calls to `processStreamRemoval`.

We've also added a new unit test,
`TestJetStreamClusterHardKillAfterStreamAdd`, for a long-known issue, and
made a couple of tweaks to the ghost consumer tests to make them reliable.

Signed-off-by: Neil Twigg <neil@nats.io>

---------

Signed-off-by: Neil Twigg <neil@nats.io>
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
Co-authored-by: Maurice van Veen <github@mauricevanveen.com>
neilalexander and MauriceVanVeen authored Oct 3, 2024
1 parent ef7d912 commit acbca0f
Showing 5 changed files with 113 additions and 26 deletions.
20 changes: 13 additions & 7 deletions server/consumer.go
@@ -1637,6 +1637,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
 	return false
 }
 
+const (
+	defaultConsumerNotActiveStartInterval = 30 * time.Second
+	defaultConsumerNotActiveMaxInterval   = 5 * time.Minute
+)
+
+var (
+	consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	consumerNotActiveMaxInterval   = defaultConsumerNotActiveMaxInterval
+)
+
 func (o *consumer) deleteNotActive() {
 	o.mu.Lock()
 	if o.mset == nil {
@@ -1702,12 +1712,8 @@ func (o *consumer) deleteNotActive() {
 	// Check to make sure we went away.
 	// Don't think this needs to be a monitored go routine.
 	go func() {
-		const (
-			startInterval = 30 * time.Second
-			maxInterval   = 5 * time.Minute
-		)
-		jitter := time.Duration(rand.Int63n(int64(startInterval)))
-		interval := startInterval + jitter
+		jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
+		interval := consumerNotActiveStartInterval + jitter
 		ticker := time.NewTicker(interval)
 		defer ticker.Stop()
 		for range ticker.C {
@@ -1722,7 +1728,7 @@ func (o *consumer) deleteNotActive() {
 			if nca != nil && nca == ca {
 				s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
 				meta.ForwardProposal(removeEntry)
-				if interval < maxInterval {
+				if interval < consumerNotActiveMaxInterval {
 					interval *= 2
 					ticker.Reset(interval)
 				}
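
The retry loop above uses a jittered, doubling interval between cleanup attempts, and the start/max intervals are now package-level vars so tests can shorten them. A minimal standalone sketch of the same backoff pattern (illustrative only, not the server's implementation):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	// Defaults mirroring the diff above.
	startInterval := 30 * time.Second
	maxInterval := 5 * time.Minute

	// Random jitter so retry loops started at the same time don't fire in lockstep.
	jitter := time.Duration(rand.Int63n(int64(startInterval)))
	interval := startInterval + jitter

	// Double the wait after each failed attempt; stop doubling once the cap is reached.
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("attempt %d: next retry in %v\n", attempt, interval)
		if interval < maxInterval {
			interval *= 2
		}
	}
}
```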
4 changes: 0 additions & 4 deletions server/jetstream_cluster.go
@@ -1406,10 +1406,6 @@ func (js *jetStream) monitorCluster() {
 			aq.recycle(&ces)
 
 		case isLeader = <-lch:
-			// For meta layer synchronize everyone to our state on becoming leader.
-			if isLeader && n.ApplyQ().len() == 0 {
-				n.SendSnapshot(js.metaSnapshot())
-			}
 			// Process the change.
 			js.processLeaderChange(isLeader)
 			if isLeader {
11 changes: 9 additions & 2 deletions server/jetstream_cluster_3_test.go
@@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
 }
 
 func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
+	consumerNotActiveStartInterval = time.Second * 5
+	defer func() {
+		consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	}()
+
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 	time.Sleep(2 * time.Second)
 
 	// Restart first and wait so that we know it will try cleanup without a metaleader.
+	// It will fail as there's no metaleader at that time, it should keep retrying on an interval.
 	c.restartServer(rs)
 	time.Sleep(time.Second)
 
@@ -1643,8 +1649,9 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 	defer nc.Close()
 
 	subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
-	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
-		m, err := nc.Request(subj, nil, time.Second)
+	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+		// Request will take at most 4 seconds if some consumers can't be found.
+		m, err := nc.Request(subj, nil, 5*time.Second)
 		if err != nil {
 			return err
 		}
78 changes: 78 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -21,8 +21,11 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
+	"io/fs"
 	"math/rand"
 	"os"
+	"path"
 	"path/filepath"
 	"runtime"
 	"slices"
@@ -4159,3 +4162,78 @@ func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) {
 	newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
 	require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
 }
+
+func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	copyDir := func(dst, src string) error {
+		srcFS := os.DirFS(src)
+		return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error {
+			if err != nil {
+				return err
+			}
+			newPath := path.Join(dst, p)
+			if d.IsDir() {
+				return os.MkdirAll(newPath, defaultDirPerms)
+			}
+			r, err := srcFS.Open(p)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+
+			w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			_, err = io.Copy(w, r)
+			return err
+		})
+	}
+
+	// Simulate being hard killed by:
+	// 1. copy directories before shutdown
+	copyToSrcMap := make(map[string]string)
+	for _, s := range c.servers {
+		sd := s.StoreDir()
+		copySd := path.Join(t.TempDir(), JetStreamStoreDir)
+		err = copyDir(copySd, sd)
+		require_NoError(t, err)
+		copyToSrcMap[copySd] = sd
+	}
+
+	// 2. stop all
+	nc.Close()
+	c.stopAll()
+
+	// 3. revert directories to before shutdown
+	for cp, dest := range copyToSrcMap {
+		err = os.RemoveAll(dest)
+		require_NoError(t, err)
+		err = copyDir(dest, cp)
+		require_NoError(t, err)
+	}
+
+	// 4. restart
+	c.restartAll()
+	c.waitOnAllCurrent()
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Stream should exist still and not be removed after hard killing all servers, so expect no error.
+	_, err = js.StreamInfo("TEST")
+	require_NoError(t, err)
+}
26 changes: 13 additions & 13 deletions server/norace_test.go
@@ -6577,6 +6577,11 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) {
 }
 
 func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
+	consumerNotActiveStartInterval = time.Second * 5
+	defer func() {
+		consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	}()
+
 	c := createJetStreamClusterExplicit(t, "GHOST", 3)
 	defer c.shutdown()
 
@@ -6670,22 +6675,17 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 	time.Sleep(5 * time.Second)
 	cancel()
 
-	getMissing := func() []string {
-		m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second*10)
-		require_NoError(t, err)
-
+	checkFor(t, 30*time.Second, time.Second, func() error {
+		m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second)
+		if err != nil {
+			return err
+		}
 		var resp JSApiConsumerListResponse
-		err = json.Unmarshal(m.Data, &resp)
-		require_NoError(t, err)
-		return resp.Missing
-	}
-
-	checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
-		missing := getMissing()
-		if len(missing) == 0 {
+		require_NoError(t, json.Unmarshal(m.Data, &resp))
+		if len(resp.Missing) == 0 {
 			return nil
 		}
-		return fmt.Errorf("Still have missing: %+v", missing)
+		return fmt.Errorf("Still have missing: %+v", resp.Missing)
 	})
 }
 
