Fixed compactor tests; Moved to full e2e compact test; Cleaned metrics. #1666

Merged (5 commits) on Oct 21, 2019
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,11 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 
 - [#1656](https://github.com/thanos-io/thanos/pull/1656) Thanos Store now starts metric and status probe HTTP server earlier in its start-up sequence. `/-/healthy` endpoint now starts to respond with success earlier. `/metrics` endpoint starts serving metrics earlier as well. Make sure to point your readiness probes to the `/-/ready` endpoint rather than `/metrics`.
 
+### Changed
+
+- [#1666](https://github.com/thanos-io/thanos/pull/1666) `thanos_compact_group_compactions_total` now counts block compactions, i.e. operations that resulted in a compacted block. The old behaviour is now exposed by the new metrics `thanos_compact_group_compaction_runs_started_total` and `thanos_compact_group_compaction_runs_completed_total`, which count compaction runs overall.
+
 ## [v0.8.1](https://github.com/thanos-io/thanos/releases/tag/v0.8.1) - 2019.10.14
 
 ### Fixed
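As a reading aid for the entry above, here is a minimal, hypothetical sketch (not the actual Thanos compactor code) of how the three counters relate: the run counters move on every compaction attempt, while `thanos_compact_group_compactions_total` only moves when an attempt actually produces a compacted block.

```go
// Hypothetical wiring; only the metric names come from the CHANGELOG entry.
package compactsketch

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

type groupMetrics struct {
	runsStarted   prometheus.Counter
	runsCompleted prometheus.Counter
	compactions   prometheus.Counter
}

func newGroupMetrics(reg prometheus.Registerer) *groupMetrics {
	m := &groupMetrics{
		runsStarted: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "thanos_compact_group_compaction_runs_started_total",
			Help: "Total number of group compaction run attempts.",
		}),
		runsCompleted: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "thanos_compact_group_compaction_runs_completed_total",
			Help: "Total number of group compaction runs that finished without error.",
		}),
		compactions: prometheus.NewCounter(prometheus.CounterOpts{
			Name: "thanos_compact_group_compactions_total",
			Help: "Total number of group compactions that resulted in a new compacted block.",
		}),
	}
	reg.MustRegister(m.runsStarted, m.runsCompleted, m.compactions)
	return m
}

// runCompaction drives one compaction attempt; compact reports whether a new
// block was produced.
func (m *groupMetrics) runCompaction(compact func() (compactedBlock bool, err error)) error {
	m.runsStarted.Inc()
	compacted, err := compact()
	if err != nil {
		return fmt.Errorf("compaction run: %w", err)
	}
	m.runsCompleted.Inc()
	if compacted {
		// Only attempts that actually produced a block bump the compactions counter.
		m.compactions.Inc()
	}
	return nil
}
```

The sketch only pins down the ordering the CHANGELOG describes; where exactly the increments sit inside the compactor's group loop is up to the real implementation.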
8 changes: 4 additions & 4 deletions cmd/thanos/compact.go
@@ -344,15 +344,15 @@ func runCompact(
 }
 
 const (
-	MetricIndexGenerateName = "thanos_compact_generated_index_total"
-	MetricIndexGenerateHelp = "Total number of generated indexes."
+	metricIndexGenerateName = "thanos_compact_generated_index_total"
+	metricIndexGenerateHelp = "Total number of generated indexes."
 )
 
 // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
 func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
 	genIndex := prometheus.NewCounter(prometheus.CounterOpts{
-		Name: MetricIndexGenerateName,
-		Help: MetricIndexGenerateHelp,
+		Name: metricIndexGenerateName,
+		Help: metricIndexGenerateHelp,
 	})
 	reg.MustRegister(genIndex)
 
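One note on the rename above: `metricIndexGenerateName`/`metricIndexGenerateHelp` stay reachable from the tests because `cmd/thanos/main_test.go` lives in the same `main` package; only importers outside the package lose access. A self-contained sketch of the pattern the test relies on (the package name and round-trip check are invented for illustration):

```go
// Keeping a metric's name and help in unexported package-level constants lets
// same-package tests build an "expected" counter with exactly the same name.
package compactsketch

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

const (
	metricIndexGenerateName = "thanos_compact_generated_index_total"
	metricIndexGenerateHelp = "Total number of generated indexes."
)

func registerAndCheck() error {
	reg := prometheus.NewRegistry()
	genIndex := prometheus.NewCounter(prometheus.CounterOpts{
		Name: metricIndexGenerateName,
		Help: metricIndexGenerateHelp,
	})
	if err := reg.Register(genIndex); err != nil {
		return err
	}
	genIndex.Inc()

	// Gather and confirm the counter is exposed under the shared constant name.
	mfs, err := reg.Gather()
	if err != nil {
		return err
	}
	for _, mf := range mfs {
		if mf.GetName() == metricIndexGenerateName {
			fmt.Println(mf.GetName(), "=", mf.GetMetric()[0].GetCounter().GetValue()) // prints 1
			return nil
		}
	}
	return fmt.Errorf("metric %q not gathered", metricIndexGenerateName)
}
```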
8 changes: 4 additions & 4 deletions cmd/thanos/downsample.go
@@ -214,10 +214,10 @@ func downsampleBucket(
 				continue
 			}
 			if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil {
-				metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m)).Inc()
+				metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos)).Inc()
 				return errors.Wrap(err, "downsampling to 5 min")
 			}
-			metrics.downsamples.WithLabelValues(compact.GroupKey(*m)).Inc()
+			metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos)).Inc()
 
 		case downsample.ResLevel1:
 			missing := false
@@ -237,10 +237,10 @@
 				continue
 			}
 			if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
-				metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m))
+				metrics.downsampleFailures.WithLabelValues(compact.GroupKey(m.Thanos))
 				return errors.Wrap(err, "downsampling to 60 min")
 			}
-			metrics.downsamples.WithLabelValues(compact.GroupKey(*m))
+			metrics.downsamples.WithLabelValues(compact.GroupKey(m.Thanos))
 		}
 	}
 	return nil
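For context on the `compact.GroupKey(*m)` to `compact.GroupKey(m.Thanos)` change above: a compaction group is determined only by the Thanos-specific part of the block metadata, i.e. the downsampling resolution plus the external labels, so passing `m.Thanos` is sufficient. The sketch below is illustrative only; the struct shape and the key format are assumptions, not the exact Thanos implementation.

```go
// Illustrative group-key helper keyed on the Thanos section of a block's
// meta.json. Type and field names here are assumptions made for this sketch.
package compactsketch

import "fmt"

type thanosDownsample struct {
	Resolution int64 `json:"resolution"`
}

type thanosMeta struct {
	Labels     map[string]string `json:"labels"`
	Downsample thanosDownsample  `json:"downsample"`
}

// groupKey buckets blocks that can be compacted together: blocks sharing the
// same external labels and the same downsampling resolution.
func groupKey(m thanosMeta) string {
	// e.g. groupKey(thanosMeta{Labels: map[string]string{"cluster": "eu1"}})
	// yields "0@map[cluster:eu1]".
	return fmt.Sprintf("%d@%v", m.Downsample.Resolution, m.Labels)
}
```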
128 changes: 60 additions & 68 deletions cmd/thanos/main_test.go
@@ -5,6 +5,9 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"

"github.com/thanos-io/thanos/pkg/block/metadata"

"testing"
"time"
@@ -13,116 +13,105 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestCleanupCompactCacheFolder(t *testing.T) {
ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

sy, err := compact.NewSyncer(logger, actReg, bkt, 0*time.Second, 1, false, nil)
testutil.Ok(t, err)

expReg := prometheus.NewRegistry()
syncExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: compact.MetricSyncMetaName,
Help: compact.MetricSyncMetaHelp,
})
expReg.MustRegister(syncExp)

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

comp, err := tsdb.NewLeveledCompactor(ctx, nil, logger, []int64{1}, nil)
testutil.Ok(t, err)

bComp, err := compact.NewBucketCompactor(logger, sy, comp, dir, bkt, 1)
func TestCleanupIndexCacheFolder(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

// Even with a single uploaded block the bucket compactor needs to
// download the meta file to plan the compaction groups.
testutil.Ok(t, bComp.Compact(ctx))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

syncExp.Inc()
bkt := inmem.NewBucket()

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)
// Upload one compaction lvl = 2 block, one compaction lvl = 1.
// We generate index cache files only for lvl > 1 blocks.
{
id, err := testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't exist at the end of execution")
meta, err := metadata.Read(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

}
meta.Compaction.Level = 2

func TestCleanupIndexCacheFolder(t *testing.T) {
ctx, logger, dir, _, bkt, actReg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
testutil.Ok(t, metadata.Write(logger, filepath.Join(dir, id.String()), meta))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
}
{
id, err := testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
}

reg := prometheus.NewRegistry()
expReg := prometheus.NewRegistry()
genIndexExp := prometheus.NewCounter(prometheus.CounterOpts{
Name: MetricIndexGenerateName,
Help: MetricIndexGenerateHelp,
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})
expReg.MustRegister(genIndexExp)

testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, actReg, bkt, dir))
testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, dir))

genIndexExp.Inc()
testutil.GatherAndCompare(t, expReg, actReg, compact.MetricSyncMetaName)

_, err := os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't exist at the end of execution")
}

func TestCleanupDownsampleCacheFolder(t *testing.T) {
ctx, logger, dir, blckID, bkt, reg := bootstrap(t)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

meta, err := block.DownloadMeta(ctx, logger, bkt, blckID)
testutil.Ok(t, err)

metrics := newDownsampleMetrics(reg)
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta))))
testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName)

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't exist at the end of execution")
}

func bootstrap(t *testing.T) (context.Context, log.Logger, string, ulid.ULID, objstore.Bucket, *prometheus.Registry) {
func TestCleanupDownsampleCacheFolder(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
dir, err := ioutil.TempDir("", "test-compact-cleanup")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bkt := inmem.NewBucket()
var blckID ulid.ULID

// Create and upload a single block to the bucket.
// The compaction will download the meta block of
// this block to plan the compaction groups.
var id ulid.ULID
{
blckID, err = testutil.CreateBlock(
id, err = testutil.CreateBlock(
ctx,
dir,
[]labels.Labels{
{{Name: "a", Value: "1"}},
},
[]labels.Labels{{{Name: "a", Value: "1"}}},
1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
labels.Labels{{Name: "e1", Value: "1"}},
downsample.ResLevel0)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, blckID.String())))
testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String())))
}

return ctx, logger, dir, blckID, bkt, prometheus.NewRegistry()
meta, err := block.DownloadMeta(ctx, logger, bkt, id)
testutil.Ok(t, err)

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir))
testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "index cache dir shouldn't exist at the end of execution")
}
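The reworked tests above assert on metric values directly, e.g. `promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos)))`, instead of comparing whole registries. A minimal, self-contained sketch of that assertion pattern, with an invented metric and label standing in for the Thanos ones:

```go
// Asserting a labelled counter's value with client_golang's test utilities.
// The metric name, label, and group key below are invented for illustration.
package compactsketch

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	promtest "github.com/prometheus/client_golang/prometheus/testutil"
)

func TestCounterAssertion(t *testing.T) {
	downsamples := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_downsamples_total",
		Help: "Downsample operations per compaction group (illustrative).",
	}, []string{"group"})

	key := `0@{cluster="eu1"}` // stands in for compact.GroupKey(meta.Thanos)

	// Before the work: the child counter exists but is still zero.
	if got := promtest.ToFloat64(downsamples.WithLabelValues(key)); got != 0 {
		t.Fatalf("expected 0 before the operation, got %v", got)
	}

	// Stand-in for downsampleBucket(...) incrementing the metric once.
	downsamples.WithLabelValues(key).Inc()

	if got := promtest.ToFloat64(downsamples.WithLabelValues(key)); got != 1 {
		t.Fatalf("expected 1 after the operation, got %v", got)
	}
}
```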