Skip to content

Commit 4c92ff7

Browse files
committed
address review comments
Signed-off-by: Hagar Meir <hagar.meir@ibm.com>
1 parent 803007a commit 4c92ff7

File tree

3 files changed

+24
-2
lines changed

3 files changed

+24
-2
lines changed

node/batcher/batcher_builder.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ func createMemPool(b *Batcher, config *node_config.BatcherNodeConfig) MemPool {
118118
BatchMaxSize: config.BatchMaxSize,
119119
BatchMaxSizeBytes: config.BatchMaxBytes,
120120
RequestMaxBytes: config.RequestMaxBytes,
121+
BatchTimeout: config.BatchCreationTimeout,
121122
SubmitTimeout: config.SubmitTimeout,
122123
FirstStrikeThreshold: config.FirstStrikeThreshold,
123124
SecondStrikeThreshold: config.SecondStrikeThreshold,

request/pool.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type PoolOptions struct {
6161
BatchMaxSize uint32
6262
BatchMaxSizeBytes uint32
6363
RequestMaxBytes uint64
64+
BatchTimeout time.Duration
6465
SubmitTimeout time.Duration
6566
FirstStrikeThreshold time.Duration
6667
SecondStrikeThreshold time.Duration
@@ -89,6 +90,17 @@ func (rp *Pool) start() {
8990
rp.pending.Start()
9091
}
9192

93+
func (rp *Pool) addRandomnessToFirstStrike() time.Duration {
94+
if rp.options.BatchTimeout.Milliseconds() == 0 {
95+
return time.Duration(0) // no randomness
96+
}
97+
return time.Duration(randRange(int(2*rp.options.BatchTimeout.Milliseconds()), int(-2*rp.options.BatchTimeout.Milliseconds()))) * time.Millisecond
98+
}
99+
100+
func randRange(max, min int) int {
101+
return rand.Intn(max-min) + min
102+
}
103+
92104
func (rp *Pool) createPendingStore() *PendingStore {
93105
return &PendingStore{
94106
Inspector: rp.inspector,
@@ -98,11 +110,11 @@ func (rp *Pool) createPendingStore() *PendingStore {
98110
StartTime: time.Now(),
99111
Logger: rp.logger,
100112
SecondStrikeThreshold: rp.options.SecondStrikeThreshold,
101-
FirstStrikeThreshold: rp.options.FirstStrikeThreshold,
113+
FirstStrikeThreshold: rp.options.FirstStrikeThreshold + rp.addRandomnessToFirstStrike(),
102114
OnDelete: func(key string) {
103115
rp.semaphore.Release(1)
104116
},
105-
Epoch: time.Duration(rand.Intn(50))*time.Millisecond + rp.options.FirstStrikeThreshold/10,
117+
Epoch: rp.options.FirstStrikeThreshold / 10,
106118
FirstStrikeCallback: rp.striker.OnFirstStrikeTimeout,
107119
SecondStrikeCallback: rp.striker.OnSecondStrikeTimeout,
108120
}

request/pool_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ func TestRestartPool(t *testing.T) {
146146
SubmitTimeout: time.Second * 10,
147147
}, &striker{})
148148

149+
assert.Equal(t, 5*time.Second, pool.pending.FirstStrikeThreshold)
150+
149151
pool.Restart(true)
150152

151153
count := 100
@@ -214,6 +216,8 @@ func TestBasicBatching(t *testing.T) {
214216
SubmitTimeout: time.Second * 10,
215217
}, &striker{})
216218

219+
assert.Equal(t, 5*time.Second, pool.pending.FirstStrikeThreshold)
220+
217221
pool.Restart(true)
218222

219223
ctx, cancel1 := context.WithTimeout(context.Background(), time.Second)
@@ -253,8 +257,13 @@ func TestBasicBatching(t *testing.T) {
253257
RequestMaxBytes: 100 * 1024,
254258
AutoRemoveTimeout: time.Second * 10,
255259
SubmitTimeout: time.Second * 10,
260+
BatchTimeout: time.Second,
256261
}, &striker{})
257262

263+
t.Logf("First strike with random is %f seconds\n", pool.pending.FirstStrikeThreshold.Seconds())
264+
assert.GreaterOrEqual(t, 7*time.Second, pool.pending.FirstStrikeThreshold)
265+
assert.LessOrEqual(t, 3*time.Second, pool.pending.FirstStrikeThreshold)
266+
258267
pool.Restart(true)
259268

260269
assert.NoError(t, pool.Submit(byteReq4))

0 commit comments

Comments
 (0)