Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove incompleteView field from fetcher response. #2455

Merged
merged 1 commit into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ type MetadataFetcher interface {
}

type MetadataFilter interface {
Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, incompleteView bool) error
Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error
}

type MetadataModifier interface {
Modify(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, incompleteView bool) error
Modify(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec) error
}

// BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
Expand Down Expand Up @@ -282,8 +282,6 @@ type response struct {

noMetas float64
corruptedMetas float64

incompleteView bool
}

func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
Expand Down Expand Up @@ -429,14 +427,14 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter

for _, filter := range filters {
// NOTE: filter can update synced metric accordingly to the reason of the exclude.
if err := filter.Filter(ctx, metas, metrics.synced, resp.incompleteView); err != nil {
if err := filter.Filter(ctx, metas, metrics.synced); err != nil {
return nil, nil, errors.Wrap(err, "filter metas")
}
}

for _, m := range modifiers {
// NOTE: modifier can update modified metric accordingly to the reason of the modification.
if err := m.Modify(ctx, metas, metrics.modified, resp.incompleteView); err != nil {
if err := m.Modify(ctx, metas, metrics.modified); err != nil {
return nil, nil, errors.Wrap(err, "modify metas")
}
}
Expand Down Expand Up @@ -497,7 +495,7 @@ func NewTimePartitionMetaFilter(MinTime, MaxTime model.TimeOrDurationValue) *Tim
}

// Filter filters out blocks that are outside of specified time range.
func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
func (f *TimePartitionMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
for id, m := range metas {
if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() {
continue
Expand Down Expand Up @@ -525,7 +523,7 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet
const blockIDLabel = "__block_id"

// Filter filters out blocks that have no labels after relabelling of each block external (Thanos) labels.
func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
var lbls labels.Labels
for id, m := range metas {
lbls = lbls[:0]
Expand Down Expand Up @@ -557,7 +555,7 @@ func NewDeduplicateFilter() *DeduplicateFilter {

// Filter filters out duplicate blocks that can be formed
// from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
var wg sync.WaitGroup

metasByResolution := make(map[int64][]*metadata.Meta)
Expand Down Expand Up @@ -673,7 +671,7 @@ func NewReplicaLabelRemover(logger log.Logger, replicaLabels []string) *ReplicaL
}

// Modify modifies external labels of existing blocks, it removes given replica labels from the metadata of blocks that have it.
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec, _ bool) error {
func (r *ReplicaLabelRemover) Modify(_ context.Context, metas map[ulid.ULID]*metadata.Meta, modified *extprom.TxGaugeVec) error {
for u, meta := range metas {
l := meta.Thanos.Labels
for _, replicaLabel := range r.replicaLabels {
Expand Down Expand Up @@ -714,7 +712,7 @@ func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Dura
}

// Filter filters out blocks that filters blocks that have are created before a specified consistency delay.
func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
for id, meta := range metas {
// TODO(khyatisoneji): Remove the checks about Thanos Source
// by implementing delete delay to fetch metas.
Expand Down Expand Up @@ -760,7 +758,7 @@ func (f *IgnoreDeletionMarkFilter) DeletionMarkBlocks() map[ulid.ULID]*metadata.

// Filter filters out blocks that are marked for deletion after a given delay.
// It also returns the blocks that can be deleted since they were uploaded delay duration before current time.
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) error {
func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark)

for id := range metas {
Expand Down
18 changes: 9 additions & 9 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type ulidFilter struct {
ulidToDelete *ulid.ULID
}

func (f *ulidFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, incompleteView bool) error {
func (f *ulidFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
if _, ok := metas[*f.ulidToDelete]; ok {
synced.WithLabelValues("filtered").Inc()
delete(metas, *f.ulidToDelete)
Expand Down Expand Up @@ -355,7 +355,7 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
}

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced))

testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, expected, input)
Expand Down Expand Up @@ -453,7 +453,7 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
deleted := len(input) - len(expected)

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced))

testutil.Equals(t, expected, input)
testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta)))
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) {
}

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced))

testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, expected, input)
Expand Down Expand Up @@ -868,7 +868,7 @@ func TestDeduplicateFilter_Filter(t *testing.T) {
},
}
}
testutil.Ok(t, f.Filter(ctx, metas, m.synced, false))
testutil.Ok(t, f.Filter(ctx, metas, m.synced))
compareSliceWithMapKeys(t, metas, tcase.expected)
testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.synced.WithLabelValues(duplicateMeta)))
}); !ok {
Expand Down Expand Up @@ -920,7 +920,7 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
},
} {
m := newTestFetcherMetrics()
testutil.Ok(t, rm.Modify(ctx, tcase.input, m.modified, false))
testutil.Ok(t, rm.Modify(ctx, tcase.input, m.modified))

testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.modified.WithLabelValues(replicaRemovedMeta)))
testutil.Equals(t, tcase.expected, tcase.input)
Expand Down Expand Up @@ -1026,7 +1026,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

testutil.Ok(t, f.Filter(ctx, input, m.synced, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced))
testutil.Equals(t, 0.0, promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})
Expand All @@ -1051,7 +1051,7 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

testutil.Ok(t, f.Filter(ctx, input, m.synced, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced))
testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})
Expand Down Expand Up @@ -1104,7 +1104,7 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
}

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m.synced, false))
testutil.Ok(t, f.Filter(ctx, input, m.synced))
testutil.Equals(t, 1.0, promtest.ToFloat64(m.synced.WithLabelValues(markedForDeletionMeta)))
testutil.Equals(t, expected, input)
})
Expand Down