Skip to content

Commit

Permalink
kafka replay speed: don't trim fetchWants (#9919)
Browse files Browse the repository at this point in the history
* kafka replay speed: don't trim fetchWants

I realized that trimming `fetchWant`s can end up discarding offsets in extreme circumstances.

### How it works

If the fetchWant is so big that its size would exceed 2GiB, then we trim it. We trim it by reducing the end offset. The idea is that the next fetchWant will pick up from where this one left off.

### How it can break

We trim the `fetchWant` in `UpdateBytesPerRecord` too. `UpdateBytesPerRecord` can be invoked in `concurrentFEtchers.run` after the `fetchWant` is dispatched. In that case the next `fetchWant` would have already been calculated. And we would end up with a gap.

### Did it break?

It's hard to tell, but it's very unlikely. To reach 2GiB we would have needed to have the estimation for bytes per record be 2 MiB. While these large records are possible, they should be rare and our rolling average estimation for records size shouldn't reach it.

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add tests for MaxBytes and UpdateBytesPerRecord

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Change expectedBytes to be int64 to be able to run on 64-bit systems

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov authored Nov 17, 2024
1 parent 966f046 commit c3b4ada
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 17 deletions.
22 changes: 5 additions & 17 deletions pkg/storage/ingest/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ func fetchWantFrom(offset int64, targetMaxBytes, estimatedBytesPerRecord int) fe
func (w fetchWant) Next() fetchWant {
n := fetchWantFrom(w.endOffset, w.targetMaxBytes, w.estimatedBytesPerRecord)
n.estimatedBytesPerRecord = w.estimatedBytesPerRecord
return n.trimIfTooBig()
return n
}

// MaxBytes returns the maximum number of bytes we can fetch in a single request.
// It's capped at math.MaxInt32 to avoid overflow, and it'll always fetch a minimum of 1MB.
func (w fetchWant) MaxBytes() int32 {
fetchBytes := w.expectedBytes()
if fetchBytes > math.MaxInt32 {
if fetchBytes > math.MaxInt32 || fetchBytes < 0 {
// This shouldn't happen because w should have been trimmed before sending the request.
// But we definitely don't want to request negative bytes by casting to int32, so add this safeguard.
return math.MaxInt32
Expand All @@ -114,28 +114,16 @@ func (w fetchWant) UpdateBytesPerRecord(lastFetchBytes int, lastFetchNumberOfRec
actualBytesPerRecord := float64(lastFetchBytes) / float64(lastFetchNumberOfRecords)
w.estimatedBytesPerRecord = int(currentEstimateWeight*float64(w.estimatedBytesPerRecord) + (1-currentEstimateWeight)*actualBytesPerRecord)

return w.trimIfTooBig()
return w
}

// expectedBytes returns how many bytes we'd need to accommodate the range of offsets using estimatedBytesPerRecord.
// They may be more than the kafka protocol supports (> MaxInt32). Use MaxBytes.
func (w fetchWant) expectedBytes() int {
func (w fetchWant) expectedBytes() int64 {
// We over-fetch bytes to reduce the likelihood of under-fetching and having to run another request.
// Based on some testing 65% of under-estimations are by less than 5%. So we account for that.
const overFetchBytesFactor = 1.05
return int(overFetchBytesFactor * float64(w.estimatedBytesPerRecord*int(w.endOffset-w.startOffset)))
}

// trimIfTooBig adjusts the end offset if we expect to fetch too many bytes.
// It's capped at math.MaxInt32 bytes.
func (w fetchWant) trimIfTooBig() fetchWant {
if w.expectedBytes() <= math.MaxInt32 {
return w
}
// We are overflowing, so we need to trim the end offset.
// We do this by calculating how many records we can fetch with the max bytes, and then setting the end offset to that.
w.endOffset = w.startOffset + int64(math.MaxInt32/w.estimatedBytesPerRecord)
return w
return int64(overFetchBytesFactor * float64(int64(w.estimatedBytesPerRecord)*(w.endOffset-w.startOffset)))
}

type fetchResult struct {
Expand Down
101 changes: 101 additions & 0 deletions pkg/storage/ingest/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1220,3 +1221,103 @@ func (w waiterFunc) Wait() { w() }
type refresherFunc func()

func (r refresherFunc) ForceMetadataRefresh() { r() }

func TestFetchWant_MaxBytes(t *testing.T) {
testCases := map[string]struct {
fw fetchWant
expected int32
}{
"small fetch": {
fw: fetchWant{
startOffset: 0,
endOffset: 10,
estimatedBytesPerRecord: 100,
},
expected: 1_000_000, // minimum fetch size
},
"medium fetch": {
fw: fetchWant{
startOffset: 0,
endOffset: 1000,
estimatedBytesPerRecord: 1000,
},
expected: 1_050_000, // 1000 * 1000 * 1.05
},
"huge fetch with huge bytes per record; overflow risk": {
fw: fetchWant{
startOffset: 0,
endOffset: 2 << 31,
estimatedBytesPerRecord: 2 << 30,
},
expected: math.MaxInt32,
},
"negative product due to overflow": {
fw: fetchWant{
startOffset: 0,
endOffset: math.MaxInt64,
estimatedBytesPerRecord: math.MaxInt32,
},
expected: math.MaxInt32,
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
result := tc.fw.MaxBytes()
assert.Equal(t, tc.expected, result)
assert.GreaterOrEqual(t, result, int32(0), "MaxBytes should never return negative values")
})
}
}

func TestFetchWant_UpdateBytesPerRecord(t *testing.T) {
baseWant := fetchWant{
startOffset: 100,
endOffset: 200,
estimatedBytesPerRecord: 1000,
}

testCases := map[string]struct {
lastFetchBytes int
lastFetchRecords int
expectedBytesPerRecord int
}{
"similar to estimate": {
lastFetchBytes: 10000,
lastFetchRecords: 10,
expectedBytesPerRecord: 1000,
},
"much larger than estimate": {
lastFetchBytes: 100000,
lastFetchRecords: 10,
expectedBytesPerRecord: 2800,
},
"much smaller than estimate": {
lastFetchBytes: 1000,
lastFetchRecords: 10,
expectedBytesPerRecord: 820,
},
"risk of overflow": {
lastFetchBytes: math.MaxInt64,
lastFetchRecords: 1,
expectedBytesPerRecord: math.MaxInt64/5 + int(float64(baseWant.estimatedBytesPerRecord)*0.8),
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
result := baseWant.UpdateBytesPerRecord(tc.lastFetchBytes, tc.lastFetchRecords)

assert.Equal(t, baseWant.startOffset, result.startOffset, "startOffset should not change")
assert.Equal(t, baseWant.endOffset, result.endOffset, "endOffset should not change")

// Check the new bytes per record estimation. Because of large numbers and floats we allow for 0.1% error.
assert.InEpsilon(t, tc.expectedBytesPerRecord, result.estimatedBytesPerRecord, 0.001)

// Verify MaxBytes() doesn't overflow or return negative values
maxBytes := result.MaxBytes()
assert.GreaterOrEqual(t, maxBytes, int32(0), "MaxBytes should never return negative values")
assert.LessOrEqual(t, maxBytes, int32(math.MaxInt32), "MaxBytes should never exceed MaxInt32")
})
}
}

0 comments on commit c3b4ada

Please sign in to comment.