Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: Move downsampling and compact to sub dirs to avoid removal of potentially mounted dir. #278

Merged
merged 4 commits into from
Apr 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"net"
"net/http"
"os"
"path"
"time"

"cloud.google.com/go/storage"
Expand Down Expand Up @@ -130,6 +130,11 @@ func runCompact(
ctx, cancel := context.WithCancel(context.Background())

f := func() error {
var (
compactDir = path.Join(dataDir, "compact")
downsamplingDir = path.Join(dataDir, "downsample")
)

// Loop over bucket and compact until there's no work left.
for {
if err := sy.SyncMetas(ctx); err != nil {
Expand All @@ -144,9 +149,8 @@ func runCompact(
}
done := true
for _, g := range groups {
os.RemoveAll(dataDir)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am 100% sure this was erroring 100% time for me ):

// While we do all compactions sequentially we just compact within the top-level dir.
id, err := g.Compact(ctx, dataDir, comp)
id, err := g.Compact(ctx, compactDir, comp)
if err == nil {
// If the returned ID has a zero value, the group had no blocks to be compacted.
// We keep going through the outer loop until no group has any work left.
Expand Down Expand Up @@ -177,13 +181,13 @@ func runCompact(
// for 5m downsamplings created in the first run.
level.Info(logger).Log("msg", "start first pass of downsampling")

if err := downsampleBucket(ctx, logger, bkt, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, bkt, downsamplingDir); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")

if err := downsampleBucket(ctx, logger, bkt, dataDir); err != nil {
if err := downsampleBucket(ctx, logger, bkt, downsamplingDir); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}
return nil
Expand Down
28 changes: 19 additions & 9 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@ import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"sort"
"sync"
"time"

"github.com/improbable-eng/thanos/pkg/compact/downsample"

"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/prometheus/tsdb/labels"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
)

// Syncer syncronizes block metas from a bucket into a local directory.
Expand Down Expand Up @@ -441,10 +440,16 @@ func (cg *Group) Resolution() int64 {
// Compact plans and runs a single compaction against the group. The compacted result
// is uploaded into the bucket the blocks were retrieved from.
func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (ulid.ULID, error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return ulid.ULID{}, errors.Wrap(err, "create compaction dir")
subDir := path.Join(dir, cg.Key())

if err := os.RemoveAll(subDir); err != nil {
return ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
}
if err := os.MkdirAll(subDir, 0777); err != nil {
return ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
}
id, err := cg.compact(ctx, dir, comp)

id, err := cg.compact(ctx, subDir, comp)
if err != nil {
cg.compactionFailures.Inc()
}
Expand Down Expand Up @@ -508,6 +513,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err != nil {
return compID, errors.Wrapf(err, "read meta from %s", pdir)
}

if cg.Key() != GroupKey(*meta) {
return compID, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
}

for _, s := range meta.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return compID, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
Expand Down