From 89285673ed2e8c38e56a76be8cb3e27d8277cb0e Mon Sep 17 00:00:00 2001 From: Hagar Meir Date: Mon, 15 Sep 2025 21:20:35 +0300 Subject: [PATCH 1/4] configure mem pool buckets rotation Signed-off-by: Hagar Meir --- request/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/request/pool.go b/request/pool.go index 7a64529b..8942afb7 100644 --- a/request/pool.go +++ b/request/pool.go @@ -95,7 +95,7 @@ func (rp *Pool) createPendingStore() *PendingStore { Inspector: rp.inspector, ReqIDGCInterval: rp.options.AutoRemoveTimeout / 4, ReqIDLifetime: rp.options.AutoRemoveTimeout, - Time: time.NewTicker(time.Second).C, + Time: time.NewTicker(rp.options.FirstStrikeThreshold / 10).C, StartTime: time.Now(), Logger: rp.logger, SecondStrikeThreshold: rp.options.SecondStrikeThreshold, @@ -104,7 +104,7 @@ func (rp *Pool) createPendingStore() *PendingStore { rp.semaphore.Release(1) atomic.AddInt64(&rp.size, -1) }, - Epoch: time.Second, + Epoch: rp.options.FirstStrikeThreshold / 10, FirstStrikeCallback: rp.striker.OnFirstStrikeTimeout, SecondStrikeCallback: rp.striker.OnSecondStrikeTimeout, } From 75d36b521b9e81f2bd629072c4c1cbc361592c9f Mon Sep 17 00:00:00 2001 From: Hagar Meir Date: Wed, 17 Sep 2025 11:23:08 +0300 Subject: [PATCH 2/4] add randomness Signed-off-by: Hagar Meir --- request/pool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/request/pool.go b/request/pool.go index 8942afb7..1af4555f 100644 --- a/request/pool.go +++ b/request/pool.go @@ -9,6 +9,7 @@ package request import ( "context" "fmt" + "math/rand" "sync" "sync/atomic" "time" @@ -104,7 +105,7 @@ func (rp *Pool) createPendingStore() *PendingStore { rp.semaphore.Release(1) atomic.AddInt64(&rp.size, -1) }, - Epoch: rp.options.FirstStrikeThreshold / 10, + Epoch: time.Duration(rand.Intn(50))*time.Millisecond + rp.options.FirstStrikeThreshold/10, FirstStrikeCallback: rp.striker.OnFirstStrikeTimeout, SecondStrikeCallback: rp.striker.OnSecondStrikeTimeout, } From 0d5a150b6fe796a7e7eb5c214b87d3a571c817a1 Mon Sep 17 00:00:00 2001 From: Hagar Meir Date: Sun, 21 Sep 2025 11:44:03 +0300 Subject: [PATCH 3/4] address review comments Signed-off-by: Hagar Meir --- node/batcher/batcher_builder.go | 1 + request/pool.go | 16 ++++++++++++++-- request/pool_test.go | 9 +++++++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/node/batcher/batcher_builder.go b/node/batcher/batcher_builder.go index f26438d9..eb2851da 100644 --- a/node/batcher/batcher_builder.go +++ b/node/batcher/batcher_builder.go @@ -127,6 +127,7 @@ func createMemPool(b *Batcher, config *node_config.BatcherNodeConfig) MemPool { BatchMaxSize: config.BatchMaxSize, BatchMaxSizeBytes: config.BatchMaxBytes, RequestMaxBytes: config.RequestMaxBytes, + BatchTimeout: config.BatchCreationTimeout, SubmitTimeout: config.SubmitTimeout, FirstStrikeThreshold: config.FirstStrikeThreshold, SecondStrikeThreshold: config.SecondStrikeThreshold, diff --git a/request/pool.go b/request/pool.go index 1af4555f..34febe6c 100644 --- a/request/pool.go +++ b/request/pool.go @@ -62,6 +62,7 @@ type PoolOptions struct { BatchMaxSize uint32 BatchMaxSizeBytes uint32 RequestMaxBytes uint64 + BatchTimeout time.Duration SubmitTimeout time.Duration FirstStrikeThreshold time.Duration SecondStrikeThreshold time.Duration @@ -91,6 +92,17 @@ func (rp *Pool) start() { rp.pending.Start() } +func (rp *Pool) addRandomnessToFirstStrike() time.Duration { + if rp.options.BatchTimeout.Milliseconds() == 0 { + return time.Duration(0) // no randomness + } + return time.Duration(randRange(int(2*rp.options.BatchTimeout.Milliseconds()), int(-2*rp.options.BatchTimeout.Milliseconds()))) * time.Millisecond +} + +func randRange(max, min int) int { + return rand.Intn(max-min) + min +} + func (rp *Pool) createPendingStore() *PendingStore { return &PendingStore{ Inspector: rp.inspector, @@ -100,12 +112,12 @@ func (rp *Pool) createPendingStore() *PendingStore { StartTime: time.Now(), Logger: rp.logger, SecondStrikeThreshold: rp.options.SecondStrikeThreshold, - FirstStrikeThreshold: rp.options.FirstStrikeThreshold, + FirstStrikeThreshold: rp.options.FirstStrikeThreshold + rp.addRandomnessToFirstStrike(), OnDelete: func(key string) { rp.semaphore.Release(1) atomic.AddInt64(&rp.size, -1) }, - Epoch: time.Duration(rand.Intn(50))*time.Millisecond + rp.options.FirstStrikeThreshold/10, + Epoch: rp.options.FirstStrikeThreshold / 10, FirstStrikeCallback: rp.striker.OnFirstStrikeTimeout, SecondStrikeCallback: rp.striker.OnSecondStrikeTimeout, } diff --git a/request/pool_test.go b/request/pool_test.go index 644508b2..b095ca06 100644 --- a/request/pool_test.go +++ b/request/pool_test.go @@ -146,6 +146,8 @@ func TestRestartPool(t *testing.T) { SubmitTimeout: time.Second * 10, }, &striker{}) + assert.Equal(t, 5*time.Second, pool.pending.FirstStrikeThreshold) + pool.Restart(true) count := 100 @@ -214,6 +216,8 @@ func TestBasicBatching(t *testing.T) { SubmitTimeout: time.Second * 10, }, &striker{}) + assert.Equal(t, 5*time.Second, pool.pending.FirstStrikeThreshold) + pool.Restart(true) ctx, cancel1 := context.WithTimeout(context.Background(), time.Second) @@ -253,8 +257,13 @@ func TestBasicBatching(t *testing.T) { RequestMaxBytes: 100 * 1024, AutoRemoveTimeout: time.Second * 10, SubmitTimeout: time.Second * 10, + BatchTimeout: time.Second, }, &striker{}) + t.Logf("First strike with random is %f seconds\n", pool.pending.FirstStrikeThreshold.Seconds()) + assert.GreaterOrEqual(t, 7*time.Second, pool.pending.FirstStrikeThreshold) + assert.LessOrEqual(t, 3*time.Second, pool.pending.FirstStrikeThreshold) + pool.Restart(true) assert.NoError(t, pool.Submit(byteReq4)) From f9502cf35ea6da184a1d437e734341736146b9a3 Mon Sep 17 00:00:00 2001 From: Hagar Meir Date: Wed, 8 Oct 2025 11:26:48 +0300 Subject: [PATCH 4/4] add sanity check Signed-off-by: Hagar Meir --- request/pool.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/request/pool.go b/request/pool.go index 34febe6c..30493837 100644 --- a/request/pool.go +++ b/request/pool.go @@ -71,6 +71,11 @@ type PoolOptions struct { // NewPool constructs a new requests pool func NewPool(logger types.Logger, inspector RequestInspector, options PoolOptions, striker Striker) *Pool { + if options.FirstStrikeThreshold < 2*options.BatchTimeout { + logger.Warnf("FirstStrikeThreshold should be at least 2*BatchTimeout") + return nil + } + rp := &Pool{ logger: logger, inspector: inspector,