Skip to content

Commit

Permalink
Address issues that have discovered after review
Browse files Browse the repository at this point in the history
  • Loading branch information
kakkoyun committed Mar 10, 2020
1 parent ba29ba5 commit b49e9b2
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 21 deletions.
10 changes: 5 additions & 5 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

dedupReplicaLabels := cmd.Flag("offline-deduplication.replica-labels", "Label to treat as a replica indicator of blocks that can be deduplicated. This will merge multiple replica blocks into one. This process is irrevertable. Experminteal").
Hidden().String()
dedupReplicaLabels := cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When it is set true, this will given labels from blocks so that vertical compaction could merge blocks.").
Hidden().Strings()

selectorRelabelConf := regSelectorRelabelFlags(cmd)

Expand Down Expand Up @@ -173,7 +174,7 @@ func runCompact(
maxCompactionLevel int,
blockSyncConcurrency int,
concurrency int,
dedupReplicaLabels string,
dedupReplicaLabels []string,
selectorRelabelConf *extflag.PathOrContent,
) error {
halted := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Expand Down Expand Up @@ -246,13 +247,12 @@ func runCompact(

duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
replicaLabelFilter := block.ReplicaLabelsFilter{ReplicaLabels: strings.Split(dedupReplicaLabels, ",")}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
duplicateBlocksFilter.Filter,
replicaLabelFilter.Filter,
block.NewReplicaLabelRemover(dedupReplicaLabels).Modify,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down
28 changes: 18 additions & 10 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ const (
failedMeta = "failed"

// Filter's label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
replicaExclude = "replica-exclude"
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
replicaRemovedMeta = "replica-removed"
)

func newSyncMetrics(reg prometheus.Registerer) *syncMetrics {
Expand Down Expand Up @@ -89,6 +89,7 @@ func newSyncMetrics(reg prometheus.Registerer) *syncMetrics {
[]string{labelExcludedMeta},
[]string{timeExcludedMeta},
[]string{duplicateMeta},
[]string{replicaRemovedMeta},
)
return &m
}
Expand Down Expand Up @@ -527,17 +528,24 @@ func contains(s1 []ulid.ULID, s2 []ulid.ULID) bool {
return true
}

type ReplicaLabelsFilter struct {
ReplicaLabels []string
// ReplicaLabelRemover is a MetaFetcher modifier modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it.
type ReplicaLabelRemover struct {
replicaLabels []string
}

func (f *ReplicaLabelsFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, view bool) {
// NewReplicaLabelRemover creates a ReplicaLabelRemover.
func NewReplicaLabelRemover(replicaLabels []string) *ReplicaLabelRemover {
return &ReplicaLabelRemover{replicaLabels: replicaLabels}
}

// Modify modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it.
func (m *ReplicaLabelRemover) Modify(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, view bool) {
for u, meta := range metas {
labels := meta.Thanos.Labels
for _, replicaLabel := range f.ReplicaLabels {
for _, replicaLabel := range m.replicaLabels {
if _, exists := labels[replicaLabel]; exists {
delete(labels, replicaLabel)
synced.WithLabelValues(replicaExclude).Inc()
synced.WithLabelValues(replicaRemovedMeta).Inc()
}
}
metas[u].Thanos.Labels = labels
Expand Down
6 changes: 3 additions & 3 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) {
}

func TestReplicaLabel_Filter(t *testing.T) {
rf := ReplicaLabelsFilter{ReplicaLabels: []string{"replica", "rule_replica"}}
rm := NewReplicaLabelRemover([]string{"replica", "rule_replica"})

input := map[ulid.ULID]*metadata.Meta{
ULID(1): {Thanos: metadata.Thanos{Labels: map[string]string{"message": "something"}}},
Expand All @@ -865,9 +865,9 @@ func TestReplicaLabel_Filter(t *testing.T) {
}

synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
rf.Filter(input, synced, false)
rm.Modify(input, synced, false)

testutil.Equals(t, 5.0, promtest.ToFloat64(synced.WithLabelValues(replicaExclude)))
testutil.Equals(t, 5.0, promtest.ToFloat64(synced.WithLabelValues(replicaRemovedMeta)))
testutil.Equals(t, expected, input)
}

Expand Down
2 changes: 1 addition & 1 deletion test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestCompact(t *testing.T) {
Type: client.S3,
Config: s3Config,
})

testutil.Ok(t, err)
testutil.Ok(t, s.StartAndWaitReady(compact))

}
2 changes: 0 additions & 2 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,6 @@ func NewCompact(sharedDir string, name string, bucketConfig client.BucketConfig)
return nil, errors.Wrapf(err, "generate compact config file: %v", bucketConfig)
}

fmt.Println(string(bktConfigBytes))

return NewService(
fmt.Sprintf("compact-%s", name),
DefaultImage(),
Expand Down

0 comments on commit b49e9b2

Please sign in to comment.