Skip to content

Commit

Permalink
nsqd: improve gossip test time/robustness
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Apr 16, 2016
1 parent d363fe0 commit 6b0d185
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 17 deletions.
5 changes: 4 additions & 1 deletion nsqd/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,16 @@ func initSerf(opts *Options,
serfConfig.MemberlistConfig.BindPort = gossipAddr.Port
serfConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
serfConfig.MemberlistConfig.GossipNodes = 5
serfConfig.MemberlistConfig.ProbeInterval = opts.GossipProbeInterval
serfConfig.MemberlistConfig.SuspicionMult = opts.GossipSuspicionMult
serfConfig.MemberlistConfig.LogOutput = logWriter{opts.Logger, []byte("memberlist:")}
if len(key) != 0 {
serfConfig.MemberlistConfig.SecretKey = key
}
serfConfig.EventCh = serfEventChan
serfConfig.EventBuffer = 1024
serfConfig.ReconnectTimeout = time.Hour
serfConfig.ReapInterval = opts.GossipReapInterval
serfConfig.ReconnectTimeout = opts.GossipReconnectTimeout
serfConfig.LogOutput = logWriter{opts.Logger, []byte("serf:")}

return serf.Create(serfConfig)
Expand Down
45 changes: 29 additions & 16 deletions nsqd/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ func TestGossip(t *testing.T) {
sort.Ints(tcpPorts)

// wait for convergence
converge(5*time.Second, nsqds, convergenceTester.c, func() bool {
converged := converge(5*time.Second, nsqds, convergenceTester.c, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("client", "", "")) != num {
return false
}
}
return true
})
equal(t, converged, true)

// all nodes in the cluster should have registrations
for _, nsqd := range nsqds {
Expand All @@ -85,7 +86,7 @@ func TestGossip(t *testing.T) {
topic.GetChannel("ch")
firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port

converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand All @@ -94,6 +95,7 @@ func TestGossip(t *testing.T) {
}
return true
})
equal(t, converged, true)

for _, nsqd := range nsqds {
producers := nsqd.rdb.FindProducers("topic", topicName, "")
Expand Down Expand Up @@ -127,6 +129,10 @@ func TestGossipResync(t *testing.T) {
opts.Logger = newTestLogger(t)
opts.GossipAddress = addr.String()
opts.BroadcastAddress = "127.0.0.1"
opts.GossipReapInterval = 200 * time.Millisecond
opts.GossipReconnectTimeout = 100 * time.Millisecond
opts.GossipSuspicionMult = 1
opts.GossipProbeInterval = 100 * time.Millisecond
opts.gossipDelegate = convergenceTester
if seedNode != nil {
opts.GossipSeedAddresses = []string{seedNode.getOpts().GossipAddress}
Expand All @@ -150,7 +156,7 @@ func TestGossipResync(t *testing.T) {
topic.GetChannel("ch")
firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port

converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand All @@ -159,6 +165,7 @@ func TestGossipResync(t *testing.T) {
}
return true
})
equal(t, converged, true)

for _, nsqd := range nsqds {
producers := nsqd.rdb.FindProducers("topic", topicName, "")
Expand All @@ -175,32 +182,34 @@ func TestGossipResync(t *testing.T) {
stillAlive := nsqds[:num-1]

// check that other nodes see it as closed
converge(10*time.Second, stillAlive, convergenceTester.c, func() bool {
converged = converge(10*time.Second, stillAlive, convergenceTester.c, func() bool {
for _, nsqd := range stillAlive {
if len(nsqd.serf.Members()) != len(stillAlive) {
return false
}
}
return true
})
equal(t, converged, true)

// restart stopped node
_, _, nsqd := mustStartNSQD(nsqds[num-1].getOpts())
defer nsqd.Exit()
nsqds[num-1] = nsqd

// check that other nodes see it as back open
converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.serf.Members()) != len(nsqds) {
return false
}
}
return true
})
equal(t, converged, true)

// check that all nodes see the restarted first node
converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand All @@ -209,6 +218,7 @@ func TestGossipResync(t *testing.T) {
}
return true
})
equal(t, converged, true)

// we should have producers for the topic/channel back now
for _, nsqd := range nsqds {
Expand Down Expand Up @@ -266,7 +276,7 @@ func TestRegossip(t *testing.T) {
topic.GetChannel("ch")
firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port

converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand All @@ -275,6 +285,7 @@ func TestRegossip(t *testing.T) {
}
return true
})
equal(t, converged, true)

for _, nsqd := range nsqds {
producers := nsqd.rdb.FindProducers("topic", topicName, "")
Expand All @@ -294,7 +305,7 @@ func TestRegossip(t *testing.T) {
}

// wait for regossip
converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand All @@ -303,6 +314,7 @@ func TestRegossip(t *testing.T) {
}
return true
})
equal(t, converged, true)

// we should have producers for the topic/channel back now on all nodes
for _, nsqd := range nsqds {
Expand All @@ -316,16 +328,17 @@ func TestRegossip(t *testing.T) {
}
}

func converge(timeout time.Duration, nsqds []*NSQD, notifyChan chan struct{}, isConverged func() bool) {
// wait for convergence
converged := false
t := time.NewTimer(timeout)
for !converged {
func converge(timeout time.Duration, nsqds []*NSQD, notifyChan chan struct{}, isConverged func() bool) bool {
for {
select {
case <-t.C:
converged = true
case <-time.After(timeout):
return false
case <-notifyChan:
converged = isConverged()
if isConverged() {
goto exit
}
}
}
exit:
return true
}
8 changes: 8 additions & 0 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ type Options struct {
GossipAddress string `flag:"gossip-address"`
GossipSeedAddresses []string `flag:"gossip-seed-address"`
GossipRegossipInterval time.Duration `flag:"gossip-regossip-interval"`
GossipProbeInterval time.Duration
GossipSuspicionMult int
GossipReapInterval time.Duration
GossipReconnectTimeout time.Duration

// diskqueue options
DataPath string `flag:"data-path"`
Expand Down Expand Up @@ -148,5 +152,9 @@ func NewOptions() *Options {

gossipDelegate: nilGossipDelegate{},
GossipRegossipInterval: 60 * time.Second,
GossipProbeInterval: 1 * time.Second,
GossipSuspicionMult: 5,
GossipReapInterval: 15 * time.Second,
GossipReconnectTimeout: 1 * time.Hour,
}
}

0 comments on commit 6b0d185

Please sign in to comment.