Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup shipper NewWithCompacted function #2940

Merged
merged 1 commit into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func runRule(
}
}()

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, allowOutOfOrderUpload)
s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
8 changes: 2 additions & 6 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,8 @@ func runSidecar(
return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout)
}

var s *shipper.Shipper
if conf.shipper.uploadCompacted {
s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload)
} else {
s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload)
}
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload)

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Prometheus servers connected to the Thanos cluster via the sidecar are subject t
If you choose to use the sidecar to also upload data to object storage:

* Must specify object storage (`--objstore.*` flags)
* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks-experimental).
* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks).
* The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction on order to use Thanos sidecar upload, otherwise leave local compaction on if sidecar just exposes StoreAPI and your retention is normal. The default of `2h` is recommended.
Mentioned parameters set to equal values disable the internal Prometheus compaction, which is needed to avoid the uploaded data corruption when Thanos compactor does its job, this is critical for data consistency and should not be ignored if you plan to use Thanos compactor. Even though you set mentioned parameters equal, you might observe Prometheus internal metric `prometheus_tsdb_compactions_total` being incremented, don't be confused by that: Prometheus writes initial head block to filesytem via its internal compaction mechanism, but if you have followed recommendations - data won't be modified by Prometheus before the sidecar uploads it. Thanos sidecar will also check sanity of the flags set to Prometheus on the startup and log errors or warning if they have been configured improperly (#838).
* The retention is recommended to not be lower than three times the min block duration, so 6 hours. This achieves resilience in the face of connectivity issues to the object storage since all local data will remain available within the Thanos cluster. If connectivity gets restored the backlog of blocks gets uploaded to the object storage.
Expand Down Expand Up @@ -70,7 +70,7 @@ config:
bucket: example-bucket
```

## Upload compacted blocks (EXPERIMENTAL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it non experimental? (:

Copy link
Contributor Author

@yeya24 yeya24 Jul 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the doc because I saw this pr #1833. I thought this feature is already marked as ready

## Upload compacted blocks

If you want to migrate from a pure Prometheus setup to Thanos and have to keep the historical data, you can use the flag `--shipper.upload-compacted`. This will also upload blocks that were compacted by Prometheus. Values greater than 1 in the `compaction.level` field of a Prometheus block’s `meta.json` file indicate level of compaction.

Expand Down
1 change: 1 addition & 0 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.bucket,
func() labels.Labels { return lbls },
metadata.ReceiveSource,
false,
t.allowOutOfOrderUpload,
)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (r *Reloader) apply(ctx context.Context) error {
return err
}

// filepath.Walk uses Lstat to retriev os.FileInfo. Lstat does not
// filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not
// follow symlinks. Make sure to follow a symlink before checking
// if it is a directory.
targetFile, err := os.Stat(path)
Expand Down
40 changes: 6 additions & 34 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@ type Shipper struct {
allowOutOfOrderUploads bool
}

// New creates a new shipper that detects new TSDB blocks in dir and uploads them
// to remote if necessary. It attaches the Thanos metadata section in each meta JSON file.
// New creates a new shipper that detects new TSDB blocks in dir and uploads them to
// remote if necessary. It attaches the Thanos metadata section in each meta JSON file.
// If uploadCompacted is enabled, it also uploads compacted blocks which are already in filesystem.
func New(
logger log.Logger,
r prometheus.Registerer,
dir string,
bucket objstore.Bucket,
lbls func() labels.Labels,
source metadata.SourceType,
uploadCompacted bool,
allowOutOfOrderUploads bool,
) *Shipper {
if logger == nil {
Expand All @@ -106,40 +108,10 @@ func New(
dir: dir,
bucket: bucket,
labels: lbls,
metrics: newMetrics(r, false),
metrics: newMetrics(r, uploadCompacted),
source: source,
allowOutOfOrderUploads: allowOutOfOrderUploads,
}
}

// NewWithCompacted creates a new shipper that detects new TSDB blocks in dir and uploads them
// to remote if necessary, including compacted blocks which are already in filesystem.
// It attaches the Thanos metadata section in each meta JSON file.
func NewWithCompacted(
logger log.Logger,
r prometheus.Registerer,
dir string,
bucket objstore.Bucket,
lbls func() labels.Labels,
source metadata.SourceType,
allowOutOfOrderUploads bool,
) *Shipper {
if logger == nil {
logger = log.NewNopLogger()
}
if lbls == nil {
lbls = func() labels.Labels { return nil }
}

return &Shipper{
logger: logger,
dir: dir,
bucket: bucket,
labels: lbls,
metrics: newMetrics(r, true),
source: source,
uploadCompacted: true,
allowOutOfOrderUploads: allowOutOfOrderUploads,
uploadCompacted: uploadCompacted,
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/shipper/shipper_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
}()

extLset := labels.FromStrings("prometheus", "prom-1")
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2))

shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false)

// Create 10 new blocks. 9 of them (non compacted) should be actually uploaded.
var (
Expand Down
6 changes: 3 additions & 3 deletions pkg/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestShipperTimestamps(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

s := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)

// Missing thanos meta file.
_, _, err = s.Timestamps()
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestIterBlockMetas(t *testing.T) {
},
}))

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)
metas, err := shipper.blockMetasFromOldest()
testutil.Ok(t, err)
testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool {
Expand Down Expand Up @@ -162,7 +162,7 @@ func BenchmarkIterBlockMetas(b *testing.B) {
})
b.ResetTimer()

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)

_, err = shipper.blockMetasFromOldest()
testutil.Ok(b, err)
Expand Down