Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
event: document select case slice use and add edge case test (#16680)
Browse files Browse the repository at this point in the history
Feed keeps active subscription channels in a slice called 'f.sendCases'.
The Send method tracks the active cases in a local variable 'cases'
whose value is f.sendCases initially. 'cases' shrinks to a shorter
prefix of f.sendCases every time a send succeeds, moving the successful
case out of range of the active case list.

This can be confusing because the two slices share a backing array. Add
more comments to document what is going on. Also add a test for removing
a case that is in 'f.sentCases' but not 'cases'.
  • Loading branch information
fjl authored and gbalint committed May 23, 2018
1 parent 2b2130b commit 7638956
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
5 changes: 4 additions & 1 deletion event/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ func (f *Feed) Send(value interface{}) (nsent int) {
f.sendCases[i].Send = rvalue
}

// Send until all channels except removeSub have been chosen.
// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
// of sendCases. When a send succeeds, the corresponding case moves to the end of
// 'cases' and it shrinks by one element.
cases := f.sendCases
for {
// Fast path: try sending without blocking before adding to the select set.
Expand All @@ -170,6 +172,7 @@ func (f *Feed) Send(value interface{}) (nsent int) {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
Expand Down
39 changes: 39 additions & 0 deletions event/feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,45 @@ func TestFeedUnsubscribeBlockedPost(t *testing.T) {
wg.Wait()
}

// Checks that unsubscribing a channel during Send works even if that
// channel has already been sent on.
func TestFeedUnsubscribeSentChan(t *testing.T) {
var (
feed Feed
ch1 = make(chan int)
ch2 = make(chan int)
sub1 = feed.Subscribe(ch1)
sub2 = feed.Subscribe(ch2)
wg sync.WaitGroup
)
defer sub2.Unsubscribe()

wg.Add(1)
go func() {
feed.Send(0)
wg.Done()
}()

// Wait for the value on ch1.
<-ch1
// Unsubscribe ch1, removing it from the send cases.
sub1.Unsubscribe()

// Receive ch2, finishing Send.
<-ch2
wg.Wait()

// Send again. This should send to ch2 only, so the wait group will unblock
// as soon as a value is received on ch2.
wg.Add(1)
go func() {
feed.Send(0)
wg.Done()
}()
<-ch2
wg.Wait()
}

func TestFeedUnsubscribeFromInbox(t *testing.T) {
var (
feed Feed
Expand Down

0 comments on commit 7638956

Please sign in to comment.