Skip to content

Commit

Permalink
Added tools bucket marker flag to mark block for no-downsample (thano…
Browse files Browse the repository at this point in the history
…s-io#5945)

Signed-off-by: Rohit Singh <rohitkochhar@bitgo.com>

Signed-off-by: Rohit Singh <rohitkochhar@bitgo.com>
  • Loading branch information
RohitKochhar authored and rabenhorst committed Dec 19, 2022
1 parent 919da20 commit 67ba676
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#5945](https://github.com/thanos-io/thanos/pull/5945) Tools: Added new `no-downsample` marker to skip blocks when downsampling via `thanos tools bucket mark --marker=no-downsample-mark.json`. This will skip downsampling for blocks with the new marker.
- [#5814](https://github.com/thanos-io/thanos/pull/5814) Store: Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits.
- [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits.
- [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set time window for experimental out-of-order samples ingestion. Disabled by default (set to 0s). Please note if you enable this option and you use compactor, make sure you set the `--enable-vertical-compaction` flag, otherwise you might risk compactor halt.
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ func RunDownsample(
return err
}

// While fetching blocks, filter out blocks that were marked for no downsample.
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, bkt),
})
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down
6 changes: 5 additions & 1 deletion cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (tbc *bucketDownsampleConfig) registerBucketDownsampleFlag(cmd extkingpin.F

func (tbc *bucketMarkBlockConfig) registerBucketMarkBlockFlag(cmd extkingpin.FlagClause) *bucketMarkBlockConfig {
cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().StringsVar(&tbc.blockIDs)
cmd.Flag("marker", "Marker to be put.").Required().EnumVar(&tbc.marker, metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename)
cmd.Flag("marker", "Marker to be put.").Required().EnumVar(&tbc.marker, metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename, metadata.NoDownsampleMarkFilename)
cmd.Flag("details", "Human readable details to be put into marker.").Required().StringVar(&tbc.details)

return tbc
Expand Down Expand Up @@ -1059,6 +1059,10 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P
if err := block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, tbc.details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for %v", id, tbc.marker)
}
case metadata.NoDownsampleMarkFilename:
if err := block.MarkForNoDownsample(ctx, logger, bkt, id, metadata.ManualNoDownsampleReason, tbc.details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for %v", id, tbc.marker)
}
default:
return errors.Errorf("not supported marker %v", tbc.marker)
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,3 +402,34 @@ func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucke
level.Info(logger).Log("msg", "block has been marked for no compaction", "block", id)
return nil
}

// MarkForNoDownsample creates a file which marks block to be not downsampled.
func MarkForNoDownsample(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoDownsampleReason, details string, markedForNoDownsample prometheus.Counter) error {
m := path.Join(id.String(), metadata.NoDownsampleMarkFilename)
noDownsampleMarkExists, err := bkt.Exists(ctx, m)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", m)
}
if noDownsampleMarkExists {
level.Warn(logger).Log("msg", "requested to mark for no deletion, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", m))
return nil
}
noDownsampleMark, err := json.Marshal(metadata.NoDownsampleMark{
ID: id,
Version: metadata.NoDownsampleMarkVersion1,

NoDownsampleTime: time.Now().Unix(),
Reason: reason,
Details: details,
})
if err != nil {
return errors.Wrap(err, "json encode no downsample mark")
}

if err := bkt.Upload(ctx, m, bytes.NewBuffer(noDownsampleMark)); err != nil {
return errors.Wrapf(err, "upload file %s to bucket", m)
}
markedForNoDownsample.Inc()
level.Info(logger).Log("msg", "block has been marked for no downsample", "block", id)
return nil
}
55 changes: 55 additions & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,61 @@ func TestMarkForNoCompact(t *testing.T) {
}
}

