diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 328f8fecd4..92551d4d33 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2261,7 +2261,12 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // Make sure to stop the raft group on exit to prevent accidental memory bloat. // This should be below the checkInMonitor call though to avoid stopping it out // from underneath the one that is running since it will be the same raft node. - defer n.Stop() + defer func() { + // We might be closing during shutdown, don't pre-emptively stop here since we'll still want to install snapshots. + if !mset.closed.Load() { + n.Stop() + } + }() qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID() @@ -3133,13 +3138,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } } - if !isRecovering && !mset.IsLeader() { + if isRecovering || !mset.IsLeader() { if err := mset.processSnapshot(ss); err != nil { return err } - } else if isRecovering { - // On recovery, reset CLFS/FAILED. - mset.setCLFS(ss.Failed) } } else if e.Type == EntryRemovePeer { js.mu.RLock() diff --git a/server/stream.go b/server/stream.go index fc5af3c36f..5ddac6c1bc 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5340,6 +5340,10 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { // Kick monitor and collect consumers first. mset.mu.Lock() + + // Mark closed. + mset.closed.Store(true) + // Signal to the monitor loop. // Can't use qch here. if mset.mqch != nil { @@ -5400,9 +5404,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error { mset.sendDeleteAdvisoryLocked() } - // Mark closed. - mset.closed.Store(true) - // Quit channel, do this after sending the delete advisory if mset.qch != nil { close(mset.qch)