From 9531611feb086c9be59a9bbb581d4d0bd66657d7 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 11 Sep 2023 16:35:16 -0700 Subject: [PATCH] Add in utility to detect and delete any NRG orphans. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 73 +++++++++++++++++++++++++++++- server/jetstream_cluster_3_test.go | 50 +++++++++++++++++++- server/monitor.go | 3 +- 3 files changed, 123 insertions(+), 3 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 30a0384c7a..149b585dac 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1167,6 +1167,65 @@ func (js *jetStream) checkForOrphans() { } } +// Check and delete any orphans we may come across. +func (s *Server) checkForNRGOrphans() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil || js.isMetaRecovering() { + // No cluster means no NRGs. Also return if still recovering. + return + } + + // Track which assets R>1 should be on this server. + nrgMap := make(map[string]struct{}) + trackGroup := func(rg *raftGroup) { + // If R>1 track this as a legit NRG. + if rg.node != nil { + nrgMap[rg.Name] = struct{}{} + } + } + // Register our meta. + js.mu.RLock() + meta := cc.meta + if meta == nil { + js.mu.RUnlock() + // Bail with no meta node. + return + } + + ourID := meta.ID() + nrgMap[meta.Group()] = struct{}{} + + // Collect all valid groups from our assignments. + for _, asa := range cc.streams { + for _, sa := range asa { + if sa.Group.isMember(ourID) && sa.Restore == nil { + trackGroup(sa.Group) + for _, ca := range sa.consumers { + if ca.Group.isMember(ourID) { + trackGroup(ca.Group) + } + } + } + } + } + js.mu.RUnlock() + + // Check NRGs that are running. + var needDelete []RaftNode + s.rnMu.RLock() + for name, n := range s.raftNodes { + if _, ok := nrgMap[name]; !ok { + needDelete = append(needDelete, n) + } + } + s.rnMu.RUnlock() + + for _, n := range needDelete { + s.Warnf("Detected orphaned NRG %q, will cleanup", n.Group()) + n.Delete() + } +} + func (js *jetStream) monitorCluster() { s, n := js.server(), js.getMetaGroup() qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ() @@ -1197,6 +1256,8 @@ func (js *jetStream) monitorCluster() { if hs := s.healthz(nil); hs.Error != _EMPTY_ { s.Warnf("%v", hs.Error) } + // Also check for orphaned NRGs. + s.checkForNRGOrphans() } var ( @@ -1277,7 +1338,6 @@ func (js *jetStream) monitorCluster() { go checkHealth() continue } - // FIXME(dlc) - Deal with errors. if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) { @@ -1288,6 +1348,8 @@ func (js *jetStream) monitorCluster() { doSnapshot() } ce.ReturnToPool() + } else { + s.Warnf("Error applying JetStream cluster entries: %v", err) } } aq.recycle(&ces) @@ -2037,6 +2099,15 @@ func (mset *stream) removeNode() { } } +func (mset *stream) clearRaftNode() { + if mset == nil { + return + } + mset.mu.Lock() + defer mset.mu.Unlock() + mset.node = nil +} + // Helper function to generate peer info. // lists and sets for old and new. func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 5d559faceb..629f797661 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5555,7 +5555,7 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) { nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - // Nowmal Stream + // Normal Stream _, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"*"}, @@ -5634,3 +5634,51 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) { require_True(t, blkSize(fs) == defaultMediumBlockSize) } } + +func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Normal Stream + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "DC", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // We will force an orphan for a certain server. + s := c.randomNonStreamLeader(globalAccountName, "TEST") + + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + sgn := mset.raftNode().Group() + mset.clearRaftNode() + + o := mset.lookupConsumer("DC") + require_True(t, o != nil) + ogn := o.raftNode().Group() + o.clearRaftNode() + + require_NoError(t, js.DeleteStream("TEST")) + + // Check that we do in fact have orphans. + require_True(t, s.numRaftNodes() > 1) + + // This function will detect orphans and clean them up. + s.checkForNRGOrphans() + + // Should only be meta NRG left. + require_True(t, s.numRaftNodes() == 1) + require_True(t, s.lookupRaftNode(sgn) == nil) + require_True(t, s.lookupRaftNode(ogn) == nil) +} diff --git a/server/monitor.go b/server/monitor.go index 5884c1e86e..eb6301da4b 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -3225,7 +3225,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { } details := opts.Details defer func() { - // for response with details enabled, ses status to either "error" or "ok" + // for response with details enabled, set status to either "error" or "ok" if details { if len(health.Errors) != 0 { health.Status = "error" @@ -3492,6 +3492,7 @@ func (s *Server) healthz(opts *HealthzOptions) *HealthStatus { var streams map[string]map[string]*streamAssignment js.mu.RLock() if opts.Account == _EMPTY_ { + // Collect all relevant streams and consumers. streams = make(map[string]map[string]*streamAssignment, len(cc.streams)) for acc, asa := range cc.streams { nasa := make(map[string]*streamAssignment)