Skip to content

Commit

Permalink
Merge pull request #8975 from hashicorp/dnephin/stream-close-on-unsub
Browse files Browse the repository at this point in the history
stream: close the subscription on Unsubscribe
  • Loading branch information
dnephin authored Oct 23, 2020
2 parents 9c04cbc + fb57d9b commit 8bd1a2c
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 37 deletions.
9 changes: 5 additions & 4 deletions agent/consul/state/store_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {

// Ensure the reset event was sent.
err = assertErr(t, eventCh)
require.Equal(stream.ErrSubscriptionClosed, err)
require.Equal(stream.ErrSubForceClosed, err)

// Register another subscription.
subscription2 := &stream.SubscribeRequest{
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {

// Ensure the reset event was sent.
err = assertErr(t, eventCh2)
require.Equal(stream.ErrSubscriptionClosed, err)
require.Equal(stream.ErrSubForceClosed, err)
}

func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
Expand Down Expand Up @@ -162,6 +162,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {
}
sub, err = publisher.Subscribe(subscription2)
require.NoError(err)
defer sub.Unsubscribe()

eventCh = testRunSub(sub)

Expand All @@ -180,7 +181,7 @@ func TestStore_IntegrationWithEventPublisher_ACLPolicyUpdate(t *testing.T) {

// Ensure the reload event was sent.
err = assertErr(t, eventCh)
require.Equal(stream.ErrSubscriptionClosed, err)
require.Equal(stream.ErrSubForceClosed, err)

// Register another subscription.
subscription3 := &stream.SubscribeRequest{
Expand Down Expand Up @@ -367,7 +368,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
}
}
require.Error(t, next.Err)
require.Equal(t, stream.ErrSubscriptionClosed, next.Err)
require.Equal(t, stream.ErrSubForceClosed, next.Err)
return
case <-time.After(100 * time.Millisecond):
t.Fatalf("no err after 100ms")
Expand Down
4 changes: 2 additions & 2 deletions agent/consul/stream/event_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ func newBufferItem(events []Event) *bufferItem {

// Next return the next buffer item in the buffer. It may block until ctx is
// cancelled or until the next item is published.
func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) {
func (i *bufferItem) Next(ctx context.Context, closed <-chan struct{}) (*bufferItem, error) {
// See if there is already a next value, block if so. Note we don't rely on
// state change (chan nil) as that's not threadsafe but detecting close is.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-forceClose:
case <-closed:
return nil, fmt.Errorf("subscription closed")
case <-i.link.ch:
}
Expand Down
33 changes: 29 additions & 4 deletions agent/consul/stream/event_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {

sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)

next := getNextEvent(t, eventCh)
Expand Down Expand Up @@ -141,10 +142,10 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
cancel() // Shutdown

err = consumeSub(context.Background(), sub1)
require.Equal(t, err, ErrSubscriptionClosed)
require.Equal(t, err, ErrSubForceClosed)

_, err = sub2.Next(context.Background())
require.Equal(t, err, ErrSubscriptionClosed)
require.Equal(t, err, ErrSubForceClosed)
}

func consumeSub(ctx context.Context, sub *Subscription) error {
Expand All @@ -169,14 +170,15 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {

publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
_, err := publisher.Subscribe(req)
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
sub.Unsubscribe()

publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used")
}

sub, err := publisher.Subscribe(req)
sub, err = publisher.Subscribe(req)
require.NoError(t, err)

eventCh := runSubscription(ctx, sub)
Expand Down Expand Up @@ -357,6 +359,7 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
newReq.Index = 1
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)
defer sub.Unsubscribe()

eventCh := runSubscription(ctx, sub)
next := getNextEvent(t, eventCh)
Expand All @@ -379,3 +382,25 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow()
}
}

func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)

sub, err := publisher.Subscribe(req)
require.NoError(t, err)

_, err = sub.Next(ctx)
require.NoError(t, err)

sub.Unsubscribe()
_, err = sub.Next(ctx)
require.Error(t, err)
require.Contains(t, err.Error(), "subscription was closed by unsubscribe")
}
68 changes: 43 additions & 25 deletions agent/consul/stream/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,33 @@ package stream
import (
"context"
"errors"
"fmt"
"sync/atomic"
)

