Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Make TestGossipRouter goroutine easier to follow
Browse files Browse the repository at this point in the history
  • Loading branch information
bboreham committed Jun 26, 2015
1 parent 25f5da9 commit 0589201
Showing 1 changed file with 33 additions and 27 deletions.
60 changes: 33 additions & 27 deletions ipam/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func AssertNothingSentErr(t *testing.T, ch <-chan error) {

// Router to convey gossip from one gossiper to another, for testing
type unicastMessage struct {
sender *router.PeerName
sender router.PeerName
buf []byte
}
type broadcastMessage struct {
Expand Down Expand Up @@ -250,49 +250,55 @@ type TestGossipRouterClient struct {
sender router.PeerName
}

func (grouter *TestGossipRouter) connect(sender router.PeerName, gossiper router.Gossiper) router.Gossip {
gossipChan := make(chan interface{}, 100)
func (grouter *TestGossipRouter) run(gossiper router.Gossiper, gossipChan chan interface{}) {
gossipTimer := time.Tick(10 * time.Second)
for {
select {
case gossip := <-gossipChan:
switch message := gossip.(type) {
case exitMessage:
close(message.exitChan)
return

go func() {
gossipTimer := time.Tick(10 * time.Second)
for {
select {
case gossip := <-gossipChan:
if message, isExit := gossip.(exitMessage); isExit {
close(message.exitChan)
return
case flushMessage:
close(message.flushChan)

case unicastMessage:
if rand.Float32() > (1.0 - grouter.loss) {
continue
}
if message, isFlush := gossip.(flushMessage); isFlush {
close(message.flushChan)
if err := gossiper.OnGossipUnicast(message.sender, message.buf); err != nil {
panic(fmt.Sprintf("Error doing gossip unicast to %s: %s", message.sender, err))
}

case broadcastMessage:
if rand.Float32() > (1.0 - grouter.loss) {
continue
}
switch message := gossip.(type) {
case unicastMessage:
if err := gossiper.OnGossipUnicast(*message.sender, message.buf); err != nil {
panic(fmt.Sprintf("Error doing gossip unicast to %s: %s", sender, err))
}
case broadcastMessage:
for _, msg := range message.data.Encode() {
if _, err := gossiper.OnGossipBroadcast(msg); err != nil {
panic(fmt.Sprintf("Error doing gossip broadcast to %s: %s", sender, err))
}
for _, msg := range message.data.Encode() {
if _, err := gossiper.OnGossipBroadcast(msg); err != nil {
panic(fmt.Sprintf("Error doing gossip broadcast: %s", err))
}
}
case <-gossipTimer:
grouter.GossipBroadcast(gossiper.Gossip())
}
case <-gossipTimer:
grouter.GossipBroadcast(gossiper.Gossip())
}
}()
}
}

func (grouter *TestGossipRouter) connect(sender router.PeerName, gossiper router.Gossiper) router.Gossip {
gossipChan := make(chan interface{}, 100)

go grouter.run(gossiper, gossipChan)

grouter.gossipChans[sender] = gossipChan
return TestGossipRouterClient{grouter, sender}
}

func (client TestGossipRouterClient) GossipUnicast(dstPeerName router.PeerName, buf []byte) error {
select {
case client.router.gossipChans[dstPeerName] <- unicastMessage{sender: &client.sender, buf: buf}:
case client.router.gossipChans[dstPeerName] <- unicastMessage{sender: client.sender, buf: buf}:
default: // drop the message if we cannot send it
common.Error.Printf("Dropping message")
}
Expand Down

0 comments on commit 0589201

Please sign in to comment.