diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..6bd296c --- /dev/null +++ b/go.mod @@ -0,0 +1,11 @@ +module github.com/whyrusleeping/pubsub + +go 1.17 + +require github.com/stretchr/testify v1.7.1 + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..2dca7c9 --- /dev/null +++ b/go.sum @@ -0,0 +1,11 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pubsub.go b/pubsub.go index 9cbf9cf..5b432e9 100644 --- a/pubsub.go +++ b/pubsub.go @@ -74,13 +74,24 @@ func (ps *PubSub) Pub(msg interface{}, topics ...string) { // Unsub unsubscribes the given channel from the specified // topics. If no topic is specified, it is unsubscribed -// from all topics. +// from all topics and the channel is drained. +// +// If topics _are_ specified, it is up to the caller to keep draining the subscription channel. func (ps *PubSub) Unsub(ch chan interface{}, topics ...string) { if len(topics) == 0 { - ps.cmdChan <- cmd{op: unsubAll, ch: ch} - return + // Keep trying to unsubscribe while also draining the channel to prevent any + // deadlocks. + for { + select { + case ps.cmdChan <- cmd{op: unsubAll, ch: ch}: + // Drain the sub channel. + for range ch { + } + return + case <-ch: + } + } } - ps.cmdChan <- cmd{op: unsub, topics: topics, ch: ch} } diff --git a/pubsub_test.go b/pubsub_test.go index 64c1901..250dd44 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -4,109 +4,107 @@ package pubsub -/* import ( - check "launchpad.net/gocheck" "runtime" "testing" "time" -) - -var _ = check.Suite(new(Suite)) - -func Test(t *testing.T) { - check.TestingT(t) -} -type Suite struct{} + "github.com/stretchr/testify/require" +) -func (s *Suite) TestSub(c *check.C) { +func TestSub(t *testing.T) { ps := New(1) ch1 := ps.Sub("t1") ch2 := ps.Sub("t1") ch3 := ps.Sub("t2") ps.Pub("hi", "t1") - c.Check(<-ch1, check.Equals, "hi") - c.Check(<-ch2, check.Equals, "hi") + require.Equal(t, <-ch1, "hi") + require.Equal(t, <-ch2, "hi") ps.Pub("hello", "t2") - c.Check(<-ch3, check.Equals, "hello") + require.Equal(t, <-ch3, "hello") ps.Shutdown() _, ok := <-ch1 - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) _, ok = <-ch2 - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) _, ok = <-ch3 - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) } -func (s *Suite) TestSubOnce(c *check.C) { +func TestSubOnce(t *testing.T) { ps := New(1) ch := ps.SubOnce("t1") ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") + require.Equal(t, <-ch, "hi") _, ok := <-ch - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) ps.Shutdown() } -func (s *Suite) TestAddSub(c *check.C) { +func TestAddSub(t *testing.T) { ps := New(1) ch1 := ps.Sub("t1") ch2 := ps.Sub("t2") ps.Pub("hi1", "t1") - c.Check(<-ch1, check.Equals, "hi1") + require.Equal(t, <-ch1, "hi1") ps.Pub("hi2", "t2") - c.Check(<-ch2, check.Equals, "hi2") + require.Equal(t, <-ch2, "hi2") ps.AddSub(ch1, "t2", "t3") ps.Pub("hi3", "t2") - c.Check(<-ch1, check.Equals, "hi3") - c.Check(<-ch2, check.Equals, "hi3") + require.Equal(t, <-ch1, "hi3") + require.Equal(t, <-ch2, "hi3") ps.Pub("hi4", "t3") - c.Check(<-ch1, check.Equals, "hi4") + require.Equal(t, <-ch1, "hi4") ps.Shutdown() } -func (s *Suite) TestUnsub(c *check.C) { +func TestUnsub(t *testing.T) { ps := New(1) ch := ps.Sub("t1") ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") + require.Equal(t, <-ch, "hi") ps.Unsub(ch, "t1") _, ok := <-ch - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) ps.Shutdown() } -func (s *Suite) TestUnsubAll(c *check.C) { +func TestUnsubAll(t *testing.T) { ps := New(1) ch1 := ps.Sub("t1", "t2", "t3") ch2 := ps.Sub("t1", "t3") ps.Unsub(ch1) - m, ok := <-ch1 - c.Check(ok, check.Equals, false) + _, ok := <-ch1 + require.Equal(t, ok, false) ps.Pub("hi", "t1") - m, ok = <-ch2 - c.Check(m, check.Equals, "hi") + m := <-ch2 + require.Equal(t, m, "hi") + + ps.Pub("bye", "t1") + ps.Unsub(ch2) + _, ok = <-ch2 + // unsubscribing all should drain channel. + require.Equal(t, ok, false) ps.Shutdown() } -func (s *Suite) TestClose(c *check.C) { +func TestClose(t *testing.T) { ps := New(1) ch1 := ps.Sub("t1") ch2 := ps.Sub("t1") @@ -115,25 +113,25 @@ func (s *Suite) TestClose(c *check.C) { ps.Pub("hi", "t1") ps.Pub("hello", "t2") - c.Check(<-ch1, check.Equals, "hi") - c.Check(<-ch2, check.Equals, "hi") - c.Check(<-ch3, check.Equals, "hello") + require.Equal(t, <-ch1, "hi") + require.Equal(t, <-ch2, "hi") + require.Equal(t, <-ch3, "hello") ps.Close("t1", "t2") _, ok := <-ch1 - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) _, ok = <-ch2 - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) _, ok = <-ch3 - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) ps.Pub("welcome", "t3") - c.Check(<-ch4, check.Equals, "welcome") + require.Equal(t, <-ch4, "welcome") ps.Shutdown() } -func (s *Suite) TestUnsubAfterClose(c *check.C) { +func TestUnsubAfterClose(t *testing.T) { ps := New(1) ch := ps.Sub("t1") defer func() { @@ -143,58 +141,61 @@ func (s *Suite) TestUnsubAfterClose(c *check.C) { ps.Close("t1") _, ok := <-ch - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) } -func (s *Suite) TestShutdown(c *check.C) { +func TestShutdown(t *testing.T) { start := runtime.NumGoroutine() - New(10).Shutdown() - time.Sleep(1) - c.Check(runtime.NumGoroutine()-start, check.Equals, 1) + for i := 0; i < 10; i++ { + New(10).Shutdown() + } + time.Sleep(1 * time.Second) + // 2 because go... + require.LessOrEqual(t, runtime.NumGoroutine()-start, 2) } -func (s *Suite) TestMultiSub(c *check.C) { +func TestMultiSub(t *testing.T) { ps := New(1) ch := ps.Sub("t1", "t2") ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") + require.Equal(t, <-ch, "hi") ps.Pub("hello", "t2") - c.Check(<-ch, check.Equals, "hello") + require.Equal(t, <-ch, "hello") ps.Shutdown() _, ok := <-ch - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) } -func (s *Suite) TestMultiSubOnce(c *check.C) { +func TestMultiSubOnce(t *testing.T) { ps := New(1) ch := ps.SubOnce("t1", "t2") ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") + require.Equal(t, <-ch, "hi") ps.Pub("hello", "t2") _, ok := <-ch - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) ps.Shutdown() } -func (s *Suite) TestMultiPub(c *check.C) { +func TestMultiPub(t *testing.T) { ps := New(1) ch1 := ps.Sub("t1") ch2 := ps.Sub("t2") ps.Pub("hi", "t1", "t2") - c.Check(<-ch1, check.Equals, "hi") - c.Check(<-ch2, check.Equals, "hi") + require.Equal(t, <-ch1, "hi") + require.Equal(t, <-ch2, "hi") ps.Shutdown() } -func (s *Suite) TestMultiUnsub(c *check.C) { +func TestMultiUnsub(t *testing.T) { ps := New(1) ch := ps.Sub("t1", "t2", "t3") @@ -203,30 +204,29 @@ func (s *Suite) TestMultiUnsub(c *check.C) { ps.Pub("hi", "t1") ps.Pub("hello", "t2") - c.Check(<-ch, check.Equals, "hello") + require.Equal(t, <-ch, "hello") ps.Unsub(ch, "t2", "t3") _, ok := <-ch - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) ps.Shutdown() } -func (s *Suite) TestMultiClose(c *check.C) { +func TestMultiClose(t *testing.T) { ps := New(1) ch := ps.Sub("t1", "t2") ps.Pub("hi", "t1") - c.Check(<-ch, check.Equals, "hi") + require.Equal(t, <-ch, "hi") ps.Close("t1") ps.Pub("hello", "t2") - c.Check(<-ch, check.Equals, "hello") + require.Equal(t, <-ch, "hello") ps.Close("t2") _, ok := <-ch - c.Check(ok, check.Equals, false) + require.Equal(t, ok, false) ps.Shutdown() } -*/