diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index d4218c1d38..5b2e277fc6 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -169,8 +169,8 @@ func runCompact( statusProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. - if err := defaultHTTPListener(g, logger, reg, httpBindAddr, statusProber); err != nil { - return errors.Wrap(err, "create readiness prober") + if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, component); err != nil { + return errors.Wrap(err, "create default HTTP server with readiness prober") } confContentYaml, err := objStoreConfig.Content() diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index b44a2c9fc0..123b9f6d77 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/version" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -74,7 +75,7 @@ func main() { cmds := map[string]setupFunc{} registerSidecar(cmds, app) registerStore(cmds, app, "store") - registerQuery(cmds, app, "query") + registerQuery(cmds, app) registerRule(cmds, app, "rule") registerCompact(cmds, app) registerBucket(cmds, app, "bucket") @@ -332,7 +333,7 @@ func newStoreGRPCServer(logger log.Logger, reg *prometheus.Registry, tracer open return s } -// TODO Remove once all components are migrated to the new defaultHTTPListener. +// TODO Remove once all components are migrated to the new scheduleHTTPServer. // metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics. func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error { mux := http.NewServeMux() @@ -353,13 +354,16 @@ func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Regi return nil } -// defaultHTTPListener starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics, +// scheduleHTTPServer starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics, // profiling and liveness/readiness probes. -func defaultHTTPListener(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, readinessProber *prober.Prober) error { +func scheduleHTTPServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, readinessProber *prober.Prober, httpBindAddr string, handler http.Handler, comp component.Component) error { mux := http.NewServeMux() registerMetrics(mux, reg) registerProfile(mux) readinessProber.RegisterInMux(mux) + if handler != nil { + mux.Handle("/", handler) + } l, err := net.Listen("tcp", httpBindAddr) if err != nil { @@ -367,12 +371,12 @@ func defaultHTTPListener(g *run.Group, logger log.Logger, reg *prometheus.Regist } g.Add(func() error { - level.Info(logger).Log("msg", "listening for metrics", "address", httpBindAddr) + level.Info(logger).Log("msg", "listening for requests and metrics", "component", comp.String(), "address", httpBindAddr) readinessProber.SetHealthy() - return errors.Wrap(http.Serve(l, mux), "serve metrics") + return errors.Wrapf(http.Serve(l, mux), "serve %s and metrics", comp.String()) }, func(err error) { readinessProber.SetNotHealthy(err) - runutil.CloseWithLogOnErr(logger, l, "metric listener") + runutil.CloseWithLogOnErr(logger, l, "%s and metric listener", comp.String()) }) return nil } diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index b1b75dd5ae..515c57de61 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -30,6 +30,7 @@ import ( "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/extprom" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" + "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/query" v1 "github.com/thanos-io/thanos/pkg/query/api" "github.com/thanos-io/thanos/pkg/runutil" @@ -42,8 +43,9 @@ import ( ) // registerQuery registers a query command. -func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) { - cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes") +func registerQuery(m map[string]setupFunc, app *kingpin.Application) { + comp := component.Query + cmd := app.Command(comp.String(), "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes") grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA := regCommonServerFlags(cmd) @@ -99,7 +101,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms")) - m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { + m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { selectorLset, err := parseFlagLabels(*selectorLabels) if err != nil { return errors.Wrap(err, "parse federation labels") @@ -156,6 +158,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string *dnsSDResolver, time.Duration(*unhealthyStoreTimeout), time.Duration(*instantDefaultMaxSourceResolution), + component.Query, ) } } @@ -274,6 +277,7 @@ func runQuery( dnsSDResolver string, unhealthyStoreTimeout time.Duration, instantDefaultMaxSourceResolution time.Duration, + comp component.Component, ) error { // TODO(bplotka in PR #513 review): Move arguments into struct. duplicatedStores := prometheus.NewCounter(prometheus.CounterOpts{ @@ -385,6 +389,8 @@ func runQuery( }) } // Start query API + UI HTTP server. + + statusProber := prober.NewProber(comp, logger, reg) { router := route.New() @@ -408,46 +414,28 @@ func runQuery( api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) - router.Get("/-/healthy", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - if _, err := fmt.Fprintf(w, "Thanos Querier is Healthy.\n"); err != nil { - level.Error(logger).Log("msg", "Could not write health check response.") - } - }) - - mux := http.NewServeMux() - registerMetrics(mux, reg) - registerProfile(mux) - mux.Handle("/", router) - - l, err := net.Listen("tcp", httpBindAddr) - if err != nil { - return errors.Wrapf(err, "listen HTTP on address %s", httpBindAddr) + // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. + if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, router, comp); err != nil { + return errors.Wrap(err, "create default HTTP server with readiness prober") } - - g.Add(func() error { - level.Info(logger).Log("msg", "Listening for query and metrics", "address", httpBindAddr) - return errors.Wrap(http.Serve(l, mux), "serve query") - }, func(error) { - runutil.CloseWithLogOnErr(logger, l, "query and metric listener") - }) } // Start query (proxy) gRPC StoreAPI. { l, err := net.Listen("tcp", grpcBindAddr) if err != nil { - return errors.Wrapf(err, "listen gRPC on address") + return errors.Wrap(err, "listen gRPC on address") } logger := log.With(logger, "component", component.Query.String()) opts, err := defaultGRPCServerOpts(logger, srvCert, srvKey, srvClientCA) if err != nil { - return errors.Wrapf(err, "build gRPC server") + return errors.Wrap(err, "build gRPC server") } s := newStoreGRPCServer(logger, reg, tracer, proxy, opts) g.Add(func() error { level.Info(logger).Log("msg", "Listening for StoreAPI gRPC", "address", grpcBindAddr) + statusProber.SetReady() return errors.Wrap(s.Serve(l), "serve gRPC") }, func(error) { s.Stop() diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 2a3c5afe9e..68afd666e4 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -121,8 +121,8 @@ func runSidecar( statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) // Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes. - if err := defaultHTTPListener(g, logger, reg, httpBindAddr, statusProber); err != nil { - return errors.Wrap(err, "create readiness prober") + if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil { + return errors.Wrap(err, "create default HTTP server with readiness prober") } // Setup all the concurrent groups. diff --git a/tutorials/kubernetes-demo/manifests/thanos-querier.yaml b/tutorials/kubernetes-demo/manifests/thanos-querier.yaml index 5cbb3d9742..e5b50a9717 100644 --- a/tutorials/kubernetes-demo/manifests/thanos-querier.yaml +++ b/tutorials/kubernetes-demo/manifests/thanos-querier.yaml @@ -50,8 +50,12 @@ spec: containerPort: 10901 livenessProbe: httpGet: + port: http path: /-/healthy + readinessProbe: + httpGet: port: http + path: /-/ready --- apiVersion: v1 kind: Service