Skip to content

Commit

Permalink
query: add partition labels flag
Browse files Browse the repository at this point in the history
The distributed engine decides when to push down certain operations by
checking if the external labels are still present, i.e. we can push down
a binary operation if its vector matching includes all external labels.
This works well, but it becomes problematic when there are multiple
external labels that are irrelevant for the partition, since query
authors must be aware of those irrelevant labels and must incorporate
them into their queries.
This PR attempts to solve that by giving an option to focus on the
labels that are relevant for the partition.

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Sep 11, 2024
1 parent 97710f4 commit b356e9b
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 8 deletions.
4 changes: 4 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func registerQuery(app *extkingpin.App) {

queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
Strings()
queryPartitionLabels := cmd.Flag("query.partition-label", "Labels that partition the leaf queriers. This is used to scope down the labelsets of leaf queriers when using the distributed query mode. If set, these labels must form a partition of the leaf queriers. Partition labels must not intersect with replica labels. Every TSDB of a leaf querier must have these labels. This is useful when there are multiple external labels that are irrelevant for the partition as it allows the distributed engine to ignore them for some optimizations.").Strings()

instantDefaultMaxSourceResolution := extkingpin.ModelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

Expand Down Expand Up @@ -326,6 +327,7 @@ func registerQuery(app *extkingpin.App) {
time.Duration(*storeResponseTimeout),
*queryConnMetricLabels,
*queryReplicaLabels,
*queryPartitionLabels,
selectorLset,
getFlagsMap(cmd.Flags()),
*endpoints,
Expand Down Expand Up @@ -407,6 +409,7 @@ func runQuery(
storeResponseTimeout time.Duration,
queryConnMetricLabels []string,
queryReplicaLabels []string,
queryPartitionLabels []string,
selectorLset labels.Labels,
flagsMap map[string]string,
endpointAddrs []string,
Expand Down Expand Up @@ -682,6 +685,7 @@ func runQuery(
remoteEngineEndpoints = query.NewRemoteEndpoints(logger, endpoints.GetQueryAPIClients, query.Opts{
AutoDownsample: enableAutodownsampling,
ReplicaLabels: queryReplicaLabels,
PartitionLabels: queryPartitionLabels,
Timeout: queryTimeout,
EnablePartialResponse: enableQueryPartialResponse,
})
Expand Down
16 changes: 13 additions & 3 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
// Opts holds the options used to construct remote engines and remote
// endpoints for the distributed query mode.
type Opts struct {
AutoDownsample bool
// ReplicaLabels lists label names that are stripped from the TSDB label
// sets a remote engine reports.
ReplicaLabels []string
// PartitionLabels, when non-empty, restricts the reported TSDB label sets
// to labels whose names appear in this list; all other labels are dropped.
// When empty, no partition-based filtering is applied.
PartitionLabels []string
Timeout time.Duration
EnablePartialResponse bool
}
Expand Down Expand Up @@ -118,7 +119,7 @@ func (r *remoteEngine) MinT() int64 {
hashBuf = make([]byte, 0, 128)
highestMintByLabelSet = make(map[uint64]int64)
)
for _, lset := range r.infosWithoutReplicaLabels() {
for _, lset := range r.adjustedInfos() {
key, _ := labelpb.LabelpbLabelsToPromLabels(lset.Labels.Labels).HashWithoutLabels(hashBuf)
lsetMinT, ok := highestMintByLabelSet[key]
if !ok {
Expand Down Expand Up @@ -152,16 +153,22 @@ func (r *remoteEngine) MaxT() int64 {

func (r *remoteEngine) LabelSets() []labels.Labels {
r.labelSetsOnce.Do(func() {
r.labelSets = r.infosWithoutReplicaLabels().LabelSets()
r.labelSets = r.adjustedInfos().LabelSets()
})
return r.labelSets
}

func (r *remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos {
// adjustedInfos strips out replica labels and scopes the remaining labels
// onto the partition labels if they are set.
func (r *remoteEngine) adjustedInfos() infopb.TSDBInfos {
replicaLabelSet := make(map[string]struct{})
for _, lbl := range r.opts.ReplicaLabels {
replicaLabelSet[lbl] = struct{}{}
}
partitionLabelsSet := make(map[string]struct{})
for _, lbl := range r.opts.PartitionLabels {
partitionLabelsSet[lbl] = struct{}{}
}

// Strip replica labels from the result.
infos := make(infopb.TSDBInfos, 0, len(r.client.tsdbInfos))
Expand All @@ -172,6 +179,9 @@ func (r *remoteEngine) infosWithoutReplicaLabels() infopb.TSDBInfos {
if _, ok := replicaLabelSet[lbl.Name]; ok {
continue
}
if _, ok := partitionLabelsSet[lbl.Name]; !ok && len(partitionLabelsSet) > 0 {
continue
}
builder.Add(lbl.Name, lbl.Value)
}
infos = append(infos, infopb.NewTSDBInfo(
Expand Down
22 changes: 17 additions & 5 deletions pkg/query/remote_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ func TestRemoteEngine_Warnings(t *testing.T) {

func TestRemoteEngine_LabelSets(t *testing.T) {
tests := []struct {
name string
tsdbInfos []*infopb.TSDBInfo
replicaLabels []string
expected []labels.Labels
name string
tsdbInfos []*infopb.TSDBInfo
replicaLabels []string
partitionLabels []string
expected []labels.Labels
}{
{
name: "empty label sets",
Expand Down Expand Up @@ -103,13 +104,24 @@ func TestRemoteEngine_LabelSets(t *testing.T) {
replicaLabels: []string{"a", "b"},
expected: []labels.Labels{labels.FromStrings("c", "2")},
},
{
name: "non-empty label sets with partition labels",
tsdbInfos: []*infopb.TSDBInfo{
{
Labels: labelSetFromStrings("a", "1", "c", "2"),
},
},
partitionLabels: []string{"a"},
expected: []labels.Labels{labels.FromStrings("a", "1")},
},
}

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
client := NewClient(nil, "", testCase.tsdbInfos)
engine := NewRemoteEngine(log.NewNopLogger(), client, Opts{
ReplicaLabels: testCase.replicaLabels,
ReplicaLabels: testCase.replicaLabels,
PartitionLabels: testCase.partitionLabels,
})

testutil.Equals(t, testCase.expected, engine.LabelSets())
Expand Down

0 comments on commit b356e9b

Please sign in to comment.