From 76c3eb8270008830c241b48d751178eacdddcec3 Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Tue, 20 Aug 2024 17:55:40 +0200
Subject: [PATCH 1/2] Fix install stream snapshots on graceful shutdown

Signed-off-by: Maurice van Veen
---
 server/jetstream_cluster.go | 7 ++++++-
 server/stream.go            | 7 ++++---
 2 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 328f8fecd4..11db8eb3e0 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2261,7 +2261,12 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	// Make sure to stop the raft group on exit to prevent accidental memory bloat.
 	// This should be below the checkInMonitor call though to avoid stopping it out
 	// from underneath the one that is running since it will be the same raft node.
-	defer n.Stop()
+	defer func() {
+		// We might be closing during shutdown, don't pre-emptively stop here since we'll still want to install snapshots.
+		if !mset.closed.Load() {
+			n.Stop()
+		}
+	}()
 
 	qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
 
diff --git a/server/stream.go b/server/stream.go
index fc5af3c36f..5ddac6c1bc 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -5340,6 +5340,10 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 
 	// Kick monitor and collect consumers first.
 	mset.mu.Lock()
+
+	// Mark closed.
+	mset.closed.Store(true)
+
 	// Signal to the monitor loop.
 	// Can't use qch here.
 	if mset.mqch != nil {
@@ -5400,9 +5404,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 		mset.sendDeleteAdvisoryLocked()
 	}
 
-	// Mark closed.
-	mset.closed.Store(true)
-
 	// Quit channel, do this after sending the delete advisory
 	if mset.qch != nil {
 		close(mset.qch)

From 420a121d9717fcb033f146af9f1246bb30dff997 Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Tue, 20 Aug 2024 22:26:58 +0200
Subject: [PATCH 2/2] Also processSnapshot on recovery

Signed-off-by: Maurice van Veen
---
 server/jetstream_cluster.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 11db8eb3e0..92551d4d33 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -3138,13 +3138,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
 				}
 			}
 
-			if !isRecovering && !mset.IsLeader() {
+			if isRecovering || !mset.IsLeader() {
 				if err := mset.processSnapshot(ss); err != nil {
 					return err
 				}
-			} else if isRecovering {
-				// On recovery, reset CLFS/FAILED.
-				mset.setCLFS(ss.Failed)
 			}
 		} else if e.Type == EntryRemovePeer {
 			js.mu.RLock()