Skip to content

Commit

Permalink
[FIXED] o.chkflr must not be greater than o.asflr & allow retries when removing messages in mset.ackMsg
Browse files Browse the repository at this point in the history

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Nov 20, 2024
1 parent 2dadd72 commit 9410103
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 16 deletions.
17 changes: 14 additions & 3 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6033,6 +6033,7 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
fseq = chkfloor
}

var retryAsflr uint64
for seq = fseq; asflr > 0 && seq <= asflr; seq++ {
if filters != nil {
_, nseq, err = store.LoadNextMsgMulti(filters, seq, &smv)
Expand All @@ -6045,14 +6046,24 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
}
// Only ack though if no error and seq <= ack floor.
if err == nil && seq <= asflr {
mset.ackMsg(o, seq)
didRemove := mset.ackMsg(o, seq)
// Removing the message could fail.
// Overwrite retry floor (only the first time) to allow us to check next time if the removal was successful.
if didRemove && retryAsflr == 0 {
retryAsflr = seq
}
}
}
// If retry floor was not overwritten, set to ack floor+1, we don't need to account for any retries below it.
if retryAsflr == 0 {
retryAsflr = asflr + 1
}

o.mu.Lock()
// Update our check floor.
if seq > o.chkflr {
o.chkflr = seq
// Check floor must never be greater than ack floor+1, otherwise subsequent calls to this function would skip work.
if retryAsflr > o.chkflr {
o.chkflr = retryAsflr
}
// See if we need to process this update if our parent stream is not a limits policy stream.
state, _ = o.store.State()
Expand Down
15 changes: 8 additions & 7 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11065,17 +11065,18 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceWQ(t *testing.T) {
// Was > 30 ms before fix for comparison, M2 macbook air.
require_LessThan(t, elapsed, 5*time.Millisecond)

// Make sure we set the chkflr correctly.
checkFloor := func(o *consumer) uint64 {
// Make sure we set the chkflr correctly. The chkflr should be equal to asflr+1.
// Otherwise, if chkflr would be set higher a subsequent call to checkInterestState will be ineffective.
requireFloorsEqual := func(o *consumer) {
require_True(t, o != nil)
o.mu.RLock()
defer o.mu.RUnlock()
return o.chkflr
require_Equal(t, o.chkflr, o.asflr+1)
}

require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1)
require_Equal(t, checkFloor(mset.lookupConsumer("B")), 110_001)
require_Equal(t, checkFloor(mset.lookupConsumer("C")), 110_001)
requireFloorsEqual(mset.lookupConsumer("A"))
requireFloorsEqual(mset.lookupConsumer("B"))
requireFloorsEqual(mset.lookupConsumer("C"))

// Expire all the blocks again.
expireAllBlks()
Expand Down Expand Up @@ -11163,7 +11164,7 @@ func TestNoRaceJetStreamClusterCheckInterestStatePerformanceInterest(t *testing.
}

require_Equal(t, checkFloor(mset.lookupConsumer("A")), 1)
require_Equal(t, checkFloor(mset.lookupConsumer("B")), 100_001)
require_Equal(t, checkFloor(mset.lookupConsumer("B")), 90_001)
require_Equal(t, checkFloor(mset.lookupConsumer("C")), 100_001)

// This checks the chkflr state. For this test this should be much faster,
Expand Down
14 changes: 8 additions & 6 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6028,16 +6028,17 @@ func (mset *stream) clearPreAck(o *consumer, seq uint64) {
}

// ackMsg is called into from a consumer when we have a WorkQueue or Interest Retention Policy.
func (mset *stream) ackMsg(o *consumer, seq uint64) {
// Returns whether the message at seq was removed as a result of the ACK.
func (mset *stream) ackMsg(o *consumer, seq uint64) bool {
if seq == 0 {
return
return false
}

// Don't make this RLock(). We need to have only 1 running at a time to gauge interest across all consumers.
mset.mu.Lock()
if mset.closed.Load() || mset.cfg.Retention == LimitsPolicy {
mset.mu.Unlock()
return
return false
}

store := mset.store
Expand All @@ -6048,7 +6049,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
if seq > state.LastSeq {
mset.registerPreAck(o, seq)
mset.mu.Unlock()
return
return false
}

// Always clear pre-ack if here.
Expand All @@ -6057,7 +6058,7 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {
// Make sure this sequence is not below our first sequence.
if seq < state.FirstSeq {
mset.mu.Unlock()
return
return false
}

var shouldRemove bool
Expand All @@ -6073,14 +6074,15 @@ func (mset *stream) ackMsg(o *consumer, seq uint64) {

// If nothing else to do.
if !shouldRemove {
return
return false
}

// If we are here we should attempt to remove.
if _, err := store.RemoveMsg(seq); err == ErrStoreEOF {
// This should not happen, but being pedantic.
mset.registerPreAckLock(o, seq)
}
return true
}

// Snapshot creates a snapshot for the stream and possibly consumers.
Expand Down

0 comments on commit 9410103

Please sign in to comment.