Skip to content

Commit

Permalink
[FAB-7491] client TLS cert support for gossip
Browse files Browse the repository at this point in the history
This change set adds client TLS certificate support for gossip.
The handshake method now selects the certificate to hash and attach
its binding to the handshake message according to whether the peer
initiated the connection or not.

The change set adds the support in the form of a struct that
wraps the TLS certificates with atomic references.

This is in order to prepare for the future where we might have
dynamic TLS certificate updates.

Unit tests haven't been added, because I changed the test code
to always have a different client and server certificate,
and in case only the server certificate is present than the code
that is executed in the production path is code that is already
tested, because it is computed by code in core/peer/start.go

Change-Id: I1edddb2321c629f88080510befe1db26fa0b6925
Signed-off-by: yacovm <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Dec 19, 2017
1 parent 4d802d1 commit 95f14a9
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 87 deletions.
2 changes: 1 addition & 1 deletion core/peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestCreateChainFromBlock(t *testing.T) {
return dialOpts
}
err = service.InitGossipServiceCustomDeliveryFactory(
identity, "localhost:13611", grpcServer,
identity, "localhost:13611", grpcServer, nil,
&mockDeliveryClientFactory{},
messageCryptoService, secAdv, defaultSecureDialOpts)

Expand Down
2 changes: 1 addition & 1 deletion core/scc/cscc/configure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) {
identity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
messageCryptoService := peergossip.NewMCS(&mocks.ChannelPolicyManagerGetter{}, localmsp.NewSigner(), mgmt.NewDeserializersManager())
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
err := service.InitGossipServiceCustomDeliveryFactory(identity, peerEndpoint, nil, &mockDeliveryClientFactory{}, messageCryptoService, secAdv, nil)
err := service.InitGossipServiceCustomDeliveryFactory(identity, peerEndpoint, nil, nil, &mockDeliveryClientFactory{}, messageCryptoService, secAdv, nil)
assert.NoError(t, err)

// Successful path for JoinChain
Expand Down
56 changes: 30 additions & 26 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,14 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity

var ll net.Listener
var s *grpc.Server
var certHash []byte
var certs *common.TLSCertificates

if port > 0 {
s, ll, secureDialOpts, certHash = createGRPCLayer(port)
s, ll, secureDialOpts, certs = createGRPCLayer(port)
}

commInst := &commImpl{
pubSub: util.NewPubSub(),
selfCertHash: certHash,
PKIID: idMapper.GetPKIidOfCert(peerIdentity),
idMapper: idMapper,
logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),
Expand All @@ -82,6 +81,7 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
exitChan: make(chan struct{}, 1),
subscriptions: make([]chan proto.ReceivedMessage, 0),
dialTimeout: util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout),
tlsCerts: certs,
}
commInst.connStore = newConnStore(commInst, commInst.logger)

Expand All @@ -98,7 +98,7 @@ func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity
}

// NewCommInstance creates a new comm instance that binds itself to the given gRPC server
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
func NewCommInstance(s *grpc.Server, certs *common.TLSCertificates, idStore identity.Mapper,
peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,
dialOpts ...grpc.DialOption) (Comm, error) {

Expand All @@ -107,23 +107,16 @@ func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Map
return nil, errors.WithStack(err)
}

if cert != nil {
inst := commInst.(*commImpl)
if len(cert.Certificate) == 0 {
inst.logger.Panic("Certificate supplied but certificate chain is empty")
} else {
inst.selfCertHash = certHashFromRawCert(cert.Certificate[0])
}
}
commInst.(*commImpl).tlsCerts = certs

proto.RegisterGossipServer(s, commInst.(*commImpl))

return commInst, nil
}

