From bf3e72c5623308be9458e4a709a1693e3c4cc049 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sat, 18 Jul 2015 16:56:34 -0700 Subject: [PATCH] nsqd: cleanup gossipDelegate --- nsqd/gossip.go | 3 --- nsqd/gossip_test.go | 41 ++++++++++------------------------------- nsqd/options.go | 11 ----------- 3 files changed, 10 insertions(+), 45 deletions(-) diff --git a/nsqd/gossip.go b/nsqd/gossip.go index 5fd6ca129..bc23f00cf 100644 --- a/nsqd/gossip.go +++ b/nsqd/gossip.go @@ -261,9 +261,6 @@ func (n *NSQD) serfEventLoop() { default: n.logf("WARNING: un-handled Serf event: %#v", ev) } - if n.getOpts().gossipDelegate != nil { - n.getOpts().gossipDelegate.notify() - } case <-n.exitChan: goto exit } diff --git a/nsqd/gossip_test.go b/nsqd/gossip_test.go index a66706f6a..e331018df 100644 --- a/nsqd/gossip_test.go +++ b/nsqd/gossip_test.go @@ -7,25 +7,11 @@ import ( "time" ) -type gossipTester struct { - c chan struct{} -} - -func (g gossipTester) notify() { - g.c <- struct{}{} - select { - case g.c <- struct{}{}: - default: - } -} - func TestGossip(t *testing.T) { var nsqds []*NSQD var seedNode *NSQD var tcpPorts []int - convergenceTester := gossipTester{make(chan struct{}, 20)} - num := 3 for i := 0; i < num; i++ { // find an open port @@ -39,7 +25,6 @@ func TestGossip(t *testing.T) { opts.Logger = newTestLogger(t) opts.GossipAddress = addr.String() opts.BroadcastAddress = "127.0.0.1" - opts.gossipDelegate = convergenceTester if seedNode != nil { opts.GossipSeedAddresses = []string{seedNode.getOpts().GossipAddress} } @@ -57,7 +42,7 @@ func TestGossip(t *testing.T) { sort.Ints(tcpPorts) // wait for convergence - converged := converge(5*time.Second, nsqds, convergenceTester.c, func() bool { + converged := converge(5*time.Second, nsqds, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("client", "", "")) != num { return false @@ -86,7 +71,7 @@ func TestGossip(t *testing.T) { topic.GetChannel("ch") firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port - converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged = converge(10*time.Second, nsqds, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -114,8 +99,6 @@ func TestGossipResync(t *testing.T) { var seedNode *NSQD var tcpPorts []int - convergenceTester := gossipTester{make(chan struct{}, 20)} - num := 3 for i := 0; i < num; i++ { // find an open port @@ -133,7 +116,6 @@ func TestGossipResync(t *testing.T) { opts.GossipReconnectTimeout = 100 * time.Millisecond opts.GossipSuspicionMult = 1 opts.GossipProbeInterval = 100 * time.Millisecond - opts.gossipDelegate = convergenceTester if seedNode != nil { opts.GossipSeedAddresses = []string{seedNode.getOpts().GossipAddress} } @@ -156,7 +138,7 @@ func TestGossipResync(t *testing.T) { topic.GetChannel("ch") firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port - converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged := converge(10*time.Second, nsqds, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -182,7 +164,7 @@ func TestGossipResync(t *testing.T) { stillAlive := nsqds[:num-1] // check that other nodes see it as closed - converged = converge(10*time.Second, stillAlive, convergenceTester.c, func() bool { + converged = converge(10*time.Second, stillAlive, func() bool { for _, nsqd := range stillAlive { if len(nsqd.serf.Members()) != len(stillAlive) { return false @@ -198,7 +180,7 @@ func TestGossipResync(t *testing.T) { nsqds[num-1] = nsqd // check that other nodes see it as back open - converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged = converge(10*time.Second, nsqds, func() bool { for _, nsqd := range nsqds { if len(nsqd.serf.Members()) != len(nsqds) { return false @@ -209,7 +191,7 @@ func TestGossipResync(t *testing.T) { equal(t, converged, true) // check that all nodes see the restarted first node - converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged = converge(10*time.Second, nsqds, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -237,8 +219,6 @@ func TestRegossip(t *testing.T) { var seedNode *NSQD var tcpPorts []int - convergenceTester := gossipTester{make(chan struct{}, 20)} - num := 3 for i := 0; i < num; i++ { // find an open port @@ -253,7 +233,6 @@ func TestRegossip(t *testing.T) { opts.GossipAddress = addr.String() opts.BroadcastAddress = "127.0.0.1" opts.GossipRegossipInterval = 1 * time.Second - opts.gossipDelegate = convergenceTester if seedNode != nil { opts.GossipSeedAddresses = []string{seedNode.getOpts().GossipAddress} } @@ -276,7 +255,7 @@ func TestRegossip(t *testing.T) { topic.GetChannel("ch") firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port - converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged := converge(10*time.Second, nsqds, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -305,7 +284,7 @@ func TestRegossip(t *testing.T) { } // wait for regossip - converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged = converge(10*time.Second, nsqds, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -328,12 +307,12 @@ func TestRegossip(t *testing.T) { } } -func converge(timeout time.Duration, nsqds []*NSQD, notifyChan chan struct{}, isConverged func() bool) bool { +func converge(timeout time.Duration, nsqds []*NSQD, isConverged func() bool) bool { for { select { case <-time.After(timeout): return false - case <-notifyChan: + case <-time.After(10 * time.Millisecond): if isConverged() { goto exit } diff --git a/nsqd/options.go b/nsqd/options.go index f9b697a47..d8c10cbb7 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -80,18 +80,8 @@ type Options struct { SnappyEnabled bool `flag:"snappy"` Logger logger - - gossipDelegate gossipDelegate -} - -type gossipDelegate interface { - notify() } -type nilGossipDelegate struct{} - -func (_ nilGossipDelegate) notify() {} - func NewOptions() *Options { hostname, err := os.Hostname() if err != nil { @@ -150,7 +140,6 @@ func NewOptions() *Options { Logger: log.New(os.Stderr, "[nsqd] ", log.Ldate|log.Ltime|log.Lmicroseconds), - gossipDelegate: nilGossipDelegate{}, GossipRegossipInterval: 60 * time.Second, GossipProbeInterval: 1 * time.Second, GossipSuspicionMult: 5,