Skip to content

Commit

Permalink
[FAB-1168]: Multichain support for gossip state trans.
Browse files Browse the repository at this point in the history
Gossip state transfer mechanism should be aware of the
relevant chainID, since it has to query for peers which
part of given channel to be to initiate state replication
of missing blocks. Also while doing peer-to-peer messaging
state mechanism should indicate which chain runs required
state replication of missing blocks. Current commit extends
gossip state transfer to provide support for multi chain
functionality.

Change-Id: Ie3e3807c8efe8ee0acf323e220ddb14fc0211a33
Signed-off-by: Artem Barger <bartem@il.ibm.com>
  • Loading branch information
C0rWin committed Jan 9, 2017
1 parent edd6258 commit a4c510b
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 52 deletions.
12 changes: 6 additions & 6 deletions core/committer/noopssinglechain/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ import (
var logger *logging.Logger // package-level logger

func init() {
logger = logging.MustGetLogger("committer")
logging.SetLevel(logging.DEBUG, logger.Module)
logger = logging.MustGetLogger("noopssinglechain.client")
}

// DeliverService used to communicate with orderers to obtain
Expand Down Expand Up @@ -200,7 +199,7 @@ func (d *DeliverService) readUntilClose() {
// Create payload with a block received
payload := createPayload(seqNum, t.Block)
// Use payload to create gossip message
gossipMsg := createGossipMsg(payload)
gossipMsg := createGossipMsg(d.chainID, payload)
logger.Debug("Creating gossip message", gossipMsg)

logger.Debugf("Adding payload locally, buffer seqNum = [%d], peers number [%d]", seqNum, numberOfPeers)
Expand All @@ -221,15 +220,16 @@ func (d *DeliverService) readUntilClose() {
}
}

func createGossipMsg(payload *gossip_proto.Payload) *gossip_proto.GossipMessage {
func createGossipMsg(chainID string, payload *gossip_proto.Payload) *gossip_proto.GossipMessage {
gossipMsg := &gossip_proto.GossipMessage{
Nonce: 0,
Nonce: 0,
Tag: gossip_proto.GossipMessage_CHAN_AND_ORG,
Channel: []byte(chainID),
Content: &gossip_proto.GossipMessage_DataMsg{
DataMsg: &gossip_proto.DataMessage{
Payload: payload,
},
},
Tag: gossip_proto.GossipMessage_EMPTY,
}
return gossipMsg
}
Expand Down
11 changes: 8 additions & 3 deletions core/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import (
"strings"
"time"

"github.com/hyperledger/fabric/common/metadata"

"github.com/golang/protobuf/ptypes/timestamp"
"github.com/hyperledger/fabric/common/metadata"
"golang.org/x/crypto/sha3"
)

Expand All @@ -38,7 +37,7 @@ type alg struct {
const defaultAlg = "sha256"

var availableIDgenAlgs = map[string]alg{
defaultAlg: alg{GenerateIDfromTxSHAHash},
defaultAlg: {GenerateIDfromTxSHAHash},
}

// ComputeCryptoHash should be used in openchain code so that we can change the actual algo used for crypto-hash at one place
Expand Down Expand Up @@ -146,12 +145,18 @@ func ArrayToChaincodeArgs(args []string) [][]byte {
}

const testchainid = "**TEST_CHAINID**"
const testorgid = "**TEST_ORGID**"

//GetTestChainID returns the CHAINID constant in use by orderer
func GetTestChainID() string {
return testchainid
}

//GetTestOrgID returns the ORGID constant in use by gossip join message
func GetTestOrgID() string {
return testorgid
}

//GetSysCCVersion returns the version of all system chaincodes
//This needs to be revisited on policies around system chaincode
//"upgrades" from user and relationship with "fabric" upgrade. For
Expand Down
2 changes: 1 addition & 1 deletion gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis

g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs)

g.logger.SetLevel(logging.WARNING)
g.logger.SetLevel(logging.DEBUG)

go g.start()

Expand Down
20 changes: 11 additions & 9 deletions gossip/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,22 @@ func newConfig(selfEndpoint string, bootPeers ...string) *gossip.Config {
if err != nil {
panic(err)
}

return &gossip.Config{
BindPort: int(port),
BootstrapPeers: bootPeers,
ID: selfEndpoint,
MaxBlockCountToStore: 100,
MaxPropagationBurstLatency: time.Millisecond * 50,
MaxPropagationBurstSize: 3,
BindPort: int(port),
BootstrapPeers: bootPeers,
ID: selfEndpoint,
MaxBlockCountToStore: 100,
MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond,
MaxPropagationBurstSize: 10,
PropagateIterations: 1,
PropagatePeerNum: 3,
PullInterval: time.Second * 5,
PullInterval: time.Duration(4) * time.Second,
PullPeerNum: 3,
SelfEndpoint: selfEndpoint,
PublishCertPeriod: time.Duration(4) * time.Second,
RequestStateInfoInterval: time.Duration(4) * time.Second,
PublishCertPeriod: 10 * time.Second,
RequestStateInfoInterval: 4 * time.Second,
PublishStateInfoInterval: 4 * time.Second,
}
}

Expand Down
27 changes: 21 additions & 6 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package service

import (
"sync"
"time"

peerComm "github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/util"
"github.com/hyperledger/fabric/gossip/api"
gossipCommon "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip"
Expand All @@ -40,6 +42,23 @@ var (

type gossipSvc gossip.Gossip

// TODO: This is a temporary join channel struct, to be removed once
// the structure of configuration block in terms of anchor peers will
// be defined and coded. Currently need it to allow the end-to-end
// skeleton to work, having gossip multi chain support.
type joinChanMsg struct {
}

// GetTimestamp returns the timestamp of the message's creation
func (*joinChanMsg) GetTimestamp() time.Time {
return time.Now()
}

// AnchorPeers returns all the anchor peers that are in the channel
func (*joinChanMsg) AnchorPeers() []api.AnchorPeer {
return []api.AnchorPeer{{Cert: api.PeerIdentityType(util.GetTestOrgID())}}
}

// GossipService encapsulates gossip and state capabilities into single interface
type GossipService interface {
gossip.Gossip
Expand All @@ -58,11 +77,6 @@ type gossipServiceImpl struct {
lock sync.RWMutex
}

// JoinChan makes the Gossip instance join a channel
func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID gossipCommon.ChainID) {
// TODO: eventually we'll have only 1 JoinChannel method
}

var logger = logging.MustGetLogger("gossipService")

// InitGossipService initialize gossip service
Expand Down Expand Up @@ -99,7 +113,8 @@ func (g *gossipServiceImpl) JoinChannel(commiter committer.Committer, block *com
} else {
// Initialize new state provider for given committer
logger.Debug("Creating state provider for chainID", chainID)
g.chains[chainID] = state.NewGossipStateProvider(g, commiter)
g.chains[chainID] = state.NewGossipStateProvider(chainID, g, commiter)
g.JoinChan(&joinChanMsg{}, gossipCommon.ChainID(chainID))
}

return nil
Expand Down
54 changes: 38 additions & 16 deletions gossip/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package state

import (
"bytes"
"math/rand"
"sync"
"sync/atomic"
Expand All @@ -25,6 +26,7 @@ import (
pb "github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/gossip/comm"
common2 "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/gossip"
"github.com/hyperledger/fabric/gossip/proto"
"github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -58,6 +60,9 @@ const (
// the struct to handle in memory sliding window of
// new ledger block to be acquired by hyper ledger
type GossipStateProviderImpl struct {
// Chain id
chainID string

// The gossiping service
gossip gossip.Gossip

Expand All @@ -82,18 +87,19 @@ type GossipStateProviderImpl struct {
}

// NewGossipStateProvider creates initialized instance of gossip state provider
func NewGossipStateProvider(g gossip.Gossip, committer committer.Committer) GossipStateProvider {
func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider {
logger, _ := logging.GetLogger("GossipStateProvider")
logging.SetLevel(logging.DEBUG, logger.Module)

gossipChan, _ := g.Accept(func(message interface{}) bool {
// Get only data messages
return message.(*proto.GossipMessage).GetDataMsg() != nil
return message.(*proto.GossipMessage).IsDataMsg() &&
bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID))
}, false)

// Filter message which are only relevant for state transfer
_, commChan := g.Accept(func(message interface{}) bool {
return message.(comm.ReceivedMessage).GetGossipMessage().GetStateRequest() != nil ||
message.(comm.ReceivedMessage).GetGossipMessage().GetStateResponse() != nil
return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage()
}, true)

height, err := committer.LedgerHeight()
Expand All @@ -106,6 +112,8 @@ func NewGossipStateProvider(g gossip.Gossip, committer committer.Committer) Goss
}

s := &GossipStateProviderImpl{
chainID: chainID,

// Instance of the gossip
gossip: g,

Expand All @@ -131,7 +139,8 @@ func NewGossipStateProvider(g gossip.Gossip, committer committer.Committer) Goss
s.logger.Infof("Updating node metadata information, current ledger sequence is at = %d, next expected block is = %d", state.LedgerHeight, s.payloads.Next())
bytes, err := state.Bytes()
if err == nil {
g.UpdateMetadata(bytes)
s.logger.Debug("[VVV]: Updating gossip metadate state", state)
g.UpdateChannelMetadata(bytes, common2.ChainID(s.chainID))
} else {
s.logger.Errorf("Unable to serialize node meta state, error = %s", err)
}
Expand Down Expand Up @@ -182,6 +191,12 @@ func (s *GossipStateProviderImpl) directMessage(msg comm.ReceivedMessage) {
return
}

if !bytes.Equal(msg.GetGossipMessage().Channel, []byte(s.chainID)) {
s.logger.Warning("Received state transfer request for channel",
string(msg.GetGossipMessage().Channel), "while expecting channel", s.chainID, "skipping request...")
return
}

incoming := msg.GetGossipMessage()

if incoming.GetStateRequest() != nil {
Expand All @@ -208,19 +223,17 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg comm.ReceivedMessage) {
s.logger.Errorf("Could not marshal block: %s", err)
}

if err != nil {
s.logger.Errorf("Could not calculate hash of block: %s", err)
}

response.Payloads = append(response.Payloads, &proto.Payload{
SeqNum: seqNum,
Data: blockBytes,
// TODO: Check hash generation for given block from the ledger
Hash: "",
Hash: string(blocks[0].Header.Hash()),
})
}
// Sending back response with missing blocks
msg.Respond(&proto.GossipMessage{
Nonce: 0,
Tag: proto.GossipMessage_CHAN_OR_ORG,
Channel: []byte(s.chainID),
Content: &proto.GossipMessage_StateResponse{response},
})
}
Expand Down Expand Up @@ -251,6 +264,12 @@ func (s *GossipStateProviderImpl) Stop() {

// New message notification/handler
func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) {
if !bytes.Equal(msg.Channel, []byte(s.chainID)) {
s.logger.Warning("Received state transfer request for channel",
string(msg.Channel), "while expecting channel", s.chainID, "skipping request...")
return
}

dataMsg := msg.GetDataMsg()
if dataMsg != nil {
// Add new payload to ordered set
Expand Down Expand Up @@ -302,7 +321,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
current, _ := s.committer.LedgerHeight()
max, _ := s.committer.LedgerHeight()

for _, p := range s.gossip.Peers() {
for _, p := range s.gossip.PeersOfChannel(common2.ChainID(s.chainID)) {
if state, err := FromBytes(p.Metadata); err == nil {
if max < state.LedgerHeight {
max = state.LedgerHeight
Expand All @@ -328,7 +347,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) {
var peers []*comm.RemotePeer
// Filtering peers which might have relevant blocks
for _, value := range s.gossip.Peers() {
for _, value := range s.gossip.PeersOfChannel(common2.ChainID(s.chainID)) {
nodeMetadata, err := FromBytes(value.Metadata)
if err == nil {
if nodeMetadata.LedgerHeight >= end {
Expand Down Expand Up @@ -356,13 +375,15 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
request.SeqNums = append(request.SeqNums, uint64(i))
}

s.logger.Debug("[$$$$$$$$$$$$$$$$]: Sending direct request to complete missing blocks, ", request)
s.logger.Debug("[$$$$$$$$$$$$$$$$]: Sending direct request to complete missing blocks, ", request, "for chain", s.chainID)
s.gossip.Send(&proto.GossipMessage{
Nonce: 0,
Tag: proto.GossipMessage_CHAN_OR_ORG,
Channel: []byte(s.chainID),
Content: &proto.GossipMessage_StateRequest{request},
}, peer)
}


// GetBlock return ledger block given its sequence number as a parameter
func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block {
// Try to read missing block from the ledger, should return no nil with
Expand All @@ -376,6 +397,7 @@ func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block {

// AddPayload add new payload into state
func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error {
s.logger.Debug("Adding new payload into the buffer, seqNum = ", payload.SeqNum)
return s.payloads.Push(payload)
}

Expand All @@ -390,7 +412,7 @@ func (s *GossipStateProviderImpl) commitBlock(block *common.Block, seqNum uint64
// Decode state to byte array
bytes, err := state.Bytes()
if err == nil {
s.gossip.UpdateMetadata(bytes)
s.gossip.UpdateChannelMetadata(bytes, common2.ChainID(s.chainID))
} else {
s.logger.Errorf("Unable to serialize node meta state, error = %s", err)
}
Expand Down
Loading

0 comments on commit a4c510b

Please sign in to comment.