Skip to content

Commit

Permalink
feat compact: added readiness Prober
Browse files Browse the repository at this point in the history
  • Loading branch information
FUSAKLA committed Jul 2, 2019
1 parent 3d30a90 commit 25100a8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1248](https://github.com/improbable-eng/thanos/pull/1248) Add a web UI to show the state of remote storage.

- [TODO]() Added `/-/ready` and `/-healthy` endpoints to Thanos compact.

### Changed

- [#1284](https://github.com/improbable-eng/thanos/pull/1284) Add support for multiple label-sets in Info gRPC service. This deprecates the single `Labels` slice of the `InfoResponse`, in a future release backward compatible handling for the single set of Labels will be removed. Upgrading to v0.6.0 or higher is advised.
Expand Down
33 changes: 19 additions & 14 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ import (
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/prober"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)

var (
Expand All @@ -49,7 +51,7 @@ func (cs compactionSet) String() string {
return strings.Join(result, ", ")
}

// levels returns set of compaction levels not higher than specified max compaction level
// levels returns set of compaction levels not higher than specified max compaction level.
func (cs compactionSet) levels(maxLevel int) ([]int64, error) {
if maxLevel >= len(cs) {
return nil, errors.Errorf("level is bigger then default set of %d", len(cs))
Expand All @@ -62,13 +64,13 @@ func (cs compactionSet) levels(maxLevel int) ([]int64, error) {
return levels, nil
}

// maxLevel returns max available compaction level
// maxLevel returns max available compaction level.
func (cs compactionSet) maxLevel() int {
return len(cs) - 1
}

func registerCompact(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "continuously compacts blocks in an object store bucket")
func registerCompact(m map[string]setupFunc, app *kingpin.Application, component component.Component) {
cmd := app.Command(component.String(), "continuously compacts blocks in an object store bucket")

haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").Bool()
Expand Down Expand Up @@ -110,7 +112,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[component.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
*dataDir,
Expand All @@ -125,7 +127,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compact.ResolutionLevel5m: time.Duration(*retention5m),
compact.ResolutionLevel1h: time.Duration(*retention1h),
},
name,
component,
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
Expand All @@ -147,7 +149,7 @@ func runCompact(
wait bool,
generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
component component.Component,
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
Expand All @@ -168,12 +170,18 @@ func runCompact(

downsampleMetrics := newDownsampleMetrics(reg)

readinessProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := defaultHTTPListener(g, logger, reg, httpBindAddr, readinessProber); err != nil {
return errors.Wrap(err, "create readiness prober")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.String())
if err != nil {
return err
}
Expand Down Expand Up @@ -315,11 +323,8 @@ func runCompact(
cancel()
})

if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {
return err
}

level.Info(logger).Log("msg", "starting compact node")
readinessProber.SetReady()
return nil
}

Expand Down
29 changes: 28 additions & 1 deletion cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/prober"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/tracing"
"github.com/improbable-eng/thanos/pkg/tracing/client"
Expand Down Expand Up @@ -73,7 +75,7 @@ func main() {
registerStore(cmds, app, "store")
registerQuery(cmds, app, "query")
registerRule(cmds, app, "rule")
registerCompact(cmds, app, "compact")
registerCompact(cmds, app, component.Compact)
registerBucket(cmds, app, "bucket")
registerDownsample(cmds, app, "downsample")
registerReceive(cmds, app, "receive")
Expand Down Expand Up @@ -307,6 +309,7 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

// TODO Remove once all components are migrated to the new defaultHTTPListener
// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
mux := http.NewServeMux()
Expand All @@ -326,3 +329,27 @@ func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Regi
})
return nil
}

// defaultHTTPListener starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics,
// profiling and liveness/readiness probes.
func defaultHTTPListener(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, readinessProber *prober.Prober) error {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
readinessProber.RegisterInMux(mux)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "Listening for metrics", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve metrics")
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "metric listener")
})
return nil
}

0 comments on commit 25100a8

Please sign in to comment.