Skip to content

Commit

Permalink
Merge "Gossip tests tweaks"
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Nov 3, 2016
2 parents 75cfb3b + 2800ab9 commit e6d2c9b
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 124 deletions.
3 changes: 3 additions & 0 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error) {
}

func (c *commImpl) disconnect(pkiID PKIidType) {
if c.isStopping() {
return
}
c.deadEndpoints <- pkiID
c.connStore.closeByPKIid(pkiID)
}
Expand Down
4 changes: 4 additions & 0 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)

destinationLock.Unlock()

if cs.isClosing {
return nil, fmt.Errorf("ConnStore is closing")
}

cs.Lock()
delete(cs.destinationLocks, string(pkiID))
defer cs.Unlock()
Expand Down
107 changes: 72 additions & 35 deletions gossip/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"google.golang.org/grpc"
)

var timeout = time.Second * time.Duration(15)

type dummyCommModule struct {
id string
presumeDead chan PKIidType
Expand Down Expand Up @@ -257,35 +259,47 @@ func TestUpdate(t *testing.T) {
instances = append(instances, inst)
}

time.Sleep(time.Duration(5) * time.Second)
assert.Equal(t, nodeNum-1, len(instances[nodeNum-1].GetMembership()))
fullMembership := func() bool {
return nodeNum-1 == len(instances[nodeNum-1].GetMembership())
}

waitUntilOrFail(t, fullMembership)

instances[0].UpdateMetadata([]byte("bla bla"))
instances[nodeNum-1].UpdateEndpoint("localhost:5511")
time.Sleep(time.Duration(5) * time.Second)

for _, member := range instances[nodeNum-1].GetMembership() {
if string(member.PKIid) == instances[0].comm.id {
assert.Equal(t, "bla bla", string(member.Metadata))
checkMembership := func() bool {
for _, member := range instances[nodeNum-1].GetMembership() {
if string(member.PKIid) == instances[0].comm.id {
if "bla bla" != string(member.Metadata) {
return false
}
}
}
}

for _, member := range instances[0].GetMembership() {
if string(member.PKIid) == instances[nodeNum-1].comm.id {
assert.Equal(t, "localhost:5511", string(member.Endpoint))
for _, member := range instances[0].GetMembership() {
if string(member.PKIid) == instances[nodeNum-1].comm.id {
if "localhost:5511" != string(member.Endpoint) {
return false
}
}
}
return true
}


waitUntilOrFail(t, checkMembership)

stopAction := &sync.WaitGroup{}
for i, inst := range instances {
fmt.Println("Stopping instance ", i)
for _, inst := range instances {
stopAction.Add(1)
go func(inst *gossipInstance) {
defer stopAction.Done()
inst.Stop()
}(inst)
}
stopAction.Wait()

waitUntilOrFailBlocking(t, stopAction.Wait)
}

func TestExpiration(t *testing.T) {
Expand All @@ -305,30 +319,35 @@ func TestExpiration(t *testing.T) {
instances = append(instances, inst)
}

time.Sleep(time.Duration(4) * time.Second)
assert.Equal(t, nodeNum-1, len(instances[nodeNum-1].GetMembership()))
fullMembership := func() bool {
return nodeNum-1 == len(instances[nodeNum-1].GetMembership())
}

waitUntilOrFail(t, fullMembership)

instances[nodeNum-1].Stop()
instances[nodeNum-2].Stop()
waitUntilOrFailBlocking(t, instances[nodeNum-1].Stop)
waitUntilOrFailBlocking(t, instances[nodeNum-2].Stop)

time.Sleep(time.Duration(2) * time.Second)
assert.Equal(t, nodeNum-3, len(instances[0].GetMembership()))
membershipReduced := func() bool {
return nodeNum-3 == len(instances[0].GetMembership())
}

waitUntilOrFail(t, membershipReduced)

//fmt.Println("Stopping members")
stopAction := &sync.WaitGroup{}
for i, inst := range instances {
if i+2 == nodeNum {
break
}
//fmt.Println("Stopping instance", inst.DiscoveryService.Self().Endpoint)
stopAction.Add(1)
go func(inst *gossipInstance) {
defer stopAction.Done()
inst.Stop()
//fmt.Println("Stopped instance", inst.DiscoveryService.Self().Endpoint)
}(inst)
}
stopAction.Wait()

waitUntilOrFailBlocking(t, stopAction.Wait)
}

func TestGetFullMembership(t *testing.T) {
Expand All @@ -348,8 +367,10 @@ func TestGetFullMembership(t *testing.T) {
instances = append(instances, inst)
}

time.Sleep(time.Duration(5) * time.Second)
assert.Equal(t, nodeNum-1, len(instances[nodeNum-1].GetMembership()))
fullMembership := func() bool {
return nodeNum - 1 == len(instances[nodeNum-1].GetMembership())
}
waitUntilOrFail(t, fullMembership)

stopAction := &sync.WaitGroup{}
for _, inst := range instances {
Expand All @@ -359,24 +380,40 @@ func TestGetFullMembership(t *testing.T) {
inst.Stop()
}(inst)
}
stopAction.Wait()

waitUntilOrFailBlocking(t, stopAction.Wait)
}

func TestGossipDiscoveryStopping(t *testing.T) {
inst := createDiscoveryInstance(9611, "d1", []string{bootPeer(9611)})
time.Sleep(time.Second)
waitUntilOrFailBlocking(t, inst.Stop)

diedChan := make(chan struct{})
go func(inst *gossipInstance) {
inst.Stop()
diedChan <- struct{}{}
}(inst)
}

timer := time.Tick(time.Duration(500) * time.Millisecond)
func waitUntilOrFail(t *testing.T, pred func() bool) {
start := time.Now()
limit := start.UnixNano() + timeout.Nanoseconds()
for time.Now().UnixNano() < limit {
if pred() {
return
}
time.Sleep(timeout / 10)
}
assert.Fail(t, "Timeout expired!")
}

func waitUntilOrFailBlocking(t *testing.T, f func()) {
successChan := make(chan struct{}, 1)
go func() {
f()
successChan <- struct{}{}
}()
select {
case <-timer:
t.Fatal("Didn't stop within a timely manner")
t.Fail()
case <-diedChan:
case <-time.NewTimer(timeout).C:
break
case <-successChan:
return
}
assert.Fail(t, "Timeout expired!")
}
Loading

0 comments on commit e6d2c9b

Please sign in to comment.