diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go
index e7a1c22040..2da91cd5e7 100644
--- a/cmd/thanos/bucket.go
+++ b/cmd/thanos/bucket.go
@@ -85,10 +85,11 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
 		var backupBkt objstore.Bucket
 		if len(backupconfContentYaml) == 0 {
 			if *repair {
-				return errors.Wrap(err, "repair is specified, so backup client is required")
+				return errors.New("repair is specified, so backup client is required")
 			}
 		} else {
-			backupBkt, err = client.NewBucket(logger, backupconfContentYaml, reg, name)
+			// Pass a nil Prometheus registerer so the backup client doesn't register conflicting metrics.
+			backupBkt, err = client.NewBucket(logger, backupconfContentYaml, nil, name)
 			if err != nil {
 				return err
 			}
diff --git a/pkg/block/index.go b/pkg/block/index.go
index 777654f2e6..b7fa36d66e 100644
--- a/pkg/block/index.go
+++ b/pkg/block/index.go
@@ -531,7 +531,7 @@ func IgnoreDuplicateOutsideChunk(_ int64, _ int64, last *chunks.Meta, curr *chun
 	// the current one.
 	if curr.MinTime != last.MinTime || curr.MaxTime != last.MaxTime {
 		return false, errors.Errorf("non-sequential chunks not equal: [%d, %d] and [%d, %d]",
-			last.MaxTime, last.MaxTime, curr.MinTime, curr.MaxTime)
+			last.MinTime, last.MaxTime, curr.MinTime, curr.MaxTime)
 	}
 	ca := crc32.Checksum(last.Chunk.Bytes(), castagnoli)
 	cb := crc32.Checksum(curr.Chunk.Bytes(), castagnoli)
@@ -559,9 +559,14 @@ func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64, ignoreChk
 	var last *chunks.Meta

 OUTER:
-	for _, c := range chks {
+	// This compares the current chunk to the chunk from the previous iteration
+	// by pointer. With "for i, c := range chks", c is a single variable whose
+	// address doesn't change for the entire loop; each element of chks is
+	// copied into it in turn, so &c would always point at the current chunk.
+	// We must take the address of the indexed slice element instead.
+	for i := range chks {
 		for _, ignoreChkFn := range ignoreChkFns {
-			ignore, err := ignoreChkFn(mint, maxt, last, &c)
+			ignore, err := ignoreChkFn(mint, maxt, last, &chks[i])
 			if err != nil {
 				return nil, errors.Wrap(err, "ignore function")
 			}
@@ -571,13 +576,18 @@ OUTER:
 			}
 		}

-		last = &c
-		repl = append(repl, c)
+		last = &chks[i]
+		repl = append(repl, chks[i])
 	}

 	return repl, nil
 }

+type seriesRepair struct {
+	lset labels.Labels
+	chks []chunks.Meta
+}
+
 // rewrite writes all data from the readers back into the writers while cleaning
 // up mis-ordered and duplicated chunks.
 func rewrite(
@@ -605,17 +615,20 @@ func rewrite(
 		postings = index.NewMemPostings()
 		values   = map[string]stringset{}
 		i        = uint64(0)
+		series   = []seriesRepair{}
 	)

-	var lset labels.Labels
-	var chks []chunks.Meta
-
 	for all.Next() {
+		var lset labels.Labels
+		var chks []chunks.Meta
 		id := all.At()

 		if err := indexr.Series(id, &lset, &chks); err != nil {
 			return err
 		}
+		// Make sure labels are in sorted order.
+		sort.Sort(lset)
+
 		for i, c := range chks {
 			chks[i].Chunk, err = chunkr.Chunk(c.Ref)
 			if err != nil {
@@ -632,21 +645,39 @@ func rewrite(
 			continue
 		}

-		if err := chunkw.WriteChunks(chks...); err != nil {
+		series = append(series, seriesRepair{
+			lset: lset,
+			chks: chks,
+		})
+	}
+
+	if all.Err() != nil {
+		return errors.Wrap(all.Err(), "iterate series")
+	}
+
+	// Sort the series: if any label sets were re-ordered above, the relative
+	// ordering of the series may have changed as well.
+	sort.Slice(series, func(i, j int) bool {
+		return labels.Compare(series[i].lset, series[j].lset) < 0
+	})
+
+	// Build a new TSDB block.
+	for _, s := range series {
+		if err := chunkw.WriteChunks(s.chks...); err != nil {
 			return errors.Wrap(err, "write chunks")
 		}
-		if err := indexw.AddSeries(i, lset, chks...); err != nil {
+		if err := indexw.AddSeries(i, s.lset, s.chks...); err != nil {
 			return errors.Wrap(err, "add series")
 		}

-		meta.Stats.NumChunks += uint64(len(chks))
+		meta.Stats.NumChunks += uint64(len(s.chks))
 		meta.Stats.NumSeries++

-		for _, chk := range chks {
+		for _, chk := range s.chks {
 			meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
 		}

-		for _, l := range lset {
+		for _, l := range s.lset {
 			valset, ok := values[l.Name]
 			if !ok {
 				valset = stringset{}
@@ -654,12 +685,9 @@ func rewrite(
 			}
 			valset.set(l.Value)
 		}
-		postings.Add(i, lset)
+		postings.Add(i, s.lset)
 		i++
 	}
-	if all.Err() != nil {
-		return errors.Wrap(all.Err(), "iterate series")
-	}

 	s := make([]string, 0, 256)
 	for n, v := range values {
diff --git a/pkg/verifier/overlapped_blocks.go b/pkg/verifier/overlapped_blocks.go
index 072fe54aec..af28fccb0c 100644
--- a/pkg/verifier/overlapped_blocks.go
+++ b/pkg/verifier/overlapped_blocks.go
@@ -2,6 +2,8 @@ package verifier

 import (
 	"context"
+	"sort"
+
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/improbable-eng/thanos/pkg/block"
@@ -10,7 +12,6 @@ import (
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb"
-	"sort"
 )

 const OverlappedBlocksIssueID = "overlapped_blocks"
diff --git a/pkg/verifier/safe_delete.go b/pkg/verifier/safe_delete.go
index 94383e07e2..cfea10c0df 100644
--- a/pkg/verifier/safe_delete.go
+++ b/pkg/verifier/safe_delete.go
@@ -2,9 +2,9 @@ package verifier

 import (
 	"context"
-	"fmt"
 	"io/ioutil"
 	"os"
+	"path/filepath"

 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
@@ -31,13 +31,18 @@ func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
 		return errors.Errorf("%s dir seems to exists in backup bucket. Remove this block manually if you are sure it is safe to do", id)
 	}

-	dir, err := ioutil.TempDir("", fmt.Sprintf("safe-delete-%s", id))
+	tempdir, err := ioutil.TempDir("", "safe-delete")
+	if err != nil {
+		return err
+	}
+	dir := filepath.Join(tempdir, id.String())
+	err = os.Mkdir(dir, 0755)
 	if err != nil {
 		return err
 	}

 	defer func() {
-		if err := os.RemoveAll(dir); err != nil {
-			level.Warn(logger).Log("msg", "failed to delete dir", "dir", dir, "err", err)
+		if err := os.RemoveAll(tempdir); err != nil {
+			level.Warn(logger).Log("msg", "failed to delete dir", "dir", tempdir, "err", err)
 		}
 	}()
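
A note on the cmd/thanos/bucket.go change: two bucket clients built against the same Prometheus registerer would try to register identically named collectors, which the registry rejects. A minimal sketch of that failure mode, using client_golang directly (the metric name here is illustrative, not the one Thanos registers):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// The first client registers its metrics fine.
	a := prometheus.NewCounter(prometheus.CounterOpts{Name: "bucket_operations_total", Help: "ops"})
	fmt.Println(reg.Register(a)) // <nil>

	// A second collector with the same descriptor fails with
	// AlreadyRegisteredError, which is why the backup bucket client
	// is handed a nil registerer instead.
	b := prometheus.NewCounter(prometheus.CounterOpts{Name: "bucket_operations_total", Help: "ops"})
	fmt.Println(reg.Register(b)) // duplicate metrics collector registration attempted
}
```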
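The pointer-aliasing fix in sanitizeChunkSequence reduces to the standalone sketch below. Under the range semantics in effect when this patch was written (before Go 1.22 gave each iteration its own loop variable), &c is the same address on every pass, so "last" can never lag one element behind:

```go
package main

import "fmt"

func main() {
	xs := []int{10, 20, 30}

	// Buggy pattern: before Go 1.22, c is ONE variable reused across
	// iterations, so last always aliases the *current* value.
	var last *int
	for _, c := range xs {
		if last != nil {
			// Pre-1.22 this printed pairs like "prev: 20 curr: 20",
			// because last and &c were the same pointer.
			fmt.Println("prev:", *last, "curr:", c)
		}
		last = &c
	}

	// Fixed pattern: index into the slice, so each pointer refers to a
	// distinct element of the backing array and outlives the iteration.
	last = nil
	for i := range xs {
		if last != nil {
			fmt.Println("prev:", *last, "curr:", xs[i]) // "prev: 10 curr: 20", etc.
		}
		last = &xs[i]
	}
}
```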
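The second sort in rewrite exists because normalizing each label set with sort.Sort(lset) can change how two series compare, and the TSDB index requires series to be added in label order. A sketch of that interaction with the tsdb labels package this file imports (the label names and values are arbitrary):

```go
package main

import (
	"fmt"
	"sort"

	"github.com/prometheus/tsdb/labels"
)

func main() {
	// Two series whose label pairs arrived in non-sorted order.
	series := []labels.Labels{
		{{Name: "b", Value: "1"}, {Name: "a", Value: "2"}},
		{{Name: "a", Value: "1"}, {Name: "b", Value: "0"}},
	}

	// Normalize each label set, as rewrite now does per series.
	for _, lset := range series {
		sort.Sort(lset)
	}

	// Normalization can flip the relative order of whole series, so the
	// index would be written out of order without this second sort.
	sort.Slice(series, func(i, j int) bool {
		return labels.Compare(series[i], series[j]) < 0
	})
	fmt.Println(series)
}
```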
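Finally, the safe_delete.go change stops baking the block ID into the TempDir prefix and instead creates an anonymous temp dir with a child directory named exactly after the block's ULID; removing the parent then cleans up the whole tree. A standalone sketch of the same layout (the ULID string is a placeholder):

```go
package main

import (
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
)

func main() {
	id := "01ARZ3NDEKTSV4RRFFQ69G5FAV" // placeholder block ULID

	// Anonymous temp dir; the block ID no longer appears in its name.
	tempdir, err := ioutil.TempDir("", "safe-delete")
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		// Removing the parent removes the block directory with it.
		if err := os.RemoveAll(tempdir); err != nil {
			log.Printf("failed to delete dir %s: %v", tempdir, err)
		}
	}()

	// The download target is a directory named exactly after the block,
	// i.e. <tempdir>/<ULID>, mirroring the block's layout in the bucket.
	dir := filepath.Join(tempdir, id)
	if err := os.Mkdir(dir, 0755); err != nil {
		log.Fatal(err)
	}
	log.Println("block dir:", dir)
}
```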