From cdd41b5560eae0152cf343e5c76df717601220d3 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 13 Nov 2024 14:01:19 +0000 Subject: [PATCH] Don't process multiple stream assignment responses for the same assignment If we are reassigning a stream assignment to a different placement, i.e. due to insufficient resources or another placement error, then don't process any further stream assignment results until the new assignment is processed. Otherwise we might generate multiple new assignments unnecessarily which could delete/recreate/update the stream multiple times, potentially on different peer sets. This should also de-flake `TestJetStreamSuperClusterConcurrentOverflow`. Signed-off-by: Neil Twigg --- server/jetstream_cluster.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index acf42fc694..c68a910885 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -138,10 +138,11 @@ type streamAssignment struct { Reply string `json:"reply"` Restore *StreamState `json:"restore_state,omitempty"` // Internal - consumers map[string]*consumerAssignment - responded bool - recovering bool - err error + consumers map[string]*consumerAssignment + responded bool + recovering bool + reassigning bool // i.e. due to placement issues, lack of resources, etc. + err error } // consumerAssignment is what the meta controller uses to assign consumers to streams. @@ -5487,8 +5488,7 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client // then we will do the proper thing. Otherwise will be a no-op. cc.removeInflightProposal(result.Account, result.Stream) - // FIXME(dlc) - suppress duplicates? - if sa := js.streamAssignment(result.Account, result.Stream); sa != nil { + if sa := js.streamAssignment(result.Account, result.Stream); sa != nil && !sa.reassigning { canDelete := !result.Update && time.Since(sa.Created) < 5*time.Second // See if we should retry in case this cluster is full but there are others. @@ -5514,6 +5514,10 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client // Propose new. sa.Group, sa.err = rg, nil cc.meta.Propose(encodeAddStreamAssignment(sa)) + // When the new stream assignment is processed, sa.reassigning will be + // automatically set back to false. Until then, don't process any more + // assignment results. + sa.reassigning = true return } }