Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Trim very long external labels and add cmd flag to optionally specify metric labels to collect #5785

Merged
merged 50 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
8c76dab
added unit tests for long labels and no external labels
utukJ Oct 13, 2022
c5f1536
trimmed too long external labels
utukJ Oct 13, 2022
745e729
added optional label selection
utukJ Oct 13, 2022
72ce975
added cmd flag for choosing metric labels
utukJ Oct 13, 2022
2c08e6d
updated docs
utukJ Oct 13, 2022
9fe8ea7
Update pkg/query/endpointset.go
utukJ Oct 13, 2022
2107140
Update pkg/query/endpointset.go
utukJ Oct 13, 2022
606cba5
minor fixes from code review
utukJ Oct 13, 2022
c581cc7
Merge branch 'query-labels' of https://github.com/utukj/thanos into q…
utukJ Oct 13, 2022
a07e974
fixed code comments
utukJ Oct 13, 2022
03b5289
used enum for labels
utukJ Oct 14, 2022
4b07af3
Merge branch 'main' of https://github.com/thanos-io/thanos into query…
utukJ Oct 14, 2022
be3ca53
updated query docs
utukJ Oct 14, 2022
32a0a98
cleaned up tests
utukJ Oct 18, 2022
9474c00
Updates busybox SHA (#5793)
github-actions[bot] Oct 15, 2022
24e1cc0
Receive: Reload tenant limit configuration on file change (#5673)
douglascamata Oct 17, 2022
eec4fd0
Query: add query metrics to calls going through the Store API (#5741)
douglascamata Oct 18, 2022
ea646a6
docs: mark me as shepherd for next release (#5797)
GiedriusS Oct 18, 2022
f01954d
Revert "docs: mark me as shepherd for next release (#5797)"
utukJ Oct 18, 2022
3d6dd07
Revert "Query: add query metrics to calls going through the Store API…
utukJ Oct 18, 2022
976fc7e
Revert "Receive: Reload tenant limit configuration on file change (#5…
utukJ Oct 18, 2022
a07ad4a
Revert "Updates busybox SHA (#5793)"
utukJ Oct 18, 2022
ad11a03
Updates busybox SHA (#5793)
github-actions[bot] Oct 15, 2022
32ca327
Receive: Reload tenant limit configuration on file change (#5673)
douglascamata Oct 17, 2022
7a77769
Query: add query metrics to calls going through the Store API (#5741)
douglascamata Oct 18, 2022
c509c0e
docs: mark me as shepherd for next release (#5797)
GiedriusS Oct 18, 2022
f109590
Revert "docs: mark me as shepherd for next release (#5797)"
utukJ Oct 18, 2022
5c1dd94
Revert "Updates busybox SHA (#5793)"
utukJ Oct 18, 2022
60f308c
Revert "Query: add query metrics to calls going through the Store API…
utukJ Oct 18, 2022
1dea93b
Revert "Receive: Reload tenant limit configuration on file change (#5…
utukJ Oct 18, 2022
0684e39
fixed lint issue
utukJ Oct 18, 2022
46c43d7
added unit test for truncate and clean up
utukJ Oct 18, 2022
b3f0d8c
fixed truncate label func and added more tests
utukJ Oct 20, 2022
4ce7626
removed name from truncate test
utukJ Oct 20, 2022
c8731e3
reorganized test cases and removed redundant comments
utukJ Oct 20, 2022
e38bfb8
Update pkg/query/endpointset_test.go
utukJ Oct 20, 2022
4d7175e
Update pkg/query/endpointset.go
utukJ Oct 20, 2022
56ef7cb
fixed failing checks
utukJ Oct 21, 2022
96b1545
e2e: Adding test for querier with two stores loadbalancing across them.
bwplotka Oct 21, 2022
fb38b75
Merge branch 'utuk-lb-query' of https://github.com/thanos-io/thanos i…
utukJ Oct 22, 2022
edaf16e
Update pkg/query/endpointset_test.go
utukJ Oct 25, 2022
f11cb35
Merge branch 'query-labels' of https://github.com/utukj/thanos into q…
utukJ Oct 25, 2022
9dfa81b
Merge branch 'main' of https://github.com/thanos-io/thanos into query…
utukJ Oct 25, 2022
14ee774
dumped long expected output in unittest
utukJ Oct 25, 2022
f963ac1
Revert "e2e: Adding test for querier with two stores loadbalancing ac…
utukJ Oct 25, 2022
a34f356
Update pkg/query/endpointset_test.go
utukJ Oct 26, 2022
c4e0f89
Update pkg/query/endpointset_test.go
utukJ Oct 26, 2022
c26cf74
moved label definition to endpointset
utukJ Oct 26, 2022
c68bdcd
Merge branch 'query-labels' of https://github.com/utukj/thanos into q…
utukJ Oct 26, 2022
f34506d
Merge branch 'main' into query-labels
utukJ Oct 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func registerQuery(app *extkingpin.App) {
maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query.").
Default("4").Int()

queryConnMetricLabels := cmd.Flag("query.conn-metric.label", "Optional selection of query connection metric labels to be collected from endpoint set").
Default(string(query.ExternalLabels), string(query.StoreType)).
Enums(string(query.ExternalLabels), string(query.StoreType))

queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
Strings()

Expand Down Expand Up @@ -280,6 +284,7 @@ func registerQuery(app *extkingpin.App) {
*dynamicLookbackDelta,
time.Duration(*defaultEvaluationInterval),
time.Duration(*storeResponseTimeout),
*queryConnMetricLabels,
*queryReplicaLabels,
selectorLset,
getFlagsMap(cmd.Flags()),
Expand Down Expand Up @@ -355,6 +360,7 @@ func runQuery(
dynamicLookbackDelta bool,
defaultEvaluationInterval time.Duration,
storeResponseTimeout time.Duration,
queryConnMetricLabels []string,
queryReplicaLabels []string,
selectorLset labels.Labels,
flagsMap map[string]string,
Expand Down Expand Up @@ -497,6 +503,7 @@ func runQuery(
dialOpts,
unhealthyStoreTimeout,
endpointInfoTimeout,
queryConnMetricLabels...,
)
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy))
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
Expand Down
3 changes: 3 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ Flags:
--query.auto-downsampling Enable automatic adjustment (step / 5) to what
source of data should be used in store gateways
if no max_source_resolution param is specified.
--query.conn-metric.label=external_labels... ...
Optional selection of query connection metric
labels to be collected from endpoint set
--query.default-evaluation-interval=1m
Set default evaluation interval for sub
queries.
Expand Down
51 changes: 42 additions & 9 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,20 @@ const (
noMetadataEndpointMessage = "cannot obtain metadata: neither info nor store client found"
)

type queryConnMetricLabel string

const (
ExternalLabels queryConnMetricLabel = "external_labels"
StoreType queryConnMetricLabel = "store_type"
)

type GRPCEndpointSpec struct {
addr string
isStrictStatic bool
}

const externalLabelLimit = 1000

// NewGRPCEndpointSpec creates gRPC endpoint spec.
// It uses InfoAPI to get Metadata.
func NewGRPCEndpointSpec(addr string, isStrictStatic bool) *GRPCEndpointSpec {
Expand Down Expand Up @@ -183,28 +192,41 @@ type endpointSetNodeCollector struct {
storePerExtLset map[string]int

connectionsDesc *prometheus.Desc
labels []string
}

func newEndpointSetNodeCollector() *endpointSetNodeCollector {
func newEndpointSetNodeCollector(labels ...string) *endpointSetNodeCollector {
if len(labels) == 0 {
labels = []string{string(ExternalLabels), string(StoreType)}
}
return &endpointSetNodeCollector{
storeNodes: map[component.Component]map[string]int{},
connectionsDesc: prometheus.NewDesc(
"thanos_store_nodes_grpc_connections",
"Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.",
[]string{"external_labels", "store_type"}, nil,
labels, nil,
),
labels: labels,
}
}

// truncateExtLabels truncates the stringify external labels with the format of {labels..}.
func truncateExtLabels(s string, threshold int) string {
utukJ marked this conversation as resolved.
Show resolved Hide resolved
if len(s) > threshold {
return fmt.Sprintf("%s}", s[:threshold-1])
}
return s
}
func (c *endpointSetNodeCollector) Update(nodes map[component.Component]map[string]int) {
storeNodes := make(map[component.Component]map[string]int, len(nodes))
storePerExtLset := map[string]int{}

for k, v := range nodes {
storeNodes[k] = make(map[string]int, len(v))
for kk, vv := range v {
storePerExtLset[kk] += vv
storeNodes[k][kk] = vv
for storeType, occurrencesPerExtLset := range nodes {
storeNodes[storeType] = make(map[string]int, len(occurrencesPerExtLset))
for externalLabels, occurrences := range occurrencesPerExtLset {
externalLabels = truncateExtLabels(externalLabels, externalLabelLimit)
storePerExtLset[externalLabels] += occurrences
storeNodes[storeType][externalLabels] = occurrences
}
}

Expand All @@ -228,7 +250,17 @@ func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) {
if storeType != nil {
storeTypeStr = storeType.String()
}
ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), externalLabels, storeTypeStr)
// Select only required labels.
lbls := []string{}
for _, lbl := range c.labels {
switch lbl {
case string(ExternalLabels):
lbls = append(lbls, externalLabels)
case string(StoreType):
lbls = append(lbls, storeTypeStr)
}
}
ch <- prometheus.MustNewConstMetric(c.connectionsDesc, prometheus.GaugeValue, float64(occurrences), lbls...)
}
}
}
Expand Down Expand Up @@ -268,8 +300,9 @@ func NewEndpointSet(
dialOpts []grpc.DialOption,
unhealthyEndpointTimeout time.Duration,
endpointInfoTimeout time.Duration,
endpointMetricLabels ...string,
) *EndpointSet {
endpointsMetric := newEndpointSetNodeCollector()
endpointsMetric := newEndpointSetNodeCollector(endpointMetricLabels...)
if reg != nil {
reg.MustRegister(endpointsMetric)
}
Expand Down
105 changes: 95 additions & 10 deletions pkg/query/endpointset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"math"
"net"
"strings"
"sync"
"testing"
"time"
Expand All @@ -23,6 +24,7 @@ import (
"google.golang.org/grpc/credentials/insecure"

"github.com/pkg/errors"
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -269,13 +271,55 @@ func (e *testEndpoints) CloseOne(addr string) {
delete(e.srvs, addr)
}

func TestTruncateExtLabels(t *testing.T) {
utukJ marked this conversation as resolved.
Show resolved Hide resolved
const testLength = 5

for _, tc := range []struct {
labelToTruncate string
expectedOutput string
}{
{
labelToTruncate: "{abc}",
utukJ marked this conversation as resolved.
Show resolved Hide resolved
expectedOutput: "{abc}",
},
{
labelToTruncate: "{abcd}",
expectedOutput: "{abc}",
},
{
labelToTruncate: "{abcde}",
expectedOutput: "{abc}",
},
{
labelToTruncate: "{abcdef}",
expectedOutput: "{abc}",
},
{
labelToTruncate: "{abcdefghij}",
expectedOutput: "{abc}",
},
} {
t.Run(tc.labelToTruncate, func(t *testing.T) {
got := truncateExtLabels(tc.labelToTruncate, testLength)
testutil.Equals(t, tc.expectedOutput, got)
testutil.Assert(t, len(got) <= testLength)
})
}
}

func TestEndpointSetUpdate(t *testing.T) {
const metricsMeta = `
# HELP thanos_store_nodes_grpc_connections Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier.
# TYPE thanos_store_nodes_grpc_connections gauge
`
testCases := []struct {
name string
endpoints []testEndpointMeta
strict bool
name string
endpoints []testEndpointMeta
strict bool
connLabels []string

expectedEndpoints int
expectedEndpoints int
expectedConnMetrics string
}{
{
name: "available endpoint",
Expand All @@ -289,7 +333,13 @@ func TestEndpointSetUpdate(t *testing.T) {
},
},
},
connLabels: []string{"store_type"},

expectedEndpoints: 1,
expectedConnMetrics: metricsMeta +
`
thanos_store_nodes_grpc_connections{store_type="sidecar"} 1
`,
},
{
name: "unavailable endpoint",
Expand All @@ -304,7 +354,9 @@ func TestEndpointSetUpdate(t *testing.T) {
},
},
},
expectedEndpoints: 0,

expectedEndpoints: 0,
expectedConnMetrics: "",
},
{
name: "slow endpoint",
Expand All @@ -319,7 +371,9 @@ func TestEndpointSetUpdate(t *testing.T) {
},
},
},
expectedEndpoints: 0,

expectedEndpoints: 0,
expectedConnMetrics: "",
},
{
name: "strict endpoint",
Expand All @@ -334,7 +388,35 @@ func TestEndpointSetUpdate(t *testing.T) {
},
},
strict: true,
connLabels: []string{"store_type"},
expectedEndpoints: 1,
expectedConnMetrics: metricsMeta +
`
thanos_store_nodes_grpc_connections{store_type="sidecar"} 1
`,
},
{
name: "long external labels",
endpoints: []testEndpointMeta{
{
InfoResponse: sidecarInfo,
// Simulate very long external labels.
extlsetFn: func(addr string) []labelpb.ZLabelSet {
sLabel := []string{}
for i := 0; i < 1000; i++ {
sLabel = append(sLabel, "lbl")
sLabel = append(sLabel, "val")
}
return labelpb.ZLabelSetsFromPromLabels(
labels.FromStrings(sLabel...),
)
},
},
},
expectedEndpoints: 1,
expectedConnMetrics: metricsMeta + `
thanos_store_nodes_grpc_connections{external_labels="{lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val}",store_type="sidecar"} 1
`,
},
}

Expand All @@ -345,12 +427,15 @@ func TestEndpointSetUpdate(t *testing.T) {
defer endpoints.Close()

discoveredEndpointAddr := endpoints.EndpointAddresses()
endpointSet := makeEndpointSet(discoveredEndpointAddr, tc.strict, time.Now)
// Specify only "store_type" to exclude "external_labels".
endpointSet := makeEndpointSet(discoveredEndpointAddr, tc.strict, time.Now, tc.connLabels...)
defer endpointSet.Close()

endpointSet.Update(context.Background())
testutil.Equals(t, tc.expectedEndpoints, len(endpointSet.GetEndpointStatus()))
testutil.Equals(t, tc.expectedEndpoints, len(endpointSet.GetStoreClients()))

testutil.Ok(t, promtestutil.CollectAndCompare(endpointSet.endpointsMetric, strings.NewReader(tc.expectedConnMetrics)))
})
}
}
Expand Down Expand Up @@ -576,7 +661,7 @@ func TestEndpointSetUpdate_AtomicEndpointAdditions(t *testing.T) {
wg.Wait()
}

func TestEndpointSet_Update(t *testing.T) {
func TestEndpointSetUpdate_AvailabilityScenarios(t *testing.T) {
endpoints, err := startTestEndpoints([]testEndpointMeta{
{
InfoResponse: sidecarInfo,
Expand Down Expand Up @@ -1493,15 +1578,15 @@ func TestUpdateEndpointStateForgetsPreviousErrors(t *testing.T) {
testutil.Equals(t, `null`, string(b))
}

func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc) *EndpointSet {
func makeEndpointSet(discoveredEndpointAddr []string, strict bool, now nowFunc, metricLabels ...string) *EndpointSet {
endpointSet := NewEndpointSet(now, nil, nil,
func() (specs []*GRPCEndpointSpec) {
for _, addr := range discoveredEndpointAddr {
specs = append(specs, NewGRPCEndpointSpec(addr, strict))
}
return specs
},
testGRPCOpts, time.Minute, time.Second)
testGRPCOpts, time.Minute, time.Second, metricLabels...)
return endpointSet
}

Expand Down