diff --git a/gossip/gossip/algo/pull.go b/gossip/gossip/algo/pull.go index d7b23c35eec..7339cc11473 100644 --- a/gossip/gossip/algo/pull.go +++ b/gossip/gossip/algo/pull.go @@ -43,19 +43,28 @@ import ( */ -const ( - DEF_DIGEST_WAIT_TIME = time.Duration(4) * time.Second - DEF_REQUEST_WAIT_TIME = time.Duration(4) * time.Second - DEF_RESPONSE_WAIT_TIME = time.Duration(7) * time.Second -) - func init() { rand.Seed(42) } -var defaultDigestWaitTime = DEF_DIGEST_WAIT_TIME -var defaultRequestWaitTime = DEF_REQUEST_WAIT_TIME -var defaultResponseWaitTime = DEF_RESPONSE_WAIT_TIME +var digestWaitTime = time.Duration(4) * time.Second +var requestWaitTime = time.Duration(4) * time.Second +var responseWaitTime = time.Duration(7) * time.Second + +// SetDigestWaitTime sets the digest wait time +func SetDigestWaitTime(time time.Duration) { + digestWaitTime = time +} + +// SetRequestWaitTime sets the request wait time +func SetRequestWaitTime(time time.Duration) { + requestWaitTime = time +} + +// SetResponseWaitTime sets the response wait time +func SetResponseWaitTime(time time.Duration) { + responseWaitTime = time +} // PullAdapter is needed by the PullEngine in order to // send messages to the remote PullEngine instances. @@ -83,6 +92,8 @@ type PullAdapter interface { SendRes(items []uint64, context interface{}, nonce uint64) } +// PullEngine is the component that actually invokes the pull algorithm +// with the help of the PullAdapter type PullEngine struct { PullAdapter stopFlag int32 @@ -93,12 +104,15 @@ type PullEngine struct { acceptingDigests int32 acceptingResponses int32 lock sync.Mutex - nonces *util.Set + outgoingNONCES *util.Set + incomingNONCES *util.Set } +// NewPullEngine creates an instance of a PullEngine with a certain sleep time +// between pull initiations func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine { engine := &PullEngine{ - PullAdapter: participant, + PullAdapter: participant, stopFlag: int32(0), state: util.NewSet(), item2owners: make(map[uint64][]string), @@ -106,7 +120,8 @@ func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine nonces2peers: make(map[uint64]string), acceptingDigests: int32(0), acceptingResponses: int32(0), - nonces: util.NewSet(), + incomingNONCES: util.NewSet(), + outgoingNONCES: util.NewSet(), } go func() { @@ -144,6 +159,7 @@ func (engine *PullEngine) ignoreDigests() { atomic.StoreInt32(&(engine.acceptingDigests), int32(0)) } +// Stop stops the engine func (engine *PullEngine) Stop() { atomic.StoreInt32(&(engine.stopFlag), int32(1)) } @@ -155,13 +171,13 @@ func (engine *PullEngine) initiatePull() { engine.acceptDigests() for _, peer := range engine.SelectPeers() { nonce := engine.newNONCE() - engine.nonces.Add(nonce) + engine.outgoingNONCES.Add(nonce) engine.nonces2peers[nonce] = peer engine.peers2nonces[peer] = nonce engine.Hello(peer, nonce) } - time.AfterFunc(defaultDigestWaitTime, func() { + time.AfterFunc(digestWaitTime, func() { engine.processIncomingDigests() }) } @@ -189,7 +205,7 @@ func (engine *PullEngine) processIncomingDigests() { engine.SendReq(dest, seqsToReq, engine.peers2nonces[dest]) } - time.AfterFunc(defaultResponseWaitTime, engine.endPull) + time.AfterFunc(responseWaitTime, engine.endPull) } @@ -198,15 +214,16 @@ func (engine *PullEngine) endPull() { defer engine.lock.Unlock() atomic.StoreInt32(&(engine.acceptingResponses), int32(0)) - engine.nonces.Clear() + engine.outgoingNONCES.Clear() engine.item2owners = make(map[uint64][]string) engine.peers2nonces = make(map[string]uint64) engine.nonces2peers = make(map[uint64]string) } +// OnDigest notifies the engine that a digest has arrived func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interface{}) { - if !engine.isAcceptingDigests() || !engine.nonces.Exists(nonce) { + if !engine.isAcceptingDigests() || !engine.outgoingNONCES.Exists(nonce) { return } @@ -226,22 +243,25 @@ func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interf } } +// Add adds items to the state func (engine *PullEngine) Add(seqs ...uint64) { for _, seq := range seqs { engine.state.Add(seq) } } +// Remove removes items from the state func (engine *PullEngine) Remove(seqs ...uint64) { for _, seq := range seqs { engine.state.Remove(seq) } } +// OnHello notifies the engine a hello has arrived func (engine *PullEngine) OnHello(nonce uint64, context interface{}) { - engine.nonces.Add(nonce) - time.AfterFunc(defaultRequestWaitTime, func() { - engine.nonces.Remove(nonce) + engine.incomingNONCES.Add(nonce) + time.AfterFunc(requestWaitTime, func() { + engine.incomingNONCES.Remove(nonce) }) a := engine.state.ToArray() @@ -252,14 +272,15 @@ func (engine *PullEngine) OnHello(nonce uint64, context interface{}) { engine.SendDigest(digest, nonce, context) } +// OnReq notifies the engine a request has arrived func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{}) { - if !engine.nonces.Exists(nonce) { + if !engine.incomingNONCES.Exists(nonce) { return } engine.lock.Lock() defer engine.lock.Unlock() - items2Send := make([]uint64, 0) + var items2Send []uint64 for _, item := range items { if engine.state.Exists(item) { items2Send = append(items2Send, item) @@ -269,8 +290,9 @@ func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{ engine.SendRes(items2Send, context, nonce) } +// OnRes notifies the engine a response has arrived func (engine *PullEngine) OnRes(items []uint64, nonce uint64) { - if !engine.nonces.Exists(nonce) || !engine.isAcceptingResponses() { + if !engine.outgoingNONCES.Exists(nonce) || !engine.isAcceptingResponses() { return } @@ -281,7 +303,7 @@ func (engine *PullEngine) newNONCE() uint64 { n := uint64(0) for { n = uint64(rand.Int63()) - if !engine.nonces.Exists(n) { + if !engine.outgoingNONCES.Exists(n) { return n } } diff --git a/gossip/gossip/algo/pull_test.go b/gossip/gossip/algo/pull_test.go index cd76b908975..ac9b749d277 100644 --- a/gossip/gossip/algo/pull_test.go +++ b/gossip/gossip/algo/pull_test.go @@ -21,16 +21,17 @@ import ( "testing" "time" + "fmt" + "sync/atomic" + "github.com/hyperledger/fabric/gossip/util" "github.com/stretchr/testify/assert" - "sync/atomic" ) func init() { - defaultRequestWaitTime = time.Duration(50) * time.Millisecond - defaultDigestWaitTime = time.Duration(20) * time.Millisecond - defaultResponseWaitTime = time.Duration(50) * time.Millisecond - + requestWaitTime = time.Duration(50) * time.Millisecond + digestWaitTime = time.Duration(20) * time.Millisecond + responseWaitTime = time.Duration(50) * time.Millisecond } type messageHook func(interface{}) @@ -78,7 +79,7 @@ func newPushPullTestInstance(name string, peers map[string]*pullTestInstance) *p name: name, } - inst.PullEngine = NewPullEngine(inst, time.Duration(500)*time.Millisecond) + inst.PullEngine = NewPullEngine(inst, time.Duration(100)*time.Millisecond) peers[name] = inst go func() { @@ -86,7 +87,6 @@ func newPushPullTestInstance(name string, peers map[string]*pullTestInstance) *p select { case <-inst.stopChan: return - break case m := <-inst.msgQueue: inst.handleMessage(m) break @@ -207,6 +207,31 @@ func TestPullEngine_Stop(t *testing.T) { assert.Equal(t, len1, len2, "PullEngine was still active after Stop() was invoked!") } +func TestPullEngineAll2AllWithIncrementalSpawning(t *testing.T) { + // Scenario: spawn 10 nodes, each 50 ms after the other + // and have them transfer data between themselves. + // Expected outcome: obviously, everything should succeed. + // Isn't that's why we're here? + instanceCount := 10 + peers := make(map[string]*pullTestInstance) + + for i := 0; i < instanceCount; i++ { + inst := newPushPullTestInstance(fmt.Sprintf("p%d", i+1), peers) + inst.Add(uint64(i + 1)) + time.Sleep(time.Duration(50) * time.Millisecond) + } + for i := 0; i < instanceCount; i++ { + pID := fmt.Sprintf("p%d", i+1) + peers[pID].setNextPeerSelection(keySet(pID, peers)) + } + time.Sleep(time.Duration(500) * time.Millisecond) + + for i := 0; i < instanceCount; i++ { + pID := fmt.Sprintf("p%d", i+1) + assert.Equal(t, instanceCount, len(peers[pID].state.ToArray())) + } +} + func TestPullEngineSelectiveUpdates(t *testing.T) { // Scenario: inst1 has {1, 3} and inst2 has {0,1,2,3}. // inst1 initiates to inst2 @@ -254,7 +279,7 @@ func TestPullEngineSelectiveUpdates(t *testing.T) { inst1.setNextPeerSelection([]string{"p2"}) - time.Sleep(time.Duration(800) * time.Millisecond) + time.Sleep(time.Duration(200) * time.Millisecond) assert.Equal(t, len(inst2.state.ToArray()), len(inst1.state.ToArray())) } @@ -301,7 +326,7 @@ func TestByzantineResponder(t *testing.T) { inst1.setNextPeerSelection([]string{"p2"}) - time.Sleep(time.Duration(800) * time.Millisecond) + time.Sleep(time.Duration(200) * time.Millisecond) assert.Equal(t, int32(1), atomic.LoadInt32(&receivedDigestFromInst3), "inst1 hasn't received a digest from inst3") @@ -333,7 +358,7 @@ func TestMultipleInitiators(t *testing.T) { inst2.setNextPeerSelection([]string{"p4"}) inst3.setNextPeerSelection([]string{"p4"}) - time.Sleep(time.Duration(800) * time.Millisecond) + time.Sleep(time.Duration(200) * time.Millisecond) for _, inst := range []*pullTestInstance{inst1, inst2, inst3} { assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(1), numericCompare) != -1) @@ -362,7 +387,7 @@ func TestLatePeers(t *testing.T) { }) inst1.setNextPeerSelection([]string{"p2", "p3"}) - time.Sleep(time.Duration(800) * time.Millisecond) + time.Sleep(time.Duration(200) * time.Millisecond) assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) == -1) assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(2), numericCompare) == -1) @@ -391,7 +416,7 @@ func TestBiDiUpdates(t *testing.T) { inst1.setNextPeerSelection([]string{"p2"}) inst2.setNextPeerSelection([]string{"p1"}) - time.Sleep(time.Duration(800) * time.Millisecond) + time.Sleep(time.Duration(200) * time.Millisecond) assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(0), numericCompare) != -1) assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) != -1) @@ -453,14 +478,14 @@ func TestSpread(t *testing.T) { inst1.setNextPeerSelection([]string{"p2", "p3", "p4"}) - time.Sleep(time.Duration(800) * time.Millisecond) + time.Sleep(time.Duration(200) * time.Millisecond) lock.Lock() - for p_i, counter := range chooseCounters { - if p_i == "p5" { + for pI, counter := range chooseCounters { + if pI == "p5" { assert.Equal(t, 0, counter) } else { - assert.True(t, counter > 0, "%s was not selected!", p_i) + assert.True(t, counter > 0, "%s was not selected!", pI) } } lock.Unlock() @@ -470,3 +495,17 @@ func TestSpread(t *testing.T) { func numericCompare(a interface{}, b interface{}) bool { return a.(uint64) == b.(uint64) } + +func keySet(selfPeer string, m map[string]*pullTestInstance) []string { + peers := make([]string, len(m)-1) + i := 0 + for pID := range m { + if pID == selfPeer { + continue + } + peers[i] = pID + i++ + } + + return peers +}