func TestMarkForNoDownsample(t *testing.T) {

defer testutil.TolerantVerifyLeak(t)
ctx := context.Background()

tmpDir := t.TempDir()

for _, tcase := range []struct {
name string
preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)

blocksMarked int
}{
{
name: "block marked",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksMarked: 1,
},
{
name: "block with no-downsample mark already, expected log and no metric increment",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
m, err := json.Marshal(metadata.NoDownsampleMark{
ID: id,
NoDownsampleTime: time.Now().Unix(),
Version: metadata.NoDownsampleMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoDownsampleMarkFilename), bytes.NewReader(m)))
},
blocksMarked: 0,
},
} {
t.Run(tcase.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)

tcase.preUpload(t, id, bkt)

testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()), metadata.NoneFunc))

c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = MarkForNoDownsample(ctx, log.NewNopLogger(), bkt, id, metadata.ManualNoDownsampleReason, "", c)
testutil.Ok(t, err)
testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c))
})
}
}

// TestHashDownload uploads an empty block to in-memory storage
// and tries to download it to the same dir. It should not try
// to download twice.
Expand Down
3 changes: 3 additions & 0 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ const (
// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"

// MarkedForNoDownsampleMeta is label for blocks which are loaded but also marked for no downsample. This label is also counted in `loaded` label metric.
MarkedForNoDownsampleMeta = "marked-for-no-downsample"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)
Expand Down
31 changes: 30 additions & 1 deletion pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ const (
// NoCompactMarkFilename is the known json filename for optional file storing details about why block has to be excluded from compaction.
// If such file is present in block dir, it means the block has to excluded from compaction (both vertical and horizontal) or rewrite (e.g deletions).
NoCompactMarkFilename = "no-compact-mark.json"

// NoDownsampleMarkFilename is the known json filenanme for optional file storing details about why block has to be excluded from downsampling.
// If such file is present in block dir, it means the block has to be excluded from downsampling.
NoDownsampleMarkFilename = "no-downsample-mark.json"
// DeletionMarkVersion1 is the version of deletion-mark file supported by Thanos.
DeletionMarkVersion1 = 1
// NoCompactMarkVersion1 is the version of no-compact-mark file supported by Thanos.
NoCompactMarkVersion1 = 1
// NoDownsampleVersion1 is the version of no-downsample-mark file supported by Thanos.
NoDownsampleMarkVersion1 = 1
)

