Merge pull request thanos-io#6049 from SuperQ/superq/remove_group_label
Compact: Replace group with resolution in compact metrics.
fpetkovski authored Jun 17, 2023
2 parents d430269 + 7eeb1bc commit b7a7522
Showing 8 changed files with 93 additions and 161 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -43,6 +43,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6428](https://github.com/thanos-io/thanos/pull/6428) Report gRPC connection errors in the logs.

### Changed
- [#6049](https://github.com/thanos-io/thanos/pull/6049) Compact: *breaking :warning:* Replace the group label with resolution in compact metrics to avoid cardinality explosion for large numbers of compaction groups.
- [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make ketama hashring fail early when configured with number of nodes lower than the replication factor.
- [#6201](https://github.com/thanos-io/thanos/pull/6201) Query-Frontend: Disable absent and absent_over_time for vertical sharding.
- [#6212](https://github.com/thanos-io/thanos/pull/6212) Query-Frontend: Disable scalar for vertical sharding.
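Why this change is breaking and why it matters: the old `group` label on the compactor's counters carried the full compaction group key (downsampling resolution plus a hash of the block's external labels, as produced by `GroupKey()` further down in this diff), so every distinct external-label set at every resolution added new metric children. The replacement `resolution` label can only take a handful of values. A minimal, self-contained sketch of the cardinality difference — the `_old` metric name and the example group keys are invented purely for illustration:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Before: one child series per compaction group key, e.g. "0@<labels hash>".
	// Cardinality grows with the number of distinct external-label sets.
	perGroup := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_compact_group_compactions_old_total",
		Help: "Compactions keyed by group (cardinality grows with label sets).",
	}, []string{"group"})

	// After: one child series per downsampling resolution, so the number of
	// children is bounded by the number of resolutions Thanos uses.
	perResolution := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_compact_group_compactions_total",
		Help: "Compactions keyed by resolution (bounded cardinality).",
	}, []string{"resolution"})

	// Three hypothetical groups spanning two resolutions.
	for _, key := range []string{"0@111", "0@222", "300000@333"} {
		perGroup.WithLabelValues(key).Inc()
	}
	for _, res := range []string{"0", "0", "300000"} {
		perResolution.WithLabelValues(res).Inc()
	}

	fmt.Println("per-group children: 3, per-resolution children: 2")
}
```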
12 changes: 6 additions & 6 deletions examples/dashboards/compact.json
@@ -19,7 +19,7 @@
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
"description": "Shows rate of execution for compactions against blocks that are stored in the bucket by compaction group.",
"description": "Shows rate of execution for compactions against blocks that are stored in the bucket by compaction resolution.",
"fill": 10,
"id": 1,
"legend": {
@@ -46,10 +46,10 @@
"steppedLine": false,
"targets": [
{
"expr": "sum by (job, group) (rate(thanos_compact_group_compactions_total{job=~\"$job\"}[$__rate_interval]))",
"expr": "sum by (job, resolution) (rate(thanos_compact_group_compactions_total{job=~\"$job\"}[$__rate_interval]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "compaction {{job}} {{group}}",
"legendFormat": "compaction {{job}} {{resolution}}",
"legendLink": null,
"step": 10
}
@@ -186,7 +186,7 @@
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
"description": "Shows rate of execution for downsampling against blocks that are stored in the bucket by compaction group.",
"description": "Shows rate of execution for downsampling against blocks that are stored in the bucket by compaction resolution.",
"fill": 10,
"id": 3,
"legend": {
@@ -213,10 +213,10 @@
"steppedLine": false,
"targets": [
{
"expr": "sum by (job, group) (rate(thanos_compact_downsample_total{job=~\"$job\"}[$__rate_interval]))",
"expr": "sum by (job, resolution) (rate(thanos_compact_downsample_total{job=~\"$job\"}[$__rate_interval]))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "downsample {{job}} {{group}}",
"legendFormat": "downsample {{job}} {{resolution}}",
"legendLink": null,
"step": 10
}
2 changes: 1 addition & 1 deletion examples/dashboards/overview.json
@@ -1928,7 +1928,7 @@
"dashLength": 10,
"dashes": false,
"datasource": "$datasource",
"description": "Shows rate of execution for compactions against blocks that are stored in the bucket by compaction group.",
"description": "Shows rate of execution for compactions against blocks that are stored in the bucket.",
"fill": 10,
"id": 19,
"legend": {
14 changes: 7 additions & 7 deletions mixin/dashboards/compact.libsonnet
@@ -18,11 +18,11 @@ local g = import '../lib/thanos-grafana-builder/builder.libsonnet';
.addPanel(
g.panel(
'Rate',
'Shows rate of execution for compactions against blocks that are stored in the bucket by compaction group.'
'Shows rate of execution for compactions against blocks that are stored in the bucket by compaction resolution.'
) +
g.queryPanel(
'sum by (%(dimensions)s, group) (rate(thanos_compact_group_compactions_total{%(selector)s}[$__rate_interval]))' % thanos.compact.dashboard,
'compaction {{job}} {{group}}'
'sum by (%(dimensions)s, resolution) (rate(thanos_compact_group_compactions_total{%(selector)s}[$__rate_interval]))' % thanos.compact.dashboard,
'compaction {{job}} {{resolution}}'
) +
g.stack
)
@@ -43,11 +43,11 @@ local g = import '../lib/thanos-grafana-builder/builder.libsonnet';
.addPanel(
g.panel(
'Rate',
'Shows rate of execution for downsampling against blocks that are stored in the bucket by compaction group.'
'Shows rate of execution for downsampling against blocks that are stored in the bucket by compaction resolution.'
) +
g.queryPanel(
'sum by (%(dimensions)s, group) (rate(thanos_compact_downsample_total{%(selector)s}[$__rate_interval]))' % thanos.compact.dashboard,
'downsample {{job}} {{group}}'
'sum by (%(dimensions)s, resolution) (rate(thanos_compact_downsample_total{%(selector)s}[$__rate_interval]))' % thanos.compact.dashboard,
'downsample {{job}} {{resolution}}'
) +
g.stack
)
@@ -178,7 +178,7 @@ local g = import '../lib/thanos-grafana-builder/builder.libsonnet';
.addPanel(
g.panel(
'Compaction Rate',
'Shows rate of execution for compactions against blocks that are stored in the bucket by compaction group.'
'Shows rate of execution for compactions against blocks that are stored in the bucket.'
) +
g.queryPanel(
'sum by (%(dimensions)s) (rate(thanos_compact_group_compactions_total{%(selector)s}[$__rate_interval]))' % thanos.dashboard.overview,
7 changes: 6 additions & 1 deletion pkg/block/metadata/meta.go
@@ -159,12 +159,17 @@ func InjectThanos(logger log.Logger, bdir string, meta Thanos, downsampledMeta *
return newMeta, nil
}

// Returns a unique identifier for the compaction group the block belongs to.
// GroupKey returns a unique identifier for the compaction group the block belongs to.
// It considers the downsampling resolution and the block's labels.
func (m *Thanos) GroupKey() string {
return fmt.Sprintf("%d@%v", m.Downsample.Resolution, labels.FromMap(m.Labels).Hash())
}

// ResolutionString returns the block's resolution as a string.
func (m *Thanos) ResolutionString() string {
return fmt.Sprintf("%d", m.Downsample.Resolution)
}

// WriteToDir writes the encoded meta into <dir>/meta.json.
func (m Meta) WriteToDir(logger log.Logger, dir string) error {
// Make any changes to the file appear atomic.
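To make the contrast concrete: `GroupKey` still identifies a compaction group by resolution plus a hash of the block's external labels, while the new `ResolutionString` exposes only the resolution, which is what keeps the set of metric label values bounded. A self-contained sketch using simplified stand-in types rather than the real `metadata` package:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// Simplified stand-ins for metadata.Thanos and its Downsample field.
type downsample struct{ Resolution int64 }
type thanosMeta struct {
	Labels     map[string]string
	Downsample downsample
}

// groupKey mirrors the shape of Thanos.GroupKey: resolution plus a hash of
// the external labels, so every distinct label set yields a distinct value.
func (m *thanosMeta) groupKey() string {
	return fmt.Sprintf("%d@%v", m.Downsample.Resolution, labels.FromMap(m.Labels).Hash())
}

// resolutionString mirrors Thanos.ResolutionString: only the resolution.
func (m *thanosMeta) resolutionString() string {
	return fmt.Sprintf("%d", m.Downsample.Resolution)
}

func main() {
	a := &thanosMeta{Labels: map[string]string{"tenant": "a"}, Downsample: downsample{Resolution: 0}}
	b := &thanosMeta{Labels: map[string]string{"tenant": "b"}, Downsample: downsample{Resolution: 0}}

	fmt.Println(a.groupKey(), b.groupKey())                 // two distinct group keys
	fmt.Println(a.resolutionString(), b.resolutionString()) // both "0"
}
```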
67 changes: 34 additions & 33 deletions pkg/compact/compact.go
@@ -256,23 +256,23 @@ func NewDefaultGrouper(
compactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_group_compactions_total",
Help: "Total number of group compaction attempts that resulted in a new block.",
}, []string{"group"}),
}, []string{"resolution"}),
compactionRunsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_group_compaction_runs_started_total",
Help: "Total number of group compaction attempts.",
}, []string{"group"}),
}, []string{"resolution"}),
compactionRunsCompleted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_group_compaction_runs_completed_total",
Help: "Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.",
}, []string{"group"}),
}, []string{"resolution"}),
compactionFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_group_compactions_failures_total",
Help: "Total number of failed group compactions.",
}, []string{"group"}),
}, []string{"resolution"}),
verticalCompactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_group_vertical_compactions_total",
Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
}, []string{"group"}),
}, []string{"resolution"}),
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
@@ -291,19 +291,20 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
group, ok := groups[groupKey]
if !ok {
lbls := labels.FromMap(m.Thanos.Labels)
resolutionLabel := m.Thanos.ResolutionString()
group, err = NewGroup(
log.With(g.logger, "group", fmt.Sprintf("%d@%v", m.Thanos.Downsample.Resolution, lbls.String()), "groupKey", groupKey),
log.With(g.logger, "group", fmt.Sprintf("%s@%v", resolutionLabel, lbls.String()), "groupKey", groupKey),
g.bkt,
groupKey,
lbls,
m.Thanos.Downsample.Resolution,
g.acceptMalformedIndex,
g.enableVerticalCompaction,
g.compactions.WithLabelValues(groupKey),
g.compactionRunsStarted.WithLabelValues(groupKey),
g.compactionRunsCompleted.WithLabelValues(groupKey),
g.compactionFailures.WithLabelValues(groupKey),
g.verticalCompactions.WithLabelValues(groupKey),
g.compactions.WithLabelValues(resolutionLabel),
g.compactionRunsStarted.WithLabelValues(resolutionLabel),
g.compactionRunsCompleted.WithLabelValues(resolutionLabel),
g.compactionFailures.WithLabelValues(resolutionLabel),
g.verticalCompactions.WithLabelValues(resolutionLabel),
g.garbageCollectedBlocks,
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
@@ -492,8 +493,8 @@ func (cg *Group) Resolution() int64 {

// CompactProgressMetrics contains Prometheus metrics related to compaction progress.
type CompactProgressMetrics struct {
NumberOfCompactionRuns *prometheus.GaugeVec
NumberOfCompactionBlocks *prometheus.GaugeVec
NumberOfCompactionRuns prometheus.Gauge
NumberOfCompactionBlocks prometheus.Gauge
}

// ProgressCalculator calculates the progress of the compaction process for a given slice of Groups.
@@ -512,14 +513,14 @@ func NewCompactionProgressCalculator(reg prometheus.Registerer, planner *tsdbBas
return &CompactionProgressCalculator{
planner: planner,
CompactProgressMetrics: &CompactProgressMetrics{
NumberOfCompactionRuns: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
NumberOfCompactionRuns: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compact_todo_compactions",
Help: "number of compactions to be done",
}, []string{"group"}),
NumberOfCompactionBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
}),
NumberOfCompactionBlocks: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compact_todo_compaction_blocks",
Help: "number of blocks planned to be compacted",
}, []string{"group"}),
}),
},
}
}
@@ -568,20 +569,20 @@ func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, g
groups = tmpGroups
}

ps.CompactProgressMetrics.NumberOfCompactionRuns.Reset()
ps.CompactProgressMetrics.NumberOfCompactionBlocks.Reset()
ps.CompactProgressMetrics.NumberOfCompactionRuns.Set(0)
ps.CompactProgressMetrics.NumberOfCompactionBlocks.Set(0)

for key, iters := range groupCompactions {
ps.CompactProgressMetrics.NumberOfCompactionRuns.WithLabelValues(key).Add(float64(iters))
ps.CompactProgressMetrics.NumberOfCompactionBlocks.WithLabelValues(key).Add(float64(groupBlocks[key]))
ps.CompactProgressMetrics.NumberOfCompactionRuns.Add(float64(iters))
ps.CompactProgressMetrics.NumberOfCompactionBlocks.Add(float64(groupBlocks[key]))
}

return nil
}

// DownsampleProgressMetrics contains Prometheus metrics related to downsampling progress.
type DownsampleProgressMetrics struct {
NumberOfBlocksDownsampled *prometheus.GaugeVec
NumberOfBlocksDownsampled prometheus.Gauge
}

// DownsampleProgressCalculator contains DownsampleMetrics, which are updated during the downsampling simulation process.
@@ -593,10 +594,10 @@ type DownsampleProgressCalculator struct {
func NewDownsampleProgressCalculator(reg prometheus.Registerer) *DownsampleProgressCalculator {
return &DownsampleProgressCalculator{
DownsampleProgressMetrics: &DownsampleProgressMetrics{
NumberOfBlocksDownsampled: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
NumberOfBlocksDownsampled: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compact_todo_downsample_blocks",
Help: "number of blocks to be downsampled",
}, []string{"group"}),
}),
},
}
}
@@ -666,17 +667,17 @@ func (ds *DownsampleProgressCalculator) ProgressCalculate(ctx context.Context, g
}
}

ds.DownsampleProgressMetrics.NumberOfBlocksDownsampled.Reset()
for key, blocks := range groupBlocks {
ds.DownsampleProgressMetrics.NumberOfBlocksDownsampled.WithLabelValues(key).Add(float64(blocks))
ds.DownsampleProgressMetrics.NumberOfBlocksDownsampled.Set(0)
for _, blocks := range groupBlocks {
ds.DownsampleProgressMetrics.NumberOfBlocksDownsampled.Add(float64(blocks))
}

return nil
}

// RetentionProgressMetrics contains Prometheus metrics related to retention progress.
type RetentionProgressMetrics struct {
NumberOfBlocksToDelete *prometheus.GaugeVec
NumberOfBlocksToDelete prometheus.Gauge
}

// RetentionProgressCalculator contains RetentionProgressMetrics, which are updated during the retention simulation process.
@@ -690,10 +691,10 @@ func NewRetentionProgressCalculator(reg prometheus.Registerer, retentionByResolu
return &RetentionProgressCalculator{
retentionByResolution: retentionByResolution,
RetentionProgressMetrics: &RetentionProgressMetrics{
NumberOfBlocksToDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
NumberOfBlocksToDelete: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_compact_todo_deletion_blocks",
Help: "number of blocks that have crossed their retention period",
}, []string{"group"}),
}),
},
}
}
@@ -715,9 +716,9 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr
}
}

rs.RetentionProgressMetrics.NumberOfBlocksToDelete.Reset()
for key, blocks := range groupBlocks {
rs.RetentionProgressMetrics.NumberOfBlocksToDelete.WithLabelValues(key).Add(float64(blocks))
rs.RetentionProgressMetrics.NumberOfBlocksToDelete.Set(0)
for _, blocks := range groupBlocks {
rs.RetentionProgressMetrics.NumberOfBlocksToDelete.Add(float64(blocks))
}

return nil
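The same cardinality concern drives the second half of compact.go: the to-do progress gauges (`thanos_compact_todo_compactions`, `thanos_compact_todo_compaction_blocks`, `thanos_compact_todo_downsample_blocks`, `thanos_compact_todo_deletion_blocks`) lose their `group` dimension entirely and become plain gauges, so each calculation pass zeroes the gauge and accumulates one total across all groups instead of calling `Reset()` and `WithLabelValues(key)`. A minimal sketch of the new pattern — the per-group plan map below is invented for illustration:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()

	// Plain Gauge: one series, no per-group children.
	todoCompactions := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Name: "thanos_compact_todo_compactions",
		Help: "number of compactions to be done",
	})

	// Hypothetical output of one progress-calculation pass: planned
	// compaction iterations per group key.
	groupCompactions := map[string]int{"0@111": 2, "0@222": 1, "300000@333": 4}

	todoCompactions.Set(0) // replaces the old GaugeVec.Reset()
	for _, iters := range groupCompactions {
		// Replaces WithLabelValues(key).Add(...): per-group totals fold together.
		todoCompactions.Add(float64(iters))
	}

	fmt.Println(testutil.ToFloat64(todoCompactions)) // 7
}
```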
32 changes: 12 additions & 20 deletions pkg/compact/compact_e2e_test.go
@@ -324,26 +324,18 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.blocksMarkedForDeletion))
testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.blocksMarkedForNoCompact))
testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
testutil.Equals(t, 4, MetricCount(grouper.compactions))
testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(metas[0].Thanos.GroupKey())))
testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(metas[7].Thanos.GroupKey())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(metas[4].Thanos.GroupKey())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(metas[5].Thanos.GroupKey())))
testutil.Equals(t, 4, MetricCount(grouper.compactionRunsStarted))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[0].Thanos.GroupKey())))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[7].Thanos.GroupKey())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[4].Thanos.GroupKey())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[5].Thanos.GroupKey())))
testutil.Equals(t, 4, MetricCount(grouper.compactionRunsCompleted))
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[0].Thanos.GroupKey())))
testutil.Equals(t, 3.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[7].Thanos.GroupKey())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[4].Thanos.GroupKey())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[5].Thanos.GroupKey())))
testutil.Equals(t, 4, MetricCount(grouper.compactionFailures))
testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(metas[0].Thanos.GroupKey())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(metas[7].Thanos.GroupKey())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(metas[4].Thanos.GroupKey())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(metas[5].Thanos.GroupKey())))
testutil.Equals(t, 2, MetricCount(grouper.compactions))
testutil.Equals(t, 2.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(metas[0].Thanos.ResolutionString())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactions.WithLabelValues(metas[5].Thanos.ResolutionString())))
testutil.Equals(t, 2, MetricCount(grouper.compactionRunsStarted))
testutil.Equals(t, 6.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[0].Thanos.ResolutionString())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsStarted.WithLabelValues(metas[5].Thanos.ResolutionString())))
testutil.Equals(t, 2, MetricCount(grouper.compactionRunsCompleted))
testutil.Equals(t, 5.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[0].Thanos.ResolutionString())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionRunsCompleted.WithLabelValues(metas[5].Thanos.ResolutionString())))
testutil.Equals(t, 2, MetricCount(grouper.compactionFailures))
testutil.Equals(t, 1.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(metas[0].Thanos.ResolutionString())))
testutil.Equals(t, 0.0, promtest.ToFloat64(grouper.compactionFailures.WithLabelValues(metas[5].Thanos.ResolutionString())))

_, err = os.Stat(dir)
testutil.Assert(t, os.IsNotExist(err), "dir %s should be removed after compaction.", dir)
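The e2e expectations change accordingly: `MetricCount` drops from 4 to 2 because the test blocks form four compaction groups but span only two distinct resolutions, and counts previously split per group key are now folded into one child per resolution (hence runs started for `metas[0]`'s resolution becoming 6). A sketch of counting a vector's children by gathering the registry — an assumption about what a helper like `MetricCount` does, since its implementation is not part of this diff:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// metricCount returns how many child series a metric family currently has.
// Illustrative stand-in for the test's MetricCount helper.
func metricCount(reg *prometheus.Registry, name string) int {
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		if mf.GetName() == name {
			return len(mf.GetMetric())
		}
	}
	return 0
}

func main() {
	reg := prometheus.NewRegistry()
	compactions := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "thanos_compact_group_compactions_total",
		Help: "Total number of group compaction attempts that resulted in a new block.",
	}, []string{"resolution"})

	// Many groups may compact, but only two resolutions are touched -> two children.
	compactions.WithLabelValues("0").Inc()
	compactions.WithLabelValues("0").Inc()
	compactions.WithLabelValues("300000").Inc()

	fmt.Println(metricCount(reg, "thanos_compact_group_compactions_total")) // 2
}
```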