Skip to content

Commit

Permalink
[FAB-2198] Gossip envelope refactoring - End
Browse files Browse the repository at this point in the history
This commit finishes the refactoring of the gossip message to
support envelopes with payloads and signatures.

It adds a new message type that embeds both an envelope
and both a GossipMessage: SignedGossipMessage.
since it embeds the GossipMessage, it inherits all of GossipMessage's
methods.

Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
Change-Id: I7b9f8ec8e33353d194de4cb266bb7d3124830378
  • Loading branch information
yacovm committed Feb 25, 2017
1 parent b7b5c4e commit 86cd87e
Show file tree
Hide file tree
Showing 26 changed files with 586 additions and 594 deletions.
2 changes: 1 addition & 1 deletion gossip/comm/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Comm interface {
GetPKIid() common.PKIidType

// Send sends a message to remote peers
Send(msg *proto.GossipMessage, peers ...*RemotePeer)
Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)

// Probe probes a remote node and returns nil if its responsive
Probe(peer *RemotePeer) error
Expand Down
37 changes: 20 additions & 17 deletions gossip/comm/comm_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
conn.pkiID = pkiID
conn.logger = c.logger

h := func(m *proto.GossipMessage) {
h := func(m *proto.SignedGossipMessage) {
c.logger.Debug("Got message:", m)
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
lock: conn,
GossipMessage: m,
conn: conn,
lock: conn,
SignedGossipMessage: m,
})
}
conn.handler = h
Expand All @@ -211,15 +211,15 @@ func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidT
return nil, err
}

func (c *commImpl) Send(msg *proto.GossipMessage, peers ...*RemotePeer) {
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer) {
if c.isStopping() || len(peers) == 0 {
return
}

c.logger.Debug("Entering, sending", msg, "to ", len(peers), "peers")

for _, peer := range peers {
go func(peer *RemotePeer, msg *proto.GossipMessage) {
go func(peer *RemotePeer, msg *proto.SignedGossipMessage) {
c.sendToEndpoint(peer, msg)
}(peer, msg)
}
Expand Down Expand Up @@ -247,7 +247,7 @@ func (c *commImpl) isPKIblackListed(p common.PKIidType) bool {
return false
}

func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.GossipMessage) {
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage) {
if c.isStopping() {
return
}
Expand Down Expand Up @@ -388,7 +388,7 @@ func (c *commImpl) authenticateRemotePeer(stream stream) (common.PKIidType, erro
remoteAddress := extractRemoteAddress(stream)
remoteCertHash := extractCertificateHashFromContext(ctx)
var err error
var cMsg *proto.GossipMessage
var cMsg *proto.SignedGossipMessage
var signer proto.Signer

// If TLS is detected, sign the hash of our cert to bind our TLS cert
Expand Down Expand Up @@ -475,11 +475,11 @@ func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error {
return nil
}

h := func(m *proto.GossipMessage) {
h := func(m *proto.SignedGossipMessage) {
c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{
conn: conn,
lock: conn,
GossipMessage: m,
conn: conn,
lock: conn,
SignedGossipMessage: m,
})
}

Expand All @@ -506,8 +506,8 @@ func (c *commImpl) disconnect(pkiID common.PKIidType) {
c.connStore.closeByPKIid(pkiID)
}

func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.GossipMessage, error) {
incChan := make(chan *proto.GossipMessage, 1)
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error) {
incChan := make(chan *proto.SignedGossipMessage, 1)
errChan := make(chan error, 1)
go func() {
if srvStr, isServerStr := stream.(proto.Gossip_GossipStreamServer); isServerStr {
Expand Down Expand Up @@ -541,7 +541,7 @@ func readWithTimeout(stream interface{}, timeout time.Duration, address string)
}
}

func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, hash []byte, cert api.PeerIdentityType, signer proto.Signer) *proto.GossipMessage {
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, hash []byte, cert api.PeerIdentityType, signer proto.Signer) *proto.SignedGossipMessage {
m := &proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: 0,
Expand All @@ -553,8 +553,11 @@ func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, hash []byte, cert
},
},
}
m.Sign(signer)
return m
sMsg := &proto.SignedGossipMessage{
GossipMessage: m,
}
sMsg.Sign(signer)
return sMsg
}

