Skip to content

Commit

Permalink
Address review issues
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun committed Mar 19, 2020
1 parent ea102f5 commit a2dca30
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 123 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ jobs:
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}

- name: Run e2e docker-based tests.
run: make test-e2e
run: make test-e2e-ci
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added

- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden).
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.

### Changed

Expand Down
12 changes: 11 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,17 @@ test-local:

.PHONY: test-e2e
test-e2e: ## Runs all Thanos e2e docker-based e2e tests from test/e2e. Required access to docker daemon.
test-e2e: docker
test-e2e: docker-multi-stage
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
@rm -rf ./test/e2e/e2e_integration_test*
@echo ">> running /test/e2e tests."
@go test -failfast -timeout 5m -v ./test/e2e/...

.PHONY: test-e2e-ci
test-e2e-ci: ## Runs all Thanos e2e docker-based e2e tests from test/e2e, using limited resources. Required access to docker daemon.
test-e2e-ci: docker
@echo ">> cleaning docker environment."
@docker system prune -f --volumes
@echo ">> cleaning e2e test garbage."
Expand Down
6 changes: 4 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
Default("48h"))

dedupReplicaLabels := cmd.Flag("deduplication.replica-label", "Label to treat as a replica indicator of blocks that can be deduplicated (repeated flag). This will merge multiple replica blocks into one. This process is irreversible."+
"Experimental. When it is set true, this will given labels from blocks so that vertical compaction could merge blocks.").
"Experimental. When it is set true, this will given labels from blocks so that vertical compaction could merge blocks."+
"Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together)."+
"This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication.").
Hidden().Strings()

selectorRelabelConf := regSelectorRelabelFlags(cmd)
Expand Down Expand Up @@ -299,7 +301,7 @@ func runCompact(
enableVerticalCompaction := false
if len(dedupReplicaLabels) > 0 {
enableVerticalCompaction = true
level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedup-replica-labels", strings.Join(dedupReplicaLabels, ","))
level.Info(logger).Log("msg", "deduplication.replica-label specified, vertical compaction is enabled", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","))
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, blockSyncConcurrency, acceptMalformedIndex, enableVerticalCompaction)
Expand Down
81 changes: 32 additions & 49 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
)

type syncMetrics struct {
type fetcherMetrics struct {
syncs prometheus.Counter
syncFailures prometheus.Counter
syncDuration prometheus.Histogram
Expand All @@ -41,75 +41,58 @@ type syncMetrics struct {
modified *extprom.TxGaugeVec
}

func (s *syncMetrics) submit() {
if s == nil {
return
}

if s.synced != nil {
s.synced.Submit()
}

if s.modified != nil {
s.modified.Submit()
}
func (s *fetcherMetrics) submit() {
s.synced.Submit()
s.modified.Submit()
}

func (s *syncMetrics) resetTx() {
if s == nil {
return
}

if s.synced != nil {
s.synced.ResetTx()
}

if s.modified != nil {
s.modified.ResetTx()
}
func (s *fetcherMetrics) resetTx() {
s.synced.ResetTx()
s.modified.ResetTx()
}

const (
syncMetricSubSys = "blocks_meta"
fetcherSubSys = "blocks_meta"

corruptedMeta = "corrupted-meta-json"
noMeta = "no-meta-json"
loadedMeta = "loaded"
failedMeta = "failed"

// Filter's label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
replicaRemovedMeta = "replica-label-removed"

// Fetcher's synced label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
// Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted,
// but don't have a replacement block yet.
markedForDeletionMeta = "marked-for-deletion"

// Fetcher's synced label values.
replicaRemovedMeta = "replica-label-removed"
)

func newSyncMetrics(reg prometheus.Registerer) *syncMetrics {
var m syncMetrics
func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
var m fetcherMetrics

m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: syncMetricSubSys,
Subsystem: fetcherSubSys,
Name: "syncs_total",
Help: "Total blocks metadata synchronization attempts",
})
m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: syncMetricSubSys,
Subsystem: fetcherSubSys,
Name: "sync_failures_total",
Help: "Total blocks metadata synchronization failures",
})
m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Subsystem: syncMetricSubSys,
Subsystem: fetcherSubSys,
Name: "sync_duration_seconds",
Help: "Duration of the blocks metadata synchronization in seconds",
Buckets: []float64{0.01, 1, 10, 100, 1000},
})
m.synced = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Subsystem: syncMetricSubSys,
Subsystem: fetcherSubSys,
Name: "synced",
Help: "Number of block metadata synced",
},
Expand All @@ -125,7 +108,7 @@ func newSyncMetrics(reg prometheus.Registerer) *syncMetrics {
[]string{markedForDeletionMeta},
)
m.modified = extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Subsystem: syncMetricSubSys,
Subsystem: fetcherSubSys,
Name: "modified",
Help: "Number of block metadata that modified",
},
Expand All @@ -139,7 +122,7 @@ type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
}

type MetaFetcherFilter func(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, incompleteView bool) error
type MetaFetcherFilter func(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, incompleteView bool) error

// MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
// Not go-routine safe.
Expand All @@ -150,7 +133,7 @@ type MetaFetcher struct {

// Optional local directory to cache meta.json files.
cacheDir string
metrics *syncMetrics
metrics *fetcherMetrics

filters []MetaFetcherFilter

Expand All @@ -176,7 +159,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade
concurrency: concurrency,
bkt: bkt,
cacheDir: cacheDir,
metrics: newSyncMetrics(r),
metrics: newFetcherMetrics(r),
filters: filters,
cached: map[ulid.ULID]*metadata.Meta{},
}, nil
Expand Down Expand Up @@ -415,7 +398,7 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim
}

// Filter filters out blocks that are outside of specified time range.
func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) error {
func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
for id, m := range metas {
if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() {
continue
Expand Down Expand Up @@ -443,7 +426,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet
const blockIDLabel = "__block_id"

// Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels.
func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) error {
func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
var lbls labels.Labels
for id, m := range metas {
lbls = lbls[:0]
Expand Down Expand Up @@ -473,7 +456,7 @@ func NewDeduplicateFilter() *DeduplicateFilter {

// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) error {
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
var wg sync.WaitGroup

metasByResolution := make(map[int64][]*metadata.Meta)
Expand Down Expand Up @@ -587,7 +570,7 @@ func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaL
}

// Modify modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it.
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, view bool) error {
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, view bool) error {
for u, meta := range metas {
labels := meta.Thanos.Labels
for _, replicaLabel := range r.replicaLabels {
Expand Down Expand Up @@ -628,7 +611,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura
}

// Filter filters out blocks that filters blocks that have are created before a specified consistency delay.
func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, _ bool) error {
func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
for id, meta := range metas {
// TODO(khyatisoneji): Remove the checks about Thanos Source
// by implementing delete delay to fetch metas.
Expand Down Expand Up @@ -674,7 +657,7 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.

// Filter filters out blocks that are marked for deletion after a given delay.
// It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, m *syncMetrics, _ bool) error {
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, m *fetcherMetrics, _ bool) error {
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)

for id := range metas {
Expand Down
22 changes: 11 additions & 11 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
"gopkg.in/yaml.v2"
)

func newTestSyncMetrics() *syncMetrics {
return &syncMetrics{
func newTestFetcherMetrics() *fetcherMetrics {
return &fetcherMetrics{
synced: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}),
modified: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"modified"}),
}
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {

var ulidToDelete ulid.ULID
r := prometheus.NewRegistry()
f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *syncMetrics, incompleteView bool) error {
f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, incompleteView bool) error {
if _, ok := metas[ulidToDelete]; ok {
metrics.synced.WithLabelValues("filtered").Inc()
delete(metas, ulidToDelete)
Expand Down Expand Up @@ -343,7 +343,7 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
ULID(6): input[ULID(6)],
}

m := newTestSyncMetrics()
m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m, false))

testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta)))
Expand Down Expand Up @@ -441,7 +441,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
}
deleted := len(input) - len(expected)

m := newTestSyncMetrics()
m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m, false))

testutil.Equals(t, expected, input)
Expand Down Expand Up @@ -505,7 +505,7 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) {
ULID(4): input[ULID(4)],
}

m := newTestSyncMetrics()
m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m, false))

testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta)))
Expand Down Expand Up @@ -839,7 +839,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) {
} {
f := NewDeduplicateFilter()
if ok := t.Run(tcase.name, func(t *testing.T) {
m := newTestSyncMetrics()
m := newTestFetcherMetrics()
metas := make(map[ulid.ULID]*metadata.Meta)
inputLen := len(tcase.input)
for id, metaInfo := range tcase.input {
Expand Down Expand Up @@ -908,7 +908,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
modified: 5.0,
},
} {
m := newTestSyncMetrics()
m := newTestFetcherMetrics()
testutil.Ok(t, rm.Modify(ctx, tcase.input, m, false))

testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.modified.WithLabelValues(replicaRemovedMeta)))
Expand Down Expand Up @@ -1004,7 +1004,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
}

t.Run("consistency 0 (turned off)", func(t *testing.T) {
m := newTestSyncMetrics()
m := newTestFetcherMetrics()
expected := map[ulid.ULID]*metadata.Meta{}
// Copy all.
for _, id := range u.created {
Expand All @@ -1021,7 +1021,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
})

t.Run("consistency 30m.", func(t *testing.T) {
m := newTestSyncMetrics()
m := newTestFetcherMetrics()
expected := map[ulid.ULID]*metadata.Meta{}
// Only certain sources and those with 30m or more age go through.
for i, id := range u.created {
Expand Down Expand Up @@ -1092,7 +1092,7 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
ULID(4): {},
}

m := newTestSyncMetrics()
m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m, false))
testutil.Equals(t, 1.0, promtest.ToFloat64(m.synced.WithLabelValues(markedForDeletionMeta)))
testutil.Equals(t, expected, input)
Expand Down
Loading

0 comments on commit a2dca30

Please sign in to comment.