Skip to content

Commit

Permalink
streaming: add simulateExistingSubscriber test helper
Browse files Browse the repository at this point in the history
  • Loading branch information
boxofrad committed Jan 28, 2022
1 parent 8337962 commit 4a78287
Showing 1 changed file with 21 additions and 36 deletions.
57 changes: 21 additions & 36 deletions agent/consul/stream/event_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,21 +238,10 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)

// Start a separate subscription that remains open throughout the entire test
// to prevent the buffer getting cleaned up because there aren't any subscribers.
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
simulateExistingSubscriber(t, publisher, req)

// Publish the testSnapshotEvent, to ensure that it is skipped over when
// splicing the topic buffer onto the snapshot.
//
// Note: we must evict the snapshot that was created by the above subscription
// first, as it has already had the topic buffer spliced onto it (so therefore
// would include the snapshot event again).
publisher.lock.Lock()
delete(publisher.snapCache, req.topicSubject())
publisher.lock.Unlock()
publisher.publishEvent([]Event{testSnapshotEvent})

runStep(t, "start a subscription and unsub", func(t *testing.T) {
Expand Down Expand Up @@ -358,21 +347,10 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)

// Start a separate subscription that remains open throughout the entire test
// to prevent the buffer getting cleaned up because there aren't any subscribers.
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
simulateExistingSubscriber(t, publisher, req)

// Publish the testSnapshotEvent, to ensure that it is skipped over when
// splicing the topic buffer onto the snapshot.
//
// Note: we must evict the snapshot that was created by the above subscription
// first, as it has already had the topic buffer spliced onto it (so therefore
// would include the snapshot event again).
publisher.lock.Lock()
delete(publisher.snapCache, req.topicSubject())
publisher.lock.Unlock()
publisher.publishEvent([]Event{testSnapshotEvent})

runStep(t, "start a subscription and unsub", func(t *testing.T) {
Expand Down Expand Up @@ -456,21 +434,10 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testi
publisher := NewEventPublisher(handlers, time.Second)
go publisher.Run(ctx)

// Start a separate subscription that remains open throughout the entire test
// to prevent the buffer getting cleaned up because there aren't any subscribers.
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
simulateExistingSubscriber(t, publisher, req)

// Publish the events, to ensure they are is skipped over when splicing the
// topic buffer onto the snapshot.
//
// Note: we must evict the snapshot that was created by the above subscription
// first, as it has already had the topic buffer spliced onto it (so therefore
// would include the events again).
publisher.lock.Lock()
delete(publisher.snapCache, req.topicSubject())
publisher.lock.Unlock()
publisher.publishEvent([]Event{testSnapshotEvent})
publisher.publishEvent([]Event{nextEvent})

Expand Down Expand Up @@ -583,3 +550,21 @@ func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *t
require.Nil(t, publisher.snapCache[req.topicSubject()])
publisher.lock.Unlock()
}

// simulateExistingSubscriber creates a subscription that remains open throughout
// a test to prevent the topic buffer getting garbage-collected.
//
// It evicts the created snapshot from the cache immediately (simulating an
// existing subscription that has been open long enough the snapshot's TTL has
// been reached) so you can test snapshots getting created afresh.
func simulateExistingSubscriber(t *testing.T, p *EventPublisher, r *SubscribeRequest) {
t.Helper()

sub, err := p.Subscribe(r)
require.NoError(t, err)
t.Cleanup(sub.Unsubscribe)

p.lock.Lock()
delete(p.snapCache, r.topicSubject())
p.lock.Unlock()
}

0 comments on commit 4a78287

Please sign in to comment.