From 3c0afd22bdf420dda8488d019c0de06a449b59b8 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 7 Nov 2024 15:05:44 +0100 Subject: [PATCH] [FIXED] Ghost consumers after failed meta proposal Signed-off-by: Maurice van Veen --- server/jetstream_cluster.go | 29 +++++++++++++++++------------ server/jetstream_cluster_2_test.go | 6 ++++++ 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 5eb6d9fa97..5a4dcc5bcd 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -138,10 +138,11 @@ type streamAssignment struct { Reply string `json:"reply"` Restore *StreamState `json:"restore_state,omitempty"` // Internal - consumers map[string]*consumerAssignment - responded bool - recovering bool - err error + consumers map[string]*consumerAssignment + pendingConsumers map[string]struct{} + responded bool + recovering bool + err error } // consumerAssignment is what the meta controller uses to assign consumers to streams. @@ -4260,6 +4261,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Place into our internal map under the stream assignment. // Ok to replace an existing one, we check on process call below. sa.consumers[ca.Name] = ca + delete(sa.pendingConsumers, ca.Name) + if len(sa.pendingConsumers) == 0 { + sa.pendingConsumers = nil + } js.mu.Unlock() acc, err := s.LookupAccount(accName) @@ -7411,7 +7416,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } if maxc > 0 { // Don't count DIRECTS. - total := 0 + total := len(sa.pendingConsumers) for cn, ca := range sa.consumers { if action == ActionCreateOrUpdate { // If the consumer name is specified and we think it already exists, then @@ -7669,14 +7674,14 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec ca = nca } - // Mark this as pending. - if sa.consumers == nil { - sa.consumers = make(map[string]*consumerAssignment) - } - sa.consumers[ca.Name] = ca - // Do formal proposal. - cc.meta.Propose(encodeAddConsumerAssignment(ca)) + if err := cc.meta.Propose(encodeAddConsumerAssignment(ca)); err == nil { + // Mark this as pending. + if sa.pendingConsumers == nil { + sa.pendingConsumers = make(map[string]struct{}) + } + sa.pendingConsumers[ca.Name] = struct{}{} + } } func encodeAddConsumerAssignment(ca *consumerAssignment) []byte { diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 338e58e9f5..2b0364444f 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -2072,6 +2072,12 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) { if nc := len(names); nc > 1 { t.Fatalf("Expected only 1 consumer, got %d", nc) } + + metaLeader := c.leader() + mjs := metaLeader.getJetStream() + sa := mjs.streamAssignment(globalAccountName, "MAXCC") + require_NotNil(t, sa) + require_True(t, sa.pendingConsumers == nil) } func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) {