diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index 922ac7614d8..8cbbae2ffa4 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -77,7 +77,8 @@ type gossipServiceImpl struct { deliveryService deliverclient.DeliverService deliveryFactory DeliveryServiceFactory lock sync.RWMutex - msgCrypto identity.Mapper + idMapper identity.Mapper + mcs api.MessageCryptoService peerIdentity []byte secAdv api.SecurityAdvisor } @@ -132,10 +133,11 @@ func InitGossipServiceCustomDeliveryFactory(peerIdentity []byte, endpoint string gossip := integration.NewGossipComponent(peerIdentity, endpoint, s, secAdv, mcs, idMapper, dialOpts, bootPeers...) gossipServiceInstance = &gossipServiceImpl{ + mcs: mcs, gossipSvc: gossip, chains: make(map[string]state.GossipStateProvider), deliveryFactory: factory, - msgCrypto: idMapper, + idMapper: idMapper, peerIdentity: peerIdentity, secAdv: secAdv, } @@ -158,7 +160,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe defer g.lock.Unlock() // Initialize new state provider for given committer logger.Debug("Creating state provider for chainID", chainID) - g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer) + g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs) if g.deliveryService == nil { var err error g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance) diff --git a/gossip/state/state.go b/gossip/state/state.go index e6789154f3c..40c7b5aee9f 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -25,6 +25,7 @@ import ( pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/committer" + "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/comm" common2 "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" @@ -47,10 +48,6 @@ type GossipStateProvider interface { Stop() } -var remoteStateMsgFilter = func(message interface{}) bool { - return message.(proto.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage() -} - const ( defPollingPeriod = 200 * time.Millisecond defAntiEntropyInterval = 10 * time.Second @@ -60,6 +57,9 @@ const ( // the struct to handle in memory sliding window of // new ledger block to be acquired by hyper ledger type GossipStateProviderImpl struct { + // MessageCryptoService + mcs api.MessageCryptoService + // Chain id chainID string @@ -87,7 +87,7 @@ type GossipStateProviderImpl struct { } // NewGossipStateProvider creates initialized instance of gossip state provider -func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider { +func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider { logger := util.GetLogger(util.LoggingStateModule, "") gossipChan, _ := g.Accept(func(message interface{}) bool { @@ -96,6 +96,26 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID)) }, false) + remoteStateMsgFilter := func(message interface{}) bool { + receivedMsg := message.(proto.ReceivedMessage) + msg := receivedMsg.GetGossipMessage() + if !msg.IsRemoteStateMessage() { + return false + } + // If we're not running with authentication, no point + // in enforcing access control + if !receivedMsg.GetConnectionInfo().IsAuthenticated() { + return true + } + connInfo := receivedMsg.GetConnectionInfo() + authErr := mcs.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData) + if authErr != nil { + logger.Warning("Got unauthorized state transfer request from", string(connInfo.Identity)) + return false + } + return true + } + // Filter message which are only relevant for state transfer _, commChan := g.Accept(remoteStateMsgFilter, true) @@ -109,6 +129,10 @@ func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer } s := &GossipStateProviderImpl{ + // MessageCryptoService + mcs: mcs, + + // Chain ID chainID: chainID, // Instance of the gossip diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index de2de563ba7..dac5d2f9eed 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "errors" + pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/common/configtx/test" "github.com/hyperledger/fabric/common/util" @@ -49,6 +51,12 @@ var ( var orgID = []byte("ORG1") +type peerIdentityAcceptor func(identity api.PeerIdentityType) error + +var noopPeerIdentityAcceptor = func(identity api.PeerIdentityType) error { + return nil +} + type joinChanMsg struct { } @@ -78,30 +86,33 @@ func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error { return nil } -type naiveCryptoService struct { +type cryptoServiceMock struct { + acceptor peerIdentityAcceptor } // GetPKIidOfCert returns the PKI-ID of a peer's identity -func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { +func (*cryptoServiceMock) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { return common.PKIidType(peerIdentity) } // VerifyBlock returns nil if the block is properly signed, // else returns error -func (*naiveCryptoService) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error { +func (*cryptoServiceMock) VerifyBlock(chainID common.ChainID, signedBlock api.SignedBlock) error { return nil } // Sign signs msg with this peer's signing key and outputs // the signature if no error occurred. -func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) { - return msg, nil +func (*cryptoServiceMock) Sign(msg []byte) ([]byte, error) { + clone := make([]byte, len(msg)) + copy(clone, msg) + return clone, nil } // Verify checks that signature is a valid signature of message under a peer's verification key. // If the verification succeeded, Verify returns nil meaning no error occurred. // If peerCert is nil, then the signature is verified against this peer's verification key. -func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { +func (*cryptoServiceMock) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { equal := bytes.Equal(signature, message) if !equal { return fmt.Errorf("Wrong signature:%v, %v", signature, message) @@ -113,11 +124,11 @@ func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, // under a peer's verification key, but also in the context of a specific channel. // If the verification succeeded, Verify returns nil meaning no error occurred. // If peerIdentity is nil, then the signature is verified against this peer's verification key. -func (*naiveCryptoService) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error { - return nil +func (cs *cryptoServiceMock) VerifyByChannel(chainID common.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error { + return cs.acceptor(peerIdentity) } -func (*naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { +func (*cryptoServiceMock) ValidateIdentity(peerIdentity api.PeerIdentityType) error { return nil } @@ -132,9 +143,10 @@ func bootPeers(ids ...int) []string { // Simple presentation of peer which includes only // communication module, gossip and state transfer type peerNode struct { - g gossip.Gossip - s GossipStateProvider - + port int + g gossip.Gossip + s GossipStateProvider + cs *cryptoServiceMock commit committer.Committer } @@ -145,13 +157,13 @@ func (node *peerNode) shutdown() { } // Default configuration to be used for gossip and communication modules -func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { +func newGossipConfig(id int, boot ...int) *gossip.Config { port := id + portPrefix return &gossip.Config{ BindPort: port, BootstrapPeers: bootPeers(boot...), ID: fmt.Sprintf("p%d", id), - MaxBlockCountToStore: maxMsgCount, + MaxBlockCountToStore: 0, MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond, MaxPropagationBurstSize: 10, PropagateIterations: 1, @@ -166,11 +178,10 @@ func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { } // Create gossip instance -func newGossipInstance(config *gossip.Config) gossip.Gossip { - cryptoService := &naiveCryptoService{} - idMapper := identity.NewIdentityMapper(cryptoService) +func newGossipInstance(config *gossip.Config, mcs api.MessageCryptoService) gossip.Gossip { + idMapper := identity.NewIdentityMapper(mcs) - return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, cryptoService, idMapper, []byte(config.InternalEndpoint)) + return gossip.NewGossipServiceWithServer(config, &orgCryptoService{}, mcs, idMapper, []byte(config.InternalEndpoint)) } // Create new instance of KVLedger to be used for testing @@ -182,24 +193,117 @@ func newCommitter(id int) committer.Committer { } // Constructing pseudo peer node, simulating only gossip and state transfer part -func newPeerNode(config *gossip.Config, committer committer.Committer) *peerNode { - +func newPeerNode(config *gossip.Config, committer committer.Committer, acceptor peerIdentityAcceptor) *peerNode { + cs := &cryptoServiceMock{acceptor: acceptor} // Gossip component based on configuration provided and communication module - gossip := newGossipInstance(config) + gossip := newGossipInstance(config, &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor}) logger.Debug("Joinning channel", util.GetTestChainID()) gossip.JoinChan(&joinChanMsg{}, common.ChainID(util.GetTestChainID())) // Initialize pseudo peer simulator, which has only three // basic parts - return &peerNode{ - g: gossip, - s: NewGossipStateProvider(util.GetTestChainID(), gossip, committer), + return &peerNode{ + port: config.BindPort, + g: gossip, + s: NewGossipStateProvider(util.GetTestChainID(), gossip, committer, cs), commit: committer, + cs: cs, } } +func TestAccessControl(t *testing.T) { + viper.Set("peer.fileSystemPath", "/tmp/tests/ledger/node") + ledgermgmt.InitializeTestEnv() + defer ledgermgmt.CleanupTestEnv() + + bootstrapSetSize := 5 + bootstrapSet := make([]*peerNode, 0) + + authorizedPeers := map[string]struct{}{ + "localhost:5610": {}, + "localhost:5615": {}, + "localhost:5618": {}, + "localhost:5621": {}, + } + + blockPullPolicy := func(identity api.PeerIdentityType) error { + if _, isAuthorized := authorizedPeers[string(identity)]; isAuthorized { + return nil + } + return errors.New("Not authorized") + } + + for i := 0; i < bootstrapSetSize; i++ { + committer := newCommitter(i) + bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), committer, blockPullPolicy)) + } + + defer func() { + for _, p := range bootstrapSet { + p.shutdown() + } + }() + + msgCount := 5 + + for i := 1; i <= msgCount; i++ { + rawblock := pcomm.NewBlock(uint64(i), []byte{}) + if bytes, err := pb.Marshal(rawblock); err == nil { + payload := &proto.Payload{uint64(i), "", bytes} + bootstrapSet[0].s.AddPayload(payload) + } else { + t.Fail() + } + } + + standardPeerSetSize := 10 + peersSet := make([]*peerNode, 0) + + for i := 0; i < standardPeerSetSize; i++ { + committer := newCommitter(bootstrapSetSize + i) + peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), committer, blockPullPolicy)) + } + + defer func() { + for _, p := range peersSet { + p.shutdown() + } + }() + + waitUntilTrueOrTimeout(t, func() bool { + for _, p := range peersSet { + if len(p.g.PeersOfChannel(common.ChainID(util.GetTestChainID()))) != bootstrapSetSize+standardPeerSetSize-1 { + logger.Debug("Peer discovery has not finished yet") + return false + } + } + logger.Debug("All peer discovered each other!!!") + return true + }, 30*time.Second) + + logger.Debug("Waiting for all blocks to arrive.") + waitUntilTrueOrTimeout(t, func() bool { + logger.Debug("Trying to see all authorized peers get all blocks, and all non-authorized didn't") + for _, p := range peersSet { + height, err := p.commit.LedgerHeight() + id := fmt.Sprintf("localhost:%d", p.port) + if _, isAuthorized := authorizedPeers[id]; isAuthorized { + if height != uint64(msgCount+1) || err != nil { + return false + } + } else { + if err == nil && height > 1 { + assert.Fail(t, "Peer", id, "got message but isn't authorized! Height:", height) + } + } + } + logger.Debug("All peers have same ledger height!!!") + return true + }, 60*time.Second) +} + /*// Simple scenario to start first booting node, gossip a message // then start second node and verify second node also receives it func TestNewGossipStateProvider_GossipingOneMessage(t *testing.T) { @@ -289,7 +393,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { for i := 0; i < bootstrapSetSize; i++ { committer := newCommitter(i) - bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i, 100), committer)) + bootstrapSet = append(bootstrapSet, newPeerNode(newGossipConfig(i), committer, noopPeerIdentityAcceptor)) } defer func() { @@ -315,7 +419,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { for i := 0; i < standartPeersSize; i++ { committer := newCommitter(bootstrapSetSize + i) - peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 100, 0, 1, 2, 3, 4), committer)) + peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 0, 1, 2, 3, 4), committer, noopPeerIdentityAcceptor)) } defer func() { @@ -354,14 +458,18 @@ func TestGossipStateProvider_TestStateMessages(t *testing.T) { ledgermgmt.InitializeTestEnv() defer ledgermgmt.CleanupTestEnv() - bootPeer := newPeerNode(newGossipConfig(0, 100), newCommitter(0)) + bootPeer := newPeerNode(newGossipConfig(0), newCommitter(0), noopPeerIdentityAcceptor) defer bootPeer.shutdown() - peer := newPeerNode(newGossipConfig(1, 100, 0), newCommitter(1)) + peer := newPeerNode(newGossipConfig(1, 0), newCommitter(1), noopPeerIdentityAcceptor) defer peer.shutdown() - _, bootCh := bootPeer.g.Accept(remoteStateMsgFilter, true) - _, peerCh := peer.g.Accept(remoteStateMsgFilter, true) + naiveStateMsgPredicate := func(message interface{}) bool { + return message.(proto.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage() + } + + _, bootCh := bootPeer.g.Accept(naiveStateMsgPredicate, true) + _, peerCh := peer.g.Accept(naiveStateMsgPredicate, true) wg := sync.WaitGroup{} wg.Add(2)