Skip to content

Commit

Permalink
[FAB-2424] Gossip: Extract connection authInfo in comm
Browse files Browse the repository at this point in the history
In the previous chapter of FAB-2424(https://gerrit.hyperledger.org/r/#/c/6393/)
we took care of channel-based access control by consulting the
MSP whether a given peer is eligible for a specific channel.
This works for blocks that are broadcasted, and also for blocks
that are pulled from peers in the gossip layer,
but alas - the state transfer layer still stays bare and exposed to
the mercy of malicious peers!

This commit extends the protos/gossip/extensions.go:ReceivedMessage
interface and replaces GetPKIID() with GetConnectionInfo() that returns:
	ID       common.PKIidType
	Auth     *AuthInfo:
			SignedData []byte
			Signature  []byte
	Identity api.PeerIdentityType
Using this, in the next commit I'll be able to modify the state
transfer module by having the predicate it passes to the gossip
layer when listening for messages from remote peers to also
call the method provided by the MSP: VerifyByChannel()
and in this way- to verify that the remote peer
should indeed receive blocks or not.

Change-Id: I9e2e6f4da430ed062a6fa12bebdfab4add6c4843
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Feb 27, 2017
1 parent b36a664 commit 00a9bd7
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 44 deletions.
27 changes: 21 additions & 6 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
var cc *grpc.ClientConn
var stream proto.Gossip_GossipStreamClient
var pkiID common.PKIidType
var connInfo *proto.ConnectionInfo

c.logger.Debug("Entering", endpoint, expectedPKIID)
defer c.logger.Debug("Exiting")
Expand All @@ -185,15 +186,17 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
}

if stream, err = cl.GossipStream(context.Background()); err == nil {
pkiID, err = c.authenticateRemotePeer(stream)
connInfo, err = c.authenticateRemotePeer(stream)
if err == nil {
pkiID = connInfo.ID
if expectedPKIID != nil && !bytes.Equal(pkiID, expectedPKIID) {
// PKIID is nil when we don't know the remote PKI id's
c.logger.Warning("Remote endpoint claims to be a different peer, expected", expectedPKIID, "but got", pkiID)
return nil, errors.New("Authentication failure")
}
conn := newConnection(cl, cc, stream, nil)
conn.pkiID = pkiID
conn.info = connInfo
conn.logger = c.logger

h := func(m *proto.SignedGossipMessage) {
Expand All @@ -202,6 +205,7 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
conn: conn,
lock: conn,
SignedGossipMessage: m,
connInfo: connInfo,
})
}
conn.handler = h
Expand Down Expand Up @@ -384,7 +388,7 @@ func extractRemoteAddress(stream stream) string {
return remoteAddress
}

func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, error) {
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error) {
ctx := stream.Context()
remoteAddress := extractRemoteAddress(stream)
remoteCertHash := extractCertificateHashFromContext(ctx)
Expand Down Expand Up @@ -437,6 +441,11 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro
return nil, err
}

connInfo := &proto.ConnectionInfo{
ID: receivedMsg.PkiID,
Identity: receivedMsg.Cert,
}

// if TLS is detected, verify remote peer
if remoteCertHash != nil && c.selfCertHash != nil {
if !bytes.Equal(remoteCertHash, receivedMsg.Hash) {
Expand All @@ -451,24 +460,29 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro
c.logger.Error("Failed verifying signature from", remoteAddress, ":", err)
return nil, err
}
connInfo.Auth = &proto.AuthInfo{
Signature: m.Signature,
SignedData: m.Payload,
}
}

c.logger.Debug("Authenticated", remoteAddress)
return receivedMsg.PkiID, nil

return connInfo, nil
}

func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
if c.isStopping() {
return errors.New("Shutting down")
}
PKIID, err := c.authenticateRemotePeer(stream)
connInfo, err := c.authenticateRemotePeer(stream)
if err != nil {
c.logger.Error("Authentication failed")
return err
}
c.logger.Debug("Servicing", extractRemoteAddress(stream))

conn := c.connStore.onConnected(stream, PKIID)
conn := c.connStore.onConnected(stream, connInfo)

// if connStore denied the connection, it means we already have a connection to that peer
// so close this stream
Expand All @@ -481,14 +495,15 @@ func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
conn: conn,
lock: conn,
SignedGossipMessage: m,
connInfo: connInfo,
})
}

conn.handler = h

defer func() {
c.logger.Debug("Client", extractRemoteAddress(stream), " disconnected")
c.connStore.closeByPKIid(PKIID)
c.connStore.closeByPKIid(connInfo.ID)
conn.close()
}()

Expand Down
14 changes: 11 additions & 3 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"
"time"