type commImpl struct {
tlsCerts *common.TLSCertificates
pubSub *util.PubSub
selfCertHash []byte
peerIdentity api.PeerIdentityType
idMapper identity.Mapper
logger *logging.Logger
Expand Down Expand Up @@ -179,7 +172,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT

ctx, cf := context.WithCancel(context.Background())
if stream, err = cl.GossipStream(ctx); err == nil {
connInfo, err = c.authenticateRemotePeer(stream)
connInfo, err = c.authenticateRemotePeer(stream, true)
if err == nil {
pkiID = connInfo.ID
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
Expand Down Expand Up @@ -301,7 +294,7 @@ func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, erro
if err != nil {
return nil, err
}
connInfo, err := c.authenticateRemotePeer(stream)
connInfo, err := c.authenticateRemotePeer(stream, true)
if err != nil {
c.logger.Warningf("Authentication failed: %v", err)
return nil, err
Expand Down Expand Up @@ -402,13 +395,22 @@ func extractRemoteAddress(stream stream) string {
return remoteAddress
}

func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error) {
func (c *commImpl) authenticateRemotePeer(stream stream, initiator bool) (*proto.ConnectionInfo, error) {
ctx := stream.Context()
remoteAddress := extractRemoteAddress(stream)
remoteCertHash := extractCertificateHashFromContext(ctx)
var err error
var cMsg *proto.SignedGossipMessage
useTLS := c.selfCertHash != nil
useTLS := c.tlsCerts != nil
var selfCertHash []byte

if useTLS {
certReference := c.tlsCerts.TLSServerCert
if initiator {
certReference = c.tlsCerts.TLSClientCert
}
selfCertHash = certHashFromRawCert(certReference.Load().(*tls.Certificate).Certificate[0])
}

signer := func(msg []byte) ([]byte, error) {
return c.idMapper.Sign(msg)
Expand All @@ -420,7 +422,7 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo,
return nil, fmt.Errorf("No TLS certificate")
}

cMsg, err = c.createConnectionMsg(c.PKIID, c.selfCertHash, c.peerIdentity, signer)
cMsg, err = c.createConnectionMsg(c.PKIID, selfCertHash, c.peerIdentity, signer)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -545,7 +547,7 @@ func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
if c.isStopping() {
return fmt.Errorf("Shutting down")
}
connInfo, err := c.authenticateRemotePeer(stream)
connInfo, err := c.authenticateRemotePeer(stream, false)
if err != nil {
c.logger.Errorf("Authentication failed: %v", err)
return err
Expand Down Expand Up @@ -653,25 +655,24 @@ type stream interface {
grpc.Stream
}

func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte) {
var returnedCertHash []byte
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, *common.TLSCertificates) {
var s *grpc.Server
var ll net.Listener
var err error
var serverOpts []grpc.ServerOption
var dialOpts []grpc.DialOption

cert := GenerateCertificatesOrPanic()
returnedCertHash = certHashFromRawCert(cert.Certificate[0])
clientCert := GenerateCertificatesOrPanic()
serverCert := GenerateCertificatesOrPanic()

tlsConf := &tls.Config{
Certificates: []tls.Certificate{cert},
Certificates: []tls.Certificate{serverCert},
ClientAuth: tls.RequestClientCert,
InsecureSkipVerify: true,
}
serverOpts = append(serverOpts, grpc.Creds(credentials.NewTLS(tlsConf)))
ta := credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
Certificates: []tls.Certificate{clientCert},
InsecureSkipVerify: true,
})
dialOpts = append(dialOpts, grpc.WithTransportCredentials(ta))
Expand All @@ -685,7 +686,10 @@ func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOp
return dialOpts
}
s = grpc.NewServer(serverOpts...)
return s, ll, secureDialOpts, returnedCertHash
certs := &common.TLSCertificates{}
certs.TLSServerCert.Store(&serverCert)
certs.TLSClientCert.Store(&clientCert)
return s, ll, secureDialOpts, certs
}

func topicForAck(nonce uint64, pkiID common.PKIidType) string {
Expand Down
12 changes: 4 additions & 8 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,22 +350,18 @@ func TestBasic(t *testing.T) {

func TestProdConstructor(t *testing.T) {
t.Parallel()
peerIdentity := GenerateCertificatesOrPanic()
srv, lsnr, dialOpts, certHash := createGRPCLayer(20000)
srv, lsnr, dialOpts, certs := createGRPCLayer(20000)
defer srv.Stop()
defer lsnr.Close()
id := []byte("localhost:20000")
comm1, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity), id, dialOpts)
comm1.(*commImpl).selfCertHash = certHash
comm1, _ := NewCommInstance(srv, certs, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity), id, dialOpts)
go srv.Serve(lsnr)

peerIdentity = GenerateCertificatesOrPanic()
srv, lsnr, dialOpts, certHash = createGRPCLayer(30000)
srv, lsnr, dialOpts, certs = createGRPCLayer(30000)
defer srv.Stop()
defer lsnr.Close()
id = []byte("localhost:30000")
comm2, _ := NewCommInstance(srv, &peerIdentity, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity), id, dialOpts)
comm2.(*commImpl).selfCertHash = certHash
comm2, _ := NewCommInstance(srv, certs, identity.NewIdentityMapper(naiveSec, id, noopPurgeIdentity), id, dialOpts)
go srv.Serve(lsnr)
defer comm1.Stop()
defer comm2.Stop()
Expand Down
17 changes: 17 additions & 0 deletions gossip/common/cert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package common

import (
"sync/atomic"
)

// TLSCertificates aggregates server and client TLS certificates
type TLSCertificates struct {
TLSServerCert atomic.Value // *tls.Certificate server certificate of the peer
TLSClientCert atomic.Value // *tls.Certificate client certificate of the peer
}
10 changes: 5 additions & 5 deletions gossip/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
package gossip

import (
"crypto/tls"
"fmt"
"time"

Expand Down Expand Up @@ -113,10 +112,11 @@ type Config struct {

SkipBlockVerification bool // Should we skip verifying block messages or not

PublishCertPeriod time.Duration // Time from startup certificates are included in Alive messages
PublishStateInfoInterval time.Duration // Determines frequency of pushing state info messages to peers
RequestStateInfoInterval time.Duration // Determines frequency of pulling state info messages from peers
TLSServerCert *tls.Certificate // TLS certificate of the peer
PublishCertPeriod time.Duration // Time from startup certificates are included in Alive messages
PublishStateInfoInterval time.Duration // Determines frequency of pushing state info messages to peers
RequestStateInfoInterval time.Duration // Determines frequency of pulling state info messages from peers

TLSCerts *common.TLSCertificates // TLS certificates of the peer

InternalEndpoint string // Endpoint we publish to peers in our organization
ExternalEndpoint string // Peer publishes this endpoint instead of SelfEndpoint to foreign organizations
Expand Down
7 changes: 3 additions & 4 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package gossip

import (
"bytes"
"crypto/tls"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -97,7 +96,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis
if s == nil {
g.comm, err = createCommWithServer(conf.BindPort, g.idMapper, selfIdentity, secureDialOpts)
} else {
g.comm, err = createCommWithoutServer(s, conf.TLSServerCert, g.idMapper, selfIdentity, secureDialOpts)
g.comm, err = createCommWithoutServer(s, conf.TLSCerts, g.idMapper, selfIdentity, secureDialOpts)
}

if err != nil {
Expand Down Expand Up @@ -159,9 +158,9 @@ func newChannelState(g *gossipServiceImpl) *channelState {
}
}

func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
func createCommWithoutServer(s *grpc.Server, certs *common.TLSCertificates, idStore identity.Mapper,
identity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts) (comm.Comm, error) {
return comm.NewCommInstance(s, cert, idStore, identity, secureDialOpts)
return comm.NewCommInstance(s, certs, idStore, identity, secureDialOpts)
}

// NewGossipServiceWithServer creates a new gossip instance with a gRPC server
Expand Down
26 changes: 9 additions & 17 deletions gossip/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ SPDX-License-Identifier: Apache-2.0
package integration

import (
"crypto/tls"
"net"
"strconv"
"time"

"github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/util"
"github.com/pkg/errors"
Expand All @@ -23,7 +22,7 @@ import (

// This file is used to bootstrap a gossip instance and/or leader election service instance

func newConfig(selfEndpoint string, externalEndpoint string, bootPeers ...string) (*gossip.Config, error) {
func newConfig(selfEndpoint string, externalEndpoint string, certs *common.TLSCertificates, bootPeers ...string) (*gossip.Config, error) {
_, p, err := net.SplitHostPort(selfEndpoint)

if err != nil {
Expand All @@ -35,16 +34,7 @@ func newConfig(selfEndpoint string, externalEndpoint string, bootPeers ...string
return nil, errors.Wrapf(err, "misconfigured endpoint %s, failed to parse port number", selfEndpoint)
}

var cert *tls.Certificate
if viper.GetBool("peer.tls.enabled") {
certTmp, err := tls.LoadX509KeyPair(config.GetPath("peer.tls.cert.file"), config.GetPath("peer.tls.key.file"))
if err != nil {
return nil, errors.Wrap(err, "failed to load certificates")
}
cert = &certTmp
}

return &gossip.Config{
conf := &gossip.Config{
BindPort: int(port),
BootstrapPeers: bootPeers,
ID: selfEndpoint,
Expand All @@ -61,18 +51,20 @@ func newConfig(selfEndpoint string, externalEndpoint string, bootPeers ...string
RequestStateInfoInterval: util.GetDurationOrDefault("peer.gossip.requestStateInfoInterval", 4*time.Second),
PublishStateInfoInterval: util.GetDurationOrDefault("peer.gossip.publishStateInfoInterval", 4*time.Second),
SkipBlockVerification: viper.GetBool("peer.gossip.skipBlockVerification"),
TLSServerCert: cert,
}, nil
TLSCerts: certs,
}

return conf, nil
}

// NewGossipComponent creates a gossip component that attaches itself to the given gRPC server
func NewGossipComponent(peerIdentity []byte, endpoint string, s *grpc.Server,
secAdv api.SecurityAdvisor, cryptSvc api.MessageCryptoService,
secureDialOpts api.PeerSecureDialOpts, bootPeers ...string) (gossip.Gossip, error) {
secureDialOpts api.PeerSecureDialOpts, certs *common.TLSCertificates, bootPeers ...string) (gossip.Gossip, error) {

externalEndpoint := viper.GetString("peer.gossip.externalEndpoint")

conf, err := newConfig(endpoint, externalEndpoint, bootPeers...)
conf, err := newConfig(endpoint, externalEndpoint, certs, bootPeers...)
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
18 changes: 3 additions & 15 deletions gossip/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func TestNewGossipCryptoService(t *testing.T) {
msptesttools.LoadMSPSetupForTesting()
peerIdentity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
g1, err := NewGossipComponent(peerIdentity, endpoint1, s1, secAdv, cryptSvc,
defaultSecureDialOpts)
defaultSecureDialOpts, nil)
assert.NoError(t, err)
g2, err := NewGossipComponent(peerIdentity, endpoint2, s2, secAdv, cryptSvc,
defaultSecureDialOpts, endpoint1)
defaultSecureDialOpts, nil, endpoint1)
assert.NoError(t, err)
g3, err := NewGossipComponent(peerIdentity, endpoint3, s3, secAdv, cryptSvc,
defaultSecureDialOpts, endpoint1)
defaultSecureDialOpts, nil, endpoint1)
assert.NoError(t, err)
defer g1.Stop()
defer g2.Stop()
Expand All @@ -70,18 +70,6 @@ func TestNewGossipCryptoService(t *testing.T) {
go s3.Serve(ll3)
}

func TestBadInitialization(t *testing.T) {
msptesttools.LoadMSPSetupForTesting()
peerIdentity, _ := mgmt.GetLocalSigningIdentityOrPanic().Serialize()
s1 := grpc.NewServer()
_, err := newConfig("anEndpointWithoutAPort", "anEndpointWithoutAPort")

viper.Set("peer.tls.enabled", true)
_, err = NewGossipComponent(peerIdentity, "localhost:5000", s1, secAdv, cryptSvc,
defaultSecureDialOpts)
assert.Error(t, err)
}

func setupTestEnv() {
viper.SetConfigName("core")
viper.SetEnvPrefix("CORE")
Expand Down
Loading

0 comments on commit 95f14a9

Please sign in to comment.