Skip to content

Commit

Permalink
De-flake don't InstallSnapshot when stopping tests (#6168)
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Nov 25, 2024
2 parents 1a4fec8 + 92d9c35 commit a9614d6
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4605,8 +4605,8 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
require_NoError(t, err)

// Wait for all servers to have applied everything.
// Expect 2: EntryPeerState, streamMsgOp
expectedApplied := uint64(2)
var maxApplied uint64
var tries int
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
for _, s := range c.servers {
acc, err := s.lookupAccount(globalAccountName)
Expand All @@ -4617,10 +4617,17 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
if err != nil {
return err
}
if _, _, applied := mset.node.Progress(); applied != expectedApplied {
return fmt.Errorf("applied doesn't match, expected %d, got %d", expectedApplied, applied)
if _, _, applied := mset.node.Progress(); applied > maxApplied {
maxApplied, tries = applied, 0
return fmt.Errorf("applied upped to %d", maxApplied)
} else if applied != maxApplied {
return fmt.Errorf("applied doesn't match, expected %d, got %d", maxApplied, applied)
}
}
tries++
if tries < 3 {
return fmt.Errorf("retrying for applied %d (try %d)", maxApplied, tries)
}
return nil
})

Expand All @@ -4636,7 +4643,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
// Validate the snapshot reflects applied.
validateStreamState := func(snap *snapshot) {
t.Helper()
require_Equal(t, snap.lastIndex, expectedApplied)
require_Equal(t, snap.lastIndex, maxApplied)
ss, err := DecodeStreamState(snap.data)
require_NoError(t, err)
require_Equal(t, ss.FirstSeq, 1)
Expand Down Expand Up @@ -4696,8 +4703,8 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
require_NoError(t, err)

// Wait for all servers to have applied everything.
// Expect 4: addPendingRequest, removePendingRequest, updateDeliveredOp, updateAcksOp
expectedApplied := uint64(4)
var maxApplied uint64
var tries int
checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
for _, s := range c.servers {
acc, err := s.lookupAccount(globalAccountName)
Expand All @@ -4712,10 +4719,17 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
if o == nil {
return errors.New("consumer not found")
}
if _, _, applied := o.node.Progress(); applied != expectedApplied {
return fmt.Errorf("applied doesn't match, expected %d, got %d", expectedApplied, applied)
if _, _, applied := o.node.Progress(); applied > maxApplied {
maxApplied, tries = applied, 0
return fmt.Errorf("applied upped to %d", maxApplied)
} else if applied != maxApplied {
return fmt.Errorf("applied doesn't match, expected %d, got %d", maxApplied, applied)
}
}
tries++
if tries < 3 {
return fmt.Errorf("retrying for applied %d (try %d)", maxApplied, tries)
}
return nil
})

Expand All @@ -4735,7 +4749,7 @@ func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
// Validate the snapshot reflects applied.
validateConsumerState := func(snap *snapshot) {
t.Helper()
require_Equal(t, snap.lastIndex, expectedApplied)
require_Equal(t, snap.lastIndex, maxApplied)
state, err := decodeConsumerState(snap.data)
require_NoError(t, err)
require_Equal(t, state.Delivered.Consumer, 1)
Expand Down

0 comments on commit a9614d6

Please sign in to comment.