Skip to content

Commit

Permalink
nsqd: cleanup gossipDelegate
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Aug 29, 2015
1 parent ff45df6 commit aab65e9
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 45 deletions.
3 changes: 0 additions & 3 deletions nsqd/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,6 @@ func (n *NSQD) serfEventLoop() {
default:
n.logf("WARNING: un-handled Serf event: %#v", ev)
}
if n.getOpts().gossipDelegate != nil {
n.getOpts().gossipDelegate.notify()
}
case <-n.exitChan:
goto exit
}
Expand Down
41 changes: 10 additions & 31 deletions nsqd/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,11 @@ import (
"time"
)

type gossipTester struct {
c chan struct{}
}

func (g gossipTester) notify() {
g.c <- struct{}{}
select {
case g.c <- struct{}{}:
default:
}
}

func TestGossip(t *testing.T) {
var nsqds []*NSQD
var seedNode *NSQD
var tcpPorts []int

convergenceTester := gossipTester{make(chan struct{}, 20)}

num := 3
for i := 0; i < num; i++ {
// find an open port
Expand All @@ -39,7 +25,6 @@ func TestGossip(t *testing.T) {
opts.Logger = newTestLogger(t)
opts.GossipAddress = addr.String()
opts.BroadcastAddress = "127.0.0.1"
opts.gossipDelegate = convergenceTester
if seedNode != nil {
opts.GossipSeedAddresses = []string{seedNode.getOpts().GossipAddress}
}
Expand All @@ -57,7 +42,7 @@ func TestGossip(t *testing.T) {
sort.Ints(tcpPorts)

// wait for convergence
converged := converge(5*time.Second, nsqds, convergenceTester.c, func() bool {
converged := converge(5*time.Second, nsqds, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("client", "", "")) != num {
return false
Expand Down Expand Up @@ -86,7 +71,7 @@ func TestGossip(t *testing.T) {
topic.GetChannel("ch")
firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port

converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged = converge(10*time.Second, nsqds, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand Down Expand Up @@ -114,8 +99,6 @@ func TestGossipResync(t *testing.T) {
var seedNode *NSQD
var tcpPorts []int

convergenceTester := gossipTester{make(chan struct{}, 20)}

num := 3
for i := 0; i < num; i++ {
// find an open port
Expand All @@ -133,7 +116,6 @@ func TestGossipResync(t *testing.T) {
opts.GossipReconnectTimeout = 100 * time.Millisecond
opts.GossipSuspicionMult = 1
opts.GossipProbeInterval = 100 * time.Millisecond
opts.gossipDelegate = convergenceTester
if seedNode != nil {
opts.GossipSeedAddresses = []string{seedNode.getOpts().GossipAddress}
}
Expand All @@ -156,7 +138,7 @@ func TestGossipResync(t *testing.T) {
topic.GetChannel("ch")
firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port

converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged := converge(10*time.Second, nsqds, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand All @@ -182,7 +164,7 @@ func TestGossipResync(t *testing.T) {
stillAlive := nsqds[:num-1]

// check that other nodes see it as closed
converged = converge(10*time.Second, stillAlive, convergenceTester.c, func() bool {
converged = converge(10*time.Second, stillAlive, func() bool {
for _, nsqd := range stillAlive {
if len(nsqd.serf.Members()) != len(stillAlive) {
return false
Expand All @@ -198,7 +180,7 @@ func TestGossipResync(t *testing.T) {
nsqds[num-1] = nsqd

// check that other nodes see it as back open
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged = converge(10*time.Second, nsqds, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.serf.Members()) != len(nsqds) {
return false
Expand All @@ -209,7 +191,7 @@ func TestGossipResync(t *testing.T) {
equal(t, converged, true)

// check that all nodes see the restarted first node
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged = converge(10*time.Second, nsqds, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand Down Expand Up @@ -237,8 +219,6 @@ func TestRegossip(t *testing.T) {
var seedNode *NSQD
var tcpPorts []int

convergenceTester := gossipTester{make(chan struct{}, 20)}

num := 3
for i := 0; i < num; i++ {
// find an open port
Expand All @@ -253,7 +233,6 @@ func TestRegossip(t *testing.T) {
opts.GossipAddress = addr.String()
opts.BroadcastAddress = "127.0.0.1"
opts.GossipRegossipInterval = 1 * time.Second
opts.gossipDelegate = convergenceTester
if seedNode != nil {
opts.GossipSeedAddresses = []string{seedNode.getOpts().GossipAddress}
}
Expand All @@ -276,7 +255,7 @@ func TestRegossip(t *testing.T) {
topic.GetChannel("ch")
firstPort := nsqds[0].tcpListener.Addr().(*net.TCPAddr).Port

converged := converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged := converge(10*time.Second, nsqds, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand Down Expand Up @@ -305,7 +284,7 @@ func TestRegossip(t *testing.T) {
}

// wait for regossip
converged = converge(10*time.Second, nsqds, convergenceTester.c, func() bool {
converged = converge(10*time.Second, nsqds, func() bool {
for _, nsqd := range nsqds {
if len(nsqd.rdb.FindProducers("topic", topicName, "")) != 1 ||
len(nsqd.rdb.FindProducers("channel", topicName, "ch")) != 1 {
Expand All @@ -328,12 +307,12 @@ func TestRegossip(t *testing.T) {
}
}

func converge(timeout time.Duration, nsqds []*NSQD, notifyChan chan struct{}, isConverged func() bool) bool {
func converge(timeout time.Duration, nsqds []*NSQD, isConverged func() bool) bool {
for {
select {
case <-time.After(timeout):
return false
case <-notifyChan:
case <-time.After(10 * time.Millisecond):
if isConverged() {
goto exit
}
Expand Down
11 changes: 0 additions & 11 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,8 @@ type Options struct {
SnappyEnabled bool `flag:"snappy"`

Logger logger

gossipDelegate gossipDelegate
}

type gossipDelegate interface {
notify()
}

type nilGossipDelegate struct{}

func (_ nilGossipDelegate) notify() {}

func NewOptions() *Options {
hostname, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -150,7 +140,6 @@ func NewOptions() *Options {

Logger: log.New(os.Stderr, "[nsqd] ", log.Ldate|log.Ltime|log.Lmicroseconds),

gossipDelegate: nilGossipDelegate{},
GossipRegossipInterval: 60 * time.Second,
GossipProbeInterval: 1 * time.Second,
GossipSuspicionMult: 5,
Expand Down

0 comments on commit aab65e9

Please sign in to comment.