diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 1e31b54d15..390ec14cc1 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/hashicorp/go-version" "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/component" @@ -25,6 +26,7 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/tsdb/labels" "google.golang.org/grpc" kingpin "gopkg.in/alecthomas/kingpin.v2" @@ -111,6 +113,17 @@ func runSidecar( maxt: math.MaxInt64, } + confContentYaml, err := objStoreConfig.Content() + if err != nil { + return errors.Wrap(err, "getting object store config") + } + + var uploads = true + if len(confContentYaml) == 0 { + level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled") + uploads = false + } + // Setup all the concurrent groups. { promUp := prometheus.NewGauge(prometheus.GaugeOpts{ @@ -125,6 +138,29 @@ func runSidecar( ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { + // Only check Prometheus's flags when upload is enabled. + if uploads { + // Retry infinitely until we get Prometheus version. + if err := runutil.Retry(2*time.Second, ctx.Done(), func() error { + if m.version, err = promclient.PromVersion(logger, m.promURL); err != nil { + level.Warn(logger).Log( + "msg", "failed to get Prometheus version. Is Prometheus running? Retrying", + "err", err, + ) + return errors.Wrapf(err, "fetch Prometheus version") + } + + return nil + }); err != nil { + return errors.Wrap(err, "fetch Prometheus version") + } + + // Check prometheus's flags to ensure sane sidecar flags. + if err := validatePrometheus(ctx, logger, m); err != nil { + return errors.Wrap(err, "validate Prometheus flags") + } + } + // Blocking query of external labels before joining as a Source Peer into gossip. // We retry infinitely until we reach and fetch labels from our Prometheus. err := runutil.Retry(2*time.Second, ctx.Done(), func() error { @@ -229,17 +265,6 @@ func runSidecar( }) } - confContentYaml, err := objStoreConfig.Content() - if err != nil { - return err - } - - var uploads = true - if len(confContentYaml) == 0 { - level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled") - uploads = false - } - if uploads { // The background shipper continuously scans the data directory and uploads // new blocks to Google Cloud Storage or an S3-compatible storage service. @@ -265,10 +290,7 @@ func runSidecar( var s *shipper.Shipper if uploadCompacted { - s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource, m.promURL) - if err != nil { - return errors.Wrap(err, "create shipper") - } + s = shipper.NewWithCompacted(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource) } else { s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource) } @@ -298,13 +320,45 @@ func runSidecar( return nil } +func validatePrometheus(ctx context.Context, logger log.Logger, m *promMetadata) error { + if m.version == nil { + level.Warn(logger).Log("msg", "fetched version is nil or invalid. Unable to know whether Prometheus supports /version endpoint, skip validation") + return nil + } + + if m.version.LessThan(promclient.FlagsVersion) { + level.Warn(logger).Log("msg", + "Prometheus doesn't support flags endpoint, skip validation", "version", m.version.Original()) + return nil + } + + flags, err := promclient.ConfiguredFlags(ctx, logger, m.promURL) + if err != nil { + return errors.Wrap(err, "failed to check flags") + } + + // Check if compaction is disabled. + if flags.TSDBMinTime != flags.TSDBMaxTime { + return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+ + "Compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration)", flags.TSDBMaxTime, flags.TSDBMinTime) + } + + // Check if block time is 2h. + if flags.TSDBMinTime != model.Duration(2*time.Hour) { + level.Warn(logger).Log("msg", "found that TSDB block time is not 2h. Only 2h block time is recommended.", "block-time", flags.TSDBMinTime) + } + + return nil +} + type promMetadata struct { promURL *url.URL - mtx sync.Mutex - mint int64 - maxt int64 - labels labels.Labels + mtx sync.Mutex + mint int64 + maxt int64 + labels labels.Labels + version *version.Version } func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error { diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md index d76a1a484d..58ae7d4628 100644 --- a/docs/components/sidecar.md +++ b/docs/components/sidecar.md @@ -14,7 +14,8 @@ Prometheus servers connected to the Thanos cluster via the sidecar are subject t * The minimum Prometheus version is 2.2.1 * The `external_labels` section of the configuration implements is in line with the desired label scheme (will be used by query layer to filter out store APIs to query). * The `--web.enable-lifecycle` flag is enabled if you want to use `reload.*` flags. -* The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction. The default of `2h` is recommended. +* The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction on order to use Thanos sidecar upload. Leave local compaction on if sidecar just exposes StoreAPI and your retention is normal. The default of `2h` is recommended. + Mentioned parameters set to equal values disable the internal Prometheus compaction, which is needed to avoid the uploaded data corruption when thanos compactor does its job, this is critical for data consistency and should not be ignored if you plan to use Thanos compactor. Even though you set mentioned parameters equal, you might observe Prometheus internal metric `prometheus_tsdb_compactions_total` being incremented, don't be confused by that: Prometheus writes initial head block to filestem via internal compaction mechanis, but if you followed recommendations - data won't be modified by Prometheus before sidecar uploads it. Thanos sidecar will also check sanity of the flags set to Prometheus on the startup and log errors or warning if they have been configured improperly (#838). The retention is recommended to not be lower than three times the block duration. This achieves resilience in the face of connectivity issues to the object storage since all local data will remain available within the Thanos cluster. If connectivity gets restored the backlog of blocks gets uploaded to the object storage. diff --git a/go.mod b/go.mod index fd9870537c..5dd814f200 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117 github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86 + github.com/hashicorp/go-version v1.1.0 github.com/hashicorp/golang-lru v0.5.1 github.com/hashicorp/memberlist v0.1.0 github.com/julienschmidt/httprouter v1.1.0 // indirect diff --git a/go.sum b/go.sum index 49a5f99a21..0f85bf4b87 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,7 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-ini/ini v1.21.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0 h1:8HUsc87TaSWLKwrnumgC8/YconD2fJQsRJAsWaPg2ic= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= @@ -122,6 +123,8 @@ github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86 h1:7YOlAIO2Y github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-version v1.1.0 h1:bPIoEKD27tNdebFGGxxYwcL4nepeY4j1QP23PFRGzg0= +github.com/hashicorp/go-version v1.1.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 1622a16340..efc35bbafe 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -23,6 +23,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/hashicorp/go-version" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/tracing" @@ -35,6 +36,8 @@ import ( yaml "gopkg.in/yaml.v2" ) +var FlagsVersion = version.Must(version.NewVersion("2.2.0")) + // IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell // if we have access to Prometheus TSDB directory. func IsWALDirAccesible(dir string) error { @@ -70,7 +73,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe b, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, errors.Errorf("failed to read body") + return nil, errors.New("failed to read body") } if resp.StatusCode != 200 { @@ -186,7 +189,7 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla b, err := ioutil.ReadAll(resp.Body) if err != nil { - return Flags{}, errors.Errorf("failed to read body") + return Flags{}, errors.New("failed to read body") } if resp.StatusCode != 200 { @@ -230,7 +233,7 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo b, err := ioutil.ReadAll(resp.Body) if err != nil { - return "", errors.Errorf("failed to read body") + return "", errors.New("failed to read body") } if resp.StatusCode != 200 { @@ -376,6 +379,32 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q return vec, warnings, nil } +// PromVersion will return the version of Prometheus by querying /version Prometheus endpoint. +func PromVersion(logger log.Logger, base *url.URL) (*version.Version, error) { + if logger == nil { + logger = log.NewNopLogger() + } + + u := *base + u.Path = path.Join(u.Path, "/version") + resp, err := http.Get(u.String()) + if err != nil { + return nil, errors.Wrapf(err, "request version against %s", u.String()) + } + defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.New("failed to read body") + } + + if resp.StatusCode != 200 { + return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b)) + } + + return parseVersion(b) +} + // Scalar response consists of array with mixed types so it needs to be // unmarshaled separately. func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) { @@ -478,3 +507,25 @@ func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetr } } } + +// parseVersion converts string to version.Version. +func parseVersion(data []byte) (*version.Version, error) { + var m struct { + Version string `json:"version"` + } + if err := json.Unmarshal(data, &m); err != nil { + return nil, errors.Wrapf(err, "unmarshal response: %v", string(data)) + } + + // Prometheus is built with nil version. + if strings.TrimSpace(m.Version) == "" { + return nil, nil + } + + ver, err := version.NewVersion(m.Version) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse version %s", m.Version) + } + + return ver, nil +} diff --git a/pkg/promclient/promclient_e2e_test.go b/pkg/promclient/promclient_e2e_test.go index 9f30996dde..c2b35fdca6 100644 --- a/pkg/promclient/promclient_e2e_test.go +++ b/pkg/promclient/promclient_e2e_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/go-kit/kit/log" + "github.com/hashicorp/go-version" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/testutil" "github.com/oklog/ulid" @@ -147,3 +148,36 @@ func TestRule_UnmarshalScalarResponse(t *testing.T) { vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult) testutil.NotOk(t, err) } + +func TestParseVersion(t *testing.T) { + promVersions := map[string]string{ + "": promVersionResp(""), + "2.2.0": promVersionResp("2.2.0"), + "2.3.0": promVersionResp("2.3.0"), + "2.3.0-rc.0": promVersionResp("2.3.0-rc.0"), + } + + promMalformedVersions := map[string]string{ + "foo": promVersionResp("foo"), + "bar": promVersionResp("bar"), + } + + for v, resp := range promVersions { + gotVersion, err := parseVersion([]byte(resp)) + testutil.Ok(t, err) + expectVersion, _ := version.NewVersion(v) + testutil.Equals(t, gotVersion, expectVersion) + } + + for v, resp := range promMalformedVersions { + gotVersion, err := parseVersion([]byte(resp)) + testutil.NotOk(t, err) + expectVersion, _ := version.NewVersion(v) + testutil.Equals(t, gotVersion, expectVersion) + } +} + +// promVersionResp returns the response of Prometheus /version endpoint. +func promVersionResp(ver string) string { + return fmt.Sprintf(`{"version":"%s","revision":"","branch":"","buildUser":"","buildDate":"","goVersion":""}`, ver) +} diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index c6f526f593..b4758e1749 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -7,24 +7,20 @@ import ( "encoding/json" "io/ioutil" "math" - "net/url" "os" "path" "path/filepath" "sort" - "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/block/metadata" "github.com/improbable-eng/thanos/pkg/objstore" - "github.com/improbable-eng/thanos/pkg/promclient" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" @@ -120,15 +116,13 @@ func New( // to remote if necessary, including compacted blocks which are already in filesystem. // It attaches the Thanos metadata section in each meta JSON file. func NewWithCompacted( - ctx context.Context, logger log.Logger, r prometheus.Registerer, dir string, bucket objstore.Bucket, lbls func() labels.Labels, source metadata.SourceType, - prometheusURL *url.URL, -) (*Shipper, error) { +) *Shipper { if logger == nil { logger = log.NewNopLogger() } @@ -136,25 +130,6 @@ func NewWithCompacted( lbls = func() labels.Labels { return nil } } - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - - var flags promclient.Flags - if err := runutil.Retry(1*time.Second, ctx.Done(), func() (err error) { - flags, err = promclient.ConfiguredFlags(ctx, logger, prometheusURL) - if err != nil { - return errors.Wrap(err, "configured flags; failed to check if compaction is disabled") - } - return nil - }); err != nil { - return nil, err - } - - if flags.TSDBMinTime != model.Duration(2*time.Hour) || flags.TSDBMaxTime != model.Duration(2*time.Hour) { - return nil, errors.Errorf("Found that TSDB Max time is %s and Min time is %s. To use shipper with upload compacted option, "+ - "compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration = 2h", flags.TSDBMinTime, flags.TSDBMaxTime) - } - return &Shipper{ logger: logger, dir: dir, @@ -163,7 +138,7 @@ func NewWithCompacted( metrics: newMetrics(r, true), source: source, uploadCompacted: true, - }, nil + } } // Timestamps returns the minimum timestamp for which data is available and the highest timestamp diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index 8ddde0e771..62c6b65d02 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -6,7 +6,6 @@ import ( "encoding/json" "io/ioutil" "math/rand" - "net/url" "os" "path" "path/filepath" @@ -191,11 +190,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel() testutil.Ok(t, p.WaitPrometheusUp(upctx)) - addr, err := url.Parse(p.Addr()) - testutil.Ok(t, err) - - shipper, err := NewWithCompacted(ctx, log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, addr) - testutil.NotOk(t, err) // Compaction not disabled! + shipper := NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource) p.DisableCompaction() testutil.Ok(t, p.Restart()) @@ -204,11 +199,7 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { defer upcancel2() testutil.Ok(t, p.WaitPrometheusUp(upctx2)) - addr, err = url.Parse("http://" + p.Addr()) - testutil.Ok(t, err) - - shipper, err = NewWithCompacted(ctx, log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource, addr) - testutil.Ok(t, err) + shipper = NewWithCompacted(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, metadata.TestSource) // Create 10 new blocks. 9 of them (non compacted) should be actually uploaded. var ( diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 7ecd517877..c45fb0be24 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -108,6 +108,7 @@ func scraper(i int, config string) cmdScheduleFunc { cmds = append(cmds, newCmdExec(exec.Command(testutil.PrometheusBinary(), "--config.file", promDir+"/prometheus.yml", "--storage.tsdb.path", promDir, + "--storage.tsdb.max-block-duration", "2h", "--log.level", "info", "--web.listen-address", promHTTP(i), )))