Refactor compactor constants, fix bucket column (#1561)
* compact: unify different time constants

Use downsample.* constants where possible. Move the downsampling time
ranges into constants and use them as well.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* bucket: refactor column calculation into compact

Rename the column to UNTIL-DOWN because that is what it
actually shows: the time until the next downsampling.

Move the calculation out into a separate function in the compact
package. Ideally we could use the retention policies in this calculation
as well, but the `bucket` subcommand knows nothing about them :-(

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>

* compact: fix issues with naming

Reorder the constants and fix mistakes.

Signed-off-by: Giedrius Statkevičius <giedriuswork@gmail.com>
GiedriusS authored and brancz committed Sep 25, 2019
1 parent b788fb4 commit 99bc7d2
Showing 4 changed files with 38 additions and 19 deletions.
16 changes: 7 additions & 9 deletions cmd/thanos/bucket.go
@@ -14,6 +14,7 @@ import (

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
@@ -50,7 +51,7 @@ var (
sort.Strings(s)
return s
}
inspectColumns = []string{"ULID", "FROM", "UNTIL", "RANGE", "UNTIL-COMP", "#SERIES", "#SAMPLES", "#CHUNKS", "COMP-LEVEL", "COMP-FAILED", "LABELS", "RESOLUTION", "SOURCE"}
inspectColumns = []string{"ULID", "FROM", "UNTIL", "RANGE", "UNTIL-DOWN", "#SERIES", "#SAMPLES", "#CHUNKS", "COMP-LEVEL", "COMP-FAILED", "LABELS", "RESOLUTION", "SOURCE"}
)

func registerBucket(m map[string]setupFunc, app *kingpin.Application, name string) {
@@ -420,13 +421,10 @@ func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortB
}

timeRange := time.Duration((blockMeta.MaxTime - blockMeta.MinTime) * int64(time.Millisecond))
- // Calculate how long it takes until the next compaction.
- untilComp := "-"
- if blockMeta.Thanos.Downsample.Resolution == 0 { // data currently raw, downsample if range >= 40 hours
- untilComp = (time.Duration(40*60*60*1000*time.Millisecond) - timeRange).String()
- }
- if blockMeta.Thanos.Downsample.Resolution == 5*60*1000 { // data currently 5m resolution, downsample if range >= 10 days
- untilComp = (time.Duration(10*24*60*60*1000*time.Millisecond) - timeRange).String()
+
+ untilDown := "-"
+ if until, err := compact.UntilNextDownsampling(blockMeta); err == nil {
+ untilDown = until.String()
}
var labels []string
for _, key := range getKeysAlphabetically(blockMeta.Thanos.Labels) {
@@ -438,7 +436,7 @@
line = append(line, time.Unix(blockMeta.MinTime/1000, 0).Format("02-01-2006 15:04:05"))
line = append(line, time.Unix(blockMeta.MaxTime/1000, 0).Format("02-01-2006 15:04:05"))
line = append(line, timeRange.String())
- line = append(line, untilComp)
+ line = append(line, untilDown)
line = append(line, p.Sprintf("%d", blockMeta.Stats.NumSeries))
line = append(line, p.Sprintf("%d", blockMeta.Stats.NumSamples))
line = append(line, p.Sprintf("%d", blockMeta.Stats.NumChunks))
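For reference, the UNTIL-DOWN value is simply the downsampling trigger range minus the block's current time range. Below is a minimal, self-contained sketch of that arithmetic for a hypothetical raw-resolution block; the constant is duplicated from pkg/compact/downsample for illustration, while the real code calls compact.UntilNextDownsampling instead.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Millisecond value duplicated from pkg/compact/downsample for illustration.
	const downsampleRange0 = 40 * 60 * 60 * 1000 // raw blocks are downsampled once they span 40h

	// Hypothetical raw-resolution block covering 8 hours.
	minTime := int64(0)
	maxTime := int64(8 * 60 * 60 * 1000)

	timeRange := time.Duration(maxTime-minTime) * time.Millisecond
	untilDown := downsampleRange0*time.Millisecond - timeRange

	// Prints "UNTIL-DOWN: 32h0m0s": the block needs to cover 32 more hours
	// before the compactor downsamples it to 5m resolution.
	fmt.Println("UNTIL-DOWN:", untilDown)
}
```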
18 changes: 9 additions & 9 deletions cmd/thanos/downsample.go
@@ -170,13 +170,13 @@ func downsampleBucket(

for _, m := range metas {
switch m.Thanos.Downsample.Resolution {
- case 0:
+ case downsample.ResLevel0:
continue
- case 5 * 60 * 1000:
+ case downsample.ResLevel1:
for _, id := range m.Compaction.Sources {
sources5m[id] = struct{}{}
}
- case 60 * 60 * 1000:
+ case downsample.ResLevel2:
for _, id := range m.Compaction.Sources {
sources1h[id] = struct{}{}
}
@@ -187,7 +187,7 @@

for _, m := range metas {
switch m.Thanos.Downsample.Resolution {
- case 0:
+ case downsample.ResLevel0:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources5m[id]; !ok {
@@ -201,16 +201,16 @@
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
- if m.MaxTime-m.MinTime < 40*60*60*1000 {
+ if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
}
- if err := processDownsampling(ctx, logger, bkt, m, dir, 5*60*1000); err != nil {
+ if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m)).Inc()
return errors.Wrap(err, "downsampling to 5 min")
}
metrics.downsamples.WithLabelValues(compact.GroupKey(*m)).Inc()

- case 5 * 60 * 1000:
+ case downsample.ResLevel1:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources1h[id]; !ok {
@@ -224,10 +224,10 @@
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
- if m.MaxTime-m.MinTime < 10*24*60*60*1000 {
+ if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
}
- if err := processDownsampling(ctx, logger, bkt, m, dir, 60*60*1000); err != nil {
+ if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m))
return errors.Wrap(err, "downsampling to 60 min")
}
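The rewritten switch above encodes a two-stage policy: raw blocks (ResLevel0) are downsampled to 5m resolution once they span DownsampleRange0, and 5m blocks (ResLevel1) are downsampled to 1h once they span DownsampleRange1. Here is a standalone sketch of just that decision, with the constant values duplicated for illustration; the real downsampleBucket additionally skips blocks whose source blocks are still missing at the next level.

```go
package main

import "fmt"

// Values mirroring downsample.ResLevel* and downsample.DownsampleRange* (milliseconds),
// duplicated here for illustration only.
const (
	resLevel0 = int64(0)              // raw
	resLevel1 = int64(5 * 60 * 1000)  // 5m
	resLevel2 = int64(60 * 60 * 1000) // 1h

	downsampleRange0 = int64(40 * 60 * 60 * 1000)      // 40h of raw data -> downsample to 5m
	downsampleRange1 = int64(10 * 24 * 60 * 60 * 1000) // 10d of 5m data  -> downsample to 1h
)

// nextResolution reports which resolution a block should be downsampled to next,
// and whether the block's time range already makes it a candidate.
func nextResolution(resolution, rangeMillis int64) (int64, bool) {
	switch resolution {
	case resLevel0:
		return resLevel1, rangeMillis >= downsampleRange0
	case resLevel1:
		return resLevel2, rangeMillis >= downsampleRange1
	default: // resLevel2: nothing further to downsample
		return 0, false
	}
}

func main() {
	// A raw block covering 48 hours is ready for 5m downsampling.
	if to, ok := nextResolution(resLevel0, 48*60*60*1000); ok {
		fmt.Println("downsample to resolution", to) // prints 300000 (5m in milliseconds)
	}
}
```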
17 changes: 16 additions & 1 deletion pkg/compact/compact.go
@@ -162,10 +162,25 @@ func (c *Syncer) SyncMetas(ctx context.Context) error {
}
c.metrics.syncMetas.Inc()
c.metrics.syncMetaDuration.Observe(time.Since(begin).Seconds())

return err
}

+ // UntilNextDownsampling calculates how long it will take until the next downsampling operation.
+ // Returns an error if there will be no downsampling.
+ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) {
+ timeRange := time.Duration((m.MaxTime - m.MinTime) * int64(time.Millisecond))
+ switch m.Thanos.Downsample.Resolution {
+ case downsample.ResLevel2:
+ return time.Duration(0), errors.New("no downsampling")
+ case downsample.ResLevel1:
+ return time.Duration(downsample.DownsampleRange1*time.Millisecond) - timeRange, nil
+ case downsample.ResLevel0:
+ return time.Duration(downsample.DownsampleRange0*time.Millisecond) - timeRange, nil
+ default:
+ panic(fmt.Errorf("invalid resolution %v", m.Thanos.Downsample.Resolution))
+ }
+ }

func (c *Syncer) syncMetas(ctx context.Context) error {
var wg sync.WaitGroup
defer wg.Wait()
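A hedged usage sketch of the new helper, mirroring what `bucket inspect` now does per block: the imports and the UntilNextDownsampling signature come from this commit, while printUntilDown and the already-loaded meta are hypothetical, and loading meta.json from the bucket is left out.

```go
package example

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/compact"
)

// printUntilDown is a hypothetical helper showing the possible outcomes of
// compact.UntilNextDownsampling for a block meta that has already been loaded.
func printUntilDown(m *metadata.Meta) {
	until, err := compact.UntilNextDownsampling(m)
	if err != nil {
		// Already at 1h resolution (downsample.ResLevel2); nothing left to downsample.
		fmt.Println("UNTIL-DOWN: -")
		return
	}
	// For raw and 5m blocks this prints the remaining time; a non-positive value
	// means the block should be picked up on the next downsampling pass.
	fmt.Println("UNTIL-DOWN:", until)
}
```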
6 changes: 6 additions & 0 deletions pkg/compact/downsample/downsample.go
@@ -28,6 +28,12 @@ const (
ResLevel2 = int64(60 * 60 * 1000) // 1 hour in milliseconds
)

+ // Downsampling ranges, i.e. after what time we start to downsample blocks (in milliseconds).
+ const (
+ DownsampleRange0 = 40 * 60 * 60 * 1000 // 40 hours
+ DownsampleRange1 = 10 * 24 * 60 * 60 * 1000 // 10 days
+ )

// Downsample downsamples the given block. It writes a new block into dir and returns its ID.
func Downsample(
logger log.Logger,
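Despite the bare integer literals, all of these constants are millisecond counts, matching the ResLevel* constants above. A quick sanity check of what they translate to, with the values duplicated for illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Millisecond values copied from pkg/compact/downsample for illustration.
	const (
		resLevel1        = 5 * 60 * 1000
		resLevel2        = 60 * 60 * 1000
		downsampleRange0 = 40 * 60 * 60 * 1000
		downsampleRange1 = 10 * 24 * 60 * 60 * 1000
	)

	fmt.Println(resLevel1 * time.Millisecond)        // 5m0s    (ResLevel1 resolution)
	fmt.Println(resLevel2 * time.Millisecond)        // 1h0m0s  (ResLevel2 resolution)
	fmt.Println(downsampleRange0 * time.Millisecond) // 40h0m0s (raw -> 5m trigger range)
	fmt.Println(downsampleRange1 * time.Millisecond) // 240h0m0s, i.e. 10 days (5m -> 1h trigger range)
}
```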
