diff --git a/CHANGELOG.md b/CHANGELOG.md
index c99b1576a9..6da009da38 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,7 +11,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 
 ## Unreleased
 
-## [v0.13.0](https://github.com/thanos-io/thanos/releases/tag/v0.13.0) - 2020.06.15
+## [v0.13.0-rc.2](https://github.com/thanos-io/thanos/releases/tag/v0.13.0-rc.2) - 2020.06.15
 
 ### Fixed
 
@@ -26,6 +26,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 - [#2416](https://github.com/thanos-io/thanos/pull/2416) Bucket: Fixed issue #2416 bug in `inspect --sort-by` doesn't work correctly in all cases.
 - [#2719](https://github.com/thanos-io/thanos/pull/2719) Query: `irate` and `resets` use now counter downsampling aggregations.
 - [#2705](https://github.com/thanos-io/thanos/pull/2705) minio-go: Added support for `af-south-1` and `eu-south-1` regions.
+- [#2753](https://github.com/thanos-io/thanos/issues/2753) Sidecar, Receive, Rule: Fixed possibility of out-of-order uploads in error cases. This could potentially cause Compactor to create overlapping blocks.
 
 ### Added
 
diff --git a/VERSION b/VERSION
index 54d1a4f2a4..dec5fb8bcf 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.13.0
+0.13.0-rc.2
diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go
index 1b46311adc..72bd17f486 100644
--- a/cmd/thanos/config.go
+++ b/cmd/thanos/config.go
@@ -112,17 +112,23 @@ func (rc *reloaderConfig) registerFlag(cmd *kingpin.CmdClause) *reloaderConfig {
 }
 
 type shipperConfig struct {
-	uploadCompacted bool
-	ignoreBlockSize bool
+	uploadCompacted       bool
+	ignoreBlockSize       bool
+	allowOutOfOrderUpload bool
 }
 
 func (sc *shipperConfig) registerFlag(cmd *kingpin.CmdClause) *shipperConfig {
 	cmd.Flag("shipper.upload-compacted",
-		"If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done.").
+		"If true shipper will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the flag when done.").
 		Default("false").BoolVar(&sc.uploadCompacted)
 	cmd.Flag("shipper.ignore-unequal-block-size",
-		"If true sidecar will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").
+		"If true shipper will not require prometheus min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled on your Prometheus instance, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").
 		Default("false").Hidden().BoolVar(&sc.ignoreBlockSize)
+	cmd.Flag("shipper.allow-out-of-order-uploads",
+		"If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks. "+
+			"This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring "+
+			"about order.").
+ Default("false").Hidden().BoolVar(&sc.allowOutOfOrderUpload) return sc } diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 972e02f250..0bd8e29c9d 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -89,6 +89,12 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool() + allowOutOfOrderUpload := cmd.Flag("shipper.allow-out-of-order-uploads", + "If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks."+ + "This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+ + "about order."). + Default("false").Hidden().Bool() + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { @@ -157,6 +163,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) { *replicationFactor, time.Duration(*forwardTimeout), comp, + *allowOutOfOrderUpload, ) } } @@ -195,6 +202,7 @@ func runReceive( replicationFactor uint64, forwardTimeout time.Duration, comp component.SourceStoreAPI, + allowOutOfOrderUpload bool, ) error { logger = log.With(logger, "component", "receive") level.Warn(logger).Log("msg", "setting up receive; the Thanos receive component is EXPERIMENTAL, it may break significantly without notice") @@ -246,6 +254,7 @@ func runReceive( lset, tenantLabelName, bkt, + allowOutOfOrderUpload, ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 2789178f89..b9e5b98c56 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -114,6 +114,12 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { dnsSDResolver := cmd.Flag("query.sd-dns-resolver", "Resolver to use. Possible options: [golang, miekgdns]"). Default("golang").Hidden().String() + allowOutOfOrderUpload := cmd.Flag("shipper.allow-out-of-order-uploads", + "If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks."+ + "This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+ + "about order."). 
+ Default("false").Hidden().Bool() + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, reload <-chan struct{}, _ bool) error { lset, err := parseFlagLabels(*labelStrs) if err != nil { @@ -197,6 +203,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { time.Duration(*dnsSDInterval), *dnsSDResolver, comp, + *allowOutOfOrderUpload, ) } } @@ -283,6 +290,7 @@ func runRule( dnsSDInterval time.Duration, dnsSDResolver string, comp component.Component, + allowOutOfOrderUpload bool, ) error { metrics := newRuleMetrics(reg) @@ -615,7 +623,7 @@ func runRule( } }() - s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource) + s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, allowOutOfOrderUpload) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index e16f7534bc..83cb5e3081 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -273,9 +273,9 @@ func runSidecar( var s *shipper.Shipper if conf.shipper.uploadCompacted { - s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource) + s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload) } else { - s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource) + s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload) } return runutil.Repeat(30*time.Second, ctx.Done(), func() error { diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index 1eb2a0eb2f..7f80a1ac29 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -151,7 +151,7 @@ Flags: details: https://thanos.io/storage.md/#configuration --shipper.upload-compacted - If true sidecar will try to upload compacted + If true shipper will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus. Do it once and then disable the diff --git a/docs/operating/troubleshooting.md b/docs/operating/troubleshooting.md index 5e2b9fa3bb..4a94d51f48 100644 --- a/docs/operating/troubleshooting.md +++ b/docs/operating/troubleshooting.md @@ -7,7 +7,6 @@ slug: /troubleshooting.md # Troubleshooting; Common cases - ## Overlaps **Block overlap**: Set of blocks with exactly the same external labels in meta.json and for the same time or overlapping time period. @@ -29,6 +28,7 @@ Checking producers log for such ULID, and checking meta.json (e.g if sample stat ### Reasons +- You are running Thanos (sidecar, ruler or receive) older than 0.13.0. During transient upload errors there is a possibility to have overlaps caused by the compactor not being aware of all blocks See: [this](https://github.com/thanos-io/thanos/issues/2753) - Misconfiguraiton of sidecar/ruler: Same external labels or no external labels across many block producers. - Running multiple compactors for single block "stream", even for short duration. - Manually uploading blocks to the bucket. @@ -36,6 +36,7 @@ Checking producers log for such ULID, and checking meta.json (e.g if sample stat ### Solutions +- Upgrade sidecar, ruler and receive to 0.13.0+ - Compactor can be blocked for some time, but if it is urgent. Mitigate by removing overlap or better: Backing up somewhere else (you can rename block ULID to non-ulid). 
 - Who uploaded the block? Search for logs with this ULID across all sidecars/rulers. Check access logs to object storage. Check debug/metas or meta.json of the problematic block to see how the blocks look and what the `source` is.
 - Determine what you misconfigured.
diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index a9ba831f3d..672a8b0fa9 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -38,8 +38,9 @@ type MultiTSDB struct {
 	labels          labels.Labels
 	bucket          objstore.Bucket
 
-	mtx     *sync.RWMutex
-	tenants map[string]*tenant
+	mtx                   *sync.RWMutex
+	tenants               map[string]*tenant
+	allowOutOfOrderUpload bool
 }
 
 func NewMultiTSDB(
@@ -50,21 +51,23 @@ func NewMultiTSDB(
 	labels labels.Labels,
 	tenantLabelName string,
 	bucket objstore.Bucket,
+	allowOutOfOrderUpload bool,
 ) *MultiTSDB {
 	if l == nil {
 		l = log.NewNopLogger()
 	}
 
 	return &MultiTSDB{
-		dataDir:         dataDir,
-		logger:          l,
-		reg:             reg,
-		tsdbOpts:        tsdbOpts,
-		mtx:             &sync.RWMutex{},
-		tenants:         map[string]*tenant{},
-		labels:          labels,
-		tenantLabelName: tenantLabelName,
-		bucket:          bucket,
+		dataDir:               dataDir,
+		logger:                l,
+		reg:                   reg,
+		tsdbOpts:              tsdbOpts,
+		mtx:                   &sync.RWMutex{},
+		tenants:               map[string]*tenant{},
+		labels:                labels,
+		tenantLabelName:       tenantLabelName,
+		bucket:                bucket,
+		allowOutOfOrderUpload: allowOutOfOrderUpload,
 	}
 }
 
@@ -256,6 +259,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
 			t.bucket,
 			func() labels.Labels { return lbls },
 			metadata.ReceiveSource,
+			t.allowOutOfOrderUpload,
 		)
 	}
diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go
index f6e7a012dd..a05244cf04 100644
--- a/pkg/receive/multitsdb_test.go
+++ b/pkg/receive/multitsdb_test.go
@@ -43,6 +43,7 @@ func TestMultiTSDB(t *testing.T) {
 		labels.FromStrings("replica", "01"),
 		"tenant_id",
 		nil,
+		false,
 	)
 	defer testutil.Ok(t, m.Flush())
@@ -109,6 +110,7 @@ func TestMultiTSDB(t *testing.T) {
 		labels.FromStrings("replica", "01"),
 		"tenant_id",
 		nil,
+		false,
 	)
 	defer testutil.Ok(t, m.Flush())
diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go
index 0139a64382..6a79689ea0 100644
--- a/pkg/shipper/shipper.go
+++ b/pkg/shipper/shipper.go
@@ -72,13 +72,15 @@ func newMetrics(reg prometheus.Registerer, uploadCompacted bool) *metrics {
 
 // Shipper watches a directory for matching files and directories and uploads
 // them to a remote data store.
 type Shipper struct {
-	logger          log.Logger
-	dir             string
-	metrics         *metrics
-	bucket          objstore.Bucket
-	labels          func() labels.Labels
-	source          metadata.SourceType
-	uploadCompacted bool
+	logger  log.Logger
+	dir     string
+	metrics *metrics
+	bucket  objstore.Bucket
+	labels  func() labels.Labels
+	source  metadata.SourceType
+
+	uploadCompacted        bool
+	allowOutOfOrderUploads bool
 }
 
 // New creates a new shipper that detects new TSDB blocks in dir and uploads them
@@ -90,6 +92,7 @@ func New(
 	bucket objstore.Bucket,
 	lbls func() labels.Labels,
 	source metadata.SourceType,
+	allowOutOfOrderUploads bool,
 ) *Shipper {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -99,12 +102,13 @@ func New(
 	return &Shipper{
-		logger:  logger,
-		dir:     dir,
-		bucket:  bucket,
-		labels:  lbls,
-		metrics: newMetrics(r, false),
-		source:  source,
+		logger:                 logger,
+		dir:                    dir,
+		bucket:                 bucket,
+		labels:                 lbls,
+		metrics:                newMetrics(r, false),
+		source:                 source,
+		allowOutOfOrderUploads: allowOutOfOrderUploads,
 	}
 }
 
@@ -118,6 +122,7 @@ func NewWithCompacted(
 	bucket objstore.Bucket,
 	lbls func() labels.Labels,
 	source metadata.SourceType,
+	allowOutOfOrderUploads bool,
 ) *Shipper {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -127,13 +132,14 @@ func NewWithCompacted(
 	return &Shipper{
-		logger:          logger,
-		dir:             dir,
-		bucket:          bucket,
-		labels:          lbls,
-		metrics:         newMetrics(r, true),
-		source:          source,
-		uploadCompacted: true,
+		logger:                 logger,
+		dir:                    dir,
+		bucket:                 bucket,
+		labels:                 lbls,
+		metrics:                newMetrics(r, true),
+		source:                 source,
+		uploadCompacted:        true,
+		allowOutOfOrderUploads: allowOutOfOrderUploads,
 	}
 }
 
@@ -153,23 +159,23 @@ func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
 	minTime = math.MaxInt64
 	maxSyncTime = math.MinInt64
 
-	if err := s.iterBlockMetas(func(m *metadata.Meta) error {
+	metas, err := s.blockMetasFromOldest()
+	if err != nil {
+		return 0, 0, err
+	}
+	for _, m := range metas {
 		if m.MinTime < minTime {
 			minTime = m.MinTime
 		}
 		if _, ok := hasUploaded[m.ULID]; ok && m.MaxTime > maxSyncTime {
 			maxSyncTime = m.MaxTime
 		}
-		return nil
-	}); err != nil {
-		return 0, 0, errors.Wrap(err, "iter Block metas for timestamp")
 	}
 
 	if minTime == math.MaxInt64 {
 		// No block yet found. We cannot assume any min block size so propagate 0 minTime.
 		minTime = 0
 	}
-
 	return minTime, maxSyncTime, nil
 }
 
@@ -272,70 +278,76 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
 		checker    = newLazyOverlapChecker(s.logger, s.bucket, s.labels)
 		uploadErrs int
 	)
-	// Sync non compacted blocks first.
-	if err := s.iterBlockMetas(func(m *metadata.Meta) error {
+
+	metas, err := s.blockMetasFromOldest()
+	if err != nil {
+		return 0, err
+	}
+	for _, m := range metas {
 		// Do not sync a block if we already uploaded or ignored it. If it's no longer found in the bucket,
 		// it was generally removed by the compaction process.
 		if _, uploaded := hasUploaded[m.ULID]; uploaded {
 			meta.Uploaded = append(meta.Uploaded, m.ULID)
-			return nil
+			continue
 		}
 
 		if m.Stats.NumSamples == 0 {
 			// Ignore empty blocks.
 			level.Debug(s.logger).Log("msg", "ignoring empty block", "block", m.ULID)
-			return nil
+			continue
 		}
 
 		// Check against bucket if the meta file for this block exists.
 		ok, err := s.bucket.Exists(ctx, path.Join(m.ULID.String(), block.MetaFilename))
 		if err != nil {
-			return errors.Wrap(err, "check exists")
+			return 0, errors.Wrap(err, "check exists")
 		}
 		if ok {
-			return nil
+			continue
 		}
 
 		// We only ship the first compacted block level as normal flow.
 		if m.Compaction.Level > 1 {
 			if !s.uploadCompacted {
-				return nil
+				continue
 			}
 			if err := checker.IsOverlapping(ctx, m.BlockMeta); err != nil {
+				if !s.allowOutOfOrderUploads {
+					return 0, errors.Errorf("Found overlap or error during sync, cannot upload compacted block, details: %v", err)
+				}
 				level.Error(s.logger).Log("msg", "found overlap or error during sync, cannot upload compacted block", "err", err)
 				uploadErrs++
-				return nil
+				continue
 			}
 		}
 		if err := s.upload(ctx, m); err != nil {
-			level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err)
+			if !s.allowOutOfOrderUploads {
+				return 0, errors.Wrapf(err, "upload %v", m.ULID)
+			}
+
 			// No error returned, just log line. This is because we want other blocks to be uploaded even
 			// though this one failed. It will be retried on second Sync iteration.
+			level.Error(s.logger).Log("msg", "shipping failed", "block", m.ULID, "err", err)
 			uploadErrs++
-			return nil
+			continue
 		}
 		meta.Uploaded = append(meta.Uploaded, m.ULID)
-
 		uploaded++
 		s.metrics.uploads.Inc()
-		return nil
-	}); err != nil {
-		s.metrics.dirSyncFailures.Inc()
-		return uploaded, errors.Wrap(err, "iter local block metas")
 	}
-
 	if err := WriteMetaFile(s.logger, s.dir, meta); err != nil {
 		level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err)
 	}
 	s.metrics.dirSyncs.Inc()
-
 	if uploadErrs > 0 {
 		s.metrics.uploadFailures.Add(float64(uploadErrs))
 		return uploaded, errors.Errorf("failed to sync %v blocks", uploadErrs)
-	} else if s.uploadCompacted {
+	}
+
+	if s.uploadCompacted {
 		s.metrics.uploadedCompacted.Set(1)
 	}
 	return uploaded, nil
@@ -378,15 +390,12 @@ func (s *Shipper) upload(ctx context.Context, meta *metadata.Meta) error {
 	return block.Upload(ctx, s.logger, s.bucket, updir)
 }
 
-// iterBlockMetas calls f with the block meta for each block found in dir
-// sorted by minTime asc. It logs an error and continues if it cannot access a
-// meta.json file.
-// If f returns an error, the function returns with the same error.
-func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error {
-	var metas []*metadata.Meta
+// blockMetasFromOldest returns the block meta of each block found in dir
+// sorted by minTime asc.
+func (s *Shipper) blockMetasFromOldest() (metas []*metadata.Meta, _ error) {
 	fis, err := ioutil.ReadDir(s.dir)
 	if err != nil {
-		return errors.Wrap(err, "read dir")
+		return nil, errors.Wrap(err, "read dir")
 	}
 	names := make([]string, 0, len(fis))
 	for _, fi := range fis {
@@ -400,29 +409,21 @@ func (s *Shipper) iterBlockMetas(f func(m *metadata.Meta) error) error {
 		fi, err := os.Stat(dir)
 		if err != nil {
-			level.Warn(s.logger).Log("msg", "open file failed", "err", err)
-			continue
+			return nil, errors.Wrapf(err, "stat block %v", dir)
 		}
 		if !fi.IsDir() {
 			continue
 		}
 		m, err := metadata.Read(dir)
 		if err != nil {
-			level.Warn(s.logger).Log("msg", "reading meta file failed", "err", err)
-			continue
+			return nil, errors.Wrapf(err, "read metadata for block %v", dir)
 		}
 		metas = append(metas, m)
 	}
 	sort.Slice(metas, func(i, j int) bool {
 		return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime
 	})
-	for _, m := range metas {
-
-		if err := f(m); err != nil {
-			return err
-		}
-	}
-	return nil
+	return metas, nil
 }
 
 func hardlinkBlock(src, dst string) error {
diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go
index b307b16532..00df38104d 100644
--- a/pkg/shipper/shipper_e2e_test.go
+++ b/pkg/shipper/shipper_e2e_test.go
@@ -38,7 +38,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
 		}()
 
 		extLset := labels.FromStrings("prometheus", "prom-1")
-		shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource)
+		shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false)
 
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
@@ -199,7 +199,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
 	defer upcancel2()
 	testutil.Ok(t, p.WaitPrometheusUp(upctx2))
 
-	shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource)
+	shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false)
 
 	// Create 10 new blocks. 9 of them (non compacted) should be actually uploaded.
 	var (
diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go
index fd9b75706f..32cdeedf3b 100644
--- a/pkg/shipper/shipper_test.go
+++ b/pkg/shipper/shipper_test.go
@@ -26,7 +26,7 @@ func TestShipperTimestamps(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
 
-	s := New(nil, nil, dir, nil, nil, metadata.TestSource)
+	s := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
 
 	// Missing thanos meta file.
 	_, _, err = s.Timestamps()
@@ -84,7 +84,6 @@
 }
 
 func TestIterBlockMetas(t *testing.T) {
-	var metas []*metadata.Meta
 	dir, err := ioutil.TempDir("", "shipper-test")
 	testutil.Ok(t, err)
 	defer func() {
@@ -124,13 +123,9 @@ func TestIterBlockMetas(t *testing.T) {
 		},
 	}))
 
-	shipper := New(nil, nil, dir, nil, nil, metadata.TestSource)
-	if err := shipper.iterBlockMetas(func(m *metadata.Meta) error {
-		metas = append(metas, m)
-		return nil
-	}); err != nil {
-		testutil.Ok(t, err)
-	}
+	shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
+	metas, err := shipper.blockMetasFromOldest()
+	testutil.Ok(t, err)
 	testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool {
 		return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime
 	}), true)
@@ -167,11 +162,8 @@ func BenchmarkIterBlockMetas(b *testing.B) {
 	})
 	b.ResetTimer()
 
-	shipper := New(nil, nil, dir, nil, nil, metadata.TestSource)
-	if err := shipper.iterBlockMetas(func(m *metadata.Meta) error {
-		metas = append(metas, m)
-		return nil
-	}); err != nil {
-		testutil.Ok(b, err)
-	}
+	shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
+
+	_, err = shipper.blockMetasFromOldest()
+	testutil.Ok(b, err)
 }
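Reviewer note: for anyone who wants to try the new behaviour outside of the sidecar/ruler/receive binaries, below is a minimal, illustrative sketch (not part of this diff) of how a caller wires the new `allowOutOfOrderUploads` parameter into `shipper.New` after this change. The data directory and external labels are made up, the import paths assume the Thanos v0.13 module layout, and the nil logger/registerer are accepted because the constructor falls back to no-op implementations, as the test changes above rely on.

```go
package main

import (
	"context"
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/objstore"
	"github.com/thanos-io/thanos/pkg/shipper"
)

func main() {
	// In-memory bucket purely for illustration; any objstore.Bucket works.
	bkt := objstore.NewInMemBucket()
	extLset := func() labels.Labels { return labels.FromStrings("replica", "01") }

	// The last argument is the new allowOutOfOrderUploads knob added in this
	// diff (surfaced as the hidden --shipper.allow-out-of-order-uploads flag):
	//
	//   false (default): Sync returns on the first failed upload, so a newer
	//                    block can never reach the bucket before an older one.
	//   true:            a failed upload is only logged and counted as an
	//                    upload failure; the remaining (newer) blocks are
	//                    still shipped and the failed one is retried on the
	//                    next Sync iteration.
	s := shipper.New(nil, nil, "/var/thanos/tsdb", bkt, extLset, metadata.TestSource, false)

	uploaded, err := s.Sync(context.Background())
	fmt.Printf("uploaded %d blocks, err: %v\n", uploaded, err)
}
```

With the flag left at its default, a transient upload error now aborts the whole Sync pass, which is what prevents the compactor from seeing only the newer blocks and producing overlaps; setting it to true restores the old skip-and-retry behaviour for users who run vertical compaction and accept overlapping blocks.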