Skip to content

Commit

Permalink
add compare flags func to compare flags between prometheus and sidecar (
Browse files Browse the repository at this point in the history
#838)

Original message:

* update documentation for a max/min block duration

add compare flags func to compare flags between prom and sidecar

* fix some nits


Functional change: now we check the configured flags (if possible) and error out if MinTime != MaxTime. We need to check this always since if that is not true then we will get overlapping blocks. Additionally, an error message is printed out if it is not equal to 2h (the recommended value).
  • Loading branch information
yeya24 authored and GiedriusS committed Apr 24, 2019
1 parent a18ca1e commit eddaf11
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 61 deletions.
92 changes: 73 additions & 19 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/hashicorp/go-version"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/component"
Expand All @@ -25,6 +26,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
Expand Down Expand Up @@ -111,6 +113,17 @@ func runSidecar(
maxt: math.MaxInt64,
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return errors.Wrap(err, "getting object store config")
}

var uploads = true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
uploads = false
}

// Setup all the concurrent groups.
{
promUp := prometheus.NewGauge(prometheus.GaugeOpts{
Expand All @@ -125,6 +138,29 @@ func runSidecar(

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Retry infinitely until we get Prometheus version.
if err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if m.version, err = promclient.PromVersion(logger, m.promURL); err != nil {
level.Warn(logger).Log(
"msg", "failed to get Prometheus version. Is Prometheus running? Retrying",
"err", err,
)
return errors.Wrapf(err, "fetch Prometheus version")
}

return nil
}); err != nil {
return errors.Wrap(err, "fetch Prometheus version")
}

// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
Expand Down Expand Up @@ -229,17 +265,6 @@ func runSidecar(
})
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

var uploads = true
if len(confContentYaml) == 0 {
level.Info(logger).Log("msg", "No supported bucket was configured, uploads will be disabled")
uploads = false
}

if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
Expand All @@ -265,10 +290,7 @@ func runSidecar(

var s *shipper.Shipper
if uploadCompacted {
s, err = shipper.NewWithCompacted(ctx, logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource, m.promURL)
if err != nil {
return errors.Wrap(err, "create shipper")
}
s = shipper.NewWithCompacted(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
} else {
s = shipper.New(logger, reg, dataDir, bkt, m.Labels, metadata.SidecarSource)
}
Expand Down Expand Up @@ -298,13 +320,45 @@ func runSidecar(
return nil
}

func validatePrometheus(ctx context.Context, logger log.Logger, m *promMetadata) error {
if m.version == nil {
level.Warn(logger).Log("msg", "fetched version is nil or invalid. Unable to know whether Prometheus supports /version endpoint, skip validation")
return nil
}

if m.version.LessThan(promclient.FlagsVersion) {
level.Warn(logger).Log("msg",
"Prometheus doesn't support flags endpoint, skip validation", "version", m.version.Original())
return nil
}

flags, err := promclient.ConfiguredFlags(ctx, logger, m.promURL)
if err != nil {
return errors.Wrap(err, "failed to check flags")
}

// Check if compaction is disabled.
if flags.TSDBMinTime != flags.TSDBMaxTime {
return errors.Errorf("found that TSDB Max time is %s and Min time is %s. "+
"Compaction needs to be disabled (storage.tsdb.min-block-duration = storage.tsdb.max-block-duration)", flags.TSDBMaxTime, flags.TSDBMinTime)
}

// Check if block time is 2h.
if flags.TSDBMinTime != model.Duration(2*time.Hour) {
level.Warn(logger).Log("msg", "found that TSDB block time is not 2h. Only 2h block time is recommended.", "block-time", flags.TSDBMinTime)
}

return nil
}

type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
mtx sync.Mutex
mint int64
maxt int64
labels labels.Labels
version *version.Version
}

func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
Expand Down
3 changes: 2 additions & 1 deletion docs/components/sidecar.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Prometheus servers connected to the Thanos cluster via the sidecar are subject t
* The minimum Prometheus version is 2.2.1
* The `external_labels` section of the configuration implements is in line with the desired label scheme (will be used by query layer to filter out store APIs to query).
* The `--web.enable-lifecycle` flag is enabled if you want to use `reload.*` flags.
* The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction. The default of `2h` is recommended.
* The `--storage.tsdb.min-block-duration` and `--storage.tsdb.max-block-duration` must be set to equal values to disable local compaction on order to use Thanos sidecar upload. Leave local compaction on if sidecar just exposes StoreAPI and your retention is normal. The default of `2h` is recommended.
Mentioned parameters set to equal values disable the internal Prometheus compaction, which is needed to avoid the uploaded data corruption when thanos compactor does its job, this is critical for data consistency and should not be ignored if you plan to use Thanos compactor. Even though you set mentioned parameters equal, you might observe Prometheus internal metric `prometheus_tsdb_compactions_total` being incremented, don't be confused by that: Prometheus writes initial head block to filestem via internal compaction mechanis, but if you followed recommendations - data won't be modified by Prometheus before sidecar uploads it. Thanos sidecar will also check sanity of the flags set to Prometheus on the startup and log errors or warning if they have been configured improperly (#838).

The retention is recommended to not be lower than three times the block duration. This achieves resilience in the face of connectivity issues
to the object storage since all local data will remain available within the Thanos cluster. If connectivity gets restored the backlog of blocks gets uploaded to the object storage.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v0.0.0-20181025070259-68e3a13e4117
github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86
github.com/hashicorp/go-version v1.1.0
github.com/hashicorp/golang-lru v0.5.1
github.com/hashicorp/memberlist v0.1.0
github.com/julienschmidt/httprouter v1.1.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-ini/ini v1.21.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0 h1:8HUsc87TaSWLKwrnumgC8/YconD2fJQsRJAsWaPg2ic=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0 h1:MP4Eh7ZCb31lleYCFuwm0oe4/YGak+5l1vA2NOE80nA=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
Expand Down Expand Up @@ -122,6 +123,8 @@ github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86 h1:7YOlAIO2Y
github.com/hashicorp/go-sockaddr v0.0.0-20180320115054-6d291a969b86/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.1.0 h1:bPIoEKD27tNdebFGGxxYwcL4nepeY4j1QP23PFRGzg0=
github.com/hashicorp/go-version v1.1.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
Expand Down
57 changes: 54 additions & 3 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/hashicorp/go-version"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/tracing"
Expand All @@ -35,6 +36,8 @@ import (
yaml "gopkg.in/yaml.v2"
)

var FlagsVersion = version.Must(version.NewVersion("2.2.0"))

// IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell
// if we have access to Prometheus TSDB directory.
func IsWALDirAccesible(dir string) error {
Expand Down Expand Up @@ -70,7 +73,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Errorf("failed to read body")
return nil, errors.New("failed to read body")
}

if resp.StatusCode != 200 {
Expand Down Expand Up @@ -186,7 +189,7 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return Flags{}, errors.Errorf("failed to read body")
return Flags{}, errors.New("failed to read body")
}

if resp.StatusCode != 200 {
Expand Down Expand Up @@ -230,7 +233,7 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", errors.Errorf("failed to read body")
return "", errors.New("failed to read body")
}

if resp.StatusCode != 200 {
Expand Down Expand Up @@ -376,6 +379,32 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q
return vec, warnings, nil
}

// PromVersion will return the version of Prometheus by querying /version Prometheus endpoint.
func PromVersion(logger log.Logger, base *url.URL) (*version.Version, error) {
if logger == nil {
logger = log.NewNopLogger()
}

u := *base
u.Path = path.Join(u.Path, "/version")
resp, err := http.Get(u.String())
if err != nil {
return nil, errors.Wrapf(err, "request version against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.New("failed to read body")
}

if resp.StatusCode != 200 {
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

return parseVersion(b)
}

// Scalar response consists of array with mixed types so it needs to be
// unmarshaled separately.
func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) {
Expand Down Expand Up @@ -478,3 +507,25 @@ func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetr
}
}
}

// parseVersion converts string to version.Version.
func parseVersion(data []byte) (*version.Version, error) {
var m struct {
Version string `json:"version"`
}
if err := json.Unmarshal(data, &m); err != nil {
return nil, errors.Wrapf(err, "unmarshal response: %v", string(data))
}

// Prometheus is built with nil version.
if strings.TrimSpace(m.Version) == "" {
return nil, nil
}

ver, err := version.NewVersion(m.Version)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse version %s", m.Version)
}

return ver, nil
}
34 changes: 34 additions & 0 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/hashicorp/go-version"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/oklog/ulid"
Expand Down Expand Up @@ -147,3 +148,36 @@ func TestRule_UnmarshalScalarResponse(t *testing.T) {
vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult)
testutil.NotOk(t, err)
}

func TestParseVersion(t *testing.T) {
promVersions := map[string]string{
"": promVersionResp(""),
"2.2.0": promVersionResp("2.2.0"),
"2.3.0": promVersionResp("2.3.0"),
"2.3.0-rc.0": promVersionResp("2.3.0-rc.0"),
}

promMalformedVersions := map[string]string{
"foo": promVersionResp("foo"),
"bar": promVersionResp("bar"),
}

for v, resp := range promVersions {
gotVersion, err := parseVersion([]byte(resp))
testutil.Ok(t, err)
expectVersion, _ := version.NewVersion(v)
testutil.Equals(t, gotVersion, expectVersion)
}

for v, resp := range promMalformedVersions {
gotVersion, err := parseVersion([]byte(resp))
testutil.NotOk(t, err)
expectVersion, _ := version.NewVersion(v)
testutil.Equals(t, gotVersion, expectVersion)
}
}

// promVersionResp returns the response of Prometheus /version endpoint.
func promVersionResp(ver string) string {
return fmt.Sprintf(`{"version":"%s","revision":"","branch":"","buildUser":"","buildDate":"","goVersion":""}`, ver)
}
Loading

0 comments on commit eddaf11

Please sign in to comment.