diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 595186c939e..fa86a4c79db 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -233,16 +233,12 @@ func runReceive( level.Debug(logger).Log("msg", "setting up receive http handler") { - ctx, cancel := context.WithCancel(context.Background()) g.Add( func() error { - if err := webHandler.Run(ctx); err != nil { - return fmt.Errorf("error starting web server: %s", err) - } - return nil + return errors.Wrap(webHandler.Run(), "error starting web server") }, func(err error) { - cancel() + webHandler.Close() }, ) } diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 2ca1f14ce1a..c96cde5d763 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1,7 +1,6 @@ package receive import ( - "context" "fmt" "io/ioutil" stdlog "log" @@ -16,7 +15,6 @@ import ( "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/store/prompb" conntrack "github.com/mwitkow/go-conntrack" - "github.com/oklog/run" "github.com/opentracing-contrib/go-stdlib/nethttp" opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" @@ -59,7 +57,7 @@ type Handler struct { receiver *Writer router *route.Router options *Options - quitCh chan struct{} + listener net.Listener ready uint32 // ready is uint32 rather than boolean to be able to use atomic functions. } @@ -86,7 +84,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler { readyStorage: o.ReadyStorage, receiver: o.Receiver, options: o, - quitCh: make(chan struct{}), } readyf := h.testReady @@ -122,9 +119,9 @@ func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc { } } -// Quit returns the receive-only quit channel. -func (h *Handler) Quit() <-chan struct{} { - return h.quitCh +// Close stops the Handler. +func (h *Handler) Close() { + runutil.CloseWithLogOnErr(h.logger, h.listener, "receive HTTP listener") } // Checks if server is ready, calls f if it is, returns 503 if it is not. @@ -133,16 +130,17 @@ func (h *Handler) testReadyHandler(f http.Handler) http.HandlerFunc { } // Run serves the HTTP endpoints. -func (h *Handler) Run(ctx context.Context) error { +func (h *Handler) Run() error { level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress) - listener, err := net.Listen("tcp", h.options.ListenAddress) + var err error + h.listener, err = net.Listen("tcp", h.options.ListenAddress) if err != nil { return err } // Monitor incoming connections with conntrack. - listener = conntrack.NewListener(listener, + h.listener = conntrack.NewListener(h.listener, conntrack.TrackWithName("http"), conntrack.TrackWithTracing()) @@ -159,14 +157,7 @@ func (h *Handler) Run(ctx context.Context) error { ErrorLog: errlog, } - var g run.Group - g.Add(func() error { - return httpSrv.Serve(listener) - }, func(error) { - runutil.CloseWithLogOnErr(h.logger, listener, "receive HTTP listener") - }) - - return g.Run() + return httpSrv.Serve(h.listener) } func (h *Handler) receive(w http.ResponseWriter, req *http.Request) {