Skip to content

Commit

Permalink
[FAB-3970] Add ledger height to pull
Browse files Browse the repository at this point in the history
We filter blocks that their seq num is lower than ledger height
from incomming digests.

Change-Id: I2b49de5c806e3c01fcfdc7f55eb02beba76f4342
Signed-off-by: Gennady Laventman <gennady@il.ibm.com>
  • Loading branch information
gennadylaventman committed Aug 16, 2017
1 parent 140edf7 commit 61e0907
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 51 deletions.
2 changes: 1 addition & 1 deletion gossip/state/metastate.go → gossip/common/metastate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package state
package common

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@ Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package state
package common

import (
"testing"

"github.com/hyperledger/fabric/gossip/util"
"github.com/stretchr/testify/assert"
)

func init() {
util.SetupTestLogging()
}

func TestNewNodeMetastate(t *testing.T) {
metastate := NewNodeMetastate(0)
assert.Equal(t, metastate.Height(), uint64(0))
Expand Down
31 changes: 31 additions & 0 deletions gossip/gossip/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package channel
import (
"bytes"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -127,6 +128,7 @@ type gossipChannel struct {
stateInfoPublishScheduler *time.Ticker
stateInfoRequestScheduler *time.Ticker
memFilter *membershipFilter
ledgerHeight uint64
}

type membershipFilter struct {
Expand Down Expand Up @@ -335,6 +337,27 @@ func (gc *gossipChannel) createBlockPuller() pull.Mediator {
gc.DeMultiplex(msg)
},
}

adapter.IngressDigFilter = func(digestMsg *proto.DataDigest) *proto.DataDigest {
gc.RLock()
height := gc.ledgerHeight
gc.RUnlock()
digests := digestMsg.Digests
digestMsg.Digests = nil
for i := range digests {
seqNum, err := strconv.ParseUint(digests[i], 10, 64)
if err != nil {
gc.logger.Warning("Can't parse digest", digests[i], "err", err)
continue
}
if seqNum >= height {
digestMsg.Digests = append(digestMsg.Digests, digests[i])
}

}
return digestMsg
}

return pull.NewPullMediator(conf, adapter)
}

Expand Down Expand Up @@ -478,6 +501,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
}
return
}

if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BLOCK_MSG {
// If we don't have a StateInfo message from the peer,
// no way of validating its eligibility in the channel.
Expand Down Expand Up @@ -701,6 +725,13 @@ func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
gc.stateInfoMsgStore.Add(msg)
gc.Lock()
defer gc.Unlock()

nodeMeta, err := common.FromBytes(msg.GetStateInfo().Metadata)
if err != nil {
gc.logger.Warning("Can't extract ledger height from metadata", err)
return
}
gc.ledgerHeight = nodeMeta.LedgerHeight
gc.stateInfoMsg = msg
atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}
Expand Down
66 changes: 56 additions & 10 deletions gossip/gossip/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package channel
import (
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -400,10 +399,9 @@ func TestChannelPeriodicalPublishStateInfo(t *testing.T) {
msg = m
}

md := msg.GetStateInfo().Metadata
height, err := strconv.ParseInt(string(md), 10, 64)
nodeMeta, err := common.FromBytes(msg.GetStateInfo().Metadata)
assert.NoError(t, err, "ReceivedMetadata is invalid")
assert.Equal(t, ledgerHeight, int(height), "Received different ledger height than expected")
assert.Equal(t, ledgerHeight, int(nodeMeta.LedgerHeight), "Received different ledger height than expected")
}