const (
// subscriptionStateOpen is the default state of a subscription. An open
// subscription may receive new events.
subscriptionStateOpen uint32 = 0
// subStateOpen is the default state of a subscription. An open subscription
// may return new events.
subStateOpen = 0

// subscriptionStateClosed indicates that the subscription was closed, possibly
// as a result of a change to an ACL token, and will not receive new events.
// subStateForceClosed indicates the subscription was forced closed by
// the EventPublisher, possibly as a result of a change to an ACL token, and
// will not return new events.
// The subscriber must issue a new Subscribe request.
subscriptionStateClosed uint32 = 1
subStateForceClosed = 1

// subStateUnsub indicates the subscription was closed by the caller, and
// will not return new events.
subStateUnsub = 2
)

// ErrSubscriptionClosed is a error signalling the subscription has been
// ErrSubForceClosed is a error signalling the subscription has been
// closed. The client should Unsubscribe, then re-Subscribe.
var ErrSubscriptionClosed = errors.New("subscription closed by server, client must reset state and resubscribe")
var ErrSubForceClosed = errors.New("subscription closed by server, client must reset state and resubscribe")

// Subscription provides events on a Topic. Events may be filtered by Key.
// Events are returned by Next(), and may start with a Snapshot of events.
type Subscription struct {
// state is accessed atomically 0 means open, 1 means closed with reload
state uint32

// req is the requests that we are responding to
Expand All @@ -34,9 +39,9 @@ type Subscription struct {
// is mutated by calls to Next.
currentItem *bufferItem

// forceClosed is closed when forceClose is called. It is used by
// EventPublisher to cancel Next().
forceClosed chan struct{}
// closed is a channel which is closed when the subscription is closed. It
// is used to exit the blocking select.
closed chan struct{}

// unsub is a function set by EventPublisher that is called to free resources
// when the subscription is no longer needed.
Expand All @@ -58,7 +63,7 @@ type SubscribeRequest struct {
// calling Unsubscribe when it is done with the subscription, to free resources.
func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
return &Subscription{
forceClosed: make(chan struct{}),
closed: make(chan struct{}),
req: req,
currentItem: item,
unsub: unsub,
Expand All @@ -68,16 +73,16 @@ func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subs
// Next returns the next Event to deliver. It must only be called from a
// single goroutine concurrently as it mutates the Subscription.
func (s *Subscription) Next(ctx context.Context) (Event, error) {
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return Event{}, ErrSubscriptionClosed
}

for {
next, err := s.currentItem.Next(ctx, s.forceClosed)
switch {
case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
return Event{}, ErrSubscriptionClosed
case err != nil:
if err := s.requireStateOpen(); err != nil {
return Event{}, err
}

next, err := s.currentItem.Next(ctx, s.closed)
if err := s.requireStateOpen(); err != nil {
return Event{}, err
}
if err != nil {
return Event{}, err
}
s.currentItem = next
Expand All @@ -92,6 +97,17 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) {
}
}

func (s *Subscription) requireStateOpen() error {
switch atomic.LoadUint32(&s.state) {
case subStateForceClosed:
return ErrSubForceClosed
case subStateUnsub:
return fmt.Errorf("subscription was closed by unsubscribe")
default:
return nil
}
}

func newEventFromBatch(req SubscribeRequest, events []Event) Event {
first := events[0]
if len(events) == 1 {
Expand Down Expand Up @@ -121,13 +137,15 @@ func filterByKey(req SubscribeRequest, events []Event) (Event, bool) {
// and will need to perform a new Subscribe request.
// It is safe to call from any goroutine.
func (s *Subscription) forceClose() {
swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed)
if swapped {
close(s.forceClosed)
if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateForceClosed) {
close(s.closed)
}
}

// Unsubscribe the subscription, freeing resources.
func (s *Subscription) Unsubscribe() {
if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateUnsub) {
close(s.closed)
}
s.unsub()
}
2 changes: 1 addition & 1 deletion agent/consul/stream/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestSubscription_Close(t *testing.T) {
_, err = sub.Next(ctx)
elapsed = time.Since(start)
require.Error(t, err)
require.Equal(t, ErrSubscriptionClosed, err)
require.Equal(t, ErrSubForceClosed, err)
require.True(t, elapsed > 200*time.Millisecond,
"Reload should have happened after blocking 200ms, took %s", elapsed)
require.True(t, elapsed < 2*time.Second,
Expand Down
2 changes: 1 addition & 1 deletion agent/rpc/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
for {
event, err := sub.Next(ctx)
switch {
case errors.Is(err, stream.ErrSubscriptionClosed):
case errors.Is(err, stream.ErrSubForceClosed):
logger.Trace("subscription reset by server")
return status.Error(codes.Aborted, err.Error())
case err != nil:
Expand Down

0 comments on commit 8bd1a2c

Please sign in to comment.