tools: Added thanos bucket tool rewrite (for now: allowing block series deletions).

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed Nov 7, 2020
1 parent a4576d8 commit ef02fe0
Showing 8 changed files with 991 additions and 7 deletions.
58 changes: 58 additions & 0 deletions cmd/thanos/tools_bucket.go
@@ -73,6 +73,7 @@ func registerBucket(app extkingpin.AppClause) {
registerBucketReplicate(cmd, objStoreConfig)
registerBucketDownsample(cmd, objStoreConfig)
registerBucketCleanup(cmd, objStoreConfig)
registerBucketRewrite(cmd, objStoreConfig)
}

func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
@@ -710,3 +711,60 @@ func compare(s1, s2 string) bool {
}
return s1Time.Before(s2Time)
}

func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series. Once rewritten, the old block is marked for deletion. "+
"NOTE: It's recommended to turn off the compactor while doing this operation. If the compactor is running and touching exactly the same block that "+
"is being rewritten, the resulting rewritten block might only cause overlap (mitigated by marking the overlapping block manually for deletion) "+
"and the data you wanted to rewrite could already be part of a bigger block.\n\n"+
"Use the FILESYSTEM bucket type to rewrite blocks on disk (suitable for vanilla Prometheus). "+
"WARNING: This procedure is *IRREVERSIBLE* after a certain time (the delete delay), so back up your blocks first (you can use the objstore-backup.config flags of this command).")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks to rewrite (repeated flag).").Required().Strings()
objStoreBackupConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "-backup", false, "Used for backing up blocks before rewrite, if you choose to (only used in non-dry-run mode).")
dryRun := cmd.Flag("dry-run", "Prints the series changes instead of applying them. Defaults to true, for the user to double check. Pass --no-dry-run to skip this.").Default("true").Bool()
cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String())
if err != nil {
return err
}

var ids []ulid.ULID
for _, id := range *blockIDs {
u, err := ulid.Parse(id)
if err != nil {
return errors.Errorf("block.id is not a valid ULID, got: %v", id)
}
ids = append(ids, u)
}

var backupBkt objstore.InstrumentedBucket
if !*dryRun {
confContentYaml, err := objStoreBackupConfig.Content()
if err != nil {
return err
}

backupBkt, err = client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String())
if err != nil {
return err
}
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
g.Add(func() error {
for _, id := range ids {
// TODO(bwplotka): Delete series from block & repair.
level.Info(logger).Log("msg", "rewriting block", "id", id.String(), "backup", backupBkt != nil)

select {
case <-ctx.Done():
return ctx.Err()
default:
}
}
level.Info(logger).Log("msg", "rewrite done", "IDs", strings.Join(*blockIDs, ","))
return nil
}, func(err error) {
cancel()
})
return nil
})
}
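As a usage sketch (the ULID and config paths below are placeholders; flag names follow the registrations above): `thanos tools bucket rewrite --id 01EKE1MJ5YBGW9G9Z8XJXHGQ3V --objstore.config-file bucket.yml` prints the planned series changes, and adding `--no-dry-run` together with the backup bucket flags (e.g. `--objstore-backup.config-file backup.yml`) applies them for real.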
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ replace (
// Update to v1.1.1 to make sure windows CI pass.
github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
- github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201029103703-63be30dceed9
+ github.com/prometheus/prometheus => ../prometheus
github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
google.golang.org/grpc => google.golang.org/grpc v1.29.1

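Note: this replace points the build at a local ../prometheus checkout — presumably a development-time pin carrying not-yet-upstreamed TSDB APIs (such as the exported Interval.IsSubrange used in pkg/block/writer_modifiers.go below).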
12 changes: 6 additions & 6 deletions pkg/block/index.go
@@ -282,15 +282,15 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT

chunkw, err := chunks.NewWriter(filepath.Join(resdir, ChunksDirname))
if err != nil {
return resid, errors.Wrap(err, "open chunk writer")
return resid, errors.Wrap(err, "open chunk seriesWriter")
}
defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk writer")
defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk seriesWriter")

indexw, err := index.NewWriter(context.TODO(), filepath.Join(resdir, IndexFilename))
if err != nil {
return resid, errors.Wrap(err, "open index writer")
return resid, errors.Wrap(err, "open index seriesWriter")
}
defer runutil.CloseWithErrCapture(&err, indexw, "repair index writer")
defer runutil.CloseWithErrCapture(&err, indexw, "repair index seriesWriter")

// TODO(fabxc): adapt so we properly handle the version once we update to an upstream
// that has multiple.
@@ -435,9 +435,9 @@ func rewrite(
series = []seriesRepair{}
)

+ var lset labels.Labels
+ var chks []chunks.Meta
for all.Next() {
- var lset labels.Labels
- var chks []chunks.Meta
id := all.At()

if err := indexr.Series(id, &lset, &chks); err != nil {
176 changes: 176 additions & 0 deletions pkg/block/writer.go
@@ -0,0 +1,176 @@
package block

import (
"context"
"io"
"os"
"path/filepath"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
)

// Reader is like tsdb.BlockReader but without tombstones and size methods.
type Reader interface {
// Index returns an IndexReader over the block's data.
Index() (tsdb.IndexReader, error)

// Chunks returns a ChunkReader over the block's data.
Chunks() (tsdb.ChunkReader, error)

Meta() tsdb.BlockMeta
}

// SeriesWriter is an interface for writing series into one or multiple blocks.
// Statistics have to be counted by the implementation.
type SeriesWriter interface {
tsdb.IndexWriter
tsdb.ChunkWriter
}

// Writer is an interface for creating block(s).
type Writer interface {
SeriesWriter

Flush() (tsdb.BlockStats, error)
}

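// DiskWriter implements Writer against a local directory: it stages the block in a temporary dir next to bDir and renames it into place on Flush.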
type DiskWriter struct {
statsGatheringSeriesWriter

bTmp, bDir string
logger log.Logger
closers []io.Closer
}

const tmpForCreationBlockDirSuffix = ".tmp-for-creation"

// NewDiskWriter creates a writer for a single TSDB block on disk; statistics are returned on Flush.
func NewDiskWriter(ctx context.Context, logger log.Logger, bDir string) (_ *DiskWriter, err error) {
bTmp := bDir + tmpForCreationBlockDirSuffix

d := &DiskWriter{
bTmp: bTmp,
bDir: bDir,
logger: logger,
}
defer func() {
if err != nil {
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err()
if err := os.RemoveAll(bTmp); err != nil {
level.Error(logger).Log("msg", "failed to remove tmp folder after failed creation", "err", err.Error())
}
}
}()

if err = os.RemoveAll(bTmp); err != nil {
return nil, err
}
if err = os.MkdirAll(bTmp, 0777); err != nil {
return nil, err
}

chunkw, err := chunks.NewWriter(filepath.Join(bTmp, ChunksDirname))
if err != nil {
return nil, errors.Wrap(err, "open chunk writer")
}
d.closers = append(d.closers, chunkw)

// TODO(bwplotka): Setup instrumentedChunkWriter if we want to upstream this code.

indexw, err := index.NewWriter(ctx, filepath.Join(bTmp, IndexFilename))
if err != nil {
return nil, errors.Wrap(err, "open index writer")
}
d.closers = append(d.closers, indexw)
d.statsGatheringSeriesWriter = statsGatheringSeriesWriter{iw: indexw, cw: chunkw}
return d, nil
}

func (d *DiskWriter) Flush() (_ tsdb.BlockStats, err error) {
defer func() {
if err != nil {
err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err()
if err := os.RemoveAll(d.bTmp); err != nil {
level.Error(d.logger).Log("msg", "failed to remove tmp folder after failed block(s) write", "err", err.Error())
}
}
}()
df, err := fileutil.OpenDir(d.bTmp)
if err != nil {
return tsdb.BlockStats{}, errors.Wrap(err, "open temporary block dir")
}
defer func() {
if df != nil {
err = tsdb_errors.NewMulti(err, df.Close()).Err()
}
}()

if err := df.Sync(); err != nil {
return tsdb.BlockStats{}, errors.Wrap(err, "sync temporary dir file")
}

// Close temp dir before renaming the block dir (needed on the Windows platform).
if err = df.Close(); err != nil {
return tsdb.BlockStats{}, errors.Wrap(err, "close temporary dir")
}
df = nil

if err := tsdb_errors.CloseAll(d.closers); err != nil {
d.closers = nil
return tsdb.BlockStats{}, err
}
d.closers = nil

// Block successfully written; make it visible in the destination dir by moving it from the tmp one.
if err := fileutil.Replace(d.bTmp, d.bDir); err != nil {
return tsdb.BlockStats{}, errors.Wrap(err, "rename block dir")
}
return d.stats, nil
}

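// statsGatheringSeriesWriter delegates to the underlying index and chunk writers while counting symbols, series, chunks, and samples for the final tsdb.BlockStats.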
type statsGatheringSeriesWriter struct {
iw tsdb.IndexWriter
cw tsdb.ChunkWriter

stats tsdb.BlockStats
symbols int64
}

func (s *statsGatheringSeriesWriter) AddSymbol(sym string) error {
if err := s.iw.AddSymbol(sym); err != nil {
return err
}
s.symbols++
return nil
}

func (s *statsGatheringSeriesWriter) AddSeries(ref uint64, l labels.Labels, chks ...chunks.Meta) error {
if err := s.iw.AddSeries(ref, l, chks...); err != nil {
return err
}
s.stats.NumSeries++
return nil
}

func (s *statsGatheringSeriesWriter) WriteChunks(chks ...chunks.Meta) error {
if err := s.cw.WriteChunks(chks...); err != nil {
return err
}
s.stats.NumChunks += uint64(len(chks))
for _, chk := range chks {
s.stats.NumSamples += uint64(chk.Chunk.NumSamples())
}
return nil
}

func (s *statsGatheringSeriesWriter) Close() error {
return tsdb_errors.NewMulti(s.iw.Close(), s.cw.Close()).Err()
}
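For orientation, a minimal sketch of driving this Writer API (the output directory name and series labels are made up, and error handling is collapsed into panics): symbols are registered first in sorted order, then the series that use them, and Flush closes the writers, moves the finished block into place, and returns the gathered stats.

```go
package main

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/thanos-io/thanos/pkg/block"
)

func main() {
	logger := log.NewNopLogger()

	// Hypothetical destination: writes are staged in "./out.tmp-for-creation"
	// and renamed into "./out" on Flush.
	w, err := block.NewDiskWriter(context.TODO(), logger, "./out")
	if err != nil {
		panic(err)
	}

	// Symbols must be added in sorted order, before any series using them.
	for _, s := range []string{"__name__", "up"} {
		if err := w.AddSymbol(s); err != nil {
			panic(err)
		}
	}

	// A single series with no chunks, just to exercise the stats gathering.
	if err := w.AddSeries(0, labels.FromStrings("__name__", "up")); err != nil {
		panic(err)
	}

	stats, err := w.Flush()
	if err != nil {
		panic(err)
	}
	_ = stats // stats.NumSeries == 1 in this sketch.
}
```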
105 changes: 105 additions & 0 deletions pkg/block/writer_modifiers.go
@@ -0,0 +1,105 @@
package block

import (
"math"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
)

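// Modifier transforms a block's symbol table and chunk series before they are written to a new block, reporting each change to the given change log.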
type Modifier interface {
Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet)
}

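// DeletionModifier drops the series (or the requested intervals within them) matched by the given delete requests.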
type DeletionModifier struct {
deletions []DeleteRequest
}

func WithDeletionModifier(deletions []DeleteRequest) *DeletionModifier {
return &DeletionModifier{deletions: deletions}
}

func (d *DeletionModifier) Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet) {
return sym, &delModifierSeriesSet{
d: d,

ChunkSeriesSet: set,
log: log,
}
}

type delModifierSeriesSet struct {
storage.ChunkSeriesSet

d *DeletionModifier
log printChangeLog

err error
}

func (d *delModifierSeriesSet) Next() bool {
for d.ChunkSeriesSet.Next() {
s := d.ChunkSeriesSet.At()
lbls := s.Labels()

var intervals tombstones.Intervals
for _, deletions := range d.d.deletions {
// All matchers of a request have to match for its intervals to apply.
matched := true
for _, m := range deletions.Matchers {
if !m.Matches(lbls.Get(m.Name)) {
matched = false
break
}
}
if !matched {
continue
}
for _, in := range deletions.intervals {
intervals = intervals.Add(in)
}
}

if (tombstones.Interval{Mint: math.MinInt64, Maxt: math.MaxInt64}.IsSubrange(intervals)) {
// Quick path for skipping series completely.
chksIter := d.ChunkSeriesSet.At().Iterator()
var chks []chunks.Meta
for chksIter.Next() {
chks = append(chks, chksIter.At())
}
d.err = chksIter.Err()
if d.err != nil {
return false
}

deleted := tombstones.Intervals{}
if len(chks) > 0 {
deleted = deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)-1].MaxTime})
}
d.log.DeleteSeries(lbls, deleted)
continue
}
// TODO(bwplotka): Apply partial deletion intervals in At().
return true
}
return false
}
func (d *delModifierSeriesSet) At() storage.ChunkSeries {
// TODO(bwplotka): Trim the series by the gathered deletion intervals instead of returning it unmodified.
return d.ChunkSeriesSet.At()
}

func (d *delModifierSeriesSet) Err() error {
if d.err != nil {
return d.err
}
return d.ChunkSeriesSet.Err()
}

func (d *delModifierSeriesSet) Warnings() storage.Warnings {
return d.ChunkSeriesSet.Warnings()
}

// TODO(bwplotka): Add relabelling.

type DeleteRequest struct {
Matchers []*labels.Matcher
intervals tombstones.Intervals
}
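A minimal same-package sketch of composing a delete request (the matcher value is illustrative, and since the intervals field is unexported at this stage, outside callers could not populate it yet):

```go
// exampleDeletionModifier (hypothetical helper) requests deletion of every
// series matching {job="test"} across the whole possible time range.
func exampleDeletionModifier() *DeletionModifier {
	return WithDeletionModifier([]DeleteRequest{
		{
			Matchers:  []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "test")},
			intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}},
		},
	})
}
```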