From 4a78287c66a27a84b3117f75444ec500b8b94806 Mon Sep 17 00:00:00 2001 From: Daniel Upton Date: Fri, 28 Jan 2022 11:59:31 +0000 Subject: [PATCH] streaming: add `simulateExistingSubscriber` test helper --- agent/consul/stream/event_publisher_test.go | 57 ++++++++------------- 1 file changed, 21 insertions(+), 36 deletions(-) diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index 81ba8f3aeec7..f90af0b1b0f9 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -238,21 +238,10 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) { publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second) go publisher.Run(ctx) - // Start a separate subscription that remains open throughout the entire test - // to prevent the buffer getting cleaned up because there aren't any subscribers. - sub, err := publisher.Subscribe(req) - require.NoError(t, err) - defer sub.Unsubscribe() + simulateExistingSubscriber(t, publisher, req) // Publish the testSnapshotEvent, to ensure that it is skipped over when // splicing the topic buffer onto the snapshot. - // - // Note: we must evict the snapshot that was created by the above subscription - // first, as it has already had the topic buffer spliced onto it (so therefore - // would include the snapshot event again). - publisher.lock.Lock() - delete(publisher.snapCache, req.topicSubject()) - publisher.lock.Unlock() publisher.publishEvent([]Event{testSnapshotEvent}) runStep(t, "start a subscription and unsub", func(t *testing.T) { @@ -358,21 +347,10 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second) go publisher.Run(ctx) - // Start a separate subscription that remains open throughout the entire test - // to prevent the buffer getting cleaned up because there aren't any subscribers. - sub, err := publisher.Subscribe(req) - require.NoError(t, err) - defer sub.Unsubscribe() + simulateExistingSubscriber(t, publisher, req) // Publish the testSnapshotEvent, to ensure that it is skipped over when // splicing the topic buffer onto the snapshot. - // - // Note: we must evict the snapshot that was created by the above subscription - // first, as it has already had the topic buffer spliced onto it (so therefore - // would include the snapshot event again). - publisher.lock.Lock() - delete(publisher.snapCache, req.topicSubject()) - publisher.lock.Unlock() publisher.publishEvent([]Event{testSnapshotEvent}) runStep(t, "start a subscription and unsub", func(t *testing.T) { @@ -456,21 +434,10 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testi publisher := NewEventPublisher(handlers, time.Second) go publisher.Run(ctx) - // Start a separate subscription that remains open throughout the entire test - // to prevent the buffer getting cleaned up because there aren't any subscribers. - sub, err := publisher.Subscribe(req) - require.NoError(t, err) - defer sub.Unsubscribe() + simulateExistingSubscriber(t, publisher, req) // Publish the events, to ensure they are is skipped over when splicing the // topic buffer onto the snapshot. - // - // Note: we must evict the snapshot that was created by the above subscription - // first, as it has already had the topic buffer spliced onto it (so therefore - // would include the events again). - publisher.lock.Lock() - delete(publisher.snapCache, req.topicSubject()) - publisher.lock.Unlock() publisher.publishEvent([]Event{testSnapshotEvent}) publisher.publishEvent([]Event{nextEvent}) @@ -583,3 +550,21 @@ func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *t require.Nil(t, publisher.snapCache[req.topicSubject()]) publisher.lock.Unlock() } + +// simulateExistingSubscriber creates a subscription that remains open throughout +// a test to prevent the topic buffer getting garbage-collected. +// +// It evicts the created snapshot from the cache immediately (simulating an +// existing subscription that has been open long enough the snapshot's TTL has +// been reached) so you can test snapshots getting created afresh. +func simulateExistingSubscriber(t *testing.T, p *EventPublisher, r *SubscribeRequest) { + t.Helper() + + sub, err := p.Subscribe(r) + require.NoError(t, err) + t.Cleanup(sub.Unsubscribe) + + p.lock.Lock() + delete(p.snapCache, r.topicSubject()) + p.lock.Unlock() +}