diff --git a/CHANGELOG.md b/CHANGELOG.md
index eb96112711d..f0074fd75cc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,11 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 
 - [#1656](https://github.com/thanos-io/thanos/pull/1656) Thanos Store now starts metric and status probe HTTP server earlier in its start-up sequence. `/-/healthy` endpoint now starts to respond with success earlier. `/metrics` endpoint starts serving metrics earlier as well. Make sure to point your readiness probes to the `/-/ready` endpoint rather than `/metrics`.
 
+### Changed
+
+- [#1666](https://github.com/thanos-io/thanos/pull/1666) `thanos_compact_group_compactions_total` now counts block compactions, i.e. operations that resulted in a compacted block. The old behaviour
+is now exposed by the new metrics `thanos_compact_group_compaction_runs_started_total` and `thanos_compact_group_compaction_runs_completed_total`, which count compaction runs overall.
+
 ## [v0.8.1](https://github.com/thanos-io/thanos/releases/tag/v0.8.1) - 2019.10.14
 
 ### Fixed
diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 12f6573a0e2..4bbb68cea2d 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -344,15 +344,15 @@ func runCompact(
 }
 
 const (
-	MetricIndexGenerateName = "thanos_compact_generated_index_total"
-	MetricIndexGenerateHelp = "Total number of generated indexes."
+	metricIndexGenerateName = "thanos_compact_generated_index_total"
+	metricIndexGenerateHelp = "Total number of generated indexes."
 )
 
 // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
 func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
 	genIndex := prometheus.NewCounter(prometheus.CounterOpts{
-		Name: MetricIndexGenerateName,
-		Help: MetricIndexGenerateHelp,
+		Name: metricIndexGenerateName,
+		Help: metricIndexGenerateHelp,
 	})
 	reg.MustRegister(genIndex)
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index b6b23971273..ef5f3fbef64 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -214,10 +214,10 @@ func downsampleBucket(
 				continue
 			}
 			if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil {
-				metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m)).Inc()
+				metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos)).Inc()
 				return errors.Wrap(err, "downsampling to 5 min")
 			}
-			metrics.downsamples.WithLabelValues(compact.GroupKey(*m)).Inc()
+			metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos)).Inc()
 
 		case downsample.ResLevel1:
 			missing := false
@@ -237,10 +237,10 @@ func downsampleBucket(
 				continue
 			}
 			if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
-				metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m))
+				metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos))
 				return errors.Wrap(err, "downsampling to 60 min")
 			}
-			metrics.downsamples.WithLabelValues(compact.GroupKey(*m))
+			metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos))
 		}
 	}
 	return nil
diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go
index 138ce525791..5203ce35447 100644
--- a/cmd/thanos/main_test.go
+++ b/cmd/thanos/main_test.go
@@ -5,6 +5,9 @@ import (
 	"io/ioutil"
 	"os"
 	"path"
+	"path/filepath"
+
+	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"testing"
 	"time"
 
@@ -13,116 +16,105 @@ import (
 	"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" - "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/inmem" "github.com/thanos-io/thanos/pkg/testutil" ) -func TestCleanupCompactCacheFolder(t *testing.T) { - ctx, logger, dir, _, bkt, actReg := bootstrap(t) - defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - - sy, err := compact.NewSyncer(logger, actReg, bkt, 0*time.Second, 1, false, nil) - testutil.Ok(t, err) - - expReg := prometheus.NewRegistry() - syncExp := prometheus.NewCounter(prometheus.CounterOpts{ - Name: compact.MetricSyncMetaName, - Help: compact.MetricSyncMetaHelp, - }) - expReg.MustRegister(syncExp) - - testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName) - - comp, err := tsdb.NewLeveledCompactor(ctx, nil, logger, []int64{1}, nil) - testutil.Ok(t, err) - - bComp, err := compact.NewBucketCompactor(logger, sy, comp, dir, bkt, 1) +func TestCleanupIndexCacheFolder(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + dir, err := ioutil.TempDir("", "test-compact-cleanup") testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - // Even with with a single uploaded block the bucker compactor needs to - // downloads the meta file to plan the compaction groups. - testutil.Ok(t, bComp.Compact(ctx)) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - syncExp.Inc() + bkt := inmem.NewBucket() - testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName) + // Upload one compaction lvl = 2 block, one compaction lvl = 1. + // We generate index cache files only for lvl > 1 blocks. + { + id, err := testutil.CreateBlock( + ctx, + dir, + []labels.Labels{{{Name: "a", Value: "1"}}}, + 1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check. + labels.Labels{{Name: "e1", Value: "1"}}, + downsample.ResLevel0) + testutil.Ok(t, err) - _, err = os.Stat(dir) - testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution") + meta, err := metadata.Read(filepath.Join(dir, id.String())) + testutil.Ok(t, err) -} + meta.Compaction.Level = 2 -func TestCleanupIndexCacheFolder(t *testing.T) { - ctx, logger, dir, _, bkt, actReg := bootstrap(t) - defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + testutil.Ok(t, metadata.Write(logger, filepath.Join(dir, id.String()), meta)) + testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()))) + } + { + id, err := testutil.CreateBlock( + ctx, + dir, + []labels.Labels{{{Name: "a", Value: "1"}}}, + 1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check. 
+			labels.Labels{{Name: "e1", Value: "1"}},
+			downsample.ResLevel0)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
+	}
 
+	reg := prometheus.NewRegistry()
 	expReg := prometheus.NewRegistry()
 	genIndexExp := prometheus.NewCounter(prometheus.CounterOpts{
-		Name: MetricIndexGenerateName,
-		Help: MetricIndexGenerateHelp,
+		Name: metricIndexGenerateName,
+		Help: metricIndexGenerateHelp,
 	})
 	expReg.MustRegister(genIndexExp)
 
-	testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)
-
-	testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, actReg, bkt, dir))
+	testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, dir))
 
 	genIndexExp.Inc()
-	testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)
-
-	_, err := os.Stat(dir)
-	testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution")
-}
-
-func TestCleanupDownsampleCacheFolder(t *testing.T) {
-	ctx, logger, dir, blckID, bkt, reg := bootstrap(t)
-	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
-
-	meta, err := block.DownloadMeta(ctx, logger, bkt, blckID)
-	testutil.Ok(t, err)
-
-	metrics := newDownsampleMetrics(reg)
-	testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
-	testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
-	testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
+	testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName)
 
 	_, err = os.Stat(dir)
 	testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution")
 }
 
-func bootstrap(t *testing.T) (context.Context, log.Logger, string, ulid.ULID, objstore.Bucket, *prometheus.Registry) {
+func TestCleanupDownsampleCacheFolder(t *testing.T) {
 	logger := log.NewLogfmtLogger(os.Stderr)
 	dir, err := ioutil.TempDir("", "test-compact-cleanup")
 	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
 
 	bkt := inmem.NewBucket()
 
-	var blckID ulid.ULID
-
-	// Create and upload a single block to the bucker.
-	// The compaction will download the meta block of
-	// this block to plan the compaction groups.
+	var id ulid.ULID
 	{
-		blckID, err = testutil.CreateBlock(
+		id, err = testutil.CreateBlock(
 			ctx,
 			dir,
-			[]labels.Labels{
-				{{Name: "a", Value: "1"}},
-			},
+			[]labels.Labels{{{Name: "a", Value: "1"}}},
 			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}}, downsample.ResLevel0) testutil.Ok(t, err) - testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, blckID.String()))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()))) } - return ctx, logger, dir, blckID, bkt, prometheus.NewRegistry() + meta, err := block.DownloadMeta(ctx, logger, bkt, id) + testutil.Ok(t, err) + + metrics := newDownsampleMetrics(prometheus.NewRegistry()) + testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos)))) + testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir)) + testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos)))) + + _, err = os.Stat(dir) + testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution") } diff --git a/go.mod b/go.mod index c5ddc7f59f7..701362fe4cf 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( golang.org/x/net v0.0.0-20191014212845-da9a3fd4c582 // indirect golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 golang.org/x/sync v0.0.0-20190423024810-112230192c58 - golang.org/x/sys v0.0.0-20191010194322-b09406accb47 // indirect + golang.org/x/sys v0.0.0-20191010194322-b09406accb47 golang.org/x/text v0.3.2 google.golang.org/api v0.11.0 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 552780fb100..0a210e98bae 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -69,22 +69,18 @@ type syncerMetrics struct { garbageCollectionFailures prometheus.Counter garbageCollectionDuration prometheus.Histogram compactions *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec compactionFailures *prometheus.CounterVec } -const ( - MetricSyncMetaName = "thanos_compact_sync_meta_total" - MetricSyncMetaHelp = "Total number of sync meta operations." 
-)
-
 func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
 	var m syncerMetrics
 
 	m.syncMetas = prometheus.NewCounter(prometheus.CounterOpts{
-		Name: MetricSyncMetaName,
-		Help: MetricSyncMetaHelp,
+		Name: "thanos_compact_sync_meta_total",
+		Help: "Total number of sync meta operations.",
 	})
-
 	m.syncMetaFailures = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "thanos_compact_sync_meta_failures_total",
 		Help: "Total number of failed sync meta operations.",
@@ -101,7 +97,6 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
 		Name: "thanos_compact_garbage_collected_blocks_total",
 		Help: "Total number of deleted blocks by compactor.",
 	})
-
 	m.garbageCollections = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "thanos_compact_garbage_collection_total",
 		Help: "Total number of garbage collection operations.",
@@ -120,7 +115,15 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
 
 	m.compactions = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "thanos_compact_group_compactions_total",
-		Help: "Total number of group compactions attempts.",
+		Help: "Total number of group compaction attempts that resulted in a new block.",
 	}, []string{"group"})
+	m.compactionRunsStarted = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_compact_group_compaction_runs_started_total",
+		Help: "Total number of group compaction attempts.",
+	}, []string{"group"})
+	m.compactionRunsCompleted = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: "thanos_compact_group_compaction_runs_completed_total",
+		Help: "Total number of completed compaction runs for the group. This also includes compactor group runs that resulted in no compaction.",
+	}, []string{"group"})
 	m.compactionFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "thanos_compact_group_compactions_failures_total",
@@ -137,6 +140,8 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
 			m.garbageCollectionFailures,
 			m.garbageCollectionDuration,
 			m.compactions,
+			m.compactionRunsStarted,
+			m.compactionRunsCompleted,
 			m.compactionFailures,
 		)
 	}
@@ -347,12 +352,12 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (remov
 
 // GroupKey returns a unique identifier for the group the block belongs to. It considers
 // the downsampling resolution and the block's labels.
-func GroupKey(meta metadata.Meta) string {
-	return groupKey(meta.Thanos.Downsample.Resolution, labels.FromMap(meta.Thanos.Labels))
+func GroupKey(meta metadata.Thanos) string {
+	return groupKey(meta.Downsample.Resolution, labels.FromMap(meta.Labels))
 }
 
 func groupKey(res int64, lbls labels.Labels) string {
-	return fmt.Sprintf("%d@%s", res, lbls)
+	return fmt.Sprintf("%d@%v", res, lbls.Hash())
 }
 
 // Groups returns the compaction groups for all blocks currently known to the syncer.
@@ -363,22 +368,24 @@ func (c *Syncer) Groups() (res []*Group, err error) {
 
 	groups := map[string]*Group{}
 	for _, m := range c.blocks {
-		g, ok := groups[GroupKey(*m)]
+		g, ok := groups[GroupKey(m.Thanos)]
 		if !ok {
 			g, err = newGroup(
-				log.With(c.logger, "compactionGroup", GroupKey(*m)),
+				log.With(c.logger, "compactionGroup", GroupKey(m.Thanos)),
 				c.bkt,
 				labels.FromMap(m.Thanos.Labels),
 				m.Thanos.Downsample.Resolution,
 				c.acceptMalformedIndex,
-				c.metrics.compactions.WithLabelValues(GroupKey(*m)),
-				c.metrics.compactionFailures.WithLabelValues(GroupKey(*m)),
+				c.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)),
+				c.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)),
+				c.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)),
+				c.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)),
 				c.metrics.garbageCollectedBlocks,
 			)
 			if err != nil {
 				return nil, errors.Wrap(err, "create compaction group")
 			}
-			groups[GroupKey(*m)] = g
+			groups[GroupKey(m.Thanos)] = g
 			res = append(res, g)
 		}
 		if err := g.Add(m); err != nil {
@@ -518,6 +525,8 @@ type Group struct {
 	blocks                      map[ulid.ULID]*metadata.Meta
 	acceptMalformedIndex        bool
 	compactions                 prometheus.Counter
+	compactionRunsStarted       prometheus.Counter
+	compactionRunsCompleted     prometheus.Counter
 	compactionFailures          prometheus.Counter
 	groupGarbageCollectedBlocks prometheus.Counter
 }
@@ -530,6 +539,8 @@ func newGroup(
 	resolution int64,
 	acceptMalformedIndex bool,
 	compactions prometheus.Counter,
+	compactionRunsStarted prometheus.Counter,
+	compactionRunsCompleted prometheus.Counter,
 	compactionFailures prometheus.Counter,
 	groupGarbageCollectedBlocks prometheus.Counter,
 ) (*Group, error) {
@@ -544,6 +555,8 @@ func newGroup(
 		blocks:                      map[ulid.ULID]*metadata.Meta{},
 		acceptMalformedIndex:        acceptMalformedIndex,
 		compactions:                 compactions,
+		compactionRunsStarted:       compactionRunsStarted,
+		compactionRunsCompleted:     compactionRunsCompleted,
 		compactionFailures:          compactionFailures,
 		groupGarbageCollectedBlocks: groupGarbageCollectedBlocks,
 	}
@@ -597,8 +610,16 @@ func (cg *Group) Resolution() int64 {
 
 // Compact plans and runs a single compaction against the group. The compacted result
 // is uploaded into the bucket the blocks were retrieved from.
 func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (bool, ulid.ULID, error) {
+	cg.compactionRunsStarted.Inc()
+
 	subDir := filepath.Join(dir, cg.Key())
 
+	defer func() {
+		if err := os.RemoveAll(subDir); err != nil {
+			level.Error(cg.logger).Log("msg", "failed to remove compaction group work directory", "path", subDir, "err", err)
+		}
+	}()
+
 	if err := os.RemoveAll(subDir); err != nil {
 		return false, ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
 	}
@@ -609,9 +630,10 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 	shouldRerun, compID, err := cg.compact(ctx, subDir, comp)
 	if err != nil {
 		cg.compactionFailures.Inc()
+		return false, ulid.ULID{}, err
 	}
-	cg.compactions.Inc()
-	return shouldRerun, compID, err
+	cg.compactionRunsCompleted.Inc()
+	return shouldRerun, compID, nil
 }
 
 // Issue347Error is a type wrapper for errors that should invoke repair process for broken block.
@@ -893,8 +915,8 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		return false, ulid.ULID{}, errors.Wrapf(err, "read meta from %s", pdir)
 	}
 
-	if cg.Key() != GroupKey(*meta) {
-		return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
+	if cg.Key() != GroupKey(meta.Thanos) {
+		return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(meta.Thanos)))
 	}
 
 	for _, s := range meta.Compaction.Sources {
@@ -963,6 +985,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		// Even though this block was empty, there may be more work to do.
 		return true, ulid.ULID{}, nil
 	}
+	cg.compactions.Inc()
 	level.Debug(cg.logger).Log("msg", "compacted blocks",
 		"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin))
@@ -1073,9 +1096,10 @@ func NewBucketCompactor(
 func (c *BucketCompactor) Compact(ctx context.Context) error {
 	defer func() {
 		if err := os.RemoveAll(c.compactDir); err != nil {
-			level.Error(c.logger).Log("msg", "failed to remove compaction cache directory", "path", c.compactDir, "err", err)
+			level.Error(c.logger).Log("msg", "failed to remove compaction work directory", "path", c.compactDir, "err", err)
 		}
 	}()
+
 	// Loop over bucket and compact until there's no work left.
 	for {
 		var (
@@ -1132,6 +1156,8 @@ func (c *BucketCompactor) Compact(ctx context.Context) error {
 
 		level.Info(c.logger).Log("msg", "start of GC")
 
+		// Blocks that were compacted are garbage collected after each compaction.
+		// However, if the compactor crashes, we need to resolve those on startup.
 		if err := c.sy.GarbageCollect(ctx); err != nil {
 			return errors.Wrap(err, "garbage")
 		}
diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go
index ae363a5c349..68887774110 100644
--- a/pkg/compact/compact_e2e_test.go
+++ b/pkg/compact/compact_e2e_test.go
@@ -17,6 +17,8 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	promtest "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/index"
@@ -164,148 +166,282 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 		groups, err := sy.Groups()
 		testutil.Ok(t, err)
 
-		testutil.Equals(t, "0@{}", groups[0].Key())
+		testutil.Equals(t, "0@17241709254077376921", groups[0].Key())
 		testutil.Equals(t, []ulid.ULID{metas[9].ULID, m3.ULID}, groups[0].IDs())
-		testutil.Equals(t, "1000@{}", groups[1].Key())
+		testutil.Equals(t, "1000@17241709254077376921", groups[1].Key())
 		testutil.Equals(t, []ulid.ULID{m4.ULID}, groups[1].IDs())
 	})
 }
 
-func TestGroup_Compact_e2e(t *testing.T) {
-	objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
-		prepareDir, err := ioutil.TempDir("", "test-compact-prepare")
-		testutil.Ok(t, err)
-		defer func() { testutil.Ok(t, os.RemoveAll(prepareDir)) }()
-
-		ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
-		defer cancel()
-
-		var metas []*metadata.Meta
-		extLset := labels.Labels{{Name: "e1", Value: "1"}}
-		b1, err := testutil.CreateBlock(ctx, prepareDir, []labels.Labels{
-			{{Name: "a", Value: "1"}},
-			{{Name: "a", Value: "2"}, {Name: "a", Value: "2"}},
-			{{Name: "a", Value: "3"}},
-			{{Name: "a", Value: "4"}},
-		}, 100, 0, 1000, extLset, 124)
-		testutil.Ok(t, err)
-
-		meta, err := metadata.Read(filepath.Join(prepareDir, b1.String()))
-		testutil.Ok(t, err)
-		metas = append(metas, meta)
-
-		b3, err := testutil.CreateBlock(ctx, prepareDir, []labels.Labels{
-			{{Name: "a", Value: "3"}},
-			{{Name: "a", Value: "4"}},
-			{{Name: "a", Value: "5"}},
-			{{Name: "a", Value: "6"}},
-		}, 100, 2001, 3000, extLset, 124)
-		testutil.Ok(t, err)
-
-		// Mix order to make sure compact is able to deduct min time / max time.
-		meta, err = metadata.Read(filepath.Join(prepareDir, b3.String()))
-		testutil.Ok(t, err)
-		metas = append(metas, meta)
-
-		// Currently TSDB does not produces empty blocks (see: https://github.com/prometheus/tsdb/pull/374). However before v2.7.0 it was
-		// so we still want to mimick this case as close as possible.
-		b2, err := createEmptyBlock(prepareDir, 1001, 2000, extLset, 124)
-		testutil.Ok(t, err)
-
-		// blocks" count=3 mint=0 maxt=3000 ulid=01D1RQCRRJM77KQQ4GYDSC50GM sources="[01D1RQCRMNZBVHBPGRPG2M3NZQ 01D1RQCRPJMYN45T65YA1PRWB7 01D1RQCRNMTWJKTN5QQXFNKKH8]".
+func MetricCount(c prometheus.Collector) int {
+	var (
+		mCount int
+		mChan  = make(chan prometheus.Metric)
+		done   = make(chan struct{})
+	)
 
-		meta, err = metadata.Read(filepath.Join(prepareDir, b2.String()))
-		testutil.Ok(t, err)
-		metas = append(metas, meta)
+	go func() {
+		for range mChan {
+			mCount++
+		}
+		close(done)
+	}()
 
-		// Due to TSDB compaction delay (not compacting fresh block), we need one more block to be pushed to trigger compaction.
-		freshB, err := testutil.CreateBlock(ctx, prepareDir, []labels.Labels{
-			{{Name: "a", Value: "2"}},
-			{{Name: "a", Value: "3"}},
-			{{Name: "a", Value: "4"}},
-			{{Name: "a", Value: "5"}},
-		}, 100, 3001, 4000, extLset, 124)
-		testutil.Ok(t, err)
+	c.Collect(mChan)
+	close(mChan)
+	<-done
 
-		meta, err = metadata.Read(filepath.Join(prepareDir, freshB.String()))
-		testutil.Ok(t, err)
-		metas = append(metas, meta)
+	return mCount
+}
 
-		// Upload and forget about tmp dir with all blocks. We want to ensure same state we will have on compactor.
-		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, b1.String())))
-		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, b2.String())))
-		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, b3.String())))
-		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, freshB.String())))
+func TestGroup_Compact_e2e(t *testing.T) {
+	objtesting.ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) {
+		ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+		defer cancel()
 
 		// Create fresh, empty directory for actual test.
 		dir, err := ioutil.TempDir("", "test-compact")
 		testutil.Ok(t, err)
 		defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-		metrics := newSyncerMetrics(nil)
-		g, err := newGroup(
-			nil,
-			bkt,
-			extLset,
-			124,
-			false,
-			metrics.compactions.WithLabelValues(""),
-			metrics.compactionFailures.WithLabelValues(""),
-			metrics.garbageCollectedBlocks,
-		)
-		testutil.Ok(t, err)
+		logger := log.NewLogfmtLogger(os.Stderr)
 
-		comp, err := tsdb.NewLeveledCompactor(ctx, nil, log.NewLogfmtLogger(os.Stderr), []int64{1000, 3000}, nil)
-		testutil.Ok(t, err)
+		reg := prometheus.NewRegistry()
 
-		shouldRerun, id, err := g.Compact(ctx, dir, comp)
+		sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil)
 		testutil.Ok(t, err)
-		testutil.Assert(t, !shouldRerun, "group should be empty, but compactor did a compaction and told us to rerun")
-
-		// Add all metas that would be gathered by syncMetas.
-		for _, m := range metas {
-			testutil.Ok(t, g.Add(m))
-		}
-
-		shouldRerun, id, err = g.Compact(ctx, dir, comp)
+		comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
 		testutil.Ok(t, err)
-		testutil.Assert(t, shouldRerun, "there should be compactible data, but the compactor reported there was not")
-
-		resDir := filepath.Join(dir, id.String())
-		testutil.Ok(t, block.Download(ctx, log.NewNopLogger(), bkt, id, resDir))
+		bComp, err := NewBucketCompactor(logger, sy, comp, dir, bkt, 2)
 		testutil.Ok(t, err)
 
-		meta, err = metadata.Read(resDir)
-		testutil.Ok(t, err)
-
-		testutil.Equals(t, int64(0), meta.MinTime)
-		testutil.Equals(t, int64(3000), meta.MaxTime)
-		testutil.Equals(t, uint64(6), meta.Stats.NumSeries)
-		testutil.Equals(t, uint64(2*4*100), meta.Stats.NumSamples) // Only 2 times 4*100 because one block was empty.
-		testutil.Equals(t, 2, meta.Compaction.Level)
-		testutil.Equals(t, []ulid.ULID{b1, b3, b2}, meta.Compaction.Sources)
-
-		// Check thanos meta.
-		testutil.Assert(t, extLset.Equals(labels.FromMap(meta.Thanos.Labels)), "ext labels does not match")
-		testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution)
+		// Compaction on empty should not fail.
+		testutil.Ok(t, bComp.Compact(ctx))
+		testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.syncMetas))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.syncMetaFailures))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
+		testutil.Equals(t, 0, MetricCount(sy.metrics.compactions))
+		testutil.Equals(t, 0, MetricCount(sy.metrics.compactionRunsStarted))
+		testutil.Equals(t, 0, MetricCount(sy.metrics.compactionRunsCompleted))
+		testutil.Equals(t, 0, MetricCount(sy.metrics.compactionFailures))
+
+		_, err = os.Stat(dir)
+		testutil.Assert(t, os.IsNotExist(err), "dir %s should be removed after compaction.", dir)
+
+		// Test label name with slash, regression: https://github.com/thanos-io/thanos/issues/1661.
+		extLabels := labels.Labels{{Name: "e1", Value: "1/weird"}}
+		extLabels2 := labels.Labels{{Name: "e1", Value: "1"}}
+		metas := createAndUpload(t, bkt, []blockgenSpec{
+			{
+				numSamples: 100, mint: 0, maxt: 1000, extLset: extLabels, res: 124,
+				series: []labels.Labels{
+					{{Name: "a", Value: "1"}},
+					{{Name: "a", Value: "2"}, {Name: "a", Value: "2"}},
+					{{Name: "a", Value: "3"}},
+					{{Name: "a", Value: "4"}},
+				},
+			},
+			{
+				numSamples: 100, mint: 2000, maxt: 3000, extLset: extLabels, res: 124,
+				series: []labels.Labels{
+					{{Name: "a", Value: "3"}},
+					{{Name: "a", Value: "4"}},
+					{{Name: "a", Value: "5"}},
+					{{Name: "a", Value: "6"}},
+				},
+			},
+			// Mix order to make sure compact is able to deduce min time / max time.
+			// Currently TSDB does not produce empty blocks (see: https://github.com/prometheus/tsdb/pull/374). However, before v2.7.0 it did,
+			// so we still want to mimic this case as closely as possible.
+			{
+				mint: 1000, maxt: 2000, extLset: extLabels, res: 124,
+				// Empty block.
+			},
+			// Due to TSDB compaction delay (not compacting fresh block), we need one more block to be pushed to trigger compaction.
+			{
+				numSamples: 100, mint: 3000, maxt: 4000, extLset: extLabels, res: 124,
+				series: []labels.Labels{
+					{{Name: "a", Value: "7"}},
+				},
+			},
+			// Extra block for "distraction": one with different external labels.
+			{
+				numSamples: 100, mint: 5000, maxt: 6000, extLset: labels.Labels{{Name: "e1", Value: "2"}}, res: 124,
+				series: []labels.Labels{
+					{{Name: "a", Value: "7"}},
+				},
+			},
+			// Extra block for "distraction": one with a different resolution.
+			{
+				numSamples: 100, mint: 4000, maxt: 5000, extLset: extLabels, res: 0,
+				series: []labels.Labels{
+					{{Name: "a", Value: "7"}},
+				},
+			},
+			// Second group (extLabels2).
+			{
+				numSamples: 100, mint: 2000, maxt: 3000, extLset: extLabels2, res: 124,
+				series: []labels.Labels{
+					{{Name: "a", Value: "3"}},
+					{{Name: "a", Value: "4"}},
+					{{Name: "a", Value: "6"}},
+				},
+			},
+			{
+				numSamples: 100, mint: 0, maxt: 1000, extLset: extLabels2, res: 124,
+				series: []labels.Labels{
+					{{Name: "a", Value: "1"}},
+					{{Name: "a", Value: "2"}, {Name: "a", Value: "2"}},
+					{{Name: "a", Value: "3"}},
+					{{Name: "a", Value: "4"}},
+				},
+			},
+			// Due to TSDB compaction delay (not compacting fresh block), we need one more block to be pushed to trigger compaction.
+			{
+				numSamples: 100, mint: 3000, maxt: 4000, extLset: extLabels2, res: 124,
+				series: []labels.Labels{
+					{{Name: "a", Value: "7"}},
+				},
+			},
+		})
 
-		// Check object storage. All blocks that were included in new compacted one should be removed.
-		err = bkt.Iter(ctx, "", func(n string) error {
+		testutil.Ok(t, bComp.Compact(ctx))
+		testutil.Equals(t, 3.0, promtest.ToFloat64(sy.metrics.syncMetas))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.syncMetaFailures))
+		testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
+		testutil.Equals(t, 4, MetricCount(sy.metrics.compactions))
+		testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(metas[0].Thanos))))
+		testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(metas[7].Thanos))))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(metas[4].Thanos))))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactions.WithLabelValues(GroupKey(metas[5].Thanos))))
+		testutil.Equals(t, 4, MetricCount(sy.metrics.compactionRunsStarted))
+		testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionRunsStarted.WithLabelValues(GroupKey(metas[0].Thanos))))
+		testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionRunsStarted.WithLabelValues(GroupKey(metas[7].Thanos))))
+		// TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate.
+		testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionRunsStarted.WithLabelValues(GroupKey(metas[4].Thanos))))
+		testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionRunsStarted.WithLabelValues(GroupKey(metas[5].Thanos))))
+		testutil.Equals(t, 4, MetricCount(sy.metrics.compactionRunsCompleted))
+		testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(metas[0].Thanos))))
+		testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(metas[7].Thanos))))
+		// TODO(bwplotka): Looks like we do some unnecessary loops. Not a major problem but investigate.
+		testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(metas[4].Thanos))))
+		testutil.Equals(t, 2.0, promtest.ToFloat64(sy.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(metas[5].Thanos))))
+		testutil.Equals(t, 4, MetricCount(sy.metrics.compactionFailures))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(metas[0].Thanos))))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(metas[7].Thanos))))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(metas[4].Thanos))))
+		testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.compactionFailures.WithLabelValues(GroupKey(metas[5].Thanos))))
+
+		_, err = os.Stat(dir)
+		testutil.Assert(t, os.IsNotExist(err), "dir %s should be removed after compaction.", dir)
+
+		// Check object storage. All blocks that were included in the new compacted one should be removed. New compacted ones
+		// are present and look as expected.
+		nonCompactedExpected := map[ulid.ULID]bool{
+			metas[3].ULID: false,
+			metas[4].ULID: false,
+			metas[5].ULID: false,
+			metas[8].ULID: false,
+		}
+		others := map[string]metadata.Meta{}
+		testutil.Ok(t, bkt.Iter(ctx, "", func(n string) error {
 			id, ok := block.IsBlockDir(n)
 			if !ok {
 				return nil
 			}
 
-			for _, source := range meta.Compaction.Sources {
-				if id.Compare(source) == 0 {
-					return errors.Errorf("Unexpectedly found %s block in bucket", source.String())
-				}
+			if _, ok := nonCompactedExpected[id]; ok {
+				nonCompactedExpected[id] = true
+				return nil
+			}
+
+			meta, err := block.DownloadMeta(ctx, logger, bkt, id)
+			if err != nil {
+				return err
 			}
+
+			others[GroupKey(meta.Thanos)] = meta
 			return nil
-		})
-		testutil.Ok(t, err)
+		}))
+
+		for id, found := range nonCompactedExpected {
+			testutil.Assert(t, found, "expected block %s not found", id.String())
+		}
+
+		// We expect only two compacted blocks in addition to those listed in `nonCompactedExpected`.
+		testutil.Equals(t, 2, len(others))
+		{
+			meta, ok := others[groupKey(124, extLabels)]
+			testutil.Assert(t, ok, "meta not found")
+
+			testutil.Equals(t, int64(0), meta.MinTime)
+			testutil.Equals(t, int64(3000), meta.MaxTime)
+			testutil.Equals(t, uint64(6), meta.Stats.NumSeries)
+			testutil.Equals(t, uint64(2*4*100), meta.Stats.NumSamples) // Only 2 times 4*100 because one block was empty.
+			testutil.Equals(t, 2, meta.Compaction.Level)
+			testutil.Equals(t, []ulid.ULID{metas[0].ULID, metas[1].ULID, metas[2].ULID}, meta.Compaction.Sources)
+
+			// Check thanos meta.
+			testutil.Assert(t, extLabels.Equals(labels.FromMap(meta.Thanos.Labels)), "ext labels do not match")
+			testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution)
+		}
+		{
+			meta, ok := others[groupKey(124, extLabels2)]
+			testutil.Assert(t, ok, "meta not found")
+
+			testutil.Equals(t, int64(0), meta.MinTime)
+			testutil.Equals(t, int64(3000), meta.MaxTime)
+			testutil.Equals(t, uint64(5), meta.Stats.NumSeries)
+			testutil.Equals(t, uint64(2*4*100-100), meta.Stats.NumSamples)
+			testutil.Equals(t, 2, meta.Compaction.Level)
+			testutil.Equals(t, []ulid.ULID{metas[6].ULID, metas[7].ULID}, meta.Compaction.Sources)
+
+			// Check thanos meta.
+			testutil.Assert(t, extLabels2.Equals(labels.FromMap(meta.Thanos.Labels)), "ext labels do not match")
+			testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution)
+		}
 	})
 }
 
+type blockgenSpec struct {
+	mint, maxt int64
+	series     []labels.Labels
+	numSamples int
+	extLset    labels.Labels
+	res        int64
+}
+
+func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) (metas []*metadata.Meta) {
+	prepareDir, err := ioutil.TempDir("", "test-compact-prepare")
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, os.RemoveAll(prepareDir)) }()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+	defer cancel()
+
+	for _, b := range blocks {
+		var id ulid.ULID
+		var err error
+		if b.numSamples == 0 {
+			id, err = createEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res)
+		} else {
+			id, err = testutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res)
+		}
+		testutil.Ok(t, err)
+
+		meta, err := metadata.Read(filepath.Join(prepareDir, id.String()))
+		testutil.Ok(t, err)
+		metas = append(metas, meta)
+
+		testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(prepareDir, id.String())))
+	}
+	return metas
+}
+
 // createEmptyBlock produces empty block like it was the case before fix: https://github.com/prometheus/tsdb/pull/374.
 // (Prometheus pre v2.7.0).
 func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, resolution int64) (ulid.ULID, error) {
diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go
index 9787c7d077f..83d0bf60a45 100644
--- a/pkg/compact/compact_test.go
+++ b/pkg/compact/compact_test.go
@@ -7,6 +7,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/pkg/relabel"
@@ -123,3 +125,42 @@ func TestHeuristicCheck(t *testing.T) {
 	err = spaceHeuristicCheck(m, 2999)
 	testutil.NotOk(t, err)
 }
+
+func TestGroupKey(t *testing.T) {
+	for _, tcase := range []struct {
+		input    metadata.Thanos
+		expected string
+	}{
+		{
+			input:    metadata.Thanos{},
+			expected: "0@17241709254077376921",
+		},
+		{
+			input: metadata.Thanos{
+				Labels:     map[string]string{},
+				Downsample: metadata.ThanosDownsample{Resolution: 0},
+			},
+			expected: "0@17241709254077376921",
+		},
+		{
+			input: metadata.Thanos{
+				Labels:     map[string]string{"foo": "bar", "foo1": "bar2"},
+				Downsample: metadata.ThanosDownsample{Resolution: 0},
+			},
+			expected: "0@2124638872457683483",
+		},
+		{
+			input: metadata.Thanos{
+				Labels:     map[string]string{`foo/some..thing/some.thing/../`: `a_b_c/bar-something-a\metric/a\x`},
+				Downsample: metadata.ThanosDownsample{Resolution: 0},
+			},
+			expected: "0@16590761456214576373",
+		},
+	} {
+		if ok := t.Run("", func(t *testing.T) {
+			testutil.Equals(t, tcase.expected, GroupKey(tcase.input))
+		}); !ok {
+			return
+		}
+	}
+}
diff --git a/pkg/objstore/inmem/inmem.go b/pkg/objstore/inmem/inmem.go
index 589e520dcd5..edddd97e081 100644
--- a/pkg/objstore/inmem/inmem.go
+++ b/pkg/objstore/inmem/inmem.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"io"
 	"sort"
+	"sync"
 
 	"bytes"
 	"io/ioutil"
@@ -16,7 +17,9 @@ import (
 var errNotFound = errors.New("inmem: object not found")
 
 // Bucket implements the store.Bucket and shipper.Bucket interfaces against local memory.
+// Methods from the Bucket interface are thread-safe. Objects are assumed to be immutable.
 type Bucket struct {
+	mtx     sync.RWMutex
 	objects map[string][]byte
 }
@@ -45,6 +48,8 @@ func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error
 		}
 		dirPartsCount++
 	}
+
+	b.mtx.RLock()
 	for filename := range b.objects {
 		if !strings.HasPrefix(filename, dir) || dir == filename {
 			continue
@@ -53,6 +58,7 @@ func (b *Bucket) Iter(_ context.Context, dir string, f func(string) error) error
 		parts := strings.SplitAfter(filename, objstore.DirDelim)
 		unique[strings.Join(parts[:dirPartsCount+1], "")] = struct{}{}
 	}
+	b.mtx.RUnlock()
 
 	var keys []string
 	for n := range unique {
@@ -86,7 +92,9 @@ func (b *Bucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
 		return nil, errors.New("inmem: object name is empty")
 	}
 
+	b.mtx.RLock()
 	file, ok := b.objects[name]
+	b.mtx.RUnlock()
 	if !ok {
 		return nil, errNotFound
 	}
@@ -100,7 +108,9 @@ func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io
 		return nil, errors.New("inmem: object name is empty")
 	}
 
+	b.mtx.RLock()
 	file, ok := b.objects[name]
+	b.mtx.RUnlock()
 	if !ok {
 		return nil, errNotFound
 	}
@@ -119,12 +129,16 @@ func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io
 
 // Exists checks if the given directory exists in memory.
 func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
+	b.mtx.RLock()
+	defer b.mtx.RUnlock()
 	_, ok := b.objects[name]
 	return ok, nil
 }
 
 // Upload writes the file specified in src to into the memory.
 func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
 	body, err := ioutil.ReadAll(r)
 	if err != nil {
 		return err
@@ -135,6 +149,8 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
 
 // Delete removes all data prefixed with the dir.
 func (b *Bucket) Delete(_ context.Context, name string) error {
+	b.mtx.Lock()
+	defer b.mtx.Unlock()
 	if _, ok := b.objects[name]; !ok {
 		return errNotFound
 	}
diff --git a/pkg/verifier/overlapped_blocks.go b/pkg/verifier/overlapped_blocks.go
index 808c6f7b108..0e7be08e644 100644
--- a/pkg/verifier/overlapped_blocks.go
+++ b/pkg/verifier/overlapped_blocks.go
@@ -58,7 +58,7 @@ func fetchOverlaps(ctx context.Context, logger log.Logger, bkt objstore.Bucket)
 			return err
 		}
 
-		metas[compact.GroupKey(m)] = append(metas[compact.GroupKey(m)], m.BlockMeta)
+		metas[compact.GroupKey(m.Thanos)] = append(metas[compact.GroupKey(m.Thanos)], m.BlockMeta)
 		return nil
 	})
 	if err != nil {
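For illustration, here is a minimal standalone sketch of the hash-based group key this patch introduces (it mirrors the unexported `groupKey` helper changed in `pkg/compact/compact.go`; the snippet itself is not part of the patch). The `0@17241709254077376921` constants asserted in `TestGroupKey` and the e2e tests are simply `<resolution>@<labels.Hash()>` for an empty label set, where `Hash` is the xxhash64-based hash from the same `github.com/prometheus/prometheus/tsdb/labels` package this change already uses:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/labels"
)

// groupKey mirrors the helper in pkg/compact/compact.go:
// resolution, then "@", then the hash of the label set.
func groupKey(res int64, lbls labels.Labels) string {
	return fmt.Sprintf("%d@%v", res, lbls.Hash())
}

func main() {
	// An empty label set hashes to 17241709254077376921,
	// matching the expected values in the tests above.
	fmt.Println(groupKey(0, labels.Labels{}))

	// Label values containing '/' no longer leak into the key,
	// e.g. the regression case from issue #1661 exercised in TestGroup_Compact_e2e.
	fmt.Println(groupKey(124, labels.Labels{{Name: "e1", Value: "1/weird"}}))
}
```

The design point: `(cg *Group).Compact` uses `cg.Key()` as a working sub-directory name (`filepath.Join(dir, cg.Key())`), so the key must contain only filesystem-safe characters; printing the hash instead of the raw label string guarantees that.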