Skip to content

Commit

Permalink
Re-add "stream" to consumer assignments
Browse files Browse the repository at this point in the history
This was changed in #6189 but we need it to be in the snapshot after all
in order to be able to safely roll back to earlier versions.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Dec 3, 2024
1 parent 529eeb2 commit 53cafd8
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type consumerAssignment struct {
Client *ClientInfo `json:"client,omitempty"`
Created time.Time `json:"created"`
Name string `json:"name"`
Stream string `json:"stream,omitempty"` // Not actually optional but empty in meta snapshots.
Stream string `json:"stream"`
Config *ConsumerConfig `json:"consumer"`
Group *raftGroup `json:"group"`
Subject string `json:"subject,omitempty"`
Expand Down Expand Up @@ -1556,7 +1556,7 @@ func (js *jetStream) metaSnapshot() []byte {
continue
}
cca := *ca
cca.Stream = _EMPTY_ // Will be rehydrated from parent stream assignment on decoding.
cca.Stream = wsa.Config.Name // Needed for safe roll-backs.
cca.Client = cca.Client.forAssignmentSnap()
cca.Subject, cca.Reply = _EMPTY_, _EMPTY_
wsa.Consumers = append(wsa.Consumers, &cca)
Expand Down Expand Up @@ -1615,7 +1615,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
if len(wsa.Consumers) > 0 {
sa.consumers = make(map[string]*consumerAssignment)
for _, ca := range wsa.Consumers {
ca.Stream = sa.Config.Name // Rehydrate from the stream name.
if ca.Stream == _EMPTY_ {
ca.Stream = sa.Config.Name // Rehydrate from the stream name.
}
sa.consumers[ca.Name] = ca
}
}
Expand Down

0 comments on commit 53cafd8

Please sign in to comment.