diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 349e9bf2eac..b2edceceef2 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -235,7 +235,7 @@ func runQuery( dialOpts, unhealthyStoreTimeout, ) - proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout) + proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout) queryableCreator = query.NewQueryableCreator(logger, proxy) engine = promql.NewEngine( promql.EngineOpts{ diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 574c5e0e4b1..b38f2c952b0 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -15,6 +15,7 @@ import ( grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -49,12 +50,34 @@ type ProxyStore struct { selectorLabels labels.Labels responseTimeout time.Duration + metrics *proxyStoreMetrics +} + +type proxyStoreMetrics struct { + emptyStreamResponses prometheus.Counter +} + +func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics { + var m proxyStoreMetrics + + m.emptyStreamResponses = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_proxy_store_empty_stream_responses_total", + Help: "Total number of empty responses received.", + }) + + if reg != nil { + reg.MustRegister( + m.emptyStreamResponses, + ) + } + return &m } // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. // Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). func NewProxyStore( logger log.Logger, + reg prometheus.Registerer, stores func() []Client, component component.StoreAPI, selectorLabels labels.Labels, @@ -64,12 +87,14 @@ func NewProxyStore( logger = log.NewNopLogger() } + metrics := newProxyStoreMetrics(reg) s := &ProxyStore{ logger: logger, stores: stores, component: component, selectorLabels: selectorLabels, responseTimeout: responseTimeout, + metrics: metrics, } return s } @@ -260,7 +285,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe // Schedule streamSeriesSet that translates gRPC streamed response // into seriesSet (if series) or respCh if warnings. seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries, - wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout)) + wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses)) } level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";")) @@ -330,6 +355,7 @@ func startStreamSeriesSet( name string, partialResponse bool, responseTimeout time.Duration, + emptyStreamResponses prometheus.Counter, ) *streamSeriesSet { s := &streamSeriesSet{ ctx: ctx, @@ -348,6 +374,12 @@ func startStreamSeriesSet( defer wg.Done() defer close(s.recvCh) + numResponses := 0 + defer func() { + if numResponses == 0 { + emptyStreamResponses.Inc() + } + }() for { r, err := s.stream.Recv() @@ -368,6 +400,8 @@ func startStreamSeriesSet( return } + numResponses++ + if w := r.GetWarning(); w != "" { s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w))) continue