Skip to content

Commit

Permalink
[FAB-10951] race in TestUpdateRootsFromConfigBlock
Browse files Browse the repository at this point in the history
Change the gossip pull engine to memoize the timeout configuration at
construction instead of constantly retrieving it at runtime from viper.

This resolves a race where TestUpdateRootsFromConfigBlock modifies the
viper configuration causing a concurrent map modification race.

Also add missing error assertions in TestUpdateRootsFromConfigBlock and
update the formatting to be a bit more idiomatic.

Change-Id: I094b7426e494bdcff00e156f9e804c58aceb9f06
Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
  • Loading branch information
sykesm committed Jul 2, 2018
1 parent a06dd35 commit 4af2b13
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 85 deletions.
147 changes: 69 additions & 78 deletions core/peer/pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// default timeout for grpc connections
Expand Down Expand Up @@ -94,7 +95,8 @@ func createMSPConfig(rootCerts, tlsRootCerts, tlsIntermediateCerts [][]byte,
RootCerts: rootCerts,
TlsRootCerts: tlsRootCerts,
TlsIntermediateCerts: tlsIntermediateCerts,
Name: mspID}
Name: mspID,
}

fmpsjs, err := proto.Marshal(fmspconf)
if err != nil {
Expand All @@ -120,106 +122,94 @@ func createConfigBlock(chainID string, appMSPConf, ordererMSPConf *mspproto.MSPC
func TestUpdateRootsFromConfigBlock(t *testing.T) {
// load test certs from testdata
org1CA, err := ioutil.ReadFile(filepath.Join("testdata", "Org1-cert.pem"))
org1Server1Key, err := ioutil.ReadFile(filepath.Join("testdata",
"Org1-server1-key.pem"))
org1Server1Cert, err := ioutil.ReadFile(filepath.Join("testdata",
"Org1-server1-cert.pem"))
require.NoError(t, err)
org1Server1Key, err := ioutil.ReadFile(filepath.Join("testdata", "Org1-server1-key.pem"))
require.NoError(t, err)
org1Server1Cert, err := ioutil.ReadFile(filepath.Join("testdata", "Org1-server1-cert.pem"))
require.NoError(t, err)
org2CA, err := ioutil.ReadFile(filepath.Join("testdata", "Org2-cert.pem"))
org2Server1Key, err := ioutil.ReadFile(filepath.Join("testdata",
"Org2-server1-key.pem"))
org2Server1Cert, err := ioutil.ReadFile(filepath.Join("testdata",
"Org2-server1-cert.pem"))
org2IntermediateCA, err := ioutil.ReadFile(filepath.Join("testdata",
"Org2-child1-cert.pem"))
org2IntermediateServer1Key, err := ioutil.ReadFile(filepath.Join("testdata",
"Org2-child1-server1-key.pem"))
org2IntermediateServer1Cert, err := ioutil.ReadFile(filepath.Join("testdata",
"Org2-child1-server1-cert.pem"))
require.NoError(t, err)
org2Server1Key, err := ioutil.ReadFile(filepath.Join("testdata", "Org2-server1-key.pem"))
require.NoError(t, err)
org2Server1Cert, err := ioutil.ReadFile(filepath.Join("testdata", "Org2-server1-cert.pem"))
require.NoError(t, err)
org2IntermediateCA, err := ioutil.ReadFile(filepath.Join("testdata", "Org2-child1-cert.pem"))
require.NoError(t, err)
org2IntermediateServer1Key, err := ioutil.ReadFile(filepath.Join("testdata", "Org2-child1-server1-key.pem"))
require.NoError(t, err)
org2IntermediateServer1Cert, err := ioutil.ReadFile(filepath.Join("testdata", "Org2-child1-server1-cert.pem"))
require.NoError(t, err)
ordererOrgCA, err := ioutil.ReadFile(filepath.Join("testdata", "Org3-cert.pem"))
ordererOrgServer1Key, err := ioutil.ReadFile(filepath.Join("testdata",
"Org3-server1-key.pem"))
ordererOrgServer1Cert, err := ioutil.ReadFile(filepath.Join("testdata",
"Org3-server1-cert.pem"))

if err != nil {
t.Fatalf("Failed to load test certificates: %v", err)
}
require.NoError(t, err)
ordererOrgServer1Key, err := ioutil.ReadFile(filepath.Join("testdata", "Org3-server1-key.pem"))
require.NoError(t, err)
ordererOrgServer1Cert, err := ioutil.ReadFile(filepath.Join("testdata", "Org3-server1-cert.pem"))
require.NoError(t, err)

// create test MSPConfigs
org1MSPConf, err := createMSPConfig([][]byte{org2CA}, [][]byte{org1CA},
[][]byte{}, "Org1MSP")
org2MSPConf, err := createMSPConfig([][]byte{org1CA}, [][]byte{org2CA},
[][]byte{}, "Org2MSP")
org2IntermediateMSPConf, err := createMSPConfig([][]byte{org1CA},
[][]byte{org2CA}, [][]byte{org2IntermediateCA}, "Org2IntermediateMSP")
ordererOrgMSPConf, err := createMSPConfig([][]byte{org1CA},
[][]byte{ordererOrgCA}, [][]byte{}, "OrdererOrgMSP")
if err != nil {
t.Fatalf("Failed to create MSPConfigs (%s)", err)
}
org1MSPConf, err := createMSPConfig([][]byte{org2CA}, [][]byte{org1CA}, [][]byte{}, "Org1MSP")
require.NoError(t, err)
org2MSPConf, err := createMSPConfig([][]byte{org1CA}, [][]byte{org2CA}, [][]byte{}, "Org2MSP")
require.NoError(t, err)
org2IntermediateMSPConf, err := createMSPConfig([][]byte{org1CA}, [][]byte{org2CA}, [][]byte{org2IntermediateCA}, "Org2IntermediateMSP")
require.NoError(t, err)
ordererOrgMSPConf, err := createMSPConfig([][]byte{org1CA}, [][]byte{ordererOrgCA}, [][]byte{}, "OrdererOrgMSP")
require.NoError(t, err)

// create test channel create blocks
channel1Block, err := createConfigBlock("channel1", org1MSPConf,
ordererOrgMSPConf, "Org1MSP", "OrdererOrgMSP")
channel2Block, err := createConfigBlock("channel2", org2MSPConf,
ordererOrgMSPConf, "Org2MSP", "OrdererOrgMSP")
channel3Block, err := createConfigBlock("channel3", org2IntermediateMSPConf,
ordererOrgMSPConf, "Org2IntermediateMSP", "OrdererOrgMSP")
channel1Block, err := createConfigBlock("channel1", org1MSPConf, ordererOrgMSPConf, "Org1MSP", "OrdererOrgMSP")
require.NoError(t, err)
channel2Block, err := createConfigBlock("channel2", org2MSPConf, ordererOrgMSPConf, "Org2MSP", "OrdererOrgMSP")
require.NoError(t, err)
channel3Block, err := createConfigBlock("channel3", org2IntermediateMSPConf, ordererOrgMSPConf, "Org2IntermediateMSP", "OrdererOrgMSP")
require.NoError(t, err)

createChannel := func(cid string, block *cb.Block) {
testDir, err := ioutil.TempDir("", "peer-pkg")
require.NoError(t, err)
defer os.RemoveAll(testDir)

viper.Set("peer.tls.enabled", true)
viper.Set("peer.tls.cert.file", filepath.Join("testdata",
"Org1-server1-cert.pem"))
viper.Set("peer.tls.key.file", filepath.Join("testdata",
"Org1-server1-key.pem"))
viper.Set("peer.tls.rootcert.file", filepath.Join("testdata",
"Org1-cert.pem"))
viper.Set("peer.fileSystemPath", "/var/hyperledger/test/")
defer os.RemoveAll("/var/hyperledger/test/")
err := peer.Default.CreateChainFromBlock(block, nil, nil)
viper.Set("peer.tls.cert.file", filepath.Join("testdata", "Org1-server1-cert.pem"))
viper.Set("peer.tls.key.file", filepath.Join("testdata", "Org1-server1-key.pem"))
viper.Set("peer.tls.rootcert.file", filepath.Join("testdata", "Org1-cert.pem"))
viper.Set("peer.fileSystemPath", testDir)
err = peer.Default.CreateChainFromBlock(block, nil, nil)
if err != nil {
t.Fatalf("Failed to create config block (%s)", err)
}
t.Logf("Channel %s MSPIDs: (%s)", cid, peer.Default.GetMSPIDs(cid))
}

org1CertPool, err := createCertPool([][]byte{org1CA})

if err != nil {
t.Fatalf("Failed to load root certificates into pool: %v", err)
}
require.NoError(t, err)

// use server cert as client cert
org1ClientCert, err := tls.X509KeyPair(org1Server1Cert, org1Server1Key)
if err != nil {
t.Fatalf("Failed to load client certificate: %v", err)
}
require.NoError(t, err)

org1Creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{org1ClientCert},
RootCAs: org1CertPool,
})

org2ClientCert, err := tls.X509KeyPair(org2Server1Cert, org2Server1Key)
if err != nil {
t.Fatalf("Failed to load client certificate: %v", err)
}
require.NoError(t, err)
org2Creds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{org2ClientCert},
RootCAs: org1CertPool,
})
org2IntermediateClientCert, err := tls.X509KeyPair(
org2IntermediateServer1Cert, org2IntermediateServer1Key)
if err != nil {
t.Fatalf("Failed to load client certificate: %v", err)
}

