From 023faa2d67a3050cd68cafd9c4e86e6915b79dc5 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Fri, 24 Nov 2023 12:47:24 +0100 Subject: [PATCH] Support reload using signal (#6453) * Support reload using signal Signed-off-by: Simon Pasquier * Add --reloader.method option to sidecar This option allows telling the sidecar to send a SIGHUP signal to the monitored process to reload its configuration instead of the default HTTP-based method. Signed-off-by: Simon Pasquier * Update docs and CHANGELOG.md Signed-off-by: Simon Pasquier --------- Signed-off-by: Simon Pasquier --- CHANGELOG.md | 1 + cmd/thanos/config.go | 16 ++++ cmd/thanos/sidecar.go | 63 +++++++++------ docs/components/sidecar.md | 4 + go.mod | 1 + go.sum | 2 + pkg/reloader/reloader.go | 162 +++++++++++++++++++++++++++++++++---- pkg/reloader/tracker.go | 119 +++++++++++++++++++++++++++ 8 files changed, 329 insertions(+), 39 deletions(-) create mode 100644 pkg/reloader/tracker.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ab281bdf4b..81c54dbfe7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added - [#6891](https://github.com/thanos-io/thanos/pull/6891) Objstore: Bump `objstore` which adds support for Azure Workload Identity. +- [#6453](https://github.com/thanos-io/thanos/pull/6453) Sidecar: Added `--reloader.method` to support configuration reloads via SIGHUP signal. ### Changed diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 8ee36b333a..ec3878fc9e 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -114,8 +114,18 @@ type reloaderConfig struct { ruleDirectories []string watchInterval time.Duration retryInterval time.Duration + method string + processName string } +const ( + // HTTPReloadMethod reloads the configuration using the HTTP reload endpoint. + HTTPReloadMethod = "http" + + // SignalReloadMethod reloads the configuration sending a SIGHUP signal to the process. 
+ SignalReloadMethod = "signal" +) + func (rc *reloaderConfig) registerFlag(cmd extkingpin.FlagClause) *reloaderConfig { cmd.Flag("reloader.config-file", "Config file watched by the reloader."). @@ -132,6 +142,12 @@ func (rc *reloaderConfig) registerFlag(cmd extkingpin.FlagClause) *reloaderConfi cmd.Flag("reloader.retry-interval", "Controls how often reloader retries config reload in case of error."). Default("5s").DurationVar(&rc.retryInterval) + cmd.Flag("reloader.method", + "Method used to reload the configuration."). + Default(HTTPReloadMethod).EnumVar(&rc.method, HTTPReloadMethod, SignalReloadMethod) + cmd.Flag("reloader.process-name", + "Executable name used to match the process being reloaded when using the signal method."). + Default("prometheus").StringVar(&rc.processName) return rc } diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index ab004791ef..3b8846d146 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -5,7 +5,9 @@ package main import ( "context" + "fmt" "math" + "net/http" "net/url" "sync" "time" @@ -62,18 +64,44 @@ func registerSidecar(app *extkingpin.App) { return errors.Wrap(err, "error while parsing config for request logging") } + httpConfContentYaml, err := conf.prometheus.httpClient.Content() + if err != nil { + return errors.Wrap(err, "getting http client config") + } + httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml) + if err != nil { + return errors.Wrap(err, "parsing http config YAML") + } + + httpClient, err := httpconfig.NewHTTPClient(*httpClientConfig, "thanos-sidecar") + if err != nil { + return errors.Wrap(err, "Improper http client config") + } + + opts := reloader.Options{ + HTTPClient: *httpClient, + CfgFile: conf.reloader.confFile, + CfgOutputFile: conf.reloader.envVarConfFile, + WatchedDirs: conf.reloader.ruleDirectories, + WatchInterval: conf.reloader.watchInterval, + RetryInterval: conf.reloader.retryInterval, + } + + switch conf.reloader.method { + case 
HTTPReloadMethod: + opts.ReloadURL = reloader.ReloadURLFromBase(conf.prometheus.url) + case SignalReloadMethod: + opts.ProcessName = conf.reloader.processName + opts.RuntimeInfoURL = reloader.RuntimeInfoURLFromBase(conf.prometheus.url) + default: + return fmt.Errorf("invalid reload method: %s", conf.reloader.method) + } + rl := reloader.New(log.With(logger, "component", "reloader"), extprom.WrapRegistererWithPrefix("thanos_sidecar_", reg), - &reloader.Options{ - ReloadURL: reloader.ReloadURLFromBase(conf.prometheus.url), - CfgFile: conf.reloader.confFile, - CfgOutputFile: conf.reloader.envVarConfFile, - WatchedDirs: conf.reloader.ruleDirectories, - WatchInterval: conf.reloader.watchInterval, - RetryInterval: conf.reloader.retryInterval, - }) + &opts) - return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts) + return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, httpClient, grpcLogOpts, tagOpts) }) } @@ -85,28 +113,13 @@ func runSidecar( reloader *reloader.Reloader, comp component.Component, conf sidecarConfig, + httpClient *http.Client, grpcLogOpts []grpc_logging.Option, tagOpts []tags.Option, ) error { - httpConfContentYaml, err := conf.prometheus.httpClient.Content() - if err != nil { - return errors.Wrap(err, "getting http client config") - } - httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml) - if err != nil { - return errors.Wrap(err, "parsing http config YAML") - } - - httpClient, err := httpconfig.NewHTTPClient(*httpClientConfig, "thanos-sidecar") - if err != nil { - return errors.Wrap(err, "Improper http client config") - } - - reloader.SetHttpClient(*httpClient) var m = &promMetadata{ promURL: conf.prometheus.url, - // Start out with the full time range. The shipper will constrain it later. // TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled. 
mint: conf.limitMinTime.PrometheusTimestamp(), diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index 6dec56daeb..8a31801b13 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -153,6 +153,10 @@ Flags: Output file for environment variable substituted config file. --reloader.config-file="" Config file watched by the reloader. + --reloader.method=http Method used to reload the configuration. + --reloader.process-name="prometheus" + Executable name used to match the process being + reloaded when using the signal method. --reloader.retry-interval=5s Controls how often reloader retries config reload in case of error. diff --git a/go.mod b/go.mod index bcc0a7ce8f..cae8f8e69a 100644 --- a/go.mod +++ b/go.mod @@ -117,6 +117,7 @@ require ( ) require ( + github.com/mitchellh/go-ps v1.0.0 github.com/onsi/gomega v1.27.10 go.opentelemetry.io/contrib/propagators/autoprop v0.38.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb diff --git a/go.sum b/go.sum index 099ac6be2d..a16afe7fce 100644 --- a/go.sum +++ b/go.sum @@ -733,6 +733,8 @@ github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceT github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= +github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 2057ea5f75..f4ba89a0ec 100644 --- 
a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -56,6 +56,7 @@ import ( "bytes" "compress/gzip" "context" + "fmt" "hash" "io" "net/http" @@ -66,12 +67,14 @@ import ( "regexp" "strings" "sync" + "syscall" "time" "github.com/fsnotify/fsnotify" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/minio/sha256-simd" + ps "github.com/mitchellh/go-ps" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -84,8 +87,6 @@ import ( // Referenced environment variables must be of the form `$(var)` (not `$var` or `${var}`). type Reloader struct { logger log.Logger - reloadURL *url.URL - httpClient http.Client cfgFile string cfgOutputFile string watchInterval time.Duration @@ -93,6 +94,8 @@ type Reloader struct { watchedDirs []string watcher *watcher + tr TriggerReloader + lastCfgHash []byte lastWatchedDirsHash []byte forceReload bool @@ -103,12 +106,33 @@ type Reloader struct { lastReloadSuccessTimestamp prometheus.Gauge configApplyErrors prometheus.Counter configApply prometheus.Counter + reloaderInfo *prometheus.GaugeVec +} + +// TriggerReloader reloads the configuration of the process. +type TriggerReloader interface { + TriggerReload(ctx context.Context) error } // Options bundles options for the Reloader. type Options struct { - // ReloadURL is a prometheus URL to trigger reloads. + // ReloadURL is the Prometheus URL to trigger reloads. ReloadURL *url.URL + + // HTTP client used to connect to the web server. + HTTPClient http.Client + + // ProcessName is the process executable name to trigger reloads. If not + // empty, the reloader sends a SIGHUP signal to the matching process ID + // instead of using the HTTP reload endpoint. + ProcessName string + // RuntimeInfoURL is the Prometheus URL returning runtime information + // including the last configuration status (e.g. `/api/v1/status/runtimeinfo`). + // It is only relevant for signal-based reloads. 
+ // If empty, the reloader will not be able to assess that the reloading is + // successful. + RuntimeInfoURL *url.URL + // CfgFile is a path to the prometheus config file to watch. CfgFile string // CfgOutputFile is a path for the output config file. @@ -124,7 +148,7 @@ type Options struct { // WatchInterval controls how often reloader re-reads config and directories. WatchInterval time.Duration // RetryInterval controls how often the reloader retries a reloading of the - // configuration in case the endpoint returned an error. + // configuration in case the reload operation returned an error. RetryInterval time.Duration } @@ -138,7 +162,6 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader { } r := &Reloader{ logger: logger, - reloadURL: o.ReloadURL, cfgFile: o.CfgFile, cfgOutputFile: o.CfgOutputFile, watcher: newWatcher(logger, reg, o.DelayInterval), @@ -182,7 +205,23 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader { Help: "Total number of config apply operations that failed.", }, ), + reloaderInfo: promauto.With(reg).NewGaugeVec( + prometheus.GaugeOpts{ + Name: "reloader_info", + Help: "A metric with a constant '1' value labeled by reload method (either 'http' or 'signal').", + }, + []string{"method"}, + ), } + + if o.ProcessName != "" { + r.tr = NewPIDReloader(r.logger, o.ProcessName, o.RuntimeInfoURL, o.HTTPClient) + r.reloaderInfo.WithLabelValues("signal").Set(1) + } else { + r.tr = NewHTTPReloader(r.logger, o.ReloadURL, o.HTTPClient) + r.reloaderInfo.WithLabelValues("http").Set(1) + } + return r } @@ -201,6 +240,12 @@ func (r *Reloader) Watch(ctx context.Context) error { return nil } + if _, ok := r.tr.(*PIDReloader); ok { + level.Info(r.logger).Log("msg", "reloading via process signal") + } else { + level.Info(r.logger).Log("msg", "reloading via HTTP") + } + defer runutil.CloseWithLogOnErr(r.logger, r.watcher, "config watcher close") if r.cfgFile != "" { @@ -274,6 +319,7 @@ func (r *Reloader) 
apply(ctx context.Context) error { cfgHash []byte watchedDirsHash []byte ) + if r.cfgFile != "" { h := sha256.New() if err := hashFile(h, r.cfgFile); err != nil { @@ -350,6 +396,7 @@ func (r *Reloader) apply(ctx context.Context) error { return errors.Wrap(err, "build hash") } } + if len(r.watchedDirs) > 0 { watchedDirsHash = h.Sum(nil) } @@ -363,6 +410,7 @@ func (r *Reloader) apply(ctx context.Context) error { if r.watchInterval == 0 { return nil } + r.reloads.Inc() if err := r.triggerReload(ctx); err != nil { r.reloadErrors.Inc() @@ -389,6 +437,14 @@ func (r *Reloader) apply(ctx context.Context) error { return nil } +func (r *Reloader) triggerReload(ctx context.Context) error { + if err := r.tr.TriggerReload(ctx); err != nil { + return err + } + + return nil +} + func hashFile(h hash.Hash, fn string) error { f, err := os.Open(filepath.Clean(fn)) if err != nil { @@ -412,28 +468,101 @@ func hashFile(h hash.Hash, fn string) error { return nil } -func (r *Reloader) triggerReload(ctx context.Context) error { - req, err := http.NewRequest("POST", r.reloadURL.String(), nil) +type PIDReloader struct { + pname string + prt *prometheusReloadTracker +} + +func NewPIDReloader(logger log.Logger, processName string, u *url.URL, c http.Client) *PIDReloader { + return &PIDReloader{ + pname: processName, + prt: &prometheusReloadTracker{ + client: c, + runtimeURL: u, + logger: logger, + }, + } +} + +func (pr *PIDReloader) TriggerReload(ctx context.Context) error { + if err := pr.prt.preReload(ctx); err != nil { + return fmt.Errorf("pre-reload check failed: %w", err) + } + + procs, err := ps.Processes() + if err != nil { + return fmt.Errorf("list processes: %w", err) + } + + var proc ps.Process + for i := range procs { + if pr.pname == procs[i].Executable() { + proc = procs[i] + break + } + } + + if proc == nil { + return fmt.Errorf("failed to find process matching %q", pr.pname) + } + + p, err := os.FindProcess(proc.Pid()) + if err != nil { + return fmt.Errorf("find process err: 
%w", err) + } + + if p == nil { + return fmt.Errorf("failed to find process with pid %d", proc.Pid()) + } + + if err := p.Signal(syscall.SIGHUP); err != nil { + return fmt.Errorf("failed to send SIGHUP to pid %d: %w", p.Pid, err) + } + + if err := pr.prt.postReload(ctx); err != nil { + return fmt.Errorf("post-reload check failed: %w", err) + } + + return nil +} + +var _ = TriggerReloader(&PIDReloader{}) + +type HTTPReloader struct { + logger log.Logger + + u *url.URL + c http.Client +} + +var _ = TriggerReloader(&HTTPReloader{}) + +func NewHTTPReloader(logger log.Logger, u *url.URL, c http.Client) *HTTPReloader { + return &HTTPReloader{ + logger: logger, + u: u, + c: c, + } +} + +func (hr *HTTPReloader) TriggerReload(ctx context.Context) error { + req, err := http.NewRequest("POST", hr.u.String(), nil) if err != nil { return errors.Wrap(err, "create request") } req = req.WithContext(ctx) - resp, err := r.httpClient.Do(req) + resp, err := hr.c.Do(req) if err != nil { return errors.Wrap(err, "reload request failed") } - defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body") + defer runutil.ExhaustCloseWithLogOnErr(hr.logger, resp.Body, "trigger reload resp body") if resp.StatusCode != 200 { return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status) } - return nil -} -// SetHttpClient sets Http client for reloader. -func (r *Reloader) SetHttpClient(client http.Client) { - r.httpClient = client + return nil } // ReloadURLFromBase returns the standard Prometheus reload URL from its base URL. @@ -443,6 +572,11 @@ func ReloadURLFromBase(u *url.URL) *url.URL { return &r } +// RuntimeInfoURLFromBase returns the standard Prometheus runtime info URL from its base URL. 
+func RuntimeInfoURLFromBase(u *url.URL) *url.URL { + return u.JoinPath("/api/v1/status/runtimeinfo") +} + var envRe = regexp.MustCompile(`\$\(([a-zA-Z_0-9]+)\)`) func expandEnv(b []byte) (r []byte, err error) { diff --git a/pkg/reloader/tracker.go b/pkg/reloader/tracker.go new file mode 100644 index 0000000000..0ba6fe576e --- /dev/null +++ b/pkg/reloader/tracker.go @@ -0,0 +1,119 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package reloader + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" +) + +// prometheusReloadTracker keeps track of the last configuration status. +type prometheusReloadTracker struct { + runtimeURL *url.URL + client http.Client + logger log.Logger + + lastReload time.Time + lastSuccess bool +} + +func (prt *prometheusReloadTracker) getConfigStatus(ctx context.Context) (bool, time.Time, error) { + r, err := http.NewRequestWithContext(ctx, "GET", prt.runtimeURL.String(), nil) + if err != nil { + return false, time.Time{}, err + } + + resp, err := prt.client.Do(r) + if err != nil { + return false, time.Time{}, fmt.Errorf("%s: %w", r.URL.String(), err) + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + return false, time.Time{}, fmt.Errorf("%s: invalid status code: %d", r.URL.String(), resp.StatusCode) + } + + var runtimeInfo = struct { + Status string `json:"status"` + Data struct { + ReloadConfigSuccess bool `json:"reloadConfigSuccess"` + LastConfigTime time.Time `json:"lastConfigTime"` + } `json:"data"` + }{} + + if err := json.NewDecoder(resp.Body).Decode(&runtimeInfo); err != nil { + return false, time.Time{}, fmt.Errorf("invalid response: %w", err) + } + + if runtimeInfo.Status != "success" { + return false, time.Time{}, fmt.Errorf("unexpected status: %s", runtimeInfo.Status) + } + + return runtimeInfo.Data.ReloadConfigSuccess, runtimeInfo.Data.LastConfigTime, nil +} + +func (prt 
*prometheusReloadTracker) preReload(ctx context.Context) error { + if prt.runtimeURL == nil { + level.Info(prt.logger).Log("msg", "Pre-reload check skipped because the runtimeInfo URL isn't defined") + return nil + } + + success, last, err := prt.getConfigStatus(ctx) + if err != nil { + return fmt.Errorf("failed to get config status before reload: %w", err) + } + + level.Debug(prt.logger).Log("msg", "Pre-reload check", "success", success, "last_config_reload", last) + prt.lastReload = last + prt.lastSuccess = success + + return nil +} + +func (prt *prometheusReloadTracker) postReload(ctx context.Context) error { + if prt.runtimeURL == nil { + level.Info(prt.logger).Log("msg", "Post-reload check skipped because the runtimeInfo URL isn't defined") + return nil + } + + t := time.NewTicker(time.Second) + defer t.Stop() + + for { + success, last, err := prt.getConfigStatus(ctx) + if err != nil { + return fmt.Errorf("failed to get config status after reload: %w", err) + } + level.Debug(prt.logger).Log("msg", "Post-reload hook in progress", "config_reload_success", success, "config_reload_success_ts", last) + + // The configuration has been updated successfully. + if success && last.After(prt.lastReload) { + level.Debug(prt.logger).Log("msg", "Post-reload hook successful") + return nil + } + + // The previous configuration was valid but the current configuration + // isn't so there's no need to poll again for the status. + if prt.lastSuccess && !success { + return fmt.Errorf("configuration reload has failed") + } + + select { + case <-ctx.Done(): + result := "failed" + if success { + result = "successful" + } + return fmt.Errorf("%w: configuration reload %s (last successful reload at %v)", ctx.Err(), result, last) + case <-t.C: + } + } +}