func TestChannelMsgStoreEviction(t *testing.T) {
Expand Down Expand Up @@ -1097,7 +1095,9 @@ func TestChannelStateInfoSnapshot(t *testing.T) {
stateInfoMsg := &receivedMsg{PKIID: pkiIDInOrg1, msg: stateInfoSnapshotForChannel(channelA, createStateInfoMsg(4, pkiIDInOrg1, channelA))}
gc.HandleMessage(stateInfoMsg)
assert.NotEmpty(t, gc.GetPeers())
assert.Equal(t, "4", string(gc.GetPeers()[0].Metadata))
nodeMeta, err := common.FromBytes(gc.GetPeers()[0].Metadata)
assert.NoError(t, err)
assert.Equal(t, 4, int(nodeMeta.LedgerHeight))

// Check we don't respond to stateInfoSnapshot requests with wrong MAC
sMsg, _ := (&proto.GossipMessage{
Expand Down Expand Up @@ -1149,7 +1149,9 @@ func TestChannelStateInfoSnapshot(t *testing.T) {
assert.Len(t, elements, 1)
sMsg, err := elements[0].ToGossipMessage()
assert.NoError(t, err)
assert.Equal(t, []byte("4"), sMsg.GetStateInfo().Metadata)
nodeMeta, err := common.FromBytes(sMsg.GetStateInfo().Metadata)
assert.NoError(t, err)
assert.Equal(t, 4, int(nodeMeta.LedgerHeight))
}

// Ensure we don't crash if we got an invalid state info message
Expand Down Expand Up @@ -1615,6 +1617,44 @@ func TestOnDemandGossip(t *testing.T) {
}
}

func TestChannelPullWithDigestsFilter(t *testing.T) {
t.Parallel()
cs := &cryptoService{}
cs.On("VerifyBlock", mock.Anything).Return(nil)
receivedBlocksChan := make(chan *proto.SignedGossipMessage, 2)
adapter := new(gossipAdapterMock)
configureAdapter(adapter, discovery.NetworkMember{PKIid: pkiIDInOrg1})
adapter.On("Gossip", mock.Anything)
adapter.On("DeMultiplex", mock.Anything).Run(func(arg mock.Arguments) {
msg := arg.Get(0).(*proto.SignedGossipMessage)
if !msg.IsDataMsg() {
return
}
// The peer is supposed to de-multiplex 1 ledger block
assert.True(t, msg.IsDataMsg())
receivedBlocksChan <- msg
})
gc := NewGossipChannel(pkiIDInOrg1, orgInChannelA, cs, channelA, adapter, &joinChanMsg{})
go gc.HandleMessage(&receivedMsg{PKIID: pkiIDInOrg1, msg: createStateInfoMsg(100, pkiIDInOrg1, channelA)})

gc.UpdateStateInfo(createStateInfoMsg(11, pkiIDInOrg1, channelA))

var wg sync.WaitGroup
wg.Add(1)

pullPhase := simulatePullPhaseWithVariableDigest(gc, t, &wg, func(envelope *proto.Envelope) {}, []string{"10", "11"}, []string{"11"}, 11)
adapter.On("Send", mock.Anything, mock.Anything).Run(pullPhase)
wg.Wait()

select {
case <-time.After(time.Second * 5):
t.Fatal("Haven't received blocks on time")
case msg := <-receivedBlocksChan:
assert.Equal(t, uint64(11), msg.GetDataMsg().Payload.SeqNum)
}

}

func createDataUpdateMsg(nonce uint64, seqs ...uint64) *proto.SignedGossipMessage {
msg := &proto.GossipMessage{
Nonce: 0,
Expand Down Expand Up @@ -1669,13 +1709,15 @@ func dataMsgOfChannel(seqnum uint64, channel common.ChainID) *proto.SignedGossip
}

func createStateInfoMsg(ledgerHeight int, pkiID common.PKIidType, channel common.ChainID) *proto.SignedGossipMessage {
nodeMeta := common.NewNodeMetastate(uint64(ledgerHeight))
metaBytes, _ := nodeMeta.Bytes()
sMsg, _ := (&proto.GossipMessage{
Tag: proto.GossipMessage_CHAN_OR_ORG,
Content: &proto.GossipMessage_StateInfo{
StateInfo: &proto.StateInfo{
Channel_MAC: GenerateMAC(pkiID, channel),
Timestamp: &proto.PeerTime{IncNum: uint64(time.Now().UnixNano()), SeqNum: 1},
Metadata: []byte(fmt.Sprintf("%d", ledgerHeight)),
Metadata: metaBytes,
PkiId: []byte(pkiID),
},
},
Expand Down Expand Up @@ -1719,6 +1761,10 @@ func createDataMsg(seqnum uint64, channel common.ChainID) *proto.SignedGossipMes
}

func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutator msgMutator, seqs ...uint64) func(args mock.Arguments) {
return simulatePullPhaseWithVariableDigest(gc, t, wg, mutator, []string{"10", "11"}, []string{"10", "11"}, seqs...)
}

func simulatePullPhaseWithVariableDigest(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutator msgMutator, proposedDigestSeqs []string, resultDigestSeqs []string, seqs ...uint64) func(args mock.Arguments) {
var l sync.Mutex
var sentHello bool
var sentReq bool
Expand All @@ -1735,7 +1781,7 @@ func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutat
Content: &proto.GossipMessage_DataDig{
DataDig: &proto.DataDigest{
MsgType: proto.PullMsgType_BLOCK_MSG,
Digests: []string{"10", "11"},
Digests: proposedDigestSeqs,
Nonce: msg.GetHello().Nonce,
},
},
Expand All @@ -1749,10 +1795,10 @@ func simulatePullPhase(gc GossipChannel, t *testing.T, wg *sync.WaitGroup, mutat
if msg.IsDataReq() && !sentReq {
sentReq = true
dataReq := msg.GetDataReq()
for _, expectedDigest := range []string{"10", "11"} {
for _, expectedDigest := range resultDigestSeqs {
assert.Contains(t, dataReq.Digests, expectedDigest)
}
assert.Equal(t, 2, len(dataReq.Digests))
assert.Equal(t, len(resultDigestSeqs), len(dataReq.Digests))
// When we send a data request, simulate a response of a data update
// from the imaginary peer that got the request
dataUpdateMsg := new(receivedMsg)
Expand Down
10 changes: 5 additions & 5 deletions gossip/gossip/gossip_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,11 +985,11 @@ func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
g.logger.Info("Learned of a new certificate:", idMsg.Cert)
}
adapter := &pull.PullAdapter{
Sndr: g.comm,
MemSvc: g.disc,
IdExtractor: pkiIDFromMsg,
MsgCons: certConsumer,
DigFilter: g.sameOrgOrOurOrgPullFilter,
Sndr: g.comm,
MemSvc: g.disc,
IdExtractor: pkiIDFromMsg,
MsgCons: certConsumer,
EgressDigFilter: g.sameOrgOrOurOrgPullFilter,
}
return pull.NewPullMediator(conf, adapter)
}
Expand Down
43 changes: 28 additions & 15 deletions gossip/gossip/pull/pullstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ type Config struct {
MsgType proto.PullMsgType
}

// DigestFilter filters digests to be sent to a remote peer, that
// IngressDigestFilter filters out entities in digests that are received from remote peers
type IngressDigestFilter func(digestMsg *proto.DataDigest) *proto.DataDigest

// EgressDigestFilter filters digests to be sent to a remote peer, that
// sent a hello with the following message
type DigestFilter func(helloMsg proto.ReceivedMessage) func(digestItem string) bool
type EgressDigestFilter func(helloMsg proto.ReceivedMessage) func(digestItem string) bool

// byContext converts this DigestFilter to an algo.DigestFilter
func (df DigestFilter) byContext() algo.DigestFilter {
// byContext converts this EgressDigFilter to an algo.DigestFilter
func (df EgressDigestFilter) byContext() algo.DigestFilter {
return func(context interface{}) func(digestItem string) bool {
return func(digestItem string) bool {
return df(context.(proto.ReceivedMessage))(digestItem)
Expand All @@ -71,11 +74,12 @@ func (df DigestFilter) byContext() algo.DigestFilter {
// PullAdapter defines methods of the pullStore to interact
// with various modules of gossip
type PullAdapter struct {
Sndr Sender
MemSvc MembershipService
IdExtractor proto.IdentifierExtractor
MsgCons proto.MsgConsumer
DigFilter DigestFilter
Sndr Sender
MemSvc MembershipService
IdExtractor proto.IdentifierExtractor
MsgCons proto.MsgConsumer
EgressDigFilter EgressDigestFilter
IngressDigFilter IngressDigestFilter
}

// Mediator is a component wrap a PullEngine and provides the methods
Expand Down Expand Up @@ -115,16 +119,16 @@ type pullMediatorImpl struct {

// NewPullMediator returns a new Mediator
func NewPullMediator(config Config, adapter *PullAdapter) Mediator {
digFilter := adapter.DigFilter
egressDigFilter := adapter.EgressDigFilter

acceptAllFilter := func(_ proto.ReceivedMessage) func(string) bool {
return func(_ string) bool {
return true
}
}

if digFilter == nil {
digFilter = acceptAllFilter
if egressDigFilter == nil {
egressDigFilter = acceptAllFilter
}

p := &pullMediatorImpl{
Expand All @@ -135,8 +139,16 @@ func NewPullMediator(config Config, adapter *PullAdapter) Mediator {
itemID2Msg: make(map[string]*proto.SignedGossipMessage),
}

p.engine = algo.NewPullEngineWithFilter(p, config.PullInterval, digFilter.byContext())
p.engine = algo.NewPullEngineWithFilter(p, config.PullInterval, egressDigFilter.byContext())

if adapter.IngressDigFilter == nil {
// Create accept all filter
adapter.IngressDigFilter = func(digestMsg *proto.DataDigest) *proto.DataDigest {
return digestMsg
}
}
return p

}

func (p *pullMediatorImpl) HandleMessage(m proto.ReceivedMessage) {
Expand All @@ -160,9 +172,10 @@ func (p *pullMediatorImpl) HandleMessage(m proto.ReceivedMessage) {
p.engine.OnHello(helloMsg.Nonce, m)
}
if digest := msg.GetDataDig(); digest != nil {
itemIDs = digest.Digests
d := p.PullAdapter.IngressDigFilter(digest)
itemIDs = d.Digests
pullMsgType = DigestMsgType
p.engine.OnDigest(digest.Digests, digest.Nonce, m)
p.engine.OnDigest(d.Digests, d.Nonce, m)
}
if req := msg.GetDataReq(); req != nil {
itemIDs = req.Digests
Expand Down
Loading

0 comments on commit 61e0907

Please sign in to comment.