Skip to content

Commit

Permalink
pkg/receive: remove deadlock on interrupt (thanos-io#1231)
Browse files Browse the repository at this point in the history
Currently, the receive component blocks indefinitely when the Thanos
process is interrupted because the given context is not actually used
to stop the server.
  • Loading branch information
squat authored and FUSAKLA committed Jun 8, 2019
1 parent cdb8957 commit cb6af07
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 24 deletions.
8 changes: 2 additions & 6 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,16 +233,12 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up receive http handler")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(
func() error {
if err := webHandler.Run(ctx); err != nil {
return fmt.Errorf("error starting web server: %s", err)
}
return nil
return errors.Wrap(webHandler.Run(), "error starting web server")
},
func(err error) {
cancel()
webHandler.Close()
},
)
}
Expand Down
27 changes: 9 additions & 18 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package receive

import (
"context"
"fmt"
"io/ioutil"
stdlog "log"
Expand All @@ -16,7 +15,6 @@ import (
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store/prompb"
conntrack "github.com/mwitkow/go-conntrack"
"github.com/oklog/run"
"github.com/opentracing-contrib/go-stdlib/nethttp"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -59,7 +57,7 @@ type Handler struct {
receiver *Writer
router *route.Router
options *Options
quitCh chan struct{}
listener net.Listener

ready uint32 // ready is uint32 rather than boolean to be able to use atomic functions.
}
Expand All @@ -86,7 +84,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
readyStorage: o.ReadyStorage,
receiver: o.Receiver,
options: o,
quitCh: make(chan struct{}),
}

readyf := h.testReady
Expand Down Expand Up @@ -122,9 +119,9 @@ func (h *Handler) testReady(f http.HandlerFunc) http.HandlerFunc {
}
}

// Quit returns the receive-only quit channel.
func (h *Handler) Quit() <-chan struct{} {
return h.quitCh
// Close stops the Handler.
func (h *Handler) Close() {
runutil.CloseWithLogOnErr(h.logger, h.listener, "receive HTTP listener")
}

// Checks if server is ready, calls f if it is, returns 503 if it is not.
Expand All @@ -133,16 +130,17 @@ func (h *Handler) testReadyHandler(f http.Handler) http.HandlerFunc {
}

// Run serves the HTTP endpoints.
func (h *Handler) Run(ctx context.Context) error {
func (h *Handler) Run() error {
level.Info(h.logger).Log("msg", "Start listening for connections", "address", h.options.ListenAddress)

listener, err := net.Listen("tcp", h.options.ListenAddress)
var err error
h.listener, err = net.Listen("tcp", h.options.ListenAddress)
if err != nil {
return err
}

// Monitor incoming connections with conntrack.
listener = conntrack.NewListener(listener,
h.listener = conntrack.NewListener(h.listener,
conntrack.TrackWithName("http"),
conntrack.TrackWithTracing())

Expand All @@ -159,14 +157,7 @@ func (h *Handler) Run(ctx context.Context) error {
ErrorLog: errlog,
}

var g run.Group
g.Add(func() error {
return httpSrv.Serve(listener)
}, func(error) {
runutil.CloseWithLogOnErr(h.logger, listener, "receive HTTP listener")
})

return g.Run()
return httpSrv.Serve(h.listener)
}

func (h *Handler) receive(w http.ResponseWriter, req *http.Request) {
Expand Down

0 comments on commit cb6af07

Please sign in to comment.