Skip to content

Commit

Permalink
[FAB-9146] fix TestCreateChainFromBlock data race
Browse files Browse the repository at this point in the history
The test performs explicit viper configuration before creating each test
and the gossip discovery service was using viper to retrieve the
configuration at runtime from goroutines it spawns. Since viper does
not synchronize access to its maps, this resulted in a race.

The race is avoided by creating fields on gossipDiscoveryImpl to hold
the timeout values, setting them during construction, and referencing
the fields at runtime instead of viper.

TestCreateChainFromBlock was also modified to address issues that
prevented execution with `go test -count n`.

Change-Id: I57331d81be8c24f2ed83362f4551c14747596498
Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
  • Loading branch information
sykesm committed Jul 2, 2018
1 parent 6dd7353 commit a72b7fd
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
9 changes: 8 additions & 1 deletion core/peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package peer

import (
"fmt"
"math/rand"
"net"
"testing"

Expand Down Expand Up @@ -90,7 +91,8 @@ func TestCreateChainFromBlock(t *testing.T) {
cleanup := setupPeerFS(t)
defer cleanup()

testChainID := "mytestchainid"
Initialize(nil, &ccprovider.MockCcProviderImpl{}, (&mscc.MocksccProviderFactory{}).NewSystemChaincodeProvider(), txvalidator.MapBasedPluginMapper(map[string]validation.PluginFactory{}))
testChainID := fmt.Sprintf("mytestchainid-%d", rand.Int())
block, err := configtxtest.MakeGenesisBlock(testChainID)
if err != nil {
fmt.Printf("Failed to create a config block, err %s\n", err)
Expand Down Expand Up @@ -187,6 +189,11 @@ func TestCreateChainFromBlock(t *testing.T) {
if len(channels) != 1 {
t.Fatalf("incorrect number of channels")
}

// clean up the chain references to enable execution with -count n
chains.Lock()
chains.list = map[string]*chain{}
chains.Unlock()
}

func TestGetLocalIP(t *testing.T) {
Expand Down
28 changes: 19 additions & 9 deletions gossip/discovery/discovery_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ type gossipDiscoveryImpl struct {
logger *logging.Logger
disclosurePolicy DisclosurePolicy
pubsub *util.PubSub

aliveTimeInterval time.Duration
aliveExpirationTimeout time.Duration
aliveExpirationCheckInterval time.Duration
reconnectInterval time.Duration
}

// NewDiscoveryService returns a new discovery service with the comm module passed and the crypto service passed
Expand All @@ -110,6 +115,11 @@ func NewDiscoveryService(self NetworkMember, comm CommService, crypt CryptoServi
logger: util.GetLogger(util.LoggingDiscoveryModule, self.InternalEndpoint),
disclosurePolicy: disPol,
pubsub: util.NewPubSub(),

aliveTimeInterval: getAliveTimeInterval(),
aliveExpirationTimeout: getAliveExpirationTimeout(),
aliveExpirationCheckInterval: getAliveExpirationCheckInterval(),
reconnectInterval: getReconnectInterval(),
}

d.validateSelfConfig()
Expand Down Expand Up @@ -153,7 +163,7 @@ func (d *gossipDiscoveryImpl) Connect(member NetworkMember, id identifier) {
return
}
d.logger.Warningf("Could not connect to %v : %v", member, err)
time.Sleep(getReconnectInterval())
time.Sleep(d.reconnectInterval)
continue
}
peer := &NetworkMember{
Expand Down Expand Up @@ -219,7 +229,7 @@ func (d *gossipDiscoveryImpl) sendUntilAcked(peer *NetworkMember, message *proto
if _, timeoutErr := sub.Listen(); timeoutErr == nil {
return
}
time.Sleep(getReconnectInterval())
time.Sleep(d.reconnectInterval)
}
}

Expand Down Expand Up @@ -602,8 +612,8 @@ func (d *gossipDiscoveryImpl) periodicalReconnectToDead() {
}

wg.Wait()
d.logger.Debug("Sleeping", getReconnectInterval())
time.Sleep(getReconnectInterval())
d.logger.Debug("Sleeping", d.reconnectInterval)
time.Sleep(d.reconnectInterval)
}
}

Expand Down Expand Up @@ -657,7 +667,7 @@ func (d *gossipDiscoveryImpl) periodicalCheckAlive() {
defer d.logger.Debug("Stopped")

for !d.toDie() {
time.Sleep(getAliveExpirationCheckInterval())
time.Sleep(d.aliveExpirationCheckInterval)
dead := d.getDeadMembers()
if len(dead) > 0 {
d.logger.Debugf("Got %v dead members: %v", len(dead), dead)
Expand Down Expand Up @@ -707,7 +717,7 @@ func (d *gossipDiscoveryImpl) getDeadMembers() []common.PKIidType {
dead := []common.PKIidType{}
for id, last := range d.aliveLastTS {
elapsedNonAliveTime := time.Since(last.lastSeen)
if elapsedNonAliveTime.Nanoseconds() > getAliveExpirationTimeout().Nanoseconds() {
if elapsedNonAliveTime > d.aliveExpirationTimeout {
d.logger.Warning("Haven't heard from", []byte(id), "for", elapsedNonAliveTime)
dead = append(dead, common.PKIidType(id))
}
Expand All @@ -719,8 +729,8 @@ func (d *gossipDiscoveryImpl) periodicalSendAlive() {
defer d.logger.Debug("Stopped")

for !d.toDie() {
d.logger.Debug("Sleeping", getAliveTimeInterval())
time.Sleep(getAliveTimeInterval())
d.logger.Debug("Sleeping", d.aliveTimeInterval)
time.Sleep(d.aliveTimeInterval)
msg, err := d.createSignedAliveMessage(true)
if err != nil {
d.logger.Warningf("Failed creating alive message: %+v", errors.WithStack(err))
Expand Down Expand Up @@ -1007,7 +1017,7 @@ type aliveMsgStore struct {
func newAliveMsgStore(d *gossipDiscoveryImpl) *aliveMsgStore {
policy := proto.NewGossipMessageComparator(0)
trigger := func(m interface{}) {}
aliveMsgTTL := getAliveExpirationTimeout() * msgExpirationFactor
aliveMsgTTL := d.aliveExpirationTimeout * msgExpirationFactor
externalLock := func() { d.lock.Lock() }
externalUnlock := func() { d.lock.Unlock() }
callback := func(m interface{}) {
Expand Down

0 comments on commit a72b7fd

Please sign in to comment.