diff --git a/CHANGELOG.md b/CHANGELOG.md index baeb87f1e6b..029f0b1ee3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,14 +13,15 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Fixed --[1505](https://github.com/thanos-io/thanos/pull/1505) Thanos store now removes invalid local cache blocks. +-[#1525](https://github.com/thanos-io/thanos/pull/1525) Thanos now deletes block's file in correct order allowing to detect partial blocks without problems. +-[#1505](https://github.com/thanos-io/thanos/pull/1505) Thanos store now removes invalid local cache blocks. ## v0.7.0 - 2019.09.02 Accepted into CNCF: - Thanos moved to new repository <https://github.com/thanos-io/thanos> - Docker images moved to <https://quay.io/thanos/thanos> and mirrored at <https://hub.docker.com/r/thanosio/thanos> -- Slack moved to <https://slack.cncf.io> `#thanos`/`#thanos-dev` / `#thanos-prs` +- Slack moved to <https://slack.cncf.io> `#thanos`/`#thanos-dev`/`#thanos-prs` ### Added diff --git a/go.mod b/go.mod index 98d4239483b..b5d592a939b 100644 --- a/go.mod +++ b/go.mod @@ -41,13 +41,12 @@ require ( github.com/uber/jaeger-lib v2.0.0+incompatible go.elastic.co/apm v1.5.0 go.elastic.co/apm/module/apmot v1.5.0 - go.uber.org/atomic v1.4.0 // indirect go.uber.org/automaxprocs v1.2.0 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 golang.org/x/sync v0.0.0-20190423024810-112230192c58 golang.org/x/text v0.3.2 google.golang.org/api v0.8.0 - google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64 + google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 google.golang.org/grpc v1.22.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/fsnotify.v1 v1.4.7 diff --git a/go.sum b/go.sum index 6e7ab3b6d49..a76e2b096ca 100644 --- a/go.sum +++ b/go.sum @@ -208,8 +208,6 @@ github.com/gophercloud/gophercloud v0.3.0/go.mod h1:vxM41WHh5uqHVBMZHzuwNOHh8XEo github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= -github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= -github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg= github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xCzHAvxcr8HZnzsqU6ILg/0NiiE= github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117 h1:v9uUYPE4RHQHA0C9XfpAX9uzWQvgIDYjPh6m/mQgrzs= @@ -440,6 +438,7 @@ github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoH github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= diff --git a/pkg/block/block.go b/pkg/block/block.go index 96d64349244..4a0115ad218 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -9,6 +9,9 @@ import ( "os" "path" "path/filepath" + "strings" + + "github.com/go-kit/kit/log/level" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -62,7 +65,7 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string) error { df, err := os.Stat(bdir) if err != nil { - return errors.Wrap(err, "stat bdir") + return err } if !df.IsDir() { return errors.Errorf("%s is not a directory", bdir) @@ -89,41 +92,73 @@ func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st } if err := objstore.UploadDir(ctx, logger, bkt, path.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil { - return cleanUp(bkt, id, errors.Wrap(err, "upload chunks")) + return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks")) } if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexFilename), path.Join(id.String(), IndexFilename)); err != nil { - return cleanUp(bkt, id, errors.Wrap(err, "upload index")) + return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index")) } if meta.Thanos.Source == metadata.CompactorSource { if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexCacheFilename), path.Join(id.String(), IndexCacheFilename)); err != nil { - return cleanUp(bkt, id, errors.Wrap(err, "upload index cache")) + return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index cache")) } } // Meta.json always need to be uploaded as a last item. This will allow to assume block directories without meta file // to be pending uploads. if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, MetaFilename), path.Join(id.String(), MetaFilename)); err != nil { - return cleanUp(bkt, id, errors.Wrap(err, "upload meta file")) + return cleanUp(logger, bkt, id, errors.Wrap(err, "upload meta file")) } return nil } -func cleanUp(bkt objstore.Bucket, id ulid.ULID, err error) error { +func cleanUp(logger log.Logger, bkt objstore.Bucket, id ulid.ULID, err error) error { // Cleanup the dir with an uncancelable context. - cleanErr := Delete(context.Background(), bkt, id) + cleanErr := Delete(context.Background(), logger, bkt, id) if cleanErr != nil { return errors.Wrapf(err, "failed to clean block after upload issue. Partial block in system. Err: %s", err.Error()) } return err } -// Delete removes directory that is mean to be block directory. -// NOTE: Prefer this method instead of objstore.Delete to avoid deleting empty dir (whole bucket) by mistake. -func Delete(ctx context.Context, bucket objstore.Bucket, id ulid.ULID) error { - return objstore.DeleteDir(ctx, bucket, id.String()) +// Delete removes directory that is meant to be block directory. +// NOTE: Always prefer this method for deleting blocks. +// * We have to delete block's files in the certain order (meta.json first) +// to ensure we don't end up with malformed partial blocks. Thanos system handles well partial blocks +// only if they don't have meta.json. If meta.json is present Thanos assumes valid block. +// * This avoids deleting empty dir (whole bucket) by mistake. +func Delete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) error { + metaFile := path.Join(id.String(), MetaFilename) + ok, err := bkt.Exists(ctx, metaFile) + if err != nil { + return errors.Wrapf(err, "stat %s", metaFile) + } + if ok { + if err := bkt.Delete(ctx, metaFile); err != nil { + return errors.Wrapf(err, "delete %s", metaFile) + } + level.Debug(logger).Log("msg", "deleted file", "file", metaFile, "bucket", bkt.Name()) + } + + return deleteDir(ctx, logger, bkt, id.String()) +} + +// deleteDir removes all objects prefixed with dir from the bucket. +// NOTE: For objects removal use `block.Delete` strictly. +func deleteDir(ctx context.Context, logger log.Logger, bkt objstore.Bucket, dir string) error { + return bkt.Iter(ctx, dir, func(name string) error { + // If we hit a directory, call DeleteDir recursively. + if strings.HasSuffix(name, objstore.DirDelim) { + return deleteDir(ctx, logger, bkt, name) + } + if err := bkt.Delete(ctx, name); err != nil { + return err + } + level.Debug(logger).Log("msg", "deleted file", "file", name, "bucket", bkt.Name()) + return nil + }) } // DownloadMeta downloads only meta file from bucket by block ID. diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index b13caa557ad..169e0641aed 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -1,7 +1,21 @@ package block import ( + "context" + "io" + "io/ioutil" + "os" + "path" + "strings" "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/tsdb/labels" + "github.com/thanos-io/thanos/pkg/objstore/inmem" + "github.com/thanos-io/thanos/pkg/testutil" "github.com/oklog/ulid" ) @@ -56,3 +70,179 @@ func TestIsBlockDir(t *testing.T) { }) } } + +func TestUpload(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-block-upload") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt := inmem.NewBucket() + b1, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, os.MkdirAll(path.Join(tmpDir, "test", b1.String()), os.ModePerm)) + + { + // Wrong dir. + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "not-existing")) + testutil.NotOk(t, err) + testutil.Assert(t, strings.HasSuffix(err.Error(), "/not-existing: no such file or directory"), "") + } + { + // Wrong existing dir (not a block). + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test")) + testutil.NotOk(t, err) + testutil.Equals(t, "not a block dir: ulid: bad data size when unmarshaling", err.Error()) + } + { + // Empty block dir + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) + testutil.NotOk(t, err) + testutil.Assert(t, strings.HasSuffix(err.Error(), "/meta.json: no such file or directory"), "") + } + testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename))) + { + // Missing chunks. + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) + testutil.NotOk(t, err) + testutil.Assert(t, strings.HasSuffix(err.Error(), "/chunks: no such file or directory"), "") + + // Only debug meta.json present. + testutil.Equals(t, 1, len(bkt.Objects())) + } + testutil.Ok(t, os.MkdirAll(path.Join(tmpDir, "test", b1.String(), ChunksDirname), os.ModePerm)) + testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), ChunksDirname, "000001"), path.Join(tmpDir, "test", b1.String(), ChunksDirname, "000001"))) + { + // Missing index file. + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) + testutil.NotOk(t, err) + testutil.Assert(t, strings.HasSuffix(err.Error(), "/index: no such file or directory"), "") + + // Only debug meta.json present. + testutil.Equals(t, 1, len(bkt.Objects())) + } + testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), IndexFilename), path.Join(tmpDir, "test", b1.String(), IndexFilename))) + testutil.Ok(t, os.Remove(path.Join(tmpDir, "test", b1.String(), MetaFilename))) + { + // Missing meta.json file. + err := Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String())) + testutil.NotOk(t, err) + testutil.Assert(t, strings.HasSuffix(err.Error(), "/meta.json: no such file or directory"), "") + + // Only debug meta.json present. + testutil.Equals(t, 1, len(bkt.Objects())) + } + testutil.Ok(t, cpy(path.Join(tmpDir, b1.String(), MetaFilename), path.Join(tmpDir, "test", b1.String(), MetaFilename))) + { + // Full block + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))) + testutil.Equals(t, 4, len(bkt.Objects())) + testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) + testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) + testutil.Equals(t, 365, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + } + { + // Test Upload is idempotent. + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()))) + testutil.Equals(t, 4, len(bkt.Objects())) + testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) + testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) + testutil.Equals(t, 365, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + } + { + // Upload with no external labels should be blocked. + b2, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, nil, 124) + testutil.Ok(t, err) + err = Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String())) + testutil.NotOk(t, err) + testutil.Equals(t, "empty external labels are not allowed for Thanos block.", err.Error()) + testutil.Equals(t, 4, len(bkt.Objects())) + } +} + +func cpy(src, dst string) error { + sourceFileStat, err := os.Stat(src) + if err != nil { + return err + } + + if !sourceFileStat.Mode().IsRegular() { + return errors.Errorf("%s is not a regular file", src) + } + + source, err := os.Open(src) + if err != nil { + return err + } + defer source.Close() + + destination, err := os.Create(dst) + if err != nil { + return err + } + defer destination.Close() + _, err = io.Copy(destination, source) + return err +} + +func TestDelete(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-block-delete") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt := inmem.NewBucket() + { + b1, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b1.String()))) + testutil.Equals(t, 4, len(bkt.Objects())) + + // Full delete. + testutil.Ok(t, Delete(ctx, log.NewNopLogger(), bkt, b1)) + // Still debug meta entry is expected. + testutil.Equals(t, 1, len(bkt.Objects())) + } + { + b2, err := testutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124) + testutil.Ok(t, err) + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()))) + testutil.Equals(t, 5, len(bkt.Objects())) + + // Remove meta.json and check if delete can delete it. + testutil.Ok(t, bkt.Delete(ctx, path.Join(b2.String(), MetaFilename))) + testutil.Ok(t, Delete(ctx, log.NewNopLogger(), bkt, b2)) + // Still 2 debug meta entries are expected. + testutil.Equals(t, 2, len(bkt.Objects())) + } +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 5f4d00a2229..506d2dada81 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -296,7 +296,7 @@ func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (remov return true } - if err := block.Delete(ctx, c.bkt, id); err != nil { + if err := block.Delete(ctx, c.logger, c.bkt, id); err != nil { level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err) return false } @@ -453,7 +453,7 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error { level.Info(c.logger).Log("msg", "deleting outdated block", "block", id) - err := block.Delete(delCtx, c.bkt, id) + err := block.Delete(delCtx, c.logger, c.bkt, id) cancel() if err != nil { return retry(errors.Wrapf(err, "delete block %s from bucket", id)) @@ -743,7 +743,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, defer cancel() // TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this). - if err := block.Delete(delCtx, bkt, ie.id); err != nil { + if err := block.Delete(delCtx, logger, bkt, ie.id); err != nil { return errors.Wrapf(err, "deleting old block %s failed. You need to delete this block manually", ie.id) } @@ -932,7 +932,7 @@ func (cg *Group) deleteBlock(b string) error { delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() level.Info(cg.logger).Log("msg", "deleting compacted block", "old_block", id) - if err := block.Delete(delCtx, cg.bkt, id); err != nil { + if err := block.Delete(delCtx, cg.logger, cg.bkt, id); err != nil { return errors.Wrapf(err, "delete block %s from bucket", id) } return nil diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index 2440eed862f..9021677f2bd 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -32,8 +32,8 @@ func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bk maxTime := time.Unix(m.MaxTime/1000, 0) if time.Now().After(maxTime.Add(retentionDuration)) { - level.Info(logger).Log("msg", "deleting block", "id", id, "maxTime", maxTime.String()) - if err := block.Delete(ctx, bkt, id); err != nil { + level.Info(logger).Log("msg", "applying retention: deleting block", "id", id, "maxTime", maxTime.String()) + if err := block.Delete(ctx, logger, bkt, id); err != nil { return errors.Wrap(err, "delete block") } } diff --git a/pkg/objstore/inmem/inmem.go b/pkg/objstore/inmem/inmem.go index d9421319d04..589e520dcd5 100644 --- a/pkg/objstore/inmem/inmem.go +++ b/pkg/objstore/inmem/inmem.go @@ -135,6 +135,9 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error { // Delete removes all data prefixed with the dir. func (b *Bucket) Delete(_ context.Context, name string) error { + if _, ok := b.objects[name]; !ok { + return errNotFound + } delete(b.objects, name) return nil } diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 79a443fa784..a1ab5d1b504 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -22,9 +22,11 @@ type Bucket interface { BucketReader // Upload the contents of the reader as an object into the bucket. + // Upload should be idempotent Upload(ctx context.Context, name string, r io.Reader) error // Delete removes the object with the given name. + // If object does not exists in the moment of deletion, Delete should throw error. Delete(ctx context.Context, name string) error // Name returns the bucket name for the provider. @@ -86,23 +88,13 @@ func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst str if err := bkt.Upload(ctx, dst, r); err != nil { return errors.Wrapf(err, "upload file %s as %s", src, dst) } + level.Debug(logger).Log("msg", "uploaded file", "from", src, "dst", dst, "bucket", bkt.Name()) return nil } // DirDelim is the delimiter used to model a directory structure in an object store bucket. const DirDelim = "/" -// DeleteDir removes all objects prefixed with dir from the bucket. -func DeleteDir(ctx context.Context, bkt Bucket, dir string) error { - return bkt.Iter(ctx, dir, func(name string) error { - // If we hit a directory, call DeleteDir recursively. - if strings.HasSuffix(name, DirDelim) { - return DeleteDir(ctx, bkt, name) - } - return bkt.Delete(ctx, name) - }) -} - // DownloadFile downloads the src file from the bucket to dst. If dst is an existing // directory, a file with the same name as the source is created in dst. // If destination file is already existing, download file will overwrite it. diff --git a/pkg/objstore/objtesting/acceptance_e2e_test.go b/pkg/objstore/objtesting/acceptance_e2e_test.go index 73da5ed16cf..e35cb75c44b 100644 --- a/pkg/objstore/objtesting/acceptance_e2e_test.go +++ b/pkg/objstore/objtesting/acceptance_e2e_test.go @@ -17,49 +17,53 @@ import ( // used object store is strongly consistent. func TestObjStore_AcceptanceTest_e2e(t *testing.T) { ForeachStore(t, func(t testing.TB, bkt objstore.Bucket) { - _, err := bkt.Get(context.Background(), "") + ctx := context.Background() + + _, err := bkt.Get(ctx, "") testutil.NotOk(t, err) testutil.Assert(t, !bkt.IsObjNotFoundErr(err), "expected user error got not found %s", err) - _, err = bkt.Get(context.Background(), "id1/obj_1.some") + _, err = bkt.Get(ctx, "id1/obj_1.some") testutil.NotOk(t, err) testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error got %s", err) - ok, err := bkt.Exists(context.Background(), "id1/obj_1.some") + ok, err := bkt.Exists(ctx, "id1/obj_1.some") testutil.Ok(t, err) testutil.Assert(t, !ok, "expected not exits") // Upload first object. - testutil.Ok(t, bkt.Upload(context.Background(), "id1/obj_1.some", strings.NewReader("@test-data@"))) + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@"))) // Double check we can immediately read it. - rc1, err := bkt.Get(context.Background(), "id1/obj_1.some") + rc1, err := bkt.Get(ctx, "id1/obj_1.some") testutil.Ok(t, err) defer func() { testutil.Ok(t, rc1.Close()) }() content, err := ioutil.ReadAll(rc1) testutil.Ok(t, err) testutil.Equals(t, "@test-data@", string(content)) - rc2, err := bkt.GetRange(context.Background(), "id1/obj_1.some", 1, 3) + rc2, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, 3) testutil.Ok(t, err) defer func() { testutil.Ok(t, rc2.Close()) }() content, err = ioutil.ReadAll(rc2) testutil.Ok(t, err) testutil.Equals(t, "tes", string(content)) - ok, err = bkt.Exists(context.Background(), "id1/obj_1.some") + ok, err = bkt.Exists(ctx, "id1/obj_1.some") testutil.Ok(t, err) testutil.Assert(t, ok, "expected exits") // Upload other objects. - testutil.Ok(t, bkt.Upload(context.Background(), "id1/obj_2.some", strings.NewReader("@test-data2@"))) - testutil.Ok(t, bkt.Upload(context.Background(), "id1/obj_3.some", strings.NewReader("@test-data3@"))) - testutil.Ok(t, bkt.Upload(context.Background(), "id2/obj_4.some", strings.NewReader("@test-data4@"))) - testutil.Ok(t, bkt.Upload(context.Background(), "obj_5.some", strings.NewReader("@test-data5@"))) + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@"))) + // Upload should be idempotent. + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@"))) + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_3.some", strings.NewReader("@test-data3@"))) + testutil.Ok(t, bkt.Upload(ctx, "id2/obj_4.some", strings.NewReader("@test-data4@"))) + testutil.Ok(t, bkt.Upload(ctx, "obj_5.some", strings.NewReader("@test-data5@"))) // Can we iter over items from top dir? var seen []string - testutil.Ok(t, bkt.Iter(context.Background(), "", func(fn string) error { + testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error { seen = append(seen, fn) return nil })) @@ -70,7 +74,7 @@ func TestObjStore_AcceptanceTest_e2e(t *testing.T) { // Can we iter over items from id1/ dir? seen = []string{} - testutil.Ok(t, bkt.Iter(context.Background(), "id1/", func(fn string) error { + testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error { seen = append(seen, fn) return nil })) @@ -78,36 +82,30 @@ func TestObjStore_AcceptanceTest_e2e(t *testing.T) { // Can we iter over items from id1 dir? seen = []string{} - testutil.Ok(t, bkt.Iter(context.Background(), "id1", func(fn string) error { + testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error { seen = append(seen, fn) return nil })) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen) // Can we iter over items from not existing dir? - testutil.Ok(t, bkt.Iter(context.Background(), "id0", func(fn string) error { + testutil.Ok(t, bkt.Iter(ctx, "id0", func(fn string) error { t.Error("Not expected to loop through not existing directory") t.FailNow() return nil })) - testutil.Ok(t, bkt.Delete(context.Background(), "id1/obj_2.some")) + testutil.Ok(t, bkt.Delete(ctx, "id1/obj_2.some")) + // Delete is expected to fail on non existing object. + testutil.NotOk(t, bkt.Delete(ctx, "id1/obj_2.some")) // Can we iter over items from id1/ dir and see obj2 being deleted? seen = []string{} - testutil.Ok(t, bkt.Iter(context.Background(), "id1/", func(fn string) error { + testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error { seen = append(seen, fn) return nil })) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_3.some"}, seen) - - testutil.Ok(t, objstore.DeleteDir(context.Background(), bkt, "id1")) - testutil.Ok(t, bkt.Iter(context.Background(), "id1/", func(fn string) error { - t.Error("Not expected to loop through empty / non existing directory") - t.FailNow() - - return nil - })) }) } diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index cf897f012f8..c0bb586d2c5 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -135,7 +135,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { expFiles[id.String()+"/chunks/0002"] = []byte("chunkcontents2") } if i == 4 { - testutil.Ok(t, block.Delete(ctx, bkt, ids[4])) + testutil.Ok(t, block.Delete(ctx, log.NewNopLogger(), bkt, ids[4])) } // The shipper meta file should show all blocks as uploaded except the compacted one. shipMeta, err = ReadMetaFile(dir) @@ -287,7 +287,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { expFiles[id.String()+"/chunks/0002"] = []byte("chunkcontents2") } if i == 4 { - testutil.Ok(t, block.Delete(ctx, bkt, ids[4])) + testutil.Ok(t, block.Delete(ctx, log.NewNopLogger(), bkt, ids[4])) } // The shipper meta file should show all blocks as uploaded except the compacted one. shipMeta, err = ReadMetaFile(dir) diff --git a/pkg/verifier/safe_delete.go b/pkg/verifier/safe_delete.go index f4b8d9a48b2..6e0f0ad1916 100644 --- a/pkg/verifier/safe_delete.go +++ b/pkg/verifier/safe_delete.go @@ -66,7 +66,7 @@ func BackupAndDelete(ctx context.Context, logger log.Logger, bkt, backupBkt objs // Block uploaded, so we are ok to remove from src bucket. level.Info(logger).Log("msg", "Deleting block", "id", id.String()) - if err := block.Delete(ctx, bkt, id); err != nil { + if err := block.Delete(ctx, logger, bkt, id); err != nil { return errors.Wrap(err, "delete from source") } @@ -95,7 +95,7 @@ func BackupAndDeleteDownloaded(ctx context.Context, logger log.Logger, bdir stri // Block uploaded, so we are ok to remove from src bucket. level.Info(logger).Log("msg", "Deleting block", "id", id.String()) - if err := block.Delete(ctx, bkt, id); err != nil { + if err := block.Delete(ctx, logger, bkt, id); err != nil { return errors.Wrap(err, "delete from source") }