Skip to content

Commit

Permalink
compact: Add Index Stats to block metadata (thanos-io#6441)
Browse files Browse the repository at this point in the history
* compactor set index stats on block

Signed-off-by: Ben Ye <benye@amazon.com>

* comment

Signed-off-by: Ben Ye <benye@amazon.com>

* change field

Signed-off-by: Ben Ye <benye@amazon.com>

* update downsample

Signed-off-by: Ben Ye <benye@amazon.com>

* add index stats into compaction

Signed-off-by: Ben Ye <benye@amazon.com>

* fix tests

Signed-off-by: Ben Ye <benye@amazon.com>

* fix test

Signed-off-by: Ben Ye <benye@amazon.com>

* update changelog

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 authored and HC Zhu committed Jun 27, 2023
1 parent 35c7610 commit 5e8d6d9
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5777](https://github.com/thanos-io/thanos/pull/5777) Receive: Allow specifying tenant-specific external labels in Router Ingestor.
- [#6352](https://github.com/thanos-io/thanos/pull/6352) Store: Expose store gateway query stats in series response hints.
- [#6420](https://github.com/thanos-io/thanos/pull/6420) Index Cache: Cache expanded postings.
- [#6441](https://github.com/thanos-io/thanos/pull/6441) Compact: Compactor will set `index_stats` in `meta.json` file with max series and chunk size information.

### Fixed
- [#6427](https://github.com/thanos-io/thanos/pull/6427) Receive: increasing log level for failed uploads to error
Expand Down
21 changes: 20 additions & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,29 @@ func processDownsampling(
"from", m.ULID, "to", id, "duration", downsampleDuration, "duration_ms", downsampleDuration.Milliseconds())
metrics.downsampleDuration.WithLabelValues(m.Thanos.GroupKey()).Observe(downsampleDuration.Seconds())

if err := block.VerifyIndex(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime); err != nil && !acceptMalformedIndex {
stats, err := block.GatherIndexHealthStats(logger, filepath.Join(resdir, block.IndexFilename), m.MinTime, m.MaxTime)
if err == nil {
err = stats.AnyErr()
}
if err != nil && !acceptMalformedIndex {
return errors.Wrap(err, "output block index not valid")
}

meta, err := metadata.ReadFromDir(resdir)
if err != nil {
return errors.Wrap(err, "read meta")
}

if stats.ChunkMaxSize > 0 {
meta.Thanos.IndexStats.ChunkMaxSize = stats.ChunkMaxSize
}
if stats.SeriesMaxSize > 0 {
meta.Thanos.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
}
if err := meta.WriteToDir(logger, resdir); err != nil {
return errors.Wrap(err, "write meta")
}

begin = time.Now()

err = block.Upload(ctx, logger, bkt, resdir, hashFunc)
Expand Down
9 changes: 5 additions & 4 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))

// File stats are gathered.
testutil.Equals(t, fmt.Sprintf(`{
Expand Down Expand Up @@ -181,7 +181,8 @@ func TestUpload(t *testing.T) {
{
"rel_path": "meta.json"
}
]
],
"index_stats": {}
}
}
`, b1.String(), b1.String()), string(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
Expand All @@ -192,7 +193,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 3, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)]))
testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
testutil.Equals(t, 567, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)]))
}
{
// Upload with no external labels should be blocked.
Expand Down Expand Up @@ -224,7 +225,7 @@ func TestUpload(t *testing.T) {
testutil.Equals(t, 6, len(bkt.Objects()))
testutil.Equals(t, 3727, len(bkt.Objects()[path.Join(b2.String(), ChunksDirname, "000001")]))
testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b2.String(), IndexFilename)]))
testutil.Equals(t, 525, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b2.String(), MetaFilename)]))
}
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/block/metadata/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ type Thanos struct {

// Rewrites is present when any rewrite (deletion, relabel etc) were applied to this block. Optional.
Rewrites []Rewrite `json:"rewrites,omitempty"`

// IndexStats contains stats info related to block index.
IndexStats IndexStats `json:"index_stats,omitempty"`
}

type IndexStats struct {
SeriesMaxSize int64 `json:"series_max_size,omitempty"`
ChunkMaxSize int64 `json:"chunk_max_size,omitempty"`
}

