Skip to content

Commit

Permalink
Store: add escape hatch to skip store resorting
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Sep 9, 2023
1 parent 882e417 commit 9628c04
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 77 deletions.
7 changes: 7 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type storeConfig struct {
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration
dontResortResponses bool
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -182,6 +183,9 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("store.index-header-lazy-reader-idle-timeout", "If index-header lazy reader is enabled and this idle timeout setting is > 0, memory map-ed index-headers will be automatically released after 'idle timeout' inactivity.").
Hidden().Default("5m").DurationVar(&sc.lazyIndexReaderIdleTimeout)

cmd.Flag("store.dont-resort-responses", "Do not resort responses in the store. This should only be done if you are sure that the store response is sorted even after adding external labels or dropping of replica labels. It is generally unsafe and can lead to deduplication not working correctly but can improve query performance.").
Hidden().Default("false").BoolVar(&sc.dontResortResponses)

cmd.Flag("web.disable", "Disable Block Viewer UI.").Default("false").BoolVar(&sc.disableWeb)

cmd.Flag("web.external-prefix", "Static prefix for all HTML links and redirect URLs in the bucket web UI interface. Actual endpoints are still served on / or the web.route-prefix. This allows thanos bucket web UI to be served behind a reverse proxy that strips a URL sub-path.").
Expand Down Expand Up @@ -387,6 +391,9 @@ func runStore(
if conf.debugLogging {
options = append(options, store.WithDebugLogging())
}
if conf.dontResortResponses {
options = append(options, store.WithDontResortResponse())
}

bs, err := store.NewBucketStore(
insBkt,
Expand Down
71 changes: 37 additions & 34 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_EQ, Name: "n", Value: "1"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -251,7 +251,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -270,9 +270,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_EQ, Name: "missing", Value: ""},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("n", "2", "region", "eu-west"),
labels.FromStrings("n", "2.5", "region", "eu-west"),
},
Expand All @@ -295,8 +295,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: ".+"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -306,9 +306,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: ".*"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("n", "2", "region", "eu-west"),
labels.FromStrings("n", "2.5", "region", "eu-west"),
},
Expand All @@ -332,8 +332,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NEQ, Name: "i", Value: ""},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -352,8 +352,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NEQ, Name: "i", Value: "a"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -363,9 +363,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "n", Value: "^1$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -376,7 +376,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -387,8 +387,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a?$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
},
},
{
Expand Down Expand Up @@ -422,9 +422,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.*$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -435,8 +435,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.+$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand Down Expand Up @@ -489,8 +489,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
},
},
{
Expand All @@ -501,7 +501,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a?$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -512,8 +512,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand Down Expand Up @@ -545,7 +545,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -557,7 +557,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "^(b|a).*$"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -567,9 +567,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "n", Value: "(1|2)"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("n", "2", "region", "eu-west"),
},
},
Expand All @@ -580,8 +580,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "a|b"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand All @@ -591,8 +591,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
{Type: storepb.LabelMatcher_RE, Name: "i", Value: "(a|b)"},
},
expectedLabels: []labels.Labels{
labels.FromStrings("n", "1", "i", "a", "region", "eu-west"),
labels.FromStrings("n", "1", "i", "b", "region", "eu-west"),
labels.FromStrings("i", "a", "n", "1", "region", "eu-west"),
labels.FromStrings("i", "b", "n", "1", "region", "eu-west"),
},
},
{
Expand Down Expand Up @@ -706,12 +706,15 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset
}
testutil.Ok(t, err)

testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool {
return labels.Compare(x.PromLabels(), y.PromLabels()) < 0
}))

receivedLabels := make([]labels.Labels, 0)
for _, s := range srv.SeriesSet {
receivedLabels = append(receivedLabels, s.PromLabels())
}
slices.SortFunc(c.expectedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 })
slices.SortFunc(receivedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 })

testutil.Equals(t, c.expectedLabels, receivedLabels)
})
}
Expand Down
62 changes: 23 additions & 39 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ type BucketStore struct {

blockEstimatedMaxSeriesFunc BlockEstimator
blockEstimatedMaxChunkFunc BlockEstimator

sortingStrategy sortingStrategy
}

func (s *BucketStore) validate() error {
Expand Down Expand Up @@ -473,6 +475,12 @@ func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption {
}
}

func WithDontResortResponse() BucketStoreOption {
return func(s *BucketStore) {
s.sortingStrategy = sortingStrategyNone
}
}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStore(
Expand Down Expand Up @@ -516,6 +524,7 @@ func NewBucketStore(
enableChunkHashCalculation: enableChunkHashCalculation,
seriesBatchSize: SeriesBatchSize,
labelNamesSet: stringset.AllStrings(),
sortingStrategy: sortingStrategyStore,
}

for _, option := range options {
Expand Down Expand Up @@ -1254,7 +1263,8 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
srv := newFlushableServer(seriesSrv)
srv := newFlushableServer(seriesSrv, s.sortingStrategy)

if s.queryGate != nil {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
Expand Down Expand Up @@ -1375,44 +1385,18 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID)
}

// If we have inner replica labels we need to resort.
s.mtx.Lock()
needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels)
s.mtx.Unlock()

var resp respSet
if needsEagerRetrival {
labelsToRemove := make(map[string]struct{})
for _, replicaLabel := range req.WithoutReplicaLabels {
labelsToRemove[replicaLabel] = struct{}{}
}
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
labelsToRemove,
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
)
}
resp := newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
[]labels.Labels{blk.extLset},
onClose,
blockClient,
shardMatcher,
false,
s.metrics.emptyPostingCount,
)

mtx.Lock()
respSets = append(respSets, resp)
Expand Down
4 changes: 3 additions & 1 deletion pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3432,5 +3432,7 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) {
},
}, srv))

testutil.Equals(t, 2, len(srv.SeriesSet))
testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool {
return labels.Compare(x.PromLabels(), y.PromLabels()) < 0
}))
}
Loading

0 comments on commit 9628c04

Please sign in to comment.