Skip to content

Commit

Permalink
Separate filters and modifiers
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun committed Mar 23, 2020
1 parent dc291cf commit c2a58ac
Show file tree
Hide file tree
Showing 15 changed files with 95 additions and 72 deletions.
8 changes: 4 additions & 4 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func registerBucketVerify(m map[string]setupFunc, root *kingpin.CmdClause, name
issues = append(issues, issueFn)
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func registerBucketLs(m map[string]setupFunc, root *kingpin.CmdClause, name stri
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
return err
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -458,7 +458,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati
return errors.Wrap(err, "bucket client")
}

fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
fetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
Expand Down
13 changes: 7 additions & 6 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,13 @@ func runCompact(
duplicateBlocksFilter := block.NewDeduplicateFilter()
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
duplicateBlocksFilter.Filter,
block.NewReplicaLabelRemover(logger, dedupReplicaLabels).Modify,
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, []block.MetadataFilter{
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
},
block.NewReplicaLabelRemover(logger, dedupReplicaLabels),
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func RunDownsample(
return err
}

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg))
metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), nil)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
Name: metricIndexGenerateName,
Help: metricIndexGenerateHelp,
})
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir))
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos))))
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
testutil.Ok(t, err)

testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir))
Expand Down
14 changes: 7 additions & 7 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,13 @@ func runStore(

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, ignoreDeletionMarksDelay)
prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer,
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter,
ignoreDeletionMarkFilter.Filter,
block.NewDeduplicateFilter().Filter,
)
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer),
ignoreDeletionMarkFilter,
block.NewDeduplicateFilter(),
})
if err != nil {
return errors.Wrap(err, "meta fetcher")
}
Expand Down
51 changes: 33 additions & 18 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,13 @@ type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
}

type MetaFetcherFilter func(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, incompleteView bool) error
type MetadataFilter interface {
Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, gauge *extprom.TxGaugeVec, incompleteView bool) error
}

type MetadataModifier interface {
Modify(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, gauge *extprom.TxGaugeVec, incompleteView bool) error
}

// MetaFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
// Not go-routine safe.
Expand All @@ -135,13 +141,14 @@ type MetaFetcher struct {
cacheDir string
metrics *fetcherMetrics

filters []MetaFetcherFilter
filters []MetadataFilter
modifiers []MetadataModifier

cached map[ulid.ULID]*metadata.Meta
}

// NewMetaFetcher constructs MetaFetcher.
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, r prometheus.Registerer, filters ...MetaFetcherFilter) (*MetaFetcher, error) {
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, r prometheus.Registerer, filters []MetadataFilter, modifiers ...MetadataModifier) (*MetaFetcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -161,6 +168,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade
cacheDir: cacheDir,
metrics: newFetcherMetrics(r),
filters: filters,
modifiers: modifiers,
cached: map[ulid.ULID]*metadata.Meta{},
}, nil
}
Expand Down Expand Up @@ -368,11 +376,18 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.

for _, f := range s.filters {
// NOTE: filter can update synced metric accordingly to the reason of the exclude.
if err := f(ctx, metas, s.metrics, incompleteView); err != nil {
if err := f.Filter(ctx, metas, s.metrics.synced, incompleteView); err != nil {
return nil, nil, errors.Wrap(err, "filter metas")
}
}

for _, m := range s.modifiers {
// NOTE: modifier can update modified metric accordingly to the reason of the modification.
if err := m.Modify(ctx, metas, s.metrics.modified, incompleteView); err != nil {
return nil, nil, errors.Wrap(err, "modify metas")
}
}

s.metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas)))
s.metrics.submit()

Expand All @@ -384,7 +399,7 @@ func (s *MetaFetcher) Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.
return metas, partial, nil
}

var _ MetaFetcherFilter = (&TimePartitionMetaFilter{}).Filter
var _ MetadataFilter = &TimePartitionMetaFilter{}

// TimePartitionMetaFilter is a MetaFetcher filter that filters out blocks that are outside of specified time range.
// Not go-routine safe.
Expand All @@ -398,18 +413,18 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim
}

// Filter filters out blocks that are outside of specified time range.
func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
for id, m := range metas {
if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() {
continue
}
metrics.synced.WithLabelValues(timeExcludedMeta).Inc()
synced.WithLabelValues(timeExcludedMeta).Inc()
delete(metas, id)
}
return nil
}

var _ MetaFetcherFilter = (&LabelShardedMetaFilter{}).Filter
var _ MetadataFilter = &LabelShardedMetaFilter{}

// LabelShardedMetaFilter represents struct that allows sharding.
// Not go-routine safe.
Expand All @@ -426,7 +441,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet
const blockIDLabel = "__block_id"

// Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels.
func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
var lbls labels.Labels
for id, m := range metas {
lbls = lbls[:0]
Expand All @@ -436,7 +451,7 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*
}

if processedLabels := relabel.Process(lbls, f.relabelConfig...); len(processedLabels) == 0 {
metrics.synced.WithLabelValues(labelExcludedMeta).Inc()
synced.WithLabelValues(labelExcludedMeta).Inc()
delete(metas, id)
}
}
Expand All @@ -456,7 +471,7 @@ func NewDeduplicateFilter() *DeduplicateFilter {

// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
var wg sync.WaitGroup

metasByResolution := make(map[int64][]*metadata.Meta)
Expand All @@ -473,7 +488,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad
BlockMeta: tsdb.BlockMeta{
ULID: ulid.MustNew(uint64(0), nil),
},
}), metasByResolution[res], metas, res, metrics.synced)
}), metasByResolution[res], metas, res, synced)
}(res)
}