type Rewrite struct {
Expand Down
16 changes: 13 additions & 3 deletions pkg/block/metadata/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestMeta_ReadWrite(t *testing.T) {
"downsample": {
"resolution": 0
},
"source": ""
"source": "",
"index_stats": {}
}
}
`, b.String())
Expand Down Expand Up @@ -73,6 +74,10 @@ func TestMeta_ReadWrite(t *testing.T) {
Downsample: ThanosDownsample{
Resolution: 123144,
},
IndexStats: IndexStats{
SeriesMaxSize: 2000,
ChunkMaxSize: 1000,
},
},
}
testutil.Ok(t, m1.Write(&b))
Expand Down Expand Up @@ -121,7 +126,11 @@ func TestMeta_ReadWrite(t *testing.T) {
{
"rel_path": "meta.json"
}
]
],
"index_stats": {
"series_max_size": 2000,
"chunk_max_size": 1000
}
}
}
`, b.String())
Expand Down Expand Up @@ -199,7 +208,8 @@ func TestMeta_ReadWrite(t *testing.T) {
"rel_path": "index",
"size_bytes": 1313
}
]
],
"index_stats": {}
}
}
`, b.String())
Expand Down
41 changes: 29 additions & 12 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func outOfOrderChunkError(err error, brokenBlock ulid.ULID) OutOfOrderChunksErro
return OutOfOrderChunksError{err: err, id: brokenBlock}
}

// IsOutOfOrderChunk returns true if the base error is a OutOfOrderChunkError.
// IsOutOfOrderChunkError returns true if the base error is a OutOfOrderChunkError.
func IsOutOfOrderChunkError(err error) bool {
_, ok := errors.Cause(err).(OutOfOrderChunksError)
return ok
Expand Down Expand Up @@ -1100,28 +1100,45 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
bdir := filepath.Join(dir, compID.String())
index := filepath.Join(bdir, block.IndexFilename)

newMeta, err := metadata.InjectThanos(cg.logger, bdir, metadata.Thanos{
Labels: cg.labels.Map(),
Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
Source: metadata.CompactorSource,
SegmentFiles: block.GetSegmentFiles(bdir),
}, nil)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
if err := os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
}

if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "remove tombstones")
newMeta, err := metadata.ReadFromDir(bdir)
if err != nil {
return false, ulid.ULID{}, errors.Wrap(err, "read new meta")
}

var stats block.HealthStats
// Ensure the output block is valid.
err = tracing.DoInSpanWithErr(ctx, "compaction_verify_index", func(ctx context.Context) error {
return block.VerifyIndex(cg.logger, index, newMeta.MinTime, newMeta.MaxTime)
stats, err = block.GatherIndexHealthStats(cg.logger, index, newMeta.MinTime, newMeta.MaxTime)
if err != nil {
return err
}
return stats.AnyErr()
})
if !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "invalid result block %s", bdir))
}

thanosMeta := metadata.Thanos{
Labels: cg.labels.Map(),
Downsample: metadata.ThanosDownsample{Resolution: cg.resolution},
Source: metadata.CompactorSource,
SegmentFiles: block.GetSegmentFiles(bdir),
}
if stats.ChunkMaxSize > 0 {
thanosMeta.IndexStats.ChunkMaxSize = stats.ChunkMaxSize
}
if stats.SeriesMaxSize > 0 {
thanosMeta.IndexStats.SeriesMaxSize = stats.SeriesMaxSize
}
newMeta, err = metadata.InjectThanos(cg.logger, bdir, thanosMeta, nil)
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir)
}

// Ensure the output block is not overlapping with anything else,
// unless vertical compaction is enabled.
if !cg.enableVerticalCompaction {
Expand Down
4 changes: 4 additions & 0 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Assert(t, labels.Equal(extLabels, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match")
testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution)
testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set")
// Only one chunk will be generated in that block, so we won't set chunk size.
testutil.Assert(t, meta.Thanos.IndexStats.SeriesMaxSize > 0, "compacted blocks have index stats series max size set")
}
{
meta, ok := others[groupKey2]
Expand All @@ -415,6 +417,8 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Assert(t, labels.Equal(extLabels2, labels.FromMap(meta.Thanos.Labels)), "ext labels does not match")
testutil.Equals(t, int64(124), meta.Thanos.Downsample.Resolution)
testutil.Assert(t, len(meta.Thanos.SegmentFiles) > 0, "compacted blocks have segment files set")
// Only one chunk will be generated in that block, so we won't set chunk size.
testutil.Assert(t, meta.Thanos.IndexStats.SeriesMaxSize > 0, "compacted blocks have index stats series max size set")
}
})
}
Expand Down

0 comments on commit 5e8d6d9

Please sign in to comment.