Skip to content

Commit

Permalink
Ingester/Generator Live trace cleanup (#4365)
Browse files Browse the repository at this point in the history
* moved trace sizes somewhere shareable

Signed-off-by: Joe Elliott <number101010@gmail.com>

* use tracesizes in ingester

Signed-off-by: Joe Elliott <number101010@gmail.com>

* make tests work

Signed-off-by: Joe Elliott <number101010@gmail.com>

* trace bytes in generator

Signed-off-by: Joe Elliott <number101010@gmail.com>

* remove traceCount

Signed-off-by: Joe Elliott <number101010@gmail.com>

* live trace shenanigans

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Update modules/generator/processor/localblocks/livetraces.go

Co-authored-by: Mario <mariorvinas@gmail.com>

* Update modules/ingester/instance.go

Co-authored-by: Mario <mariorvinas@gmail.com>

* Test cleanup. Add sz test, restore commented out and fix e2e

Signed-off-by: Joe Elliott <number101010@gmail.com>

* remove todo comment

Signed-off-by: Joe Elliott <number101010@gmail.com>

---------

Signed-off-by: Joe Elliott <number101010@gmail.com>
Co-authored-by: Mario <mariorvinas@gmail.com>
  • Loading branch information
joe-elliott and mapno authored Nov 22, 2024
1 parent 069bd1b commit dc97da1
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 73 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
* [ENHANCEMENT] Reduce ingester working set by improving prelloc behavior. [#4344](https://github.com/grafana/tempo/pull/4344) (@joe-elliott)
* [ENHANCEMENT] Use Promtheus fast regexp for TraceQL regular expression matchers. [#4329](https://github.com/grafana/tempo/pull/4329) (@joe-elliott)
**BREAKING CHANGE** All regular expression matchers will now be fully anchored. `span.foo =~ "bar"` will now be evaluated as `span.foo =~ "^bar$"`
* [ENHANCEMENT] Reuse generator code to better refuse "too large" traces. [#4365](https://github.com/grafana/tempo/pull/4365) (@joe-elliott)
This will cause the ingester to more aggressively and correctly refuse traces. Also added two metrics to better track bytes consumed per tenant in the ingester.
`tempo_metrics_generator_live_trace_bytes` and `tempo_ingester_live_trace_bytes`.
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Correctly handle 400 Bad Request and 404 Not Found in gRPC streaming [#4144](https://github.com/grafana/tempo/pull/4144) (@mapno)
Expand All @@ -71,9 +74,9 @@
* [BUGFIX] Fix several issues with exemplar values for traceql metrics [#4366](https://github.com/grafana/tempo/pull/4366) (@mdisibio)
* [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
* [BUGFIX] Gave context to orphaned spans related to various maintenance processes. [#4260](https://github.com/grafana/tempo/pull/4260) (@joe-elliott)
* [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#44236](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
* [BUGFIX] Initialize histogram buckets to 0 to avoid downsampling. [#4366](https://github.com/grafana/tempo/pull/4366) (@javiermolinar)

* [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
* [BUGFIX] Fixed an issue in the generator where the first batch was counted 2x against a traces size. [#4365](https://github.com/grafana/tempo/pull/4365) (@joe-elliott)

# v2.6.1

Expand Down
3 changes: 3 additions & 0 deletions integration/e2e/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ func TestQueryLimits(t *testing.T) {
batch.Spans = allSpans[i : i+1]
require.NoError(t, c.EmitBatch(context.Background(), batch))
util.CallFlush(t, tempo)
// this push along with the double flush is required to forget the too large trace
require.NoError(t, c.EmitBatch(context.Background(), util.MakeThriftBatchWithSpanCount(1)))
util.CallFlush(t, tempo)
time.Sleep(2 * time.Second) // trace idle and flush time are both 1ms
}

Expand Down
13 changes: 13 additions & 0 deletions modules/generator/processor/localblocks/livetraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ type liveTrace struct {
id []byte
timestamp time.Time
Batches []*v1.ResourceSpans

sz uint64
}

type liveTraces struct {
hash hash.Hash64
traces map[uint64]*liveTrace

sz uint64
}

func newLiveTraces() *liveTraces {
Expand All @@ -36,6 +40,10 @@ func (l *liveTraces) Len() uint64 {
return uint64(len(l.traces))
}

func (l *liveTraces) Size() uint64 {
return l.sz
}

func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) bool {
token := l.token(traceID)

Expand All @@ -54,6 +62,10 @@ func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) b
l.traces[token] = tr
}

sz := uint64(batch.Size())
tr.sz += sz
l.sz += sz

tr.Batches = append(tr.Batches, batch)
tr.timestamp = time.Now()
return true
Expand All @@ -65,6 +77,7 @@ func (l *liveTraces) CutIdle(idleSince time.Time, immediate bool) []*liveTrace {
for k, tr := range l.traces {
if tr.timestamp.Before(idleSince) || immediate {
res = append(res, tr)
l.sz -= tr.sz
delete(l.traces, k)
}
}
Expand Down
46 changes: 46 additions & 0 deletions modules/generator/processor/localblocks/livetraces_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package localblocks

import (
"math/rand/v2"
"testing"
"time"

"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/require"
)

func TestLiveTracesSizesAndLen(t *testing.T) {
lt := newLiveTraces()

expectedSz := uint64(0)
expectedLen := uint64(0)

for i := 0; i < 100; i++ {
id := test.ValidTraceID(nil)
tr := test.MakeTrace(rand.IntN(5)+1, id)

cutTime := time.Now()

// add some traces and confirm size/len
expectedLen++
for _, rs := range tr.ResourceSpans {
expectedSz += uint64(rs.Size())
lt.Push(id, rs, 0)
}

require.Equal(t, expectedSz, lt.Size())
require.Equal(t, expectedLen, lt.Len())

// cut some traces and confirm size/len
cutTraces := lt.CutIdle(cutTime, false)
for _, tr := range cutTraces {
for _, rs := range tr.Batches {
expectedSz -= uint64(rs.Size())
}
expectedLen--
}

require.Equal(t, expectedSz, lt.Size())
require.Equal(t, expectedLen, lt.Len())
}
}
6 changes: 6 additions & 0 deletions modules/generator/processor/localblocks/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ var (
Name: "live_traces",
Help: "Number of live traces",
}, []string{"tenant"})
metricLiveTraceBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "live_trace_bytes",
Help: "Total number of traces created",
}, []string{"tenant"})
metricDroppedTraces = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Expand Down
6 changes: 4 additions & 2 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/pkg/tracesizes"
"github.com/grafana/tempo/tempodb"
"go.opentelemetry.io/otel"

Expand Down Expand Up @@ -70,7 +71,7 @@ type Processor struct {

liveTracesMtx sync.Mutex
liveTraces *liveTraces
traceSizes *traceSizes
traceSizes *tracesizes.Tracker

writer tempodb.Writer
}
Expand Down Expand Up @@ -103,7 +104,7 @@ func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrid
completeBlocks: map[uuid.UUID]*ingester.LocalBlock{},
flushqueue: flushqueues.NewPriorityQueue(metricFlushQueueSize.WithLabelValues(tenant)),
liveTraces: newLiveTraces(),
traceSizes: newTraceSizes(),
traceSizes: tracesizes.New(),
closeCh: make(chan struct{}),
wg: sync.WaitGroup{},
cache: lru.New(100),
Expand Down Expand Up @@ -597,6 +598,7 @@ func (p *Processor) cutIdleTraces(immediate bool) error {

// Record live traces before flushing so we know the high water mark
metricLiveTraces.WithLabelValues(p.tenant).Set(float64(len(p.liveTraces.traces)))
metricLiveTraceBytes.WithLabelValues(p.tenant).Set(float64(p.liveTraces.Size()))

since := time.Now().Add(-p.Cfg.TraceIdlePeriod)
tracesToCut := p.liveTraces.CutIdle(since, immediate)
Expand Down
65 changes: 32 additions & 33 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/tracesizes"
"github.com/grafana/tempo/pkg/util/log"
"github.com/grafana/tempo/pkg/validation"
"github.com/grafana/tempo/tempodb"
Expand Down Expand Up @@ -64,6 +64,11 @@ var (
Name: "ingester_live_traces",
Help: "The current number of lives traces per tenant.",
}, []string{"tenant"})
metricLiveTraceBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tempo",
Name: "ingester_live_trace_bytes",
Help: "The current number of bytes consumed by lives traces per tenant.",
}, []string{"tenant"})
metricBlocksClearedTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempo",
Name: "ingester_blocks_cleared_total",
Expand All @@ -82,10 +87,10 @@ var (
)

type instance struct {
tracesMtx sync.Mutex
traces map[uint32]*liveTrace
traceSizes map[uint32]uint32
traceCount atomic.Int32
tracesMtx sync.Mutex
traces map[uint32]*liveTrace
traceSizes *tracesizes.Tracker
traceSizeBytes uint64

headBlockMtx sync.RWMutex
headBlock common.WALBlock
Expand Down Expand Up @@ -115,7 +120,7 @@ type instance struct {
func newInstance(instanceID string, limiter *Limiter, overrides ingesterOverrides, writer tempodb.Writer, l *local.Backend, dedicatedColumns backend.DedicatedColumns) (*instance, error) {
i := &instance{
traces: map[uint32]*liveTrace{},
traceSizes: map[uint32]uint32{},
traceSizes: tracesizes.New(),

instanceID: instanceID,
tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID),
Expand Down Expand Up @@ -190,40 +195,34 @@ func (i *instance) PushBytes(ctx context.Context, id, traceBytes []byte) error {
return status.Errorf(codes.InvalidArgument, "%s is not a valid traceid", hex.EncodeToString(id))
}

// check for max traces before grabbing the lock to better load shed
err := i.limiter.AssertMaxTracesPerUser(i.instanceID, int(i.traceCount.Load()))
if err != nil {
return newMaxLiveTracesError(i.instanceID, err.Error())
}

return i.push(ctx, id, traceBytes)
}

func (i *instance) push(ctx context.Context, id, traceBytes []byte) error {
i.tracesMtx.Lock()
defer i.tracesMtx.Unlock()

tkn := i.tokenForTraceID(id)
err := i.limiter.AssertMaxTracesPerUser(i.instanceID, len(i.traces))
if err != nil {
return newMaxLiveTracesError(i.instanceID, err.Error())
}

maxBytes := i.limiter.limits.MaxBytesPerTrace(i.instanceID)
reqSize := len(traceBytes)

if maxBytes > 0 {
prevSize := int(i.traceSizes[tkn])
reqSize := len(traceBytes)
if prevSize+reqSize > maxBytes {
return newTraceTooLargeError(id, i.instanceID, maxBytes, reqSize)
}
if maxBytes > 0 && !i.traceSizes.Allow(id, reqSize, maxBytes) {
return newTraceTooLargeError(id, i.instanceID, maxBytes, reqSize)
}

trace := i.getOrCreateTrace(id, tkn, maxBytes)
tkn := i.tokenForTraceID(id)
trace := i.getOrCreateTrace(id, tkn)

err := trace.Push(ctx, i.instanceID, traceBytes)
err = trace.Push(ctx, i.instanceID, traceBytes)
if err != nil {
return err
}

if maxBytes > 0 {
i.traceSizes[tkn] += uint32(len(traceBytes))
}
i.traceSizeBytes += uint64(reqSize)

return nil
}
Expand Down Expand Up @@ -281,6 +280,8 @@ func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes

now := time.Now()
if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
// Reset trace sizes when cutting block
i.traceSizes.ClearIdle(i.lastBlockCut)

// Final flush
err := i.headBlock.Flush()
Expand Down Expand Up @@ -485,15 +486,14 @@ func (i *instance) AddCompletingBlock(b common.WALBlock) {
// getOrCreateTrace will return a new trace object for the given request
//
// It must be called under the i.tracesMtx lock
func (i *instance) getOrCreateTrace(traceID []byte, fp uint32, maxBytes int) *liveTrace {
func (i *instance) getOrCreateTrace(traceID []byte, fp uint32) *liveTrace {
trace, ok := i.traces[fp]
if ok {
return trace
}

trace = newTrace(traceID, maxBytes)
trace = newTrace(traceID)
i.traces[fp] = trace
i.traceCount.Inc()

return trace
}
Expand All @@ -507,11 +507,6 @@ func (i *instance) tokenForTraceID(id []byte) uint32 {

// resetHeadBlock() should be called under lock
func (i *instance) resetHeadBlock() error {
// Reset trace sizes when cutting block
i.tracesMtx.Lock()
i.traceSizes = make(map[uint32]uint32, len(i.traceSizes))
i.tracesMtx.Unlock()

dedicatedColumns := i.getDedicatedColumns()

meta := &backend.BlockMeta{
Expand Down Expand Up @@ -549,17 +544,21 @@ func (i *instance) tracesToCut(cutoff time.Duration, immediate bool) []*liveTrac

// Set this before cutting to give a more accurate number.
metricLiveTraces.WithLabelValues(i.instanceID).Set(float64(len(i.traces)))
metricLiveTraceBytes.WithLabelValues(i.instanceID).Set(float64(i.traceSizeBytes))

cutoffTime := time.Now().Add(cutoff)
tracesToCut := make([]*liveTrace, 0, len(i.traces))

for key, trace := range i.traces {
if cutoffTime.After(trace.lastAppend) || immediate {
tracesToCut = append(tracesToCut, trace)

// decrease live trace bytes
i.traceSizeBytes -= trace.Size()

delete(i.traces, key)
}
}
i.traceCount.Store(int32(len(i.traces)))

return tracesToCut
}
Expand Down
5 changes: 0 additions & 5 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func TestInstanceSearchTraceQL(t *testing.T) {

// Test after appending to WAL
require.NoError(t, i.CutCompleteTraces(0, true))
assert.Equal(t, int(i.traceCount.Load()), len(i.traces))

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
Expand Down Expand Up @@ -597,8 +596,6 @@ func writeTracesForSearch(t *testing.T, i *instance, spanName, tagKey, tagValue
// searchData will be nil if not
err = i.PushBytes(context.Background(), id, traceBytes)
require.NoError(t, err)

assert.Equal(t, int(i.traceCount.Load()), len(i.traces))
}

// traces have to be cut to show up in searches
Expand Down Expand Up @@ -805,8 +802,6 @@ func TestInstanceSearchMetrics(t *testing.T) {

err = i.PushBytes(context.Background(), id, traceBytes)
require.NoError(t, err)

assert.Equal(t, int(i.traceCount.Load()), len(i.traces))
}

search := func() *tempopb.SearchMetrics {
Expand Down
Loading

0 comments on commit dc97da1

Please sign in to comment.