Skip to content

Commit

Permalink
Support reload using signal
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Pasquier <spasquie@redhat.com>
  • Loading branch information
simonpasquier committed Nov 15, 2023
1 parent ecb8bb8 commit 0a817f3
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 31 deletions.
35 changes: 18 additions & 17 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"context"
"math"
"net/http"
"net/url"
"sync"
"time"
Expand Down Expand Up @@ -62,18 +63,33 @@ func registerSidecar(app *extkingpin.App) {
return errors.Wrap(err, "error while parsing config for request logging")
}

httpConfContentYaml, err := conf.prometheus.httpClient.Content()
if err != nil {
return errors.Wrap(err, "getting http client config")
}
httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml)
if err != nil {
return errors.Wrap(err, "parsing http config YAML")
}

httpClient, err := httpconfig.NewHTTPClient(*httpClientConfig, "thanos-sidecar")
if err != nil {
return errors.Wrap(err, "Improper http client config")
}

rl := reloader.New(log.With(logger, "component", "reloader"),
extprom.WrapRegistererWithPrefix("thanos_sidecar_", reg),
&reloader.Options{
ReloadURL: reloader.ReloadURLFromBase(conf.prometheus.url),
HTTPClient: *httpClient,
CfgFile: conf.reloader.confFile,
CfgOutputFile: conf.reloader.envVarConfFile,
WatchedDirs: conf.reloader.ruleDirectories,
WatchInterval: conf.reloader.watchInterval,
RetryInterval: conf.reloader.retryInterval,
})

return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, grpcLogOpts, tagOpts)
return runSidecar(g, logger, reg, tracer, rl, component.Sidecar, *conf, httpClient, grpcLogOpts, tagOpts)
})
}

Expand All @@ -85,28 +101,13 @@ func runSidecar(
reloader *reloader.Reloader,
comp component.Component,
conf sidecarConfig,
httpClient *http.Client,
grpcLogOpts []grpc_logging.Option,
tagOpts []tags.Option,
) error {
httpConfContentYaml, err := conf.prometheus.httpClient.Content()
if err != nil {
return errors.Wrap(err, "getting http client config")
}
httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml)
if err != nil {
return errors.Wrap(err, "parsing http config YAML")
}

httpClient, err := httpconfig.NewHTTPClient(*httpClientConfig, "thanos-sidecar")
if err != nil {
return errors.Wrap(err, "Improper http client config")
}

reloader.SetHttpClient(*httpClient)

