Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/store: Expose metric about empty stream responses in proxy store #2030

Merged
merged 1 commit into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1970](https://github.com/thanos-io/thanos/issues/1970) *breaking* Receive: Use gRPC for forwarding requests between peers. Note that existing values for the `--receive.local-endpoint` flag and the endpoints in the hashring configuration file must now specify the receive gRPC port and must be updated to be a simple `host:port` combination, e.g. `127.0.0.1:10901`, rather than a full HTTP URL, e.g. `http://127.0.0.1:10902/api/v1/receive`.
- [#1939](https://github.com/thanos-io/thanos/pull/1939) Ruler: Add TLS and authentication support for query endpoints with the `--query.config` and `--query.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
- [#1982](https://github.com/thanos-io/thanos/pull/1982) Ruler: Add support for Alertmanager v2 API endpoints.
- #2030 Query: Add `thanos_proxy_store_empty_stream_responses_total` metric for number of empty responses from stores.

### Changed

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func runQuery(
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down
36 changes: 35 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -49,12 +50,34 @@ type ProxyStore struct {
selectorLabels labels.Labels

responseTimeout time.Duration
metrics *proxyStoreMetrics
}

type proxyStoreMetrics struct {
emptyStreamResponses prometheus.Counter
}

func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
var m proxyStoreMetrics

m.emptyStreamResponses = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_proxy_store_empty_stream_responses_total",
Help: "Total number of empty responses received.",
})

if reg != nil {
reg.MustRegister(
m.emptyStreamResponses,
)
}
return &m
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
logger log.Logger,
reg prometheus.Registerer,
stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
Expand All @@ -64,12 +87,14 @@ func NewProxyStore(
logger = log.NewNopLogger()
}

metrics := newProxyStoreMetrics(reg)
s := &ProxyStore{
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
metrics: metrics,
}
return s
}
Expand Down Expand Up @@ -260,7 +285,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
// Schedule streamSeriesSet that translates gRPC streamed response
// into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout))
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
Expand Down Expand Up @@ -330,6 +355,7 @@ func startStreamSeriesSet(
name string,
partialResponse bool,
responseTimeout time.Duration,
emptyStreamResponses prometheus.Counter,
) *streamSeriesSet {
s := &streamSeriesSet{
ctx: ctx,
Expand All @@ -348,6 +374,12 @@ func startStreamSeriesSet(
defer wg.Done()
defer close(s.recvCh)

numResponses := 0
defer func() {
if numResponses == 0 {
emptyStreamResponses.Inc()
}
}()
for {
r, err := s.stream.Recv()

Expand All @@ -368,6 +400,8 @@ func startStreamSeriesSet(
return
}

numResponses++

if w := r.GetWarning(); w != "" {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestProxyStore_Info(t *testing.T) {
defer cancel()

q := NewProxyStore(nil,
nil,
func() []Client { return nil },
component.Query,
nil, 0*time.Second,
Expand Down Expand Up @@ -438,6 +439,7 @@ func TestProxyStore_Series(t *testing.T) {

if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
Expand Down Expand Up @@ -560,6 +562,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
Expand Down Expand Up @@ -602,6 +605,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
},
}
q := NewProxyStore(nil,
nil,
func() []Client { return cls },
component.Query,
nil,
Expand Down Expand Up @@ -661,6 +665,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
}

q := NewProxyStore(nil,
nil,
func() []Client { return cls },
component.Query,
labels.FromStrings("fed", "a"),
Expand Down Expand Up @@ -699,6 +704,7 @@ func TestProxyStore_LabelValues(t *testing.T) {
}},
}
q := NewProxyStore(nil,
nil,
func() []Client { return cls },
component.Query,
nil,
Expand Down Expand Up @@ -801,6 +807,7 @@ func TestProxyStore_LabelNames(t *testing.T) {
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(
nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
Expand Down