"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/identity"
Expand All @@ -41,6 +42,7 @@ import (

func init() {
rand.Seed(42)
factory.InitFactories(nil)
}

func acceptAll(msg interface{}) bool {
Expand Down Expand Up @@ -178,7 +180,13 @@ func TestHandshake(t *testing.T) {
acceptChan := handshaker("localhost:9610", comm, t, nil, nil)
time.Sleep(2 * time.Second)
assert.Equal(t, 1, len(acceptChan))

msg := <-acceptChan
expectedPKIID := common.PKIidType("localhost:9610")
assert.Equal(t, expectedPKIID, msg.GetConnectionInfo().ID)
assert.Equal(t, api.PeerIdentityType("localhost:9610"), msg.GetConnectionInfo().Identity)
assert.NotNil(t, msg.GetConnectionInfo().Auth)
assert.True(t, msg.GetConnectionInfo().IsAuthenticated())
assert.Equal(t, msg.GetConnectionInfo().Auth.Signature, msg.GetConnectionInfo().Auth.SignedData)
// negative path, nothing should be read from the channel because the signature is wrong
mutateSig := func(b []byte) []byte {
if b[0] == 0 {
Expand Down Expand Up @@ -222,7 +230,7 @@ func TestBasic(t *testing.T) {
waitForMessages(t, out, 2, "Didn't receive 2 messages")
}

func TestGetPKIID(t *testing.T) {
func TestGetConnectionInfo(t *testing.T) {
t.Parallel()
comm1, _ := newCommInstance(6000, naiveSec)
comm2, _ := newCommInstance(7000, naiveSec)
Expand All @@ -234,7 +242,7 @@ func TestGetPKIID(t *testing.T) {
case <-time.After(time.Second * 10):
t.Fatal("Didn't receive a message in time")
case msg := <-m1:
assert.Equal(t, comm2.GetPKIid(), msg.GetPKIID())
assert.Equal(t, comm2.GetPKIid(), msg.GetConnectionInfo().ID)
}
}

Expand Down
14 changes: 8 additions & 6 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,23 @@ func (cs *connectionStore) shutdown() {
wg.Wait()
}

func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, pkiID common.PKIidType) *connection {
func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection {
cs.Lock()
defer cs.Unlock()

if c, exists := cs.pki2Conn[string(pkiID)]; exists {
if c, exists := cs.pki2Conn[string(connInfo.Identity)]; exists {
c.close()
}

return cs.registerConn(pkiID, serverStream)
return cs.registerConn(connInfo, serverStream)
}

func (cs *connectionStore) registerConn(pkiID common.PKIidType, serverStream proto.Gossip_GossipStreamServer) *connection {
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection {
conn := newConnection(nil, nil, nil, serverStream)
conn.pkiID = pkiID
conn.pkiID = connInfo.ID
conn.info = connInfo
conn.logger = cs.logger
cs.pki2Conn[string(pkiID)] = conn
cs.pki2Conn[string(connInfo.ID)] = conn
return conn
}

Expand Down Expand Up @@ -197,6 +198,7 @@ func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_Go
}

type connection struct {
info *proto.ConnectionInfo
outBuff chan *msgSending
logger *logging.Logger // logger
pkiID common.PKIidType // pkiID of the remote endpoint
Expand Down
4 changes: 2 additions & 2 deletions gossip/comm/mock/mock_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ func (packet *packetMock) GetGossipMessage() *proto.SignedGossipMessage {
return packet.msg.(*proto.SignedGossipMessage)
}

// GetPKIID returns the PKI-ID of the remote peer
// GetConnectionInfo returns information about the remote peer
// that sent the message
func (packet *packetMock) GetPKIID() common.PKIidType {
func (packet *packetMock) GetConnectionInfo() *proto.ConnectionInfo {
return nil
}

Expand Down
14 changes: 7 additions & 7 deletions gossip/comm/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ package comm
import (
"sync"

"github.com/hyperledger/fabric/gossip/common"
proto "github.com/hyperledger/fabric/protos/gossip"
)

// ReceivedMessageImpl is an implementation of ReceivedMessage
type ReceivedMessageImpl struct {
*proto.SignedGossipMessage
lock sync.Locker
conn *connection
lock sync.Locker
conn *connection
connInfo *proto.ConnectionInfo
}

// GetSourceEnvelope Returns the Envelope the ReceivedMessage was
Expand All @@ -46,8 +46,8 @@ func (m *ReceivedMessageImpl) GetGossipMessage() *proto.SignedGossipMessage {
return m.SignedGossipMessage
}

// GetPKIID returns the PKI-ID of the remote peer
// that sent the message
func (m *ReceivedMessageImpl) GetPKIID() common.PKIidType {
return m.conn.pkiID
// GetConnectionInfo returns information about the remote peer
// that send the message
func (m *ReceivedMessageImpl) GetConnectionInfo() *proto.ConnectionInfo {
return m.connInfo
}
3 changes: 1 addition & 2 deletions gossip/gossip/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/gossip/algo"
"github.com/hyperledger/fabric/gossip/gossip/pull"
Expand Down Expand Up @@ -64,7 +63,7 @@ func (s *sentMsg) GetGossipMessage() *proto.SignedGossipMessage {
return s.msg
}

func (s *sentMsg) GetPKIID() common.PKIidType {
func (s *sentMsg) GetConnectionInfo() *proto.ConnectionInfo {
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,13 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
gc.logger.Warning("Got message", msg.GetGossipMessage(), "but it's not a per-channel message, discarding it")
return
}
orgID := gc.GetOrgOfPeer(msg.GetPKIID())
orgID := gc.GetOrgOfPeer(msg.GetConnectionInfo().ID)
if orgID == nil {
gc.logger.Warning("Couldn't find org identity of peer", msg.GetPKIID())
gc.logger.Warning("Couldn't find org identity of peer", msg.GetConnectionInfo().ID)
return
}
if !gc.IsOrgInChannel(orgID) {
gc.logger.Warning("Point to point message came from", msg.GetPKIID(), "but it's not eligible for the channel", msg.GetGossipMessage().Channel)
gc.logger.Warning("Point to point message came from", msg.GetConnectionInfo().ID, "but it's not eligible for the channel", msg.GetGossipMessage().Channel)
return
}

Expand All @@ -375,7 +375,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
}

if m.IsStateInfoSnapshot() {
gc.handleStateInfSnapshot(m.GossipMessage, msg.GetPKIID())
gc.handleStateInfSnapshot(m.GossipMessage, msg.GetConnectionInfo().ID)
return
}

Expand All @@ -384,10 +384,10 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {

if m.IsDataMsg() {
if m.GetDataMsg().Payload == nil {
gc.logger.Warning("Payload is empty, got it from", msg.GetPKIID())
gc.logger.Warning("Payload is empty, got it from", msg.GetConnectionInfo().ID)
return
}
if !gc.verifyBlock(m.GossipMessage, msg.GetPKIID()) {
if !gc.verifyBlock(m.GossipMessage, msg.GetConnectionInfo().ID) {
gc.logger.Warning("Failed verifying block", m.GetDataMsg().Payload.SeqNum)
return
}
Expand All @@ -410,8 +410,8 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
return
}
if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BlockMessage {
if !gc.EligibleForChannel(discovery.NetworkMember{PKIid: msg.GetPKIID()}) {
gc.logger.Warning(msg.GetPKIID(), "isn't eligible for channel", gc.chainID)
if !gc.EligibleForChannel(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
gc.logger.Warning(msg.GetConnectionInfo().ID, "isn't eligible for channel", gc.chainID)
return
}
if m.IsDataUpdate() {
Expand All @@ -425,7 +425,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
gc.logger.Warning("DataUpdate message contains item with channel", gMsg.Channel, "but should be", gc.chainID)
return
}
if !gc.verifyBlock(gMsg.GossipMessage, msg.GetPKIID()) {
if !gc.verifyBlock(gMsg.GossipMessage, msg.GetConnectionInfo().ID) {
return
}
}
Expand Down Expand Up @@ -525,7 +525,7 @@ func (gc *gossipChannel) verifyMsg(msg proto.ReceivedMessage) bool {
return false
}

if msg.GetPKIID() == nil {
if msg.GetConnectionInfo().ID == nil {
gc.logger.Warning("Message has nil PKI-ID")
return false
}
Expand Down
6 changes: 4 additions & 2 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ func (m *receivedMsg) Respond(msg *proto.GossipMessage) {
m.Called(msg)
}

func (m *receivedMsg) GetPKIID() common.PKIidType {
return m.PKIID
func (m *receivedMsg) GetConnectionInfo() *proto.ConnectionInfo {
return &proto.ConnectionInfo{
ID: m.PKIID,
}
}

type gossipAdapterMock struct {
Expand Down
4 changes: 2 additions & 2 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) {

msg := m.GetGossipMessage()

g.logger.Debug("Entering,", m.GetPKIID(), "sent us", msg)
g.logger.Debug("Entering,", m.GetConnectionInfo().ID, "sent us", msg)
defer g.logger.Debug("Exiting")

if !g.validateMsg(m) {
Expand Down Expand Up @@ -302,7 +302,7 @@ func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) {
if msg.IsChannelRestricted() {
if gc := g.chanState.getGossipChannelByChainID(msg.Channel); gc == nil {
// If we're not in the channel but we should forward to peers of our org
if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetPKIID()}) && msg.IsStateInfoMsg() {
if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetConnectionInfo().ID}) && msg.IsStateInfoMsg() {
if g.stateInfoMsgStore.Add(msg) {
g.emitter.Add(msg)
}
Expand Down
3 changes: 1 addition & 2 deletions gossip/gossip/pull/pullstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/hyperledger/fabric/gossip/comm"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/gossip/algo"
"github.com/hyperledger/fabric/gossip/util"
Expand Down Expand Up @@ -63,7 +62,7 @@ func (pm *pullMsg) GetGossipMessage() *proto.SignedGossipMessage {
return pm.msg
}

func (pm *pullMsg) GetPKIID() common.PKIidType {
func (pm *pullMsg) GetConnectionInfo() *proto.ConnectionInfo {
return nil
}

Expand Down
Loading

0 comments on commit 00a9bd7

Please sign in to comment.