Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node/batcher/batcher_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func createMemPool(b *Batcher, config *node_config.BatcherNodeConfig) MemPool {
BatchMaxSize: config.BatchMaxSize,
BatchMaxSizeBytes: config.BatchMaxBytes,
RequestMaxBytes: config.RequestMaxBytes,
BatchTimeout: config.BatchCreationTimeout,
SubmitTimeout: config.SubmitTimeout,
FirstStrikeThreshold: config.FirstStrikeThreshold,
SecondStrikeThreshold: config.SecondStrikeThreshold,
Expand Down
24 changes: 21 additions & 3 deletions request/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package request
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -61,6 +62,7 @@ type PoolOptions struct {
BatchMaxSize uint32
BatchMaxSizeBytes uint32
RequestMaxBytes uint64
BatchTimeout time.Duration
SubmitTimeout time.Duration
FirstStrikeThreshold time.Duration
SecondStrikeThreshold time.Duration
Expand All @@ -69,6 +71,11 @@ type PoolOptions struct {

// NewPool constructs a new requests pool
func NewPool(logger types.Logger, inspector RequestInspector, options PoolOptions, striker Striker) *Pool {
if options.FirstStrikeThreshold < 2*options.BatchTimeout {
logger.Warnf("FirstStrikeThreshold should be at least 2*BatchTimeout")
return nil
}

rp := &Pool{
logger: logger,
inspector: inspector,
Expand All @@ -90,21 +97,32 @@ func (rp *Pool) start() {
rp.pending.Start()
}

func (rp *Pool) addRandomnessToFirstStrike() time.Duration {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets make sure that in the worse case the first strike is still > BatchTimeout. IE compare the BatchTimeout with the FirstStrikeThreshold. It seems to me the FirstStrikeThreshold should be at least 5*BatchTimeout or so...

Even without randomization, if FirstStrikeThreshold < BatchTimeout + network-delay we'll get constant forwarding. So some sanity checks are required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add checks
Currently the batch timeout is 500ms and we are running tests with first strike of 2s

if rp.options.BatchTimeout.Milliseconds() == 0 {
return time.Duration(0) // no randomness
}
return time.Duration(randRange(int(2*rp.options.BatchTimeout.Milliseconds()), int(-2*rp.options.BatchTimeout.Milliseconds()))) * time.Millisecond
}

func randRange(max, min int) int {
return rand.Intn(max-min) + min
}

func (rp *Pool) createPendingStore() *PendingStore {
return &PendingStore{
Inspector: rp.inspector,
ReqIDGCInterval: rp.options.AutoRemoveTimeout / 4,
ReqIDLifetime: rp.options.AutoRemoveTimeout,
Time: time.NewTicker(time.Second).C,
Time: time.NewTicker(rp.options.FirstStrikeThreshold / 10).C,
StartTime: time.Now(),
Logger: rp.logger,
SecondStrikeThreshold: rp.options.SecondStrikeThreshold,
FirstStrikeThreshold: rp.options.FirstStrikeThreshold,
FirstStrikeThreshold: rp.options.FirstStrikeThreshold + rp.addRandomnessToFirstStrike(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is adding randomness only once, when the pool is created. Wouldn't it make more sense to do it per bucket, or would that be too much? what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be too much

OnDelete: func(key string) {
rp.semaphore.Release(1)
atomic.AddInt64(&rp.size, -1)
},
Epoch: time.Second,
Epoch: rp.options.FirstStrikeThreshold / 10,
FirstStrikeCallback: rp.striker.OnFirstStrikeTimeout,
SecondStrikeCallback: rp.striker.OnSecondStrikeTimeout,
}
Expand Down
9 changes: 9 additions & 0 deletions request/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func TestRestartPool(t *testing.T) {
SubmitTimeout: time.Second * 10,
}, &striker{})

assert.Equal(t, 5*time.Second, pool.pending.FirstStrikeThreshold)

pool.Restart(true)

count := 100
Expand Down Expand Up @@ -214,6 +216,8 @@ func TestBasicBatching(t *testing.T) {
SubmitTimeout: time.Second * 10,
}, &striker{})

assert.Equal(t, 5*time.Second, pool.pending.FirstStrikeThreshold)

pool.Restart(true)

ctx, cancel1 := context.WithTimeout(context.Background(), time.Second)
Expand Down Expand Up @@ -253,8 +257,13 @@ func TestBasicBatching(t *testing.T) {
RequestMaxBytes: 100 * 1024,
AutoRemoveTimeout: time.Second * 10,
SubmitTimeout: time.Second * 10,
BatchTimeout: time.Second,
}, &striker{})

t.Logf("First strike with random is %f seconds\n", pool.pending.FirstStrikeThreshold.Seconds())
assert.GreaterOrEqual(t, 7*time.Second, pool.pending.FirstStrikeThreshold)
assert.LessOrEqual(t, 3*time.Second, pool.pending.FirstStrikeThreshold)

pool.Restart(true)

assert.NoError(t, pool.Submit(byteReq4))
Expand Down