compact: Move downsampling and compact to sub dirs to avoid removal of potentially mounted dir. (#278)

* compact: Move downsampling and compact to sub dirs to avoid removal of potentially mounted dir.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Moved removal to Compact.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Added subdir + check

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Fixed import.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka authored Apr 9, 2018
1 parent 9a1c269 commit acf890e
Showing 2 changed files with 28 additions and 14 deletions.
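
The rationale of the change, in miniature: rather than wiping dataDir itself (which may be a mount point, for example a Kubernetes volume), each job cleans and recreates its own subdirectory underneath it. The sketch below is illustrative only and not code from this commit; the prepareDirs helper and its paths are hypothetical.

// Illustrative sketch only (not from this commit): clean per-purpose
// subdirectories under dataDir instead of calling os.RemoveAll(dataDir),
// so a mounted data directory is never removed.
package main

import (
	"fmt"
	"log"
	"os"
	"path"
)

// prepareDirs is a hypothetical helper: it recreates the compaction and
// downsampling working directories while leaving dataDir itself untouched.
func prepareDirs(dataDir string) error {
	for _, dir := range []string{
		path.Join(dataDir, "compact"),
		path.Join(dataDir, "downsample"),
	} {
		// Remove only the subdirectory; the (possibly mounted) dataDir stays.
		if err := os.RemoveAll(dir); err != nil {
			return err
		}
		if err := os.MkdirAll(dir, 0777); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	if err := prepareDirs("./data"); err != nil {
		log.Fatal(err)
	}
	fmt.Println("working dirs ready under ./data")
}

In the actual change below, the compaction cleanup moves into Group.Compact, which works under a per-group subdirectory of dataDir/compact, while downsampling uses a fixed dataDir/downsample directory.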
cmd/thanos/compact.go (14 changes: 9 additions, 5 deletions)
@@ -4,7 +4,7 @@ import (
 	"context"
 	"net"
 	"net/http"
-	"os"
+	"path"
 	"time"
 
 	"cloud.google.com/go/storage"
@@ -130,6 +130,11 @@ func runCompact(
 	ctx, cancel := context.WithCancel(context.Background())
 
 	f := func() error {
+		var (
+			compactDir      = path.Join(dataDir, "compact")
+			downsamplingDir = path.Join(dataDir, "downsample")
+		)
+
 		// Loop over bucket and compact until there's no work left.
 		for {
 			if err := sy.SyncMetas(ctx); err != nil {
@@ -144,9 +149,8 @@
 			}
 			done := true
 			for _, g := range groups {
-				os.RemoveAll(dataDir)
 				// While we do all compactions sequentially we just compact within the top-level dir.
-				id, err := g.Compact(ctx, dataDir, comp)
+				id, err := g.Compact(ctx, compactDir, comp)
 				if err == nil {
 					// If the returned ID has a zero value, the group had no blocks to be compacted.
 					// We keep going through the outer loop until no group has any work left.
@@ -177,13 +181,13 @@
 		// for 5m downsamplings created in the first run.
 		level.Info(logger).Log("msg", "start first pass of downsampling")
 
-		if err := downsampleBucket(ctx, logger, bkt, dataDir); err != nil {
+		if err := downsampleBucket(ctx, logger, bkt, downsamplingDir); err != nil {
 			return errors.Wrap(err, "first pass of downsampling failed")
 		}
 
 		level.Info(logger).Log("msg", "start second pass of downsampling")
 
-		if err := downsampleBucket(ctx, logger, bkt, dataDir); err != nil {
+		if err := downsampleBucket(ctx, logger, bkt, downsamplingDir); err != nil {
 			return errors.Wrap(err, "second pass of downsampling failed")
 		}
 		return nil
pkg/compact/compact.go (28 changes: 19 additions, 9 deletions)
@@ -4,23 +4,22 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"path"
 	"path/filepath"
 	"sort"
 	"sync"
 	"time"
 
-	"github.com/improbable-eng/thanos/pkg/compact/downsample"
-
-	"github.com/improbable-eng/thanos/pkg/block"
-	"github.com/improbable-eng/thanos/pkg/objstore"
-	"github.com/prometheus/tsdb/labels"
-
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/improbable-eng/thanos/pkg/block"
+	"github.com/improbable-eng/thanos/pkg/compact/downsample"
+	"github.com/improbable-eng/thanos/pkg/objstore"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/tsdb"
+	"github.com/prometheus/tsdb/labels"
 )
 
 // Syncer syncronizes block metas from a bucket into a local directory.
@@ -441,10 +440,16 @@ func (cg *Group) Resolution() int64 {
 // Compact plans and runs a single compaction against the group. The compacted result
 // is uploaded into the bucket the blocks were retrieved from.
 func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (ulid.ULID, error) {
-	if err := os.MkdirAll(dir, 0777); err != nil {
-		return ulid.ULID{}, errors.Wrap(err, "create compaction dir")
+	subDir := path.Join(dir, cg.Key())
+
+	if err := os.RemoveAll(subDir); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "clean compaction group dir")
 	}
-	id, err := cg.compact(ctx, dir, comp)
+	if err := os.MkdirAll(subDir, 0777); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "create compaction group dir")
+	}
+
+	id, err := cg.compact(ctx, subDir, comp)
 	if err != nil {
 		cg.compactionFailures.Inc()
 	}
@@ -508,6 +513,11 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
 		if err != nil {
 			return compID, errors.Wrapf(err, "read meta from %s", pdir)
 		}
+
+		if cg.Key() != GroupKey(*meta) {
+			return compID, halt(errors.Wrapf(err, "compact planned compaction for mixed groups. group: %s, planned block's group: %s", cg.Key(), GroupKey(*meta)))
+		}
+
 		for _, s := range meta.Compaction.Sources {
 			if _, ok := uniqueSources[s]; ok {
 				return compID, halt(errors.Errorf("overlapping sources detected for plan %v", plan))
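
For context on the new guard: Thanos keys each compaction group by its downsampling resolution and external labels (cg.Key / GroupKey), and the added check halts if the planner ever hands a group a block that belongs to a different group; halt in this codebase marks errors the compactor should stop on rather than retry. Below is a rough, hypothetical illustration of such a key, not the real implementation.

// Hypothetical illustration (not the actual Thanos GroupKey): a group key
// derived from downsampling resolution plus external labels, so blocks from
// different groups can never be compacted together.
package main

import (
	"fmt"
	"sort"
	"strings"
)

type blockMeta struct {
	Resolution int64             // downsampling resolution in milliseconds
	Labels     map[string]string // external labels of the producing Prometheus
}

func groupKey(m blockMeta) string {
	keys := make([]string, 0, len(m.Labels))
	for k := range m.Labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+"="+m.Labels[k])
	}
	return fmt.Sprintf("%d@{%s}", m.Resolution, strings.Join(parts, ","))
}

func main() {
	raw := blockMeta{Resolution: 0, Labels: map[string]string{"replica": "r1"}}
	down := blockMeta{Resolution: 300000, Labels: map[string]string{"replica": "r1"}}
	// Different resolutions (or labels) yield different keys, so a guard like
	// cg.Key() != GroupKey(*meta) would halt a mixed compaction.
	fmt.Println(groupKey(raw))
	fmt.Println(groupKey(down))
	fmt.Println(groupKey(raw) == groupKey(down)) // false
}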
