From 6b0d185ccf471b24586011ac17304df90107a382 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Sat, 18 Jul 2015 15:57:59 -0700 Subject: [PATCH] nsqd: improve gossip test time/robustness --- nsqd/gossip.go | 5 ++++- nsqd/gossip_test.go | 45 +++++++++++++++++++++++++++++---------------- nsqd/options.go | 8 ++++++++ 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/nsqd/gossip.go b/nsqd/gossip.go index 908311001..5fd6ca129 100644 --- a/nsqd/gossip.go +++ b/nsqd/gossip.go @@ -95,13 +95,16 @@ func initSerf(opts *Options, serfConfig.MemberlistConfig.BindPort = gossipAddr.Port serfConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond serfConfig.MemberlistConfig.GossipNodes = 5 + serfConfig.MemberlistConfig.ProbeInterval = opts.GossipProbeInterval + serfConfig.MemberlistConfig.SuspicionMult = opts.GossipSuspicionMult serfConfig.MemberlistConfig.LogOutput = logWriter{opts.Logger, []byte("memberlist:")} if len(key) != 0 { serfConfig.MemberlistConfig.SecretKey = key } serfConfig.EventCh = serfEventChan serfConfig.EventBuffer = 1024 - serfConfig.ReconnectTimeout = time.Hour + serfConfig.ReapInterval = opts.GossipReapInterval + serfConfig.ReconnectTimeout = opts.GossipReconnectTimeout serfConfig.LogOutput = logWriter{opts.Logger, []byte("serf:")} return serf.Create(serfConfig) diff --git a/nsqd/gossip_test.go b/nsqd/gossip_test.go index 8182801b4..a66706f6a 100644 --- a/nsqd/gossip_test.go +++ b/nsqd/gossip_test.go @@ -57,7 +57,7 @@ func TestGossip(t *testing.T) { sort.Ints(tcpPorts) // wait for convergence - converge(5*time.Second, nsqds, convergenceTester.c, func() bool { + converged := converge(5*time.Second, nsqds, convergenceTester.c, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("client", "", "")) != num { return false @@ -65,6 +65,7 @@ func TestGossip(t *testing.T) { } return true }) + equal(t, converged, true) // all nodes in the cluster should have registrations for _, nsqd := range nsqds { @@ -85,7 +86,7 @@ func TestGossip(t *testing.T) { topic.GetChannel("ch") firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port - converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -94,6 +95,7 @@ func TestGossip(t *testing.T) { } return true }) + equal(t, converged, true) for _, nsqd := range nsqds { producers := nsqd.rdb.FindProducers("topic", topicName, "") @@ -127,6 +129,10 @@ func TestGossipResync(t *testing.T) { opts.Logger = newTestLogger(t) opts.GossipAddress = addr.String() opts.BroadcastAddress = "127.0.0.1" + opts.GossipReapInterval = 200 * time.Millisecond + opts.GossipReconnectTimeout = 100 * time.Millisecond + opts.GossipSuspicionMult = 1 + opts.GossipProbeInterval = 100 * time.Millisecond opts.gossipDelegate = convergenceTester if seedNode != nil { opts.GossipSeedAddresses = []string{seedNode.getOpts().GossipAddress} @@ -150,7 +156,7 @@ func TestGossipResync(t *testing.T) { topic.GetChannel("ch") firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port - converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -159,6 +165,7 @@ func TestGossipResync(t *testing.T) { } return true }) + equal(t, converged, true) for _, nsqd := range nsqds { producers := nsqd.rdb.FindProducers("topic", topicName, "") @@ -175,7 +182,7 @@ func TestGossipResync(t *testing.T) { stillAlive := nsqds[:num-1] // check that other nodes see it as closed - converge(10*time.Second, stillAlive, convergenceTester.c, func() bool { + converged = converge(10*time.Second, stillAlive, convergenceTester.c, func() bool { for _, nsqd := range stillAlive { if len(nsqd.serf.Members()) != len(stillAlive) { return false @@ -183,6 +190,7 @@ func TestGossipResync(t *testing.T) { } return true }) + equal(t, converged, true) // restart stopped node _, _, nsqd := mustStartNSQD(nsqds[num-1].getOpts()) @@ -190,7 +198,7 @@ func TestGossipResync(t *testing.T) { nsqds[num-1] = nsqd // check that other nodes see it as back open - converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool { for _, nsqd := range nsqds { if len(nsqd.serf.Members()) != len(nsqds) { return false @@ -198,9 +206,10 @@ func TestGossipResync(t *testing.T) { } return true }) + equal(t, converged, true) // check that all nodes see the restarted first node - converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -209,6 +218,7 @@ func TestGossipResync(t *testing.T) { } return true }) + equal(t, converged, true) // we should have producers for the topic/channel back now for _, nsqd := range nsqds { @@ -266,7 +276,7 @@ func TestRegossip(t *testing.T) { topic.GetChannel("ch") firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port - converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -275,6 +285,7 @@ func TestRegossip(t *testing.T) { } return true }) + equal(t, converged, true) for _, nsqd := range nsqds { producers := nsqd.rdb.FindProducers("topic", topicName, "") @@ -294,7 +305,7 @@ func TestRegossip(t *testing.T) { } // wait for regossip - converge(10*time.Second, nsqds, convergenceTester.c, func() bool { + converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool { for _, nsqd := range nsqds { if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 || len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 { @@ -303,6 +314,7 @@ func TestRegossip(t *testing.T) { } return true }) + equal(t, converged, true) // we should have producers for the topic/channel back now on all nodes for _, nsqd := range nsqds { @@ -316,16 +328,17 @@ func TestRegossip(t *testing.T) { } } -func converge(timeout time.Duration, nsqds []*NSQD, notifyChan chan struct{}, isConverged func() bool) { - // wait for convergence - converged := false - t := time.NewTimer(timeout) - for !converged { +func converge(timeout time.Duration, nsqds []*NSQD, notifyChan chan struct{}, isConverged func() bool) bool { + for { select { - case <-t.C: - converged = true + case <-time.After(timeout): + return false case <-notifyChan: - converged = isConverged() + if isConverged() { + goto exit + } } } +exit: + return true } diff --git a/nsqd/options.go b/nsqd/options.go index 5a8cc7d49..f9b697a47 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -24,6 +24,10 @@ type Options struct { GossipAddress string `flag:"gossip-address"` GossipSeedAddresses []string `flag:"gossip-seed-address"` GossipRegossipInterval time.Duration `flag:"gossip-regossip-interval"` + GossipProbeInterval time.Duration + GossipSuspicionMult int + GossipReapInterval time.Duration + GossipReconnectTimeout time.Duration // diskqueue options DataPath string `flag:"data-path"` @@ -148,5 +152,9 @@ func NewOptions() *Options { gossipDelegate: nilGossipDelegate{}, GossipRegossipInterval: 60 * time.Second, + GossipProbeInterval: 1 * time.Second, + GossipSuspicionMult: 5, + GossipReapInterval: 15 * time.Second, + GossipReconnectTimeout: 1 * time.Hour, } }