var m = &promMetadata{
promURL: conf.prometheus.url,

// Start out with the full time range. The shipper will constrain it later.
// TODO(fabxc): minimum timestamp is never adjusted if shipping is disabled.
mint: conf.limitMinTime.PrometheusTimestamp(),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ require (
)

require (
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.27.10
go.opentelemetry.io/contrib/propagators/autoprop v0.38.0
go4.org/intern v0.0.0-20230525184215-6c62f75575cb
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,8 @@ github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceT
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
Expand Down
157 changes: 143 additions & 14 deletions pkg/reloader/reloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"bytes"
"compress/gzip"
"context"
"fmt"
"hash"
"io"
"net/http"
Expand All @@ -66,12 +67,14 @@ import (
"regexp"
"strings"
"sync"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/minio/sha256-simd"
ps "github.com/mitchellh/go-ps"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -84,15 +87,17 @@ import (
// Referenced environment variables must be of the form `$(var)` (not `$var` or `${var}`).
type Reloader struct {
logger log.Logger
reloadURL *url.URL
httpClient http.Client
cfgFile string
cfgOutputFile string
watchInterval time.Duration
retryInterval time.Duration
watchedDirs []string
watcher *watcher

tr TriggerReloader
preHook CallbackFunc
postHook CallbackFunc

lastCfgHash []byte
lastWatchedDirsHash []byte
forceReload bool
Expand All @@ -105,10 +110,21 @@ type Reloader struct {
configApply prometheus.Counter
}

// TriggerReloader reloads the configuration of the process.
type TriggerReloader interface {
TriggerReload(ctx context.Context) error
}

// Options bundles options for the Reloader.
type Options struct {
// ReloadURL is a prometheus URL to trigger reloads.
// ReloadURL is the Prometheus URL to trigger reloads.
ReloadURL *url.URL
// HTTP client used to connect to the web server.
HTTPClient http.Client
// ProcessName is the process executable name to trigger reloads. If not
// empty, the reloader sends a SIGHUP signal to the matching process ID
// instead of using the HTTP reload endpoint.
ProcessName string
// CfgFile is a path to the prometheus config file to watch.
CfgFile string
// CfgOutputFile is a path for the output config file.
Expand All @@ -124,10 +140,22 @@ type Options struct {
// WatchInterval controls how often reloader re-reads config and directories.
WatchInterval time.Duration
// RetryInterval controls how often the reloader retries a reloading of the
// configuration in case the endpoint returned an error.
// configuration in case the reload operation returned an error.
RetryInterval time.Duration

// PreReloadHook is a function called before reloading the configuration.
// If it returns an error, the reloader doesn't proceed with the config
// reload.
PreReloadHook CallbackFunc

// PostReloadHook is a function called after reloading the configuration.
// The config reload will not be considered successful until it returns no
// error.
PostReloadHook CallbackFunc
}

type CallbackFunc func(context.Context) error

var firstGzipBytes = []byte{0x1f, 0x8b, 0x08}

// New creates a new reloader that watches the given config file and directories
Expand All @@ -138,14 +166,16 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
}
r := &Reloader{
logger: logger,
reloadURL: o.ReloadURL,
cfgFile: o.CfgFile,
cfgOutputFile: o.CfgOutputFile,
watcher: newWatcher(logger, reg, o.DelayInterval),
watchedDirs: o.WatchedDirs,
watchInterval: o.WatchInterval,
retryInterval: o.RetryInterval,

preHook: o.PreReloadHook,
postHook: o.PostReloadHook,

reloads: promauto.With(reg).NewCounter(
prometheus.CounterOpts{
Name: "reloader_reloads_total",
Expand Down Expand Up @@ -183,6 +213,21 @@ func New(logger log.Logger, reg prometheus.Registerer, o *Options) *Reloader {
},
),
}

if r.preHook == nil {
r.preHook = func(context.Context) error { return nil }
}

if r.postHook == nil {
r.postHook = func(context.Context) error { return nil }
}

if o.ProcessName != "" {
r.tr = NewPIDReloader(o.ProcessName)
} else {
r.tr = NewHTTPReloader(r.logger, o.ReloadURL, o.HTTPClient)
}

return r
}

Expand All @@ -201,6 +246,12 @@ func (r *Reloader) Watch(ctx context.Context) error {
return nil
}

if _, ok := r.tr.(*PIDReloader); ok {
level.Info(r.logger).Log("msg", "reloading via process signal")
} else {
level.Info(r.logger).Log("msg", "reloading via HTTP")
}

defer runutil.CloseWithLogOnErr(r.logger, r.watcher, "config watcher close")

if r.cfgFile != "" {
Expand Down Expand Up @@ -274,6 +325,7 @@ func (r *Reloader) apply(ctx context.Context) error {
cfgHash []byte
watchedDirsHash []byte
)

if r.cfgFile != "" {
h := sha256.New()
if err := hashFile(h, r.cfgFile); err != nil {
Expand Down Expand Up @@ -350,6 +402,7 @@ func (r *Reloader) apply(ctx context.Context) error {
return errors.Wrap(err, "build hash")
}
}

if len(r.watchedDirs) > 0 {
watchedDirsHash = h.Sum(nil)
}
Expand All @@ -363,6 +416,7 @@ func (r *Reloader) apply(ctx context.Context) error {
if r.watchInterval == 0 {
return nil
}

r.reloads.Inc()
if err := r.triggerReload(ctx); err != nil {
r.reloadErrors.Inc()
Expand All @@ -389,6 +443,22 @@ func (r *Reloader) apply(ctx context.Context) error {
return nil
}

func (r *Reloader) triggerReload(ctx context.Context) error {
if err := r.preHook(ctx); err != nil {
return fmt.Errorf("prehook reload: %w", err)
}

if err := r.tr.TriggerReload(ctx); err != nil {
return err
}

if err := r.postHook(ctx); err != nil {
return fmt.Errorf("posthook reload: %w", err)
}

return nil
}

func hashFile(h hash.Hash, fn string) error {
f, err := os.Open(filepath.Clean(fn))
if err != nil {
Expand All @@ -412,28 +482,87 @@ func hashFile(h hash.Hash, fn string) error {
return nil
}

func (r *Reloader) triggerReload(ctx context.Context) error {
req, err := http.NewRequest("POST", r.reloadURL.String(), nil)
type PIDReloader struct {
pname string
}

func NewPIDReloader(pname string) *PIDReloader {
return &PIDReloader{
pname: pname,
}
}

func (pr *PIDReloader) TriggerReload(ctx context.Context) error {
procs, err := ps.Processes()
if err != nil {
return fmt.Errorf("list processes: %w", err)
}

var proc ps.Process
for i := range procs {
if pr.pname == procs[i].Executable() {
proc = procs[i]
break
}
}

if proc == nil {
return fmt.Errorf("failed to find process matching %q", pr.pname)
}

p, err := os.FindProcess(proc.Pid())
if err != nil {
return fmt.Errorf("find process err: %w", err)
}

if proc == nil {
return fmt.Errorf("failed to find process with pid %d", proc.Pid())
}

if err := p.Signal(syscall.SIGHUP); err != nil {
return fmt.Errorf("failed to send SIGHUP to pid %d: %w", p.Pid, err)
}

return nil
}

var _ = TriggerReloader(&PIDReloader{})

type HTTPReloader struct {
logger log.Logger

u *url.URL
c http.Client
}

var _ = TriggerReloader(&HTTPReloader{})

func NewHTTPReloader(logger log.Logger, u *url.URL, c http.Client) *HTTPReloader {
return &HTTPReloader{
logger: logger,
u: u,
c: c,
}
}

func (hr *HTTPReloader) TriggerReload(ctx context.Context) error {
req, err := http.NewRequest("POST", hr.u.String(), nil)
if err != nil {
return errors.Wrap(err, "create request")
}
req = req.WithContext(ctx)

resp, err := r.httpClient.Do(req)
resp, err := hr.c.Do(req)
if err != nil {
return errors.Wrap(err, "reload request failed")
}
defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")
defer runutil.ExhaustCloseWithLogOnErr(hr.logger, resp.Body, "trigger reload resp body")

if resp.StatusCode != 200 {
return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status)
}
return nil
}

// SetHttpClient sets Http client for reloader.
func (r *Reloader) SetHttpClient(client http.Client) {
r.httpClient = client
return nil
}

// ReloadURLFromBase returns the standard Prometheus reload URL from its base URL.
Expand Down

0 comments on commit 0a817f3

Please sign in to comment.