org2IntermediateClientCert, err := tls.X509KeyPair(org2IntermediateServer1Cert, org2IntermediateServer1Key)
require.NoError(t, err)
org2IntermediateCreds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{org2IntermediateClientCert},
RootCAs: org1CertPool,
})
ordererOrgClientCert, err := tls.X509KeyPair(ordererOrgServer1Cert,
ordererOrgServer1Key)
if err != nil {
t.Fatalf("Failed to load client certificate: %v", err)
}

ordererOrgClientCert, err := tls.X509KeyPair(ordererOrgServer1Cert, ordererOrgServer1Key)
require.NoError(t, err)

ordererOrgCreds := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{ordererOrgClientCert},
RootCAs: org1CertPool,
Expand All @@ -235,7 +225,6 @@ func TestUpdateRootsFromConfigBlock(t *testing.T) {
numAppCAs int
numOrdererCAs int
}{

{
name: "MutualTLSOrg1Org1",
serverConfig: comm.ServerConfig{
Expand Down Expand Up @@ -266,9 +255,11 @@ func TestUpdateRootsFromConfigBlock(t *testing.T) {
},
createChannel: func() { createChannel("channel2", channel2Block) },
goodOptions: []grpc.DialOption{
grpc.WithTransportCredentials(org2Creds)},
grpc.WithTransportCredentials(org2Creds),
},
badOptions: []grpc.DialOption{
grpc.WithTransportCredentials(ordererOrgCreds)},
grpc.WithTransportCredentials(ordererOrgCreds),
},
numAppCAs: 6,
numOrdererCAs: 2,
},
Expand All @@ -285,9 +276,11 @@ func TestUpdateRootsFromConfigBlock(t *testing.T) {
},
createChannel: func() { createChannel("channel3", channel3Block) },
goodOptions: []grpc.DialOption{
grpc.WithTransportCredentials(org2IntermediateCreds)},
grpc.WithTransportCredentials(org2IntermediateCreds),
},
badOptions: []grpc.DialOption{
grpc.WithTransportCredentials(ordererOrgCreds)},
grpc.WithTransportCredentials(ordererOrgCreds),
},
numAppCAs: 10,
numOrdererCAs: 3,
},
Expand Down Expand Up @@ -327,10 +320,8 @@ func TestUpdateRootsFromConfigBlock(t *testing.T) {

// make sure we have the expected number of CAs
appCAs, ordererCAs := comm.GetCredentialSupport().GetClientRootCAs()
assert.Equal(t, test.numAppCAs, len(appCAs),
"Did not find expected number of app CAs for channel")
assert.Equal(t, test.numOrdererCAs, len(ordererCAs),
"Did not find expected number of orderer CAs for channel")
assert.Equal(t, test.numAppCAs, len(appCAs), "Did not find expected number of app CAs for channel")
assert.Equal(t, test.numOrdererCAs, len(ordererCAs), "Did not find expected number of orderer CAs for channel")

// invoke the EmptyCall service with good options
_, err = invokeEmptyCall(testAddress, test.goodOptions)
Expand Down
17 changes: 10 additions & 7 deletions gossip/gossip/algo/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ type PullEngine struct {
outgoingNONCES *util.Set
incomingNONCES *util.Set
digFilter DigestFilter

digestWaitTime time.Duration
requestWaitTime time.Duration
responseWaitTime time.Duration
}

// NewPullEngineWithFilter creates an instance of a PullEngine with a certain sleep time
Expand All @@ -116,6 +120,9 @@ func NewPullEngineWithFilter(participant PullAdapter, sleepTime time.Duration, d
incomingNONCES: util.NewSet(),
outgoingNONCES: util.NewSet(),
digFilter: df,
digestWaitTime: util.GetDurationOrDefault("peer.gossip.digestWaitTime", defDigestWaitTime),
requestWaitTime: util.GetDurationOrDefault("peer.gossip.requestWaitTime", defRequestWaitTime),
responseWaitTime: util.GetDurationOrDefault("peer.gossip.responseWaitTime", defResponseWaitTime),
}

go func() {
Expand Down Expand Up @@ -184,8 +191,7 @@ func (engine *PullEngine) initiatePull() {
engine.Hello(peer, nonce)
}

digestWaitTime := util.GetDurationOrDefault("peer.gossip.digestWaitTime", defDigestWaitTime)
time.AfterFunc(digestWaitTime, func() {
time.AfterFunc(engine.digestWaitTime, func() {
engine.processIncomingDigests()
})
}
Expand Down Expand Up @@ -213,9 +219,7 @@ func (engine *PullEngine) processIncomingDigests() {
engine.SendReq(dest, seqsToReq, engine.peers2nonces[dest])
}

responseWaitTime := util.GetDurationOrDefault("peer.gossip.responseWaitTime", defResponseWaitTime)
time.AfterFunc(responseWaitTime, engine.endPull)

time.AfterFunc(engine.responseWaitTime, engine.endPull)
}

func (engine *PullEngine) endPull() {
Expand Down Expand Up @@ -270,8 +274,7 @@ func (engine *PullEngine) Remove(seqs ...string) {
func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
engine.incomingNONCES.Add(nonce)

requestWaitTime := util.GetDurationOrDefault("peer.gossip.requestWaitTime", defRequestWaitTime)
time.AfterFunc(requestWaitTime, func() {
time.AfterFunc(engine.requestWaitTime, func() {
engine.incomingNONCES.Remove(nonce)
})

Expand Down

0 comments on commit 4af2b13

Please sign in to comment.