Skip to content

Commit

Permalink
Fixed partial delete issues on compactor; Added Upload/Delete tests.
Browse files Browse the repository at this point in the history
Fixes #1331

Problem that we are fixing is explained in the linked issue.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Sep 16, 2019
1 parent cc17395 commit bf568e9
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 44 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,12 @@ require (
github.com/uber/jaeger-lib v2.0.0+incompatible
go.elastic.co/apm v1.5.0
go.elastic.co/apm/module/apmot v1.5.0
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/automaxprocs v1.2.0
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/text v0.3.2
google.golang.org/api v0.8.0
google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55
google.golang.org/grpc v1.22.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,6 @@ github.com/gophercloud/gophercloud v0.3.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEo
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg=
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xCzHAvxcr8HZnzsqU6ILg/0NiiE=
github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117 h1:v9uUYPE4RHQHA0C9XfpAX9uzWQvgIDYjPh6m/mQgrzs=
Expand Down Expand Up @@ -440,6 +438,7 @@ github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoH
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
Expand Down
50 changes: 39 additions & 11 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"os"
"path"
"path/filepath"
"strings"

"github.com/go-kit/kit/log/level"

"github.com/thanos-io/thanos/pkg/block/metadata"

Expand Down Expand Up @@ -62,7 +65,7 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string) error {
df, err := os.Stat(bdir)
if err != nil {
return errors.Wrap(err, "stat bdir")
return err
}
if !df.IsDir() {
return errors.Errorf("%s is not a directory", bdir)
Expand All @@ -89,41 +92,66 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
}

if err := objstore.UploadDir(ctx, logger, bkt, path.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {
return cleanUp(bkt, id, errors.Wrap(err, "upload chunks"))
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
}

if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexFilename), path.Join(id.String(), IndexFilename)); err != nil {
return cleanUp(bkt, id, errors.Wrap(err, "upload index"))
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
}

if meta.Thanos.Source == metadata.CompactorSource {
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil {
return cleanUp(bkt, id, errors.Wrap(err, "upload index cache"))
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index cache"))
}
}

// Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file
// to be pending uploads.
if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil {
return cleanUp(bkt, id, errors.Wrap(err, "upload meta file"))
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload meta file"))
}

return nil
}

func cleanUp(bkt objstore.Bucket, id ulid.ULID, err error) error {
func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) error {
// Cleanup the dir with an uncancelable context.
cleanErr := Delete(context.Background(), bkt, id)
cleanErr := Delete(context.Background(), logger, bkt, id)
if cleanErr != nil {
return errors.Wrapf(err, "failed to clean block after upload issue. Partial block in system. Err: %s", err.Error())
}
return err
}

// Delete removes directory that is mean to be block directory.
// NOTE: Prefer this method instead of objstore.Delete to avoid deleting empty dir (whole bucket) by mistake.
func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error {
return objstore.DeleteDir(ctx, bucket, id.String())
// Delete removes directory that is meant to be block directory.
// NOTE: Always prefer this method for deleting blocks.
// * We have to delete block's files in the certain order (meta.json first)
// to ensure we don't end up with malformed partial blocks. Thanos system handles well partial blocks
// only if they don't have meta.json. If meta.json is present Thanos assumes valid block.
// * This avoids deleting empty dir (whole bucket) by mistake.
func Delete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error {
if err := bkt.Delete(ctx, path.Join(id.String(), MetaFilename)); err != nil {
return errors.Wrapf(err, "delete %s", path.Join(id.String(), MetaFilename))
}
level.Debug(logger).Log("msg", "deleted file", "file", path.Join(id.String(), MetaFilename), "bucket", bkt.Name())

return deleteDir(ctx, logger, bkt, id.String())
}

// deleteDir removes all objects prefixed with dir from the bucket.
// NOTE: For objects removal use `block.Delete` strictly.
func deleteDir(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error {
return bkt.Iter(ctx, dir, func(name string) error {
// If we hit a directory, call DeleteDir recursively.
if strings.HasSuffix(name, objstore.DirDelim) {
return deleteDir(ctx, logger, bkt, name)
}
if err := bkt.Delete(ctx, name); err != nil {
return err
}
level.Debug(logger).Log("msg", "deleted file", "file", name, "bucket", bkt.Name())
return nil
})
}

// DownloadMeta downloads only meta file from bucket by block ID.
Expand Down
190 changes: 190 additions & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
package block

import (
"context"
"io"
"io/ioutil"
"os"
"path"
"strings"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"

"github.com/oklog/ulid"
)
Expand Down Expand Up @@ -56,3 +70,179 @@ func TestIsBlockDir(t *testing.T) {
})
}
}

func TestUpload(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

ctx := context.Background()

tmpDir, err := ioutil.TempDir("", "test-block-upload")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
b1, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)
testutil.Ok(t, os.MkdirAll(path.Join(tmpDir, "test", b1.String()), os.ModePerm))

{
// Wrong dir.
err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "not-existing"))
testutil.NotOk(t, err)
testutil.Assert(t, strings.HasSuffix(err.Error(), "/not-existing: no such file or directory"), "")
}
{
// Wrong existing dir (not a block).
err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test"))
testutil.NotOk(t, err)
testutil.Equals(t, "not a block dir: ulid: bad data size when unmarshaling", err.Error())
}
{
// Empty block dir
err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))
testutil.NotOk(t, err)
testutil.Assert(t, strings.HasSuffix(err.Error(), "/meta.json: no such file or directory"), "")
}
testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename)))
{
// Missing chunks.
err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))
testutil.NotOk(t, err)
testutil.Assert(t, strings.HasSuffix(err.Error(), "/chunks: no such file or directory"), "")

