From 2a54885906af1ea87819b1ea45474dd6148af470 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 29 Jul 2020 11:12:58 -0400 Subject: [PATCH] cleanup shipper NewWithCompacted function (#2940) Signed-off-by: Ben Ye Signed-off-by: soniasingla --- cmd/thanos/rule.go | 2 +- cmd/thanos/sidecar.go | 8 ++----- docs/components/sidecar.md | 4 ++-- pkg/receive/multitsdb.go | 1 + pkg/reloader/reloader.go | 2 +- pkg/shipper/shipper.go | 40 +++++---------------------------- pkg/shipper/shipper_e2e_test.go | 4 ++-- pkg/shipper/shipper_test.go | 6 ++--- 8 files changed, 18 insertions(+), 49 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 6ee488411dd..4c9bc39303a 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -626,7 +626,7 @@ func runRule( } }() - s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, allowOutOfOrderUpload) + s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index ec11ca47447..35bc41c5ca3 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -273,12 +273,8 @@ func runSidecar( return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout) } - var s *shipper.Shipper - if conf.shipper.uploadCompacted { - s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload) - } else { - s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload) - } + s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, + conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload) return runutil.Repeat(30*time.Second, ctx.Done(), func() error { if uploaded, err := s.Sync(ctx); err != nil { diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index 31616d5f692..bf684759547 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -32,7 +32,7 @@ Prometheus servers connected to the Thanos cluster via the sidecar are subject t If you choose to use the sidecar to also upload data to object storage: * Must specify object storage (`--objstore.*` flags) -* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks-experimental). +* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks). * The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction on order to use Thanos sidecar upload, otherwise leave local compaction on if sidecar just exposes StoreAPI and your retention is normal. The default of `2h` is recommended. Mentioned parameters set to equal values disable the internal Prometheus compaction, which is needed to avoid the uploaded data corruption when Thanos compactor does its job, this is critical for data consistency and should not be ignored if you plan to use Thanos compactor. Even though you set mentioned parameters equal, you might observe Prometheus internal metric `prometheus_tsdb_compactions_total` being incremented, don't be confused by that: Prometheus writes initial head block to filesytem via its internal compaction mechanism, but if you have followed recommendations - data won't be modified by Prometheus before the sidecar uploads it. Thanos sidecar will also check sanity of the flags set to Prometheus on the startup and log errors or warning if they have been configured improperly (#838). * The retention is recommended to not be lower than three times the min block duration, so 6 hours. This achieves resilience in the face of connectivity issues to the object storage since all local data will remain available within the Thanos cluster. If connectivity gets restored the backlog of blocks gets uploaded to the object storage. @@ -70,7 +70,7 @@ config: bucket: example-bucket ``` -## Upload compacted blocks (EXPERIMENTAL) +## Upload compacted blocks If you want to migrate from a pure Prometheus setup to Thanos and have to keep the historical data, you can use the flag `--shipper.upload-compacted`. This will also upload blocks that were compacted by Prometheus. Values greater than 1 in the `compaction.level` field of a Prometheus block’s `meta.json` file indicate level of compaction. diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 889544f5411..017224d566b 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -282,6 +282,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant t.bucket, func() labels.Labels { return lbls }, metadata.ReceiveSource, + false, t.allowOutOfOrderUpload, ) } diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index b3b43f98853..6f406ab9a24 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -288,7 +288,7 @@ func (r *Reloader) apply(ctx context.Context) error { return err } - // filepath.Walk uses Lstat to retriev os.FileInfo. Lstat does not + // filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not // follow symlinks. Make sure to follow a symlink before checking // if it is a directory. targetFile, err := os.Stat(path) diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 5c961f4459f..30496e43c04 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -83,8 +83,9 @@ type Shipper struct { allowOutOfOrderUploads bool } -// New creates a new shipper that detects new TSDB blocks in dir and uploads them -// to remote if necessary. It attaches the Thanos metadata section in each meta JSON file. +// New creates a new shipper that detects new TSDB blocks in dir and uploads them to +// remote if necessary. It attaches the Thanos metadata section in each meta JSON file. +// If uploadCompacted is enabled, it also uploads compacted blocks which are already in filesystem. func New( logger log.Logger, r prometheus.Registerer, @@ -92,6 +93,7 @@ func New( bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, + uploadCompacted bool, allowOutOfOrderUploads bool, ) *Shipper { if logger == nil { @@ -106,40 +108,10 @@ func New( dir: dir, bucket: bucket, labels: lbls, - metrics: newMetrics(r, false), + metrics: newMetrics(r, uploadCompacted), source: source, allowOutOfOrderUploads: allowOutOfOrderUploads, - } -} - -// NewWithCompacted creates a new shipper that detects new TSDB blocks in dir and uploads them -// to remote if necessary, including compacted blocks which are already in filesystem. -// It attaches the Thanos metadata section in each meta JSON file. -func NewWithCompacted( - logger log.Logger, - r prometheus.Registerer, - dir string, - bucket objstore.Bucket, - lbls func() labels.Labels, - source metadata.SourceType, - allowOutOfOrderUploads bool, -) *Shipper { - if logger == nil { - logger = log.NewNopLogger() - } - if lbls == nil { - lbls = func() labels.Labels { return nil } - } - - return &Shipper{ - logger: logger, - dir: dir, - bucket: bucket, - labels: lbls, - metrics: newMetrics(r, true), - source: source, - uploadCompacted: true, - allowOutOfOrderUploads: allowOutOfOrderUploads, + uploadCompacted: uploadCompacted, } } diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 9fac4ca00d9..e0383e4432d 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -45,7 +45,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { }() extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -219,7 +219,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2)) - shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false) // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded. var ( diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 32cdeedf3b7..59c564df3c4 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -26,7 +26,7 @@ func TestShipperTimestamps(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - s := New(nil, nil, dir, nil, nil, metadata.TestSource, false) + s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false) // Missing thanos meta file. _, _, err = s.Timestamps() @@ -123,7 +123,7 @@ func TestIterBlockMetas(t *testing.T) { }, })) - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false) metas, err := shipper.blockMetasFromOldest() testutil.Ok(t, err) testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool { @@ -162,7 +162,7 @@ func BenchmarkIterBlockMetas(b *testing.B) { }) b.ResetTimer() - shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false) + shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false) _, err = shipper.blockMetasFromOldest() testutil.Ok(b, err)