Skip to content

Commit

Permalink
query: Add readiness probe to query (#1534)
Browse files Browse the repository at this point in the history
* Add prober for querier

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>

* Fix minor issues

Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun authored and bwplotka committed Sep 18, 2019
1 parent b7739b7 commit a07e91e
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 38 deletions.
4 changes: 2 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ func runCompact(

statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := defaultHTTPListener(g, logger, reg, httpBindAddr, statusProber); err != nil {
return errors.Wrap(err, "create readiness prober")
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil {
return errors.Wrap(err, "create default HTTP server with readiness prober")
}

confContentYaml, err := objStoreConfig.Content()
Expand Down
18 changes: 11 additions & 7 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -74,7 +75,7 @@ func main() {
cmds := map[string]setupFunc{}
registerSidecar(cmds, app)
registerStore(cmds, app, "store")
registerQuery(cmds, app, "query")
registerQuery(cmds, app)
registerRule(cmds, app, "rule")
registerCompact(cmds, app)
registerBucket(cmds, app, "bucket")
Expand Down Expand Up @@ -332,7 +333,7 @@ func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer open
return s
}

// TODO Remove once all components are migrated to the new defaultHTTPListener.
// TODO Remove once all components are migrated to the new scheduleHTTPServer.
// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
mux := http.NewServeMux()
Expand All @@ -353,26 +354,29 @@ func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Regi
return nil
}

// defaultHTTPListener starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics,
// scheduleHTTPServer starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics,
// profiling and liveness/readiness probes.
func defaultHTTPListener(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, readinessProber *prober.Prober) error {
func scheduleHTTPServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, readinessProber *prober.Prober, httpBindAddr string, handler http.Handler, comp component.Component) error {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
readinessProber.RegisterInMux(mux)
if handler != nil {
mux.Handle("/", handler)
}

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "listening for metrics", "address", httpBindAddr)
level.Info(logger).Log("msg", "listening for requests and metrics", "component", comp.String(), "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve metrics")
return errors.Wrapf(http.Serve(l, mux), "serve %s and metrics", comp.String())
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "metric listener")
runutil.CloseWithLogOnErr(logger, l, "%s and metric listener", comp.String())
})
return nil
}
42 changes: 15 additions & 27 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
v1 "github.com/thanos-io/thanos/pkg/query/api"
"github.com/thanos-io/thanos/pkg/runutil"
Expand All @@ -42,8 +43,9 @@ import (
)

// registerQuery registers a query command.
func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")
func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Query
cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA := regCommonServerFlags(cmd)

Expand Down Expand Up @@ -99,7 +101,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string

storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
return errors.Wrap(err, "parse federation labels")
Expand Down Expand Up @@ -156,6 +158,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
time.Duration(*instantDefaultMaxSourceResolution),
component.Query,
)
}
}
Expand Down Expand Up @@ -274,6 +277,7 @@ func runQuery(
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
instantDefaultMaxSourceResolution time.Duration,
comp component.Component,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
duplicatedStores := prometheus.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -385,6 +389,8 @@ func runQuery(
})
}
// Start query API + UI HTTP server.

statusProber := prober.NewProber(comp, logger, reg)
{
router := route.New()

Expand All @@ -408,46 +414,28 @@ func runQuery(

api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins)

router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
if _, err := fmt.Fprintf(w, "Thanos Querier is Healthy.\n"); err != nil {
level.Error(logger).Log("msg", "Could not write health check response.")
}
})

mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
mux.Handle("/", router)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrapf(err, "listen HTTP on address %s", httpBindAddr)
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil {
return errors.Wrap(err, "create default HTTP server with readiness prober")
}

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for query and metrics", "address", httpBindAddr)
return errors.Wrap(http.Serve(l, mux), "serve query")
}, func(error) {
runutil.CloseWithLogOnErr(logger, l, "query and metric listener")
})
}
// Start query (proxy) gRPC StoreAPI.
{
l, err := net.Listen("tcp", grpcBindAddr)
if err != nil {
return errors.Wrapf(err, "listen gRPC on address")
return errors.Wrap(err, "listen gRPC on address")
}
logger := log.With(logger, "component", component.Query.String())

opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA)
if err != nil {
return errors.Wrapf(err, "build gRPC server")
return errors.Wrap(err, "build gRPC server")
}
s := newStoreGRPCServer(logger, reg, tracer, proxy, opts)

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr)
statusProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func runSidecar(

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := defaultHTTPListener(g, logger, reg, httpBindAddr, statusProber); err != nil {
return errors.Wrap(err, "create readiness prober")
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
return errors.Wrap(err, "create default HTTP server with readiness prober")
}

// Setup all the concurrent groups.
Expand Down
4 changes: 4 additions & 0 deletions tutorials/kubernetes-demo/manifests/thanos-querier.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ spec:
containerPort: 10901
livenessProbe:
httpGet:
port: http
path: /-/healthy
readinessProbe:
httpGet:
port: http
path: /-/ready
---
apiVersion: v1
kind: Service
Expand Down

0 comments on commit a07e91e

Please sign in to comment.