Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat compact: added readiness Prober #1297

Merged
merged 1 commit into from
Aug 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Changed

- [#1338](https://github.com/thanos-io/thanos/pull/1338) Querier still warns on store API duplicate, but allows a single one from duplicated set. This is gracefully warn about the problematic logic and not disrupt immediately.
- [#1297](https://github.com/improbable-eng/thanos/pull/1297) Added `/-/ready` and `/-/healthy` endpoints to Thanos compact.

### Fixed

Expand Down
34 changes: 20 additions & 14 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)

var (
Expand All @@ -49,7 +51,7 @@ func (cs compactionSet) String() string {
return strings.Join(result, ", ")
}

// levels returns set of compaction levels not higher than specified max compaction level
// levels returns set of compaction levels not higher than specified max compaction level.
func (cs compactionSet) levels(maxLevel int) ([]int64, error) {
if maxLevel >= len(cs) {
return nil, errors.Errorf("level is bigger then default set of %d", len(cs))
Expand All @@ -62,13 +64,14 @@ func (cs compactionSet) levels(maxLevel int) ([]int64, error) {
return levels, nil
}

// maxLevel returns max available compaction level
// maxLevel returns max available compaction level.
func (cs compactionSet) maxLevel() int {
return len(cs) - 1
}

func registerCompact(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "continuously compacts blocks in an object store bucket")
func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
comp := component.Compact
cmd := app.Command(comp.String(), "continuously compacts blocks in an object store bucket")

haltOnError := cmd.Flag("debug.halt-on-error", "Halt the process if a critical compaction error is detected.").
Hidden().Default("true").Bool()
Expand Down Expand Up @@ -110,7 +113,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compactionConcurrency := cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").Int()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
m[comp.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
*dataDir,
Expand All @@ -125,7 +128,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name stri
compact.ResolutionLevel5m: time.Duration(*retention5m),
compact.ResolutionLevel1h: time.Duration(*retention1h),
},
name,
comp,
*disableDownsampling,
*maxCompactionLevel,
*blockSyncConcurrency,
Expand All @@ -147,7 +150,7 @@ func runCompact(
wait bool,
generateMissingIndexCacheFiles bool,
retentionByResolution map[compact.ResolutionLevel]time.Duration,
component string,
component component.Component,
disableDownsampling bool,
maxCompactionLevel int,
blockSyncConcurrency int,
Expand All @@ -168,12 +171,18 @@ func runCompact(

downsampleMetrics := newDownsampleMetrics(reg)

readinessProber := prober.NewProber(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate default HTTP listener providing metrics endpoint and readiness/liveness probes.
if err := defaultHTTPListener(g, logger, reg, httpBindAddr, readinessProber); err != nil {
return errors.Wrap(err, "create readiness prober")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.String())
if err != nil {
return err
}
Expand Down Expand Up @@ -318,11 +327,8 @@ func runCompact(
cancel()
})

if err := metricHTTPListenGroup(g, logger, reg, httpBindAddr); err != nil {
return err
}

level.Info(logger).Log("msg", "starting compact node")
readinessProber.SetReady()
return nil
}

Expand Down
30 changes: 28 additions & 2 deletions cmd/thanos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/thanos-io/thanos/pkg/tracing/client"
Expand Down Expand Up @@ -73,7 +74,7 @@ func main() {
registerStore(cmds, app, "store")
registerQuery(cmds, app, "query")
registerRule(cmds, app, "rule")
registerCompact(cmds, app, "compact")
registerCompact(cmds, app)
registerBucket(cmds, app, "bucket")
registerDownsample(cmds, app, "downsample")
registerReceive(cmds, app, "receive")
Expand Down Expand Up @@ -122,7 +123,7 @@ func main() {
)

prometheus.DefaultRegisterer = metrics
// Memberlist uses go-metrics
// Memberlist uses go-metrics.
sink, err := gprom.NewPrometheusSink()
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "%s command failed", cmd))
Expand Down Expand Up @@ -311,6 +312,7 @@ func defaultGRPCServerOpts(logger log.Logger, reg *prometheus.Registry, tracer o
return append(opts, grpc.Creds(credentials.NewTLS(tlsCfg))), nil
}

// TODO Remove once all components are migrated to the new defaultHTTPListener.
// metricHTTPListenGroup is a run.Group that servers HTTP endpoint with only Prometheus metrics.
func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string) error {
mux := http.NewServeMux()
Expand All @@ -330,3 +332,27 @@ func metricHTTPListenGroup(g *run.Group, logger log.Logger, reg *prometheus.Regi
})
return nil
}

// defaultHTTPListener starts a run.Group that servers HTTP endpoint with default endpoints providing Prometheus metrics,
// profiling and liveness/readiness probes.
func defaultHTTPListener(g *run.Group, logger log.Logger, reg *prometheus.Registry, httpBindAddr string, readinessProber *prober.Prober) error {
mux := http.NewServeMux()
registerMetrics(mux, reg)
registerProfile(mux)
readinessProber.RegisterInMux(mux)

l, err := net.Listen("tcp", httpBindAddr)
if err != nil {
return errors.Wrap(err, "listen metrics address")
}

g.Add(func() error {
level.Info(logger).Log("msg", "listening for metrics", "address", httpBindAddr)
readinessProber.SetHealthy()
return errors.Wrap(http.Serve(l, mux), "serve metrics")
}, func(err error) {
readinessProber.SetNotHealthy(err)
runutil.CloseWithLogOnErr(logger, l, "metric listener")
})
return nil
}
8 changes: 8 additions & 0 deletions tutorials/kubernetes-demo/manifests/thanos-compactor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ spec:
ports:
- name: http
containerPort: 10902
livenessProbe:
httpGet:
port: 10902
path: /-/healthy
readinessProbe:
httpGet:
port: 10902
path: /-/ready
resources:
limits:
cpu: "1"
Expand Down