Skip to content

Commit

Permalink
support bucket rewrite relabel
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <yb532204897@gmail.com>
  • Loading branch information
yeya24 committed Mar 9, 2021
1 parent 84c73dc commit 73c34e6
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 14 deletions.
36 changes: 30 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/prometheus/prometheus/pkg/relabel"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -791,7 +792,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
hashFunc := cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
Default("").Enum("SHA256", "")
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", true)
toDelete := extflag.RegisterPathOrContent(cmd, "rewrite.to-delete-config", "YAML file that contains []metadata.DeletionRequest that will be applied to blocks", false)
toRelabel := extflag.RegisterPathOrContent(cmd, "rewrite.to-relabel-config", "YAML file that contains relabel configs that will be applied to blocks", false)
provideChangeLog := cmd.Flag("rewrite.add-change-log", "If specified, all modifications are written to new block directory. Disable if latency is to high.").Default("true").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
Expand All @@ -804,15 +806,36 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
return err
}

deletionsYaml, err := toDelete.Content()
var modifiers []compactv2.Modifier

relabelYaml, err := toRelabel.Content()
if err != nil {
return err
}
var relabels []*relabel.Config
if len(relabelYaml) > 0 {
relabels, err = block.ParseRelabelConfig(relabelYaml, nil)
if err != nil {
return err
}
modifiers = append(modifiers, compactv2.WithRelabelModifier(relabels...))
}

var deletions []metadata.DeletionRequest
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
deletionsYaml, err := toDelete.Content()
if err != nil {
return err
}
var deletions []metadata.DeletionRequest
if len(deletionsYaml) > 0 {
if err := yaml.Unmarshal(deletionsYaml, &deletions); err != nil {
return err
}
modifiers = append(modifiers, compactv2.WithDeletionModifier(deletions...))
}

if len(modifiers) == 0 {
return errors.New("rewrite configuration should be provided")
}

var ids []ulid.ULID
for _, id := range *blockIDs {
Expand Down Expand Up @@ -856,6 +879,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
meta.Thanos.Rewrites = append(meta.Thanos.Rewrites, metadata.Rewrite{
Sources: meta.Compaction.Sources,
DeletionsApplied: deletions,
RelabelsApplied: relabels,
})
meta.Compaction.Sources = []ulid.ULID{newID}
meta.Thanos.Source = metadata.BucketRewriteSource
Expand Down Expand Up @@ -887,8 +911,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
comp = compactv2.New(*tmpDir, logger, changeLog, chunkPool)
}

