Skip to content

Commit

Permalink
Wait hashring to be ready
Browse files Browse the repository at this point in the history
  • Loading branch information
kakkoyun committed Sep 19, 2019
1 parent d075eb9 commit 105037a
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func runReceive(
ReplicationFactor: replicationFactor,
})

statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})
Expand Down Expand Up @@ -202,6 +204,7 @@ func runReceive(
)
}

hashringReady := make(chan struct{})
level.Debug(logger).Log("msg", "setting up hashring")
{
updates := make(chan receive.Hashring)
Expand Down Expand Up @@ -231,15 +234,20 @@ func runReceive(
func() error {
select {
case h := <-updates:
close(hashringReady)
webHandler.Hashring(h)
statusProber.SetReady()
case <-cancel:
close(hashringReady)
return nil
}
select {
// If any new hashring is received, then mark the handler as unready, but keep it alive.
case <-updates:
msg := "hashring has changed; server is not ready to receive web requests."
webHandler.Hashring(nil)
level.Info(logger).Log("msg", "hashring has changed; server is not ready to receive web requests.")
statusProber.SetNotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
case <-cancel:
return nil
}
Expand All @@ -253,7 +261,6 @@ func runReceive(
}

level.Debug(logger).Log("msg", "setting up http server")
statusProber := prober.NewProber(comp, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := scheduleHTTPServer(g, logger, reg, statusProber, httpBindAddr, nil, comp); err != nil {
return errors.Wrap(err, "schedule HTTP server with probes")
Expand Down Expand Up @@ -283,8 +290,9 @@ func runReceive(
}
s := newStoreGRPCServer(logger, reg, tracer, tsdbStore, opts)

// Wait hashring to be ready before start serving metrics
<-hashringReady
level.Info(logger).Log("msg", "listening for StoreAPI gRPC", "address", grpcBindAddr)
statusProber.SetReady()
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
if s != nil {
Expand Down

0 comments on commit 105037a

Please sign in to comment.