diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 2cebbc5aa8..c5d67078a2 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -150,7 +150,7 @@ type consumerAssignment struct { Client *ClientInfo `json:"client,omitempty"` Created time.Time `json:"created"` Name string `json:"name"` - Stream string `json:"stream,omitempty"` // Not actually optional but empty in meta snapshots. + Stream string `json:"stream"` Config *ConsumerConfig `json:"consumer"` Group *raftGroup `json:"group"` Subject string `json:"subject,omitempty"` @@ -1556,7 +1556,7 @@ func (js *jetStream) metaSnapshot() []byte { continue } cca := *ca - cca.Stream = _EMPTY_ // Will be rehydrated from parent stream assignment on decoding. + cca.Stream = wsa.Config.Name // Needed for safe roll-backs. cca.Client = cca.Client.forAssignmentSnap() cca.Subject, cca.Reply = _EMPTY_, _EMPTY_ wsa.Consumers = append(wsa.Consumers, &cca) @@ -1615,7 +1615,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove if len(wsa.Consumers) > 0 { sa.consumers = make(map[string]*consumerAssignment) for _, ca := range wsa.Consumers { - ca.Stream = sa.Config.Name // Rehydrate from the stream name. + if ca.Stream == _EMPTY_ { + ca.Stream = sa.Config.Name // Rehydrate from the stream name. + } sa.consumers[ca.Name] = ca } }