From a6dd23a0e3e4b20372cae20cd2f61a11c91ffc39 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Mon, 20 Dec 2021 10:37:16 +0200
Subject: [PATCH 1/4] Update version
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Giedrius Statkevičius
---
 CHANGELOG.md | 2 +-
 VERSION      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 23fa84af16..6cfcf27805 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,7 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 
 ## v0.24.0 - In Progress
 
-## [v0.24.0-rc.1](https://github.com/thanos-io/thanos/tree/release-0.24) - 2021.12.09
+## [v0.24.0-rc.2](https://github.com/thanos-io/thanos/tree/release-0.24) - 2021.12.20
 
 ### Added
 
diff --git a/VERSION b/VERSION
index 96e0a313b7..ce531e6b1b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.24.0-rc.1
+0.24.0-rc.2

From 5c38b9ce2e9d5f89161ebef646ad52f006c56a05 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Fri, 17 Dec 2021 14:32:48 +0200
Subject: [PATCH 2/4] downsample: fix deadlock if error occurs (#4962)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fix deadlock that occurs if there is some backlog of blocks to be
downsampled and an error occurs. For this to trigger, there needs to
be at least `downsampleConcurrency + 2` blocks in the backlog.

Add a test to cover this regression.

Affected versions: 0.22.0 - 0.23.1.

Signed-off-by: Giedrius Statkevičius
---
 CHANGELOG.md             |   1 +
 cmd/thanos/downsample.go | 131 ++++++++++++++++++++----------------
 cmd/thanos/main_test.go  | 141 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 215 insertions(+), 58 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6cfcf27805..551be1b6af 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -51,6 +51,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#4811](https://github.com/thanos-io/thanos/pull/4811) Query: Fix data race in metadata, rules, and targets servers.
 - [#4795](https://github.com/thanos-io/thanos/pull/4795) Query: Fix deadlock in endpointset.
 - [#4928](https://github.com/thanos-io/thanos/pull/4928) Azure: Only create an http client once, to conserve memory.
+- [#4962](https://github.com/thanos-io/thanos/pull/4962) Compact/downsample: fix deadlock if error occurs with some backlog of blocks; fixes [this pull request](https://github.com/thanos-io/thanos/pull/4430). Affected versions are 0.22.0 - 0.23.1.
 
 ### Changed
 
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index 4c1d6affed..5a9eaa2cf0 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -8,6 +8,7 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
+	"sync"
 	"time"
 
 	extflag "github.com/efficientgo/tools/extkingpin"
@@ -20,13 +21,13 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
-	"golang.org/x/sync/errgroup"
 
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/component"
+	"github.com/thanos-io/thanos/pkg/errutil"
 	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/client"
@@ -229,90 +230,104 @@ func downsampleBucket(
 	})
 
 	var (
-		eg errgroup.Group
-		ch = make(chan *metadata.Meta, downsampleConcurrency)
+		wg                      sync.WaitGroup
+		metaCh                  = make(chan *metadata.Meta)
+		downsampleErrs          errutil.MultiError
+		errCh                   = make(chan error, downsampleConcurrency)
+		workerCtx, workerCancel = context.WithCancel(ctx)
 	)
 
+	defer workerCancel()
+
 	level.Debug(logger).Log("msg", "downsampling bucket", "concurrency", downsampleConcurrency)
 	for i := 0; i < downsampleConcurrency; i++ {
-		eg.Go(func() error {
-			for m := range ch {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for m := range metaCh {
 				resolution := downsample.ResLevel1
 				errMsg := "downsampling to 5 min"
 				if m.Thanos.Downsample.Resolution == downsample.ResLevel1 {
 					resolution = downsample.ResLevel2
 					errMsg = "downsampling to 60 min"
 				}
-				if err := processDownsampling(ctx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
+				if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
 					metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
-					return errors.Wrap(err, errMsg)
+					errCh <- errors.Wrap(err, errMsg)
 				}
 				metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
 			}
-			return nil
-		})
+		}()
 	}
 
 	// Workers scheduled, distribute blocks.
-	eg.Go(func() error {
-		defer close(ch)
-		for _, mk := range metasULIDS {
-			m := metas[mk]
+metaSendLoop:
+	for _, mk := range metasULIDS {
+		m := metas[mk]
 
-			switch m.Thanos.Downsample.Resolution {
-			case downsample.ResLevel2:
-				continue
+		switch m.Thanos.Downsample.Resolution {
+		case downsample.ResLevel2:
+			continue
 
-			case downsample.ResLevel0:
-				missing := false
-				for _, id := range m.Compaction.Sources {
-					if _, ok := sources5m[id]; !ok {
-						missing = true
-						break
-					}
-				}
-				if !missing {
-					continue
-				}
-				// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
-				// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
-				// blocks. Otherwise we may never downsample some data.
-				if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
-					continue
+		case downsample.ResLevel0:
+			missing := false
+			for _, id := range m.Compaction.Sources {
+				if _, ok := sources5m[id]; !ok {
+					missing = true
+					break
 				}
+			}
+			if !missing {
+				continue
+			}
+			// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
+			// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
+			// blocks. Otherwise we may never downsample some data.
+			if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
+				continue
+			}
 
-			case downsample.ResLevel1:
-				missing := false
-				for _, id := range m.Compaction.Sources {
-					if _, ok := sources1h[id]; !ok {
-						missing = true
-						break
-					}
-				}
-				if !missing {
-					continue
-				}
-				// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
-				// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
-				// blocks. Otherwise we may never downsample some data.
-				if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
-					continue
+		case downsample.ResLevel1:
+			missing := false
+			for _, id := range m.Compaction.Sources {
+				if _, ok := sources1h[id]; !ok {
+					missing = true
+					break
 				}
 			}
-
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case ch <- m:
+			if !missing {
+				continue
+			}
+			// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
+			// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
+			// blocks. Otherwise we may never downsample some data.
+			if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
+				continue
 			}
 		}
-		return nil
-	})
 
-	if err := eg.Wait(); err != nil {
-		return errors.Wrap(err, "downsample bucket")
+		select {
+		case <-workerCtx.Done():
+			downsampleErrs.Add(workerCtx.Err())
+			break metaSendLoop
+		case metaCh <- m:
+		case downsampleErr := <-errCh:
+			downsampleErrs.Add(downsampleErr)
+			break metaSendLoop
+		}
 	}
-	return nil
+
+	close(metaCh)
+	wg.Wait()
+	workerCancel()
+	close(errCh)
+
+	// Collect any other error reported by the workers.
+	for downsampleErr := range errCh {
+		downsampleErrs.Add(downsampleErr)
+	}
+
+	return downsampleErrs.Err()
 }
 
 func processDownsampling(
diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go
index 73a7d656ec..18dd566aa6 100644
--- a/cmd/thanos/main_test.go
+++ b/cmd/thanos/main_test.go
@@ -5,9 +5,12 @@ package main
 
 import (
 	"context"
+	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"path"
+	"strings"
 	"testing"
 	"time"
 
@@ -26,6 +29,144 @@ import (
 	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
 )
 
+type erroringBucket struct {
+	bkt objstore.InstrumentedBucket
+}
+
+func (b *erroringBucket) Close() error {
+	return b.bkt.Close()
+}
+
+// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
+// thanos_objstore_bucket_operation_failures_total metric.
+func (b *erroringBucket) WithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.Bucket {
+	return b.bkt.WithExpectedErrs(f)
+}
+
+// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
+// thanos_objstore_bucket_operation_failures_total metric.
+func (b *erroringBucket) ReaderWithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
+	return b.bkt.ReaderWithExpectedErrs(f)
+}
+
+func (b *erroringBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
+	return b.bkt.Iter(ctx, dir, f, options...)
+}
+
+// Get returns a reader for the given object name.
+func (b *erroringBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
+	if strings.Contains(name, "chunk") {
+		return nil, fmt.Errorf("some random error has occurred")
+	}
+	return b.bkt.Get(ctx, name)
+}
+
+// GetRange returns a new range reader for the given object name and range.
+func (b *erroringBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
+	if strings.Contains(name, "chunk") {
+		return nil, fmt.Errorf("some random error has occurred")
+	}
+	return b.bkt.GetRange(ctx, name, off, length)
+}
+
+// Exists checks if the given object exists in the bucket.
+func (b *erroringBucket) Exists(ctx context.Context, name string) (bool, error) {
+	return b.bkt.Exists(ctx, name)
+}
+
+// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
+func (b *erroringBucket) IsObjNotFoundErr(err error) bool {
+	return b.bkt.IsObjNotFoundErr(err)
+}
+
+// Attributes returns information about the specified object.
+func (b *erroringBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
+	return b.bkt.Attributes(ctx, name)
+}
+
+// Upload the contents of the reader as an object into the bucket.
+// Upload should be idempotent.
+func (b *erroringBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+	return b.bkt.Upload(ctx, name, r)
+}
+
+// Delete removes the object with the given name.
+// If object does not exists in the moment of deletion, Delete should throw error.
+func (b *erroringBucket) Delete(ctx context.Context, name string) error {
+	return b.bkt.Delete(ctx, name)
+}
+
+// Name returns the bucket name for the provider.
+func (b *erroringBucket) Name() string {
+	return b.bkt.Name()
+}
+
+// Ensures that downsampleBucket() stops its work properly
+// after an error occurs with some blocks in the backlog.
+// Testing for https://github.com/thanos-io/thanos/issues/4960.
+func TestRegression4960_Deadlock(t *testing.T) {
+	logger := log.NewLogfmtLogger(os.Stderr)
+	dir, err := ioutil.TempDir("", "test-compact-cleanup")
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
+	bkt = &erroringBucket{bkt: bkt}
+	var id, id2, id3 ulid.ULID
+	{
+		id, err = e2eutil.CreateBlock(
+			ctx,
+			dir,
+			[]labels.Labels{{{Name: "a", Value: "1"}}},
+			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
+			labels.Labels{{Name: "e1", Value: "1"}},
+			downsample.ResLevel0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
+	}
+	{
+		id2, err = e2eutil.CreateBlock(
+			ctx,
+			dir,
+			[]labels.Labels{{{Name: "a", Value: "2"}}},
+			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
+			labels.Labels{{Name: "e1", Value: "2"}},
+			downsample.ResLevel0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id2.String()), metadata.NoneFunc))
+	}
+	{
+		id3, err = e2eutil.CreateBlock(
+			ctx,
+			dir,
+			[]labels.Labels{{{Name: "a", Value: "2"}}},
+			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
+			labels.Labels{{Name: "e1", Value: "2"}},
+			downsample.ResLevel0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id3.String()), metadata.NoneFunc))
+	}
+
+	meta, err := block.DownloadMeta(ctx, logger, bkt, id)
+	testutil.Ok(t, err)
+
+	metrics := newDownsampleMetrics(prometheus.NewRegistry())
+	testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
+	metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil)
+	testutil.Ok(t, err)
+
+	metas, _, err := metaFetcher.Fetch(ctx)
+	testutil.Ok(t, err)
+	err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc)
+	testutil.NotOk(t, err)
+
+	testutil.Assert(t, strings.Contains(err.Error(), "some random error has occurred"))
+
+}
+
 func TestCleanupDownsampleCacheFolder(t *testing.T) {
 	logger := log.NewLogfmtLogger(os.Stderr)
 	dir, err := ioutil.TempDir("", "test-compact-cleanup")

From 8982b8f23004b83470c773411533be36f7804bcf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Mon, 20 Dec 2021 10:41:23 +0200
Subject: [PATCH 3/4] Makefile: update SHA256
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Giedrius Statkevičius
---
 Makefile | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/Makefile b/Makefile
index 9f4ceb6030..d738acd16a 100644
--- a/Makefile
+++ b/Makefile
@@ -14,10 +14,10 @@ arch = $(shell uname -m)
 # Update at 2021.12.08
 ifeq ($(arch), x86_64)
 	# amd64
-	BASE_DOCKER_SHA="97a9aacc097e5dbdec33b0d671adea0785e76d26ff2b979ee28570baf6a9155d"
+	BASE_DOCKER_SHA="768a51a5f71827471e6e58f0d6200c2fa24f2cb5cde1ecbd67fe28f93d4ef464"
 else ifeq ($(arch), armv8)
 	# arm64
-	BASE_DOCKER_SHA="5feb736d32e5b57f4944691d00b581f1f9192b3732cab03e3b6034cf0d1c8f2c"
+	BASE_DOCKER_SHA="042d6195e1793b226d1632117cccb4c4906c8ab393b8b68328ad43cf59c64f9d"
 else
 	echo >&2 "only support amd64 or arm64 arch" && exit 1
 endif

From 3c23e6212ae51197f5b77ad9c69fb674de1357e4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Mon, 20 Dec 2021 10:45:41 +0200
Subject: [PATCH 4/4] Makefile/CHANGELOG: fix format
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Giedrius Statkevičius
---
 CHANGELOG.md | 2 +-
 Makefile     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 551be1b6af..9ad2ce7595 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,7 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 
 ## v0.24.0 - In Progress
 
-## [v0.24.0-rc.2](https://github.com/thanos-io/thanos/tree/release-0.24) - 2021.12.20
+## [v0.24.0-rc.1](https://github.com/thanos-io/thanos/tree/release-0.24) - 2021.12.09
 
 ### Added
 
diff --git a/Makefile b/Makefile
index d738acd16a..72ebc15ce1 100644
--- a/Makefile
+++ b/Makefile
@@ -11,7 +11,7 @@ arch = $(shell uname -m)
 # Run `DOCKER_CLI_EXPERIMENTAL=enabled docker manifest inspect quay.io/prometheus/busybox:latest` to get SHA or
 # just visit https://quay.io/repository/prometheus/busybox?tag=latest&tab=tags.
 # TODO(bwplotka): Pinning is important but somehow quay kills the old images, so make sure to update regularly.
-# Update at 2021.12.08
+# Update at 2021.12.15
 ifeq ($(arch), x86_64)
 	# amd64
 	BASE_DOCKER_SHA="768a51a5f71827471e6e58f0d6200c2fa24f2cb5cde1ecbd67fe28f93d4ef464"
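
What follows is an illustrative, standalone sketch of the concurrency pattern that PATCH 2/4 moves downsampleBucket to. It is not Thanos code: processAll, process, itemCh and errCh are invented names for this example. As in the patch, the work channel is unbuffered, workers keep draining it even after a failure and report errors through a channel buffered to the worker count, and the producer selects between handing out the next item, receiving the first error, and context cancellation. In the old errgroup-based version the workers returned on the first error, so with at least `downsampleConcurrency + 2` blocks in the backlog the producer eventually blocked forever on a send that no worker would ever receive, which is the deadlock the commit message describes.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// processAll distributes items to `concurrency` workers and returns the
// collected errors. The shape mirrors the fixed downsampleBucket: an
// unbuffered work channel, workers that never stop draining it, and a
// producer that watches an error channel while it sends.
func processAll(ctx context.Context, items []int, concurrency int, process func(context.Context, int) error) error {
	var (
		wg     sync.WaitGroup
		itemCh = make(chan int)                // unbuffered: a send succeeds only while a worker is receiving
		errCh  = make(chan error, concurrency) // buffered so reporting an error does not block the happy path
		errs   []error
	)
	workerCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Keep draining itemCh even after a failure. The pre-fix errgroup
			// version returned here, which left no receivers and let the
			// producer block forever once its channel buffer filled up.
			for it := range itemCh {
				if err := process(workerCtx, it); err != nil {
					select {
					case errCh <- err:
					default: // enough errors are already pending; drop this one rather than block
					}
				}
			}
		}()
	}

send:
	for _, it := range items {
		select {
		case itemCh <- it: // a worker picked the item up
		case err := <-errCh: // a worker failed; stop handing out work
			errs = append(errs, err)
			break send
		case <-workerCtx.Done():
			errs = append(errs, workerCtx.Err())
			break send
		}
	}

	close(itemCh) // lets the workers finish their range loops
	wg.Wait()
	close(errCh) // safe: every sender has exited
	for err := range errCh {
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}

func main() {
	err := processAll(context.Background(), []int{1, 2, 3, 4, 5, 6}, 2, func(_ context.Context, it int) error {
		if it == 3 {
			return fmt.Errorf("item %d: some random error has occurred", it)
		}
		return nil
	})
	fmt.Println("result:", err) // reports the injected error instead of hanging
}

Unlike the patch, which sends every wrapped error into errCh, the sketch sends non-blockingly and may drop errors beyond what fits in the buffer; that simplification keeps the example trivially deadlock-free, since only the first error is needed to stop distributing work.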