Skip to content

Commit

Permalink
Don't process multiple stream assignment responses for the same assig…
Browse files Browse the repository at this point in the history
…nment

If we are reassigning a stream assignment to a different placement, i.e.
due to insufficient resources or another placement error, then don't
process any further stream assignment results until the new assignment
is processed. Otherwise we might generate multiple new assignments
unnecessarily which could delete/recreate/update the stream multiple
times, potentially on different peer sets.

This should also de-flake `TestJetStreamSuperClusterConcurrentOverflow`.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Nov 13, 2024
1 parent e1b2f5d commit cdd41b5
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,11 @@ type streamAssignment struct {
Reply string `json:"reply"`
Restore *StreamState `json:"restore_state,omitempty"`
// Internal
consumers map[string]*consumerAssignment
responded bool
recovering bool
err error
consumers map[string]*consumerAssignment
responded bool
recovering bool
reassigning bool // i.e. due to placement issues, lack of resources, etc.
err error
}

// consumerAssignment is what the meta controller uses to assign consumers to streams.
Expand Down Expand Up @@ -5487,8 +5488,7 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client
// then we will do the proper thing. Otherwise will be a no-op.
cc.removeInflightProposal(result.Account, result.Stream)

// FIXME(dlc) - suppress duplicates?
if sa := js.streamAssignment(result.Account, result.Stream); sa != nil {
if sa := js.streamAssignment(result.Account, result.Stream); sa != nil && !sa.reassigning {
canDelete := !result.Update && time.Since(sa.Created) < 5*time.Second

// See if we should retry in case this cluster is full but there are others.
Expand All @@ -5514,6 +5514,10 @@ func (js *jetStream) processStreamAssignmentResults(sub *subscription, c *client
// Propose new.
sa.Group, sa.err = rg, nil
cc.meta.Propose(encodeAddStreamAssignment(sa))
// When the new stream assignment is processed, sa.reassigning will be
// automatically set back to false. Until then, don't process any more
// assignment results.
sa.reassigning = true
return
}
}
Expand Down

0 comments on commit cdd41b5

Please sign in to comment.