diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index da99c68fe36..712038b5154 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -124,6 +124,7 @@ func registerQuery(app *extkingpin.App) { queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules."). Strings() + queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations.").Strings() instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden()) @@ -326,6 +327,7 @@ func registerQuery(app *extkingpin.App) { time.Duration(*storeResponseTimeout), *queryConnMetricLabels, *queryReplicaLabels, + *queryPartitionLabels, selectorLset, getFlagsMap(cmd.Flags()), *endpoints, @@ -407,6 +409,7 @@ func runQuery( storeResponseTimeout time.Duration, queryConnMetricLabels []string, queryReplicaLabels []string, + queryPartitionLabels []string, selectorLset labels.Labels, flagsMap map[string]string, endpointAddrs []string, @@ -682,6 +685,7 @@ func runQuery( remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{ AutoDownsample: enableAutodownsampling, ReplicaLabels: queryReplicaLabels, + PartitionLabels: queryPartitionLabels, Timeout: queryTimeout, EnablePartialResponse: enableQueryPartialResponse, }) diff --git a/pkg/query/remote_engine.go b/pkg/query/remote_engine.go index 9ac078d9f34..5fa5fbde451 100644 --- a/pkg/query/remote_engine.go +++ b/pkg/query/remote_engine.go @@ -35,6 +35,7 @@ import ( type Opts struct { AutoDownsample bool ReplicaLabels []string + PartitionLabels []string Timeout time.Duration EnablePartialResponse bool } @@ -118,7 +119,7 @@ func (r *remoteEngine) MinT() int64 { hashBuf = make([]byte, 0, 128) highestMintByLabelSet = make(map[uint64]int64) ) - for _, lset := range r.infosWithoutReplicaLabels() { + for _, lset := range r.adjustedInfos() { key, _ := labelpb.LabelpbLabelsToPromLabels(lset.Labels.Labels).HashWithoutLabels(hashBuf) lsetMinT, ok := highestMintByLabelSet[key] if !ok { @@ -152,16 +153,22 @@ func (r *remoteEngine) MaxT() int64 { func (r *remoteEngine) LabelSets() []labels.Labels { r.labelSetsOnce.Do(func() { - r.labelSets = r.infosWithoutReplicaLabels().LabelSets() + r.labelSets = r.adjustedInfos().LabelSets() }) return r.labelSets } -func (r *remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos { +// adjustedInfos strips out replica labels and scopes the remaining labels +// onto the partition labels if they are set. +func (r *remoteEngine) adjustedInfos() infopb.TSDBInfos { replicaLabelSet := make(map[string]struct{}) for _, lbl := range r.opts.ReplicaLabels { replicaLabelSet[lbl] = struct{}{} } + partitionLabelsSet := make(map[string]struct{}) + for _, lbl := range r.opts.PartitionLabels { + partitionLabelsSet[lbl] = struct{}{} + } // Strip replica labels from the result. infos := make(infopb.TSDBInfos, 0, len(r.client.tsdbInfos)) @@ -172,6 +179,9 @@ func (r *remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos { if _, ok := replicaLabelSet[lbl.Name]; ok { continue } + if _, ok := partitionLabelsSet[lbl.Name]; !ok && len(partitionLabelsSet) > 0 { + continue + } builder.Add(lbl.Name, lbl.Value) } infos = append(infos, infopb.NewTSDBInfo( diff --git a/pkg/query/remote_engine_test.go b/pkg/query/remote_engine_test.go index c0e46e217da..38964693416 100644 --- a/pkg/query/remote_engine_test.go +++ b/pkg/query/remote_engine_test.go @@ -62,10 +62,11 @@ func TestRemoteEngine_Warnings(t *testing.T) { func TestRemoteEngine_LabelSets(t *testing.T) { tests := []struct { - name string - tsdbInfos []*infopb.TSDBInfo - replicaLabels []string - expected []labels.Labels + name string + tsdbInfos []*infopb.TSDBInfo + replicaLabels []string + partitionLabels []string + expected []labels.Labels }{ { name: "empty label sets", @@ -103,13 +104,24 @@ func TestRemoteEngine_LabelSets(t *testing.T) { replicaLabels: []string{"a", "b"}, expected: []labels.Labels{labels.FromStrings("c", "2")}, }, + { + name: "non-empty label sets with partition labels", + tsdbInfos: []*infopb.TSDBInfo{ + { + Labels: labelSetFromStrings("a", "1", "c", "2"), + }, + }, + partitionLabels: []string{"a"}, + expected: []labels.Labels{labels.FromStrings("a", "1")}, + }, } for _, testCase := range tests { t.Run(testCase.name, func(t *testing.T) { client := NewClient(nil, "", testCase.tsdbInfos) engine := NewRemoteEngine(log.NewNopLogger(), client, Opts{ - ReplicaLabels: testCase.replicaLabels, + ReplicaLabels: testCase.replicaLabels, + PartitionLabels: testCase.partitionLabels, }) testutil.Equals(t, testCase.expected, engine.LabelSets())