Skip to content

Commit

Permalink
Cherry-picks for 2.10.25-RC.2 (#6371)
Browse files Browse the repository at this point in the history
Includes the following:

- #6361
- #6362
- #6364
- #6367

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander authored Jan 13, 2025
2 parents 1f3b383 + c68bc8f commit 5365153
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 206 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.31.0
golang.org/x/crypto v0.32.0
golang.org/x/sys v0.29.0
golang.org/x/time v0.9.0
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
135 changes: 26 additions & 109 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,12 @@ func (s *Server) JetStreamSnapshotMeta() error {
return errNotLeader
}

return meta.InstallSnapshot(js.metaSnapshot())
snap, err := js.metaSnapshot()
if err != nil {
return err
}

return meta.InstallSnapshot(snap)
}

func (s *Server) JetStreamStepdownStream(account, stream string) error {
Expand Down Expand Up @@ -437,73 +442,6 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
return false
}

// restartStream restarts the stream in question by reprocessing its stream
// assignment (and the assignments of our local consumers) from the meta layer.
// Should only be called when the stream is known to be in a bad state.
func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
js.mu.Lock()
s, cc := js.srv, js.cluster
// No cluster state means nothing to restart against.
if cc == nil {
js.mu.Unlock()
return
}
// Need to lookup the one directly from the meta layer, what we get handed is a copy if coming from isStreamHealthy.
asa := cc.streams[acc.Name]
if asa == nil {
js.mu.Unlock()
return
}
sa := asa[csa.Config.Name]
if sa == nil {
js.mu.Unlock()
return
}
// Make sure to clear out the raft node if still present in the meta layer.
// Stop it first if it has not already closed, then nil it so that
// processStreamAssignment below will create a fresh node.
if rg := sa.Group; rg != nil && rg.node != nil {
if rg.node.State() != Closed {
rg.node.Stop()
}
rg.node = nil
}
// Capture the age of the assignment before dropping the lock.
sinceCreation := time.Since(sa.Created)
js.mu.Unlock()

// Process stream assignment to recreate.
// Check that we have given system enough time to start us up.
// This will be longer than obvious, and matches consumer logic in case system very busy.
if sinceCreation < 10*time.Second {
s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v",
acc, csa.Config.Name, sinceCreation)
return
}

js.processStreamAssignment(sa)

// If we had consumers assigned to this server they will be present in the copy, csa.
// They also need to be processed. The csa consumers is a copy of only our consumers,
// those assigned to us, but the consumer assignment's there are direct from the meta
// layer to make this part much easier and avoid excessive lookups.
for _, cca := range csa.consumers {
// Skip consumers already marked deleted in the meta layer.
if cca.deleted {
continue
}
// Need to look up original as well here to make sure node is nil.
js.mu.Lock()
ca := sa.consumers[cca.Name]
if ca != nil && ca.Group != nil {
// Make sure the node is stopped if still running.
if node := ca.Group.node; node != nil && node.State() != Closed {
node.Stop()
}
// Make sure node is wiped.
ca.Group.node = nil
}
js.mu.Unlock()
// Reprocess the consumer assignment outside the lock to recreate it.
if ca != nil {
js.processConsumerAssignment(ca)
}
}
}

// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
Expand All @@ -529,7 +467,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
// First lookup stream and make sure its there.
mset, err := acc.lookupStream(streamName)
if err != nil {
js.restartStream(acc, sa)
return false
}

Expand All @@ -554,8 +491,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
} else if node.State() == Closed {
js.restartStream(acc, sa)
}
}
return false
Expand Down Expand Up @@ -585,37 +520,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
node := ca.Group.node
js.mu.RUnlock()

// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()

js.mu.Lock()
deleted := ca.deleted
// Check that we have not just been created.
if !deleted && time.Since(ca.Created) < 10*time.Second {
s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v",
accName, streamName, consumer, time.Since(ca.Created))
js.mu.Unlock()
return
}
// Make sure the node is stopped if still running.
if node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
}
}

// Check if not running at all.
o := mset.lookupConsumer(consumer)
if o == nil {
restartConsumer()
return false
}

Expand All @@ -630,11 +537,12 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer)
node.Delete()
o.deleteWithoutAdvisory()
restartConsumer()
} else if node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()

// When we try to restart we nil out the node and reprocess the consumer assignment.
js.mu.Lock()
ca.Group.node = nil
js.mu.Unlock()
js.processConsumerAssignment(ca)
}
}
return false
Expand Down Expand Up @@ -1340,7 +1248,10 @@ func (js *jetStream) monitorCluster() {
}
// For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact.
if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() {
if err := n.InstallSnapshot(js.metaSnapshot()); err == nil {
snap, err := js.metaSnapshot()
if err != nil {
s.Warnf("Error generating JetStream cluster snapshot: %v", err)
} else if err = n.InstallSnapshot(snap); err == nil {
lastSnapTime = time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
Expand Down Expand Up @@ -1534,7 +1445,7 @@ func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConf
return StreamConfig{}, false
}

func (js *jetStream) metaSnapshot() []byte {
func (js *jetStream) metaSnapshot() ([]byte, error) {
start := time.Now()
js.mu.RLock()
s := js.srv
Expand Down Expand Up @@ -1574,16 +1485,22 @@ func (js *jetStream) metaSnapshot() []byte {

if len(streams) == 0 {
js.mu.RUnlock()
return nil
return nil, nil
}

// Track how long it took to marshal the JSON
mstart := time.Now()
b, _ := json.Marshal(streams)
b, err := json.Marshal(streams)
mend := time.Since(mstart)

js.mu.RUnlock()

// Must not be possible for a JSON marshaling error to result
// in an empty snapshot.
if err != nil {
return nil, err
}

// Track how long it took to compress the JSON
cstart := time.Now()
snap := s2.Encode(nil, b)
Expand All @@ -1593,7 +1510,7 @@ func (js *jetStream) metaSnapshot() []byte {
s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, s2: %.3fs, uncompressed: %d, compressed: %d)",
took.Seconds(), nsa, nca, mend.Seconds(), cend.Seconds(), len(b), len(snap))
}
return snap
return snap, nil
}

func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {
Expand Down
Loading

0 comments on commit 5365153

Please sign in to comment.