Skip to content

Commit

Permalink
stream: fix a snapshot cache bug
Browse files Browse the repository at this point in the history
Previously a snapshot created as part of a resumse-stream request could have incorrectly
cached the newSnapshotToFollow event. This would cause clients to error because they
received an unexpected framing event.
  • Loading branch information
dnephin committed Feb 16, 2021
1 parent 9b3c6da commit ba3a1b9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
4 changes: 4 additions & 0 deletions .changelog/9772.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:bug
streaming: fixes a bug caused by caching an incorrect snapshot, that would cause clients
to error until the cache expired.
```
35 changes: 17 additions & 18 deletions agent/consul/stream/event_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,28 +176,27 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
}

snapFromCache := e.getCachedSnapshotLocked(req)
if req.Index == 0 && snapFromCache != nil {
return e.subscriptions.add(req, snapFromCache.First), nil
if snapFromCache == nil {
snap := newEventSnapshot()
snap.appendAndSplice(*req, handler, topicHead)
e.setCachedSnapshotLocked(req, snap)
snapFromCache = snap
}
snap := newEventSnapshot()

// if the request has an Index the client view is stale and must be reset
// with a NewSnapshotToFollow event.
if req.Index > 0 {
snap.buffer.Append([]Event{{
Topic: req.Topic,
Payload: newSnapshotToFollow{},
}})

if snapFromCache != nil {
snap.buffer.AppendItem(snapFromCache.First)
return e.subscriptions.add(req, snap.First), nil
}
// If the request.Index is 0 the client has no view, send a full snapshot.
if req.Index == 0 {
return e.subscriptions.add(req, snapFromCache.First), nil
}

snap.appendAndSplice(*req, handler, topicHead)
e.setCachedSnapshotLocked(req, snap)
return e.subscriptions.add(req, snap.First), nil
// otherwise the request has an Index, the client view is stale and must be reset
// with a NewSnapshotToFollow event.
result := newEventSnapshot()
result.buffer.Append([]Event{{
Topic: req.Topic,
Payload: newSnapshotToFollow{},
}})
result.buffer.AppendItem(snapFromCache.First)
return e.subscriptions.add(req, result.First), nil
}

func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
Expand Down

0 comments on commit ba3a1b9

Please sign in to comment.