Address issues discovered after review
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
kakkoyun committed Mar 10, 2020
1 parent ba29ba5 commit 84181fd
Showing 5 changed files with 92 additions and 58 deletions.
10 changes: 5 additions & 5 deletions cmd/thanos/compact.go
@@ -122,8 +122,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

dedupReplicaLabels := cmd.Flag("offline-deduplication.replica-labels", "Label to treat as a replica indicator of blocks that can be deduplicated. This will merge multiple replica blocks into one. This process is irrevertable. Experminteal").
Hidden().String()
dedupReplicaLabels := cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When it is set true, this will given labels from blocks so that vertical compaction could merge blocks.").
Hidden().Strings()

selectorRelabelConf := regSelectorRelabelFlags(cmd)

@@ -173,7 +174,7 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
dedupReplicaLabels string,
dedupReplicaLabels []string,
selectorRelabelConf *extflag.PathOrContent,
) error {
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
@@ -246,13 +247,12 @@ func runCompact(

duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
replicaLabelFilter := block.ReplicaLabelsFilter{ReplicaLabels: strings.Split(dedupReplicaLabels, ",")}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
duplicateBlocksFilter.Filter,
replicaLabelFilter.Filter,
block.NewReplicaLabelRemover(dedupReplicaLabels).Modify,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
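
For context on the flag change above, here is a minimal standalone sketch (not part of this commit) of how a repeated kingpin flag parsed with Strings() collects every occurrence into a []string, replacing the previous single comma-separated value; the program name and help text are illustrative assumptions.

package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("compact-sketch", "Illustration of a repeated flag.")
	// Repeating the flag collects every value, e.g.
	// --deduplication.replica-label=replica --deduplication.replica-label=rule_replica.
	replicaLabels := app.Flag("deduplication.replica-label",
		"Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag).").
		Strings()

	kingpin.MustParse(app.Parse(os.Args[1:]))
	fmt.Println(*replicaLabels) // e.g. [replica rule_replica]
}
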
71 changes: 45 additions & 26 deletions pkg/block/fetcher.go
@@ -32,12 +32,19 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
)

type txLabeledGauge interface {
WithLabelValues(lvs ...string) prometheus.Gauge
ResetTx()
Submit()
}

type syncMetrics struct {
syncs prometheus.Counter
syncFailures prometheus.Counter
syncDuration prometheus.Histogram

synced *extprom.TxGaugeVec
synced txLabeledGauge
modified txLabeledGauge
}

const (
@@ -49,11 +56,11 @@ const (
failedMeta = "failed"

// Filter's label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
replicaExclude = "replica-exclude"
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
replicaRemovedMeta = "replica-label-removed"
)

func newSyncMetrics(reg prometheus.Registerer) *syncMetrics {
@@ -89,6 +96,15 @@ func newSyncMetrics(reg prometheus.Registerer) *syncMetrics {
[]string{labelExcludedMeta},
[]string{timeExcludedMeta},
[]string{duplicateMeta},
[]string{replicaRemovedMeta},
)
m.modified = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Subsystem: syncMetricSubSys,
Name: "modified",
Help: "Number of block metadata that modified",
},
[]string{"modified"},
[]string{replicaRemovedMeta},
)
return &m
}
@@ -97,11 +113,7 @@ type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
}

type GaugeLabeled interface {
WithLabelValues(lvs ...string) prometheus.Gauge
}

type MetaFetcherFilter func(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool)
type MetaFetcherFilter func(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, incompleteView bool)

// MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
type MetaFetcher struct {
@@ -346,7 +358,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.

for _, f := range s.filters {
// NOTE: filter can update synced metric accordingly to the reason of the exclude.
f(metas, s.metrics.synced, incompleteView)
f(metas, s.metrics, incompleteView)
}

s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas)))
@@ -373,12 +385,12 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim
}

// Filter filters out blocks that are outside of specified time range.
func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) {
for id, m := range metas {
if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() {
continue
}
synced.WithLabelValues(timeExcludedMeta).Inc()
metrics.synced.WithLabelValues(timeExcludedMeta).Inc()
delete(metas, id)
}
}
@@ -399,7 +411,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet
const blockIDLabel = "__block_id"

// Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels.
func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) {
var lbls labels.Labels
for id, m := range metas {
lbls = lbls[:0]
@@ -409,7 +421,7 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync
}

if processedLabels := relabel.Process(lbls, f.relabelConfig...); len(processedLabels) == 0 {
synced.WithLabelValues(labelExcludedMeta).Inc()
metrics.synced.WithLabelValues(labelExcludedMeta).Inc()
delete(metas, id)
}
}
@@ -427,7 +439,7 @@ func NewDeduplicateFilter() *DeduplicateFilter {

// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) {
var wg sync.WaitGroup

metasByResolution := make(map[int64][]*metadata.Meta)
@@ -444,14 +456,14 @@ func (f *DeduplicateFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced Ga
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
}), metasByResolution[res], metas, res, synced)
}), metasByResolution[res], metas, res, metrics.synced)
}(res)
}

wg.Wait()
}

func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced GaugeLabeled) {
func (f *DeduplicateFilter) filterForResolution(root *Node, metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, res int64, synced txLabeledGauge) {
sort.Slice(metaSlice, func(i, j int) bool {
ilen := len(metaSlice[i].Compaction.Sources)
jlen := len(metaSlice[j].Compaction.Sources)
@@ -527,17 +539,24 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool {
return true
}

type ReplicaLabelsFilter struct {
ReplicaLabels []string
// ReplicaLabelRemover is a MetaFetcher modifier that removes the given replica labels from the metadata of blocks that have them.
type ReplicaLabelRemover struct {
replicaLabels []string
}

// NewReplicaLabelRemover creates a ReplicaLabelRemover.
func NewReplicaLabelRemover(replicaLabels []string) *ReplicaLabelRemover {
return &ReplicaLabelRemover{replicaLabels: replicaLabels}
}

func (f *ReplicaLabelsFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, view bool) {
// Modify removes the configured replica labels from the external labels of blocks that have them.
func (r *ReplicaLabelRemover) Modify(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, view bool) {
for u, meta := range metas {
labels := meta.Thanos.Labels
for _, replicaLabel := range f.ReplicaLabels {
for _, replicaLabel := range r.replicaLabels {
if _, exists := labels[replicaLabel]; exists {
delete(labels, replicaLabel)
synced.WithLabelValues(replicaExclude).Inc()
metrics.modified.WithLabelValues(replicaRemovedMeta).Inc()
}
}
metas[u].Thanos.Labels = labels
@@ -569,7 +588,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura
}

// Filter filters out blocks that were created before a specified consistency delay.
func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) {
for id, meta := range metas {
// TODO(khyatisoneji): Remove the checks about Thanos Source
// by implementing delete delay to fetch metas.
Expand All @@ -580,7 +599,7 @@ func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta,
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(tooFreshMeta).Inc()
metrics.synced.WithLabelValues(tooFreshMeta).Inc()
delete(metas, id)
}
}
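
To make the new ReplicaLabelRemover concrete, below is a minimal standalone sketch (an illustration, not code from this commit) that mimics what Modify does to a single block's external labels using a plain map; removeReplicaLabels is a hypothetical helper name.

package main

import "fmt"

// removeReplicaLabels mirrors the per-block behaviour of ReplicaLabelRemover.Modify:
// every configured replica label present in the block's external labels is deleted,
// and each deletion is what increments the "modified" metric with the
// replica-label-removed reason.
func removeReplicaLabels(labels map[string]string, replicaLabels []string) int {
	removed := 0
	for _, rl := range replicaLabels {
		if _, ok := labels[rl]; ok {
			delete(labels, rl)
			removed++
		}
	}
	return removed
}

func main() {
	lbls := map[string]string{"cluster": "eu1", "replica": "1", "rule_replica": "A"}
	n := removeReplicaLabels(lbls, []string{"replica", "rule_replica"})
	fmt.Println(n, lbls) // prints: 2 map[cluster:eu1]
}
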
65 changes: 41 additions & 24 deletions pkg/block/fetcher_test.go
@@ -35,6 +35,23 @@ import (
"gopkg.in/yaml.v2"
)

type testTxLabeledGauge struct {
g *prometheus.GaugeVec
}

func (t *testTxLabeledGauge) WithLabelValues(lvs ...string) prometheus.Gauge {
return t.g.WithLabelValues(lvs...)
}
func (t *testTxLabeledGauge) ResetTx() {}
func (t *testTxLabeledGauge) Submit() {}

func newTestSyncMetrics() *syncMetrics {
return &syncMetrics{
synced: &testTxLabeledGauge{promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})},
modified: &testTxLabeledGauge{promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"modified"})},
}
}

func ULID(i int) ulid.ULID { return ulid.MustNew(uint64(i), nil) }

func ULIDs(is ...int) []ulid.ULID {
@@ -57,9 +74,9 @@ func TestMetaFetcher_Fetch(t *testing.T) {

var ulidToDelete ulid.ULID
r := prometheus.NewRegistry()
f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, incompleteView bool) {
f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, incompleteView bool) {
if _, ok := metas[ulidToDelete]; ok {
synced.WithLabelValues("filtered").Inc()
metrics.synced.WithLabelValues("filtered").Inc()
delete(metas, ulidToDelete)
}
})
@@ -333,10 +350,10 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
ULID(6): input[ULID(6)],
}

synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
f.Filter(input, synced, false)
m := newTestSyncMetrics()
f.Filter(input, m, false)

testutil.Equals(t, 3.0, promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, expected, input)

}
@@ -428,11 +445,11 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
}
deleted := len(input) - len(expected)

synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
f.Filter(input, synced, false)
m := newTestSyncMetrics()
f.Filter(input, m, false)

testutil.Equals(t, expected, input)
testutil.Equals(t, float64(deleted), promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta)))

})

@@ -489,10 +506,10 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) {
ULID(4): input[ULID(4)],
}

synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
f.Filter(input, synced, false)
m := newTestSyncMetrics()
f.Filter(input, m, false)

testutil.Equals(t, 2.0, promtest.ToFloat64(synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, expected, input)

}
@@ -820,7 +837,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) {
} {
f := NewDeduplicateFilter()
if ok := t.Run(tcase.name, func(t *testing.T) {
synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
m := newTestSyncMetrics()
metas := make(map[ulid.ULID]*metadata.Meta)
inputLen := len(tcase.input)
for id, metaInfo := range tcase.input {
Expand All @@ -838,18 +855,18 @@ func TestDeduplicateFilter_Filter(t *testing.T) {
},
}
}
f.Filter(metas, synced, false)
f.Filter(metas, m, false)

compareSliceWithMapKeys(t, metas, tcase.expected)
testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(synced.WithLabelValues(duplicateMeta)))
testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.synced.WithLabelValues(duplicateMeta)))
}); !ok {
return
}
}
}

func TestReplicaLabel_Filter(t *testing.T) {
rf := ReplicaLabelsFilter{ReplicaLabels: []string{"replica", "rule_replica"}}
rm := NewReplicaLabelRemover([]string{"replica", "rule_replica"})

input := map[ulid.ULID]*metadata.Meta{
ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}},
@@ -864,10 +881,10 @@ func TestReplicaLabel_Filter(t *testing.T) {
ULID(4): {Thanos: metadata.Thanos{Labels: map[string]string{}}},
}

synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
rf.Filter(input, synced, false)
m := newTestSyncMetrics()
rm.Modify(input, m, false)

testutil.Equals(t, 5.0, promtest.ToFloat64(synced.WithLabelValues(replicaExclude)))
testutil.Equals(t, 5.0, promtest.ToFloat64(m.modified.WithLabelValues(replicaRemovedMeta)))
testutil.Equals(t, expected, input)
}

@@ -956,7 +973,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
}

t.Run("consistency 0 (turned off)", func(t *testing.T) {
synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
m := newTestSyncMetrics()
expected := map[ulid.ULID]*metadata.Meta{}
// Copy all.
for _, id := range u.created {
Expand All @@ -967,14 +984,14 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

f.Filter(input, synced, false)
f.Filter(input, m, false)

testutil.Equals(t, 0.0, promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})

t.Run("consistency 30m.", func(t *testing.T) {
synced := promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
m := newTestSyncMetrics()
expected := map[ulid.ULID]*metadata.Meta{}
// Only certain sources and those with 30m or more age go through.
for i, id := range u.created {
@@ -993,9 +1010,9 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

f.Filter(input, synced, false)
f.Filter(input, m, false)

testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})
}
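
The tests above swap a plain GaugeVec in for extprom.TxGaugeVec through the txLabeledGauge interface. The sketch below shows the transactional lifecycle that interface implies (ResetTx to start a sync pass, WithLabelValues to record, Submit to publish); it is written as if it lived inside pkg/block so the syncMetricSubSys and replicaRemovedMeta constants resolve, and the exact call order inside MetaFetcher is an assumption, not shown in this diff.

// Sketch only: assumes extprom.TxGaugeVec stages values during a sync and
// exposes them atomically on Submit.
func exampleModifiedMetricLifecycle(reg prometheus.Registerer) {
	modified := extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
		Subsystem: syncMetricSubSys,
		Name:      "modified",
		Help:      "Number of blocks whose metadata was modified",
	},
		[]string{"modified"},
		[]string{replicaRemovedMeta},
	)

	modified.ResetTx()                                 // begin a new sync pass
	modified.WithLabelValues(replicaRemovedMeta).Inc() // e.g. one replica label removed
	modified.Submit()                                  // publish the staged values
}
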
2 changes: 1 addition & 1 deletion test/e2e/compact_test.go
@@ -63,7 +63,7 @@ func TestCompact(t *testing.T) {
Type: client.S3,
Config: s3Config,
})

testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(compact))

}
2 changes: 0 additions & 2 deletions test/e2e/e2ethanos/services.go
@@ -327,8 +327,6 @@ func NewCompact(sharedDir string, name string, bucketConfig client.BucketConfig)
return nil, errors.Wrapf(err, "generate compact config file: %v", bucketConfig)
}

fmt.Println(string(bktConfigBytes))

return NewService(
fmt.Sprintf("compact-%s", name),
DefaultImage(),
