Skip to content

Commit

Permalink
pkg/store: Expose metric about empty stream responses in proxy store
Browse files Browse the repository at this point in the history
Signed-off-by: Frederic Branczyk <fbranczyk@gmail.com>
  • Loading branch information
brancz committed Jan 23, 2020
1 parent dbfe6f0 commit 63c2d53
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1970](https://github.com/thanos-io/thanos/issues/1970) *breaking* Receive: Use gRPC for forwarding requests between peers. Note that existing values for the `--receive.local-endpoint` flag and the endpoints in the hashring configuration file must now specify the receive gRPC port and must be updated to be a simple `host:port` combination, e.g. `127.0.0.1:10901`, rather than a full HTTP URL, e.g. `http://127.0.0.1:10902/api/v1/receive`.
- [#1939](https://github.com/thanos-io/thanos/pull/1939) Ruler: Add TLS and authentication support for query endpoints with the `--query.config` and `--query.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
- [#1982](https://github.com/thanos-io/thanos/pull/1982) Ruler: Add support for Alertmanager v2 API endpoints.
- #2030 Query: Add `thanos_proxy_store_empty_stream_responses_total` metric for number of empty responses from stores.

### Changed

Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func runQuery(
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, stores.Get, component.Query, selectorLset, storeResponseTimeout)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down
36 changes: 35 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -49,12 +50,34 @@ type ProxyStore struct {
selectorLabels labels.Labels

responseTimeout time.Duration
metrics *proxyStoreMetrics
}

type proxyStoreMetrics struct {
emptyStreamResponses prometheus.Counter
}

func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
var m proxyStoreMetrics

m.emptyStreamResponses = prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_proxy_store_empty_stream_responses_total",
Help: "Total number of empty responses received.",
})

if reg != nil {
reg.MustRegister(
m.emptyStreamResponses,
)
}
return &m
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
logger log.Logger,
reg prometheus.Registerer,
stores func() []Client,
component component.StoreAPI,
selectorLabels labels.Labels,
Expand All @@ -64,12 +87,14 @@ func NewProxyStore(
logger = log.NewNopLogger()
}

metrics := newProxyStoreMetrics(reg)
s := &ProxyStore{
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
metrics: metrics,
}
return s
}
Expand Down Expand Up @@ -260,7 +285,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
// Schedule streamSeriesSet that translates gRPC streamed response
// into seriesSet (if series) or respCh if warnings.
seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, s.logger, closeSeries,
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout))
wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses))
}

level.Debug(s.logger).Log("msg", strings.Join(storeDebugMsgs, ";"))
Expand Down Expand Up @@ -330,6 +355,7 @@ func startStreamSeriesSet(
name string,
partialResponse bool,
responseTimeout time.Duration,
emptyStreamResponses prometheus.Counter,
) *streamSeriesSet {
s := &streamSeriesSet{
ctx: ctx,
Expand All @@ -348,6 +374,12 @@ func startStreamSeriesSet(
defer wg.Done()
defer close(s.recvCh)

numResponses := 0
defer func() {
if numResponses == 0 {
emptyStreamResponses.Inc()
}
}()
for {
r, err := s.stream.Recv()

Expand All @@ -368,6 +400,8 @@ func startStreamSeriesSet(
return
}

numResponses++

if w := r.GetWarning(); w != "" {
s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w)))
continue
Expand Down
7 changes: 7 additions & 0 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestProxyStore_Info(t *testing.T) {
defer cancel()

q := NewProxyStore(nil,
nil,
func() []Client { return nil },
component.Query,
nil, 0*time.Second,
Expand Down Expand Up @@ -438,6 +439,7 @@ func TestProxyStore_Series(t *testing.T) {

if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
Expand Down Expand Up @@ -560,6 +562,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
Expand Down Expand Up @@ -602,6 +605,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
},
}
q := NewProxyStore(nil,
nil,
func() []Client { return cls },
component.Query,
nil,
Expand Down Expand Up @@ -661,6 +665,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
}

q := NewProxyStore(nil,
nil,
func() []Client { return cls },
component.Query,
labels.FromStrings("fed", "a"),
Expand Down Expand Up @@ -699,6 +704,7 @@ func TestProxyStore_LabelValues(t *testing.T) {
}},
}
q := NewProxyStore(nil,
nil,
func() []Client { return cls },
component.Query,
nil,
Expand Down Expand Up @@ -801,6 +807,7 @@ func TestProxyStore_LabelNames(t *testing.T) {
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(
nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
Expand Down

0 comments on commit 63c2d53

Please sign in to comment.