level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, compactv2.WithDeletionModifier(deletions...)); err != nil {
level.Info(logger).Log("msg", "starting rewrite for block", "source", id, "new", newID, "toDelete", string(deletionsYaml), "toRelabel", string(relabelYaml))
if err := comp.WriteSeries(ctx, []block.Reader{b}, d, p, modifiers...); err != nil {
return errors.Wrapf(err, "writing series from %v to %v", id, newID)
}

Expand Down
8 changes: 8 additions & 0 deletions docs/components/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,14 @@ Flags:
flag (lower priority). Content of YAML file that
contains []metadata.DeletionRequest that will be
applied to blocks
--rewrite.to-relabel-config-file=<file-path>
Path to YAML file that contains relabel configs
that will be applied to blocks
--rewrite.to-relabel-config=<content>
Alternative to 'rewrite.to-relabel-config-file'
flag (lower priority). Content of YAML file that
contains relabel configs that will be applied to
blocks
--rewrite.add-change-log If specified, all modifications are written to
new block directory. Disable if latency is to
high.
Expand Down
9 changes: 6 additions & 3 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,15 +876,18 @@ var (
)

// ParseRelabelConfig parses relabel configuration.
// If supportedActions is nil, all relabel actions are valid.
func ParseRelabelConfig(contentYaml []byte, supportedActions map[relabel.Action]struct{}) ([]*relabel.Config, error) {
var relabelConfig []*relabel.Config
if err := yaml.Unmarshal(contentYaml, &relabelConfig); err != nil {
return nil, errors.Wrap(err, "parsing relabel configuration")
}

for _, cfg := range relabelConfig {
if _, ok := supportedActions[cfg.Action]; !ok {
return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
if supportedActions != nil {
for _, cfg := range relabelConfig {
if _, ok := supportedActions[cfg.Action]; !ok {
return nil, errors.Errorf("unsupported relabel action: %v", cfg.Action)
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package metadata
import (
"encoding/json"
"fmt"
"github.com/prometheus/prometheus/pkg/relabel"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -95,6 +96,8 @@ type Rewrite struct {
Sources []ulid.ULID `json:"sources,omitempty"`
// Deletions if applied (in order).
DeletionsApplied []DeletionRequest `json:"deletions_applied,omitempty"`
// Relabels if applied.
RelabelsApplied []*relabel.Config `json:"relabels_applied,omitempty"`
}

type Matchers []*labels.Matcher
Expand Down
1 change: 0 additions & 1 deletion pkg/compactv2/chunk_series_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package compactv2

import (
"context"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
Expand Down
37 changes: 34 additions & 3 deletions pkg/compactv2/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package compactv2
import (
"bytes"
"context"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/tsdb/tombstones"
"io/ioutil"
"math"
"os"
Expand All @@ -22,14 +25,13 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestCompactor_WriteSeries_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Minute)
defer cancel()

logger := log.NewLogfmtLogger(os.Stderr)
Expand Down Expand Up @@ -303,7 +305,36 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
expectedStats: tsdb.BlockStats{
NumSamples: 12,
NumSeries: 2,
NumChunks: 2,
NumChunks: 2},
},
{
name: "1 block + relabel modifier, delete first series",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
},
},
modifiers: []Modifier{WithRelabelModifier(
&relabel.Config{Action: relabel.Drop,
Regex: relabel.MustNewRegexp("1"),
SourceLabels: model.LabelNames{"a"}},
)},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 13}, {11, 11}, {20, 20}}}},
},
expectedChanges: "Deleted {a=\"1\"} [{0 20}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 12,
NumSeries: 2,
NumChunks: 3,
},
},
} {
Expand Down
108 changes: 107 additions & 1 deletion pkg/compactv2/modifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
package compactv2

import (
"math"
"sort"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
Expand Down Expand Up @@ -328,4 +333,105 @@ func (p *delChunkSeriesIterator) Next() bool {

func (p *delChunkSeriesIterator) At() chunks.Meta { return p.curr }

// TODO(bwplotka): Add relabelling.
// RelabelModifier is a modifier that rewrites series by running their label
// sets through a list of relabel configs (see its Modify method).
type RelabelModifier struct {
// relabels are the relabel configs handed to relabel.Process for each series.
relabels []*relabel.Config
}

// WithRelabelModifier returns a RelabelModifier holding the given relabel
// configs.
func WithRelabelModifier(relabels ...*relabel.Config) *RelabelModifier {
	m := new(RelabelModifier)
	m.relabels = relabels
	return m
}

// Modify runs every series of set through the configured relabel configs and
// returns the resulting symbols and series.
//
// The incoming symbols iterator is ignored: symbols are rebuilt from the label
// names and values that remain after relabelling. A series that ends up with no
// labels is dropped and reported through log.DeleteSeries together with the
// time range spanned by its chunks. A series whose labels changed is reported
// through log.ModifySeries. All remaining series are buffered in memory and
// returned sorted by their (relabelled) label set.
//
// If iterating a dropped series' chunks or iterating set itself fails, an
// iterator that only carries the error is returned together with a nil series
// set.
func (d *RelabelModifier) Modify(_ index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) {
	// Gather symbols.
	symbols := make(map[string]struct{})
	// TODO(yeya24): use a map to store the label set and the chunk iterators
	// since multiple series might be relabelled to the same series.
	seriesSets := make([]storage.ChunkSeries, 0)

	for set.Next() {
		s := set.At()
		lbls := s.Labels()

		processedLabels := relabel.Process(lbls, d.relabels...)
		if len(processedLabels) == 0 {
			// Special case: Delete whole series.
			chksIter := s.Iterator()
			var (
				minT int64 = math.MaxInt64
				maxT int64 = math.MinInt64
			)
			for chksIter.Next() {
				c := chksIter.At()
				if c.MinTime < minT {
					minT = c.MinTime
				}
				if c.MaxTime > maxT {
					maxT = c.MaxTime
				}
			}

			if err := chksIter.Err(); err != nil {
				return errorOnlyStringIter{err: err}, nil
			}

			var deleted tombstones.Intervals
			// If minTime is set then there is at least one chunk.
			if minT != math.MaxInt64 {
				deleted = deleted.Add(tombstones.Interval{Mint: minT, Maxt: maxT})
			}
			log.DeleteSeries(lbls, deleted)
			// NOTE(review): only dropped series are counted here; presumably kept
			// series are counted by whoever consumes the returned set — confirm.
			p.SeriesProcessed()
			continue
		}

		for _, lb := range processedLabels {
			symbols[lb.Name] = struct{}{}
			symbols[lb.Value] = struct{}{}
		}

		// Chunks are not read here; the original series' iterator is reused
		// under the new label set.
		seriesSets = append(seriesSets, &storage.ChunkSeriesEntry{
			Lset:            processedLabels,
			ChunkIteratorFn: s.Iterator,
		})

		if !labels.Equal(lbls, processedLabels) {
			log.ModifySeries(lbls, processedLabels)
		}
	}

	// Next returning false can mean either exhaustion or failure. Surface the
	// failure instead of returning a silently truncated series list.
	if err := set.Err(); err != nil {
		return errorOnlyStringIter{err: err}, nil
	}

	symbolsSlice := make([]string, 0, len(symbols))
	for s := range symbols {
		symbolsSlice = append(symbolsSlice, s)
	}
	sort.Strings(symbolsSlice)
	// Relabelling can change the relative order of series, so re-sort them by
	// their new label sets.
	sort.Slice(seriesSets, func(i, j int) bool {
		return labels.Compare(seriesSets[i].Labels(), seriesSets[j].Labels()) < 0
	})
	return index.NewStringListIter(symbolsSlice), newListChunkSeriesSet(seriesSets...)
}

// errorOnlyStringIter is a string iterator that never yields an item and
// exists only to hand an error back to the caller.
type errorOnlyStringIter struct {
	err error
}

// Next always reports that there is nothing to iterate over.
func (errorOnlyStringIter) Next() bool {
	return false
}

// At always returns the empty string.
func (errorOnlyStringIter) At() string {
	return ""
}

// Err returns the error this iterator was created with.
func (it errorOnlyStringIter) Err() error {
	return it.err
}

// listChunkSeriesSet iterates over an in-memory slice of chunk series.
type listChunkSeriesSet struct {
	css []storage.ChunkSeries
	// idx is the position of the current series; it is -1 until Next is called.
	idx int
}

// newListChunkSeriesSet wraps the given series, in the given order, as a
// storage.ChunkSeriesSet positioned before the first element.
func newListChunkSeriesSet(css ...storage.ChunkSeries) storage.ChunkSeriesSet {
	set := &listChunkSeriesSet{idx: -1}
	set.css = css
	return set
}

// Next advances to the next series and reports whether one is available.
func (s *listChunkSeriesSet) Next() bool {
	s.idx += 1
	return len(s.css) > s.idx
}

// At returns the series at the current position.
func (s *listChunkSeriesSet) At() storage.ChunkSeries {
	return s.css[s.idx]
}

// Err always returns nil.
func (s *listChunkSeriesSet) Err() error {
	return nil
}

// Warnings always returns nil.
func (s *listChunkSeriesSet) Warnings() storage.Warnings {
	return nil
}

0 comments on commit 73c34e6

Please sign in to comment.