// Only debug meta.json present.
testutil.Equals(t, 1, len(bkt.Objects()))
}
testutil.Ok(t, os.MkdirAll(path.Join(tmpDir, "test", b1.String(), ChunksDirname), os.ModePerm))
testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), ChunksDirname, "000001"), path.Join(tmpDir, "test", b1.String(), ChunksDirname, "000001")))
{
// Missing index file.
err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))
testutil.NotOk(t, err)
testutil.Assert(t, strings.HasSuffix(err.Error(), "/index: no such file or directory"), "")

// Only debug meta.json present.
testutil.Equals(t, 1, len(bkt.Objects()))
}
testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), IndexFilename), path.Join(tmpDir, "test", b1.String(), IndexFilename)))
testutil.Ok(t, os.Remove(path.Join(tmpDir, "test", b1.String(), MetaFilename)))
{
// Missing meta.json file.
err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))
testutil.NotOk(t, err)
testutil.Assert(t, strings.HasSuffix(err.Error(), "/meta.json: no such file or directory"), "")

// Only debug meta.json present.
testutil.Equals(t, 1, len(bkt.Objects()))
}
testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename)))
{
// Full block
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())))
testutil.Equals(t, 4, len(bkt.Objects()))
testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 365, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Test Upload is idempotent.
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())))
testutil.Equals(t, 4, len(bkt.Objects()))
testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 365, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Upload with no external labels should be blocked.
b2, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, nil, 124)
testutil.Ok(t, err)
err = Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()))
testutil.NotOk(t, err)
testutil.Equals(t, "empty external labels are not allowed for Thanos block.", err.Error())
testutil.Equals(t, 4, len(bkt.Objects()))
}
}

func cpy(src, dst string) error {
sourceFileStat, err := os.Stat(src)
if err != nil {
return err
}

if !sourceFileStat.Mode().IsRegular() {
return errors.Errorf("%s is not a regular file", src)
}

source, err := os.Open(src)
if err != nil {
return err
}
defer source.Close()

destination, err := os.Create(dst)
if err != nil {
return err
}
defer destination.Close()
_, err = io.Copy(destination, source)
return err
}

func TestDelete(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

ctx := context.Background()

tmpDir, err := ioutil.TempDir("", "test-block-delete")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
{
b1, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b1.String())))
testutil.Equals(t, 4, len(bkt.Objects()))

// Full delete.
testutil.Ok(t, Delete(ctx, log.NewNopLogger(), bkt, b1))
// Still debug meta entry is expected.
testutil.Equals(t, 1, len(bkt.Objects()))
}
{
b2, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String())))
testutil.Equals(t, 5, len(bkt.Objects()))

// Remove meta.json and check if delete can delete it.
testutil.Ok(t, bkt.Delete(ctx, path.Join(b2.String(), MetaFilename)))
testutil.Ok(t, Delete(ctx, log.NewNopLogger(), bkt, b2))
// Still 2 debug meta entries are expected.
testutil.Equals(t, 2, len(bkt.Objects()))
}
}
8 changes: 4 additions & 4 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (remov
return true
}

if err := block.Delete(ctx, c.bkt, id); err != nil {
if err := block.Delete(ctx, c.logger, c.bkt, id); err != nil {
level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err)
return false
}
Expand Down Expand Up @@ -453,7 +453,7 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error {

level.Info(c.logger).Log("msg", "deleting outdated block", "block", id)

err := block.Delete(delCtx, c.bkt, id)
err := block.Delete(delCtx, c.logger, c.bkt, id)
cancel()
if err != nil {
return retry(errors.Wrapf(err, "delete block %s from bucket", id))
Expand Down Expand Up @@ -743,7 +743,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket,
defer cancel()

// TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this).
if err := block.Delete(delCtx, bkt, ie.id); err != nil {
if err := block.Delete(delCtx, logger, bkt, ie.id); err != nil {
return errors.Wrapf(err, "deleting old block %s failed. You need to delete this block manually", ie.id)
}

Expand Down Expand Up @@ -932,7 +932,7 @@ func (cg *Group) deleteBlock(b string) error {
delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id)
if err := block.Delete(delCtx, cg.bkt, id); err != nil {
if err := block.Delete(delCtx, cg.logger, cg.bkt, id); err != nil {
return errors.Wrapf(err, "delete block %s from bucket", id)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bk

maxTime := time.Unix(m.MaxTime/1000, 0)
if time.Now().After(maxTime.Add(retentionDuration)) {
level.Info(logger).Log("msg", "deleting block", "id", id, "maxTime", maxTime.String())
if err := block.Delete(ctx, bkt, id); err != nil {
level.Info(logger).Log("msg", "applying retention: deleting block", "id", id, "maxTime", maxTime.String())
if err := block.Delete(ctx, logger, bkt, id); err != nil {
return errors.Wrap(err, "delete block")
}
}
Expand Down
Loading

0 comments on commit bf568e9

Please sign in to comment.