Skip to content

Commit

Permalink
pkg/store: Expose metric about empty stream responses in proxy store
Browse files Browse the repository at this point in the history
Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
  • Loading branch information
brancz committed Jan 22, 2020
1 parent dbfe6f0 commit 10e2f6e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func runQuery(
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down
36 changes: 35 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -49,12 +50,34 @@ type ProxyStore struct {
selectorLabels labels.Labels

responseTimeout time.Duration
metrics *proxyStoreMetrics
}

type proxyStoreMetrics struct {
emptyStreamResponses prometheus.Counter
}

func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
var m proxyStoreMetrics

m.emptyStreamResponses = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_proxy_store_empty_stream_responses_total",
Help: "Total number of empty responses received.",
})

if reg != nil {
reg.MustRegister(
m.emptyStreamResponses,
)
}
return &m
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
logger log.Logger,
reg prometheus.Registerer,
stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
Expand All @@ -64,12 +87,14 @@ func NewProxyStore(
logger = log.NewNopLogger()
}

metrics := newProxyStoreMetrics(reg)
s := &ProxyStore{
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
metrics: metrics,
}
return s
}
Expand Down Expand Up @@ -260,7 +285,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
// Schedule streamSeriesSet that translates gRPC streamed response
// into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout))
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
Expand Down Expand Up @@ -330,6 +355,7 @@ func startStreamSeriesSet(
name string,
partialResponse bool,
responseTimeout time.Duration,
emptyStreamResponses prometheus.Counter,
) *streamSeriesSet {
s := &streamSeriesSet{
ctx: ctx,
Expand All @@ -348,6 +374,12 @@ func startStreamSeriesSet(
defer wg.Done()
defer close(s.recvCh)

numResponses := 0
defer func() {
if numResponses == 0 {
emptyStreamResponses.Inc()
}
}()
for {
r, err := s.stream.Recv()

Expand All @@ -368,6 +400,8 @@ func startStreamSeriesSet(
return
}

numResponses++

if w := r.GetWarning(); w != "" {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
Expand Down

0 comments on commit 10e2f6e

Please sign in to comment.