From ef02fe0f75d72fd65708e2c3b12d50eeac3109e4 Mon Sep 17 00:00:00 2001
From: Bartlomiej Plotka
Date: Thu, 5 Nov 2020 17:44:51 +0100
Subject: [PATCH] tools: Added thanos bucket tool rewrite (for now: allowing block series deletions).

Signed-off-by: Bartlomiej Plotka
---
 cmd/thanos/tools_bucket.go      |  58 +++++
 go.mod                          |   2 +-
 pkg/block/index.go              |  12 +-
 pkg/block/writer.go             | 176 +++++++++++++++
 pkg/block/writer_modifiers.go   | 105 +++++++++
 pkg/block/writer_series.go      | 386 ++++++++++++++++++++++++++++++++
 pkg/block/writer_series_test.go | 258 +++++++++++++++++++++
 pkg/component/component.go      |   1 +
 8 files changed, 991 insertions(+), 7 deletions(-)
 create mode 100644 pkg/block/writer.go
 create mode 100644 pkg/block/writer_modifiers.go
 create mode 100644 pkg/block/writer_series.go
 create mode 100644 pkg/block/writer_series_test.go

diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go
index a2e944a7cf..13e1e54ff5 100644
--- a/cmd/thanos/tools_bucket.go
+++ b/cmd/thanos/tools_bucket.go
@@ -73,6 +73,7 @@ func registerBucket(app extkingpin.AppClause) {
 	registerBucketReplicate(cmd, objStoreConfig)
 	registerBucketDownsample(cmd, objStoreConfig)
 	registerBucketCleanup(cmd, objStoreConfig)
+	registerBucketRewrite(cmd, objStoreConfig)
 }
 
 func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
@@ -710,3 +711,60 @@ func compare(s1, s2 string) bool {
 	}
 	return s1Time.Before(s2Time)
 }
+
+func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.PathOrContent) {
+	cmd := app.Command(component.Rewrite.String(), "Rewrite chosen blocks in the bucket, while deleting or modifying series. Once rewritten, the old block is marked for deletion. "+
+		"NOTE: It's recommended to turn off the compactor while doing this operation. If the compactor is running and touching exactly the same block that "+
+		"is being rewritten, the resulting rewritten block might only cause an overlap (mitigated by marking the overlapping block manually for deletion) "+
+		"and the data you wanted to rewrite could already be part of a bigger block.\n\n"+
+		"Use a FILESYSTEM type of bucket to rewrite a block on disk (suitable for vanilla Prometheus). "+
+		"WARNING: This procedure is *IRREVERSIBLE* after a certain time (the delete delay), so back up your blocks first (you can use the objstore.config-backup flags for this command).")
+	blockIDs := cmd.Flag("id", "ID (ULID) of the blocks to rewrite (repeated flag).").Required().Strings()
+	objStoreBackupConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "-backup", false, "Used for backing up blocks before rewrite, if you choose to do so (only used in non-dry-run mode).")
+	dryRun := cmd.Flag("dry-run", "Prints the series changes instead of doing them. Defaults to true, for the user to double check. (: Pass --no-dry-run to skip this.").Default("true").Bool()
+	cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ <-chan struct{}, _ bool) error {
+		confContentYaml, err := objStoreConfig.Content()
+		if err != nil {
+			return err
+		}
+
+		bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String())
+		if err != nil {
+			return err
+		}
+
+		var ids []ulid.ULID
+		for _, id := range *blockIDs {
+			u, err := ulid.Parse(id)
+			if err != nil {
+				return errors.Errorf("block.id is not a valid ULID, got: %v", id)
+			}
+			ids = append(ids, u)
+		}
+
+		var backupBkt objstore.InstrumentedBucket
+		if !*dryRun {
+			confContentYaml, err := objStoreBackupConfig.Content()
+			if err != nil {
+				return err
+			}
+
+			backupBkt, err = client.NewBucket(logger, confContentYaml, reg, component.Rewrite.String())
+			if err != nil {
+				return err
+			}
+		}
+
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+		g.Add(func() error {
+			for _, id := range ids {
+				// Delete series from block & repair.
+			}
+			level.Info(logger).Log("msg", "rewrite done", "IDs", strings.Join(*blockIDs, ","))
+			return nil
+		}, func(err error) {
+			cancel()
+		})
+		return nil
+	})
+}
diff --git a/go.mod b/go.mod
index 4e631219a8..1cf45a9a26 100644
--- a/go.mod
+++ b/go.mod
@@ -77,7 +77,7 @@ replace (
 	// Update to v1.1.1 to make sure windows CI pass.
 	github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1
 	// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
-	github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20201029103703-63be30dceed9
+	github.com/prometheus/prometheus => ../prometheus
 	github.com/sercand/kuberesolver => github.com/sercand/kuberesolver v2.4.0+incompatible
 	google.golang.org/grpc => google.golang.org/grpc v1.29.1
 
diff --git a/pkg/block/index.go b/pkg/block/index.go
index c51251b2de..d245f99b91 100644
--- a/pkg/block/index.go
+++ b/pkg/block/index.go
@@ -282,15 +282,15 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
 	chunkw, err := chunks.NewWriter(filepath.Join(resdir, ChunksDirname))
 	if err != nil {
-		return resid, errors.Wrap(err, "open chunk writer")
+		return resid, errors.Wrap(err, "open chunk seriesWriter")
 	}
-	defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk writer")
+	defer runutil.CloseWithErrCapture(&err, chunkw, "repair chunk seriesWriter")
 	indexw, err := index.NewWriter(context.TODO(), filepath.Join(resdir, IndexFilename))
 	if err != nil {
-		return resid, errors.Wrap(err, "open index writer")
+		return resid, errors.Wrap(err, "open index seriesWriter")
 	}
-	defer runutil.CloseWithErrCapture(&err, indexw, "repair index writer")
+	defer runutil.CloseWithErrCapture(&err, indexw, "repair index seriesWriter")
 	// TODO(fabxc): adapt so we properly handle the version once we update to an upstream
 	// that has multiple.
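The rewrite loop body in `registerBucketRewrite` above is still a placeholder, so for orientation here is a minimal sketch of how the pieces introduced later in this patch (`NewDiskWriter`, `NewSeriesWriter`, `WithDeletionModifier`) are meant to compose for a single block that is already on local disk. It mirrors `writer_series_test.go` rather than the final command wiring; the helper name, directory arguments, and the assumption that it lives inside `pkg/block` (the change-log type and `DeleteRequest.intervals` are unexported in this draft) are all illustrative, and the deletion modifier itself is still marked WIP below.

```go
package block

import (
	"context"
	"os"
	"path/filepath"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// rewriteLocalBlock is an illustrative sketch: it opens an existing block from
// bDir, streams its series through the deletion modifier, and writes the result
// as a new block under outDir/newID, returning the stats of the rewritten block.
func rewriteLocalBlock(ctx context.Context, logger log.Logger, bDir, outDir string, newID ulid.ULID, deletions []DeleteRequest) (tsdb.BlockStats, error) {
	pool := chunkenc.NewPool()

	// *tsdb.Block satisfies the Reader interface (Index, Chunks, Meta).
	b, err := tsdb.OpenBlock(logger, bDir, pool)
	if err != nil {
		return tsdb.BlockStats{}, err
	}
	defer b.Close()

	// DiskWriter writes index and chunks into a tmp dir and moves it into place on Flush.
	d, err := NewDiskWriter(ctx, logger, filepath.Join(outDir, newID.String()))
	if err != nil {
		return tsdb.BlockStats{}, err
	}

	// changeLog prints every deleted series; WriteSeries applies the modifiers
	// while copying series from the reader(s) into the writer.
	w := NewSeriesWriter(outDir, logger, &changeLog{w: os.Stdout}, pool)
	if err := w.WriteSeries(ctx, []Reader{b}, d, WithDeletionModifier(deletions)); err != nil {
		return tsdb.BlockStats{}, err
	}
	return d.Flush()
}
```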
@@ -435,9 +435,9 @@ func rewrite( series = []seriesRepair{} ) + var lset labels.Labels + var chks []chunks.Meta for all.Next() { - var lset labels.Labels - var chks []chunks.Meta id := all.At() if err := indexr.Series(id, &lset, &chks); err != nil { diff --git a/pkg/block/writer.go b/pkg/block/writer.go new file mode 100644 index 0000000000..1ec6590c14 --- /dev/null +++ b/pkg/block/writer.go @@ -0,0 +1,176 @@ +package block + +import ( + "context" + "io" + "os" + "path/filepath" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/index" +) + +// Reader is like tsdb.BlockReader but without tombstones and size methods. +type Reader interface { + // Index returns an IndexReader over the block's data. + Index() (tsdb.IndexReader, error) + + // Chunks returns a ChunkReader over the block's data. + Chunks() (tsdb.ChunkReader, error) + + Meta() tsdb.BlockMeta +} + +// SeriesWriter is interface for writing series into one or multiple Blocks. +// Statistics has to be counted by implementation. +type SeriesWriter interface { + tsdb.IndexWriter + tsdb.ChunkWriter +} + +// Writer is interface for creating block(s). +type Writer interface { + SeriesWriter + + Flush() (tsdb.BlockStats, error) +} + +type DiskWriter struct { + statsGatheringSeriesWriter + + bTmp, bDir string + logger log.Logger + closers []io.Closer +} + +const tmpForCreationBlockDirSuffix = ".tmp-for-creation" + +// NewDiskWriter allows to write single TSDB block to disk and returns statistics. +func NewDiskWriter(ctx context.Context, logger log.Logger, bDir string) (_ *DiskWriter, err error) { + bTmp := bDir + tmpForCreationBlockDirSuffix + + d := &DiskWriter{ + bTmp: bTmp, + bDir: bDir, + logger: logger, + } + defer func() { + if err != nil { + err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err() + if err := os.RemoveAll(bTmp); err != nil { + level.Error(logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) + } + } + }() + + if err = os.RemoveAll(bTmp); err != nil { + return nil, err + } + if err = os.MkdirAll(bTmp, 0777); err != nil { + return nil, err + } + + chunkw, err := chunks.NewWriter(filepath.Join(bTmp, ChunksDirname)) + if err != nil { + return nil, errors.Wrap(err, "open chunk writer") + } + d.closers = append(d.closers, chunkw) + + // TODO(bwplotka): Setup instrumentedChunkWriter if we want to upstream this code. 
+ + indexw, err := index.NewWriter(ctx, filepath.Join(bTmp, IndexFilename)) + if err != nil { + return nil, errors.Wrap(err, "open index writer") + } + d.closers = append(d.closers, indexw) + d.statsGatheringSeriesWriter = statsGatheringSeriesWriter{iw: indexw, cw: chunkw} + return d, nil +} + +func (d *DiskWriter) Flush() (_ tsdb.BlockStats, err error) { + defer func() { + if err != nil { + err = tsdb_errors.NewMulti(err, tsdb_errors.CloseAll(d.closers)).Err() + if err := os.RemoveAll(d.bTmp); err != nil { + level.Error(d.logger).Log("msg", "removed tmp folder failed after block(s) write", "err", err.Error()) + } + } + }() + df, err := fileutil.OpenDir(d.bTmp) + if err != nil { + return tsdb.BlockStats{}, errors.Wrap(err, "open temporary block dir") + } + defer func() { + if df != nil { + err = tsdb_errors.NewMulti(err, df.Close()).Err() + } + }() + + if err := df.Sync(); err != nil { + return tsdb.BlockStats{}, errors.Wrap(err, "sync temporary dir file") + } + + // Close temp dir before rename block dir (for windows platform). + if err = df.Close(); err != nil { + return tsdb.BlockStats{}, errors.Wrap(err, "close temporary dir") + } + df = nil + + if err := tsdb_errors.CloseAll(d.closers); err != nil { + d.closers = nil + return tsdb.BlockStats{}, err + } + d.closers = nil + + // Block successfully written, make it visible in destination dir by moving it from tmp one. + if err := fileutil.Replace(d.bTmp, d.bDir); err != nil { + return tsdb.BlockStats{}, errors.Wrap(err, "rename block dir") + } + return d.stats, nil +} + +type statsGatheringSeriesWriter struct { + iw tsdb.IndexWriter + cw tsdb.ChunkWriter + + stats tsdb.BlockStats + symbols int64 +} + +func (s *statsGatheringSeriesWriter) AddSymbol(sym string) error { + if err := s.iw.AddSymbol(sym); err != nil { + return err + } + s.symbols++ + return nil +} + +func (s *statsGatheringSeriesWriter) AddSeries(ref uint64, l labels.Labels, chks ...chunks.Meta) error { + if err := s.iw.AddSeries(ref, l, chks...); err != nil { + return err + } + s.stats.NumSeries++ + return nil +} + +func (s *statsGatheringSeriesWriter) WriteChunks(chks ...chunks.Meta) error { + if err := s.cw.WriteChunks(chks...); err != nil { + return err + } + s.stats.NumChunks += uint64(len(chks)) + for _, chk := range chks { + s.stats.NumSamples += uint64(chk.Chunk.NumSamples()) + } + return nil +} + +func (s statsGatheringSeriesWriter) Close() error { + return tsdb_errors.NewMulti(s.iw.Close(), s.cw.Close()).Err() +} diff --git a/pkg/block/writer_modifiers.go b/pkg/block/writer_modifiers.go new file mode 100644 index 0000000000..12abff1d80 --- /dev/null +++ b/pkg/block/writer_modifiers.go @@ -0,0 +1,105 @@ +package block + +import ( + "math" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/tombstones" +) + +type Modifier interface { + Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet) +} + +type DeletionModifier struct { + deletions []DeleteRequest +} + +func WithDeletionModifier(deletions []DeleteRequest) *DeletionModifier { + return &DeletionModifier{deletions: deletions} +} + +func (d *DeletionModifier) Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet) { + return sym, &delModifierSeriesSet{ + d: d, + + ChunkSeriesSet: set, + log: log, + } +} + +type 
delModifierSeriesSet struct { + storage.ChunkSeriesSet + + d *DeletionModifier + log printChangeLog + + err error +} + +func (d *delModifierSeriesSet) Next() bool { + for d.ChunkSeriesSet.Next() { + s := d.ChunkSeriesSet.At() + lbls := s.Labels() + + var intervals tombstones.Intervals + for _, deletions := range d.d.deletions { + for _, m := range deletions.Matchers { + v := lbls.Get(m.Name) + if v == "" { + continue + } + + if m.Matches(v) { + continue + } + for _, in := range deletions.intervals { + intervals = intervals.Add(in) + } + break + } + } + + if (tombstones.Interval{Mint: math.MinInt64, Maxt: math.MaxInt64}.IsSubrange(intervals)) { + // Quick path for skipping series completely. + chksIter := d.ChunkSeriesSet.At().Iterator() + var chks []chunks.Meta + for chksIter.Next() { + chks = append(chks, chksIter.At()) + } + d.err = chksIter.Err() + if d.err != nil { + return false + } + + deleted := tombstones.Intervals{} + if len(chks) > 0 { + deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)].MaxTime}) + } + d.log.DeleteSeries(lbls, deleted) + continue + } + } + return false +} +func (d *delModifierSeriesSet) At() storage.ChunkSeries { + +} + +func (d *delModifierSeriesSet) Err() error { + panic("implement me") +} + +func (d *delModifierSeriesSet) Warnings() storage.Warnings { + panic("implement me") +} + +// TODO(bwplotka): Add relabelling. + +type DeleteRequest struct { + Matchers []*labels.Matcher + intervals tombstones.Intervals +} diff --git a/pkg/block/writer_series.go b/pkg/block/writer_series.go new file mode 100644 index 0000000000..5a90b74826 --- /dev/null +++ b/pkg/block/writer_series.go @@ -0,0 +1,386 @@ +package block + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/tombstones" +) + +type printChangeLog interface { + DeleteSeries(del labels.Labels, intervals tombstones.Intervals) + ModifySeries(old labels.Labels, new labels.Labels) +} + +type changeLog struct { + w io.Writer +} + +func (l *changeLog) DeleteSeries(del labels.Labels, intervals tombstones.Intervals) { + _, _ = fmt.Fprintf(l.w, "Deleted %v %v\n", del.String(), intervals) +} + +func (l *changeLog) ModifySeries(old labels.Labels, new labels.Labels) { + _, _ = fmt.Fprintf(l.w, "Relabelled %v %v\n", old.String(), new.String()) +} + +type seriesWriter struct { + tmpDir string + logger log.Logger + + chunkPool chunkenc.Pool + changeLogger printChangeLog + + dryRun bool +} + +type seriesReader struct { + ir tsdb.IndexReader + cr tsdb.ChunkReader +} + +func NewSeriesWriter(tmpDir string, logger log.Logger, changeLogger printChangeLog, pool chunkenc.Pool) *seriesWriter { + return &seriesWriter{ + tmpDir: tmpDir, + logger: logger, + changeLogger: changeLogger, + chunkPool: pool, + } +} + +// TODO(bwplotka): Upstream this. 
+func (w *seriesWriter) WriteSeries(ctx context.Context, readers []Reader, sWriter Writer, modifiers ...Modifier) (err error) { + if len(readers) == 0 { + return errors.New("cannot write from no readers") + } + + var ( + sReaders []seriesReader + closers []io.Closer + ) + defer func() { + errs := tsdb_errors.NewMulti(err) + if cerr := tsdb_errors.CloseAll(closers); cerr != nil { + errs.Add(errors.Wrap(cerr, "close")) + } + err = errs.Err() + }() + + for _, b := range readers { + indexr, err := b.Index() + if err != nil { + return errors.Wrapf(err, "open index reader for block %+v", b.Meta()) + } + closers = append(closers, indexr) + + chunkr, err := b.Chunks() + if err != nil { + return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta()) + } + closers = append(closers, chunkr) + sReaders = append(sReaders, seriesReader{ir: indexr, cr: chunkr}) + } + + symbols, set, err := compactSeries(ctx, sReaders...) + if err != nil { + return errors.Wrapf(err, "compact series from %v", func() string { + var metas []string + for _, m := range readers { + metas = append(metas, fmt.Sprintf("%v", m.Meta())) + } + return strings.Join(metas, ",") + }()) + } + + for _, m := range modifiers { + symbols, set = m.Modify(symbols, set, w.changeLogger) + } + + if w.dryRun { + return nil + } + + if err := w.write(ctx, symbols, set, sWriter); err != nil { + return errors.Wrap(err, "write") + } + return nil +} + +// compactSeries compacts blocks' series into symbols and one ChunkSeriesSet with lazy populating chunks. +func compactSeries(ctx context.Context, sReaders ...seriesReader) (symbols index.StringIter, set storage.ChunkSeriesSet, _ error) { + if len(sReaders) == 0 { + return nil, nil, errors.New("cannot populate block from no readers") + } + + var sets []storage.ChunkSeriesSet + for i, r := range sReaders { + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + default: + } + + k, v := index.AllPostingsKey() + all, err := r.ir.Postings(k, v) + if err != nil { + return nil, nil, err + } + all = r.ir.SortedPostings(all) + syms := r.ir.Symbols() + sets = append(sets, newLazyPopulateChunkSeriesSet(r, all)) + if i == 0 { + symbols = syms + set = sets[0] + continue + } + symbols = tsdb.NewMergedStringIter(symbols, syms) + } + + if len(sets) <= 1 { + return symbols, set, nil + } + // Merge series using compacting chunk series merger. + return symbols, storage.NewMergeChunkSeriesSet(sets, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)), nil +} + +type lazyPopulateChunkSeriesSet struct { + sReader seriesReader + + all index.Postings + + bufChks []chunks.Meta + bufLbls labels.Labels + + curr *storage.ChunkSeriesEntry + err error +} + +func newLazyPopulateChunkSeriesSet(sReader seriesReader, all index.Postings) *lazyPopulateChunkSeriesSet { + return &lazyPopulateChunkSeriesSet{sReader: sReader, all: all} +} + +func (s *lazyPopulateChunkSeriesSet) Next() bool { + for s.all.Next() { + if err := s.sReader.ir.Series(s.all.At(), &s.bufLbls, &s.bufChks); err != nil { + // Postings may be stale. Skip if no underlying series exists. 
+ if errors.Cause(err) == storage.ErrNotFound { + continue + } + s.err = errors.Wrapf(err, "get series %d", s.all.At()) + return false + } + + if len(s.bufChks) == 0 { + continue + } + + for i := range s.bufChks { + s.bufChks[i].Chunk = &lazyPopulatableChunk{cr: s.sReader.cr, m: &s.bufChks[i]} + } + s.curr = &storage.ChunkSeriesEntry{ + Lset: make(labels.Labels, len(s.bufLbls)), + ChunkIteratorFn: func() chunks.Iterator { + return storage.NewListChunkSeriesIterator(s.bufChks...) + }, + } + // TODO: Do we need to copy this? + copy(s.curr.Lset, s.bufLbls) + return true + } + return false +} + +func (s *lazyPopulateChunkSeriesSet) At() storage.ChunkSeries { + return s.curr +} + +func (s *lazyPopulateChunkSeriesSet) Err() error { + if s.err != nil { + return s.err + } + return s.all.Err() +} + +func (s *lazyPopulateChunkSeriesSet) Warnings() storage.Warnings { return nil } + +// populatableChunk allows to trigger when you want to have chunks populated. +type populatableChunk interface { + Populate(intervals tombstones.Intervals) (err error) +} + +type lazyPopulatableChunk struct { + m *chunks.Meta + + cr tsdb.ChunkReader + + populated chunkenc.Chunk + bufIter *tsdb.DeletedIterator +} + +type errChunkIterator struct{ err error } + +func (e errChunkIterator) Seek(int64) bool { return false } +func (e errChunkIterator) At() (int64, float64) { return 0, 0 } +func (e errChunkIterator) Next() bool { return false } +func (e errChunkIterator) Err() error { return e.err } + +var EmptyChunk = errChunk{err: errChunkIterator{err: errors.New("no samples")}} + +type errChunk struct{ err errChunkIterator } + +func (e errChunk) Bytes() []byte { return nil } +func (e errChunk) Encoding() chunkenc.Encoding { return chunkenc.EncXOR } +func (e errChunk) Appender() (chunkenc.Appender, error) { return nil, e.err.err } +func (e errChunk) Iterator(chunkenc.Iterator) chunkenc.Iterator { return e.err } +func (e errChunk) NumSamples() int { return 0 } +func (e errChunk) Compact() {} + +func (l *lazyPopulatableChunk) Populate(intervals tombstones.Intervals) { + if len(intervals) > 0 && (tombstones.Interval{Mint: l.m.MinTime, Maxt: l.m.MaxTime}.IsSubrange(intervals)) { + l.m.Chunk = EmptyChunk + return + } + + // TODO(bwplotka): In most cases we don't need to parse anything, just copy. Extend reader/writer for this. + var err error + l.populated, err = l.cr.Chunk(l.m.Ref) + if err != nil { + l.m.Chunk = errChunk{err: errChunkIterator{err: errors.Wrapf(err, "cannot populate chunk %d", l.m.Ref)}} + return + } + + var matching tombstones.Intervals + for _, interval := range intervals { + if l.m.OverlapsClosedInterval(interval.Mint, interval.Maxt) { + matching = matching.Add(interval) + } + } + + if len(matching) == 0 { + l.m.Chunk = l.populated + return + } + + // TODO(bwplotka): Optimize by using passed iterator. 
+ l.bufIter = &tsdb.DeletedIterator{Intervals: matching, Iter: l.populated.Iterator(nil)} + return + +} + +func (l *lazyPopulatableChunk) Bytes() []byte { + if l.populated == nil { + l.Populate(nil) + } + return l.populated.Bytes() +} + +func (l *lazyPopulatableChunk) Encoding() chunkenc.Encoding { + if l.populated == nil { + l.Populate(nil) + } + return l.populated.Encoding() +} + +func (l *lazyPopulatableChunk) Appender() (chunkenc.Appender, error) { + if l.populated == nil { + l.Populate(nil) + } + return l.populated.Appender() +} + +func (l *lazyPopulatableChunk) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { + if l.populated == nil { + l.Populate(nil) + } + if l.bufIter == nil { + return l.populated.Iterator(iterator) + } + return l.bufIter +} + +func (l *lazyPopulatableChunk) NumSamples() int { + if l.populated == nil { + l.Populate(nil) + } + return l.populated.NumSamples() +} + +func (l *lazyPopulatableChunk) Compact() { + if l.populated == nil { + l.Populate(nil) + } + l.populated.Compact() +} + +func (w *seriesWriter) write(ctx context.Context, symbols index.StringIter, populatedSet storage.ChunkSeriesSet, sWriter SeriesWriter) error { + var ( + chks []chunks.Meta + ref uint64 + ) + + for symbols.Next() { + if err := sWriter.AddSymbol(symbols.At()); err != nil { + return errors.Wrap(err, "add symbol") + } + } + if err := symbols.Err(); err != nil { + return errors.Wrap(err, "symbols") + } + + // Iterate over all sorted chunk series. + for populatedSet.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + s := populatedSet.At() + chksIter := s.Iterator() + chks = chks[:0] + for chksIter.Next() { + // We are not iterating in streaming way over chunk as it's more efficient to do bulk write for index and + // chunk file purposes. + chks = append(chks, chksIter.At()) + } + + if chksIter.Err() != nil { + return errors.Wrap(chksIter.Err(), "chunk iter") + } + + // Skip the series with all deleted chunks. 
+ if len(chks) == 0 { + continue + } + + if err := sWriter.WriteChunks(chks...); err != nil { + return errors.Wrap(err, "write chunks") + } + if err := sWriter.AddSeries(ref, s.Labels(), chks...); err != nil { + return errors.Wrap(err, "add series") + } + + for _, chk := range chks { + if err := w.chunkPool.Put(chk.Chunk); err != nil { + return errors.Wrap(err, "put chunk") + } + } + ref++ + } + if populatedSet.Err() != nil { + return errors.Wrap(populatedSet.Err(), "iterate populated chunk series set") + } + + return nil +} diff --git a/pkg/block/writer_series_test.go b/pkg/block/writer_series_test.go new file mode 100644 index 0000000000..1b3560e17b --- /dev/null +++ b/pkg/block/writer_series_test.go @@ -0,0 +1,258 @@ +package block + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "sort" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestSeriesWriter_WriteSeries_e2e(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + logger := log.NewLogfmtLogger(os.Stderr) + for _, tcase := range []struct { + name string + + input [][]seriesSamples + expected []seriesSamples + expectedErr error + expectedStats tsdb.BlockStats + modifiers struct{} + }{ + { + name: "empty block", + expectedErr: errors.New("cannot write from no readers"), + }, + { + name: "1 blocks no modify", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "2 blocks compact no modify", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}}}, + }, + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "4"}}, + chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + expected: []seriesSamples{ + {lset: 
labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 12}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "4"}}, + chunks: [][]sample{{{10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 21, + NumSeries: 4, + NumChunks: 7, + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "test-series-writer") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + chunkPool := chunkenc.NewPool() + s := NewSeriesWriter(tmpDir, logger, chunkPool) + + var blocks []Reader + for _, b := range tcase.input { + id := ulid.MustNew(uint64(len(blocks)+1), nil) + bdir := filepath.Join(tmpDir, id.String()) + testutil.Ok(t, os.MkdirAll(bdir, os.ModePerm)) + testutil.Ok(t, createBlockSeries(bdir, b)) + // Meta does not matter, but let's create for OpenBlock to work. + testutil.Ok(t, metadata.Meta{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: id}}.WriteToDir(logger, bdir)) + block, err := tsdb.OpenBlock(logger, bdir, chunkPool) + testutil.Ok(t, err) + blocks = append(blocks, block) + } + + id := ulid.MustNew(uint64(len(blocks)+1), nil) + d, err := NewDiskWriter(ctx, logger, filepath.Join(tmpDir, id.String())) + testutil.Ok(t, err) + if tcase.expectedErr != nil { + err := s.WriteSeries(ctx, nil, d) + testutil.NotOk(t, err) + testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) + return + } + testutil.Ok(t, s.WriteSeries(ctx, blocks, d)) + + stats, err := d.Flush() + testutil.Ok(t, err) + testutil.Equals(t, tcase.expectedStats, stats) + testutil.Equals(t, tcase.expected, readBlockSeries(t, filepath.Join(tmpDir, id.String()))) + }) + } +} + +type sample struct { + t int64 + v float64 +} + +type seriesSamples struct { + lset labels.Labels + chunks [][]sample +} + +func readBlockSeries(t *testing.T, bDir string) []seriesSamples { + indexr, err := index.NewFileReader(filepath.Join(bDir, IndexFilename)) + testutil.Ok(t, err) + defer indexr.Close() + + chunkr, err := chunks.NewDirReader(filepath.Join(bDir, ChunksDirname), nil) + testutil.Ok(t, err) + defer chunkr.Close() + + all, err := indexr.Postings(index.AllPostingsKey()) + testutil.Ok(t, err) + all = indexr.SortedPostings(all) + + var series []seriesSamples + var chks []chunks.Meta + for all.Next() { + s := seriesSamples{} + testutil.Ok(t, indexr.Series(all.At(), &s.lset, &chks)) + + for _, c := range chks { + c.Chunk, err = chunkr.Chunk(c.Ref) + testutil.Ok(t, err) + + var chk []sample + iter := c.Chunk.Iterator(nil) + for iter.Next() { + sa := sample{} + sa.t, sa.v = iter.At() + chk = append(chk, sa) + } + testutil.Ok(t, iter.Err()) + s.chunks = append(s.chunks, chk) + } + series = append(series, s) + } + testutil.Ok(t, all.Err()) + return series +} + +func createBlockSeries(bDir string, inputSeries []seriesSamples) (err error) { + d, err := NewDiskWriter(context.TODO(), log.NewNopLogger(), bDir) + if err != nil { + return err + } + defer func() { + if err != nil { + _, _ = d.Flush() + _ = os.RemoveAll(bDir) + } + }() + + sort.Slice(inputSeries, func(i, j int) bool { + return labels.Compare(inputSeries[i].lset, inputSeries[j].lset) < 0 + }) + + // Gather symbols. 
+ symbols := map[string]struct{}{} + for _, input := range inputSeries { + for _, l := range input.lset { + symbols[l.Name] = struct{}{} + symbols[l.Value] = struct{}{} + } + } + + symbolsSlice := make([]string, 0, len(symbols)) + for s := range symbols { + symbolsSlice = append(symbolsSlice, s) + } + sort.Strings(symbolsSlice) + for _, s := range symbolsSlice { + if err := d.AddSymbol(s); err != nil { + return err + } + } + var ref uint64 + for _, input := range inputSeries { + var chks []chunks.Meta + for _, chk := range input.chunks { + x := chunkenc.NewXORChunk() + a, err := x.Appender() + if err != nil { + return err + } + for _, sa := range chk { + a.Append(sa.t, sa.v) + } + chks = append(chks, chunks.Meta{Chunk: x, MinTime: chk[0].t, MaxTime: chk[len(chk)-1].t}) + } + if err := d.WriteChunks(chks...); err != nil { + return errors.Wrap(err, "write chunks") + } + if err := d.AddSeries(ref, input.lset, chks...); err != nil { + return errors.Wrap(err, "add series") + } + ref++ + } + + if _, err = d.Flush(); err != nil { + return errors.Wrap(err, "flush") + } + return nil +} diff --git a/pkg/component/component.go b/pkg/component/component.go index c7451c4767..d5f242fe1c 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -91,6 +91,7 @@ func FromProto(storeType storepb.StoreType) StoreAPI { var ( Bucket = source{component: component{name: "bucket"}} Cleanup = source{component: component{name: "cleanup"}} + Rewrite = source{component: component{name: "rewrite"}} Compact = source{component: component{name: "compact"}} Downsample = source{component: component{name: "downsample"}} Replicate = source{component: component{name: "replicate"}}
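For completeness, the shape of the deletion request that the new rewrite machinery consumes: a `DeleteRequest` pairs label matchers with tombstone intervals, and `WithDeletionModifier` turns a list of them into a `Modifier` for `WriteSeries`. The snippet below is only an illustrative, in-package sketch (the `intervals` field is unexported in this draft, and the matcher values and timestamps are made up); since the series-set side of the modifier is still marked "implement me" above, it shows intended usage rather than working behavior.

```go
package block

import (
	"math"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/tombstones"
)

// exampleDeletionModifier builds a Modifier intended to drop all samples of
// series matching job="foo" over the full time range, and samples of series
// matching instance=~"bar.*" only between two illustrative timestamps.
func exampleDeletionModifier() Modifier {
	return WithDeletionModifier([]DeleteRequest{
		{
			Matchers:  []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "foo")},
			intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}},
		},
		{
			Matchers:  []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "instance", "bar.*")},
			intervals: tombstones.Intervals{{Mint: 1604530800000, Maxt: 1604617200000}},
		},
	})
}
```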