diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index acf42fc694..c68a910885 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -138,10 +138,11 @@ type streamAssignment struct { Reply string `json:"reply"` Restore *StreamState `json:"restore_state,omitempty"` // Internal - consumers map[string]*consumerAssignment - responded bool - recovering bool - err error + consumers map[string]*consumerAssignment + responded bool + recovering bool + reassigning bool // i.e. due to placement issues, lack of resources, etc. + err error } // consumerAssignment is what the meta controller uses to assign consumers to streams. @@ -5487,8 +5488,7 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client // then we will do the proper thing. Otherwise will be a no-op. cc.removeInflightProposal(result.Account, result.Stream) - // FIXME(dlc) - suppress duplicates? - if sa := js.streamAssignment(result.Account, result.Stream); sa != nil { + if sa := js.streamAssignment(result.Account, result.Stream); sa != nil && !sa.reassigning { canDelete := !result.Update && time.Since(sa.Created) < 5*time.Second // See if we should retry in case this cluster is full but there are others. @@ -5514,6 +5514,10 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client // Propose new. sa.Group, sa.err = rg, nil cc.meta.Propose(encodeAddStreamAssignment(sa)) + // When the new stream assignment is processed, sa.reassigning will be + // automatically set back to false. Until then, don't process any more + // assignment results. + sa.reassigning = true return } }