Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compaction: remove all cache dirs at the end of each run #1587

Merged
merged 14 commits into from
Oct 10, 2019
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1525](https://github.com/thanos-io/thanos/pull/1525) Thanos now deletes block's file in correct order allowing to detect partial blocks without problems.
- [#1505](https://github.com/thanos-io/thanos/pull/1505) Thanos store now removes invalid local cache blocks.
- [#1587](https://github.com/thanos-io/thanos/pull/1587) Cleanup all cache dirs after each compaction run.
- [#1582](https://github.com/thanos-io/thanos/pull/1582) Thanos rule correctly parses Alertmanager URL if there is more `+` in it.

## v0.7.0 - 2019.09.02
Expand Down
16 changes: 14 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func runCompact(

// Generate index file.
if generateMissingIndexCacheFiles {
if err := genMissingIndexCacheFiles(ctx, logger, bkt, indexCacheDir); err != nil {
if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, indexCacheDir); err != nil {
return err
}
}
Expand Down Expand Up @@ -343,8 +343,19 @@ func runCompact(
return nil
}

const (
MetricIndexGenerateName = "thanos_compact_generated_index_total"
MetricIndexGenerateHelp = "Total number of generated indexes."
)

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error {
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricIndexGenerateName,
Help: MetricIndexGenerateHelp,
})
reg.MustRegister(genIndex)

if err := os.RemoveAll(dir); err != nil {
return errors.Wrap(err, "clean index cache directory")
}
Expand Down Expand Up @@ -395,6 +406,7 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, bkt objst
if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil {
return err
}
genIndex.Inc()
}

level.Info(logger).Log("msg", "generating index cache files is done, you can remove startup argument `index.generate-missing-cache-file`")
Expand Down
7 changes: 7 additions & 0 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ func downsampleBucket(
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}

defer func() {
if err := os.RemoveAll(dir); err != nil {
level.Error(logger).Log("msg", "failed to remove downsample cache directory", "path", dir, "err", err)
}
}()

var metas []*metadata.Meta

err := bkt.Iter(ctx, "", func(name string) error {
Expand Down
128 changes: 128 additions & 0 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package main

import (
"context"
"io/ioutil"
"os"
"path"

"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestCleanupCompactCacheFolder(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not having those inside package responsible for deleting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it will require duplicating the bootstrap function.
I thought to put the bootstrap in some reusable package, but don't think it is generic enough to be useful anywhere else.
If it happens that that func becomes useful in other tests would revisit and refactor.

ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

sy, err := compact.NewSyncer(logger, actReg, bkt, 0*time.Second, 1, false, nil)
testutil.Ok(t, err)

expReg := prometheus.NewRegistry()
syncExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: compact.MetricSyncMetaName,
Help: compact.MetricSyncMetaHelp,
})
expReg.MustRegister(syncExp)

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

comp, err := tsdb.NewLeveledCompactor(ctx, nil, logger, []int64{1}, nil)
testutil.Ok(t, err)

bComp, err := compact.NewBucketCompactor(logger, sy, comp, dir, bkt, 1)
testutil.Ok(t, err)

// Even with with a single uploaded block the bucker compactor needs to
// downloads the meta file to plan the compaction groups.
testutil.Ok(t, bComp.Compact(ctx))

syncExp.Inc()

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution")

}

func TestCleanupIndexCacheFolder(t *testing.T) {
ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

expReg := prometheus.NewRegistry()
genIndexExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricIndexGenerateName,
Help: MetricIndexGenerateHelp,
})
expReg.MustRegister(genIndexExp)

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, actReg, bkt, dir))

genIndexExp.Inc()
testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

_, err := os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution")
}

func TestCleanupDownsampleCacheFolder(t *testing.T) {
ctx, logger, dir, blckID, bkt, reg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

meta, err := block.DownloadMeta(ctx, logger, bkt, blckID)
testutil.Ok(t, err)

metrics := newDownsampleMetrics(reg)
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't not exist at the end of execution")
}

func bootstrap(t *testing.T) (context.Context, log.Logger, string, ulid.ULID, objstore.Bucket, *prometheus.Registry) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

bkt := inmem.NewBucket()
var blckID ulid.ULID

// Create and upload a single block to the bucker.
// The compaction will download the meta block of
// this block to plan the compaction groups.
{
blckID, err = testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{
{{Name: "a", Value: "1"}},
},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, blckID.String())))
}

return ctx, logger, dir, blckID, bkt, prometheus.NewRegistry()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v1.8.2-0.20190913102521-8ab628b35467 // v1.8.2 is misleading as Prometheus does not have v2 module.
github.com/uber-go/atomic v1.4.0 // indirect
Expand Down
15 changes: 13 additions & 2 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,19 @@ type syncerMetrics struct {
compactionFailures *prometheus.CounterVec
}

const (
MetricSyncMetaName = "thanos_compact_sync_meta_total"
MetricSyncMetaHelp = "Total number of sync meta operations."
)

func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {
var m syncerMetrics

m.syncMetas = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_sync_meta_total",
Help: "Total number of sync meta operations.",
Name: MetricSyncMetaName,
Help: MetricSyncMetaHelp,
})

m.syncMetaFailures = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_sync_meta_failures_total",
Help: "Total number of failed sync meta operations.",
Expand Down Expand Up @@ -1001,6 +1007,11 @@ func NewBucketCompactor(

// Compact runs compaction over bucket.
func (c *BucketCompactor) Compact(ctx context.Context) error {
defer func() {
if err := os.RemoveAll(c.compactDir); err != nil {
level.Error(c.logger).Log("msg", "failed to remove compaction cache directory", "path", c.compactDir, "err", err)
}
}()
// Loop over bucket and compact until there's no work left.
for {
var (
Expand Down
25 changes: 25 additions & 0 deletions pkg/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"reflect"
"runtime"
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// Assert fails the test if the condition is false.
Expand Down Expand Up @@ -71,3 +74,25 @@ func Equals(tb testing.TB, exp, act interface{}, v ...interface{}) {
tb.FailNow()
}
}

// GatherAndCompare compares the metrics of a Gatherers pair.
func GatherAndCompare(t *testing.T, g1 prometheus.Gatherer, g2 prometheus.Gatherer, filter string) {
g1m, err := g1.Gather()
Ok(t, err)
g2m, err := g2.Gather()
Ok(t, err)

var m1 *dto.MetricFamily
for _, m := range g1m {
if *m.Name == filter {
m1 = m
}
}
var m2 *dto.MetricFamily
for _, m := range g2m {
if *m.Name == filter {
m2 = m
}
}
Equals(t, m1.String(), m2.String())
}