Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r174] Ingester memory improvements by adjusting prealloc #4357

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
* [ENHANCEMENT] Collection of query-frontend changes to reduce allocs. [#4242](https://github.com/grafana/tempo/pull/4242) (@joe-elliott)
* [ENHANCEMENT] Added `insecure-skip-verify` option in tempo-cli to skip SSL certificate validation when connecting to the S3 backend. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
* [ENHANCEMENT] Add `invalid_utf8` to reasons spanmetrics will discard spans. [#4293](https://github.com/grafana/tempo/pull/4293) (@zalegrala)
* [ENHANCEMENT] Reduce frontend and querier allocations by dropping HTTP headers early in the pipeline. [#4298](https://github.com/grafana/tempo/pull/4298) (@joe-elliott)
* [ENHANCEMENT] Reduce ingester working set by improving prealloc behavior. [#4344](https://github.com/grafana/tempo/pull/4344) (@joe-elliott)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
Expand Down
7 changes: 3 additions & 4 deletions modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,14 +430,13 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string
localCtx = user.InjectOrgID(localCtx, userID)

req := tempopb.PushBytesRequest{
Traces: make([]tempopb.PreallocBytes, len(indexes)),
Ids: make([]tempopb.PreallocBytes, len(indexes)),
SearchData: nil, // support for flatbuffer/v2 search has been removed. todo: cleanup the proto
Traces: make([]tempopb.PreallocBytes, len(indexes)),
Ids: make([][]byte, len(indexes)),
}

for i, j := range indexes {
req.Traces[i].Slice = marshalledTraces[j][0:]
req.Ids[i].Slice = traces[j].id
req.Ids[i] = traces[j].id
}

c, err := d.pool.GetClientFor(ingester.Addr)
Expand Down
6 changes: 2 additions & 4 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,8 @@ func pushBatchV1(t testing.TB, i *Ingester, batch *v1.ResourceSpans, id []byte)
Slice: buffer,
},
},
Ids: []tempopb.PreallocBytes{
{
Slice: id,
},
Ids: [][]byte{
id,
},
})
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (i *instance) PushBytesRequest(ctx context.Context, req *tempopb.PushBytesR
pr := &tempopb.PushResponse{}

for j := range req.Traces {
err := i.PushBytes(ctx, req.Ids[j].Slice, req.Traces[j].Slice)
err := i.PushBytes(ctx, req.Ids[j], req.Traces[j].Slice)
pr.ErrorsByTrace = i.addTraceError(pr.ErrorsByTrace, err, len(req.Traces), j)
}

Expand Down
14 changes: 6 additions & 8 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func BenchmarkInstancePush(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Rotate trace ID
binary.LittleEndian.PutUint32(request.Ids[0].Slice, uint32(i))
binary.LittleEndian.PutUint32(request.Ids[0], uint32(i))
response := instance.PushBytesRequest(context.Background(), request)
errored, _, _ := CheckPushBytesError(response)
require.False(b, errored, "push failed: %w", response.ErrorsByTrace)
Expand Down Expand Up @@ -825,9 +825,9 @@ func makePushBytesRequest(traceID []byte, batch *v1_trace.ResourceSpans) *tempop
}

return &tempopb.PushBytesRequest{
Ids: []tempopb.PreallocBytes{{
Slice: traceID,
}},
Ids: [][]byte{
traceID,
},
Traces: []tempopb.PreallocBytes{{
Slice: buffer,
}},
Expand Down Expand Up @@ -965,7 +965,7 @@ func makePushBytesRequestMultiTraces(traceIDs [][]byte, maxBytes []int) *tempopb
}
traces := makeTraces(batches)

byteIDs := make([]tempopb.PreallocBytes, 0, len(traceIDs))
byteIDs := make([][]byte, 0, len(traceIDs))
byteTraces := make([]tempopb.PreallocBytes, 0, len(traceIDs))

for index, id := range traceIDs {
Expand All @@ -974,9 +974,7 @@ func makePushBytesRequestMultiTraces(traceIDs [][]byte, maxBytes []int) *tempopb
panic(err)
}

byteIDs = append(byteIDs, tempopb.PreallocBytes{
Slice: id,
})
byteIDs = append(byteIDs, id)
byteTraces = append(byteTraces, tempopb.PreallocBytes{
Slice: buffer,
})
Expand Down
83 changes: 49 additions & 34 deletions pkg/tempopb/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,42 @@ package pool

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Pool is a bucketed pool for variably sized byte slices.
var metricAllocOutPool = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempo",
Name: "ingester_prealloc_miss_bytes_total",
Help: "The total number of alloc'ed bytes that missed the sync pools.",
})

