Skip to content

Commit

Permalink
feat compact: added readiness Prober (#1297)
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Chodur <m.chodur@seznam.cz>
  • Loading branch information
FUSAKLA authored and GiedriusS committed Aug 5, 2019
1 parent e5c5112 commit 4cf32d0
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Changed

- [#1338](https://github.com/thanos-io/thanos/pull/1338) Querier still warns on store API duplicate, but allows a single one from duplicated set. This is gracefully warn about the problematic logic and not disrupt immediately.
- [#1297](https://github.com/improbable-eng/thanos/pull/1297) Added `/-/ready` and `/-/healthy` endpoints to Thanos compact.

### Fixed

Expand Down
34 changes: 20 additions & 14 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)

var (
Expand All @@ -49,7 +51,7 @@ func (cs compactionSet) String() string {
return strings.Join(result, ", ")
}

// levels returns set of compaction levels not higher than specified max compaction level
// levels returns set of compaction levels not higher than specified max compaction level.
func (cs compactionSet) levels(maxLevel int) ([]int64, error) {
if maxLevel >= len(cs) {
return nil, errors.Errorf("level is bigger then default set of %d", len(cs))
Expand All @@ -62,13 +64,14 @@ func (cs compactionSet) levels(maxLevel int) ([]int64, error) {
return levels, nil
}

// maxLevel returns max available compaction level
// maxLevel returns max available compaction level.
func (cs compactionSet) maxLevel() int {
return len(cs) - 1
}

func registerCompact(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "continuously compacts blocks in an object store bucket")
func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Compact
cmd := app.Command(comp.String(), "continuously compacts blocks in an object store bucket")

haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").Bool()
Expand Down Expand Up @@ -110,7 +113,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
*dataDir,
Expand All @@ -125,7 +128,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compact.ResolutionLevel5m: time.Duration(*retention5m),
compact.ResolutionLevel1h: time.Duration(*retention1h),
},
name,
comp,
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
Expand All @@ -147,7 +150,7 @@ func runCompact(
wait bool,
generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
component component.Component,
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
Expand All @@ -168,12 +171,18 @@ func runCompact(

downsampleMetrics := newDownsampleMetrics(reg)

readinessProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := defaultHTTPListener(g, logger, reg, httpBindAddr, readinessProber); err != nil {
return errors.Wrap(err, "create readiness prober")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.String())
if err != nil {
return err
}
Expand Down Expand Up @@ -318,11 +327,8 @@ func runCompact(
cancel()
})

if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {
return err
}

level.Info(logger).Log("msg", "starting compact node")
readinessProber.SetReady()
return nil
}

Expand Down
30 changes: 28 additions & 2 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/tracing/client"
Expand Down Expand Up @@ -73,7 +74,7 @@ func main() {
registerStore(cmds, app, "store")
registerQuery(cmds, app, "query")
registerRule(cmds, app, "rule")
registerCompact(cmds, app, "compact")
registerCompact(cmds, app)
registerBucket(cmds, app, "bucket")
registerDownsample(cmds, app, "downsample")
registerReceive(cmds, app, "receive")
Expand Down Expand Up @@ -122,7 +123,7 @@ func main() {
)

prometheus.DefaultRegisterer = metrics
// Memberlist uses go-metrics
// Memberlist uses go-metrics.
sink, err := gprom.NewPrometheusSink()
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "%s command failed", cmd))
Expand Down Expand Up @@ -311,6 +312,7 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

// TODO Remove once all components are migrated to the new defaultHTTPListener.
// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
mux := http.NewServeMux()
Expand All @@ -330,3 +332,27 @@ func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Regi
})
return nil
}

// defaultHTTPListener starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics,
// profiling and liveness/readiness probes.
func defaultHTTPListener(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, readinessProber *prober.Prober) error {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
readinessProber.RegisterInMux(mux)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "listening for metrics", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve metrics")
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "metric listener")
})
return nil
}
8 changes: 8 additions & 0 deletions tutorials/kubernetes-demo/manifests/thanos-compactor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ spec:
ports:
- name: http
containerPort: 10902
livenessProbe:
httpGet:
port: 10902
path: /-/healthy
readinessProbe:
httpGet:
port: 10902
path: /-/ready
resources:
limits:
cpu: "1"
Expand Down

0 comments on commit 4cf32d0

Please sign in to comment.