Skip to content

Commit

Permalink
[FIXED] o.chkflr must not be greater than o.asflr+1
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Nov 20, 2024
1 parent 71ba974 commit 7cdb600
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
5 changes: 3 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6045,8 +6045,9 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {

o.mu.Lock()
// Update our check floor.
if seq > o.chkflr {
o.chkflr = seq
// Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work.
if asflr+1 > o.chkflr {
o.chkflr = asflr + 1
}
// See if we need to process this update if our parent stream is not a limits policy stream.
state, _ = o.store.State()
Expand Down
13 changes: 7 additions & 6 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11065,17 +11065,18 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceWQ(t *testing.T) {
// Was > 30 ms before fix for comparison, M2 macbook air.
require_LessThan(t, elapsed, 5*time.Millisecond)

// Make sure we set the chkflr correctly.
checkFloor := func(o *consumer) uint64 {
// Make sure we set the chkflr correctly. The chkflr should be equal to asflr+1.
// Otherwise, if chkflr would be set higher a subsequent call to checkInterestState will be ineffective.
requireFloorsEqual := func(o *consumer) {
require_True(t, o != nil)
o.mu.RLock()
defer o.mu.RUnlock()
return o.chkflr
require_Equal(t, o.chkflr, o.asflr+1)
}

require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1)
require_Equal(t, checkFloor(mset.lookupConsumer("B")), 110_001)
require_Equal(t, checkFloor(mset.lookupConsumer("C")), 110_001)
requireFloorsEqual(mset.lookupConsumer("A"))
requireFloorsEqual(mset.lookupConsumer("B"))
requireFloorsEqual(mset.lookupConsumer("C"))

// Expire all the blocks again.
expireAllBlks()
Expand Down

0 comments on commit 7cdb600

Please sign in to comment.