Skip to content

Commit

Permalink
Remove TestJetStreamClusterHealthzCheckForStoppedAssets
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Jan 10, 2025
1 parent b3386e5 commit f109094
Showing 1 changed file with 0 additions and 86 deletions.
86 changes: 0 additions & 86 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3813,92 +3813,6 @@ func TestJetStreamClusterConsumerInfoForJszForFollowers(t *testing.T) {
}
}

// Under certain scenarios we have seen consumers become stopped and cause healthz to fail.
// The specific scneario is heavy loads, and stream resets on upgrades that could orphan consumers.
func TestJetStreamClusterHealthzCheckForStoppedAssets(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)

for i := 0; i < 1000; i++ {
sendStreamMsg(t, nc, "foo", "HELLO")
}

sub, err := js.PullSubscribe("foo", "d")
require_NoError(t, err)

fetch, ack := 122, 22
msgs, err := sub.Fetch(fetch, nats.MaxWait(10*time.Second))
require_NoError(t, err)
require_True(t, len(msgs) == fetch)
for _, m := range msgs[:ack] {
m.AckSync()
}
// Let acks propagate.
time.Sleep(100 * time.Millisecond)

// We will now stop a stream on a given server.
s := c.randomServer()
mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
// Stop the stream
mset.stop(false, false)

// Wait for exit.
time.Sleep(100 * time.Millisecond)

checkFor(t, 15*time.Second, 500*time.Millisecond, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})

// Now take out the consumer.
mset, err = s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)

o := mset.lookupConsumer("d")
require_NotNil(t, o)

o.stop()
// Wait for exit.
time.Sleep(100 * time.Millisecond)

checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})

// Now just stop the raft node from underneath the consumer.
o = mset.lookupConsumer("d")
require_NotNil(t, o)
node := o.raftNode()
require_NotNil(t, node)
node.Stop()

checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})
}

// Make sure that stopping a stream shutdowns down it's raft node.
func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
Expand Down

0 comments on commit f109094

Please sign in to comment.