Skip to content

Commit

Permalink
pull algorithm fix
Browse files Browse the repository at this point in the history
Had a bug in the pull algorithm:
The NONCE set was actually needed to be split into 2 diff sets,
because servicing sync requests has nothing to do with
initiating them, and the cleanup of initiated sync requests
effected sync requets that were serviced.

Added a unit test that successfully reproduced the bug,
and after I fixed it it always succeeds.

Change-Id: Ie99f41485a41ddb46e99d8fa7d8db408f669b20c
Signed-off-by: Yacov Manevich <yacovm@il.ibm.com>
  • Loading branch information
yacovm committed Oct 25, 2016
1 parent db22cdc commit a7f445f
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 40 deletions.
70 changes: 46 additions & 24 deletions gossip/gossip/algo/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,28 @@ import (
*/

const (
DEF_DIGEST_WAIT_TIME = time.Duration(4) * time.Second
DEF_REQUEST_WAIT_TIME = time.Duration(4) * time.Second
DEF_RESPONSE_WAIT_TIME = time.Duration(7) * time.Second
)

func init() {
rand.Seed(42)
}

var defaultDigestWaitTime = DEF_DIGEST_WAIT_TIME
var defaultRequestWaitTime = DEF_REQUEST_WAIT_TIME
var defaultResponseWaitTime = DEF_RESPONSE_WAIT_TIME
var digestWaitTime = time.Duration(4) * time.Second
var requestWaitTime = time.Duration(4) * time.Second
var responseWaitTime = time.Duration(7) * time.Second

// SetDigestWaitTime sets the digest wait time
func SetDigestWaitTime(time time.Duration) {
digestWaitTime = time
}

// SetRequestWaitTime sets the request wait time
func SetRequestWaitTime(time time.Duration) {
requestWaitTime = time
}

// SetResponseWaitTime sets the response wait time
func SetResponseWaitTime(time time.Duration) {
responseWaitTime = time
}

// PullAdapter is needed by the PullEngine in order to
// send messages to the remote PullEngine instances.
Expand Down Expand Up @@ -83,6 +92,8 @@ type PullAdapter interface {
SendRes(items []uint64, context interface{}, nonce uint64)
}

// PullEngine is the component that actually invokes the pull algorithm
// with the help of the PullAdapter
type PullEngine struct {
PullAdapter
stopFlag int32
Expand All @@ -93,20 +104,24 @@ type PullEngine struct {
acceptingDigests int32
acceptingResponses int32
lock sync.Mutex
nonces *util.Set
outgoingNONCES *util.Set
incomingNONCES *util.Set
}

// NewPullEngine creates an instance of a PullEngine with a certain sleep time
// between pull initiations
func NewPullEngine(participant PullAdapter, sleepTime time.Duration) *PullEngine {
engine := &PullEngine{
PullAdapter: participant,
PullAdapter: participant,
stopFlag: int32(0),
state: util.NewSet(),
item2owners: make(map[uint64][]string),
peers2nonces: make(map[string]uint64),
nonces2peers: make(map[uint64]string),
acceptingDigests: int32(0),
acceptingResponses: int32(0),
nonces: util.NewSet(),
incomingNONCES: util.NewSet(),
outgoingNONCES: util.NewSet(),
}

go func() {
Expand Down Expand Up @@ -144,6 +159,7 @@ func (engine *PullEngine) ignoreDigests() {
atomic.StoreInt32(&(engine.acceptingDigests), int32(0))
}

// Stop stops the engine
func (engine *PullEngine) Stop() {
atomic.StoreInt32(&(engine.stopFlag), int32(1))
}
Expand All @@ -155,13 +171,13 @@ func (engine *PullEngine) initiatePull() {
engine.acceptDigests()
for _, peer := range engine.SelectPeers() {
nonce := engine.newNONCE()
engine.nonces.Add(nonce)
engine.outgoingNONCES.Add(nonce)
engine.nonces2peers[nonce] = peer
engine.peers2nonces[peer] = nonce
engine.Hello(peer, nonce)
}

time.AfterFunc(defaultDigestWaitTime, func() {
time.AfterFunc(digestWaitTime, func() {
engine.processIncomingDigests()
})
}
Expand Down Expand Up @@ -189,7 +205,7 @@ func (engine *PullEngine) processIncomingDigests() {
engine.SendReq(dest, seqsToReq, engine.peers2nonces[dest])
}

time.AfterFunc(defaultResponseWaitTime, engine.endPull)
time.AfterFunc(responseWaitTime, engine.endPull)

}

Expand All @@ -198,15 +214,16 @@ func (engine *PullEngine) endPull() {
defer engine.lock.Unlock()

atomic.StoreInt32(&(engine.acceptingResponses), int32(0))
engine.nonces.Clear()
engine.outgoingNONCES.Clear()

engine.item2owners = make(map[uint64][]string)
engine.peers2nonces = make(map[string]uint64)
engine.nonces2peers = make(map[uint64]string)
}

// OnDigest notifies the engine that a digest has arrived
func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interface{}) {
if !engine.isAcceptingDigests() || !engine.nonces.Exists(nonce) {
if !engine.isAcceptingDigests() || !engine.outgoingNONCES.Exists(nonce) {
return
}

Expand All @@ -226,22 +243,25 @@ func (engine *PullEngine) OnDigest(digest []uint64, nonce uint64, context interf
}
}

// Add adds items to the state
func (engine *PullEngine) Add(seqs ...uint64) {
for _, seq := range seqs {
engine.state.Add(seq)
}
}

// Remove removes items from the state
func (engine *PullEngine) Remove(seqs ...uint64) {
for _, seq := range seqs {
engine.state.Remove(seq)
}
}

// OnHello notifies the engine a hello has arrived
func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
engine.nonces.Add(nonce)
time.AfterFunc(defaultRequestWaitTime, func() {
engine.nonces.Remove(nonce)
engine.incomingNONCES.Add(nonce)
time.AfterFunc(requestWaitTime, func() {
engine.incomingNONCES.Remove(nonce)
})

a := engine.state.ToArray()
Expand All @@ -252,14 +272,15 @@ func (engine *PullEngine) OnHello(nonce uint64, context interface{}) {
engine.SendDigest(digest, nonce, context)
}

// OnReq notifies the engine a request has arrived
func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{}) {
if !engine.nonces.Exists(nonce) {
if !engine.incomingNONCES.Exists(nonce) {
return
}
engine.lock.Lock()
defer engine.lock.Unlock()

items2Send := make([]uint64, 0)
var items2Send []uint64
for _, item := range items {
if engine.state.Exists(item) {
items2Send = append(items2Send, item)
Expand All @@ -269,8 +290,9 @@ func (engine *PullEngine) OnReq(items []uint64, nonce uint64, context interface{
engine.SendRes(items2Send, context, nonce)
}

// OnRes notifies the engine a response has arrived
func (engine *PullEngine) OnRes(items []uint64, nonce uint64) {
if !engine.nonces.Exists(nonce) || !engine.isAcceptingResponses() {
if !engine.outgoingNONCES.Exists(nonce) || !engine.isAcceptingResponses() {
return
}

Expand All @@ -281,7 +303,7 @@ func (engine *PullEngine) newNONCE() uint64 {
n := uint64(0)
for {
n = uint64(rand.Int63())
if !engine.nonces.Exists(n) {
if !engine.outgoingNONCES.Exists(n) {
return n
}
}
Expand Down
71 changes: 55 additions & 16 deletions gossip/gossip/algo/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ import (
"testing"
"time"

"fmt"
"sync/atomic"

"github.com/hyperledger/fabric/gossip/util"
"github.com/stretchr/testify/assert"
"sync/atomic"
)

func init() {
defaultRequestWaitTime = time.Duration(50) * time.Millisecond
defaultDigestWaitTime = time.Duration(20) * time.Millisecond
defaultResponseWaitTime = time.Duration(50) * time.Millisecond

requestWaitTime = time.Duration(50) * time.Millisecond
digestWaitTime = time.Duration(20) * time.Millisecond
responseWaitTime = time.Duration(50) * time.Millisecond
}

type messageHook func(interface{})
Expand Down Expand Up @@ -78,15 +79,14 @@ func newPushPullTestInstance(name string, peers map[string]*pullTestInstance) *p
name: name,
}

inst.PullEngine = NewPullEngine(inst, time.Duration(500)*time.Millisecond)
inst.PullEngine = NewPullEngine(inst, time.Duration(100)*time.Millisecond)

peers[name] = inst
go func() {
for {
select {
case <-inst.stopChan:
return
break
case m := <-inst.msgQueue:
inst.handleMessage(m)
break
Expand Down Expand Up @@ -207,6 +207,31 @@ func TestPullEngine_Stop(t *testing.T) {
assert.Equal(t, len1, len2, "PullEngine was still active after Stop() was invoked!")
}

func TestPullEngineAll2AllWithIncrementalSpawning(t *testing.T) {
// Scenario: spawn 10 nodes, each 50 ms after the other
// and have them transfer data between themselves.
// Expected outcome: obviously, everything should succeed.
// Isn't that's why we're here?
instanceCount := 10
peers := make(map[string]*pullTestInstance)

for i := 0; i < instanceCount; i++ {
inst := newPushPullTestInstance(fmt.Sprintf("p%d", i+1), peers)
inst.Add(uint64(i + 1))
time.Sleep(time.Duration(50) * time.Millisecond)
}
for i := 0; i < instanceCount; i++ {
pID := fmt.Sprintf("p%d", i+1)
peers[pID].setNextPeerSelection(keySet(pID, peers))
}
time.Sleep(time.Duration(500) * time.Millisecond)

for i := 0; i < instanceCount; i++ {
pID := fmt.Sprintf("p%d", i+1)
assert.Equal(t, instanceCount, len(peers[pID].state.ToArray()))
}
}

func TestPullEngineSelectiveUpdates(t *testing.T) {
// Scenario: inst1 has {1, 3} and inst2 has {0,1,2,3}.
// inst1 initiates to inst2
Expand Down Expand Up @@ -254,7 +279,7 @@ func TestPullEngineSelectiveUpdates(t *testing.T) {

inst1.setNextPeerSelection([]string{"p2"})

time.Sleep(time.Duration(800) * time.Millisecond)
time.Sleep(time.Duration(200) * time.Millisecond)
assert.Equal(t, len(inst2.state.ToArray()), len(inst1.state.ToArray()))
}

Expand Down Expand Up @@ -301,7 +326,7 @@ func TestByzantineResponder(t *testing.T) {

inst1.setNextPeerSelection([]string{"p2"})

time.Sleep(time.Duration(800) * time.Millisecond)
time.Sleep(time.Duration(200) * time.Millisecond)

assert.Equal(t, int32(1), atomic.LoadInt32(&receivedDigestFromInst3), "inst1 hasn't received a digest from inst3")

Expand Down Expand Up @@ -333,7 +358,7 @@ func TestMultipleInitiators(t *testing.T) {
inst2.setNextPeerSelection([]string{"p4"})
inst3.setNextPeerSelection([]string{"p4"})

time.Sleep(time.Duration(800) * time.Millisecond)
time.Sleep(time.Duration(200) * time.Millisecond)

for _, inst := range []*pullTestInstance{inst1, inst2, inst3} {
assert.True(t, util.IndexInSlice(inst.state.ToArray(), uint64(1), numericCompare) != -1)
Expand Down Expand Up @@ -362,7 +387,7 @@ func TestLatePeers(t *testing.T) {
})
inst1.setNextPeerSelection([]string{"p2", "p3"})

time.Sleep(time.Duration(800) * time.Millisecond)
time.Sleep(time.Duration(200) * time.Millisecond)

assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) == -1)
assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(2), numericCompare) == -1)
Expand Down Expand Up @@ -391,7 +416,7 @@ func TestBiDiUpdates(t *testing.T) {
inst1.setNextPeerSelection([]string{"p2"})
inst2.setNextPeerSelection([]string{"p1"})

time.Sleep(time.Duration(800) * time.Millisecond)
time.Sleep(time.Duration(200) * time.Millisecond)

assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(0), numericCompare) != -1)
assert.True(t, util.IndexInSlice(inst1.state.ToArray(), uint64(1), numericCompare) != -1)
Expand Down Expand Up @@ -453,14 +478,14 @@ func TestSpread(t *testing.T) {

inst1.setNextPeerSelection([]string{"p2", "p3", "p4"})

time.Sleep(time.Duration(800) * time.Millisecond)
time.Sleep(time.Duration(200) * time.Millisecond)

lock.Lock()
for p_i, counter := range chooseCounters {
if p_i == "p5" {
for pI, counter := range chooseCounters {
if pI == "p5" {
assert.Equal(t, 0, counter)
} else {
assert.True(t, counter > 0, "%s was not selected!", p_i)
assert.True(t, counter > 0, "%s was not selected!", pI)
}
}
lock.Unlock()
Expand All @@ -470,3 +495,17 @@ func TestSpread(t *testing.T) {
func numericCompare(a interface{}, b interface{}) bool {
return a.(uint64) == b.(uint64)
}

func keySet(selfPeer string, m map[string]*pullTestInstance) []string {
peers := make([]string, len(m)-1)
i := 0
for pID := range m {
if pID == selfPeer {
continue
}
peers[i] = pID
i++
}

return peers
}

0 comments on commit a7f445f

Please sign in to comment.