Skip to content

Commit

Permalink
cleanup shipper NewWithCompacted function
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Jul 26, 2020
1 parent 9b2343f commit 9946558
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 49 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func runRule(
}
}()

s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, allowOutOfOrderUpload)
s := shipper.New(logger, reg, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource, false, allowOutOfOrderUpload)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
8 changes: 2 additions & 6 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,8 @@ func runSidecar(
return errors.Wrapf(err, "aborting as no external labels found after waiting %s", promReadyTimeout)
}

var s *shipper.Shipper
if conf.shipper.uploadCompacted {
s = shipper.NewWithCompacted(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload)
} else {
s = shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource, conf.shipper.allowOutOfOrderUpload)
}
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
conf.shipper.uploadCompacted, conf.shipper.allowOutOfOrderUpload)

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if uploaded, err := s.Sync(ctx); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Prometheus servers connected to the Thanos cluster via the sidecar are subject t
If you choose to use the sidecar to also upload data to object storage:

* Must specify object storage (`--objstore.*` flags)
* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks-experimental).
* It only uploads uncompacted Prometheus blocks. For compacted blocks, see [Upload compacted blocks](./sidecar.md/#upload-compacted-blocks).
* The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction on order to use Thanos sidecar upload, otherwise leave local compaction on if sidecar just exposes StoreAPI and your retention is normal. The default of `2h` is recommended.
Mentioned parameters set to equal values disable the internal Prometheus compaction, which is needed to avoid the uploaded data corruption when Thanos compactor does its job, this is critical for data consistency and should not be ignored if you plan to use Thanos compactor. Even though you set mentioned parameters equal, you might observe Prometheus internal metric `prometheus_tsdb_compactions_total` being incremented, don't be confused by that: Prometheus writes initial head block to filesytem via its internal compaction mechanism, but if you have followed recommendations - data won't be modified by Prometheus before the sidecar uploads it. Thanos sidecar will also check sanity of the flags set to Prometheus on the startup and log errors or warning if they have been configured improperly (#838).
* The retention is recommended to not be lower than three times the min block duration, so 6 hours. This achieves resilience in the face of connectivity issues to the object storage since all local data will remain available within the Thanos cluster. If connectivity gets restored the backlog of blocks gets uploaded to the object storage.
Expand Down Expand Up @@ -70,7 +70,7 @@ config:
bucket: example-bucket
```
## Upload compacted blocks (EXPERIMENTAL)
## Upload compacted blocks
If you want to migrate from a pure Prometheus setup to Thanos and have to keep the historical data, you can use the flag `--shipper.upload-compacted`. This will also upload blocks that were compacted by Prometheus. Values greater than 1 in the `compaction.level` field of a Prometheus block’s `meta.json` file indicate level of compaction.

Expand Down
1 change: 1 addition & 0 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
t.bucket,
func() labels.Labels { return lbls },
metadata.ReceiveSource,
false,
t.allowOutOfOrderUpload,
)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (r *Reloader) apply(ctx context.Context) error {
return err
}

// filepath.Walk uses Lstat to retriev os.FileInfo. Lstat does not
// filepath.Walk uses Lstat to retrieve os.FileInfo. Lstat does not
// follow symlinks. Make sure to follow a symlink before checking
// if it is a directory.
targetFile, err := os.Stat(path)
Expand Down
39 changes: 5 additions & 34 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@ type Shipper struct {
allowOutOfOrderUploads bool
}

// New creates a new shipper that detects new TSDB blocks in dir and uploads them
// to remote if necessary. It attaches the Thanos metadata section in each meta JSON file.
// New creates a new shipper that detects new TSDB blocks in dir and uploads them to
// remote if necessary. It attaches the Thanos metadata section in each meta JSON file.
// If uploadCompacted is enabled, it also uploads compacted blocks which are already in filesystem.
func New(
logger log.Logger,
r prometheus.Registerer,
dir string,
bucket objstore.Bucket,
lbls func() labels.Labels,
source metadata.SourceType,
uploadCompacted bool,
allowOutOfOrderUploads bool,
) *Shipper {
if logger == nil {
Expand All @@ -106,43 +108,12 @@ func New(
dir: dir,
bucket: bucket,
labels: lbls,
metrics: newMetrics(r, false),
metrics: newMetrics(r, uploadCompacted),
source: source,
allowOutOfOrderUploads: allowOutOfOrderUploads,
}
}

// NewWithCompacted creates a new shipper that detects new TSDB blocks in dir and uploads them
// to remote if necessary, including compacted blocks which are already in filesystem.
// It attaches the Thanos metadata section in each meta JSON file.
func NewWithCompacted(
logger log.Logger,
r prometheus.Registerer,
dir string,
bucket objstore.Bucket,
lbls func() labels.Labels,
source metadata.SourceType,
allowOutOfOrderUploads bool,
) *Shipper {
if logger == nil {
logger = log.NewNopLogger()
}
if lbls == nil {
lbls = func() labels.Labels { return nil }
}

return &Shipper{
logger: logger,
dir: dir,
bucket: bucket,
labels: lbls,
metrics: newMetrics(r, true),
source: source,
uploadCompacted: true,
allowOutOfOrderUploads: allowOutOfOrderUploads,
}
}

// Timestamps returns the minimum timestamp for which data is available and the highest timestamp
// of blocks that were successfully uploaded.
func (s *Shipper) Timestamps() (minTime, maxSyncTime int64, err error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/shipper/shipper_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) {
}()

extLset := labels.FromStrings("prometheus", "prom-1")
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, metricsBucket, func() labels.Labels { return extLset }, metadata.TestSource, false, false)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) {
defer upcancel2()
testutil.Ok(t, p.WaitPrometheusUp(upctx2))

shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, false)
shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, true, false)

// Create 10 new blocks. 9 of them (non compacted) should be actually uploaded.
var (
Expand Down
6 changes: 3 additions & 3 deletions pkg/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestShipperTimestamps(t *testing.T) {
testutil.Ok(t, os.RemoveAll(dir))
}()

s := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
s := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)

// Missing thanos meta file.
_, _, err = s.Timestamps()
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestIterBlockMetas(t *testing.T) {
},
}))

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)
metas, err := shipper.blockMetasFromOldest()
testutil.Ok(t, err)
testutil.Equals(t, sort.SliceIsSorted(metas, func(i, j int) bool {
Expand Down Expand Up @@ -162,7 +162,7 @@ func BenchmarkIterBlockMetas(b *testing.B) {
})
b.ResetTimer()

shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false)
shipper := New(nil, nil, dir, nil, nil, metadata.TestSource, false, false)

_, err = shipper.blockMetasFromOldest()
testutil.Ok(b, err)
Expand Down

0 comments on commit 9946558

Please sign in to comment.