diff --git a/pkg/repl/client.go b/pkg/repl/client.go index 0648bf7..312f060 100644 --- a/pkg/repl/client.go +++ b/pkg/repl/client.go @@ -33,7 +33,8 @@ const ( // multiple-flush-threads, which should help it group commit and still be fast. // This is only used as an initial starting value. It will auto-scale based on the DefaultTargetBatchTime. DefaultBatchSize = 1000 - + // minBatchSize is the minimum batch size that we will allow the targetBatchSize to be. + minBatchSize = 5 // DefaultTargetBatchTime is the target time for flushing REPLACE/DELETE statements. DefaultTargetBatchTime = time.Millisecond * 500 @@ -623,6 +624,9 @@ func (c *Client) feedback(numberOfKeys int, d time.Duration) { if len(c.timingHistory) >= 10 { timePerKey := table.LazyFindP90(c.timingHistory) newBatchSize := int64(float64(c.targetBatchTime) / float64(timePerKey)) + if newBatchSize < minBatchSize { + newBatchSize = minBatchSize + } atomic.StoreInt64(&c.targetBatchSize, newBatchSize) c.timingHistory = nil // reset } diff --git a/pkg/repl/client_test.go b/pkg/repl/client_test.go index b62692c..4fcfc34 100644 --- a/pkg/repl/client_test.go +++ b/pkg/repl/client_test.go @@ -358,6 +358,12 @@ func TestFeedback(t *testing.T) { client.feedback(1000, time.Second) } assert.Equal(t, int64(500), client.targetBatchSize) // less keys. + + // Test with a way slower chunk. + for range 10 { + client.feedback(500, time.Second*100) + } + assert.Equal(t, int64(5), client.targetBatchSize) // equals the minimum. } // TestBlockWait tests that the BlockWait function will: