Skip to content

Commit

Permalink
[FAB-5677]Add services mediator for state transfer
Browse files Browse the repository at this point in the history
Currently state transfer uses two external services which helps to
manage replication of missing blocks, theirs verification, reordering
and etc... This commits unifies those service into single mediator
component allowing better control of required API.

Change-Id: Ia37cfe760597221dfc7e0d4a767781dba7c936c9
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Aug 9, 2017
1 parent c661446 commit f560850
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 26 deletions.
3 changes: 2 additions & 1 deletion gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
defer g.lock.Unlock()
// Initialize new state provider for given committer
logger.Debug("Creating state provider for chainID", chainID)
g.chains[chainID] = state.NewGossipStateProvider(chainID, g, committer, g.mcs)
servicesAdapater := &state.ServicesMediator{GossipAdapter: g, MCSAdapter: g.mcs}
g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapater, committer)
if g.deliveryService == nil {
var err error
g.deliveryService, err = g.deliveryFactory.Service(gossipServiceInstance, endpoints, g.mcs)
Expand Down
57 changes: 36 additions & 21 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,37 @@ type GossipAdapter interface {
PeersOfChannel(common2.ChainID) []discovery.NetworkMember
}

// MCSAdapter adapter of message crypto service interface to bound
// specific APIs required by state transfer service
type MCSAdapter interface {
// VerifyBlock returns nil if the block is properly signed, and the claimed seqNum is the
// sequence number that the block's header contains.
// else returns error
VerifyBlock(chainID common2.ChainID, seqNum uint64, signedBlock []byte) error

// VerifyByChannel checks that signature is a valid signature of message
// under a peer's verification key, but also in the context of a specific channel.
// If the verification succeeded, Verify returns nil meaning no error occurred.
// If peerIdentity is nil, then the verification fails.
VerifyByChannel(chainID common2.ChainID, peerIdentity api.PeerIdentityType, signature, message []byte) error
}

// ServicesMediator aggregated adapter to compound all mediator
// required by state transfer into single struct
type ServicesMediator struct {
GossipAdapter
MCSAdapter
}

// GossipStateProviderImpl the implementation of the GossipStateProvider interface
// the struct to handle in memory sliding window of
// new ledger block to be acquired by hyper ledger
type GossipStateProviderImpl struct {
// MessageCryptoService
mcs api.MessageCryptoService

// Chain id
chainID string

// The gossiping service
gossip GossipAdapter
mediator *ServicesMediator

// Channel to read gossip messages from
gossipChan <-chan *proto.GossipMessage
Expand Down Expand Up @@ -114,12 +133,11 @@ func init() {

// NewGossipCoordinatedStateProvider creates state provider with coordinator instance
// to orchestrate arrival of private rwsets and blocks before committing them into the ledger.
func NewGossipCoordinatedStateProvider(chainID string, g GossipAdapter,
coordinator Coordinator, mcs api.MessageCryptoService) GossipStateProvider {
func NewGossipCoordinatedStateProvider(chainID string, services *ServicesMediator, coordinator Coordinator) GossipStateProvider {

logger := util.GetLogger(util.LoggingStateModule, "")

gossipChan, _ := g.Accept(func(message interface{}) bool {
gossipChan, _ := services.Accept(func(message interface{}) bool {
// Get only data messages
return message.(*proto.GossipMessage).IsDataMsg() &&
bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
Expand All @@ -137,7 +155,7 @@ func NewGossipCoordinatedStateProvider(chainID string, g GossipAdapter,
return true
}
connInfo := receivedMsg.GetConnectionInfo()
authErr := mcs.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
authErr := services.VerifyByChannel(msg.Channel, connInfo.Identity, connInfo.Auth.Signature, connInfo.Auth.SignedData)
if authErr != nil {
logger.Warning("Got unauthorized nodeMetastate transfer request from", string(connInfo.Identity))
return false
Expand All @@ -146,7 +164,7 @@ func NewGossipCoordinatedStateProvider(chainID string, g GossipAdapter,
}

// Filter message which are only relevant for nodeMetastate transfer
_, commChan := g.Accept(remoteStateMsgFilter, true)
_, commChan := services.Accept(remoteStateMsgFilter, true)

height, err := coordinator.LedgerHeight()
if height == 0 {
Expand All @@ -164,14 +182,11 @@ func NewGossipCoordinatedStateProvider(chainID string, g GossipAdapter,

s := &GossipStateProviderImpl{
// MessageCryptoService
mcs: mcs,
mediator: services,

// Chain ID
chainID: chainID,

// Instance of the gossip
gossip: g,

// Channel to read new messages from
gossipChan: gossipChan,

Expand Down Expand Up @@ -202,7 +217,7 @@ func NewGossipCoordinatedStateProvider(chainID string, g GossipAdapter,
b, err := nodeMetastate.Bytes()
if err == nil {
logger.Debug("Updating gossip metadate nodeMetastate", nodeMetastate)
g.UpdateChannelMetadata(b, common2.ChainID(s.chainID))
services.UpdateChannelMetadata(b, common2.ChainID(s.chainID))
} else {
logger.Errorf("Unable to serialize node meta nodeMetastate, error = %s", err)
}
Expand All @@ -223,8 +238,8 @@ func NewGossipCoordinatedStateProvider(chainID string, g GossipAdapter,

// NewGossipStateProvider creates initialized instance of gossip state provider with committer
// which is wrapped up into coordinator, kept for API compatibility
func NewGossipStateProvider(chainID string, g GossipAdapter, committer committer.Committer, mcs api.MessageCryptoService) GossipStateProvider {
return NewGossipCoordinatedStateProvider(chainID, g, NewCoordinator(committer), mcs)
func NewGossipStateProvider(chainID string, services *ServicesMediator, committer committer.Committer) GossipStateProvider {
return NewGossipCoordinatedStateProvider(chainID, services, NewCoordinator(committer))
}

func (s *GossipStateProviderImpl) listen() {
Expand Down Expand Up @@ -389,7 +404,7 @@ func (s *GossipStateProviderImpl) handleStateResponse(msg proto.ReceivedMessage)
}
for _, payload := range response.GetPayloads() {
logger.Debugf("Received payload with sequence number %d.", payload.SeqNum)
if err := s.mcs.VerifyBlock(common2.ChainID(s.chainID), payload.SeqNum, payload.Data); err != nil {
if err := s.mediator.VerifyBlock(common2.ChainID(s.chainID), payload.SeqNum, payload.Data); err != nil {
logger.Warningf("Error verifying block with sequence number %d, due to %s", payload.SeqNum, err)
return uint64(0), err
}
Expand Down Expand Up @@ -518,7 +533,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
// find maximum available ledger height across peers
func (s *GossipStateProviderImpl) maxAvailableLedgerHeight() uint64 {
max := uint64(0)
for _, p := range s.gossip.PeersOfChannel(common2.ChainID(s.chainID)) {
for _, p := range s.mediator.PeersOfChannel(common2.ChainID(s.chainID)) {
if nodeMetastate, err := FromBytes(p.Metadata); err == nil {
if max < nodeMetastate.LedgerHeight {
max = nodeMetastate.LedgerHeight
Expand Down Expand Up @@ -559,7 +574,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
logger.Debugf("State transfer, with peer %s, requesting blocks in range [%d...%d], "+
"for chainID %s", peer.Endpoint, prev, next, s.chainID)

s.gossip.Send(gossipMsg, peer)
s.mediator.Send(gossipMsg, peer)
tryCounts++

// Wait until timeout or response arrival
Expand Down Expand Up @@ -619,7 +634,7 @@ func (s *GossipStateProviderImpl) selectPeerToRequestFrom(height uint64) (*comm.
func (s *GossipStateProviderImpl) filterPeers(predicate func(peer discovery.NetworkMember) bool) []*comm.RemotePeer {
var peers []*comm.RemotePeer

for _, member := range s.gossip.PeersOfChannel(common2.ChainID(s.chainID)) {
for _, member := range s.mediator.PeersOfChannel(common2.ChainID(s.chainID)) {
if predicate(member) {
peers = append(peers, &comm.RemotePeer{Endpoint: member.PreferredEndpoint(), PKIID: member.PKIid})
}
Expand Down Expand Up @@ -684,7 +699,7 @@ func (s *GossipStateProviderImpl) commitBlock(block *common.Block, pvtData []*Pv
// Decode nodeMetastate to byte array
b, err := nodeMetastate.Bytes()
if err == nil {
s.gossip.UpdateChannelMetadata(b, common2.ChainID(s.chainID))
s.mediator.UpdateChannelMetadata(b, common2.ChainID(s.chainID))
} else {

logger.Errorf("Unable to serialize node meta nodeMetastate, error = %s", err)
Expand Down
14 changes: 10 additions & 4 deletions gossip/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ func newPeerNodeWithGossip(config *gossip.Config, committer committer.Committer,
// Initialize pseudo peer simulator, which has only three
// basic parts

sp := NewGossipStateProvider(util.GetTestChainID(), g, committer, cs)
servicesAdapater := &ServicesMediator{GossipAdapter: g, MCSAdapter: cs}
sp := NewGossipStateProvider(util.GetTestChainID(), servicesAdapater, committer)
if sp == nil {
return nil
}
Expand Down Expand Up @@ -1018,7 +1019,8 @@ func TestTransferOfPrivateRWSet(t *testing.T) {

coord1.On("Close")

st := NewGossipCoordinatedStateProvider(chainID, g, coord1, &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor})
servicesAdapater := &ServicesMediator{GossipAdapter: g, MCSAdapter: &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor}}
st := NewGossipCoordinatedStateProvider(chainID, servicesAdapater, coord1)
defer st.Stop()

// Mocked state request message
Expand Down Expand Up @@ -1242,9 +1244,13 @@ func TestTransferOfPvtDataBetweenPeers(t *testing.T) {
}).Return([]string{}, nil) // No pvt data to complete and no error

cryptoService := &cryptoServiceMock{acceptor: noopPeerIdentityAcceptor}
peer1State := NewGossipCoordinatedStateProvider(chainID, peers["peer1"], peers["peer1"].coord, cryptoService)

mediator := &ServicesMediator{GossipAdapter: peers["peer1"], MCSAdapter: cryptoService}
peer1State := NewGossipCoordinatedStateProvider(chainID, mediator, peers["peer1"].coord)
defer peer1State.Stop()
peer2State := NewGossipCoordinatedStateProvider(chainID, peers["peer2"], peers["peer2"].coord, cryptoService)

mediator = &ServicesMediator{GossipAdapter: peers["peer2"], MCSAdapter: cryptoService}
peer2State := NewGossipCoordinatedStateProvider(chainID, mediator, peers["peer2"].coord)
defer peer2State.Stop()

// Make sure state was replicated
Expand Down

0 comments on commit f560850

Please sign in to comment.