Expand Down Expand Up @@ -570,14 +585,14 @@ func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaL
}

// Modify modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it.
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, view bool) error {
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, view bool) error {
for u, meta := range metas {
labels := meta.Thanos.Labels
for _, replicaLabel := range r.replicaLabels {
if _, exists := labels[replicaLabel]; exists {
level.Debug(r.logger).Log("msg", "replica label removed", "label", replicaLabel)
delete(labels, replicaLabel)
metrics.modified.WithLabelValues(replicaRemovedMeta).Inc()
modified.WithLabelValues(replicaRemovedMeta).Inc()
}
}
metas[u].Thanos.Labels = labels
Expand Down Expand Up @@ -611,7 +626,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura
}

// Filter filters out blocks that filters blocks that have are created before a specified consistency delay.
func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, _ bool) error {
func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
for id, meta := range metas {
// TODO(khyatisoneji): Remove the checks about Thanos Source
// by implementing delete delay to fetch metas.
Expand All @@ -622,7 +637,7 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
metrics.synced.WithLabelValues(tooFreshMeta).Inc()
synced.WithLabelValues(tooFreshMeta).Inc()
delete(metas, id)
}
}
Expand Down Expand Up @@ -657,7 +672,7 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.

// Filter filters out blocks that are marked for deletion after a given delay.
// It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, m *fetcherMetrics, _ bool) error {
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)

for id := range metas {
Expand All @@ -674,7 +689,7 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
}
f.deletionMarkMap[id] = deletionMark
if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
m.synced.WithLabelValues(markedForDeletionMeta).Inc()
synced.WithLabelValues(markedForDeletionMeta).Inc()
delete(metas, id)
}
}
Expand Down
36 changes: 22 additions & 14 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ func newTestFetcherMetrics() *fetcherMetrics {
}
}

type ulidFilter struct {
ulidToDelete *ulid.ULID
}

func (f *ulidFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, incompleteView bool) error {
if _, ok := metas[*f.ulidToDelete]; ok {
synced.WithLabelValues("filtered").Inc()
delete(metas, *f.ulidToDelete)
}
return nil
}

func ULID(i int) ulid.ULID { return ulid.MustNew(uint64(i), nil) }

func ULIDs(is ...int) []ulid.ULID {
Expand All @@ -63,12 +75,8 @@ func TestMetaFetcher_Fetch(t *testing.T) {

var ulidToDelete ulid.ULID
r := prometheus.NewRegistry()
f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, func(_ context.Context, metas map[ulid.ULID]*metadata.Meta, metrics *fetcherMetrics, incompleteView bool) error {
if _, ok := metas[ulidToDelete]; ok {
metrics.synced.WithLabelValues("filtered").Inc()
delete(metas, ulidToDelete)
}
return nil
f, err := NewMetaFetcher(log.NewNopLogger(), 20, bkt, dir, r, []MetadataFilter{
&ulidFilter{ulidToDelete: &ulidToDelete},
})
testutil.Ok(t, err)

Expand Down Expand Up @@ -344,7 +352,7 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
}

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))

testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, expected, input)
Expand Down Expand Up @@ -442,7 +450,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
deleted := len(input) - len(expected)

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))

testutil.Equals(t, expected, input)
testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta)))
Expand Down Expand Up @@ -506,7 +514,7 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) {
}

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))

testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, expected, input)
Expand Down Expand Up @@ -857,7 +865,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) {
},
}
}
testutil.Ok(t, f.Filter(ctx, metas, m, false))
testutil.Ok(t, f.Filter(ctx, metas, m.synced, false))
compareSliceWithMapKeys(t, metas, tcase.expected)
testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.synced.WithLabelValues(duplicateMeta)))
}); !ok {
Expand Down Expand Up @@ -909,7 +917,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
},
} {
m := newTestFetcherMetrics()
testutil.Ok(t, rm.Modify(ctx, tcase.input, m, false))
testutil.Ok(t, rm.Modify(ctx, tcase.input, m.modified, false))

testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.modified.WithLabelValues(replicaRemovedMeta)))
testutil.Equals(t, tcase.expected, tcase.input)
Expand Down Expand Up @@ -1015,7 +1023,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

testutil.Ok(t, f.Filter(ctx, input, m, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))
testutil.Equals(t, 0.0, promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})
Expand All @@ -1040,7 +1048,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

testutil.Ok(t, f.Filter(ctx, input, m, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))
testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})
Expand Down Expand Up @@ -1093,7 +1101,7 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
}

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))
testutil.Equals(t, 1.0, promtest.ToFloat64(m.synced.WithLabelValues(markedForDeletionMeta)))
testutil.Equals(t, expected, input)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/compact/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
bkt := inmem.NewBucket()
logger := log.NewNopLogger()

metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
testutil.Ok(t, err)

// 1. No meta, old block, should be removed.
Expand Down
Loading

0 comments on commit c2a58ac

Please sign in to comment.