// Pool is a linearly bucketed pool for variably sized byte slices.
type Pool struct {
buckets []sync.Pool
sizes []int
bktSize int
// make is the function used to create an empty slice when none exist yet.
make func(int) []byte
}

// New returns a new Pool with size buckets for minSize to maxSize
// increasing by the given factor.
func New(minSize, maxSize int, factor float64, makeFunc func(int) []byte) *Pool {
if minSize < 1 {
panic("invalid minimum pool size")
}
func New(maxSize, bktSize int, makeFunc func(int) []byte) *Pool {
if maxSize < 1 {
panic("invalid maximum pool size")
}
if factor < 1 {
if bktSize < 1 {
panic("invalid factor")
}

var sizes []int

for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
if maxSize%bktSize != 0 {
panic("invalid bucket size")
}

bkts := maxSize / bktSize

p := &Pool{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
buckets: make([]sync.Pool, bkts),
bktSize: bktSize,
make: makeFunc,
}

Expand All @@ -46,29 +50,40 @@ func New(minSize, maxSize int, factor float64, makeFunc func(int) []byte) *Pool

// Get returns a new byte slices that fits the given size.
func (p *Pool) Get(sz int) []byte {
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
b := p.buckets[i].Get()
if b == nil {
b = p.make(bktSize)
}
return b.([]byte)
if sz < 0 {
sz = 0 // just panic?
}
return p.make(sz)

// Find the right bucket.
bkt := sz / p.bktSize

if bkt >= len(p.buckets) {
metricAllocOutPool.Add(float64(sz)) // track the number of bytes alloc'ed outside the pool for future tuning
return p.make(sz)
}

b := p.buckets[bkt].Get()
if b == nil {
sz := (bkt + 1) * p.bktSize
b = p.make(sz)
}
return b.([]byte)
}

// Put adds a slice to the right bucket in the pool. This method has been adjusted from its initial
// implementation to ignore byte slices that dont have the correct size
// Put adds a slice to the right bucket in the pool.
func (p *Pool) Put(s []byte) {
c := cap(s)
for i, size := range p.sizes {
if c == size {
p.buckets[i].Put(s) // nolint: staticcheck
}
if c < size {
return
}

if c%p.bktSize != 0 {
return
}
bkt := (c / p.bktSize) - 1
if bkt < 0 {
return
}
if bkt >= len(p.buckets) {
return
}

p.buckets[bkt].Put(s) // nolint: staticcheck
}
20 changes: 15 additions & 5 deletions pkg/tempopb/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,30 @@ func makeFunc(size int) []byte {
}

func TestPool(t *testing.T) {
testPool := New(1, 8, 2, makeFunc)
testPool := New(20, 4, makeFunc)
cases := []struct {
size int
expectedCap int
}{
{
size: -1,
expectedCap: 1,
size: -5,
expectedCap: 4,
},
{
size: 0,
expectedCap: 4,
},
{
size: 3,
expectedCap: 4,
},
{
size: 10,
expectedCap: 10,
expectedCap: 12,
},
{
size: 23,
expectedCap: 23,
},
}
for _, c := range cases {
Expand All @@ -40,7 +48,7 @@ func TestPool(t *testing.T) {
}

func TestPoolSlicesAreAlwaysLargeEnough(t *testing.T) {
testPool := New(1, 1024, 2, makeFunc)
testPool := New(1025, 5, makeFunc)

for i := 0; i < 10000; i++ {
size := rand.Intn(1000)
Expand All @@ -51,5 +59,7 @@ func TestPoolSlicesAreAlwaysLargeEnough(t *testing.T) {
ret := testPool.Get(size)

require.True(t, cap(ret) >= size)

testPool.Put(ret)
}
}
3 changes: 1 addition & 2 deletions pkg/tempopb/prealloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package tempopb

import "github.com/grafana/tempo/pkg/tempopb/pool"

// buckets: [0.5KiB, 1KiB, 2KiB, 4KiB, 8KiB, 16KiB] ...
var bytePool = pool.New(500, 64_000, 2, func(size int) []byte { return make([]byte, 0, size) })
var bytePool = pool.New(100_000, 400, func(size int) []byte { return make([]byte, 0, size) })

// PreallocBytes is a (repeated bytes slices) which preallocs slices on Unmarshal.
type PreallocBytes struct {
Expand Down
Loading