diff --git a/gossip/gossip/channel/channel.go b/gossip/gossip/channel/channel.go new file mode 100644 index 00000000000..45279cd2f5c --- /dev/null +++ b/gossip/gossip/channel/channel.go @@ -0,0 +1,561 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package channel + +import ( + "bytes" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/gossip/msgstore" + "github.com/hyperledger/fabric/gossip/gossip/pull" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/hyperledger/fabric/gossip/util" +) + +// Config is a configuration item +// of the channel store +type Config struct { + ID string + PublishStateInfoInterval time.Duration + MaxBlockCountToStore int + PullPeerNum int + PullInterval time.Duration + RequestStateInfoInterval time.Duration +} + +// GossipChannel defines an object that deals with all channel-related messages +type GossipChannel interface { + + // GetPeers returns a list of peers with metadata as published by them + GetPeers() []discovery.NetworkMember + + // IsMemberInChan checks whether the given member is eligible to be in the channel + IsMemberInChan(member discovery.NetworkMember) bool + + // UpdateStateInfo updates this channel's StateInfo message + // that is periodically published + UpdateStateInfo(msg *proto.GossipMessage) + + // IsOrgInChannel returns whether the given organization is in the channel + IsOrgInChannel(membersOrg api.OrgIdentityType) bool + + // IsSubscribed returns whether the given member published + // its participation in the channel + IsSubscribed(member discovery.NetworkMember) bool + + // HandleMessage processes a message sent by a remote peer + HandleMessage(comm.ReceivedMessage) + + // AddToMsgStore adds a given GossipMessage to the message store + AddToMsgStore(msg *proto.GossipMessage) + + // ConfigureChannel (re)configures the list of organizations + // that are eligible to be in the channel + ConfigureChannel(joinMsg api.JoinChannelMessage) + + // Stop stops the channel's activity + Stop() +} + +// Adapter enables the gossipChannel +// to communicate with gossipServiceImpl. +type Adapter interface { + // GetConf returns the configuration that this GossipChannel will posses + GetConf() Config + + // Gossip gossips a message in the channel + Gossip(*proto.GossipMessage) + + // DeMultiplex de-multiplexes an item to subscribers + DeMultiplex(interface{}) + + // GetMembership returns the known alive peers and their information + GetMembership() []discovery.NetworkMember + + // Send sends a message to a list of peers + Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) + + // ValidateStateInfoMessage returns an error if a message + // hasn't been signed correctly, nil otherwise. + ValidateStateInfoMessage(*proto.GossipMessage) error + + // OrgByPeerIdentity returns the organization ID of a given peer identity + OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType + + // GetOrgOfPeer returns the organization ID of a given peer PKI-ID + GetOrgOfPeer(common.PKIidType) api.OrgIdentityType +} + +type gossipChannel struct { + Adapter + sync.RWMutex + shouldGossipStateInfo int32 + mcs api.MessageCryptoService + stopChan chan struct{} + stateInfoMsg *proto.GossipMessage + orgs []api.OrgIdentityType + joinMsg api.JoinChannelMessage + blockMsgStore msgstore.MessageStore + stateInfoMsgStore msgstore.MessageStore + chainID common.ChainID + blocksPuller pull.Mediator + logger *util.Logger + stateInfoPublishScheduler *time.Ticker + stateInfoRequestScheduler *time.Ticker + memFilter *membershipFilter +} + +type membershipFilter struct { + adapter Adapter + *gossipChannel +} + +// GetMembership returns the known alive peers and their information +func (mf *membershipFilter) GetMembership() []discovery.NetworkMember { + var members []discovery.NetworkMember + for _, mem := range mf.adapter.GetMembership() { + if mf.IsSubscribed(mem) { + members = append(members, mem) + } + } + return members +} + +// NewGossipChannel creates a new GossipChannel +func NewGossipChannel(mcs api.MessageCryptoService, chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel { + gc := &gossipChannel{ + mcs: mcs, + Adapter: adapter, + logger: util.GetLogger("channelState", adapter.GetConf().ID), + stopChan: make(chan struct{}, 1), + shouldGossipStateInfo: int32(0), + stateInfoPublishScheduler: time.NewTicker(adapter.GetConf().PublishStateInfoInterval), + stateInfoRequestScheduler: time.NewTicker(adapter.GetConf().RequestStateInfoInterval), + orgs: []api.OrgIdentityType{}, + chainID: chainID, + } + + gc.memFilter = &membershipFilter{adapter: gc.Adapter, gossipChannel: gc} + + comparator := proto.NewGossipMessageComparator(adapter.GetConf().MaxBlockCountToStore) + gc.blockMsgStore = msgstore.NewMessageStore(comparator, func(m interface{}) { + gc.blocksPuller.Remove(m.(*proto.GossipMessage)) + }) + + gc.stateInfoMsgStore = msgstore.NewMessageStore(comparator, func(m interface{}) {}) + gc.blocksPuller = gc.createBlockPuller() + + gc.ConfigureChannel(joinMsg) + + // Periodically publish state info + go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C) + // Periodically request state info + go gc.periodicalInvocation(gc.requestStateInfo, gc.stateInfoRequestScheduler.C) + return gc +} + +// Stop stop the channel operations +func (gc *gossipChannel) Stop() { + gc.stopChan <- struct{}{} + gc.blocksPuller.Stop() + gc.stateInfoPublishScheduler.Stop() + gc.stateInfoRequestScheduler.Stop() +} + +func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) { + for { + select { + case <-c: + fn() + case <-gc.stopChan: + gc.stopChan <- struct{}{} + return + } + } +} + +// GetPeers returns a list of peers with metadata as published by them +func (gc *gossipChannel) GetPeers() []discovery.NetworkMember { + members := []discovery.NetworkMember{} + + pkiID2NetMember := make(map[string]discovery.NetworkMember) + for _, member := range gc.GetMembership() { + pkiID2NetMember[string(member.PKIid)] = member + } + + for _, o := range gc.stateInfoMsgStore.Get() { + stateInf := o.(*proto.GossipMessage).GetStateInfo() + pkiID := stateInf.PkiID + if member, exists := pkiID2NetMember[string(pkiID)]; !exists { + continue + } else { + member.Metadata = stateInf.Metadata + members = append(members, member) + } + } + return members +} + +func (gc *gossipChannel) requestStateInfo() { + req := gc.createStateInfoRequest() + endpoints := selectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan) + gc.Send(req, endpoints...) +} + +func (gc *gossipChannel) publishStateInfo() { + if atomic.LoadInt32(&gc.shouldGossipStateInfo) == int32(0) { + return + } + gc.RLock() + stateInfoMsg := gc.stateInfoMsg + gc.RUnlock() + gc.Gossip(stateInfoMsg) +} + +func (gc *gossipChannel) createBlockPuller() pull.Mediator { + conf := pull.PullConfig{ + MsgType: proto.PullMsgType_BlockMessage, + Channel: []byte(gc.chainID), + Id: gc.GetConf().ID, + PeerCountToSelect: gc.GetConf().PullPeerNum, + PullInterval: gc.GetConf().PullInterval, + Tag: proto.GossipMessage_CHAN_AND_ORG, + } + seqNumFromMsg := func(msg *proto.GossipMessage) string { + dataMsg := msg.GetDataMsg() + if dataMsg == nil || dataMsg.Payload == nil { + return "" + } + return fmt.Sprintf("%d", dataMsg.Payload.SeqNum) + } + blockConsumer := func(msg *proto.GossipMessage) { + dataMsg := msg.GetDataMsg() + if dataMsg == nil || dataMsg.Payload == nil { + gc.logger.Warning("Invalid DataMessage:", dataMsg) + return + } + added := gc.blockMsgStore.Add(msg) + // if we can't add the message to the msgStore, + // no point in disseminating it to others... + if !added { + return + } + gc.DeMultiplex(msg) + } + return pull.NewPullMediator(conf, gc, gc.memFilter, seqNumFromMsg, blockConsumer) +} + +// IsMemberInChan checks whether the given member is eligible to be in the channel +func (gc *gossipChannel) IsMemberInChan(member discovery.NetworkMember) bool { + org := gc.GetOrgOfPeer(member.PKIid) + if org == nil { + return false + } + + return gc.IsOrgInChannel(org) +} + +// IsOrgInChannel returns whether the given organization is in the channel +func (gc *gossipChannel) IsOrgInChannel(membersOrg api.OrgIdentityType) bool { + gc.RLock() + defer gc.RUnlock() + for _, orgOfChan := range gc.orgs { + if bytes.Equal(orgOfChan, membersOrg) { + return true + } + } + return false +} + +// IsSubscribed returns whether the given member published +// its participation in the channel +func (gc *gossipChannel) IsSubscribed(member discovery.NetworkMember) bool { + if !gc.IsMemberInChan(member) { + return false + } + for _, o := range gc.stateInfoMsgStore.Get() { + m, isMsg := o.(*proto.GossipMessage) + if isMsg && m.IsStateInfoMsg() && bytes.Equal(m.GetStateInfo().PkiID, member.PKIid) { + return true + } + } + return false +} + +// AddToMsgStore adds a given GossipMessage to the message store +func (gc *gossipChannel) AddToMsgStore(msg *proto.GossipMessage) { + if msg.IsDataMsg() { + gc.blockMsgStore.Add(msg) + gc.blocksPuller.Add(msg) + } + + if msg.IsStateInfoMsg() { + gc.stateInfoMsgStore.Add(msg) + } +} + +// ConfigureChannel (re)configures the list of organizations +// that are eligible to be in the channel +func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) { + gc.Lock() + defer gc.Unlock() + + if gc.joinMsg == nil { + gc.joinMsg = joinMsg + } + + if gc.joinMsg.GetTimestamp().After(joinMsg.GetTimestamp()) { + gc.logger.Warning("Already have a more updated JoinChannel message(", gc.joinMsg.GetTimestamp(), ") than", gc.joinMsg.GetTimestamp()) + return + } + orgs := []api.OrgIdentityType{} + existingOrgInJoinChanMsg := make(map[string]struct{}) + for _, anchorPeer := range joinMsg.AnchorPeers() { + orgID := gc.OrgByPeerIdentity(anchorPeer.Cert) + if orgID == nil { + gc.logger.Warning("Cannot extract org identity from certificate, aborting.") + return + } + if _, exists := existingOrgInJoinChanMsg[string(orgID)]; !exists { + orgs = append(orgs, orgID) + existingOrgInJoinChanMsg[string(orgID)] = struct{}{} + } + } + gc.orgs = orgs + gc.joinMsg = joinMsg +} + +// HandleMessage processes a message sent by a remote peer +func (gc *gossipChannel) HandleMessage(msg comm.ReceivedMessage) { + if !gc.verifyMsg(msg) { + return + } + m := msg.GetGossipMessage() + if !m.IsChannelRestricted() { + gc.logger.Warning("Got message", msg.GetGossipMessage(), "but it's not a per-channel message, discarding it") + return + } + orgID := gc.GetOrgOfPeer(msg.GetPKIID()) + if orgID == nil { + gc.logger.Warning("Couldn't find org identity of peer", msg.GetPKIID()) + return + } + if !gc.IsOrgInChannel(orgID) { + gc.logger.Warning("Point to point message came from", msg.GetPKIID(), "but it's not eligible for the channel", msg.GetGossipMessage().Channel) + return + } + + if m.IsStateInfoPullRequestMsg() { + msg.Respond(gc.createStateInfoSnapshot()) + return + } + + if m.IsStateInfoSnapshot() { + gc.handleStateInfSnapshot(m, msg.GetPKIID()) + return + } + + if m.IsDataMsg() || m.IsStateInfoMsg() { + added := false + + if m.IsDataMsg() { + if !gc.verifyBlock(m, msg.GetPKIID()) { + return + } + added = gc.blockMsgStore.Add(msg.GetGossipMessage()) + } else { // StateInfoMsg verification should be handled in a layer above + // since we don't have access to the id mapper here + added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage()) + } + + if added { + // Forward the message + gc.Gossip(msg.GetGossipMessage()) + // DeMultiplex to local subscribers + gc.DeMultiplex(m) + + if m.IsDataMsg() { + gc.blocksPuller.Add(msg.GetGossipMessage()) + } + } + return + } + if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BlockMessage { + if m.IsDataUpdate() { + for _, item := range m.GetDataUpdate().Data { + if !bytes.Equal(item.Channel, []byte(gc.chainID)) { + gc.logger.Warning("DataUpdate message contains item with channel", item.Channel, "but should be", gc.chainID) + return + } + if !gc.verifyBlock(item, msg.GetPKIID()) { + return + } + } + } + gc.blocksPuller.HandleMessage(msg) + } +} + +func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender common.PKIidType) { + for _, stateInf := range m.GetStateSnapshot().Elements { + if !stateInf.IsStateInfoMsg() { + gc.logger.Warning("Element of StateInfoSnapshot isn't a StateInfoMessage:", stateInf, "message sent from", sender) + return + } + + orgID := gc.GetOrgOfPeer(stateInf.GetStateInfo().PkiID) + if orgID == nil { + gc.logger.Warning("Couldn't find org identity of peer", stateInf.GetStateInfo().PkiID, "message sent from", sender) + return + } + + if !gc.IsOrgInChannel(orgID) { + gc.logger.Warning("Peer", stateInf.GetStateInfo().PkiID, "is not in an eligible org, can't process a stateInfo from it, sent from", sender) + return + } + + if !bytes.Equal(stateInf.Channel, []byte(gc.chainID)) { + gc.logger.Warning("StateInfo message is of an invalid channel", stateInf, "sent from", sender) + return + } + err := gc.ValidateStateInfoMessage(stateInf) + if err != nil { + gc.logger.Warning("Failed validating state info message:", stateInf, ":", err, "sent from", sender) + return + } + gc.stateInfoMsgStore.Add(stateInf) + } +} + +func (gc *gossipChannel) verifyBlock(msg *proto.GossipMessage, sender common.PKIidType) bool { + if !msg.IsDataMsg() { + gc.logger.Warning("Received from ", sender, "a DataUpdate message that contains a non-block GossipMessage:", msg) + return false + } + if msg.GetDataMsg().Payload == nil { + gc.logger.Warning("Received empty payload from", sender) + return false + } + err := gc.mcs.VerifyBlock(msg.GetDataMsg().Payload) + if err != nil { + gc.logger.Warning("Received fabricated block from", sender, "in DataUpdate:", err) + return false + } + return true +} + +func (gc *gossipChannel) createStateInfoSnapshot() *proto.GossipMessage { + rawElements := gc.stateInfoMsgStore.Get() + elements := make([]*proto.GossipMessage, len(rawElements)) + for i, rawEl := range rawElements { + elements[i] = rawEl.(*proto.GossipMessage) + } + + return &proto.GossipMessage{ + Channel: gc.chainID, + Tag: proto.GossipMessage_CHAN_OR_ORG, + Nonce: 0, + Content: &proto.GossipMessage_StateSnapshot{ + StateSnapshot: &proto.StateInfoSnapshot{ + Elements: elements, + }, + }, + } +} + +func (gc *gossipChannel) verifyMsg(msg comm.ReceivedMessage) bool { + if msg == nil { + gc.logger.Warning("Messsage is nil") + return false + } + m := msg.GetGossipMessage() + if m == nil { + gc.logger.Warning("Message content is empty") + return false + } + + if msg.GetPKIID() == nil { + gc.logger.Warning("Message has nil PKI-ID") + return false + } + + if !bytes.Equal(m.Channel, []byte(gc.chainID)) { + gc.logger.Warning("Message contains wrong channel(", m.Channel, "), expected", gc.chainID) + return false + } + return true +} + +func (gc *gossipChannel) createStateInfoRequest() *proto.GossipMessage { + return &proto.GossipMessage{ + Channel: gc.chainID, + Tag: proto.GossipMessage_CHAN_OR_ORG, + Nonce: 0, + Content: &proto.GossipMessage_StateInfoPullReq{ + StateInfoPullReq: &proto.StateInfoPullRequest{}, + }, + } +} + +// UpdateStateInfo updates this channel's StateInfo message +// that is periodically published +func (gc *gossipChannel) UpdateStateInfo(msg *proto.GossipMessage) { + if !msg.IsStateInfoMsg() { + return + } + gc.stateInfoMsgStore.Add(msg) + gc.Lock() + defer gc.Unlock() + gc.stateInfoMsg = msg + atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1)) +} + +// selectPeers returns a slice of peers that match a list of routing filters +func selectPeers(k int, peerPool []discovery.NetworkMember, filters ...func(discovery.NetworkMember) bool) []*comm.RemotePeer { + var indices []int + if len(peerPool) < k { + indices = make([]int, len(peerPool)) + for i := 0; i < len(peerPool); i++ { + indices[i] = i + } + } else { + indices = util.GetRandomIndices(k, len(peerPool)-1) + } + + var remotePeers []*comm.RemotePeer + for _, index := range indices { + peer := peerPool[index] + passesFilters := true + for _, filter := range filters { + if !filter(peer) { + passesFilters = false + } + } + if passesFilters { + remotePeers = append(remotePeers, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.Endpoint}) + } + + } + return remotePeers +} diff --git a/gossip/gossip/channel/channel_test.go b/gossip/gossip/channel/channel_test.go new file mode 100644 index 00000000000..adfef361ac8 --- /dev/null +++ b/gossip/gossip/channel/channel_test.go @@ -0,0 +1,904 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package channel + +import ( + "fmt" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/hyperledger/fabric/gossip/api" + "github.com/hyperledger/fabric/gossip/comm" + "github.com/hyperledger/fabric/gossip/common" + "github.com/hyperledger/fabric/gossip/discovery" + "github.com/hyperledger/fabric/gossip/gossip/algo" + "github.com/hyperledger/fabric/gossip/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type msgMutator func(*proto.GossipMessage) + +var conf = Config{ + ID: "test", + PublishStateInfoInterval: time.Millisecond * 100, + MaxBlockCountToStore: 100, + PullPeerNum: 3, + PullInterval: time.Second, + RequestStateInfoInterval: time.Millisecond * 100, +} + +func init() { + shortenedWaitTime := time.Millisecond * 300 + algo.SetDigestWaitTime(shortenedWaitTime / 2) + algo.SetRequestWaitTime(shortenedWaitTime) + algo.SetResponseWaitTime(shortenedWaitTime) +} + +var ( + // Organizations: {ORG1, ORG2} + // Channel A: {ORG1} + channelA = common.ChainID("A") + orgInChannelA = api.OrgIdentityType("ORG1") + orgNotInChannelA = api.OrgIdentityType("ORG2") + anchorPeerIdentity = api.PeerIdentityType("identityInOrg1") + pkiIDInOrg1 = common.PKIidType("pkiIDInOrg1") + pkiIDinOrg2 = common.PKIidType("pkiIDinOrg2") +) + +type joinChanMsg struct { + getTS func() time.Time + anchorPeers func() []api.AnchorPeer +} + +// GetTimestamp returns the timestamp of the message's creation +func (jcm *joinChanMsg) GetTimestamp() time.Time { + if jcm.getTS != nil { + return jcm.getTS() + } + return time.Now() +} + +// AnchorPeers returns all the anchor peers that are in the channel +func (jcm *joinChanMsg) AnchorPeers() []api.AnchorPeer { + if jcm.anchorPeers != nil { + return jcm.anchorPeers() + } + return []api.AnchorPeer{{Cert: anchorPeerIdentity}} +} + +type cryptoService struct { + mock.Mock +} + +func (cs *cryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType { + panic("Should not be called in this test") +} + +func (cs *cryptoService) VerifyBlock(signedBlock api.SignedBlock) error { + args := cs.Called(signedBlock) + if args.Get(0) == nil { + return nil + } + return args.Get(0).(error) +} + +func (cs *cryptoService) Sign(msg []byte) ([]byte, error) { + panic("Should not be called in this test") +} + +func (cs *cryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error { + panic("Should not be called in this test") +} + +func (cs *cryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error { + panic("Should not be called in this test") +} + +type receivedMsg struct { + PKIID common.PKIidType + msg *proto.GossipMessage + mock.Mock +} + +func (m *receivedMsg) GetGossipMessage() *proto.GossipMessage { + return m.msg +} + +func (m *receivedMsg) Respond(msg *proto.GossipMessage) { + m.Called(msg) +} + +func (m *receivedMsg) GetPKIID() common.PKIidType { + return m.PKIID +} + +type gossipAdapterMock struct { + mock.Mock +} + +func (ga *gossipAdapterMock) GetConf() Config { + args := ga.Called() + return args.Get(0).(Config) +} + +func (ga *gossipAdapterMock) Gossip(msg *proto.GossipMessage) { + ga.Called(msg) +} + +func (ga *gossipAdapterMock) DeMultiplex(msg interface{}) { + ga.Called(msg) +} + +func (ga *gossipAdapterMock) GetMembership() []discovery.NetworkMember { + args := ga.Called() + return args.Get(0).([]discovery.NetworkMember) +} + +func (ga *gossipAdapterMock) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) { + // Ensure we have configured Send prior + foundSend := false + for _, ec := range ga.ExpectedCalls { + if ec.Method == "Send" { + foundSend = true + } + + } + if !foundSend { + return + } + ga.Called(msg, peers) +} + +func (ga *gossipAdapterMock) ValidateStateInfoMessage(msg *proto.GossipMessage) error { + args := ga.Called(msg) + if args.Get(0) == nil { + return nil + } + return args.Get(0).(error) +} + +func (ga *gossipAdapterMock) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType { + args := ga.Called(identity) + return args.Get(0).(api.OrgIdentityType) +} + +func (ga *gossipAdapterMock) GetOrgOfPeer(PKIIID common.PKIidType) api.OrgIdentityType { + args := ga.Called(PKIIID) + return args.Get(0).(api.OrgIdentityType) +} + +func configureAdapter(adapter *gossipAdapterMock, members ...discovery.NetworkMember) { + adapter.On("GetConf").Return(conf) + adapter.On("GetMembership").Return(members) + adapter.On("OrgByPeerIdentity", anchorPeerIdentity).Return(orgInChannelA) + adapter.On("GetOrgOfPeer", pkiIDInOrg1).Return(orgInChannelA) + adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(orgNotInChannelA) + adapter.On("GetOrgOfPeer", mock.Anything).Return(api.OrgIdentityType(nil)) +} + +func TestChannelPeriodicalPublishStateInfo(t *testing.T) { + t.Parallel() + ledgerHeight := 5 + receivedMsg := int32(0) + stateInfoReceptionChan := make(chan *proto.GossipMessage, 1) + + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + + adapter := new(gossipAdapterMock) + configureAdapter(adapter) + adapter.On("Send", mock.AnythingOfType("*proto.GossipMessage"), mock.Anything) + adapter.On("Gossip", mock.AnythingOfType("*proto.GossipMessage")).Run(func(arg mock.Arguments) { + if atomic.LoadInt32(&receivedMsg) == int32(1) { + return + } + + atomic.StoreInt32(&receivedMsg, int32(1)) + msg := arg.Get(0).(*proto.GossipMessage) + stateInfoReceptionChan <- msg + }) + + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + gc.UpdateStateInfo(createStateInfoMsg(ledgerHeight, pkiIDInOrg1, channelA)) + + var msg *proto.GossipMessage + select { + case <-time.After(time.Second * 5): + t.Fatalf("Haven't sent stateInfo on time") + case m := <-stateInfoReceptionChan: + msg = m + } + + md := msg.GetStateInfo().Metadata + height, err := strconv.ParseInt(string(md), 10, 64) + assert.NoError(t, err, "ReceivedMetadata is invalid") + assert.Equal(t, ledgerHeight, int(height), "Received different ledger height than expected") +} + +func TestChannelPull(t *testing.T) { + t.Parallel() + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + receivedBlocksChan := make(chan *proto.GossipMessage) + adapter := new(gossipAdapterMock) + configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) + adapter.On("Gossip", mock.AnythingOfType("*proto.GossipMessage")) + adapter.On("DeMultiplex", mock.AnythingOfType("*proto.GossipMessage")).Run(func(arg mock.Arguments) { + msg := arg.Get(0).(*proto.GossipMessage) + if !msg.IsDataMsg() { + return + } + // The peer is supposed to de-multiplex 2 ledger blocks + assert.True(t, msg.IsDataMsg()) + receivedBlocksChan <- msg + }) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + go gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(100, pkiIDInOrg1, channelA)}) + + var wg sync.WaitGroup + pullPhase := simulatePullPhase(gc, t, &wg, func(*proto.GossipMessage) {}) + adapter.On("Send", mock.AnythingOfType("*proto.GossipMessage"), mock.Anything).Run(pullPhase) + + wg.Wait() + for expectedSeq := 10; expectedSeq < 11; expectedSeq++ { + select { + case <-time.After(time.Second * 5): + t.Fatalf("Haven't received blocks on time") + case msg := <-receivedBlocksChan: + assert.Equal(t, uint64(expectedSeq), msg.GetDataMsg().Payload.SeqNum) + } + } +} + +func TestChannelPeerNotInChannel(t *testing.T) { + t.Parallel() + + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + gossipMessagesSentFromChannel := make(chan *proto.GossipMessage, 1) + adapter := new(gossipAdapterMock) + configureAdapter(adapter) + adapter.On("Gossip", mock.AnythingOfType("*proto.GossipMessage")) + adapter.On("Send", mock.Anything, mock.Anything) + adapter.On("DeMultiplex", mock.Anything) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + + // First thing, we test that blocks can only be received from peers that are in an org that's in the channel + // Empty PKI-ID, should drop the block + gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(5, channelA)}) + assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size()) + + // Known PKI-ID but not in channel, should drop the block + gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(5, channelA), PKIID: pkiIDinOrg2}) + assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size()) + // Known PKI-ID, and in channel, should add the block + gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(5, channelA), PKIID: pkiIDInOrg1}) + assert.Equal(t, 1, gc.(*gossipChannel).blockMsgStore.Size()) + + // Next, we make sure that the channel doesn't respond to pull messages (hello or requests) from peers that're not in the channel + messageRelayer := func(arg mock.Arguments) { + msg := arg.Get(0).(*proto.GossipMessage) + gossipMessagesSentFromChannel <- msg + } + // First, ensure it does that for pull messages from peers that are in the channel + helloMsg := createHelloMsg(pkiIDInOrg1) + helloMsg.On("Respond", mock.AnythingOfType("*proto.GossipMessage")).Run(messageRelayer) + gc.HandleMessage(helloMsg) + select { + case <-gossipMessagesSentFromChannel: + case <-time.After(time.Second * 5): + t.Fatalf("Didn't reply with a digest on time") + } + // And now for peers that are not in the channel (should not send back a message) + helloMsg = createHelloMsg(pkiIDinOrg2) + helloMsg.On("Respond", mock.AnythingOfType("*proto.GossipMessage")).Run(messageRelayer) + gc.HandleMessage(helloMsg) + select { + case <-gossipMessagesSentFromChannel: + t.Fatalf("Responded with digest, but shouldn't have since peer is in ORG2 and its not in the channel") + case <-time.After(time.Second * 1): + } + + // Ensure we respond to a valid StateInfoRequest + req := gc.(*gossipChannel).createStateInfoRequest() + validReceivedMsg := &receivedMsg{ + msg: req, + PKIID: pkiIDInOrg1, + } + validReceivedMsg.On("Respond", mock.AnythingOfType("*proto.GossipMessage")).Run(messageRelayer) + gc.HandleMessage(validReceivedMsg) + select { + case <-gossipMessagesSentFromChannel: + case <-time.After(time.Second * 5): + t.Fatalf("Didn't reply with a digest on time") + } + + // Ensure we don't respond to a StateInfoRequest from a peer in the wrong org + invalidReceivedMsg := &receivedMsg{ + msg: req, + PKIID: pkiIDinOrg2, + } + invalidReceivedMsg.On("Respond", mock.AnythingOfType("*proto.GossipMessage")).Run(messageRelayer) + gc.HandleMessage(invalidReceivedMsg) + select { + case <-gossipMessagesSentFromChannel: + t.Fatalf("Responded with digest, but shouldn't have since peer is in ORG2 and its not in the channel") + case <-time.After(time.Second * 1): + } + + // Ensure we don't respond to a StateInfoRequest in the wrong channel from a peer in the right org + req2 := gc.(*gossipChannel).createStateInfoRequest() + req2.Channel = []byte("B") // Not channelA + invalidReceivedMsg2 := &receivedMsg{ + msg: req2, + PKIID: pkiIDInOrg1, + } + invalidReceivedMsg2.On("Respond", mock.AnythingOfType("*proto.GossipMessage")).Run(messageRelayer) + gc.HandleMessage(invalidReceivedMsg2) + select { + case <-gossipMessagesSentFromChannel: + t.Fatalf("Responded with digest, but shouldn't have since peer is in ORG2 and its not in the channel") + case <-time.After(time.Second * 1): + } +} + +func TestChannelIsInChannel(t *testing.T) { + t.Parallel() + + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + adapter := new(gossipAdapterMock) + configureAdapter(adapter) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + adapter.On("Gossip", mock.AnythingOfType("*proto.GossipMessage")) + adapter.On("Send", mock.Anything, mock.Anything) + adapter.On("DeMultiplex", mock.Anything) + + assert.False(t, gc.IsOrgInChannel(nil)) + assert.True(t, gc.IsOrgInChannel(orgInChannelA)) + assert.False(t, gc.IsOrgInChannel(orgNotInChannelA)) + assert.True(t, gc.IsMemberInChan(discovery.NetworkMember{PKIid: pkiIDInOrg1})) + assert.False(t, gc.IsMemberInChan(discovery.NetworkMember{PKIid: pkiIDinOrg2})) +} + +func TestChannelIsSubscribed(t *testing.T) { + t.Parallel() + + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + adapter := new(gossipAdapterMock) + configureAdapter(adapter) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + adapter.On("Gossip", mock.AnythingOfType("*proto.GossipMessage")) + adapter.On("Send", mock.Anything, mock.Anything) + adapter.On("DeMultiplex", mock.Anything) + gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1}) + assert.True(t, gc.IsSubscribed(discovery.NetworkMember{PKIid: pkiIDInOrg1})) +} + +func TestChannelAddToMessageStore(t *testing.T) { + t.Parallel() + + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + demuxedMsgs := make(chan *proto.GossipMessage, 1) + adapter := new(gossipAdapterMock) + configureAdapter(adapter) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + adapter.On("Gossip", mock.Anything) + adapter.On("Send", mock.Anything, mock.Anything) + adapter.On("DeMultiplex", mock.AnythingOfType("*proto.GossipMessage")).Run(func(arg mock.Arguments) { + demuxedMsgs <- arg.Get(0).(*proto.GossipMessage) + }) + + // Check that adding a message of a bad type doesn't crash the program + gc.AddToMsgStore(createHelloMsg(pkiIDInOrg1).GetGossipMessage()) + + // We make sure that if we get a new message it is de-multiplexed, + // but if we put such a message in the message store, it isn't demultiplexed when we + // receive that message again + gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(11, channelA), PKIID: pkiIDInOrg1}) + select { + case <-time.After(time.Second): + t.Fatalf("Haven't detected a demultiplexing within a time period") + case <-demuxedMsgs: + } + gc.AddToMsgStore(dataMsgOfChannel(12, channelA)) + gc.HandleMessage(&receivedMsg{msg: dataMsgOfChannel(12, channelA), PKIID: pkiIDInOrg1}) + select { + case <-time.After(time.Second): + case <-demuxedMsgs: + t.Fatalf("Demultiplexing detected, even though it wasn't supposed to happen") + } + + gc.AddToMsgStore(createStateInfoMsg(10, pkiIDInOrg1, channelA)) + helloMsg := createHelloMsg(pkiIDInOrg1) + respondedChan := make(chan struct{}, 1) + helloMsg.On("Respond", mock.AnythingOfType("*proto.GossipMessage")).Run(func(arg mock.Arguments) { + respondedChan <- struct{}{} + }) + gc.HandleMessage(helloMsg) + select { + case <-time.After(time.Second): + t.Fatalf("Haven't responded to hello message within a time period") + case <-respondedChan: + } + + gc.HandleMessage(&receivedMsg{msg: createStateInfoMsg(10, pkiIDInOrg1, channelA), PKIID: pkiIDInOrg1}) + assert.True(t, gc.IsSubscribed(discovery.NetworkMember{PKIid: pkiIDInOrg1})) +} + +func TestChannelBadBlocks(t *testing.T) { + t.Parallel() + receivedMessages := make(chan *proto.GossipMessage, 1) + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + adapter := new(gossipAdapterMock) + configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) + adapter.On("Gossip", mock.Anything) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + + adapter.On("DeMultiplex", mock.Anything).Run(func(args mock.Arguments) { + receivedMessages <- args.Get(0).(*proto.GossipMessage) + }) + + // Send a valid block + gc.HandleMessage(&receivedMsg{msg: createDataMsg(1, channelA), PKIID: pkiIDInOrg1}) + assert.Len(t, receivedMessages, 1) + <-receivedMessages // drain + + // Send a block with wrong channel + gc.HandleMessage(&receivedMsg{msg: createDataMsg(2, common.ChainID("B")), PKIID: pkiIDInOrg1}) + assert.Len(t, receivedMessages, 0) + + // Send a block with empty payload + dataMsg := createDataMsg(3, channelA) + dataMsg.GetDataMsg().Payload = nil + gc.HandleMessage(&receivedMsg{msg: dataMsg, PKIID: pkiIDInOrg1}) + assert.Len(t, receivedMessages, 0) + + // Send a block with a bad signature + cs.Mock = mock.Mock{} + cs.On("VerifyBlock", mock.Anything).Return(fmt.Errorf("Bad signature")) + gc.HandleMessage(&receivedMsg{msg: createDataMsg(4, channelA), PKIID: pkiIDInOrg1}) + assert.Len(t, receivedMessages, 0) +} + +func TestChannelPulledBadBlocks(t *testing.T) { + t.Parallel() + + // Test a pull with a block of a bad channel + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + adapter := new(gossipAdapterMock) + configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) + adapter.On("DeMultiplex", mock.Anything) + adapter.On("Gossip", mock.Anything) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)}) + + var wg sync.WaitGroup + wg.Add(1) + + changeChan := func(msg *proto.GossipMessage) { + msg.Channel = []byte("B") + } + + pullPhase1 := simulatePullPhase(gc, t, &wg, changeChan) + adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase1) + adapter.On("DeMultiplex", mock.Anything) + wg.Wait() + gc.Stop() + assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size()) + + // Test a pull with a badly signed block + cs = &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(fmt.Errorf("Bad block")) + adapter = new(gossipAdapterMock) + adapter.On("Gossip", mock.Anything) + adapter.On("DeMultiplex", mock.Anything) + configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) + gc = NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)}) + + var wg2 sync.WaitGroup + wg2.Add(1) + noop := func(msg *proto.GossipMessage) { + + } + pullPhase2 := simulatePullPhase(gc, t, &wg2, noop) + adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase2) + wg2.Wait() + assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size()) + + // Test a pull with an empty block + cs = &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + adapter = new(gossipAdapterMock) + adapter.On("Gossip", mock.Anything) + adapter.On("DeMultiplex", mock.Anything) + configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) + gc = NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)}) + + var wg3 sync.WaitGroup + wg3.Add(1) + emptyBlock := func(msg *proto.GossipMessage) { + msg.GetDataMsg().Payload = nil + } + pullPhase3 := simulatePullPhase(gc, t, &wg3, emptyBlock) + adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase3) + wg3.Wait() + assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size()) + + // Test a pull with a non-block message + cs = &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + + adapter = new(gossipAdapterMock) + adapter.On("Gossip", mock.Anything) + adapter.On("DeMultiplex", mock.Anything) + configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) + gc = NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(1, pkiIDInOrg1, channelA)}) + + var wg4 sync.WaitGroup + wg4.Add(1) + nonBlockMsg := func(msg *proto.GossipMessage) { + msg.Content = createHelloMsg(pkiIDInOrg1).GetGossipMessage().Content + } + pullPhase4 := simulatePullPhase(gc, t, &wg4, nonBlockMsg) + adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase4) + wg4.Wait() + assert.Equal(t, 0, gc.(*gossipChannel).blockMsgStore.Size()) +} + +func TestChannelStateInfoSnapshot(t *testing.T) { + t.Parallel() + + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + adapter := new(gossipAdapterMock) + configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + adapter.On("Gossip", mock.Anything) + sentMessages := make(chan *proto.GossipMessage, 10) + adapter.On("Send", mock.Anything, mock.Anything) + adapter.On("ValidateStateInfoMessage", mock.AnythingOfType("*proto.GossipMessage")).Return(nil) + + // Ensure we ignore stateInfo snapshots from peers not in the channel + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(common.ChainID("B"), createStateInfoMsg(4, pkiIDInOrg1, channelA))}) + assert.Empty(t, gc.GetPeers()) + // Ensure we ignore invalid stateInfo snapshots + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, common.ChainID("B")))}) + assert.Empty(t, gc.GetPeers()) + + // Ensure we ignore stateInfo messages from peers not in the channel + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDinOrg2, channelA))}) + assert.Empty(t, gc.GetPeers()) + + // Ensure we ignore stateInfo snapshots from peers not in the org + gc.HandleMessage(&receivedMsg{PKIID: pkiIDinOrg2, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA))}) + assert.Empty(t, gc.GetPeers()) + + // Ensure we process stateInfo snapshots that are OK + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA))}) + assert.NotEmpty(t, gc.GetPeers()) + assert.Equal(t, "4", string(gc.GetPeers()[0].Metadata)) + + // Check we can respond to stateInfoSnapshot requests + snapshotReq := &receivedMsg{ + PKIID: pkiIDInOrg1, + msg: &proto.GossipMessage{ + Channel: channelA, + Tag: proto.GossipMessage_CHAN_OR_ORG, + Content: &proto.GossipMessage_StateInfoPullReq{ + StateInfoPullReq: &proto.StateInfoPullRequest{}, + }, + }, + } + snapshotReq.On("Respond", mock.Anything).Run(func(args mock.Arguments) { + sentMessages <- args.Get(0).(*proto.GossipMessage) + }) + + go gc.HandleMessage(snapshotReq) + select { + case <-time.After(time.Second): + t.Fatal("Haven't received a state info snapshot on time") + case msg := <-sentMessages: + elements := msg.GetStateSnapshot().Elements + assert.Len(t, elements, 1) + assert.Equal(t, []byte("4"), elements[0].GetStateInfo().Metadata) + } + + // Ensure we don't crash if we got an invalid state info message + invalidStateInfoSnapshot := stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA)) + invalidStateInfoSnapshot.GetStateSnapshot().Elements = []*proto.GossipMessage{createHelloMsg(pkiIDInOrg1).GetGossipMessage()} + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: invalidStateInfoSnapshot}) + + // Ensure we don't crash if we got a stateInfoMessage from a peer that its org isn't known + invalidStateInfoSnapshot = stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, common.PKIidType("unknown"), channelA)) + gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: invalidStateInfoSnapshot}) + +} + +func TestChannelStop(t *testing.T) { + t.Parallel() + + cs := &cryptoService{} + cs.On("VerifyBlock", mock.Anything).Return(nil) + adapter := new(gossipAdapterMock) + var sendCount int32 + configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) + adapter.On("Send", mock.Anything, mock.Anything).Run(func(mock.Arguments) { + atomic.AddInt32(&sendCount, int32(1)) + }) + gc := NewGossipChannel(cs, channelA, adapter, &joinChanMsg{}) + time.Sleep(time.Second) + gc.Stop() + oldCount := atomic.LoadInt32(&sendCount) + t1 := time.Now() + for { + if time.Since(t1).Nanoseconds() > (time.Second * 15).Nanoseconds() { + t.Fatal("Stop failed") + } + time.Sleep(time.Second) + newCount := atomic.LoadInt32(&sendCount) + if newCount == oldCount { + break + } + oldCount = newCount + } +} + +func TestChannelReconfigureChannel(t *testing.T) { + t.Parallel() + + // Scenario: We test the following things: + // Updating a channel with an outdated JoinChannel message doesn't work + // Removing an organization from a channel is indeed reflected in that + // the GossipChannel doesn't consider peers from that organization as + // peers in the channel, and refuses to have any channel-related contact + // with peers of that channel + + cs := &cryptoService{} + adapter := new(gossipAdapterMock) + configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1}) + + adapter.On("GetConf").Return(conf) + adapter.On("GetMembership").Return([]discovery.NetworkMember{}) + adapter.On("OrgByPeerIdentity", api.PeerIdentityType(orgInChannelA)).Return(orgInChannelA) + adapter.On("OrgByPeerIdentity", api.PeerIdentityType(orgNotInChannelA)).Return(orgNotInChannelA) + adapter.On("GetOrgOfPeer", pkiIDInOrg1).Return(orgInChannelA) + adapter.On("GetOrgOfPeer", pkiIDinOrg2).Return(orgNotInChannelA) + + outdatedJoinChanMsg := &joinChanMsg{ + anchorPeers: func() []api.AnchorPeer { + return []api.AnchorPeer{{Cert: api.PeerIdentityType(orgNotInChannelA)}} + }, + getTS: func() time.Time { + return time.Now() + }, + } + + newJoinChanMsg := &joinChanMsg{ + anchorPeers: func() []api.AnchorPeer { + return []api.AnchorPeer{{Cert: api.PeerIdentityType(orgInChannelA)}} + }, + getTS: func() time.Time { + return time.Now().Add(time.Millisecond * 100) + }, + } + + updatedJoinChanMsg := &joinChanMsg{ + anchorPeers: func() []api.AnchorPeer { + return []api.AnchorPeer{{Cert: api.PeerIdentityType(orgNotInChannelA)}} + }, + getTS: func() time.Time { + return time.Now().Add(time.Millisecond * 200) + }, + } + + gc := NewGossipChannel(cs, channelA, adapter, api.JoinChannelMessage(newJoinChanMsg)) + + // Just call it again, to make sure stuff don't crash + gc.ConfigureChannel(api.JoinChannelMessage(newJoinChanMsg)) + + adapter.On("Gossip", mock.AnythingOfType("*proto.GossipMessage")) + adapter.On("Send", mock.Anything, mock.Anything) + adapter.On("DeMultiplex", mock.Anything) + + assert.True(t, gc.IsOrgInChannel(orgInChannelA)) + assert.False(t, gc.IsOrgInChannel(orgNotInChannelA)) + assert.True(t, gc.IsMemberInChan(discovery.NetworkMember{PKIid: pkiIDInOrg1})) + assert.False(t, gc.IsMemberInChan(discovery.NetworkMember{PKIid: pkiIDinOrg2})) + + gc.ConfigureChannel(outdatedJoinChanMsg) + assert.True(t, gc.IsOrgInChannel(orgInChannelA)) + assert.False(t, gc.IsOrgInChannel(orgNotInChannelA)) + assert.True(t, gc.IsMemberInChan(discovery.NetworkMember{PKIid: pkiIDInOrg1})) + assert.False(t, gc.IsMemberInChan(discovery.NetworkMember{PKIid: pkiIDinOrg2})) + + gc.ConfigureChannel(updatedJoinChanMsg) + gc.ConfigureChannel(updatedJoinChanMsg) + assert.False(t, gc.IsOrgInChannel(orgInChannelA)) + assert.True(t, gc.IsOrgInChannel(orgNotInChannelA)) + assert.False(t, gc.IsMemberInChan(discovery.NetworkMember{PKIid: pkiIDInOrg1})) + assert.True(t, gc.IsMemberInChan(discovery.NetworkMember{PKIid: pkiIDinOrg2})) + + // Ensure we don't respond to a StateInfoRequest from a peer in the wrong org + invalidReceivedMsg := &receivedMsg{ + msg: gc.(*gossipChannel).createStateInfoRequest(), + PKIID: pkiIDInOrg1, + } + gossipMessagesSentFromChannel := make(chan *proto.GossipMessage, 1) + messageRelayer := func(arg mock.Arguments) { + msg := arg.Get(0).(*proto.GossipMessage) + gossipMessagesSentFromChannel <- msg + } + invalidReceivedMsg.On("Respond", mock.AnythingOfType("*proto.GossipMessage")).Run(messageRelayer) + gc.HandleMessage(invalidReceivedMsg) + select { + case <-gossipMessagesSentFromChannel: + t.Fatalf("Responded with digest, but shouldn't have since peer is in ORG2 and its not in the channel") + case <-time.After(time.Second * 1): + } + +} + +func createDataUpdateMsg(nonce uint64) *proto.GossipMessage { + return &proto.GossipMessage{ + Nonce: 0, + Channel: []byte(channelA), + Tag: proto.GossipMessage_CHAN_AND_ORG, + Content: &proto.GossipMessage_DataUpdate{ + DataUpdate: &proto.DataUpdate{ + MsgType: proto.PullMsgType_BlockMessage, + Nonce: nonce, + Data: []*proto.GossipMessage{createDataMsg(10, channelA), createDataMsg(11, channelA)}, + }, + }, + } +} + +func createHelloMsg(PKIID common.PKIidType) *receivedMsg { + msg := &proto.GossipMessage{ + Channel: []byte(channelA), + Tag: proto.GossipMessage_CHAN_AND_ORG, + Content: &proto.GossipMessage_Hello{ + Hello: &proto.GossipHello{ + Nonce: 500, + Metadata: nil, + MsgType: proto.PullMsgType_BlockMessage, + }, + }, + } + return &receivedMsg{msg: msg, PKIID: PKIID} +} + +func dataMsgOfChannel(seqnum uint64, channel common.ChainID) *proto.GossipMessage { + return &proto.GossipMessage{ + Channel: []byte(channel), + Nonce: 0, + Tag: proto.GossipMessage_CHAN_AND_ORG, + Content: &proto.GossipMessage_DataMsg{ + DataMsg: &proto.DataMessage{ + Payload: &proto.Payload{ + Data: []byte{}, + Hash: "", + SeqNum: seqnum, + }, + }, + }, + } +} + +func createStateInfoMsg(ledgerHeight int, pkiID common.PKIidType, channel common.ChainID) *proto.GossipMessage { + return &proto.GossipMessage{ + Channel: channel, + Tag: proto.GossipMessage_CHAN_OR_ORG, + Content: &proto.GossipMessage_StateInfo{ + StateInfo: &proto.StateInfo{ + Timestamp: &proto.PeerTime{IncNumber: uint64(time.Now().UnixNano()), SeqNum: 1}, + Metadata: []byte(fmt.Sprintf("%d", ledgerHeight)), + PkiID: []byte(pkiID), + }, + }, + } +} + +func stateInfoSnapshotForChannel(chainID common.ChainID, stateInfoMsgs ...*proto.GossipMessage) *proto.GossipMessage { + return &proto.GossipMessage{ + Channel: chainID, + Tag: proto.GossipMessage_CHAN_OR_ORG, + Nonce: 0, + Content: &proto.GossipMessage_StateSnapshot{ + StateSnapshot: &proto.StateInfoSnapshot{ + Elements: stateInfoMsgs, + }, + }, + } +} + +func createDataMsg(seqnum uint64, channel common.ChainID) *proto.GossipMessage { + return &proto.GossipMessage{ + Nonce: 0, + Tag: proto.GossipMessage_CHAN_AND_ORG, + Channel: []byte(channel), + Content: &proto.GossipMessage_DataMsg{ + DataMsg: &proto.DataMessage{ + Payload: &proto.Payload{ + Data: []byte{}, + Hash: "", + SeqNum: seqnum, + }, + }, + }, + } +} + +func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutator msgMutator) func(args mock.Arguments) { + var l sync.Mutex + var sentHello bool + var sentReq bool + return func(args mock.Arguments) { + msg := args.Get(0).(*proto.GossipMessage) + l.Lock() + defer l.Unlock() + + if msg.IsHelloMsg() && !sentHello { + sentHello = true + // Simulate a digest message an imaginary peer responds to the hello message sent + digestMsg := &receivedMsg{ + PKIID: pkiIDInOrg1, + msg: &proto.GossipMessage{ + Tag: proto.GossipMessage_CHAN_AND_ORG, + Channel: []byte(channelA), + Content: &proto.GossipMessage_DataDig{ + DataDig: &proto.DataDigest{ + MsgType: proto.PullMsgType_BlockMessage, + Digests: []string{"10", "11"}, + Nonce: msg.GetHello().Nonce, + }, + }, + }, + } + go gc.HandleMessage(digestMsg) + } + if msg.IsDataReq() && !sentReq { + sentReq = true + dataReq := msg.GetDataReq() + for _, expectedDigest := range []string{"10", "11"} { + assert.Contains(t, dataReq.Digests, expectedDigest) + } + assert.Equal(t, 2, len(dataReq.Digests)) + // When we send a data request, simulate a response of a data update + // from the imaginary peer that got the request + dataUpdateMsg := new(receivedMsg) + dataUpdateMsg.PKIID = pkiIDInOrg1 + dataUpdateMsg.msg = createDataUpdateMsg(dataReq.Nonce) + mutator(dataUpdateMsg.msg.GetDataUpdate().Data[0]) + gc.HandleMessage(dataUpdateMsg) + wg.Done() + } + } + +} diff --git a/gossip/proto/extensions.go b/gossip/proto/extensions.go index d648562d3ee..8340e100825 100644 --- a/gossip/proto/extensions.go +++ b/gossip/proto/extensions.go @@ -19,10 +19,12 @@ package proto import ( "bytes" + "fmt" "github.com/hyperledger/fabric/gossip/common" "github.com/hyperledger/fabric/gossip/util" ) +// NewGossipMessageComparator creates a MessageReplacingPolicy given a maximum number of blocks to hold func NewGossipMessageComparator(dataBlockStorageSize int) common.MessageReplacingPolicy { return (&msgComparator{dataBlockStorageSize: dataBlockStorageSize}).getMsgReplacingPolicy() } @@ -116,23 +118,46 @@ func compareTimestamps(thisTS *PeerTime, thatTS *PeerTime) common.InvalidationRe return common.MessageInvalidates } +// IsAliveMsg returns whether this GossipMessage is an AliveMessage func (m *GossipMessage) IsAliveMsg() bool { return m.GetAliveMsg() != nil } +// IsDataMsg returns whether this GossipMessage is a data message func (m *GossipMessage) IsDataMsg() bool { return m.GetDataMsg() != nil } +// IsStateInfoPullRequestMsg returns whether this GossipMessage is a stateInfoPullRequest +func (m *GossipMessage) IsStateInfoPullRequestMsg() bool { + return m.GetStateInfoPullReq() != nil +} + +// IsStateInfoSnapshot returns whether this GossipMessage is a stateInfo snapshot +func (m *GossipMessage) IsStateInfoSnapshot() bool { + return m.GetStateSnapshot() != nil +} + +// IsStateInfoMsg returns whether this GossipMessage is a stateInfo message func (m *GossipMessage) IsStateInfoMsg() bool { return m.GetStateInfo() != nil } +// IsPullMsg returns whether this GossipMessage is a message that has belongs +// to the pull mechanism func (m *GossipMessage) IsPullMsg() bool { return m.GetDataReq() != nil || m.GetDataUpdate() != nil || m.GetHello() != nil || m.GetDataDig() != nil } +// IsRemoteStateMessage returns whether this GossipMessage is related to state synchronization +func (m *GossipMessage) IsRemoteStateMessage() bool { + return m.GetStateRequest() != nil || m.GetStateResponse() != nil +} + +// GetPullMsgType returns the phase of the pull mechanism this GossipMessage belongs to +// for example: Hello, Digest, etc. +// If this isn't a pull message, PullMsgType_Undefined is returned. func (m *GossipMessage) GetPullMsgType() PullMsgType { if helloMsg := m.GetHello(); helloMsg != nil { return helloMsg.MsgType @@ -153,12 +178,99 @@ func (m *GossipMessage) GetPullMsgType() PullMsgType { return PullMsgType_Undefined } +// IsChannelRestricted returns whether this GossipMessage should be routed +// only in its channel +func (m *GossipMessage) IsChannelRestricted() bool { + return m.Tag == GossipMessage_CHAN_AND_ORG || m.Tag == GossipMessage_CHAN_ONLY || m.Tag == GossipMessage_CHAN_OR_ORG +} + +// IsOrgRestricted returns whether this GossipMessage should be routed only +// inside the organization +func (m *GossipMessage) IsOrgRestricted() bool { + return m.Tag == GossipMessage_CHAN_AND_ORG || m.Tag == GossipMessage_ORG_ONLY +} + +// IsIdentityMsg returns whether this GossipMessage is an identity message func (m *GossipMessage) IsIdentityMsg() bool { return m.GetPeerIdentity() != nil } +// IsDataReq returns whether this GossipMessage is a data request message +func (m *GossipMessage) IsDataReq() bool { + return m.GetDataReq() != nil +} + +// IsDataUpdate returns whether this GossipMessage is a data update message +func (m *GossipMessage) IsDataUpdate() bool { + return m.GetDataUpdate() != nil +} + +// IsHelloMsg returns whether this GossipMessage is a hello message +func (m *GossipMessage) IsHelloMsg() bool { + return m.GetHello() != nil +} + +// IsDigestMsg returns whether this GossipMessage is a digest message +func (m *GossipMessage) IsDigestMsg() bool { + return m.GetDataDig() != nil +} + // MsgConsumer invokes code given a GossipMessage type MsgConsumer func(*GossipMessage) // IdentifierExtractor extracts from a GossipMessage an identifier type IdentifierExtractor func(*GossipMessage) string + +// IsTagLegal checks the GossipMessage tags and inner type +// and returns an error if the tag doesn't match the type. +func (m *GossipMessage) IsTagLegal() error { + if m.Tag == GossipMessage_UNDEFINED { + return fmt.Errorf("Undefined tag") + } + if m.IsDataMsg() { + if m.Tag != GossipMessage_CHAN_AND_ORG { + return fmt.Errorf("Tag should be %s", GossipMessage_Tag_name[int32(GossipMessage_CHAN_AND_ORG)]) + } + return nil + } + + if m.IsAliveMsg() || m.GetMemReq() != nil || m.GetMemRes() != nil { + if m.Tag != GossipMessage_EMPTY { + return fmt.Errorf("Tag should be %s", GossipMessage_Tag_name[int32(GossipMessage_EMPTY)]) + } + return nil + } + + if m.IsIdentityMsg() { + if m.Tag != GossipMessage_ORG_ONLY { + return fmt.Errorf("Tag should be %s", GossipMessage_Tag_name[int32(GossipMessage_ORG_ONLY)]) + } + return nil + } + + if m.IsPullMsg() { + switch m.GetPullMsgType() { + case PullMsgType_BlockMessage: + if m.Tag != GossipMessage_CHAN_AND_ORG { + return fmt.Errorf("Tag should be %s", GossipMessage_Tag_name[int32(GossipMessage_CHAN_AND_ORG)]) + } + return nil + case PullMsgType_IdentityMsg: + if m.Tag != GossipMessage_EMPTY { + return fmt.Errorf("Tag should be %s", GossipMessage_Tag_name[int32(GossipMessage_EMPTY)]) + } + return nil + default: + return fmt.Errorf("Invalid PullMsgType: %s", PullMsgType_name[int32(m.GetPullMsgType())]) + } + } + + if m.IsStateInfoMsg() || m.IsStateInfoPullRequestMsg() || m.IsStateInfoSnapshot() || m.IsRemoteStateMessage() { + if m.Tag != GossipMessage_CHAN_OR_ORG { + return fmt.Errorf("Tag should be %s", GossipMessage_Tag_name[int32(GossipMessage_CHAN_OR_ORG)]) + } + return nil + } + + return fmt.Errorf("Unknown message type: %v", m) +} diff --git a/gossip/proto/message.pb.go b/gossip/proto/message.pb.go index fe668787ca4..b4b1afeb42a 100644 --- a/gossip/proto/message.pb.go +++ b/gossip/proto/message.pb.go @@ -11,7 +11,8 @@ It is generated from these files: It has these top-level messages: GossipMessage StateInfo - ChannelCommand + StateInfoSnapshot + StateInfoPullRequest ConnEstablish PeerIdentity DataRequest @@ -84,6 +85,7 @@ const ( GossipMessage_ORG_ONLY GossipMessage_Tag = 2 GossipMessage_CHAN_ONLY GossipMessage_Tag = 3 GossipMessage_CHAN_AND_ORG GossipMessage_Tag = 4 + GossipMessage_CHAN_OR_ORG GossipMessage_Tag = 5 ) var GossipMessage_Tag_name = map[int32]string{ @@ -92,6 +94,7 @@ var GossipMessage_Tag_name = map[int32]string{ 2: "ORG_ONLY", 3: "CHAN_ONLY", 4: "CHAN_AND_ORG", + 5: "CHAN_OR_ORG", } var GossipMessage_Tag_value = map[string]int32{ "UNDEFINED": 0, @@ -99,6 +102,7 @@ var GossipMessage_Tag_value = map[string]int32{ "ORG_ONLY": 2, "CHAN_ONLY": 3, "CHAN_AND_ORG": 4, + "CHAN_OR_ORG": 5, } func (x GossipMessage_Tag) String() string { @@ -127,10 +131,11 @@ type GossipMessage struct { // *GossipMessage_DataDig // *GossipMessage_DataReq // *GossipMessage_DataUpdate - // *GossipMessage_ChanCmd // *GossipMessage_Empty // *GossipMessage_Conn // *GossipMessage_StateInfo + // *GossipMessage_StateSnapshot + // *GossipMessage_StateInfoPullReq // *GossipMessage_StateRequest // *GossipMessage_StateResponse // *GossipMessage_LeadershipMsg @@ -171,47 +176,51 @@ type GossipMessage_DataReq struct { type GossipMessage_DataUpdate struct { DataUpdate *DataUpdate `protobuf:"bytes,11,opt,name=dataUpdate,oneof"` } -type GossipMessage_ChanCmd struct { - ChanCmd *ChannelCommand `protobuf:"bytes,12,opt,name=chanCmd,oneof"` -} type GossipMessage_Empty struct { - Empty *Empty `protobuf:"bytes,13,opt,name=empty,oneof"` + Empty *Empty `protobuf:"bytes,12,opt,name=empty,oneof"` } type GossipMessage_Conn struct { - Conn *ConnEstablish `protobuf:"bytes,14,opt,name=conn,oneof"` + Conn *ConnEstablish `protobuf:"bytes,13,opt,name=conn,oneof"` } type GossipMessage_StateInfo struct { - StateInfo *StateInfo `protobuf:"bytes,15,opt,name=stateInfo,oneof"` + StateInfo *StateInfo `protobuf:"bytes,14,opt,name=stateInfo,oneof"` +} +type GossipMessage_StateSnapshot struct { + StateSnapshot *StateInfoSnapshot `protobuf:"bytes,15,opt,name=stateSnapshot,oneof"` +} +type GossipMessage_StateInfoPullReq struct { + StateInfoPullReq *StateInfoPullRequest `protobuf:"bytes,16,opt,name=stateInfoPullReq,oneof"` } type GossipMessage_StateRequest struct { - StateRequest *RemoteStateRequest `protobuf:"bytes,16,opt,name=stateRequest,oneof"` + StateRequest *RemoteStateRequest `protobuf:"bytes,17,opt,name=stateRequest,oneof"` } type GossipMessage_StateResponse struct { - StateResponse *RemoteStateResponse `protobuf:"bytes,17,opt,name=stateResponse,oneof"` + StateResponse *RemoteStateResponse `protobuf:"bytes,18,opt,name=stateResponse,oneof"` } type GossipMessage_LeadershipMsg struct { - LeadershipMsg *LeadershipMessage `protobuf:"bytes,18,opt,name=leadershipMsg,oneof"` + LeadershipMsg *LeadershipMessage `protobuf:"bytes,19,opt,name=leadershipMsg,oneof"` } type GossipMessage_PeerIdentity struct { - PeerIdentity *PeerIdentity `protobuf:"bytes,19,opt,name=peerIdentity,oneof"` -} - -func (*GossipMessage_AliveMsg) isGossipMessage_Content() {} -func (*GossipMessage_MemReq) isGossipMessage_Content() {} -func (*GossipMessage_MemRes) isGossipMessage_Content() {} -func (*GossipMessage_DataMsg) isGossipMessage_Content() {} -func (*GossipMessage_Hello) isGossipMessage_Content() {} -func (*GossipMessage_DataDig) isGossipMessage_Content() {} -func (*GossipMessage_DataReq) isGossipMessage_Content() {} -func (*GossipMessage_DataUpdate) isGossipMessage_Content() {} -func (*GossipMessage_ChanCmd) isGossipMessage_Content() {} -func (*GossipMessage_Empty) isGossipMessage_Content() {} -func (*GossipMessage_Conn) isGossipMessage_Content() {} -func (*GossipMessage_StateInfo) isGossipMessage_Content() {} -func (*GossipMessage_StateRequest) isGossipMessage_Content() {} -func (*GossipMessage_StateResponse) isGossipMessage_Content() {} -func (*GossipMessage_LeadershipMsg) isGossipMessage_Content() {} -func (*GossipMessage_PeerIdentity) isGossipMessage_Content() {} + PeerIdentity *PeerIdentity `protobuf:"bytes,20,opt,name=peerIdentity,oneof"` +} + +func (*GossipMessage_AliveMsg) isGossipMessage_Content() {} +func (*GossipMessage_MemReq) isGossipMessage_Content() {} +func (*GossipMessage_MemRes) isGossipMessage_Content() {} +func (*GossipMessage_DataMsg) isGossipMessage_Content() {} +func (*GossipMessage_Hello) isGossipMessage_Content() {} +func (*GossipMessage_DataDig) isGossipMessage_Content() {} +func (*GossipMessage_DataReq) isGossipMessage_Content() {} +func (*GossipMessage_DataUpdate) isGossipMessage_Content() {} +func (*GossipMessage_Empty) isGossipMessage_Content() {} +func (*GossipMessage_Conn) isGossipMessage_Content() {} +func (*GossipMessage_StateInfo) isGossipMessage_Content() {} +func (*GossipMessage_StateSnapshot) isGossipMessage_Content() {} +func (*GossipMessage_StateInfoPullReq) isGossipMessage_Content() {} +func (*GossipMessage_StateRequest) isGossipMessage_Content() {} +func (*GossipMessage_StateResponse) isGossipMessage_Content() {} +func (*GossipMessage_LeadershipMsg) isGossipMessage_Content() {} +func (*GossipMessage_PeerIdentity) isGossipMessage_Content() {} func (m *GossipMessage) GetContent() isGossipMessage_Content { if m != nil { @@ -276,13 +285,6 @@ func (m *GossipMessage) GetDataUpdate() *DataUpdate { return nil } -func (m *GossipMessage) GetChanCmd() *ChannelCommand { - if x, ok := m.GetContent().(*GossipMessage_ChanCmd); ok { - return x.ChanCmd - } - return nil -} - func (m *GossipMessage) GetEmpty() *Empty { if x, ok := m.GetContent().(*GossipMessage_Empty); ok { return x.Empty @@ -304,6 +306,20 @@ func (m *GossipMessage) GetStateInfo() *StateInfo { return nil } +func (m *GossipMessage) GetStateSnapshot() *StateInfoSnapshot { + if x, ok := m.GetContent().(*GossipMessage_StateSnapshot); ok { + return x.StateSnapshot + } + return nil +} + +func (m *GossipMessage) GetStateInfoPullReq() *StateInfoPullRequest { + if x, ok := m.GetContent().(*GossipMessage_StateInfoPullReq); ok { + return x.StateInfoPullReq + } + return nil +} + func (m *GossipMessage) GetStateRequest() *RemoteStateRequest { if x, ok := m.GetContent().(*GossipMessage_StateRequest); ok { return x.StateRequest @@ -343,10 +359,11 @@ func (*GossipMessage) XXX_OneofFuncs() (func(msg proto1.Message, b *proto1.Buffe (*GossipMessage_DataDig)(nil), (*GossipMessage_DataReq)(nil), (*GossipMessage_DataUpdate)(nil), - (*GossipMessage_ChanCmd)(nil), (*GossipMessage_Empty)(nil), (*GossipMessage_Conn)(nil), (*GossipMessage_StateInfo)(nil), + (*GossipMessage_StateSnapshot)(nil), + (*GossipMessage_StateInfoPullReq)(nil), (*GossipMessage_StateRequest)(nil), (*GossipMessage_StateResponse)(nil), (*GossipMessage_LeadershipMsg)(nil), @@ -398,43 +415,48 @@ func _GossipMessage_OneofMarshaler(msg proto1.Message, b *proto1.Buffer) error { if err := b.EncodeMessage(x.DataUpdate); err != nil { return err } - case *GossipMessage_ChanCmd: - b.EncodeVarint(12<<3 | proto1.WireBytes) - if err := b.EncodeMessage(x.ChanCmd); err != nil { - return err - } case *GossipMessage_Empty: - b.EncodeVarint(13<<3 | proto1.WireBytes) + b.EncodeVarint(12<<3 | proto1.WireBytes) if err := b.EncodeMessage(x.Empty); err != nil { return err } case *GossipMessage_Conn: - b.EncodeVarint(14<<3 | proto1.WireBytes) + b.EncodeVarint(13<<3 | proto1.WireBytes) if err := b.EncodeMessage(x.Conn); err != nil { return err } case *GossipMessage_StateInfo: - b.EncodeVarint(15<<3 | proto1.WireBytes) + b.EncodeVarint(14<<3 | proto1.WireBytes) if err := b.EncodeMessage(x.StateInfo); err != nil { return err } - case *GossipMessage_StateRequest: + case *GossipMessage_StateSnapshot: + b.EncodeVarint(15<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.StateSnapshot); err != nil { + return err + } + case *GossipMessage_StateInfoPullReq: b.EncodeVarint(16<<3 | proto1.WireBytes) + if err := b.EncodeMessage(x.StateInfoPullReq); err != nil { + return err + } + case *GossipMessage_StateRequest: + b.EncodeVarint(17<<3 | proto1.WireBytes) if err := b.EncodeMessage(x.StateRequest); err != nil { return err } case *GossipMessage_StateResponse: - b.EncodeVarint(17<<3 | proto1.WireBytes) + b.EncodeVarint(18<<3 | proto1.WireBytes) if err := b.EncodeMessage(x.StateResponse); err != nil { return err } case *GossipMessage_LeadershipMsg: - b.EncodeVarint(18<<3 | proto1.WireBytes) + b.EncodeVarint(19<<3 | proto1.WireBytes) if err := b.EncodeMessage(x.LeadershipMsg); err != nil { return err } case *GossipMessage_PeerIdentity: - b.EncodeVarint(19<<3 | proto1.WireBytes) + b.EncodeVarint(20<<3 | proto1.WireBytes) if err := b.EncodeMessage(x.PeerIdentity); err != nil { return err } @@ -512,15 +534,7 @@ func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto err := b.DecodeMessage(msg) m.Content = &GossipMessage_DataUpdate{msg} return true, err - case 12: // content.chanCmd - if wire != proto1.WireBytes { - return true, proto1.ErrInternalBadWireType - } - msg := new(ChannelCommand) - err := b.DecodeMessage(msg) - m.Content = &GossipMessage_ChanCmd{msg} - return true, err - case 13: // content.empty + case 12: // content.empty if wire != proto1.WireBytes { return true, proto1.ErrInternalBadWireType } @@ -528,7 +542,7 @@ func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto err := b.DecodeMessage(msg) m.Content = &GossipMessage_Empty{msg} return true, err - case 14: // content.conn + case 13: // content.conn if wire != proto1.WireBytes { return true, proto1.ErrInternalBadWireType } @@ -536,7 +550,7 @@ func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto err := b.DecodeMessage(msg) m.Content = &GossipMessage_Conn{msg} return true, err - case 15: // content.stateInfo + case 14: // content.stateInfo if wire != proto1.WireBytes { return true, proto1.ErrInternalBadWireType } @@ -544,7 +558,23 @@ func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto err := b.DecodeMessage(msg) m.Content = &GossipMessage_StateInfo{msg} return true, err - case 16: // content.stateRequest + case 15: // content.stateSnapshot + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(StateInfoSnapshot) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_StateSnapshot{msg} + return true, err + case 16: // content.stateInfoPullReq + if wire != proto1.WireBytes { + return true, proto1.ErrInternalBadWireType + } + msg := new(StateInfoPullRequest) + err := b.DecodeMessage(msg) + m.Content = &GossipMessage_StateInfoPullReq{msg} + return true, err + case 17: // content.stateRequest if wire != proto1.WireBytes { return true, proto1.ErrInternalBadWireType } @@ -552,7 +582,7 @@ func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto err := b.DecodeMessage(msg) m.Content = &GossipMessage_StateRequest{msg} return true, err - case 17: // content.stateResponse + case 18: // content.stateResponse if wire != proto1.WireBytes { return true, proto1.ErrInternalBadWireType } @@ -560,7 +590,7 @@ func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto err := b.DecodeMessage(msg) m.Content = &GossipMessage_StateResponse{msg} return true, err - case 18: // content.leadershipMsg + case 19: // content.leadershipMsg if wire != proto1.WireBytes { return true, proto1.ErrInternalBadWireType } @@ -568,7 +598,7 @@ func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto err := b.DecodeMessage(msg) m.Content = &GossipMessage_LeadershipMsg{msg} return true, err - case 19: // content.peerIdentity + case 20: // content.peerIdentity if wire != proto1.WireBytes { return true, proto1.ErrInternalBadWireType } @@ -625,44 +655,49 @@ func _GossipMessage_OneofSizer(msg proto1.Message) (n int) { n += proto1.SizeVarint(11<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s - case *GossipMessage_ChanCmd: - s := proto1.Size(x.ChanCmd) - n += proto1.SizeVarint(12<<3 | proto1.WireBytes) - n += proto1.SizeVarint(uint64(s)) - n += s case *GossipMessage_Empty: s := proto1.Size(x.Empty) - n += proto1.SizeVarint(13<<3 | proto1.WireBytes) + n += proto1.SizeVarint(12<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s case *GossipMessage_Conn: s := proto1.Size(x.Conn) - n += proto1.SizeVarint(14<<3 | proto1.WireBytes) + n += proto1.SizeVarint(13<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s case *GossipMessage_StateInfo: s := proto1.Size(x.StateInfo) + n += proto1.SizeVarint(14<<3 | proto1.WireBytes) + n += proto1.SizeVarint(uint64(s)) + n += s + case *GossipMessage_StateSnapshot: + s := proto1.Size(x.StateSnapshot) n += proto1.SizeVarint(15<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s + case *GossipMessage_StateInfoPullReq: + s := proto1.Size(x.StateInfoPullReq) + n += proto1.SizeVarint(16<<3 | proto1.WireBytes) + n += proto1.SizeVarint(uint64(s)) + n += s case *GossipMessage_StateRequest: s := proto1.Size(x.StateRequest) - n += proto1.SizeVarint(16<<3 | proto1.WireBytes) + n += proto1.SizeVarint(17<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s case *GossipMessage_StateResponse: s := proto1.Size(x.StateResponse) - n += proto1.SizeVarint(17<<3 | proto1.WireBytes) + n += proto1.SizeVarint(18<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s case *GossipMessage_LeadershipMsg: s := proto1.Size(x.LeadershipMsg) - n += proto1.SizeVarint(18<<3 | proto1.WireBytes) + n += proto1.SizeVarint(19<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s case *GossipMessage_PeerIdentity: s := proto1.Size(x.PeerIdentity) - n += proto1.SizeVarint(19<<3 | proto1.WireBytes) + n += proto1.SizeVarint(20<<3 | proto1.WireBytes) n += proto1.SizeVarint(uint64(s)) n += s case nil: @@ -693,16 +728,32 @@ func (m *StateInfo) GetTimestamp() *PeerTime { return nil } -// ChannelCommand is the message that contains a creation -// or modification of a channel -type ChannelCommand struct { - Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` +// StateInfoSnapshot is an aggregation of StateInfo messages +type StateInfoSnapshot struct { + Elements []*GossipMessage `protobuf:"bytes,1,rep,name=elements" json:"elements,omitempty"` +} + +func (m *StateInfoSnapshot) Reset() { *m = StateInfoSnapshot{} } +func (m *StateInfoSnapshot) String() string { return proto1.CompactTextString(m) } +func (*StateInfoSnapshot) ProtoMessage() {} +func (*StateInfoSnapshot) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *StateInfoSnapshot) GetElements() []*GossipMessage { + if m != nil { + return m.Elements + } + return nil +} + +// StateInfoPullRequest is used to fetch a StateInfoSnapshot +// from a remote peer +type StateInfoPullRequest struct { } -func (m *ChannelCommand) Reset() { *m = ChannelCommand{} } -func (m *ChannelCommand) String() string { return proto1.CompactTextString(m) } -func (*ChannelCommand) ProtoMessage() {} -func (*ChannelCommand) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (m *StateInfoPullRequest) Reset() { *m = StateInfoPullRequest{} } +func (m *StateInfoPullRequest) String() string { return proto1.CompactTextString(m) } +func (*StateInfoPullRequest) ProtoMessage() {} +func (*StateInfoPullRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } // ConnEstablish is the message used for the gossip handshake // Whenever a peer connects to another peer, it handshakes @@ -716,7 +767,7 @@ type ConnEstablish struct { func (m *ConnEstablish) Reset() { *m = ConnEstablish{} } func (m *ConnEstablish) String() string { return proto1.CompactTextString(m) } func (*ConnEstablish) ProtoMessage() {} -func (*ConnEstablish) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +func (*ConnEstablish) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } // PeerIdentity defines the identity of the peer // Used to make other peers learn of the identity @@ -731,7 +782,7 @@ type PeerIdentity struct { func (m *PeerIdentity) Reset() { *m = PeerIdentity{} } func (m *PeerIdentity) String() string { return proto1.CompactTextString(m) } func (*PeerIdentity) ProtoMessage() {} -func (*PeerIdentity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } +func (*PeerIdentity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } // DataRequest is a message used for a peer to request // certain data blocks from a remote peer @@ -744,7 +795,7 @@ type DataRequest struct { func (m *DataRequest) Reset() { *m = DataRequest{} } func (m *DataRequest) String() string { return proto1.CompactTextString(m) } func (*DataRequest) ProtoMessage() {} -func (*DataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } +func (*DataRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } // GossipHello is the message that is used for the peer to initiate // a pull round with another peer @@ -757,7 +808,7 @@ type GossipHello struct { func (m *GossipHello) Reset() { *m = GossipHello{} } func (m *GossipHello) String() string { return proto1.CompactTextString(m) } func (*GossipHello) ProtoMessage() {} -func (*GossipHello) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } +func (*GossipHello) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } // DataUpdate is the the final message in the pull phase // sent from the receiver to the initiator @@ -770,7 +821,7 @@ type DataUpdate struct { func (m *DataUpdate) Reset() { *m = DataUpdate{} } func (m *DataUpdate) String() string { return proto1.CompactTextString(m) } func (*DataUpdate) ProtoMessage() {} -func (*DataUpdate) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } +func (*DataUpdate) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } func (m *DataUpdate) GetData() []*GossipMessage { if m != nil { @@ -790,7 +841,7 @@ type DataDigest struct { func (m *DataDigest) Reset() { *m = DataDigest{} } func (m *DataDigest) String() string { return proto1.CompactTextString(m) } func (*DataDigest) ProtoMessage() {} -func (*DataDigest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } +func (*DataDigest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } // DataMessage is the message that contains a block type DataMessage struct { @@ -800,7 +851,7 @@ type DataMessage struct { func (m *DataMessage) Reset() { *m = DataMessage{} } func (m *DataMessage) String() string { return proto1.CompactTextString(m) } func (*DataMessage) ProtoMessage() {} -func (*DataMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } +func (*DataMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } func (m *DataMessage) GetPayload() *Payload { if m != nil { @@ -819,7 +870,7 @@ type Payload struct { func (m *Payload) Reset() { *m = Payload{} } func (m *Payload) String() string { return proto1.CompactTextString(m) } func (*Payload) ProtoMessage() {} -func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } +func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } // AliveMessage is sent to inform remote peers // of a peer's existence and activity @@ -833,7 +884,7 @@ type AliveMessage struct { func (m *AliveMessage) Reset() { *m = AliveMessage{} } func (m *AliveMessage) String() string { return proto1.CompactTextString(m) } func (*AliveMessage) ProtoMessage() {} -func (*AliveMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } +func (*AliveMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } func (m *AliveMessage) GetMembership() *Member { if m != nil { @@ -860,7 +911,7 @@ type LeadershipMessage struct { func (m *LeadershipMessage) Reset() { *m = LeadershipMessage{} } func (m *LeadershipMessage) String() string { return proto1.CompactTextString(m) } func (*LeadershipMessage) ProtoMessage() {} -func (*LeadershipMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } +func (*LeadershipMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } func (m *LeadershipMessage) GetMembership() *Member { if m != nil { @@ -885,7 +936,7 @@ type PeerTime struct { func (m *PeerTime) Reset() { *m = PeerTime{} } func (m *PeerTime) String() string { return proto1.CompactTextString(m) } func (*PeerTime) ProtoMessage() {} -func (*PeerTime) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } +func (*PeerTime) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } // MembershipRequest is used to ask membership information // from a remote peer @@ -897,7 +948,7 @@ type MembershipRequest struct { func (m *MembershipRequest) Reset() { *m = MembershipRequest{} } func (m *MembershipRequest) String() string { return proto1.CompactTextString(m) } func (*MembershipRequest) ProtoMessage() {} -func (*MembershipRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } +func (*MembershipRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } func (m *MembershipRequest) GetSelfInformation() *AliveMessage { if m != nil { @@ -915,7 +966,7 @@ type MembershipResponse struct { func (m *MembershipResponse) Reset() { *m = MembershipResponse{} } func (m *MembershipResponse) String() string { return proto1.CompactTextString(m) } func (*MembershipResponse) ProtoMessage() {} -func (*MembershipResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } +func (*MembershipResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } func (m *MembershipResponse) GetAlive() []*AliveMessage { if m != nil { @@ -942,7 +993,7 @@ type Member struct { func (m *Member) Reset() { *m = Member{} } func (m *Member) String() string { return proto1.CompactTextString(m) } func (*Member) ProtoMessage() {} -func (*Member) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } +func (*Member) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } // Empty is used for pinging and in tests type Empty struct { @@ -951,7 +1002,7 @@ type Empty struct { func (m *Empty) Reset() { *m = Empty{} } func (m *Empty) String() string { return proto1.CompactTextString(m) } func (*Empty) ProtoMessage() {} -func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } +func (*Empty) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } // RemoteStateRequest is used to ask a set of blocks // from a remote peer @@ -962,7 +1013,7 @@ type RemoteStateRequest struct { func (m *RemoteStateRequest) Reset() { *m = RemoteStateRequest{} } func (m *RemoteStateRequest) String() string { return proto1.CompactTextString(m) } func (*RemoteStateRequest) ProtoMessage() {} -func (*RemoteStateRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } +func (*RemoteStateRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} } // RemoteStateResponse is used to send a set of blocks // to a remote peer @@ -973,7 +1024,7 @@ type RemoteStateResponse struct { func (m *RemoteStateResponse) Reset() { *m = RemoteStateResponse{} } func (m *RemoteStateResponse) String() string { return proto1.CompactTextString(m) } func (*RemoteStateResponse) ProtoMessage() {} -func (*RemoteStateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} } +func (*RemoteStateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20} } func (m *RemoteStateResponse) GetPayloads() []*Payload { if m != nil { @@ -985,7 +1036,8 @@ func (m *RemoteStateResponse) GetPayloads() []*Payload { func init() { proto1.RegisterType((*GossipMessage)(nil), "proto.GossipMessage") proto1.RegisterType((*StateInfo)(nil), "proto.StateInfo") - proto1.RegisterType((*ChannelCommand)(nil), "proto.ChannelCommand") + proto1.RegisterType((*StateInfoSnapshot)(nil), "proto.StateInfoSnapshot") + proto1.RegisterType((*StateInfoPullRequest)(nil), "proto.StateInfoPullRequest") proto1.RegisterType((*ConnEstablish)(nil), "proto.ConnEstablish") proto1.RegisterType((*PeerIdentity)(nil), "proto.PeerIdentity") proto1.RegisterType((*DataRequest)(nil), "proto.DataRequest") @@ -1152,77 +1204,80 @@ var _Gossip_serviceDesc = grpc.ServiceDesc{ func init() { proto1.RegisterFile("message.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 1152 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xc4, 0x56, 0xdd, 0x6e, 0xdb, 0x36, - 0x14, 0xb6, 0xfc, 0xaf, 0x63, 0x39, 0x71, 0xd8, 0x6e, 0xd0, 0x82, 0x0d, 0x08, 0x84, 0x02, 0xf3, - 0x8c, 0xd6, 0x69, 0xdd, 0x8b, 0x61, 0x17, 0x43, 0x9b, 0xc4, 0x59, 0x1c, 0xac, 0x76, 0x02, 0x26, - 0x1d, 0xd0, 0xdd, 0x04, 0x8c, 0xc5, 0xc8, 0x5a, 0x24, 0x4a, 0x31, 0xe9, 0x0d, 0xd9, 0x23, 0x6c, - 0x17, 0x7b, 0x8b, 0x3d, 0xe7, 0xc0, 0x1f, 0xd9, 0x52, 0xec, 0x0c, 0xe8, 0x86, 0x61, 0x57, 0xd2, - 0x21, 0xbf, 0x8f, 0xe7, 0x90, 0xe7, 0x9c, 0x8f, 0x84, 0x76, 0x4c, 0x39, 0x27, 0x01, 0xed, 0xa7, - 0xf3, 0x44, 0x24, 0xa8, 0xa6, 0x3e, 0xde, 0xef, 0x4d, 0x68, 0x9f, 0x24, 0x9c, 0x87, 0xe9, 0x58, - 0x4f, 0xa3, 0xa7, 0x50, 0x63, 0x09, 0x9b, 0x52, 0xd7, 0xda, 0xb3, 0xba, 0x55, 0xac, 0x0d, 0xe4, - 0x42, 0x63, 0x3a, 0x23, 0x8c, 0xd1, 0xc8, 0x2d, 0xef, 0x59, 0x5d, 0x07, 0x67, 0x26, 0xea, 0x41, - 0x45, 0x90, 0xc0, 0xad, 0xec, 0x59, 0xdd, 0xad, 0x81, 0xab, 0x57, 0xef, 0x17, 0x96, 0xec, 0x5f, - 0x92, 0x00, 0x4b, 0x10, 0x7a, 0x05, 0x4d, 0x12, 0x85, 0x3f, 0xd3, 0x31, 0x0f, 0xdc, 0xea, 0x9e, - 0xd5, 0x6d, 0x0d, 0x9e, 0x18, 0xc2, 0x81, 0x1a, 0xd6, 0xf8, 0x51, 0x09, 0x2f, 0x61, 0x68, 0x00, - 0xf5, 0x98, 0xc6, 0x98, 0xde, 0xb9, 0x35, 0x45, 0xc8, 0x3c, 0x8c, 0x69, 0x7c, 0x4d, 0xe7, 0x7c, - 0x16, 0xa6, 0x98, 0xde, 0x2d, 0x28, 0x17, 0xa3, 0x12, 0x36, 0x48, 0xf4, 0xda, 0x70, 0xb8, 0x5b, - 0x57, 0x9c, 0xcf, 0x36, 0x70, 0x78, 0x9a, 0x30, 0x4e, 0x97, 0x24, 0x8e, 0xfa, 0xd0, 0xf0, 0x89, - 0x20, 0x32, 0xb4, 0x86, 0x62, 0x21, 0xc3, 0x1a, 0xca, 0xd1, 0x65, 0x64, 0x19, 0x08, 0xf5, 0xa0, - 0x36, 0xa3, 0x51, 0x94, 0xb8, 0xcd, 0x02, 0x5a, 0xef, 0x7c, 0x24, 0x67, 0x46, 0x25, 0xac, 0x21, - 0xe8, 0x85, 0x5e, 0x7b, 0x18, 0x06, 0xae, 0xad, 0xd0, 0x3b, 0xb9, 0xb5, 0x87, 0x61, 0xa0, 0xc3, - 0xcf, 0x30, 0x59, 0x28, 0x72, 0xd3, 0xb0, 0x16, 0xca, 0x6a, 0xbb, 0x19, 0x08, 0xbd, 0x06, 0x90, - 0xbf, 0xef, 0x53, 0x9f, 0x08, 0xea, 0xb6, 0xd6, 0x3c, 0xe8, 0x89, 0x51, 0x09, 0xe7, 0x60, 0xe8, - 0x95, 0xce, 0xe8, 0x51, 0xec, 0xbb, 0x8e, 0x62, 0x7c, 0x62, 0x18, 0x47, 0x3a, 0xb1, 0x47, 0x49, - 0x1c, 0x13, 0xe6, 0x4b, 0x3f, 0x06, 0x87, 0x9e, 0x41, 0x8d, 0xc6, 0xa9, 0xb8, 0x77, 0xdb, 0x8a, - 0xe0, 0x18, 0xc2, 0xb1, 0x1c, 0x93, 0x9b, 0x55, 0x93, 0xa8, 0x07, 0xd5, 0x69, 0xc2, 0x98, 0xbb, - 0xa5, 0x40, 0x4f, 0xb3, 0x55, 0x13, 0xc6, 0x8e, 0xb9, 0x20, 0xd7, 0x51, 0xc8, 0x67, 0xa3, 0x12, - 0x56, 0x18, 0xf4, 0x12, 0x6c, 0x2e, 0x88, 0xa0, 0xa7, 0xec, 0x26, 0x71, 0xb7, 0x15, 0xa1, 0x63, - 0x08, 0x17, 0xd9, 0xf8, 0xa8, 0x84, 0x57, 0x20, 0xf4, 0x06, 0x1c, 0x65, 0x98, 0x63, 0x70, 0x3b, - 0x85, 0x0c, 0x63, 0x1a, 0x27, 0x82, 0x5e, 0xe4, 0x00, 0xa3, 0x12, 0x2e, 0x10, 0xd0, 0x21, 0xb4, - 0x8d, 0xad, 0x4b, 0xc0, 0xdd, 0x51, 0x2b, 0xec, 0x6e, 0x5a, 0x61, 0x59, 0x24, 0x45, 0x0a, 0x7a, - 0x0b, 0xed, 0x88, 0x12, 0x5f, 0xd7, 0x92, 0xac, 0x18, 0x54, 0xa8, 0xcd, 0x77, 0xab, 0xb9, 0x65, - 0xdd, 0x14, 0x09, 0xe8, 0x1b, 0x70, 0x52, 0x4a, 0xe7, 0xa7, 0x3e, 0x65, 0x22, 0x14, 0xf7, 0xee, - 0x93, 0x42, 0x37, 0x9c, 0xe7, 0xa6, 0xe4, 0x06, 0xf2, 0x50, 0x6f, 0x02, 0x95, 0x4b, 0x12, 0xa0, - 0x36, 0xd8, 0xef, 0x27, 0xc3, 0xe3, 0xef, 0x4e, 0x27, 0xc7, 0xc3, 0x4e, 0x09, 0xd9, 0x50, 0x3b, - 0x1e, 0x9f, 0x5f, 0x7e, 0xe8, 0x58, 0xc8, 0x81, 0xe6, 0x19, 0x3e, 0xb9, 0x3a, 0x9b, 0xbc, 0xfb, - 0xd0, 0x29, 0x4b, 0xdc, 0xd1, 0xe8, 0x60, 0xa2, 0xcd, 0x0a, 0xea, 0x80, 0xa3, 0xcc, 0x83, 0xc9, - 0xf0, 0xea, 0x0c, 0x9f, 0x74, 0xaa, 0x87, 0x36, 0x34, 0xa6, 0x09, 0x13, 0x94, 0x09, 0xef, 0x37, - 0x0b, 0xec, 0xe5, 0xb9, 0xa3, 0x5d, 0x68, 0xc6, 0x54, 0x10, 0x59, 0x33, 0x4a, 0x0c, 0x1c, 0xbc, - 0xb4, 0xd1, 0x0b, 0xb0, 0x45, 0x18, 0x53, 0x2e, 0x48, 0x9c, 0x2a, 0x45, 0x68, 0x0d, 0xb6, 0x73, - 0xc1, 0x5f, 0x86, 0x31, 0xc5, 0x2b, 0x84, 0x14, 0x95, 0xf4, 0x36, 0x3c, 0x1d, 0x2a, 0x99, 0x70, - 0xb0, 0x36, 0xd0, 0xe7, 0x60, 0xf3, 0x30, 0x60, 0x44, 0x2c, 0xe6, 0x54, 0xe9, 0x81, 0x83, 0x57, - 0x03, 0x5e, 0x0f, 0xb6, 0x8a, 0xa5, 0x28, 0x45, 0x28, 0x25, 0xf7, 0x51, 0x42, 0x7c, 0x13, 0x4f, - 0x66, 0x7a, 0xdf, 0x43, 0xbb, 0x50, 0x60, 0xa8, 0x03, 0x15, 0x1e, 0x06, 0x06, 0x26, 0x7f, 0x57, - 0x21, 0x94, 0xf3, 0x21, 0x20, 0xa8, 0x4e, 0xe9, 0x5c, 0x98, 0xb8, 0xd4, 0xbf, 0x77, 0x03, 0x4e, - 0x3e, 0x01, 0xff, 0x66, 0xad, 0xc2, 0x19, 0x56, 0x8b, 0x67, 0xe8, 0xdd, 0x42, 0x2b, 0xd7, 0xd0, - 0x8f, 0x0b, 0xaf, 0xaf, 0x04, 0x82, 0xbb, 0xe5, 0xbd, 0x4a, 0xd7, 0xc6, 0x99, 0x89, 0x9e, 0x43, - 0x23, 0xe6, 0xc1, 0xe5, 0x7d, 0x4a, 0x8d, 0xf8, 0x66, 0x2a, 0x71, 0xbe, 0x88, 0xa2, 0xb1, 0x9e, - 0xc1, 0x19, 0xc4, 0x8b, 0xa1, 0x95, 0x93, 0xa6, 0x47, 0x9c, 0xe5, 0xa3, 0x2d, 0x3f, 0xc8, 0xf8, - 0xc7, 0xb9, 0xfb, 0x15, 0x60, 0xa5, 0x3c, 0x8f, 0x78, 0xeb, 0x42, 0xd5, 0x78, 0xaa, 0xe4, 0x84, - 0xa2, 0x70, 0x75, 0xe0, 0xea, 0x3f, 0xf0, 0xfd, 0x93, 0xf6, 0xad, 0x75, 0xf5, 0x3f, 0x3e, 0xd6, - 0xaf, 0x75, 0x0e, 0xb3, 0xcb, 0xb3, 0x5b, 0xac, 0xd0, 0xd6, 0x60, 0x2b, 0x23, 0xeb, 0xd1, 0x55, - 0xc5, 0x9e, 0x42, 0xc3, 0x8c, 0xa1, 0x4f, 0xa1, 0xce, 0xe9, 0xdd, 0x64, 0x11, 0x9b, 0x10, 0x8d, - 0x25, 0xeb, 0x69, 0x46, 0xf8, 0x4c, 0x65, 0xc2, 0xc6, 0xea, 0x5f, 0x8e, 0xa9, 0x33, 0x33, 0x35, - 0xa6, 0xea, 0xe8, 0x4f, 0x0b, 0x9c, 0xfc, 0xfd, 0x89, 0x5e, 0x00, 0xc4, 0xcb, 0xab, 0xce, 0x04, - 0xd2, 0x2e, 0xdc, 0x81, 0x38, 0x07, 0xf8, 0xd8, 0x5e, 0x2e, 0x74, 0x6d, 0xe5, 0x41, 0xd7, 0xca, - 0x12, 0x0a, 0x33, 0x51, 0x33, 0x05, 0x9f, 0xd9, 0xde, 0x1f, 0x16, 0xec, 0xac, 0x69, 0xe3, 0xff, - 0x19, 0xad, 0x77, 0x00, 0xcd, 0x8c, 0x84, 0xbe, 0x00, 0x08, 0xd9, 0xf4, 0x8a, 0x2d, 0xa4, 0x2b, - 0x93, 0x0a, 0x3b, 0x64, 0xd3, 0x89, 0x1a, 0xc8, 0x65, 0xa9, 0x9c, 0xcf, 0x92, 0x37, 0x83, 0x9d, - 0xb5, 0xb7, 0x08, 0xfa, 0x16, 0xb6, 0x39, 0x8d, 0x6e, 0xa4, 0x8c, 0xce, 0x63, 0x22, 0xc2, 0x84, - 0x99, 0x8d, 0x6d, 0x7a, 0xef, 0xe0, 0x87, 0x58, 0x59, 0xb3, 0xb7, 0x2c, 0xf9, 0x85, 0xa9, 0xda, - 0x74, 0xb0, 0x36, 0xbc, 0x19, 0xa0, 0xf5, 0x17, 0x0c, 0xfa, 0x0a, 0x6a, 0xea, 0xb1, 0xe4, 0x5a, - 0xaa, 0x8d, 0x36, 0x3a, 0xd0, 0x08, 0xf4, 0x25, 0x54, 0x7d, 0x4a, 0x7c, 0xd3, 0x70, 0x1b, 0x91, - 0x0a, 0xe0, 0xfd, 0x00, 0x75, 0xed, 0x49, 0xa6, 0x93, 0x32, 0x3f, 0x4d, 0x42, 0x26, 0xd4, 0x0e, - 0x6c, 0xbc, 0xb4, 0xff, 0x56, 0x2d, 0x36, 0x0a, 0xbe, 0xd7, 0x80, 0x9a, 0x7a, 0x2c, 0x78, 0x7d, - 0x40, 0xeb, 0x57, 0xb5, 0x6c, 0x4a, 0x7d, 0xa8, 0x5c, 0x6d, 0xa6, 0x8a, 0x33, 0xd3, 0x3b, 0x80, - 0x27, 0x1b, 0x2e, 0x66, 0xd4, 0x83, 0xa6, 0xe9, 0x27, 0x6e, 0xb6, 0xff, 0xb0, 0xdf, 0x96, 0xf3, - 0xbd, 0x37, 0xd0, 0xca, 0x75, 0xb0, 0xba, 0x3e, 0x99, 0x4f, 0x6f, 0x42, 0x46, 0xfd, 0x4e, 0x49, - 0x5e, 0x8b, 0x87, 0x51, 0x32, 0xbd, 0x35, 0xe7, 0xd0, 0xb1, 0xd0, 0x36, 0xb4, 0xb2, 0x1b, 0x60, - 0xcc, 0x83, 0x4e, 0x79, 0x90, 0x42, 0x5d, 0x6b, 0x13, 0x7a, 0x0b, 0x8e, 0xfe, 0xbb, 0x10, 0x73, - 0x4a, 0x62, 0xb4, 0x51, 0xba, 0x76, 0x37, 0x8e, 0x7a, 0xa5, 0xae, 0xf5, 0xd2, 0x42, 0xcf, 0xa0, - 0x7a, 0x1e, 0xb2, 0x00, 0x15, 0x9e, 0x50, 0xbb, 0x05, 0xcb, 0x2b, 0x1d, 0x3e, 0xff, 0xb1, 0x17, - 0x84, 0x62, 0xb6, 0xb8, 0xee, 0x4f, 0x93, 0x78, 0x7f, 0x76, 0x9f, 0xd2, 0x79, 0x44, 0xfd, 0x80, - 0xce, 0xf7, 0x6f, 0xc8, 0xf5, 0x3c, 0x9c, 0xee, 0x07, 0x6a, 0xe9, 0x7d, 0xc5, 0xba, 0xae, 0xab, - 0xcf, 0xeb, 0xbf, 0x02, 0x00, 0x00, 0xff, 0xff, 0x5c, 0x8d, 0x42, 0xaa, 0xe9, 0x0b, 0x00, 0x00, + // 1193 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xc4, 0x56, 0x4b, 0x6f, 0xdb, 0x46, + 0x10, 0x26, 0xf5, 0xb0, 0xc4, 0x11, 0x15, 0xd3, 0x1b, 0x23, 0x60, 0xdd, 0x16, 0x30, 0x88, 0x00, + 0x55, 0x8d, 0x44, 0x4e, 0x95, 0x43, 0xd1, 0x43, 0x91, 0xd8, 0x91, 0x1a, 0x09, 0x8d, 0x14, 0x63, + 0xed, 0x14, 0x48, 0x2f, 0x06, 0x2d, 0xae, 0x29, 0xd6, 0xe4, 0x92, 0xd1, 0xae, 0x5a, 0xb8, 0x3f, + 0xa1, 0x97, 0xfe, 0x8b, 0xfe, 0xc4, 0x9e, 0x8b, 0x7d, 0x90, 0x22, 0x2d, 0xb9, 0x40, 0x5a, 0x14, + 0x3d, 0x91, 0x33, 0xf3, 0xcd, 0x63, 0x67, 0xe7, 0xb1, 0xd0, 0x4d, 0x08, 0x63, 0x7e, 0x48, 0xfa, + 0xd9, 0x32, 0xe5, 0x29, 0x6a, 0xca, 0x8f, 0xf7, 0x67, 0x1b, 0xba, 0xaf, 0x53, 0xc6, 0xa2, 0x6c, + 0xaa, 0xc4, 0x68, 0x1f, 0x9a, 0x34, 0xa5, 0x73, 0xe2, 0x9a, 0x87, 0x66, 0xaf, 0x81, 0x15, 0x81, + 0x5c, 0x68, 0xcd, 0x17, 0x3e, 0xa5, 0x24, 0x76, 0x6b, 0x87, 0x66, 0xcf, 0xc6, 0x39, 0x89, 0x8e, + 0xa0, 0xce, 0xfd, 0xd0, 0xad, 0x1f, 0x9a, 0xbd, 0x07, 0x03, 0x57, 0x59, 0xef, 0x57, 0x4c, 0xf6, + 0x2f, 0xfc, 0x10, 0x0b, 0x10, 0xfa, 0x0a, 0xda, 0x7e, 0x1c, 0xfd, 0x4c, 0xa6, 0x2c, 0x74, 0x1b, + 0x87, 0x66, 0xaf, 0x33, 0x78, 0xa8, 0x15, 0x4e, 0x24, 0x5b, 0xe1, 0xc7, 0x06, 0x2e, 0x60, 0x68, + 0x00, 0x3b, 0x09, 0x49, 0x30, 0xf9, 0xe0, 0x36, 0xa5, 0x42, 0xee, 0x61, 0x4a, 0x92, 0x2b, 0xb2, + 0x64, 0x8b, 0x28, 0xc3, 0xe4, 0xc3, 0x8a, 0x30, 0x3e, 0x36, 0xb0, 0x46, 0xa2, 0xe7, 0x5a, 0x87, + 0xb9, 0x3b, 0x52, 0xe7, 0x93, 0x2d, 0x3a, 0x2c, 0x4b, 0x29, 0x23, 0x85, 0x12, 0x43, 0x7d, 0x68, + 0x05, 0x3e, 0xf7, 0x45, 0x68, 0x2d, 0xa9, 0x85, 0xb4, 0xd6, 0x50, 0x70, 0x8b, 0xc8, 0x72, 0x10, + 0x3a, 0x82, 0xe6, 0x82, 0xc4, 0x71, 0xea, 0xb6, 0x2b, 0x68, 0x75, 0xf2, 0xb1, 0x90, 0x8c, 0x0d, + 0xac, 0x20, 0xe8, 0xa9, 0xb2, 0x3d, 0x8c, 0x42, 0xd7, 0x92, 0xe8, 0xbd, 0x92, 0xed, 0x61, 0x14, + 0xaa, 0xf0, 0x73, 0x4c, 0x1e, 0x8a, 0x38, 0x34, 0x6c, 0x84, 0xb2, 0x3e, 0x6e, 0x0e, 0x42, 0xcf, + 0x01, 0xc4, 0xef, 0xbb, 0x2c, 0xf0, 0x39, 0x71, 0x3b, 0x1b, 0x1e, 0x94, 0x60, 0x6c, 0xe0, 0x12, + 0x0c, 0x3d, 0x86, 0x26, 0x49, 0x32, 0x7e, 0xeb, 0xda, 0x12, 0x6f, 0x6b, 0xfc, 0x48, 0xf0, 0x44, + 0xe4, 0x52, 0x88, 0x8e, 0xa0, 0x31, 0x4f, 0x29, 0x75, 0xbb, 0x12, 0xb4, 0xaf, 0x41, 0xaf, 0x52, + 0x4a, 0x47, 0x8c, 0xfb, 0x57, 0x71, 0xc4, 0x16, 0x63, 0x03, 0x4b, 0x0c, 0x7a, 0x06, 0x16, 0xe3, + 0x3e, 0x27, 0x13, 0x7a, 0x9d, 0xba, 0x0f, 0xa4, 0x82, 0xa3, 0x15, 0xce, 0x73, 0xfe, 0xd8, 0xc0, + 0x6b, 0x10, 0x7a, 0x09, 0x5d, 0x49, 0x9c, 0x53, 0x3f, 0x63, 0x8b, 0x94, 0xbb, 0xbb, 0x95, 0x3b, + 0x2e, 0xb4, 0x72, 0xf9, 0xd8, 0xc0, 0x55, 0x05, 0x34, 0x01, 0xa7, 0x30, 0x77, 0xb6, 0x8a, 0x63, + 0x91, 0x33, 0x47, 0x1a, 0xf9, 0xf4, 0xae, 0x11, 0x2d, 0xd6, 0xc9, 0xdb, 0x50, 0x43, 0x2f, 0xc0, + 0x96, 0x3c, 0x8d, 0x71, 0xf7, 0x2a, 0xb5, 0x83, 0x49, 0x92, 0x72, 0x72, 0x5e, 0x02, 0x8c, 0x0d, + 0x5c, 0x51, 0x40, 0xa7, 0xfa, 0x34, 0x79, 0x71, 0xb9, 0x48, 0x5a, 0x38, 0xd8, 0x66, 0xa1, 0x28, + 0xbf, 0xaa, 0x8a, 0xc8, 0x48, 0x4c, 0xfc, 0x40, 0x55, 0xa9, 0xa8, 0xc5, 0x87, 0x95, 0x8c, 0xbc, + 0x59, 0xcb, 0x8a, 0x8a, 0xac, 0x2a, 0xa0, 0x6f, 0xc0, 0xce, 0x08, 0x59, 0x4e, 0x02, 0x42, 0x79, + 0xc4, 0x6f, 0xdd, 0xfd, 0x4a, 0x9f, 0x9d, 0x95, 0x44, 0xe2, 0x00, 0x65, 0xa8, 0x77, 0x09, 0xf5, + 0x0b, 0x3f, 0x44, 0x5d, 0xb0, 0xde, 0xcd, 0x86, 0xa3, 0xef, 0x26, 0xb3, 0xd1, 0xd0, 0x31, 0x90, + 0x05, 0xcd, 0xd1, 0xf4, 0xec, 0xe2, 0xbd, 0x63, 0x22, 0x1b, 0xda, 0x6f, 0xf1, 0xeb, 0xcb, 0xb7, + 0xb3, 0x37, 0xef, 0x9d, 0x9a, 0xc0, 0xbd, 0x1a, 0x9f, 0xcc, 0x14, 0x59, 0x47, 0x0e, 0xd8, 0x92, + 0x3c, 0x99, 0x0d, 0x2f, 0xdf, 0xe2, 0xd7, 0x4e, 0x03, 0xed, 0x42, 0x47, 0x01, 0xb0, 0x64, 0x34, + 0x4f, 0x2d, 0x68, 0xcd, 0x53, 0xca, 0x09, 0xe5, 0xde, 0x6f, 0x26, 0x58, 0xc5, 0xd5, 0xa0, 0x03, + 0x68, 0x27, 0x84, 0xfb, 0xa2, 0x3c, 0xe5, 0xdc, 0xb1, 0x71, 0x41, 0xa3, 0xa7, 0x60, 0xf1, 0x28, + 0x21, 0x8c, 0xfb, 0x49, 0x26, 0x87, 0x4f, 0x67, 0xb0, 0x5b, 0x3a, 0xcd, 0x45, 0x94, 0x10, 0xbc, + 0x46, 0x88, 0xf9, 0x95, 0xdd, 0x44, 0x93, 0xa1, 0x9c, 0x48, 0x36, 0x56, 0x04, 0xfa, 0x0c, 0x2c, + 0x16, 0x85, 0xd4, 0xe7, 0xab, 0x25, 0x91, 0xa3, 0xc7, 0xc6, 0x6b, 0x86, 0x37, 0x82, 0xbd, 0x8d, + 0x5a, 0x43, 0xcf, 0xa0, 0x4d, 0x62, 0x92, 0x10, 0xca, 0x99, 0x6b, 0x1e, 0xd6, 0x4b, 0xe5, 0x5f, + 0x99, 0x6e, 0xb8, 0x40, 0x79, 0x8f, 0x60, 0x7f, 0x5b, 0xb5, 0x79, 0xdf, 0x43, 0xb7, 0xd2, 0x31, + 0xc8, 0x81, 0x3a, 0x8b, 0x42, 0x7d, 0x52, 0xf1, 0xbb, 0x8e, 0xba, 0x56, 0x8e, 0x1a, 0x41, 0x63, + 0x4e, 0x96, 0x5c, 0x1f, 0x45, 0xfe, 0x7b, 0xd7, 0x60, 0x97, 0x2f, 0xf1, 0xdf, 0xd8, 0xaa, 0xa4, + 0xbd, 0x51, 0x4d, 0xbb, 0x77, 0x03, 0x9d, 0xd2, 0xb8, 0xb9, 0x7f, 0x2d, 0x04, 0x72, 0x7c, 0x31, + 0xb7, 0x76, 0x58, 0xef, 0x59, 0x38, 0x27, 0xd1, 0x13, 0x68, 0x25, 0x2c, 0xbc, 0xb8, 0xcd, 0x88, + 0x5e, 0x0d, 0xf9, 0x0c, 0x13, 0x89, 0x99, 0x2a, 0x09, 0xce, 0x21, 0x5e, 0x02, 0x9d, 0xd2, 0xe0, + 0xbc, 0xc7, 0x59, 0x39, 0xda, 0xda, 0x9d, 0x22, 0xf9, 0x38, 0x77, 0xbf, 0x02, 0xac, 0xe7, 0xe2, + 0x3d, 0xde, 0x7a, 0xd0, 0xd0, 0x9e, 0xee, 0xbf, 0xfa, 0xc6, 0x3f, 0xf0, 0xfd, 0x93, 0xf2, 0xad, + 0xa6, 0xfe, 0x7f, 0x9c, 0xd6, 0xaf, 0xd5, 0x1d, 0xe6, 0xab, 0xbd, 0x07, 0xad, 0xcc, 0xbf, 0x8d, + 0x53, 0x3f, 0x90, 0xee, 0x3a, 0x83, 0x07, 0xb9, 0xb2, 0xe2, 0xe2, 0x5c, 0xec, 0x4d, 0xa0, 0xa5, + 0x79, 0xe8, 0x11, 0xec, 0x30, 0xf2, 0x61, 0xb6, 0x4a, 0x74, 0x88, 0x9a, 0x12, 0xf5, 0xb4, 0xf0, + 0xd9, 0x42, 0xde, 0x84, 0x85, 0xe5, 0xbf, 0xe0, 0xc9, 0x9c, 0xe9, 0x1a, 0x93, 0x75, 0xf4, 0x87, + 0x09, 0x76, 0x79, 0xbb, 0xa3, 0xa7, 0x00, 0x49, 0xb1, 0x88, 0x75, 0x20, 0xdd, 0xca, 0x86, 0xc6, + 0x25, 0xc0, 0xc7, 0xb6, 0x7f, 0xa5, 0xd1, 0xeb, 0x77, 0x1a, 0x5d, 0x94, 0x50, 0x94, 0x0f, 0x46, + 0x5d, 0xf0, 0x39, 0xed, 0xfd, 0x6e, 0xc2, 0xde, 0xc6, 0x7c, 0xfd, 0x3f, 0xa3, 0xf5, 0x4e, 0xa0, + 0x9d, 0x2b, 0xa1, 0xcf, 0x01, 0x22, 0x3a, 0xbf, 0xa4, 0x2b, 0xe1, 0x4a, 0x5f, 0x85, 0x15, 0xd1, + 0xf9, 0x4c, 0x32, 0x4a, 0xb7, 0x54, 0x2b, 0xdf, 0x92, 0xb7, 0x80, 0xbd, 0x8d, 0x97, 0x12, 0xfa, + 0x16, 0x76, 0x19, 0x89, 0xaf, 0xc5, 0x98, 0x5a, 0x26, 0x3e, 0x8f, 0x52, 0xaa, 0x0f, 0xb6, 0xed, + 0x35, 0x86, 0xef, 0x62, 0x45, 0xcd, 0xde, 0xd0, 0xf4, 0x17, 0x2a, 0x6b, 0xd3, 0xc6, 0x8a, 0xf0, + 0x16, 0x80, 0x36, 0xdf, 0x57, 0xe8, 0x4b, 0x68, 0xca, 0xa7, 0x9c, 0x9e, 0xa0, 0x5b, 0x1d, 0x28, + 0x04, 0xfa, 0x02, 0x1a, 0x01, 0xf1, 0x03, 0xdd, 0x70, 0x5b, 0x91, 0x12, 0xe0, 0xfd, 0x00, 0x3b, + 0xca, 0x93, 0xb8, 0x4e, 0x42, 0x83, 0x2c, 0x8d, 0x28, 0x97, 0x27, 0xb0, 0x70, 0x41, 0xff, 0xed, + 0xb4, 0xd8, 0xba, 0x23, 0xbc, 0x16, 0x34, 0xe5, 0xeb, 0xc7, 0xeb, 0x03, 0xda, 0x5c, 0xf7, 0xa2, + 0x29, 0x55, 0x52, 0xd5, 0x3a, 0x68, 0xe0, 0x9c, 0xf4, 0x4e, 0xe0, 0xe1, 0x96, 0xe5, 0x8e, 0x8e, + 0xa0, 0xad, 0xfb, 0x29, 0x5f, 0x20, 0x77, 0xfb, 0xad, 0x90, 0x1f, 0xbd, 0x80, 0x4e, 0xa9, 0x83, + 0xe5, 0x0a, 0xa6, 0x01, 0xb9, 0x8e, 0x28, 0x09, 0x1c, 0x43, 0xac, 0xd6, 0xd3, 0x38, 0x9d, 0xdf, + 0xe8, 0x3c, 0x38, 0xa6, 0x58, 0xad, 0xf9, 0x06, 0x98, 0xb2, 0xd0, 0xa9, 0x0d, 0x32, 0xd8, 0x51, + 0xb3, 0x09, 0xbd, 0x04, 0x5b, 0xfd, 0x9d, 0xf3, 0x25, 0xf1, 0x13, 0xb4, 0x75, 0x74, 0x1d, 0x6c, + 0xe5, 0x7a, 0x46, 0xcf, 0x7c, 0x66, 0xa2, 0xc7, 0xd0, 0x38, 0x8b, 0x68, 0x88, 0x2a, 0x6f, 0xc2, + 0x83, 0x0a, 0xe5, 0x19, 0xa7, 0x4f, 0x7e, 0x3c, 0x0a, 0x23, 0xbe, 0x58, 0x5d, 0xf5, 0xe7, 0x69, + 0x72, 0xbc, 0xb8, 0xcd, 0xc8, 0x32, 0x26, 0x41, 0x48, 0x96, 0xc7, 0xd7, 0xfe, 0xd5, 0x32, 0x9a, + 0x1f, 0x87, 0xd2, 0xf4, 0xb1, 0xd4, 0xba, 0xda, 0x91, 0x9f, 0xe7, 0x7f, 0x05, 0x00, 0x00, 0xff, + 0xff, 0x3a, 0x51, 0x83, 0xec, 0x87, 0x0c, 0x00, 0x00, } diff --git a/gossip/proto/message.proto b/gossip/proto/message.proto index dd222ba51ad..fb971c092d6 100644 --- a/gossip/proto/message.proto +++ b/gossip/proto/message.proto @@ -35,6 +35,7 @@ message GossipMessage { ORG_ONLY = 2; CHAN_ONLY = 3; CHAN_AND_ORG = 4; + CHAN_OR_ORG = 5; } // determines to which peers it is allowed @@ -56,30 +57,33 @@ message GossipMessage { DataRequest dataReq = 10; DataUpdate dataUpdate = 11; - // Used for creating or modifying a channel - ChannelCommand chanCmd = 12; - // Empty message, used for pinging - Empty empty = 13; + Empty empty = 12; // ConnEstablish, used for establishing a connection - ConnEstablish conn = 14; + ConnEstablish conn = 13; // Used for relaying information // about state - StateInfo stateInfo = 15; + StateInfo stateInfo = 14; + + // Used for sending sets of StateInfo messages + StateInfoSnapshot stateSnapshot = 15; + + // Used for asking for StateInfoSnapshots + StateInfoPullRequest stateInfoPullReq = 16; // Used to ask from a remote peer a set of blocks - RemoteStateRequest stateRequest = 16; + RemoteStateRequest stateRequest = 17; // Used to send a set of blocks to a remote peer - RemoteStateResponse stateResponse = 17; + RemoteStateResponse stateResponse = 18; // Used to indicate intent of peer to become leader - LeadershipMessage leadershipMsg = 18; + LeadershipMessage leadershipMsg = 19; // Used to learn of a peer's certificate - PeerIdentity peerIdentity = 19; + PeerIdentity peerIdentity = 20; } } @@ -92,12 +96,16 @@ message StateInfo { bytes signature = 4; } -// ChannelCommand is the message that contains a creation -// or modification of a channel -message ChannelCommand { - bytes payload = 1; +// StateInfoSnapshot is an aggregation of StateInfo messages +message StateInfoSnapshot { + repeated GossipMessage elements = 1; } +// StateInfoPullRequest is used to fetch a StateInfoSnapshot +// from a remote peer +message StateInfoPullRequest { + +} // ConnEstablish is the message used for the gossip handshake // Whenever a peer connects to another peer, it handshakes