Do not upload meta files to obj store (#5110)
* Do not upload meta files to obj store

Signed-off-by: Romain Dauby <romain.dauby@gmail.com>

* Update CHANGELOG

Signed-off-by: Romain Dauby <romain.dauby@gmail.com>
r0mdau authored Jan 30, 2022
1 parent b711a0c commit 480d2e4
Showing 4 changed files with 18 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#5110](https://github.com/thanos-io/thanos/pull/5110) Block: Do not upload DebugMeta files to obj store.
- [#4963](https://github.com/thanos-io/thanos/pull/4963) Compactor, Store, Tools: Loading block metadata now only filters out duplicates within a source (or compaction group if replica labels are configured), and does so in parallel over sources.
- [#5089](https://github.com/thanos-io/thanos/pull/5089) S3: Create an empty map in the case SSE-KMS is used and no KMSEncryptionContext is passed.
- [#4970](https://github.com/thanos-io/thanos/pull/4970) Added a new flag `exclude-delete` to `tools bucket ls`, which excludes blocks marked for deletion.
6 changes: 0 additions & 6 deletions pkg/block/block.go
@@ -9,7 +9,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
@@ -146,11 +145,6 @@ func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "encode meta file")
}

- // TODO(yeya24): Remove this step.
- if err := bkt.Upload(ctx, path.Join(DebugMetas, fmt.Sprintf("%s.json", id)), strings.NewReader(metaEncoded.String())); err != nil {
- return cleanUp(logger, bkt, id, errors.Wrap(err, "upload debug meta file"))
- }
-
if err := objstore.UploadDir(ctx, logger, bkt, path.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
}
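For orientation, the sketch below shows what the remaining upload flow looks like once the debug meta step above is removed. It is a simplified, hedged reconstruction, not the verbatim `pkg/block` code: the `uploadBlockSketch` name is hypothetical, and the index-upload step and exact error messages are assumptions based on the hunks above and the test expectations below; `cleanUp`, `ChunksDirname`, `IndexFilename`, `MetaFilename` and `metaEncoded` come from the surrounding file.

```go
// uploadBlockSketch is a simplified sketch (not the verbatim Thanos code) of the
// objects a block upload writes after this change. It assumes the identifiers
// and imports of pkg/block/block.go (cleanUp, ChunksDirname, IndexFilename,
// MetaFilename, objstore, errors, ulid, log).
func uploadBlockSketch(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, id ulid.ULID, metaEncoded *bytes.Buffer) error {
	// 1. Upload all chunk segment files to <block-ulid>/chunks/.
	if err := objstore.UploadDir(ctx, logger, bkt, path.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {
		return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
	}
	// 2. Upload the index file to <block-ulid>/index.
	if err := objstore.UploadFile(ctx, logger, bkt, path.Join(bdir, IndexFilename), path.Join(id.String(), IndexFilename)); err != nil {
		return cleanUp(logger, bkt, id, errors.Wrap(err, "upload index"))
	}
	// 3. Upload meta.json last, so the block only becomes visible once complete.
	// No cleanUp on failure: TestUploadCleanup below asserts that a failed
	// meta.json upload leaves the partially uploaded block in place.
	if err := bkt.Upload(ctx, path.Join(id.String(), MetaFilename), strings.NewReader(metaEncoded.String())); err != nil {
		return errors.Wrap(err, "upload meta file")
	}
	return nil
}
```

With the debug meta upload gone, an uploaded block consists of exactly the chunk segment(s), the index and `meta.json`, which is why the tests below now expect three objects per block instead of four.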
34 changes: 16 additions & 18 deletions pkg/block/block_test.go
@@ -141,7 +141,7 @@ func TestUpload(t *testing.T) {
{
// Full block.
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc))
- testutil.Equals(t, 4, len(bkt.Objects()))
+ testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
@@ -191,7 +191,7 @@ func TestUpload(t *testing.T) {
{
// Test Upload is idempotent.
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, "test", b1.String()), metadata.NoneFunc))
- testutil.Equals(t, 4, len(bkt.Objects()))
+ testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
@@ -209,7 +209,7 @@ func TestUpload(t *testing.T) {
err = Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc)
testutil.NotOk(t, err)
testutil.Equals(t, "empty external labels are not allowed for Thanos block.", err.Error())
- testutil.Equals(t, 4, len(bkt.Objects()))
+ testutil.Equals(t, 3, len(bkt.Objects()))
}
{
// No external labels with UploadPromBlocks.
@@ -223,7 +223,7 @@ func TestUpload(t *testing.T) {
testutil.Ok(t, err)
err = UploadPromBlock(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc)
testutil.Ok(t, err)
- testutil.Equals(t, 8, len(bkt.Objects()))
+ testutil.Equals(t, 6, len(bkt.Objects()))
testutil.Equals(t, 3736, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
testutil.Equals(t, 525, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
@@ -249,15 +249,14 @@ func TestDelete(t *testing.T) {
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b1.String()), metadata.NoneFunc))
- testutil.Equals(t, 4, len(bkt.Objects()))
+ testutil.Equals(t, 3, len(bkt.Objects()))

markedForDeletion := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"})
testutil.Ok(t, MarkForDeletion(ctx, log.NewNopLogger(), bkt, b1, "", markedForDeletion))

// Full delete.
testutil.Ok(t, Delete(ctx, log.NewNopLogger(), bkt, b1))
- // Still debug meta entry is expected.
- testutil.Equals(t, 1, len(bkt.Objects()))
+ testutil.Equals(t, 0, len(bkt.Objects()))
}
{
b2, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
@@ -269,13 +268,12 @@ func TestDelete(t *testing.T) {
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, b2.String()), metadata.NoneFunc))
- testutil.Equals(t, 5, len(bkt.Objects()))
+ testutil.Equals(t, 3, len(bkt.Objects()))

// Remove meta.json and check if delete can delete it.
testutil.Ok(t, bkt.Delete(ctx, path.Join(b2.String(), MetaFilename)))
testutil.Ok(t, Delete(ctx, log.NewNopLogger(), bkt, b2))
- // Still 2 debug meta entries are expected.
- testutil.Equals(t, 2, len(bkt.Objects()))
+ testutil.Equals(t, 0, len(bkt.Objects()))
}
}

@@ -415,7 +413,7 @@ func TestHashDownload(t *testing.T) {
testutil.Ok(t, err)

testutil.Ok(t, Upload(ctx, log.NewNopLogger(), instrumentedBkt, path.Join(tmpDir, b1.String()), metadata.SHA256Func))
- testutil.Equals(t, 4, len(bkt.Objects()))
+ testutil.Equals(t, 3, len(bkt.Objects()))

m, err := DownloadMeta(ctx, log.NewNopLogger(), bkt, b1)
testutil.Ok(t, err)
@@ -447,7 +445,7 @@ func TestHashDownload(t *testing.T) {
thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 2
thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 2
- thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 4
+ thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 3
`), `thanos_objstore_bucket_operations_total`))
}

@@ -465,7 +463,7 @@ func TestHashDownload(t *testing.T) {
thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 4
thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 4
- thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 4
+ thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 3
`), `thanos_objstore_bucket_operations_total`))
}

@@ -485,7 +483,7 @@ func TestHashDownload(t *testing.T) {
thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 7
thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 6
- thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 4
+ thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 3
`), `thanos_objstore_bucket_operations_total`))
}
}
@@ -516,8 +514,8 @@ func TestUploadCleanup(t *testing.T) {
testutil.Assert(t, errors.Is(uploadErr, errUploadFailed))

// If upload of index fails, block is deleted.
- testutil.Equals(t, 1, len(bkt.Objects())) // Only debug meta file is present.
- testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) > 0)
+ testutil.Equals(t, 0, len(bkt.Objects()))
+ testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) == 0)
}

{
Expand All @@ -527,11 +525,11 @@ func TestUploadCleanup(t *testing.T) {
testutil.Assert(t, errors.Is(uploadErr, errUploadFailed))

// If upload of meta.json fails, nothing is cleaned up.
- testutil.Equals(t, 4, len(bkt.Objects()))
+ testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]) > 0)
testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]) > 0)
testutil.Assert(t, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]) > 0)
- testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) > 0)
+ testutil.Assert(t, len(bkt.Objects()[path.Join(DebugMetas, fmt.Sprintf("%s.json", b1.String()))]) == 0)
}
}

2 changes: 1 addition & 1 deletion pkg/shipper/shipper_e2e_test.go
@@ -134,7 +134,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
thanos_objstore_bucket_operations_total{bucket="test",operation="get"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="get_range"} 0
thanos_objstore_bucket_operations_total{bucket="test",operation="iter"} 0
- thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 25
+ thanos_objstore_bucket_operations_total{bucket="test",operation="upload"} 20
`), `thanos_objstore_bucket_operations_total`))
testutil.Equals(t, 0, b)
}
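As a hedged cross-check of this metric change (the reasoning is mine, not stated in the diff): removing the debug meta step saves exactly one `upload` operation per uploaded block, and the expected total drops by 25 − 20 = 5, which is consistent with five blocks each having written one `debug/metas/<ULID>.json` object in the previous behaviour of this test.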
