Skip to content

Commit

Permalink
[FAB-17675] Prevent gossip probe from registering as a connection
Browse files Browse the repository at this point in the history
Part of FAB-17672:

Gossip bootstrap peer and anchor peer connection establishments are a two step process:

I)  Connect to the remote endpoint, and figure out its organization association
II) Connect to the remote endpoint once again, this time sending membership information according to what it is eligible to.

Unfortunately, if the responding peer connects to the initiator peer at the same time, the connection of (1) will be overwritten by the connection (gossip keeps a single connection among every pair of peers) from the responder peer and the initiator peer will consider the responder peer as dead.

In production, where peers run for a long time and constnatly retry, this is not a problem.

However, in integration tests that expect data to be disseminated in a timely manner, this might cause flakes.

this change set adds a flag that indicates this connection is not a long lasting one, and thus there is no point to add it to the connection store.

Change-Id: Id8f4ee12145748527233301356af830c3401ee02
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Mar 26, 2020
1 parent 4d515e0 commit e6b3f4d
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 129 deletions.
4 changes: 2 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 21 additions & 6 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const (
DefSendBuffSize = 20
)

var (
errProbe = errors.New("probe")
)

// SecurityAdvisor defines an external auxiliary object
// that provides security and identity related capabilities
type SecurityAdvisor interface {
Expand Down Expand Up @@ -163,7 +167,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT

ctx, cancel = context.WithCancel(context.Background())
if stream, err = cl.GossipStream(ctx); err == nil {
connInfo, err = c.authenticateRemotePeer(stream, true)
connInfo, err = c.authenticateRemotePeer(stream, true, false)
if err == nil {
pkiID = connInfo.ID
// PKIID is nil when we don't know the remote PKI id's
Expand Down Expand Up @@ -305,7 +309,7 @@ func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, erro
if err != nil {
return nil, err
}
connInfo, err := c.authenticateRemotePeer(stream, true)
connInfo, err := c.authenticateRemotePeer(stream, true, true)
if err != nil {
c.logger.Warningf("Authentication failed: %v", err)
return nil, err
Expand Down Expand Up @@ -404,7 +408,7 @@ func extractRemoteAddress(stream stream) string {
return remoteAddress
}

func (c *commImpl) authenticateRemotePeer(stream stream, initiator bool) (*protoext.ConnectionInfo, error) {
func (c *commImpl) authenticateRemotePeer(stream stream, initiator, isProbe bool) (*protoext.ConnectionInfo, error) {
ctx := stream.Context()
remoteAddress := extractRemoteAddress(stream)
remoteCertHash := extractCertificateHashFromContext(ctx)
Expand All @@ -431,7 +435,7 @@ func (c *commImpl) authenticateRemotePeer(stream stream, initiator bool) (*proto
return nil, fmt.Errorf("No TLS certificate")
}

cMsg, err = c.createConnectionMsg(c.PKIID, selfCertHash, c.peerIdentity, signer)
cMsg, err = c.createConnectionMsg(c.PKIID, selfCertHash, c.peerIdentity, signer, isProbe)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -492,6 +496,10 @@ func (c *commImpl) authenticateRemotePeer(stream stream, initiator bool) (*proto

c.logger.Debug("Authenticated", remoteAddress)

if receivedMsg.Probe {
return connInfo, errProbe
}

return connInfo, nil
}

Expand Down Expand Up @@ -556,7 +564,13 @@ func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
if c.isStopping() {
return fmt.Errorf("Shutting down")
}
connInfo, err := c.authenticateRemotePeer(stream, false)
connInfo, err := c.authenticateRemotePeer(stream, false, false)

if err == errProbe {
c.logger.Infof("Peer %s (%s) probed us", connInfo.ID, connInfo.Endpoint)
return nil
}

if err != nil {
c.logger.Errorf("Authentication failed: %v", err)
return err
Expand Down Expand Up @@ -618,7 +632,7 @@ func readWithTimeout(stream stream, timeout time.Duration, address string) (*pro
}
}

func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer protoext.Signer) (*protoext.SignedGossipMessage, error) {
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer protoext.Signer, isProbe bool) (*protoext.SignedGossipMessage, error) {
m := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: 0,
Expand All @@ -627,6 +641,7 @@ func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte,
TlsCertHash: certHash,
Identity: cert,
PkiId: pkiID,
Probe: isProbe,
},
},
}
Expand Down
4 changes: 2 additions & 2 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func handshaker(port int, endpoint string, comm Comm, t *testing.T, connMutator
mac := hmac.New(sha256.New, hmacKey)
mac.Write(msg)
return mac.Sum(nil), nil
})
}, false)
// Mutate connection message to test negative paths
msg = connMutator(msg)
// Send your own connection message
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestCloseConn(t *testing.T) {
mac := hmac.New(sha256.New, hmacKey)
mac.Write(msg)
return mac.Sum(nil), nil
})
}, false)
assert.NoError(t, stream.Send(connMsg.Envelope))
stream.Send(createGossipMsg().Envelope)
select {
Expand Down
Loading

0 comments on commit e6b3f4d

Please sign in to comment.