Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Traces Cleanup #4260

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [BUGFIX] Fix counter samples being downsampled by backdating the initial sample to the previous minute when the series is new. [#4236](https://github.com/grafana/tempo/pull/4236) (@javiermolinar)
* [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
* [BUGFIX] Gave context to orphaned spans related to various maintenance processes. [#4260](https://github.com/grafana/tempo/pull/4260) (@joe-elliott)

# v2.6.1

Expand Down
3 changes: 3 additions & 0 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ func (p *Processor) completeBlock() error {
b = firstWalBlock
)

ctx, span := tracer.Start(ctx, "Processor.CompleteBlock")
defer span.End()

iter, err := b.Iterator()
if err != nil {
return err
Expand Down
22 changes: 13 additions & 9 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
}

if blockID != uuid.Nil {
level.Info(log.Logger).Log("msg", "head block cut. enqueueing flush op", "userid", instance.instanceID, "block", blockID)
level.Info(log.Logger).Log("msg", "head block cut. enqueueing flush op", "tenant", instance.instanceID, "block", blockID)
// jitter to help when flushing many instances at the same time
// no jitter if immediate (initiated via /flush handler for example)
i.enqueue(&flushOp{
Expand Down Expand Up @@ -211,7 +211,7 @@ func (i *Ingester) flushLoop(j int) {
var err error

if op.kind == opKindComplete {
retry, err = i.handleComplete(op)
retry, err = i.handleComplete(context.Background(), op)
} else {
retry, err = i.handleFlush(context.Background(), op.userID, op.blockID)
}
Expand Down Expand Up @@ -243,7 +243,11 @@ func handleAbandonedOp(op *flushOp) {
"op", op.kind, "block", op.blockID.String(), "attempts", op.attempts)
}

func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) {
func (i *Ingester) handleComplete(ctx context.Context, op *flushOp) (retry bool, err error) {
ctx, sp := tracer.Start(ctx, "ingester.Complete", trace.WithAttributes(attribute.String("tenant", op.userID), attribute.String("blockID", op.blockID.String())))
defer sp.End()
withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "tenant", op.userID, "block", op.blockID.String())

// No point in proceeding if shutdown has been initiated since
// we won't be able to queue up the next flush op
if i.flushQueues.IsStopped() {
Expand All @@ -252,20 +256,20 @@ func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) {
}

start := time.Now()
level.Info(log.Logger).Log("msg", "completing block", "userid", op.userID, "blockID", op.blockID)
level.Info(log.Logger).Log("msg", "completing block", "tenant", op.userID, "blockID", op.blockID)
instance, err := i.getOrCreateInstance(op.userID)
if err != nil {
return false, err
}

err = instance.CompleteBlock(op.blockID)
level.Info(log.Logger).Log("msg", "block completed", "userid", op.userID, "blockID", op.blockID, "duration", time.Since(start))
err = instance.CompleteBlock(ctx, op.blockID)
level.Info(log.Logger).Log("msg", "block completed", "tenant", op.userID, "blockID", op.blockID, "duration", time.Since(start))
if err != nil {
handleFailedOp(op, err)

if op.attempts >= maxCompleteAttempts {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. POSSIBLE DATA LOSS",
"userID", op.userID, "attempts", op.attempts, "block", op.blockID.String())
"tenant", op.userID, "attempts", op.attempts, "block", op.blockID.String())

// Delete WAL and move on
err = instance.ClearCompletingBlock(op.blockID)
Expand Down Expand Up @@ -306,9 +310,9 @@ func withSpan(logger gklog.Logger, sp trace.Span) gklog.Logger {
}

func (i *Ingester) handleFlush(ctx context.Context, userID string, blockID uuid.UUID) (retry bool, err error) {
ctx, sp := tracer.Start(ctx, "flush", trace.WithAttributes(attribute.String("organization", userID), attribute.String("blockID", blockID.String())))
ctx, sp := tracer.Start(ctx, "ingester.Flush", trace.WithAttributes(attribute.String("tenant", userID), attribute.String("blockID", blockID.String())))
defer sp.End()
withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "userid", userID, "block", blockID.String())
withSpan(level.Info(log.Logger), sp).Log("msg", "flushing block", "tenant", userID, "block", blockID.String())

instance, err := i.getOrCreateInstance(userID)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestWalDropsZeroLength(t *testing.T) {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(blockID)
err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
Expand Down Expand Up @@ -408,7 +408,7 @@ func TestDedicatedColumns(t *testing.T) {
inst.blocksMtx.RUnlock()

// Complete block
err = inst.CompleteBlock(blockID)
err = inst.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

// TODO: This check should be included as part of the read path
Expand Down
4 changes: 1 addition & 3 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes
}

// CompleteBlock moves a completingBlock to a completeBlock. The new completeBlock has the same ID.
func (i *instance) CompleteBlock(blockID uuid.UUID) error {
func (i *instance) CompleteBlock(ctx context.Context, blockID uuid.UUID) error {
i.blocksMtx.Lock()
var completingBlock common.WALBlock
for _, iterBlock := range i.completingBlocks {
Expand All @@ -328,8 +328,6 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) error {
return fmt.Errorf("error finding completingBlock")
}

ctx := context.Background()

backendBlock, err := i.writer.CompleteBlockWithBackend(ctx, completingBlock, i.localReader, i.localWriter)
if err != nil {
return fmt.Errorf("error completing wal block with local backend: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (i *instance) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsRequ
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit)
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "tenant", userID, "limit", limit)
}

collected := distinctValues.Strings()
Expand Down Expand Up @@ -382,7 +382,7 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID, "limit", limit, "size", distinctValues.Size())
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "tenant", userID, "limit", limit, "size", distinctValues.Size())
}

return &tempopb.SearchTagValuesResponse{
Expand Down Expand Up @@ -589,7 +589,7 @@ func (i *instance) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTag
}

if valueCollector.Exceeded() {
_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", valueCollector.Size())
_ = level.Warn(log.Logger).Log("msg", "size of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "tenant", userID, "limit", limit, "size", valueCollector.Size())
}

resp := &tempopb.SearchTagValuesV2Response{
Expand Down
14 changes: 7 additions & 7 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestInstanceSearch(t *testing.T) {
checkEqual(t, ids, sr)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

sr, err = i.Search(context.Background(), req)
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestInstanceSearchTraceQL(t *testing.T) {
checkEqual(t, ids, sr)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

sr, err = i.Search(context.Background(), req)
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestInstanceSearchWithStartAndEnd(t *testing.T) {
searchAndAssert(req, uint32(100))

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)
searchAndAssert(req, uint32(200))

Expand Down Expand Up @@ -267,7 +267,7 @@ func TestInstanceSearchTags(t *testing.T) {
testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestInstanceSearchTagAndValuesV2(t *testing.T) {
testSearchTagsAndValuesV2(t, userCtx, i, tagKey, queryThatMatches, expectedTagValues, expectedEventTagValues, expectedLinkTagValues)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)
require.NoError(t, i.ClearCompletingBlock(blockID)) // Clear the completing block

Expand Down Expand Up @@ -681,7 +681,7 @@ func TestInstanceSearchDoesNotRace(t *testing.T) {
// Cut wal, complete, delete wal, then flush
blockID, _ := i.CutBlockIfReady(0, 0, true)
if blockID != uuid.Nil {
err := i.CompleteBlock(blockID)
err := i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)
err = i.ClearCompletingBlock(blockID)
require.NoError(t, err)
Expand Down Expand Up @@ -837,7 +837,7 @@ func TestInstanceSearchMetrics(t *testing.T) {
require.Less(t, numBytes, m.InspectedBytes)

// Test after completing a block
err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)
err = i.ClearCompletingBlock(blockID)
require.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestInstance(t *testing.T) {
require.NoError(t, err, "unexpected error cutting block")
require.NotEqual(t, blockID, uuid.Nil)

err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err, "unexpected error completing block")

block := i.GetBlockToBeFlushed(blockID)
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestInstanceFind(t *testing.T) {

queryAll(t, i, ids, traces)

err = i.CompleteBlock(blockID)
err = i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

queryAll(t, i, ids, traces)
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestInstanceDoesNotRace(t *testing.T) {
go concurrent(func() {
blockID, _ := i.CutBlockIfReady(0, 0, false)
if blockID != uuid.Nil {
err := i.CompleteBlock(blockID)
err := i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err, "unexpected error completing block")
block := i.GetBlockToBeFlushed(blockID)
require.NotNil(t, block)
Expand Down Expand Up @@ -487,7 +487,7 @@ func TestInstanceCutBlockIfReady(t *testing.T) {
blockID, err := instance.CutBlockIfReady(tc.maxBlockLifetime, tc.maxBlockBytes, tc.immediate)
require.NoError(t, err)

err = instance.CompleteBlock(blockID)
err = instance.CompleteBlock(context.Background(), blockID)
if tc.expectedToCutBlock {
require.NoError(t, err, "unexpected error completing block")
}
Expand Down Expand Up @@ -738,7 +738,7 @@ func BenchmarkInstanceFindTraceByIDFromCompleteBlock(b *testing.B) {
require.NoError(b, err)
id, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(b, err)
err = instance.CompleteBlock(id)
err = instance.CompleteBlock(context.Background(), id)
require.NoError(b, err)

require.Equal(b, 1, len(instance.completeBlocks))
Expand Down Expand Up @@ -776,7 +776,7 @@ func benchmarkInstanceSearch(b testing.TB) {
// force the traces to be in a complete block
id, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(b, err)
err = instance.CompleteBlock(id)
err = instance.CompleteBlock(context.Background(), id)
require.NoError(b, err)

require.Equal(b, 1, len(instance.completeBlocks))
Expand Down Expand Up @@ -880,7 +880,7 @@ func BenchmarkInstanceContention(t *testing.B) {
go concurrent(func() {
blockID, _ := i.CutBlockIfReady(0, 0, false)
if blockID != uuid.Nil {
err := i.CompleteBlock(blockID)
err := i.CompleteBlock(context.Background(), blockID)
require.NoError(t, err, "unexpected error completing block")
err = i.ClearCompletingBlock(blockID)
require.NoError(t, err, "unexpected error clearing wal block")
Expand Down
6 changes: 6 additions & 0 deletions pkg/usagestats/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel"

"github.com/grafana/tempo/cmd/tempo/build"
"github.com/grafana/tempo/tempodb/backend"
Expand All @@ -36,6 +37,8 @@ var (

stabilityCheckInterval = 5 * time.Second
stabilityMinimumRequired = 6

tracer = otel.Tracer("usagestats/Reporter")
)

type Reporter struct {
Expand Down Expand Up @@ -158,6 +161,9 @@ func ensureStableKey(ctx context.Context, kvClient kv.Client, logger log.Logger)
}

func (rep *Reporter) init(ctx context.Context) {
ctx, span := tracer.Start(ctx, "UsageReporter.init")
defer span.End()

if rep.conf.Leader {
rep.cluster = rep.initLeader(ctx)
return
Expand Down
2 changes: 1 addition & 1 deletion tempodb/blocklist/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, span := tracer.Start(ctx, "Poller.Do")
ctx, span := tracer.Start(ctx, "Poller.Do")
defer span.End()

tenants, err := p.reader.Tenants(ctx)
Expand Down