Skip to content

Commit

Permalink
Compactor: Add tracing support (#4903)
Browse files Browse the repository at this point in the history
* first draft for tracing

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* add tracer to context

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* minor fixes

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* create common block spans; log block errors

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* reverted to generic block spans

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* changed block ID tag; add span for block delete

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* minor fix

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* removed extra spans

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* addressed some review comments

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* removed block spans

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* changed group key

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* removed extra var declarations

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* removed comments

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* added changelog entry

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* update healthcheck to healthstats.

Co-authored-by: Matej Gera <38492574+matej-g@users.noreply.github.com>
Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* draft: logging errors

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* removed extra line

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* used alternate function to log errors

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

* minor nits

Signed-off-by: metonymic-smokey <ahuja.aditi@gmail.com>

Co-authored-by: Matej Gera <38492574+matej-g@users.noreply.github.com>
  • Loading branch information
metonymic-smokey and matej-g authored Dec 1, 2021
1 parent afd23cf commit d08a12a
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4874](https://github.com/thanos-io/thanos/pull/4874) Query: Add `--endpoint-strict` flag to statically configure Thanos API server endpoints. It is similar to `--store-strict` but supports passing any Thanos gRPC APIs: StoreAPI, MetadataAPI, RulesAPI, TargetsAPI and ExemplarsAPI.
- [#4868](https://github.com/thanos-io/thanos/pull/4868) Rule: Support ruleGroup limit introduced by Prometheus v2.31.0.
- [#4897](https://github.com/thanos-io/thanos/pull/4897) Querier: Add validation for querier address flags.
- [#4903](https://github.com/thanos-io/thanos/pull/4903) Compactor: Added tracing support for compaction.

### Fixed

Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/ui"
)

Expand Down Expand Up @@ -300,6 +301,7 @@ func runCompact(
}

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

defer func() {
if rerr != nil {
Expand Down
49 changes: 41 additions & 8 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/tracing"
)

type ResolutionLevel int64
Expand Down Expand Up @@ -764,7 +766,11 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp
return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
}

shouldRerun, compID, err := cg.compact(ctx, subDir, planner, comp)
var err error
tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) error {
shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp)
return err
}, opentracing.Tags{"group.key": cg.Key()})
if err != nil {
cg.compactionFailures.Inc()
return false, ulid.ULID{}, err
Expand Down Expand Up @@ -980,7 +986,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
overlappingBlocks = true
}

toCompact, err := planner.Plan(ctx, cg.metasByMinTime)
var toCompact []*metadata.Meta
tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) error {
toCompact, err = planner.Plan(ctx, cg.metasByMinTime)
return err
})
if err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "plan compaction")
}
Expand Down Expand Up @@ -1008,12 +1018,20 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
uniqueSources[s] = struct{}{}
}

if err := block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir); err != nil {
tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID))
}

// Ensure all input blocks are valid.
stats, err := block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
var stats block.HealthStats
tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error {
stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir)
}
Expand All @@ -1039,7 +1057,10 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

begin = time.Now()
compID, err = comp.Compact(dir, toCompactDirs, nil)
tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) error {
compID, err = comp.Compact(dir, toCompactDirs, nil)
return err
})
if err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs))
}
Expand Down Expand Up @@ -1081,7 +1102,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
}

// Ensure the output block is valid.
if err := block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime); !cg.acceptMalformedIndex && err != nil {
tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error {
err = block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime)
return err
})
if !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

Expand All @@ -1095,7 +1120,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp

begin = time.Now()

if err := block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc); err != nil {
tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error {
err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc)
return err
})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
}
level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
Expand All @@ -1104,7 +1133,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
// into the next planning cycle.
// Eventually the block we just uploaded should get synced into the group again (including sync-delay).
for _, meta := range toCompact {
if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil {
tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error {
err = cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()))
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket"))
}
cg.groupGarbageCollectedBlocks.Inc()
Expand Down
13 changes: 13 additions & 0 deletions pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

const (
Expand Down Expand Up @@ -72,6 +73,18 @@ func StartSpan(ctx context.Context, operationName string, opts ...opentracing.St
return span, opentracing.ContextWithSpan(ctx, span)
}

// DoInSpanWithErr executes function doFn inside a new span with `operationName` name, hooking it as a child to a span found within the given context, if any.
// It uses the opentracing.Tracer propagated in the context. If none is found, it uses a noop tracer.
// If doFn returns a non-nil error, that error is logged on the newly created span, which differentiates it from DoInSpan and DoWithSpan.
// The error is only recorded on the span and is not returned; callers that need it must capture it inside doFn.
func DoInSpanWithErr(ctx context.Context, operationName string, doFn func(context.Context) error, opts ...opentracing.StartSpanOption) {
	span, newCtx := StartSpan(ctx, operationName, opts...)
	// Deferred so the span is finished after doFn has run and any error has been logged.
	defer span.Finish()

	if err := doFn(newCtx); err != nil {
		ext.LogError(span, err)
	}
}

// DoInSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any.
// It uses opentracing.Tracer propagated in context. If none is found, it uses a noop tracer.
func DoInSpan(ctx context.Context, operationName string, doFn func(context.Context), opts ...opentracing.StartSpanOption) {
Expand Down

0 comments on commit d08a12a

Please sign in to comment.