Skip to content

Commit

Permalink
[FAB-6207] Refactor gossip state demux direct msgs
Browse files Browse the repository at this point in the history
Previously, while introduced CR for FAB-5084, new channel has been added
into state transfer to reflect replication of the private data across
peers. This commit, commit removes duplicated code by leveraging same
channel and demuxing direct messages based on the type. This also used
to increase testability of the state code.

Change-Id: Iab76d818da592f55bfcdbb83c7f948df66691e7c
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Sep 19, 2017
1 parent 9fc8db0 commit e64c810
Showing 1 changed file with 17 additions and 29 deletions.
46 changes: 17 additions & 29 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ type GossipStateProviderImpl struct {

commChan <-chan proto.ReceivedMessage

pvtDataChan <-chan proto.ReceivedMessage

// Queue of payloads which wasn't acquired yet
payloads PayloadsBuffer

Expand Down Expand Up @@ -177,13 +175,13 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
remoteStateMsgFilter := func(message interface{}) bool {
receivedMsg := message.(proto.ReceivedMessage)
msg := receivedMsg.GetGossipMessage()
if !msg.IsRemoteStateMessage() {
if !(msg.IsRemoteStateMessage() || msg.GetPrivateData() != nil) {
return false
}
connInfo := receivedMsg.GetConnectionInfo()
authErr := services.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
if authErr != nil {
logger.Warning("Got unauthorized nodeMetastate transfer request from", string(connInfo.Identity))
logger.Warning("Got unauthorized request from", string(connInfo.Identity))
return false
}
return true
Expand All @@ -192,23 +190,6 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
// Filter message which are only relevant for nodeMetastate transfer
_, commChan := services.Accept(remoteStateMsgFilter, true)

// Filter private data messages
_, pvtDataChan := services.Accept(func(message interface{}) bool {
receivedMsg := message.(proto.ReceivedMessage)
msg := receivedMsg.GetGossipMessage()
if msg.GetPrivateData() == nil {
return false
}
connInfo := receivedMsg.GetConnectionInfo()
authErr := services.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
if authErr != nil {
logger.Warning("Got unauthorized private data message from", string(connInfo.Identity))
return false
}
return true

}, true)

height, err := ledger.LedgerHeight()
if height == 0 {
// Panic here since this is an indication of invalid situation which should not happen in normal
Expand Down Expand Up @@ -236,9 +217,6 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
// Channel to read direct messages from other peers
commChan: commChan,

// Channel for private data messages
pvtDataChan: pvtDataChan,

// Create a queue for payload received
payloads: NewPayloadsBuffer(height),

Expand Down Expand Up @@ -291,18 +269,28 @@ func (s *GossipStateProviderImpl) listen() {
logger.Debug("Received new message via gossip channel")
go s.queueNewMessage(msg)
case msg := <-s.commChan:
logger.Debug("Direct message ", msg)
go s.directMessage(msg)
case msg := <-s.pvtDataChan:
logger.Debug("Private data message ", msg)
go s.privateDataMessage(msg)
logger.Debug("Dispatching a message", msg)
go s.dispatch(msg)
case <-s.stopCh:
s.stopCh <- struct{}{}
logger.Debug("Stop listening for new messages")
return
}
}
}
func (s *GossipStateProviderImpl) dispatch(msg proto.ReceivedMessage) {
// Check type of the message
if msg.GetGossipMessage().IsRemoteStateMessage() {
logger.Debug("Handling direct state transfer message")
// Got state transfer request response
s.directMessage(msg)
} else if msg.GetGossipMessage().GetPrivateData() != nil {
logger.Debug("Handling private data collection message")
// Handling private data replication message
s.privateDataMessage(msg)
}

}
func (s *GossipStateProviderImpl) privateDataMessage(msg proto.ReceivedMessage) {
if !bytes.Equal(msg.GetGossipMessage().Channel, []byte(s.chainID)) {
logger.Warning("Received state transfer request for channel",
Expand Down

0 comments on commit e64c810

Please sign in to comment.