Skip to content

Commit

Permalink
querier, receiver, sidecar, store: Add gRPC health check endpoints (#…
Browse files Browse the repository at this point in the history
…2008)

* Add gRPC health endpoints

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Add grpc life-cycle methods

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Utilize grpc health check server

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Update CHANGELOG

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Provide more structured way to probe

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Fix linted errors

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Remove white noise

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Add dedicated probe for instrumentation

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Retest

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun authored and bwplotka committed Jan 28, 2020
1 parent b36b4b9 commit c7869d9
Show file tree
Hide file tree
Showing 17 changed files with 360 additions and 191 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1970](https://github.com/thanos-io/thanos/issues/1970) *breaking* Receive: Use gRPC for forwarding requests between peers. Note that existing values for the `--receive.local-endpoint` flag and the endpoints in the hashring configuration file must now specify the receive gRPC port and must be updated to be a simple `host:port` combination, e.g. `127.0.0.1:10901`, rather than a full HTTP URL, e.g. `http://127.0.0.1:10902/api/v1/receive`.
- [#1939](https://github.com/thanos-io/thanos/pull/1939) Ruler: Add TLS and authentication support for query endpoints with the `--query.config` and `--query.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
- [#1982](https://github.com/thanos-io/thanos/pull/1982) Ruler: Add support for Alertmanager v2 API endpoints.
- #2030 Query: Add `thanos_proxy_store_empty_stream_responses_total` metric for number of empty responses from stores.
- [#2030](https://github.com/thanos-io/thanos/pull/2030) Query: Add `thanos_proxy_store_empty_stream_responses_total` metric for number of empty responses from stores.
- [#2049](https://github.com/thanos-io/thanos/pull/2049) Tracing: Support sampling on Elastic APM with new sample_rate setting.
- [#2008](https://github.com/thanos-io/thanos/pull/2008) Querier, Receiver, Sidecar, Store: Add gRPC [health check](https://github.com/grpc/grpc/blob/master/doc/health-checking.md) endpoints.

### Changed

Expand Down
11 changes: 8 additions & 3 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,14 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
ctx, cancel := context.WithCancel(context.Background())

statusProber := prober.New(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, component.Bucket, statusProber,
comp := component.Bucket
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(*httpBindAddr),
httpserver.WithGracePeriod(time.Duration(*httpGracePeriod)),
)
Expand Down
10 changes: 7 additions & 3 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,13 @@ func runCompact(

downsampleMetrics := newDownsampleMetrics(reg)

statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, component, statusProber,
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
prober.NewInstrumentation(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

srv := httpserver.New(logger, reg, component, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down
11 changes: 8 additions & 3 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,13 @@ func runDownsample(
}
}()

httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

metrics := newDownsampleMetrics(reg)
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Start cycle of syncing blocks from the bucket and garbage collecting the bucket.
{
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -129,11 +134,11 @@ func runDownsample(
})
}

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, comp, statusProber,
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)

g.Add(func() error {
statusProber.Healthy()

Expand Down
16 changes: 11 additions & 5 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,16 @@ func runQuery(
cancel()
})
}
// Start query API + UI HTTP server.

statusProber := prober.New(comp, logger, reg)
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, reg),
)

// Start query API + UI HTTP server.
{
router := route.New()

Expand All @@ -334,8 +341,7 @@ func runQuery(

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, comp, statusProber,
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand All @@ -359,7 +365,7 @@ func runQuery(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, proxy,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
14 changes: 10 additions & 4 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,14 @@ func runReceive(
DialOpts: dialOpts,
})

statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
Expand Down Expand Up @@ -351,8 +358,7 @@ func runReceive(
}

level.Debug(logger).Log("msg", "setting up http server")
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, comp, statusProber,
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down Expand Up @@ -389,7 +395,7 @@ func runReceive(
WriteableStoreServer: webHandler,
}

s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, rw,
s = grpcserver.NewReadWrite(logger, &receive.UnRegisterer{Registerer: reg}, tracer, comp, grpcProbe, rw,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
15 changes: 11 additions & 4 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,15 @@ func runRule(
close(cancel)
})
}
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

// Start gRPC server.
{
store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)
Expand All @@ -548,7 +556,7 @@ func runRule(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, store,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -590,8 +598,7 @@ func runRule(
api := v1.NewAPI(logger, reg, ruleMgr)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, comp, statusProber,
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down
14 changes: 10 additions & 4 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,15 @@ func runSidecar(
uploads = false
}

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
statusProber := prober.New(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
srv := httpserver.New(logger, reg, comp, statusProber,
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down Expand Up @@ -269,7 +275,7 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, promStore,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
14 changes: 10 additions & 4 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,15 @@ func runStore(
advertiseCompatibilityLabel bool,
enableIndexHeader bool,
) error {
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
srv := httpserver.New(logger, reg, component, statusProber,
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)),
)

srv := httpserver.New(logger, reg, component, httpProbe,
httpserver.WithListen(httpBindAddr),
httpserver.WithGracePeriod(httpGracePeriod),
)
Expand Down Expand Up @@ -278,7 +284,7 @@ func runStore(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, component, bs,
s := grpcserver.New(logger, reg, tracer, component, grpcProbe, bs,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
53 changes: 53 additions & 0 deletions pkg/prober/combiner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package prober

import "sync"

type combined struct {
mu sync.Mutex
probes []Probe
}

// Combine folds given probes into one, reflects their statuses in a thread-safe way.
func Combine(probes ...Probe) Probe {
return &combined{probes: probes}
}

// Ready sets components status to ready.
func (p *combined) Ready() {
p.mu.Lock()
defer p.mu.Unlock()

for _, probe := range p.probes {
probe.Ready()
}
}

// NotReady sets components status to not ready with given error as a cause.
func (p *combined) NotReady(err error) {
p.mu.Lock()
defer p.mu.Unlock()

for _, probe := range p.probes {
probe.NotReady(err)
}
}

// Healthy sets components status to healthy.
func (p *combined) Healthy() {
p.mu.Lock()
defer p.mu.Unlock()

for _, probe := range p.probes {
probe.Healthy()
}
}

// NotHealthy sets components status to not healthy with given error as a cause.
func (p *combined) NotHealthy(err error) {
p.mu.Lock()
defer p.mu.Unlock()

for _, probe := range p.probes {
probe.NotHealthy(err)
}
}
44 changes: 44 additions & 0 deletions pkg/prober/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package prober

import (
"google.golang.org/grpc/health"
grpc_health "google.golang.org/grpc/health/grpc_health_v1"
)

// GRPCProbe represents health and readiness status of given component, and provides GRPC integration.
type GRPCProbe struct {
h *health.Server
}

// NewGRPC creates a Probe that wrapped around grpc/healt.Server which reflects status of server.
func NewGRPC() *GRPCProbe {
h := health.NewServer()
h.SetServingStatus("", grpc_health.HealthCheckResponse_NOT_SERVING)

return &GRPCProbe{h: h}
}

// HealthServer returns a gRPC health server which responds readiness and liveness checks.
func (p *GRPCProbe) HealthServer() *health.Server {
return p.h
}

// Ready sets components status to ready.
func (p *GRPCProbe) Ready() {
p.h.SetServingStatus("", grpc_health.HealthCheckResponse_SERVING)
}

// NotReady sets components status to not ready with given error as a cause.
func (p *GRPCProbe) NotReady(err error) {
p.h.SetServingStatus("", grpc_health.HealthCheckResponse_NOT_SERVING)
}

// Healthy sets components status to healthy.
func (p *GRPCProbe) Healthy() {
p.h.Resume()
}

// NotHealthy sets components status to not healthy with given error as a cause.
func (p *GRPCProbe) NotHealthy(err error) {
p.h.Shutdown()
}
Loading

0 comments on commit c7869d9

Please sign in to comment.