From e576cf13e67e5f43d370f91d581e4ea549de4637 Mon Sep 17 00:00:00 2001
From: Neil Twigg
Date: Thu, 25 Jul 2024 09:57:07 +0100
Subject: [PATCH 1/3] Don't send meta snapshot when becoming metaleader

Antithesis testing has found that late or out-of-order delivery of these
snapshots, likely due to latency or thread pauses, can cause stream
assignments to be reverted, which results in assets being deleted and
recreated. There may also be a race condition where the metalayer comes up
before network connectivity is fully established, so we may end up
generating snapshots that are missing assets we don't know about yet.

We will want to audit all uses of `SendSnapshot`, as it somewhat breaks the
consistency model, especially now that we have fixed a significant number
of Raft bugs that `SendSnapshot` usage may have been papering over.

Further Antithesis runs without this code have completed cleanly and have
eliminated a number of unexpected calls to `processStreamRemoval`.

Signed-off-by: Neil Twigg
---
 server/jetstream_cluster.go | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 6928d3199e..e43869c371 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1406,10 +1406,6 @@ func (js *jetStream) monitorCluster() {
 			aq.recycle(&ces)
 
 		case isLeader = <-lch:
-			// For meta layer synchronize everyone to our state on becoming leader.
-			if isLeader && n.ApplyQ().len() == 0 {
-				n.SendSnapshot(js.metaSnapshot())
-			}
 			// Process the change.
 			js.processLeaderChange(isLeader)
 			if isLeader {

From 82c1371821a9b2e8d65a0a9b533d8195a6c6ccfb Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Wed, 2 Oct 2024 11:38:22 +0200
Subject: [PATCH 2/3] Test hard kill after stream add should not remove stream

Signed-off-by: Maurice van Veen
---
 server/jetstream_cluster_4_test.go | 78 ++++++++++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index 2311638d57..eeb95924cd 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -21,8 +21,11 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
+	"io/fs"
 	"math/rand"
 	"os"
+	"path"
 	"path/filepath"
 	"runtime"
 	"slices"
@@ -4159,3 +4162,78 @@ func TestJetStreamClusterDesyncAfterCatchupTooManyRetries(t *testing.T) {
 	newStreamLeaderServer := c.streamLeader(globalAccountName, "TEST")
 	require_Equal(t, newStreamLeaderServer.Name(), clusterResetServerName)
 }
+
+func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	copyDir := func(dst, src string) error {
+		srcFS := os.DirFS(src)
+		return fs.WalkDir(srcFS, ".", func(p string, d os.DirEntry, err error) error {
+			if err != nil {
+				return err
+			}
+			newPath := path.Join(dst, p)
+			if d.IsDir() {
+				return os.MkdirAll(newPath, defaultDirPerms)
+			}
+			r, err := srcFS.Open(p)
+			if err != nil {
+				return err
+			}
+			defer r.Close()
+
+			w, err := os.OpenFile(newPath, os.O_CREATE|os.O_WRONLY, defaultFilePerms)
+			if err != nil {
+				return err
+			}
+			defer w.Close()
+			_, err = io.Copy(w, r)
+			return err
+		})
+	}
+
+	// Simulate being hard killed by:
+	// 1. copy directories before shutdown
+	copyToSrcMap := make(map[string]string)
+	for _, s := range c.servers {
+		sd := s.StoreDir()
+		copySd := path.Join(t.TempDir(), JetStreamStoreDir)
+		err = copyDir(copySd, sd)
+		require_NoError(t, err)
+		copyToSrcMap[copySd] = sd
+	}
+
+	// 2. stop all
+	nc.Close()
+	c.stopAll()
+
+	// 3. revert directories to before shutdown
+	for cp, dest := range copyToSrcMap {
+		err = os.RemoveAll(dest)
+		require_NoError(t, err)
+		err = copyDir(dest, cp)
+		require_NoError(t, err)
+	}
+
+	// 4. restart
+	c.restartAll()
+	c.waitOnAllCurrent()
+
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Stream should still exist and not be removed after hard killing all servers, so expect no error.
+	_, err = js.StreamInfo("TEST")
+	require_NoError(t, err)
+}

From 03ed9c1e250590f148eada977b64d14f9471acfc Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Wed, 2 Oct 2024 14:41:20 +0200
Subject: [PATCH 3/3] Factor out consumer cleanup times to deflake orphaned consumer tests

Signed-off-by: Neil Twigg
Co-authored-by: Maurice van Veen
---
 server/consumer.go                 | 20 +++++++++++++-------
 server/jetstream_cluster_3_test.go | 11 +++++++++--
 server/norace_test.go              | 26 +++++++++++++-------------
 3 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/server/consumer.go b/server/consumer.go
index 61d6420bd3..a30fb31dc8 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -1637,6 +1637,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
 	return false
 }
 
+const (
+	defaultConsumerNotActiveStartInterval = 30 * time.Second
+	defaultConsumerNotActiveMaxInterval   = 5 * time.Minute
+)
+
+var (
+	consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	consumerNotActiveMaxInterval   = defaultConsumerNotActiveMaxInterval
+)
+
 func (o *consumer) deleteNotActive() {
 	o.mu.Lock()
 	if o.mset == nil {
@@ -1702,12 +1712,8 @@ func (o *consumer) deleteNotActive() {
 	// Check to make sure we went away.
 	// Don't think this needs to be a monitored go routine.
 	go func() {
-		const (
-			startInterval = 30 * time.Second
-			maxInterval   = 5 * time.Minute
-		)
-		jitter := time.Duration(rand.Int63n(int64(startInterval)))
-		interval := startInterval + jitter
+		jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
+		interval := consumerNotActiveStartInterval + jitter
 		ticker := time.NewTicker(interval)
 		defer ticker.Stop()
 		for range ticker.C {
@@ -1722,7 +1728,7 @@ func (o *consumer) deleteNotActive() {
 			if nca != nil && nca == ca {
 				s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
 				meta.ForwardProposal(removeEntry)
-				if interval < maxInterval {
+				if interval < consumerNotActiveMaxInterval {
 					interval *= 2
 					ticker.Reset(interval)
 				}
diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go
index 045aa5c1c7..001b114c9e 100644
--- a/server/jetstream_cluster_3_test.go
+++ b/server/jetstream_cluster_3_test.go
@@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
 }
 
 func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
+	consumerNotActiveStartInterval = time.Second * 5
+	defer func() {
+		consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	}()
+
 	c := createJetStreamClusterExplicit(t, "R3S", 3)
 	defer c.shutdown()
 
@@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 	time.Sleep(2 * time.Second)
 
 	// Restart first and wait so that we know it will try cleanup without a metaleader.
+	// It will fail as there's no metaleader at that time; it should keep retrying on an interval.
 	c.restartServer(rs)
 	time.Sleep(time.Second)
 
@@ -1643,8 +1649,9 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
 	defer nc.Close()
 
 	subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
-	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
-		m, err := nc.Request(subj, nil, time.Second)
+	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
+		// Request will take at most 4 seconds if some consumers can't be found.
+		m, err := nc.Request(subj, nil, 5*time.Second)
 		if err != nil {
 			return err
 		}
diff --git a/server/norace_test.go b/server/norace_test.go
index 5e01b6b9ed..98d29cb2bc 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -6577,6 +6577,11 @@ func TestNoRaceJetStreamConsumerCreateTimeNumPending(t *testing.T) {
 }
 
 func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
+	consumerNotActiveStartInterval = time.Second * 5
+	defer func() {
+		consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
+	}()
+
 	c := createJetStreamClusterExplicit(t, "GHOST", 3)
 	defer c.shutdown()
 
@@ -6670,22 +6675,17 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 	time.Sleep(5 * time.Second)
 	cancel()
 
-	getMissing := func() []string {
-		m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second*10)
-		require_NoError(t, err)
-
+	checkFor(t, 30*time.Second, time.Second, func() error {
+		m, err := nc.Request("$JS.API.CONSUMER.LIST.TEST", nil, time.Second)
+		if err != nil {
+			return err
+		}
 		var resp JSApiConsumerListResponse
-		err = json.Unmarshal(m.Data, &resp)
-		require_NoError(t, err)
-		return resp.Missing
-	}
-
-	checkFor(t, 10*time.Second, 500*time.Millisecond, func() error {
-		missing := getMissing()
-		if len(missing) == 0 {
+		require_NoError(t, json.Unmarshal(m.Data, &resp))
+		if len(resp.Missing) == 0 {
 			return nil
 		}
-		return fmt.Errorf("Still have missing: %+v", missing)
+		return fmt.Errorf("Still have missing: %+v", resp.Missing)
 	})
 }
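Note on the retry behaviour parameterised by patch 3: the cleanup goroutine in `deleteNotActive` starts at a jittered `consumerNotActiveStartInterval` and doubles the interval after each failed cleanup attempt until it passes `consumerNotActiveMaxInterval`. The following is a minimal standalone sketch (not part of the patches, for illustration only) of that backoff progression using the new default values; everything outside those two values is hypothetical.

// backoff_sketch.go: prints how the consumer-cleanup retry interval grows,
// mirroring the jitter-and-double logic kept by patch 3.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	startInterval := 30 * time.Second // defaultConsumerNotActiveStartInterval
	maxInterval := 5 * time.Minute    // defaultConsumerNotActiveMaxInterval

	// Same shape as deleteNotActive: random jitter on the first attempt,
	// then double the interval while it is still below the max.
	jitter := time.Duration(rand.Int63n(int64(startInterval)))
	interval := startInterval + jitter
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("retry %d fires after %v\n", attempt, interval)
		if interval < maxInterval {
			interval *= 2 // corresponds to the ticker.Reset path in the patch
		}
	}
}

Tests that shorten `consumerNotActiveStartInterval` (as the two modified tests above do) therefore see the first retry within seconds instead of half a minute, which is what removes the flakes.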