Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Flush messages from mock gossip in IPAM test #1024

Merged
merged 5 commits into from
Jun 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions ipam/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestCancel(t *testing.T) {
CIDR = "10.0.1.7/26"
)

router := TestGossipRouter{make(map[router.PeerName]chan gossipMessage), 0.0}
router := TestGossipRouter{make(map[router.PeerName]chan interface{}), 0.0}

alloc1, subnet := makeAllocator("01:00:00:02:00:00", CIDR, 2)
alloc1.SetInterfaces(router.connect(alloc1.ourName, alloc1))
Expand Down Expand Up @@ -256,13 +256,17 @@ func TestTransfer(t *testing.T) {
wt.AssertTrue(t, err == nil, "Failed to get address")

router.GossipBroadcast(alloc2.Gossip())
router.flush()
router.GossipBroadcast(alloc3.Gossip())
router.flush()
router.removePeer(alloc2.ourName)
router.removePeer(alloc3.ourName)
alloc2.Stop()
alloc3.Stop()
router.flush()
wt.AssertSuccess(t, alloc1.AdminTakeoverRanges(alloc2.ourName.String()))
wt.AssertSuccess(t, alloc1.AdminTakeoverRanges(alloc3.ourName.String()))
router.flush()

wt.AssertEquals(t, alloc1.NumFreeAddresses(subnet), address.Offset(1022))

Expand All @@ -282,7 +286,8 @@ func TestFakeRouterSimple(t *testing.T) {
alloc1 := allocs[0]
//alloc2 := allocs[1]

alloc1.Allocate("foo", subnet, nil)
_, err := alloc1.Allocate("foo", subnet, nil)
wt.AssertTrue(t, err == nil, "Failed to get address")
}

func TestAllocatorFuzz(t *testing.T) {
Expand Down
96 changes: 59 additions & 37 deletions ipam/testutils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,33 +200,47 @@ func AssertNothingSentErr(t *testing.T, ch <-chan error) {
}

// Router to convey gossip from one gossiper to another, for testing
type gossipMessage struct {
isUnicast bool
sender *router.PeerName
buf []byte // for unicast
data router.GossipData // for broadcast
exitChan chan bool
type unicastMessage struct {
sender router.PeerName
buf []byte
}
type broadcastMessage struct {
data router.GossipData
}
type exitMessage struct {
exitChan chan struct{}
}
type flushMessage struct {
flushChan chan struct{}
}

type TestGossipRouter struct {
gossipChans map[router.PeerName]chan gossipMessage
gossipChans map[router.PeerName]chan interface{}
loss float32 // 0.0 means no loss
}

func (grouter *TestGossipRouter) GossipBroadcast(update router.GossipData) error {
for _, gossipChan := range grouter.gossipChans {
select {
case gossipChan <- gossipMessage{data: update}:
case gossipChan <- broadcastMessage{data: update}:
default: // drop the message if we cannot send it
}
}
return nil
}

func (grouter *TestGossipRouter) flush() {
for _, gossipChan := range grouter.gossipChans {
flushChan := make(chan struct{})
gossipChan <- flushMessage{flushChan: flushChan}
<-flushChan
}
}

func (grouter *TestGossipRouter) removePeer(peer router.PeerName) {
gossipChan := grouter.gossipChans[peer]
resultChan := make(chan bool)
gossipChan <- gossipMessage{exitChan: resultChan}
resultChan := make(chan struct{})
gossipChan <- exitMessage{exitChan: resultChan}
<-resultChan
delete(grouter.gossipChans, peer)
}
Expand All @@ -236,47 +250,55 @@ type TestGossipRouterClient struct {
sender router.PeerName
}

func (grouter *TestGossipRouter) connect(sender router.PeerName, gossiper router.Gossiper) router.Gossip {
gossipChan := make(chan gossipMessage, 100)

go func() {
gossipTimer := time.Tick(10 * time.Second)
for {
select {
case message := <-gossipChan:
if message.exitChan != nil {
message.exitChan <- true
return
}
func (grouter *TestGossipRouter) run(gossiper router.Gossiper, gossipChan chan interface{}) {
gossipTimer := time.Tick(10 * time.Second)
for {
select {
case gossip := <-gossipChan:
switch message := gossip.(type) {
case exitMessage:
close(message.exitChan)
return

case flushMessage:
close(message.flushChan)

case unicastMessage:
if rand.Float32() > (1.0 - grouter.loss) {
continue
}
if err := gossiper.OnGossipUnicast(message.sender, message.buf); err != nil {
panic(fmt.Sprintf("Error doing gossip unicast to %s: %s", message.sender, err))
}

if message.isUnicast {
if err := gossiper.OnGossipUnicast(*message.sender, message.buf); err != nil {
panic(fmt.Sprintf("Error doing gossip unicast to %s: %s", sender, err))
}
} else {
for _, msg := range message.data.Encode() {
if _, err := gossiper.OnGossipBroadcast(msg); err != nil {
panic(fmt.Sprintf("Error doing gossip broadcast to %s: %s", sender, err))
}
case broadcastMessage:
if rand.Float32() > (1.0 - grouter.loss) {
continue
}
for _, msg := range message.data.Encode() {
if _, err := gossiper.OnGossipBroadcast(msg); err != nil {
panic(fmt.Sprintf("Error doing gossip broadcast: %s", err))
}
}
case <-gossipTimer:
grouter.GossipBroadcast(gossiper.Gossip())
}
case <-gossipTimer:
grouter.GossipBroadcast(gossiper.Gossip())
}
}()
}
}

func (grouter *TestGossipRouter) connect(sender router.PeerName, gossiper router.Gossiper) router.Gossip {
gossipChan := make(chan interface{}, 100)

go grouter.run(gossiper, gossipChan)

grouter.gossipChans[sender] = gossipChan
return TestGossipRouterClient{grouter, sender}
}

func (client TestGossipRouterClient) GossipUnicast(dstPeerName router.PeerName, buf []byte) error {
select {
case client.router.gossipChans[dstPeerName] <- gossipMessage{isUnicast: true, sender: &client.sender, buf: buf}:
case client.router.gossipChans[dstPeerName] <- unicastMessage{sender: client.sender, buf: buf}:
default: // drop the message if we cannot send it
common.Error.Printf("Dropping message")
}
Expand All @@ -289,7 +311,7 @@ func (client TestGossipRouterClient) GossipBroadcast(update router.GossipData) e

func makeNetworkOfAllocators(size int, cidr string) ([]*Allocator, TestGossipRouter, address.Range) {

gossipRouter := TestGossipRouter{make(map[router.PeerName]chan gossipMessage), 0.0}
gossipRouter := TestGossipRouter{make(map[router.PeerName]chan interface{}), 0.0}
allocs := make([]*Allocator, size)
var subnet address.Range

Expand All @@ -303,7 +325,7 @@ func makeNetworkOfAllocators(size int, cidr string) ([]*Allocator, TestGossipRou
}

gossipRouter.GossipBroadcast(allocs[size-1].Gossip())
time.Sleep(1000 * time.Millisecond)
gossipRouter.flush()
return allocs, gossipRouter, subnet
}

Expand Down