var (
Expand Down Expand Up @@ -62,9 +66,14 @@ func (m *DeletionMark) markerFilename() string { return DeletionMarkFilename }
// NoCompactReason is a reason for a block to be excluded from compaction.
type NoCompactReason string

// NoDownsampleReason is a reason for a block to be excluded from downsample.
type NoDownsampleReason string

const (
// ManualNoCompactReason is a custom reason of excluding from compaction that should be added when no-compact mark is added for unknown/user specified reason.
ManualNoCompactReason NoCompactReason = "manual"
// ManualNoDownsampleReason is a custom reason of excluding from downsample that should be added when no-downsample mark is added for unknown/user specified reason.
ManualNoDownsampleReason NoDownsampleReason = "manual"
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
// This reason can be ignored when vertical block sharding will be implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
Expand All @@ -88,6 +97,22 @@ type NoCompactMark struct {

func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename }

// NoDownsampleMark marker stores reason of block being excluded from downsample if needed.
type NoDownsampleMark struct {
// ID of the tsdb block.
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`
// Details is a human readable string giving details of reason.
Details string `json:"details,omitempty"`

// NoDownsampleTime is a unix timestamp of when the block was marked for no downsample.
NoDownsampleTime int64 `json:"no_downsample_time"`
Reason NoDownsampleReason `json:"reason"`
}

func (n *NoDownsampleMark) markerFilename() string { return NoDownsampleMarkFilename }

// ReadMarker reads the given mark file from <dir>/<marker filename>.json in bucket.
func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, marker Marker) error {
markerFile := path.Join(dir, marker.markerFilename())
Expand All @@ -113,6 +138,10 @@ func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.Instrumente
if version := marker.(*NoCompactMark).Version; version != NoCompactMarkVersion1 {
return errors.Errorf("unexpected no-compact-mark file version %d, expected %d", version, NoCompactMarkVersion1)
}
case NoDownsampleMarkFilename:
if version := marker.(*NoDownsampleMark).Version; version != NoDownsampleMarkVersion1 {
return errors.Errorf("unexpected no-downsample-mark file version %d, expected %d", version, NoDownsampleMarkVersion1)
}
case DeletionMarkFilename:
if version := marker.(*DeletionMark).Version; version != DeletionMarkVersion1 {
return errors.Errorf("unexpected deletion-mark file version %d, expected %d", version, DeletionMarkVersion1)
Expand Down
108 changes: 108 additions & 0 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
package downsample

import (
"context"
"fmt"
"math"
"math/rand"
"os"
"path/filepath"
"sync"
"time"

"github.com/go-kit/log"
Expand All @@ -23,7 +25,10 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down Expand Up @@ -762,3 +767,106 @@ func SamplesFromTSDBSamples(samples []tsdbutil.Sample) []sample {
}
return res
}

// GatherNoDownsampleMarkFilter is a block.Fetcher filter that passes all metas.
// While doing it, it gathers all no-downsample-mark.json markers.
type GatherNoDownsampleMarkFilter struct {
logger log.Logger
bkt objstore.InstrumentedBucketReader
noDownsampleMarkedMap map[ulid.ULID]*metadata.NoDownsampleMark
concurrency int
mtx sync.Mutex
}

// NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter.
func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter {
return &GatherNoDownsampleMarkFilter{
logger: logger,
bkt: bkt,
concurrency: 1,
}
}

// NoDownsampleMarkedBlocks returns block ids that were marked for no downsample.
func (f *GatherNoDownsampleMarkFilter) NoDownsampleMarkedBlocks() map[ulid.ULID]*metadata.NoDownsampleMark {
f.mtx.Lock()
copiedNoDownsampleMarked := make(map[ulid.ULID]*metadata.NoDownsampleMark, len(f.noDownsampleMarkedMap))
for k, v := range f.noDownsampleMarkedMap {
copiedNoDownsampleMarked[k] = v
}
f.mtx.Unlock()

return copiedNoDownsampleMarked
}

// TODO (@rohitkochhar): reduce code duplication here by combining
// this code with that of GatherNoCompactionMarkFilter
// Filter passes all metas, while gathering no downsample markers.
func (f *GatherNoDownsampleMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
f.mtx.Lock()
f.noDownsampleMarkedMap = make(map[ulid.ULID]*metadata.NoDownsampleMark)
f.mtx.Unlock()

// Make a copy of block IDs to check, in order to avoid concurrency issues
// between the scheduler and workers.
blockIDs := make([]ulid.ULID, 0, len(metas))
for id := range metas {
blockIDs = append(blockIDs, id)
}

var (
eg errgroup.Group
ch = make(chan ulid.ULID, f.concurrency)
)

for i := 0; i < f.concurrency; i++ {
eg.Go(func() error {
var lastErr error
for id := range ch {
m := &metadata.NoDownsampleMark{}

if err := metadata.ReadMarker(ctx, f.logger, f.bkt, id.String(), m); err != nil {
if errors.Cause(err) == metadata.ErrorMarkerNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalMarker {
level.Warn(f.logger).Log("msg", "found partial no-downsample-mark.json; if we will see it happening often for the same block, consider manually deleting no-downsample-mark.json from the object storage", "block", id, "err", err)
continue
}
// Remember the last error and continue draining the channel.
lastErr = err
continue
}

f.mtx.Lock()
f.noDownsampleMarkedMap[id] = m
f.mtx.Unlock()
synced.WithLabelValues(block.MarkedForNoDownsampleMeta).Inc()
}

return lastErr
})
}

// Workers scheduled, distribute blocks.
eg.Go(func() error {
defer close(ch)

for _, id := range blockIDs {
select {
case ch <- id:
// Nothing to do.
case <-ctx.Done():
return ctx.Err()
}
}

return nil
})

if err := eg.Wait(); err != nil {
return errors.Wrap(err, "filter blocks marked for no downsample")
}

return nil
}

0 comments on commit 67ba676

Please sign in to comment.