diff --git a/CHANGELOG.md b/CHANGELOG.md index a284df8208..0022e8d6f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#4874](https://github.com/thanos-io/thanos/pull/4874) Query: Add `--endpoint-strict` flag to statically configure Thanos API server endpoints. It is similar to `--store-strict` but supports passing any Thanos gRPC APIs: StoreAPI, MetadataAPI, RulesAPI, TargetsAPI and ExemplarsAPI. - [#4868](https://github.com/thanos-io/thanos/pull/4868) Rule: Support ruleGroup limit introduced by Prometheus v2.31.0. - [#4897](https://github.com/thanos-io/thanos/pull/4897) Querier: Add validation for querier address flags. +- [#4903](https://github.com/thanos-io/thanos/pull/4903) Compactor: Added tracing support for compaction. ### Fixed diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 7d504fcda3..ce24c7e486 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -42,6 +42,7 @@ import ( "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" httpserver "github.com/thanos-io/thanos/pkg/server/http" + "github.com/thanos-io/thanos/pkg/tracing" "github.com/thanos-io/thanos/pkg/ui" ) @@ -300,6 +301,7 @@ func runCompact( } ctx, cancel := context.WithCancel(context.Background()) + ctx = tracing.ContextWithTracer(ctx, tracer) defer func() { if rerr != nil { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 63d8cf1e21..c8e9945ba7 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -17,6 +17,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -31,6 +32,7 @@ import ( "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/objstore" 
"github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/tracing" ) type ResolutionLevel int64 @@ -764,7 +766,11 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } - shouldRerun, compID, err := cg.compact(ctx, subDir, planner, comp) + var err error + tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) error { + shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp) + return err + }, opentracing.Tags{"group.key": cg.Key()}) if err != nil { cg.compactionFailures.Inc() return false, ulid.ULID{}, err @@ -980,7 +986,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp overlappingBlocks = true } - toCompact, err := planner.Plan(ctx, cg.metasByMinTime) + var toCompact []*metadata.Meta + tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) error { + toCompact, err = planner.Plan(ctx, cg.metasByMinTime) + return err + }) if err != nil { return false, ulid.ULID{}, errors.Wrap(err, "plan compaction") } @@ -1008,12 +1018,20 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp uniqueSources[s] = struct{}{} } - if err := block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir); err != nil { + tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { + err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir) + return err + }, opentracing.Tags{"block.id": meta.ULID}) + if err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID)) } // Ensure all input blocks are valid. 
- stats, err := block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + var stats block.HealthStats + tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error { + stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + return err + }, opentracing.Tags{"block.id": meta.ULID}) if err != nil { return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir) } @@ -1039,7 +1057,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) begin = time.Now() - compID, err = comp.Compact(dir, toCompactDirs, nil) + tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) error { + compID, err = comp.Compact(dir, toCompactDirs, nil) + return err + }) if err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs)) } @@ -1081,7 +1102,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp } // Ensure the output block is valid. 
- if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil { + tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error { + err = block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime) + return err + }) + if !cg.acceptMalformedIndex && err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir)) } @@ -1095,7 +1120,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin = time.Now() - if err := block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc); err != nil { + tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error { + err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc) + return err + }) + if err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID)) } level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) @@ -1104,7 +1133,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp // into the next planning cycle. // Eventually the block we just uploaded should get synced into the group again (including sync-delay). 
for _, meta := range toCompact { - if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil { + tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error { + err = cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())) + return err + }, opentracing.Tags{"block.id": meta.ULID}) + if err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket")) } cg.groupGarbageCollectedBlocks.Inc() diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index 867eef7ce0..1987382d58 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -7,6 +7,7 @@ import ( "context" "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" ) const ( @@ -72,6 +73,18 @@ func StartSpan(ctx context.Context, operationName string, opts ...opentracing.St return span, opentracing.ContextWithSpan(ctx, span) } +// DoInSpanWithErr executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. +// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. +// It logs the error inside the new span created, which differentiates it from DoInSpan and DoWithSpan. +func DoInSpanWithErr(ctx context.Context, operationName string, doFn func(context.Context) error, opts ...opentracing.StartSpanOption) { + span, newCtx := StartSpan(ctx, operationName, opts...) + defer span.Finish() + err := doFn(newCtx) + if err != nil { + ext.LogError(span, err) + } +} + // DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any. // It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification. func DoInSpan(ctx context.Context, operationName string, doFn func(context.Context), opts ...opentracing.StartSpanOption) {