diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 1e31b54d15..538f25016f 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -2,6 +2,8 @@ package main import ( "context" + "github.com/hashicorp/go-version" + "github.com/prometheus/common/model" "math" "net" "net/http" @@ -53,6 +55,8 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri uploadCompacted := cmd.Flag("shipper.upload-compacted", "[Experimental] If true sidecar will try to upload compacted blocks as well. Useful for migration purposes. Works only if compaction is disabled on Prometheus.").Default("false").Hidden().Bool() + validateProm := cmd.Flag("sidecar.validate-prom", "[Experimental]If true sidecar will check Prometheus' flags to ensure disabled compaction and 2h block-time.").Default("true").Hidden().Bool() + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error { rl := reloader.New( log.With(logger, "component", "reloader"), @@ -81,6 +85,7 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri peer, rl, *uploadCompacted, + *validateProm, ) } } @@ -101,6 +106,7 @@ func runSidecar( peer cluster.Peer, reloader *reloader.Reloader, uploadCompacted bool, + validateProm bool, ) error { var m = &promMetadata{ promURL: promURL, @@ -125,6 +131,40 @@ func runSidecar( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { + if validateProm { + // Retry infinitely until we get Prometheus version. + err := runutil.Retry(2*time.Second, ctx.Done(), func() error { + err := m.FetchPromVersion(logger) + if err != nil { + level.Warn(logger).Log( + "msg", "failed to get Prometheus version. Is Prometheus running? Retrying", + "err", err, + ) + return errors.Wrapf(err, "fetch Prometheus version") + } + return nil + }) + if err != nil { + return err + } + + if m.version == nil { + level.Warn(logger).Log("msg", "can't fetch version, skip validation") + } else { + // Check if Prometheus has /status/flags endpoint. + if m.version.LessThan(promclient.FlagsVersion) { + level.Warn(logger).Log("msg", + "Prometheus doesn't support flags endpoint, skip validation", "version", m.version.Original()) + return nil + } + + // Check prometheus's flags to ensure sane sidecar flags. + if err := validatePrometheus(ctx, logger, promURL, dataDir); err != nil { + return errors.Wrap(err, "validate Prometheus flags") + } + } + } + // Blocking query of external labels before joining as a Source Peer into gossip. // We retry infinitely until we reach and fetch labels from our Prometheus. err := runutil.Retry(2*time.Second, ctx.Done(), func() error { @@ -298,13 +338,31 @@ func runSidecar( return nil } +func validatePrometheus(ctx context.Context, logger log.Logger, promURL *url.URL, tsdbPath string) error { + flags, err := promclient.ConfiguredFlags(ctx, logger, promURL) + if err != nil { + return errors.Wrap(err, "configured flags; failed to check flags") + } + // Check if min-block-time and max-block-time are the same. + if flags.TSDBMinTime != flags.TSDBMaxTime { + return errors.New("TSDB Min-block-time mismatches with Max-block-time") + } + // Check if block-time equals 2h. + if flags.TSDBMinTime != model.Duration(2*time.Hour) { + level.Warn(logger).Log("msg", "TSDB Max-block-time and Min-block-time should be configured to 2h", "block-time", flags.TSDBMinTime) + } + + return nil +} + type promMetadata struct { promURL *url.URL - mtx sync.Mutex - mint int64 - maxt int64 - labels labels.Labels + mtx sync.Mutex + mint int64 + maxt int64 + labels labels.Labels + version *version.Version } func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error { @@ -355,3 +413,8 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) { return s.mint, s.maxt } + +func (s *promMetadata) FetchPromVersion(logger log.Logger) (err error) { + s.version, err = promclient.GetPromVersion(logger, s.promURL) + return err +} \ No newline at end of file diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index e70fbb245b..8b7596493f 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/hashicorp/go-version" "io/ioutil" "net/http" "net/url" @@ -28,6 +29,12 @@ import ( "gopkg.in/yaml.v2" ) +var FlagsVersion *version.Version + +func init() { + FlagsVersion, _ = version.NewVersion("2.2.0") +} + // IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell // if we have access to Prometheus TSDB directory. func IsWALDirAccesible(dir string) error { @@ -335,6 +342,32 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q return vec, nil } +// GetPromVersion will return the version of Prometheus by querying /version Prometheus endpoint. +func GetPromVersion(logger log.Logger, base *url.URL) (*version.Version, error) { + if logger == nil { + logger = log.NewNopLogger() + } + + u := *base + u.Path = path.Join(u.Path, "/version") + resp, err := http.Get(u.String()) + if err != nil { + return nil, errors.Wrapf(err, "request version against %s", u.String()) + } + defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Errorf("failed to read body") + } + + if resp.StatusCode != 200 { + return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b)) + } + + return parseVersion(b) +} + // Scalar response consists of array with mixed types so it needs to be // unmarshaled separately. func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) { @@ -362,3 +395,24 @@ func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, Value: resultValue, Timestamp: resultTime}}, nil } + +// parseVersion converts string to version.Version. +func parseVersion(data []byte) (*version.Version, error) { + var m struct { + Version string `json:"version"` + } + if err := json.Unmarshal(data, &m); err != nil { + return nil, errors.Wrapf(err, "unmarshal response: %v", string(data)) + } + + if strings.TrimSpace(m.Version) == "" { + return nil, nil + } + + ver, err := version.NewVersion(m.Version) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse version %s", m.Version) + } + + return ver, nil +} \ No newline at end of file diff --git a/pkg/promclient/promclient_e2e_test.go b/pkg/promclient/promclient_e2e_test.go index 1cb7c968af..0f5bb0b0ab 100644 --- a/pkg/promclient/promclient_e2e_test.go +++ b/pkg/promclient/promclient_e2e_test.go @@ -3,6 +3,7 @@ package promclient import ( "context" "fmt" + "github.com/hashicorp/go-version" "io/ioutil" "net/url" "os" @@ -142,3 +143,36 @@ func TestRule_UnmarshalScalarResponse(t *testing.T) { vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult) testutil.NotOk(t, err) } + +func TestParseVersion(t *testing.T) { + promVersions := map[string]string{ + "": promVersionResp(""), + "2.2.0": promVersionResp("2.2.0"), + "2.3.0": promVersionResp("2.3.0"), + "2.3.0-rc.0": promVersionResp("2.3.0-rc.0"), + } + + promMalformedVersions := map[string]string{ + "foo": promVersionResp("foo"), + "bar": promVersionResp("bar"), + } + + for v, resp := range promVersions { + gotVersion, err := parseVersion([]byte(resp)) + testutil.Ok(t, err) + expectVersion, _ := version.NewVersion(v) + testutil.Equals(t, gotVersion, expectVersion) + } + + for v, resp := range promMalformedVersions { + gotVersion, err := parseVersion([]byte(resp)) + testutil.NotOk(t, err) + expectVersion, _ := version.NewVersion(v) + testutil.Equals(t, gotVersion, expectVersion) + } +} + +// promVersionResp returns the response of Prometheus /version endpoint. +func promVersionResp(ver string) string { + return fmt.Sprintf(`{"version":"%s","revision":"","branch":"","buildUser":"","buildDate":"","goVersion":""}`, ver) +} \ No newline at end of file diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 592c1e7e69..0e8dbff507 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -78,6 +78,7 @@ func scraper(i int, config string) cmdScheduleFunc { cmds = append(cmds, exec.Command(testutil.PrometheusBinary(), "--config.file", promDir+"/prometheus.yml", "--storage.tsdb.path", promDir, + "--storage.tsdb.max-block-duration", "2h", "--log.level", "info", "--web.listen-address", promHTTP(i), ))