type stream interface {
Expand Down
10 changes: 5 additions & 5 deletions gossip/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func handshaker(endpoint string, comm Comm, t *testing.T, sigMutator func([]byte
msg2Send := createGossipMsg()
nonce := uint64(rand.Int())
msg2Send.Nonce = nonce
go stream.Send(msg2Send.NoopSign())
go stream.Send(msg2Send.Envelope)
return acceptChan
}

Expand Down Expand Up @@ -355,7 +355,7 @@ func TestResponses(t *testing.T) {
for m := range inChan {
reply := createGossipMsg()
reply.Nonce = m.GetGossipMessage().Nonce + 1
m.Respond(reply)
m.Respond(reply.GossipMessage)
}
}()
expectedNOnce := uint64(msg.Nonce + 1)
Expand Down Expand Up @@ -515,14 +515,14 @@ func TestPresumedDead(t *testing.T) {
}
}

func createGossipMsg() *proto.GossipMessage {
return &proto.GossipMessage{
func createGossipMsg() *proto.SignedGossipMessage {
return (&proto.GossipMessage{
Tag: proto.GossipMessage_EMPTY,
Nonce: uint64(rand.Int()),
Content: &proto.GossipMessage_DataMsg{
DataMsg: &proto.DataMessage{},
},
}
}).NoopSign()
}

func remotePeer(port int) *RemotePeer {
Expand Down
12 changes: 4 additions & 8 deletions gossip/comm/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"google.golang.org/grpc"
)

type handler func(*proto.GossipMessage)
type handler func(message *proto.SignedGossipMessage)

type connFactory interface {
createConnection(endpoint string, pkiID common.PKIidType) (*connection, error)
Expand Down Expand Up @@ -239,7 +239,7 @@ func (conn *connection) toDie() bool {
return atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
}

func (conn *connection) send(msg *proto.GossipMessage, onErr func(error)) {
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error)) {
conn.Lock()
defer conn.Unlock()

Expand All @@ -248,10 +248,6 @@ func (conn *connection) send(msg *proto.GossipMessage, onErr func(error)) {
return
}

if msg.Envelope == nil {
msg.NoopSign()
}

m := &msgSending{
envelope: msg.Envelope,
onErr: onErr,
Expand All @@ -262,7 +258,7 @@ func (conn *connection) send(msg *proto.GossipMessage, onErr func(error)) {

func (conn *connection) serviceConnection() error {
errChan := make(chan error, 1)
msgChan := make(chan *proto.GossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
msgChan := make(chan *proto.SignedGossipMessage, util.GetIntOrDefault("peer.gossip.recvBuffSize", defRecvBuffSize))
defer close(msgChan)

// Call stream.Recv() asynchronously in readFromStream(),
Expand Down Expand Up @@ -312,7 +308,7 @@ func (conn *connection) writeToStream() {
}
}

func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.GossipMessage) {
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage) {
defer func() {
recover()
}() // msgChan might be closed
Expand Down
8 changes: 4 additions & 4 deletions gossip/comm/mock/mock_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (packet *packetMock) Respond(msg *proto.GossipMessage) {
packet.src.socket <- &packetMock{
src: packet.dst,
dst: packet.src,
msg: msg,
msg: msg.NoopSign(),
}
}

Expand All @@ -98,8 +98,8 @@ func (packet *packetMock) GetSourceEnvelope() *proto.Envelope {
}

// GetGossipMessage returns the underlying GossipMessage
func (packet *packetMock) GetGossipMessage() *proto.GossipMessage {
return packet.msg.(*proto.GossipMessage)
func (packet *packetMock) GetGossipMessage() *proto.SignedGossipMessage {
return packet.msg.(*proto.SignedGossipMessage)
}

// GetPKIID returns the PKI-ID of the remote peer
Expand Down Expand Up @@ -141,7 +141,7 @@ func (mock *commMock) GetPKIid() common.PKIidType {
}

// Send sends a message to remote peers
func (mock *commMock) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
func (mock *commMock) Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
for _, peer := range peers {
logger.Debug("Sending message to peer ", peer.Endpoint, "from ", mock.id)
mock.members[peer.Endpoint].socket <- &packetMock{
Expand Down
8 changes: 4 additions & 4 deletions gossip/comm/mock/mock_comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func TestMockComm(t *testing.T) {
comm2 := NewCommMock(second.endpoint, members)
defer comm2.Stop()

comm2.Send(&proto.GossipMessage{
comm2.Send((&proto.GossipMessage{
Content: &proto.GossipMessage_StateRequest{&proto.RemoteStateRequest{
SeqNums: []uint64{1, 2, 3},
}},
}, &comm.RemotePeer{"first", common.PKIidType("first")})
}).NoopSign(), &comm.RemotePeer{"first", common.PKIidType("first")})

msg := <-msgCh

Expand All @@ -71,12 +71,12 @@ func TestMockComm_PingPong(t *testing.T) {
rcvChA := peerA.Accept(all)
rcvChB := peerB.Accept(all)

peerA.Send(&proto.GossipMessage{
peerA.Send((&proto.GossipMessage{
Content: &proto.GossipMessage_DataMsg{
&proto.DataMessage{
&proto.Payload{1, "", []byte("Ping")},
}},
}, &comm.RemotePeer{"peerB", common.PKIidType("peerB")})
}).NoopSign(), &comm.RemotePeer{"peerB", common.PKIidType("peerB")})

msg := <-rcvChB
dataMsg := msg.GetGossipMessage().GetDataMsg()
Expand Down
8 changes: 4 additions & 4 deletions gossip/comm/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

// ReceivedMessageImpl is an implementation of ReceivedMessage
type ReceivedMessageImpl struct {
*proto.GossipMessage
*proto.SignedGossipMessage
lock sync.Locker
conn *connection
}
Expand All @@ -38,12 +38,12 @@ func (m *ReceivedMessageImpl) GetSourceEnvelope() *proto.Envelope {

// Respond sends a msg to the source that sent the ReceivedMessageImpl
func (m *ReceivedMessageImpl) Respond(msg *proto.GossipMessage) {
m.conn.send(msg, func(e error) {})
m.conn.send(msg.NoopSign(), func(e error) {})
}

// GetGossipMessage returns the inner GossipMessage
func (m *ReceivedMessageImpl) GetGossipMessage() *proto.GossipMessage {
return m.GossipMessage
func (m *ReceivedMessageImpl) GetGossipMessage() *proto.SignedGossipMessage {
return m.SignedGossipMessage
}

// GetPKIID returns the PKI-ID of the remote peer
Expand Down
8 changes: 4 additions & 4 deletions gossip/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// CryptoService is an interface that the discovery expects to be implemented and passed on creation
type CryptoService interface {
// ValidateAliveMsg validates that an Alive message is authentic
ValidateAliveMsg(*proto.GossipMessage) bool
ValidateAliveMsg(message *proto.SignedGossipMessage) bool

// SignMessage signs a message
SignMessage(m *proto.GossipMessage, internalEndpoint string) *proto.Envelope
Expand All @@ -33,17 +33,17 @@ type CryptoService interface {
// CommService is an interface that the discovery expects to be implemented and passed on creation
type CommService interface {
// Gossip gossips a message
Gossip(msg *proto.GossipMessage)
Gossip(msg *proto.SignedGossipMessage)

// SendToPeer sends to a given peer a message.
// The nonce can be anything since the communication module handles the nonce itself
SendToPeer(peer *NetworkMember, msg *proto.GossipMessage)
SendToPeer(peer *NetworkMember, msg *proto.SignedGossipMessage)

// Ping probes a remote peer and returns if it's responsive or not
Ping(peer *NetworkMember) bool

// Accept returns a read-only channel for membership messages sent from remote peers
Accept() <-chan *proto.GossipMessage
Accept() <-chan *proto.SignedGossipMessage

// PresumedDead returns a read-only channel for peers that are presumed to be dead
PresumedDead() <-chan common.PKIidType
Expand Down
Loading

0 comments on commit 86cd87e

Please sign in to comment.