Skip to content

Commit

Permalink
De-flake drifting test, check consumer pending after stabilized (#6170)
Browse files Browse the repository at this point in the history
The test checks if stream state and messages equal, but also checks if
the consumer pending count matches with the amount of messages in the
stream. For clustered streams this consumer pending check would be done
before the cluster is stabilized, so it could fail at that stage.

Move those checks down so we at least confirm the stream states are
equal first before we check pending.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Nov 25, 2024
2 parents a9614d6 + 5f33ce7 commit 1f1d8c6
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1146,13 +1146,6 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) {
// Wait until context is done then check state.
<-ctx.Done()

var consumerPending int
for i := 0; i < 10; i++ {
ci, err := js.ConsumerInfo(sc.Name, fmt.Sprintf("consumer:EEEEE:%d", i))
require_NoError(t, err)
consumerPending += int(ci.NumPending)
}

getStreamDetails := func(t *testing.T, srv *Server) *StreamDetail {
t.Helper()
jsz, err := srv.Jsz(&JSzOptions{Accounts: true, Streams: true, Consumer: true})
Expand Down Expand Up @@ -1241,18 +1234,6 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) {
}
}

// Check state of streams and consumers.
si, err := js.StreamInfo(sc.Name)
require_NoError(t, err)

// Only check if there are any pending messages.
if consumerPending > 0 {
streamPending := int(si.State.Msgs)
if streamPending != consumerPending {
t.Errorf("Unexpected number of pending messages, stream=%d, consumers=%d", streamPending, consumerPending)
}
}

// If clustered, check whether leader and followers have drifted.
if sc.Replicas > 1 {
// If we have drifted do not have to wait too long, usually its stuck for good.
Expand All @@ -1265,6 +1246,25 @@ func TestJetStreamClusterStreamOrphanMsgsAndReplicasDrifting(t *testing.T) {
checkMsgsEqual(t)
}

var consumerPending int
for i := 0; i < 10; i++ {
ci, err := js.ConsumerInfo(sc.Name, fmt.Sprintf("consumer:EEEEE:%d", i))
require_NoError(t, err)
consumerPending += int(ci.NumPending)
}

// Check state of streams and consumers.
si, err := js.StreamInfo(sc.Name)
require_NoError(t, err)

// Only check if there are any pending messages.
if consumerPending > 0 {
streamPending := int(si.State.Msgs)
if streamPending != consumerPending {
t.Errorf("Unexpected number of pending messages, stream=%d, consumers=%d", streamPending, consumerPending)
}
}

wg.Wait()
}

Expand Down

0 comments on commit 1f1d8c6

Please sign in to comment.