diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 4c9bc39303a..6ee488411dd 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -626,7 +626,7 @@ func runRule( } }() - s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload) + s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, allowOutOfOrderUpload) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 35bc41c5ca3..ec11ca47447 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -273,8 +273,12 @@ func runSidecar( return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout) } - s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, - conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload) + var s *shipper.Shipper + if conf.shipper.uploadCompacted { + s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload) + } else { + s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload) + } return runutil.Repeat(30*time.Second, ctx.Done(), func() error { if uploaded, err := s.Sync(ctx); err != nil { diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index bf684759547..31616d5f692 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -32,7 +32,7 @@ Prometheus servers connected to the Thanos cluster via the sidecar are subject t If you choose to use the sidecar to also upload data to object storage: * Must specify object storage (`--objstore.*` flags) -* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks). +* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks-experimental). * The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction on order to use Thanos sidecar upload, otherwise leave local compaction on if sidecar just exposes StoreAPI and your retention is normal. The default of `2h` is recommended. Mentioned parameters set to equal values disable the internal Prometheus compaction, which is needed to avoid the uploaded data corruption when Thanos compactor does its job, this is critical for data consistency and should not be ignored if you plan to use Thanos compactor. Even though you set mentioned parameters equal, you might observe Prometheus internal metric `prometheus_tsdb_compactions_total` being incremented, don't be confused by that: Prometheus writes initial head block to filesytem via its internal compaction mechanism, but if you have followed recommendations - data won't be modified by Prometheus before the sidecar uploads it. Thanos sidecar will also check sanity of the flags set to Prometheus on the startup and log errors or warning if they have been configured improperly (#838). * The retention is recommended to not be lower than three times the min block duration, so 6 hours. This achieves resilience in the face of connectivity issues to the object storage since all local data will remain available within the Thanos cluster. If connectivity gets restored the backlog of blocks gets uploaded to the object storage. @@ -70,7 +70,7 @@ config: bucket: example-bucket ``` -## Upload compacted blocks +## Upload compacted blocks (EXPERIMENTAL) If you want to migrate from a pure Prometheus setup to Thanos and have to keep the historical data, you can use the flag `--shipper.upload-compacted`. This will also upload blocks that were compacted by Prometheus. Values greater than 1 in the `compaction.level` field of a Prometheus block’s `meta.json` file indicate level of compaction. diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 017224d566b..889544f5411 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -282,7 +282,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant t.bucket, func() labels.Labels { return lbls }, metadata.ReceiveSource, - false, t.allowOutOfOrderUpload, ) } diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 6f406ab9a24..b3b43f98853 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -288,7 +288,7 @@ func (r *Reloader) apply(ctx context.Context) error { return err } - // filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not + // filepath.Walk uses Lstat to retriev os.FileInfo. Lstat does not // follow symlinks. Make sure to follow a symlink before checking // if it is a directory. targetFile, err := os.Stat(path) diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 30496e43c04..5c961f4459f 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -83,9 +83,8 @@ type Shipper struct { allowOutOfOrderUploads bool } -// New creates a new shipper that detects new TSDB blocks in dir and uploads them to -// remote if necessary. It attaches the Thanos metadata section in each meta JSON file. -// If uploadCompacted is enabled, it also uploads compacted blocks which are already in filesystem. +// New creates a new shipper that detects new TSDB blocks in dir and uploads them +// to remote if necessary. It attaches the Thanos metadata section in each meta JSON file. func New( logger log.Logger, r prometheus.Registerer, @@ -93,7 +92,6 @@ func New( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, - uploadCompacted bool, allowOutOfOrderUploads bool, ) *Shipper { if logger == nil { @@ -108,10 +106,40 @@ func New( dir: dir, bucket: bucket, labels: lbls, - metrics: newMetrics(r, uploadCompacted), + metrics: newMetrics(r, false), source: source, allowOutOfOrderUploads: allowOutOfOrderUploads, - uploadCompacted: uploadCompacted, + } +} + +// NewWithCompacted creates a new shipper that detects new TSDB blocks in dir and uploads them +// to remote if necessary, including compacted blocks which are already in filesystem. +// It attaches the Thanos metadata section in each meta JSON file. +func NewWithCompacted( + logger log.Logger, + r prometheus.Registerer, + dir string, + bucket objstore.Bucket, + lbls func() labels.Labels, + source metadata.SourceType, + allowOutOfOrderUploads bool, +) *Shipper { + if logger == nil { + logger = log.NewNopLogger() + } + if lbls == nil { + lbls = func() labels.Labels { return nil } + } + + return &Shipper{ + logger: logger, + dir: dir, + bucket: bucket, + labels: lbls, + metrics: newMetrics(r, true), + source: source, + uploadCompacted: true, + allowOutOfOrderUploads: allowOutOfOrderUploads, } } diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index e0383e4432d..9fac4ca00d9 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -45,7 +45,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { }() extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -219,7 +219,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2)) - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false) + shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false) // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded. var ( diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 59c564df3c4..32cdeedf3b7 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -26,7 +26,7 @@ func TestShipperTimestamps(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false) + s := New(nil, nil, dir, nil, nil, metadata.TestSource, false) // Missing thanos meta file. _, _, err = s.Timestamps() @@ -123,7 +123,7 @@ func TestIterBlockMetas(t *testing.T) { }, })) - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false) metas, err := shipper.blockMetasFromOldest() testutil.Ok(t, err) testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool { @@ -162,7 +162,7 @@ func BenchmarkIterBlockMetas(b *testing.B) { }) b.ResetTimer() - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false) _, err = shipper.blockMetasFromOldest() testutil.Ok(b, err)