From 198eb428bea60c621c6ea5d0e1ae77d45592ded8 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 19 Nov 2024 11:39:21 +0100 Subject: [PATCH] [FIXED] o.chkflr must not be greater than o.asflr & allow retries when removing messages in mset.ackMsg Signed-off-by: Maurice van Veen --- server/consumer.go | 12 +++++++++--- server/norace_test.go | 19 ++++++++++--------- server/stream.go | 14 ++++++++------ 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 28951dec53e..af286761939 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -6033,6 +6033,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { fseq = chkfloor } + retryAsflr := asflr for seq = fseq; asflr > 0 && seq <= asflr; seq++ { if filters != nil { _, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv) @@ -6045,14 +6046,19 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error { } // Only ack though if no error and seq <= ack floor. if err == nil && seq <= asflr { - mset.ackMsg(o, seq) + shouldRemove := mset.ackMsg(o, seq) + // Removing the message could fail. Lowering the floor here allows us to retry later if the removal failed. + if shouldRemove && retryAsflr == asflr { + retryAsflr = seq + } } } o.mu.Lock() // Update our check floor. - if seq > o.chkflr { - o.chkflr = seq + // Check floor must never be greater than ack floor, otherwise subsequent calls to this function would skip work. + if retryAsflr > o.chkflr { + o.chkflr = retryAsflr } // See if we need to process this update if our parent stream is not a limits policy stream. state, _ = o.store.State() diff --git a/server/norace_test.go b/server/norace_test.go index 3472d16f1b5..83a89095063 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -11065,17 +11065,18 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceWQ(t *testing.T) { // Was > 30 ms before fix for comparison, M2 macbook air. require_LessThan(t, elapsed, 5*time.Millisecond) - // Make sure we set the chkflr correctly. - checkFloor := func(o *consumer) uint64 { + // Make sure we set the chkflr correctly. The chkflr should be equal to asflr. + // Otherwise, a subsequent call to checkInterestState after asflr got upped will be ineffective. + requireFloorsEqual := func(o *consumer) { require_True(t, o != nil) o.mu.RLock() defer o.mu.RUnlock() - return o.chkflr + require_Equal(t, o.chkflr, o.asflr) } - require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) - require_Equal(t, checkFloor(mset.lookupConsumer("B")), 110_001) - require_Equal(t, checkFloor(mset.lookupConsumer("C")), 110_001) + requireFloorsEqual(mset.lookupConsumer("A")) + requireFloorsEqual(mset.lookupConsumer("B")) + requireFloorsEqual(mset.lookupConsumer("C")) // Expire all the blocks again. expireAllBlks() @@ -11162,9 +11163,9 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceInterest(t *testing. return o.chkflr } - require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1) - require_Equal(t, checkFloor(mset.lookupConsumer("B")), 100_001) - require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_001) + require_Equal(t, checkFloor(mset.lookupConsumer("A")), 0) + require_Equal(t, checkFloor(mset.lookupConsumer("B")), 90_001) + require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_000) // This checks the chkflr state. For this test this should be much faster, // two orders of magnitude then the first time. diff --git a/server/stream.go b/server/stream.go index bd02a1265fb..75f94f04902 100644 --- a/server/stream.go +++ b/server/stream.go @@ -6028,16 +6028,17 @@ func (mset *stream) clearPreAck(o *consumer, seq uint64) { } // ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy. -func (mset *stream) ackMsg(o *consumer, seq uint64) { +// Returns whether the message at seq should be removed as a result of the ACK. +func (mset *stream) ackMsg(o *consumer, seq uint64) bool { if seq == 0 { - return + return false } // Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers. mset.mu.Lock() if mset.closed.Load() || mset.cfg.Retention == LimitsPolicy { mset.mu.Unlock() - return + return false } store := mset.store @@ -6048,7 +6049,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { if seq > state.LastSeq { mset.registerPreAck(o, seq) mset.mu.Unlock() - return + return false } // Always clear pre-ack if here. @@ -6057,7 +6058,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { // Make sure this sequence is not below our first sequence. if seq < state.FirstSeq { mset.mu.Unlock() - return + return false } var shouldRemove bool @@ -6073,7 +6074,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { // If nothing else to do. if !shouldRemove { - return + return false } // If we are here we should attempt to remove. @@ -6081,6 +6082,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) { // This should not happen, but being pedantic. mset.registerPreAckLock(o, seq) } + return true } // Snapshot creates a snapshot for the stream and possibly consumers.