[FIXED] Health check must not recreate stream/consumer (#6362)
Calling healthz could result in streams and/or consumers being
restarted.
There's a race condition where a user deletes and recreates a
stream/consumer just as the health check kicks in. The health check
would then revive the just-deleted stream/consumer, leaving dead
streams/consumers behind, or potentially leaderless states if
diverging RAFT groups remain.
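
For context, a minimal sketch of how such a health probe is typically
issued against the server's HTTP monitoring endpoint. The /healthz path,
monitor port 8222, and polling style here are assumptions for
illustration; they are not taken from this commit.

// healthz_probe.go: sketch of polling a NATS server's /healthz
// monitoring endpoint. Port and URL are assumptions for illustration.
package main

import (
	"fmt"
	"io"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{Timeout: 2 * time.Second}
	resp, err := client.Get("http://127.0.0.1:8222/healthz")
	if err != nil {
		fmt.Println("health probe failed:", err)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// A 200 status means healthy; anything else carries the unhealthy detail.
	fmt.Printf("status=%d body=%s\n", resp.StatusCode, string(body))
}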

A stream/consumer must not be restarted in the health check, as the
health check has no awareness of what's happening in other parts of
the system. Was the stream just deleted, is it restarting due to an
error, or is it actually stopped due to a bug? It can't know, and it
shouldn't assume it's safe to restart. Because of the way the JS lock
is used, combined with the health check operating on copies of the
stream/consumer assignment, various race conditions can occur where
restarting would be the wrong choice.

More importantly (and put simply), stream/consumer assignments MUST
only be changed via meta entries or meta snapshots. Doing it anywhere
else can result in race conditions/ordering issues. (Just like
snapshotting anywhere other than the monitor routine resulted in race
conditions before: #6153)
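
As an illustration of that invariant, here is a condensed sketch using
hypothetical, simplified types (not the actual server code): the health
check only observes and reports, while assignment changes are applied
exclusively by processing meta entries.

// Sketch only: hypothetical types illustrating "health checks observe,
// only meta entries mutate assignments".
package main

import "fmt"

type assignment struct {
	name    string
	running bool
}

type metaEntry struct {
	op   string // "assign" or "remove"
	name string
}

type cluster struct {
	assignments map[string]*assignment
}

// isHealthy only inspects state; it never creates, restarts, or deletes anything.
func (c *cluster) isHealthy(name string) bool {
	a, ok := c.assignments[name]
	return ok && a.running
}

// applyMetaEntry is the single place where assignments change.
func (c *cluster) applyMetaEntry(e metaEntry) {
	switch e.op {
	case "assign":
		c.assignments[e.name] = &assignment{name: e.name, running: true}
	case "remove":
		delete(c.assignments, e.name)
	}
}

func main() {
	c := &cluster{assignments: map[string]*assignment{}}
	c.applyMetaEntry(metaEntry{op: "assign", name: "TEST"})
	fmt.Println("healthy:", c.isHealthy("TEST")) // true
	c.applyMetaEntry(metaEntry{op: "remove", name: "TEST"})
	// An unhealthy report must not revive the assignment; that decision
	// belongs to the meta layer.
	fmt.Println("healthy:", c.isHealthy("TEST")) // false
}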

Detecting and correcting RAFT node skew is kept, although the health
check likely shouldn't be doing that either. However, there was a bug
where, when RAFT node skew was detected for a consumer, the consumer
would be deleted but not recreated if it had initially been created
less than 10 seconds earlier. That's now fixed as well.
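
The removed guard is visible in the diff below. Roughly, as a
self-contained reconstruction of the deleted helper (names follow the
diff; the surrounding plumbing is hypothetical), the old restart path
bailed out for recently created consumers even after skew handling had
already deleted the consumer:

// Sketch of the old guard: after skew handling deleted the consumer,
// the restart helper returned early for consumers created <10s ago,
// leaving them deleted and never recreated.
package main

import (
	"fmt"
	"time"
)

type consumerAssignment struct {
	Name    string
	Created time.Time
	deleted bool
}

// restartConsumer mirrors the removed helper's guard: it refuses to
// reprocess the assignment if the consumer was created within the last
// 10 seconds.
func restartConsumer(ca *consumerAssignment, reprocess func(*consumerAssignment)) {
	if !ca.deleted && time.Since(ca.Created) < 10*time.Second {
		fmt.Printf("not restarting %q, too soon since creation %v\n", ca.Name, time.Since(ca.Created))
		return
	}
	if !ca.deleted {
		reprocess(ca)
	}
}

func main() {
	ca := &consumerAssignment{Name: "CONSUMER", Created: time.Now()}
	// Skew was detected and the consumer deleted; the old code then
	// called restartConsumer, which bailed out here, so nothing was
	// recreated.
	restartConsumer(ca, func(ca *consumerAssignment) {
		fmt.Println("reprocessing assignment for", ca.Name)
	})
}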

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
derekcollison authored Jan 10, 2025
2 parents 05d5f87 + f109094 commit 158d7cd
Showing 3 changed files with 237 additions and 189 deletions.
109 changes: 6 additions & 103 deletions server/jetstream_cluster.go
@@ -443,73 +443,6 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
return false
}

// Restart the stream in question.
// Should only be called when the stream is known to be in a bad state.
func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
js.mu.Lock()
s, cc := js.srv, js.cluster
if cc == nil {
js.mu.Unlock()
return
}
// Need to lookup the one directly from the meta layer, what we get handed is a copy if coming from isStreamHealthy.
asa := cc.streams[acc.Name]
if asa == nil {
js.mu.Unlock()
return
}
sa := asa[csa.Config.Name]
if sa == nil {
js.mu.Unlock()
return
}
// Make sure to clear out the raft node if still present in the meta layer.
if rg := sa.Group; rg != nil && rg.node != nil {
if rg.node.State() != Closed {
rg.node.Stop()
}
rg.node = nil
}
sinceCreation := time.Since(sa.Created)
js.mu.Unlock()

// Process stream assignment to recreate.
// Check that we have given system enough time to start us up.
// This will be longer than obvious, and matches consumer logic in case system very busy.
if sinceCreation < 10*time.Second {
s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v",
acc, csa.Config.Name, sinceCreation)
return
}

js.processStreamAssignment(sa)

// If we had consumers assigned to this server they will be present in the copy, csa.
// They also need to be processed. The csa consumers is a copy of only our consumers,
// those assigned to us, but the consumer assignment's there are direct from the meta
// layer to make this part much easier and avoid excessive lookups.
for _, cca := range csa.consumers {
if cca.deleted {
continue
}
// Need to look up original as well here to make sure node is nil.
js.mu.Lock()
ca := sa.consumers[cca.Name]
if ca != nil && ca.Group != nil {
// Make sure the node is stopped if still running.
if node := ca.Group.node; node != nil && node.State() != Closed {
node.Stop()
}
// Make sure node is wiped.
ca.Group.node = nil
}
js.mu.Unlock()
if ca != nil {
js.processConsumerAssignment(ca)
}
}
}

// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
@@ -535,7 +468,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
// First lookup stream and make sure its there.
mset, err := acc.lookupStream(streamName)
if err != nil {
js.restartStream(acc, sa)
return false
}

@@ -560,8 +492,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
} else if node.State() == Closed {
js.restartStream(acc, sa)
}
}
return false
@@ -591,37 +521,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
node := ca.Group.node
js.mu.RUnlock()

// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()

js.mu.Lock()
deleted := ca.deleted
// Check that we have not just been created.
if !deleted && time.Since(ca.Created) < 10*time.Second {
s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v",
accName, streamName, consumer, time.Since(ca.Created))
js.mu.Unlock()
return
}
// Make sure the node is stopped if still running.
if node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
}
}

// Check if not running at all.
o := mset.lookupConsumer(consumer)
if o == nil {
restartConsumer()
return false
}

@@ -636,11 +538,12 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer)
node.Delete()
o.deleteWithoutAdvisory()
restartConsumer()
} else if node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()

// When we try to restart we nil out the node and reprocess the consumer assignment.
js.mu.Lock()
ca.Group.node = nil
js.mu.Unlock()
js.processConsumerAssignment(ca)
}
}
return false
231 changes: 231 additions & 0 deletions server/jetstream_cluster_1_test.go
@@ -7092,6 +7092,237 @@ func TestJetStreamClusterClearAllPreAcksOnRemoveMsg(t *testing.T) {
checkPreAcks(3, 0)
}

func TestJetStreamClusterStreamHealthCheckMustNotRecreate(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

waitForStreamAssignments := func() {
t.Helper()
checkFor(t, 5*time.Second, time.Second, func() error {
for _, s := range c.servers {
if s.getJetStream().streamAssignment(globalAccountName, "TEST") == nil {
return fmt.Errorf("stream assignment not found on %s", s.Name())
}
}
return nil
})
}
waitForNoStreamAssignments := func() {
t.Helper()
checkFor(t, 5*time.Second, time.Second, func() error {
for _, s := range c.servers {
if s.getJetStream().streamAssignment(globalAccountName, "TEST") != nil {
return fmt.Errorf("stream assignment still available on %s", s.Name())
}
}
return nil
})
}
getStreamAssignment := func(rs *Server) (*jetStream, *Account, *streamAssignment, *stream) {
acc, err := rs.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NotNil(t, err)

sjs := rs.getJetStream()
sjs.mu.RLock()
defer sjs.mu.RUnlock()

sas := sjs.cluster.streams[globalAccountName]
require_True(t, sas != nil)
sa := sas["TEST"]
require_True(t, sa != nil)
sa.Created = time.Time{}
return sjs, acc, sa, mset
}
checkNodeIsClosed := func(sa *streamAssignment) {
t.Helper()
require_True(t, sa.Group != nil)
rg := sa.Group
require_True(t, rg.node != nil)
n := rg.node
require_Equal(t, n.State(), Closed)
}

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
waitForStreamAssignments()

// We manually stop the RAFT node and ensure it doesn't get restarted.
rs := c.randomNonStreamLeader(globalAccountName, "TEST")
sjs, acc, sa, mset := getStreamAssignment(rs)
require_True(t, sa.Group != nil)
rg := sa.Group
require_True(t, rg.node != nil)
n := rg.node
n.Stop()
n.WaitForStop()

// We wait for the monitor to exit, so we can set the flag back manually.
checkFor(t, 5*time.Second, time.Second, func() error {
mset.mu.RLock()
defer mset.mu.RUnlock()
if mset.inMonitor {
return errors.New("waiting for monitor to stop")
}
return nil
})
mset.mu.Lock()
mset.inMonitor = true
mset.mu.Unlock()

// The RAFT node should be closed. Checking health must not change that.
// Simulates a race condition where we're shutting down, but we're still in the stream monitor.
checkNodeIsClosed(sa)
sjs.isStreamHealthy(acc, sa)
checkNodeIsClosed(sa)

err = js.DeleteStream("TEST")
require_NoError(t, err)
waitForNoStreamAssignments()

// Underlying layer would be aware the health check made a copy.
// So we sneakily set these values back, which simulates a race condition where
// the health check is called while the deletion is in progress. This could happen
// depending on how the locks are used.
sjs.mu.Lock()
sjs.cluster.streams = make(map[string]map[string]*streamAssignment)
sjs.cluster.streams[globalAccountName] = make(map[string]*streamAssignment)
sjs.cluster.streams[globalAccountName]["TEST"] = sa
sa.Group.node = n
sjs.mu.Unlock()

// The underlying stream has been deleted. Checking health must not recreate the stream.
checkNodeIsClosed(sa)
sjs.isStreamHealthy(acc, sa)
checkNodeIsClosed(sa)
}

func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

waitForConsumerAssignments := func() {
t.Helper()
checkFor(t, 5*time.Second, time.Second, func() error {
for _, s := range c.servers {
if s.getJetStream().consumerAssignment(globalAccountName, "TEST", "CONSUMER") == nil {
return fmt.Errorf("stream assignment not found on %s", s.Name())
}
}
return nil
})
}
waitForNoConsumerAssignments := func() {
t.Helper()
checkFor(t, 5*time.Second, time.Second, func() error {
for _, s := range c.servers {
if s.getJetStream().consumerAssignment(globalAccountName, "TEST", "CONSUMER") != nil {
return fmt.Errorf("stream assignment still available on %s", s.Name())
}
}
return nil
})
}
getConsumerAssignment := func(rs *Server) (*jetStream, *streamAssignment, *consumerAssignment, *stream) {
acc, err := rs.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NotNil(t, err)

sjs := rs.getJetStream()
sjs.mu.RLock()
defer sjs.mu.RUnlock()

sas := sjs.cluster.streams[globalAccountName]
require_True(t, sas != nil)
sa := sas["TEST"]
require_True(t, sa != nil)
ca := sa.consumers["CONSUMER"]
require_True(t, ca != nil)
ca.Created = time.Time{}
return sjs, sa, ca, mset
}
checkNodeIsClosed := func(ca *consumerAssignment) {
t.Helper()
require_True(t, ca.Group != nil)
rg := ca.Group
require_True(t, rg.node != nil)
n := rg.node
require_Equal(t, n.State(), Closed)
}

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
require_NoError(t, err)
waitForConsumerAssignments()

// We manually stop the RAFT node and ensure it doesn't get restarted.
rs := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER")
sjs, sa, ca, mset := getConsumerAssignment(rs)
require_True(t, ca.Group != nil)
rg := ca.Group
require_True(t, rg.node != nil)
n := rg.node
n.Stop()
n.WaitForStop()

// The RAFT node should be closed. Checking health must not change that.
// Simulates a race condition where we're shutting down.
checkNodeIsClosed(ca)
sjs.isConsumerHealthy(mset, "CONSUMER", ca)
checkNodeIsClosed(ca)

// We create a new RAFT group, the health check should detect this skew and restart.
err = sjs.createRaftGroup(globalAccountName, ca.Group, MemoryStorage, pprofLabels{})
require_NoError(t, err)
sjs.mu.Lock()
// We set creating to now, since previously it would delete all data but NOT restart if created within <10s.
ca.Created = time.Now()
// Setting ca.pending, since a side effect of js.processConsumerAssignment is that it resets it.
ca.pending = true
sjs.mu.Unlock()
sjs.isConsumerHealthy(mset, "CONSUMER", ca)
require_False(t, ca.pending)

err = js.DeleteConsumer("TEST", "CONSUMER")
require_NoError(t, err)
waitForNoConsumerAssignments()

// Underlying layer would be aware the health check made a copy.
// So we sneakily set these values back, which simulates a race condition where
// the health check is called while the deletion is in progress. This could happen
// depending on how the locks are used.
sjs.mu.Lock()
sjs.cluster.streams = make(map[string]map[string]*streamAssignment)
sjs.cluster.streams[globalAccountName] = make(map[string]*streamAssignment)
sjs.cluster.streams[globalAccountName]["TEST"] = sa
ca.Created = time.Time{}
ca.Group.node = n
ca.deleted = false
sjs.mu.Unlock()

// The underlying consumer has been deleted. Checking health must not recreate the consumer.
checkNodeIsClosed(ca)
sjs.isConsumerHealthy(mset, "CONSUMER", ca)
checkNodeIsClosed(ca)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
