5 changes: 5 additions & 0 deletions node/batcher/batcher_role.go
@@ -329,6 +329,7 @@ func (b *BatcherRole) runSecondary() {
 	b.Logger.Infof("Batcher %d acting as secondary (shard %d; primary %d)", b.ID, b.Shard, b.primary)
 	b.MemPool.Restart(false)
 	b.Metrics.currentRole.Store(2)
+	first := true

 	for {
 		out := b.BatchPuller.PullBatches(b.primary)
@@ -363,6 +364,10 @@ func (b *BatcherRole) runSecondary() {
 			b.Metrics.batchedTxsTotal.Add(uint64(len(requests)))
 			b.BatchAcker.Ack(baf.Seq(), b.primary)
 			b.seq++
+			if first {
+				b.MemPool.Restart(false) // resets the timestamps, just to give the new primary more time
+				first = false
+			}
 		}
 	}
 }
16 changes: 9 additions & 7 deletions node/batcher/batcher_role_test.go
@@ -267,10 +267,10 @@ func TestSecondaryChangeToPrimary(t *testing.T) {
 	}

 	require.Eventually(t, func() bool {
-		return pool.RestartCallCount() == 2
+		return pool.RestartCallCount() == 3
 	}, 10*time.Second, 10*time.Millisecond)

-	require.True(t, pool.RestartArgsForCall(1))
+	require.True(t, pool.RestartArgsForCall(2))

 	require.Eventually(t, func() bool {
 		return ledger.AppendCallCount() == 2
@@ -285,7 +285,8 @@ func TestSecondaryChangeToPrimary(t *testing.T) {
 	batcher.Stop()

 	require.False(t, pool.RestartArgsForCall(0))
-	require.True(t, pool.RestartArgsForCall(1))
+	require.False(t, pool.RestartArgsForCall(1))
+	require.True(t, pool.RestartArgsForCall(2))

 	require.Equal(t, arma_types.PartyID(1), ledger.HeightArgsForCall(0))
 	require.Equal(t, arma_types.PartyID(2), ledger.HeightArgsForCall(1))
@@ -354,7 +355,7 @@ func TestSecondaryChangeToSecondary(t *testing.T) {
 	}

 	require.Eventually(t, func() bool {
-		return pool.RestartCallCount() == 2
+		return pool.RestartCallCount() == 3
 	}, 10*time.Second, 10*time.Millisecond)

 	require.False(t, pool.RestartArgsForCall(1))
@@ -650,10 +651,10 @@ func TestResubmitPending(t *testing.T) {
 	}, 10*time.Second, 10*time.Millisecond)

 	require.Eventually(t, func() bool {
-		return pool.RestartCallCount() == 2
+		return pool.RestartCallCount() == 3
 	}, 10*time.Second, 10*time.Millisecond)

-	require.True(t, pool.RestartArgsForCall(1))
+	require.True(t, pool.RestartArgsForCall(2))

 	require.Eventually(t, func() bool {
 		return ledger.AppendCallCount() == 2
@@ -668,7 +669,8 @@ func TestResubmitPending(t *testing.T) {
 	batcher.Stop()

 	require.False(t, pool.RestartArgsForCall(0))
-	require.True(t, pool.RestartArgsForCall(1))
+	require.False(t, pool.RestartArgsForCall(1))
+	require.True(t, pool.RestartArgsForCall(2))

 	require.Equal(t, arma_types.PartyID(1), ledger.HeightArgsForCall(0))
 	require.Equal(t, arma_types.PartyID(2), ledger.HeightArgsForCall(1))
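Note on the updated assertions: the secondary now calls MemPool.Restart(false) a second time after the first batch it pulls, so by the time the role changes the fake pool has recorded three Restart calls with arguments false, false, true. That is why the expected count moves from 2 to 3 and the true argument moves from index 1 to index 2. Below is a minimal hand-rolled stand-in for the counterfeiter-style fake these assertions rely on; the real fake is generated, and this type and its fields are illustrative only.

```go
package fakes

import "sync"

// FakeMemPool is a simplified, hypothetical stand-in for the generated fake:
// it records the argument of every Restart call so a test can assert on
// RestartCallCount() and RestartArgsForCall(i).
type FakeMemPool struct {
	mu          sync.Mutex
	restartArgs []bool
}

// Restart records the primary flag it was called with.
func (f *FakeMemPool) Restart(primary bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.restartArgs = append(f.restartArgs, primary)
}

// RestartCallCount returns how many times Restart was called.
func (f *FakeMemPool) RestartCallCount() int {
	f.mu.Lock()
	defer f.mu.Unlock()
	return len(f.restartArgs)
}

// RestartArgsForCall returns the argument of the i-th Restart call.
func (f *FakeMemPool) RestartArgsForCall(i int) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.restartArgs[i]
}
```

With the new secondary behavior the recorded sequence in TestSecondaryChangeToPrimary is [false, false, true]: indices 0 and 1 come from the secondary's two Restart(false) calls, and index 2 presumably comes from the promotion to primary (the primary side is not shown in this diff).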
5 changes: 4 additions & 1 deletion node/batcher/batcher_test.go
@@ -267,7 +267,10 @@ func TestBatcherComplainAndReqFwd(t *testing.T) {
 	}

 	// submit another request only to a secondary
-	batchers[2].Submit(context.Background(), tx.CreateStructuredRequest([]byte{3}))
+	require.Eventually(t, func() bool {
+		resp, err := batchers[2].Submit(context.Background(), tx.CreateStructuredRequest([]byte{3}))
+		return err == nil && resp.Error == ""
+	}, 30*time.Second, 10*time.Millisecond)

 	// after a timeout the request is forwarded
 	require.Eventually(t, func() bool {
4 changes: 2 additions & 2 deletions node/batcher/utils_test.go
@@ -132,8 +132,8 @@ func createBatchers(t *testing.T, num int, shardID types.ShardID, batcherNodes [
 		BatchMaxBytes: 1024 * 1024 * 10,
 		RequestMaxBytes: 1024 * 1024,
 		SubmitTimeout: time.Millisecond * 500,
-		FirstStrikeThreshold: 10 * time.Second,
-		SecondStrikeThreshold: 10 * time.Second,
+		FirstStrikeThreshold: 2 * time.Second,
+		SecondStrikeThreshold: 5 * time.Second,
 		AutoRemoveTimeout: 10 * time.Second,
 		BatchCreationTimeout: time.Millisecond * 500,
 		BatchSequenceGap: types.BatchSequence(10),
27 changes: 25 additions & 2 deletions request/pending.go
@@ -32,6 +32,7 @@ type PendingStore struct {
 	lastTick atomic.Value
 	lastEpochChange time.Time
 	lastProcessedGC time.Time
+	lastSecondStrikeTime atomic.Value
 	reqID2Bucket *sync.Map
 	currentBucket atomic.Value
 	buckets []*bucket
@@ -47,6 +48,7 @@ func (ps *PendingStore) Init() {
 	ps.reqID2Bucket = new(sync.Map)
 	ps.currentBucket.Store(newBucket(ps.reqID2Bucket, 1))
 	ps.lastTick.Store(ps.StartTime)
+	ps.lastSecondStrikeTime.Store(time.Time{})
 	ps.closeChan = make(chan struct{})
 	ps.closeOnce = sync.Once{}
 	ps.resetChan = make(chan struct{})
@@ -220,18 +222,31 @@ func (ps *PendingStore) checkSecondStrike(now time.Time) bool {
 			continue
 		}

-		if now.Sub(bucket.getFirstStrikeTimestamp()) <= ps.SecondStrikeThreshold {
+		secondStrike := ps.calcSecondStrike()
+
+		if now.Sub(bucket.getFirstStrikeTimestamp()) <= secondStrike {
 			continue
 		}

-		bucket.resetTimestamp(ps.now())
+		now := ps.now()
+		bucket.resetTimestamp(now)
 		detectedCensorship = true
+		ps.lastSecondStrikeTime.Store(now)
+		ps.Logger.Infof("Second strike occurred for bucket id %d of size %d (threshold is currently %s)", bucket.id, bucket.getSize(), secondStrike.String())
 		break
 	}

 	return detectedCensorship
 }

+func (ps *PendingStore) calcSecondStrike() time.Duration {
+	now := ps.now()
+	if now.Sub(ps.StartTime) <= 5*ps.SecondStrikeThreshold {
+		return 3 * ps.SecondStrikeThreshold
+	}
+	return ps.SecondStrikeThreshold
+}
+
 func (ps *PendingStore) rotateBuckets(now time.Time) {
 	currentBucket := ps.currentBucket.Load().(*bucket)

@@ -314,6 +329,10 @@ func (ps *PendingStore) Submit(request []byte) error {

 	reqID := ps.Inspector.RequestID(request)

+	if ps.now().Sub(ps.lastSecondStrike()) <= 3*ps.SecondStrikeThreshold {
+		return errors.Errorf("there was a second strike not long ago")
+	}
+
 	// Insertion may fail if we have a concurrent sealing of the bucket.
 	// In such a case, wait for a new un-sealed bucket to replace the current bucket.
 	for {
@@ -329,6 +348,10 @@ func (ps *PendingStore) now() time.Time {
 	return ps.lastTick.Load().(time.Time)
 }

+func (ps *PendingStore) lastSecondStrike() time.Time {
+	return ps.lastSecondStrikeTime.Load().(time.Time)
+}
+
 // GetAllRequests returns all stored requests in the same order of their arrival, the oldest one will be the first
 func (ps *PendingStore) GetAllRequests(max uint64) [][]byte {
 	if !ps.isStopped() {
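Taken together, the PendingStore changes add two timing rules: calcSecondStrike triples the effective second-strike threshold while the store is within 5x SecondStrikeThreshold of its StartTime, and Submit rejects new requests while less than 3x SecondStrikeThreshold has passed since the last recorded second strike. The sketch below isolates just that timing logic; the type and names are simplified stand-ins rather than the PendingStore API, and the real code measures time via lastTick instead of the wall clock.

```go
package main

import (
	"fmt"
	"time"
)

// backoff captures, in isolation, the two timing rules added in this PR.
type backoff struct {
	startTime             time.Time
	secondStrikeThreshold time.Duration
	lastSecondStrike      time.Time
}

// effectiveSecondStrike mirrors calcSecondStrike: triple the threshold while
// the store is still "young" (within 5x the threshold of its start time).
func (b *backoff) effectiveSecondStrike(now time.Time) time.Duration {
	if now.Sub(b.startTime) <= 5*b.secondStrikeThreshold {
		return 3 * b.secondStrikeThreshold
	}
	return b.secondStrikeThreshold
}

// acceptSubmit mirrors the new Submit guard: refuse requests while we are
// within 3x the threshold of the last recorded second strike.
func (b *backoff) acceptSubmit(now time.Time) bool {
	return now.Sub(b.lastSecondStrike) > 3*b.secondStrikeThreshold
}

func main() {
	start := time.Now()
	b := &backoff{startTime: start, secondStrikeThreshold: 5 * time.Second}

	fmt.Println(b.effectiveSecondStrike(start.Add(10 * time.Second))) // 15s, store is still young
	fmt.Println(b.effectiveSecondStrike(start.Add(30 * time.Second))) // 5s, steady state

	b.lastSecondStrike = start.Add(30 * time.Second)
	fmt.Println(b.acceptSubmit(start.Add(40 * time.Second))) // false, inside the 15s back-off
	fmt.Println(b.acceptSubmit(start.Add(50 * time.Second))) // true, back-off has expired
}
```

With the thresholds now set in utils_test.go (SecondStrikeThreshold of 5s), this gives a 15s effective second-strike threshold for roughly the first 25s of a store's life and a 15s submission back-off after any second strike.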
15 changes: 12 additions & 3 deletions test/batcher_consenter_test.go
@@ -90,7 +90,10 @@ func TestBatcherFailuresAndRecoveryWithTwoShards(t *testing.T) {
 	}, 30*time.Second, 100*time.Millisecond)

 	// Submit a request to primary of shard 1
-	batchers1[1].Submit(context.Background(), tx.CreateStructuredRequest([]byte{3}))
+	require.Eventually(t, func() bool {
+		resp, err := batchers1[1].Submit(context.Background(), tx.CreateStructuredRequest([]byte{3}))
+		return err == nil && resp.Error == ""
+	}, 30*time.Second, 100*time.Millisecond)

 	// Verify the batchers created a batch in shard 1
 	require.Eventually(t, func() bool {
@@ -110,8 +113,14 @@
 	}

 	// Submit another request only to a secondary in shard 0 and shard 1
-	batchers0[2].Submit(context.Background(), tx.CreateStructuredRequest([]byte{9}))
-	batchers1[2].Submit(context.Background(), tx.CreateStructuredRequest([]byte{8}))
+	require.Eventually(t, func() bool {
+		resp, err := batchers0[2].Submit(context.Background(), tx.CreateStructuredRequest([]byte{9}))
+		return err == nil && resp.Error == ""
+	}, 30*time.Second, 100*time.Millisecond)
+	require.Eventually(t, func() bool {
+		resp, err := batchers1[2].Submit(context.Background(), tx.CreateStructuredRequest([]byte{8}))
+		return err == nil && resp.Error == ""
+	}, 30*time.Second, 100*time.Millisecond)

 	// Verify the batchers created batches in shard 0 and shard 1
 	require.Eventually(t, func() bool {
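The same retry-until-accepted wrapper around Submit now appears in three tests, presumably because a submission can be transiently rejected, for example during the post-second-strike back-off added in request/pending.go or while the target batcher is still settling into its role. If the pattern keeps spreading it might be worth a small shared helper; the sketch below assumes a simplified Submit signature and response shape, and submitEventually and the submitter interface do not exist in the codebase.

```go
package batchertest

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// submitter is the minimal surface the helper needs; the batcher stubs used
// in these tests are assumed to satisfy something like it.
type submitter interface {
	Submit(ctx context.Context, req []byte) (*response, error)
}

// response mirrors the shape the tests rely on: an empty Error string means
// the request was accepted.
type response struct {
	Error string
}

// submitEventually retries a submission until both the transport call and the
// batcher accept it, the same pattern the updated tests use inline.
func submitEventually(t *testing.T, s submitter, req []byte) {
	t.Helper()
	require.Eventually(t, func() bool {
		resp, err := s.Submit(context.Background(), req)
		return err == nil && resp.Error == ""
	}, 30*time.Second, 100*time.Millisecond)
}
```

The wait and tick bounds are only illustrative; they mirror the ones the updated tests already use.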