Skip to content

Commit

Permalink
[FIXED] Subscription redelivery count map was not cleaned-up
Browse files Browse the repository at this point in the history
PR #997 introduced a map to keep track of how many times a message
was redelivered to a subscription (or a queue group). However, sequences
added to this map were never removed when an ACK from the subscription
was processed!

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed May 17, 2021
1 parent 99d6ed1 commit d31981b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 7 deletions.
11 changes: 10 additions & 1 deletion server/clustering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7808,7 +7808,7 @@ func TestClusteringRedeliveryCount(t *testing.T) {
atomic.StoreInt32(&restarted, 1)
s1 = runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()
getLeader(t, 10*time.Second, s1, s2, s3)
leader := getLeader(t, 10*time.Second, s1, s2, s3)

select {
case e := <-errCh:
Expand Down Expand Up @@ -7839,6 +7839,15 @@ func TestClusteringRedeliveryCount(t *testing.T) {
case <-time.After(time.Second):
t.Fatalf("Timedout")
}

// Make sure that deliver count map gets cleaned-up once messages are acknowledged.
sub := leader.clients.getSubs(clientName)[0]
waitForCount(t, 0, func() (string, int) {
sub.RLock()
l := len(sub.rdlvCount)
sub.RUnlock()
return "redelivery map size", l
})
}

func testRemoveNode(t *testing.T, nc *nats.Conn, node string, timeoutExpected bool) {
Expand Down
21 changes: 18 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5428,6 +5428,15 @@ func (s *StanServer) processAck(c *channel, sub *subState, sequence uint64, from
return
}
delete(sub.acksPending, sequence)
// Remove from redelivery count map only if processing an ACK from the user,
// not simply when reassigning to a new member of a queue group.
if fromUser {
if qs != nil {
delete(qs.rdlvCount, sequence)
} else {
delete(sub.rdlvCount, sequence)
}
}
} else if qs != nil && fromUser {
// For queue members, if this is not an internally generated ACK
// and we don't find the sequence in this sub's pending, we are
Expand All @@ -5438,13 +5447,19 @@ func (s *StanServer) processAck(c *channel, sub *subState, sequence uint64, from
continue
}
qsub.Lock()
if _, found := qsub.acksPending[sequence]; found {
_, found := qsub.acksPending[sequence]
if found {
delete(qsub.acksPending, sequence)
persistAck(qsub)
qsub.Unlock()
break
}
qsub.Unlock()
if found {
// We are still under the qstate lock. Since we found this message
// in one of the members of the group, remove it from the redelivery
// count map now.
delete(qs.rdlvCount, sequence)
break
}
}
sub.Lock()
// Proceed with original sub (regardless if member was found
Expand Down
24 changes: 21 additions & 3 deletions server/server_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,7 @@ func TestQueueRedeliveryCount(t *testing.T) {
defer sc.Close()

errCh := make(chan error, 2)
ch := make(chan bool, 1)
var mu sync.Mutex
var prev uint32
cb := func(m *stan.Msg) {
Expand All @@ -1293,6 +1294,10 @@ func TestQueueRedeliveryCount(t *testing.T) {
return
}
prev = m.RedeliveryCount
if m.RedeliveryCount == 5 {
m.Ack()
ch <- true
}
mu.Unlock()
}
}
Expand All @@ -1308,7 +1313,20 @@ func TestQueueRedeliveryCount(t *testing.T) {
select {
case e := <-errCh:
t.Fatal(e.Error())
case <-time.After(500 * time.Millisecond):
// ok!
}
case <-ch:
case <-time.After(time.Second):
t.Fatalf("Timedout")
}

// Make sure that deliver count map gets cleaned-up once messages are acknowledged.
sub := s.clients.getSubs(clientName)[0]
sub.RLock()
qs := sub.qstate
sub.RUnlock()
waitForCount(t, 0, func() (string, int) {
qs.RLock()
l := len(qs.rdlvCount)
qs.RUnlock()
return "queue redelivery map size", l
})
}
9 changes: 9 additions & 0 deletions server/server_redelivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,15 @@ func TestPersistentStoreRedeliveryCount(t *testing.T) {
case <-time.After(time.Second):
t.Fatalf("Timedout")
}

// Make sure that deliver count map gets cleaned-up once messages are acknowledged.
sub := s.clients.getSubs(clientName)[0]
waitForCount(t, 0, func() (string, int) {
sub.RLock()
l := len(sub.rdlvCount)
sub.RUnlock()
return "redelivery map size", l
})
}

type testRdlvRaceWithAck struct {
Expand Down

0 comments on commit d31981b

Please sign in to comment.