diff --git a/core/committer/noopssinglechain/client.go b/core/committer/noopssinglechain/client.go index dea69b5cd80..98412e02fcc 100644 --- a/core/committer/noopssinglechain/client.go +++ b/core/committer/noopssinglechain/client.go @@ -39,8 +39,7 @@ import ( var logger *logging.Logger // package-level logger func init() { - logger = logging.MustGetLogger("committer") - logging.SetLevel(logging.DEBUG, logger.Module) + logger = logging.MustGetLogger("noopssinglechain.client") } // DeliverService used to communicate with orderers to obtain @@ -200,7 +199,7 @@ func (d *DeliverService) readUntilClose() { // Create payload with a block received payload := createPayload(seqNum, t.Block) // Use payload to create gossip message - gossipMsg := createGossipMsg(payload) + gossipMsg := createGossipMsg(d.chainID, payload) logger.Debug("Creating gossip message", gossipMsg) logger.Debugf("Adding payload locally, buffer seqNum = [%d], peers number [%d]", seqNum, numberOfPeers) @@ -221,15 +220,16 @@ func (d *DeliverService) readUntilClose() { } } -func createGossipMsg(payload *gossip_proto.Payload) *gossip_proto.GossipMessage { +func createGossipMsg(chainID string, payload *gossip_proto.Payload) *gossip_proto.GossipMessage { gossipMsg := &gossip_proto.GossipMessage{ - Nonce: 0, + Nonce: 0, + Tag: gossip_proto.GossipMessage_CHAN_AND_ORG, + Channel: []byte(chainID), Content: &gossip_proto.GossipMessage_DataMsg{ DataMsg: &gossip_proto.DataMessage{ Payload: payload, }, }, - Tag: gossip_proto.GossipMessage_EMPTY, } return gossipMsg } diff --git a/core/util/utils.go b/core/util/utils.go index fa93b69b4b1..7351b0cede7 100644 --- a/core/util/utils.go +++ b/core/util/utils.go @@ -25,9 +25,8 @@ import ( "strings" "time" - "github.com/hyperledger/fabric/common/metadata" - "github.com/golang/protobuf/ptypes/timestamp" + "github.com/hyperledger/fabric/common/metadata" "golang.org/x/crypto/sha3" ) @@ -38,7 +37,7 @@ type alg struct { const defaultAlg = "sha256" var availableIDgenAlgs = map[string]alg{ - defaultAlg: alg{GenerateIDfromTxSHAHash}, + defaultAlg: {GenerateIDfromTxSHAHash}, } // ComputeCryptoHash should be used in openchain code so that we can change the actual algo used for crypto-hash at one place @@ -146,12 +145,18 @@ func ArrayToChaincodeArgs(args []string) [][]byte { } const testchainid = "**TEST_CHAINID**" +const testorgid = "**TEST_ORGID**" //GetTestChainID returns the CHAINID constant in use by orderer func GetTestChainID() string { return testchainid } +//GetTestOrgID returns the ORGID constant in use by gossip join message +func GetTestOrgID() string { + return testorgid +} + //GetSysCCVersion returns the version of all system chaincodes //This needs to be revisited on policies around system chaincode //"upgrades" from user and relationship with "fabric" upgrade. For diff --git a/gossip/gossip/gossip_impl.go b/gossip/gossip/gossip_impl.go index 72a52fd2942..ed1b82e1c85 100644 --- a/gossip/gossip/gossip_impl.go +++ b/gossip/gossip/gossip_impl.go @@ -124,7 +124,7 @@ func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvis g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs) - g.logger.SetLevel(logging.WARNING) + g.logger.SetLevel(logging.DEBUG) go g.start() diff --git a/gossip/integration/integration.go b/gossip/integration/integration.go index 0419bb267c2..b0e2f41f7a6 100644 --- a/gossip/integration/integration.go +++ b/gossip/integration/integration.go @@ -41,20 +41,22 @@ func newConfig(selfEndpoint string, bootPeers ...string) *gossip.Config { if err != nil { panic(err) } + return &gossip.Config{ - BindPort: int(port), - BootstrapPeers: bootPeers, - ID: selfEndpoint, - MaxBlockCountToStore: 100, - MaxPropagationBurstLatency: time.Millisecond * 50, - MaxPropagationBurstSize: 3, + BindPort: int(port), + BootstrapPeers: bootPeers, + ID: selfEndpoint, + MaxBlockCountToStore: 100, + MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond, + MaxPropagationBurstSize: 10, PropagateIterations: 1, PropagatePeerNum: 3, - PullInterval: time.Second * 5, + PullInterval: time.Duration(4) * time.Second, PullPeerNum: 3, SelfEndpoint: selfEndpoint, - PublishCertPeriod: time.Duration(4) * time.Second, - RequestStateInfoInterval: time.Duration(4) * time.Second, + PublishCertPeriod: 10 * time.Second, + RequestStateInfoInterval: 4 * time.Second, + PublishStateInfoInterval: 4 * time.Second, } } diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index b23307b95f9..0ea99011ccd 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -18,9 +18,11 @@ package service import ( "sync" + "time" peerComm "github.com/hyperledger/fabric/core/comm" "github.com/hyperledger/fabric/core/committer" + "github.com/hyperledger/fabric/core/util" "github.com/hyperledger/fabric/gossip/api" gossipCommon "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" @@ -40,6 +42,23 @@ var ( type gossipSvc gossip.Gossip +// TODO: This is a temporary join channel struct, to be removed once +// the structure of configuration block in terms of anchor peers will +// be defined and coded. Currently need it to allow the end-to-end +// skeleton to work, having gossip multi chain support. +type joinChanMsg struct { +} + +// GetTimestamp returns the timestamp of the message's creation +func (*joinChanMsg) GetTimestamp() time.Time { + return time.Now() +} + +// AnchorPeers returns all the anchor peers that are in the channel +func (*joinChanMsg) AnchorPeers() []api.AnchorPeer { + return []api.AnchorPeer{{Cert: api.PeerIdentityType(util.GetTestOrgID())}} +} + // GossipService encapsulates gossip and state capabilities into single interface type GossipService interface { gossip.Gossip @@ -58,11 +77,6 @@ type gossipServiceImpl struct { lock sync.RWMutex } -// JoinChan makes the Gossip instance join a channel -func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID gossipCommon.ChainID) { - // TODO: eventually we'll have only 1 JoinChannel method -} - var logger = logging.MustGetLogger("gossipService") // InitGossipService initialize gossip service @@ -99,7 +113,8 @@ func (g *gossipServiceImpl) JoinChannel(commiter committer.Committer, block *com } else { // Initialize new state provider for given committer logger.Debug("Creating state provider for chainID", chainID) - g.chains[chainID] = state.NewGossipStateProvider(g, commiter) + g.chains[chainID] = state.NewGossipStateProvider(chainID, g, commiter) + g.JoinChan(&joinChanMsg{}, gossipCommon.ChainID(chainID)) } return nil diff --git a/gossip/state/state.go b/gossip/state/state.go index 74379b348ba..9904646f4ac 100644 --- a/gossip/state/state.go +++ b/gossip/state/state.go @@ -17,6 +17,7 @@ limitations under the License. package state import ( + "bytes" "math/rand" "sync" "sync/atomic" @@ -25,6 +26,7 @@ import ( pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/gossip/comm" + common2 "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" "github.com/hyperledger/fabric/gossip/proto" "github.com/hyperledger/fabric/protos/common" @@ -58,6 +60,9 @@ const ( // the struct to handle in memory sliding window of // new ledger block to be acquired by hyper ledger type GossipStateProviderImpl struct { + // Chain id + chainID string + // The gossiping service gossip gossip.Gossip @@ -82,18 +87,19 @@ type GossipStateProviderImpl struct { } // NewGossipStateProvider creates initialized instance of gossip state provider -func NewGossipStateProvider(g gossip.Gossip, committer committer.Committer) GossipStateProvider { +func NewGossipStateProvider(chainID string, g gossip.Gossip, committer committer.Committer) GossipStateProvider { logger, _ := logging.GetLogger("GossipStateProvider") + logging.SetLevel(logging.DEBUG, logger.Module) gossipChan, _ := g.Accept(func(message interface{}) bool { // Get only data messages - return message.(*proto.GossipMessage).GetDataMsg() != nil + return message.(*proto.GossipMessage).IsDataMsg() && + bytes.Equal(message.(*proto.GossipMessage).Channel, []byte(chainID)) }, false) // Filter message which are only relevant for state transfer _, commChan := g.Accept(func(message interface{}) bool { - return message.(comm.ReceivedMessage).GetGossipMessage().GetStateRequest() != nil || - message.(comm.ReceivedMessage).GetGossipMessage().GetStateResponse() != nil + return message.(comm.ReceivedMessage).GetGossipMessage().IsRemoteStateMessage() }, true) height, err := committer.LedgerHeight() @@ -106,6 +112,8 @@ func NewGossipStateProvider(g gossip.Gossip, committer committer.Committer) Goss } s := &GossipStateProviderImpl{ + chainID: chainID, + // Instance of the gossip gossip: g, @@ -131,7 +139,8 @@ func NewGossipStateProvider(g gossip.Gossip, committer committer.Committer) Goss s.logger.Infof("Updating node metadata information, current ledger sequence is at = %d, next expected block is = %d", state.LedgerHeight, s.payloads.Next()) bytes, err := state.Bytes() if err == nil { - g.UpdateMetadata(bytes) + s.logger.Debug("[VVV]: Updating gossip metadate state", state) + g.UpdateChannelMetadata(bytes, common2.ChainID(s.chainID)) } else { s.logger.Errorf("Unable to serialize node meta state, error = %s", err) } @@ -182,6 +191,12 @@ func (s *GossipStateProviderImpl) directMessage(msg comm.ReceivedMessage) { return } + if !bytes.Equal(msg.GetGossipMessage().Channel, []byte(s.chainID)) { + s.logger.Warning("Received state transfer request for channel", + string(msg.GetGossipMessage().Channel), "while expecting channel", s.chainID, "skipping request...") + return + } + incoming := msg.GetGossipMessage() if incoming.GetStateRequest() != nil { @@ -208,19 +223,17 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg comm.ReceivedMessage) { s.logger.Errorf("Could not marshal block: %s", err) } - if err != nil { - s.logger.Errorf("Could not calculate hash of block: %s", err) - } - response.Payloads = append(response.Payloads, &proto.Payload{ SeqNum: seqNum, Data: blockBytes, - // TODO: Check hash generation for given block from the ledger - Hash: "", + Hash: string(blocks[0].Header.Hash()), }) } // Sending back response with missing blocks msg.Respond(&proto.GossipMessage{ + Nonce: 0, + Tag: proto.GossipMessage_CHAN_OR_ORG, + Channel: []byte(s.chainID), Content: &proto.GossipMessage_StateResponse{response}, }) } @@ -251,6 +264,12 @@ func (s *GossipStateProviderImpl) Stop() { // New message notification/handler func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) { + if !bytes.Equal(msg.Channel, []byte(s.chainID)) { + s.logger.Warning("Received state transfer request for channel", + string(msg.Channel), "while expecting channel", s.chainID, "skipping request...") + return + } + dataMsg := msg.GetDataMsg() if dataMsg != nil { // Add new payload to ordered set @@ -302,7 +321,7 @@ func (s *GossipStateProviderImpl) antiEntropy() { current, _ := s.committer.LedgerHeight() max, _ := s.committer.LedgerHeight() - for _, p := range s.gossip.Peers() { + for _, p := range s.gossip.PeersOfChannel(common2.ChainID(s.chainID)) { if state, err := FromBytes(p.Metadata); err == nil { if max < state.LedgerHeight { max = state.LedgerHeight @@ -328,7 +347,7 @@ func (s *GossipStateProviderImpl) antiEntropy() { func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) { var peers []*comm.RemotePeer // Filtering peers which might have relevant blocks - for _, value := range s.gossip.Peers() { + for _, value := range s.gossip.PeersOfChannel(common2.ChainID(s.chainID)) { nodeMetadata, err := FromBytes(value.Metadata) if err == nil { if nodeMetadata.LedgerHeight >= end { @@ -356,13 +375,15 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64) request.SeqNums = append(request.SeqNums, uint64(i)) } - s.logger.Debug("[$$$$$$$$$$$$$$$$]: Sending direct request to complete missing blocks, ", request) + s.logger.Debug("[$$$$$$$$$$$$$$$$]: Sending direct request to complete missing blocks, ", request, "for chain", s.chainID) s.gossip.Send(&proto.GossipMessage{ + Nonce: 0, + Tag: proto.GossipMessage_CHAN_OR_ORG, + Channel: []byte(s.chainID), Content: &proto.GossipMessage_StateRequest{request}, }, peer) } - // GetBlock return ledger block given its sequence number as a parameter func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block { // Try to read missing block from the ledger, should return no nil with @@ -376,6 +397,7 @@ func (s *GossipStateProviderImpl) GetBlock(index uint64) *common.Block { // AddPayload add new payload into state func (s *GossipStateProviderImpl) AddPayload(payload *proto.Payload) error { + s.logger.Debug("Adding new payload into the buffer, seqNum = ", payload.SeqNum) return s.payloads.Push(payload) } @@ -390,7 +412,7 @@ func (s *GossipStateProviderImpl) commitBlock(block *common.Block, seqNum uint64 // Decode state to byte array bytes, err := state.Bytes() if err == nil { - s.gossip.UpdateMetadata(bytes) + s.gossip.UpdateChannelMetadata(bytes, common2.ChainID(s.chainID)) } else { s.logger.Errorf("Unable to serialize node meta state, error = %s", err) } diff --git a/gossip/state/state_test.go b/gossip/state/state_test.go index f6df150aaa4..8336cf4c423 100644 --- a/gossip/state/state_test.go +++ b/gossip/state/state_test.go @@ -26,6 +26,7 @@ import ( pb "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric/core/committer" "github.com/hyperledger/fabric/core/ledger/ledgermgmt" + "github.com/hyperledger/fabric/core/util" "github.com/hyperledger/fabric/gossip/api" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/gossip" @@ -41,6 +42,20 @@ var ( ) var orgId = []byte("ORG1") +var anchorPeerIdentity = api.PeerIdentityType("identityInOrg1") + +type joinChanMsg struct { +} + +// GetTimestamp returns the timestamp of the message's creation +func (*joinChanMsg) GetTimestamp() time.Time { + return time.Now() +} + +// AnchorPeers returns all the anchor peers that are in the channel +func (*joinChanMsg) AnchorPeers() []api.AnchorPeer { + return []api.AnchorPeer{{Cert: anchorPeerIdentity}} +} type orgCryptoService struct { @@ -120,10 +135,10 @@ func (node *peerNode) shutdown() { func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { port := id + portPrefix return &gossip.Config{ - BindPort: port, - BootstrapPeers: bootPeers(boot...), - ID: fmt.Sprintf("p%d", id), - MaxBlockCountToStore: maxMsgCount, + BindPort: port, + BootstrapPeers: bootPeers(boot...), + ID: fmt.Sprintf("p%d", id), + MaxBlockCountToStore: maxMsgCount, MaxPropagationBurstLatency: time.Duration(10) * time.Millisecond, MaxPropagationBurstSize: 10, PropagateIterations: 1, @@ -132,8 +147,8 @@ func newGossipConfig(id int, maxMsgCount int, boot ...int) *gossip.Config { PullPeerNum: 5, SelfEndpoint: fmt.Sprintf("localhost:%d", port), PublishCertPeriod: 10 * time.Second, - RequestStateInfoInterval: 4 * time.Second, - PublishStateInfoInterval: 4 * time.Second, + RequestStateInfoInterval: 4 * time.Second, + PublishStateInfoInterval: 4 * time.Second, } } @@ -154,11 +169,14 @@ func newPeerNode(config *gossip.Config, committer committer.Committer) *peerNode // Gossip component based on configuration provided and communication module gossip := newGossipInstance(config) + logger.Debug("Joinning channel", util.GetTestChainID()) + gossip.JoinChan(&joinChanMsg{}, common.ChainID(util.GetTestChainID())) + // Initialize pseudo peer simulator, which has only three // basic parts return &peerNode{ g: gossip, - s: NewGossipStateProvider(gossip, committer), + s: NewGossipStateProvider(util.GetTestChainID(), gossip, committer), commit: committer, } @@ -278,8 +296,8 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { peersSet := make([]*peerNode, 0) for i := 0; i < standartPeersSize; i++ { - committer := newCommitter(standartPeersSize + i) - peersSet = append(peersSet, newPeerNode(newGossipConfig(standartPeersSize+i, 100, 0, 1, 2, 3, 4), committer)) + committer := newCommitter(bootstrapSetSize + i) + peersSet = append(peersSet, newPeerNode(newGossipConfig(bootstrapSetSize+i, 100, 0, 1, 2, 3, 4), committer)) } defer func() { @@ -290,7 +308,7 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { waitUntilTrueOrTimeout(t, func() bool { for _, p := range peersSet { - if len(p.g.Peers()) != bootstrapSetSize+standartPeersSize-1 { + if len(p.g.PeersOfChannel(common.ChainID(util.GetTestChainID()))) != bootstrapSetSize+standartPeersSize-1 { logger.Debug("[XXXXXXX]: Peer discovery has not finished yet") return false } @@ -312,7 +330,6 @@ func TestNewGossipStateProvider_SendingManyMessages(t *testing.T) { logger.Debug("[#####]: All peers have same ledger height!!!") return true }, 60*time.Second) - } func waitUntilTrueOrTimeout(t *testing.T, predicate func() bool, timeout time.Duration) {