Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgraded Prometheus and TSDB deps. #704

Merged
merged 1 commit into from
Jan 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]
name = "github.com/prometheus/common"

[[constraint]]
version = "v2.3.2"
# TODO(bwplotka): Move to released version once our recent fixes will be released (v2.7.0)
revision = "3bd41cc92c7800cc6072171bd4237406126fa169"
name = "github.com/prometheus/prometheus"

[[override]]
Expand All @@ -46,7 +47,7 @@ ignored = ["github.com/improbable-eng/thanos/benchmark/*"]

[[constraint]]
name = "github.com/prometheus/tsdb"
revision = "bd832fc8274e8fe63999ac749daaaff9d881241f"
version = "v0.4.0"

[[constraint]]
branch = "master"
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ PROTOC_VERSION ?= 3.4.0

# E2e test deps.
# Referenced by github.com/improbable-eng/thanos/blob/master/docs/getting_started.md#prometheus
SUPPORTED_PROM_VERSIONS ?=v2.0.0 v2.2.1 v2.3.2 v2.4.3 v2.5.0

# Limitied prom version, because testing was not possibe. This should fix it: https://github.com/improbable-eng/thanos/issues/758
SUPPORTED_PROM_VERSIONS ?=v2.4.3 v2.5.0
ALERTMANAGER_VERSION ?=v0.15.2
MINIO_SERVER_VERSION ?=RELEASE.2018-10-06T00-15-16Z

Expand Down
10 changes: 5 additions & 5 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (
"text/template"
"time"

"github.com/prometheus/tsdb/labels"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/verifier"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/labels"
"golang.org/x/text/language"
"golang.org/x/text/message"
"gopkg.in/alecthomas/kingpin.v2"
Expand Down Expand Up @@ -254,7 +254,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
defer cancel()

// Getting Metas.
var blockMetas []*block.Meta
var blockMetas []*metadata.Meta
if err = bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
Expand All @@ -277,7 +277,7 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
}
}

func printTable(blockMetas []*block.Meta, selectorLabels labels.Labels, sortBy []string) error {
func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortBy []string) error {
header := inspectColumns

var lines [][]string
Expand Down Expand Up @@ -355,7 +355,7 @@ func getKeysAlphabetically(labels map[string]string) []string {

// matchesSelector checks if blockMeta contains every label from
// the selector with the correct value
func matchesSelector(blockMeta *block.Meta, selectorLabels labels.Labels) bool {
func matchesSelector(blockMeta *metadata.Meta, selectorLabels labels.Labels) bool {
for _, l := range selectorLabels {
if v, ok := blockMeta.Thanos.Labels[l.Name]; !ok || v != l.Value {
return false
Expand Down
12 changes: 6 additions & 6 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import (
"path/filepath"
"time"

"github.com/prometheus/tsdb/chunkenc"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/client"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunkenc"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -105,7 +105,7 @@ func downsampleBucket(
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir")
}
var metas []*block.Meta
var metas []*metadata.Meta

err := bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
Expand All @@ -119,7 +119,7 @@ func downsampleBucket(
}
defer runutil.CloseWithLogOnErr(logger, rc, "block reader")

var m block.Meta
var m metadata.Meta
if err := json.NewDecoder(rc).Decode(&m); err != nil {
return errors.Wrap(err, "decode meta")
}
Expand Down Expand Up @@ -201,7 +201,7 @@ func downsampleBucket(
return nil
}

func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *block.Meta, dir string, resolution int64) error {
func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket, m *metadata.Meta, dir string, resolution int64) error {
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())

Expand All @@ -224,7 +224,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bu
pool = downsample.NewPool()
}

b, err := tsdb.OpenBlock(bdir, pool)
b, err := tsdb.OpenBlock(logger, bdir, pool)
if err != nil {
return errors.Wrapf(err, "open block %s", m.ULID)
}
Expand Down
11 changes: 10 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,16 @@ func runQuery(
return stores.Get(), nil
}, selectorLset)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(logger, reg, maxConcurrentQueries, queryTimeout)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Reg: reg,
MaxConcurrent: maxConcurrentQueries,
// TODO(bwplotka): Expose this as a flag: https://github.com/improbable-eng/thanos/issues/703
MaxSamples: math.MaxInt32,
Timeout: queryTimeout,
},
)
)
// Periodically update the store set with the addresses we see in our cluster.
{
Expand Down
17 changes: 8 additions & 9 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ import (
"syscall"
"time"

"github.com/improbable-eng/thanos/pkg/extprom"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/alert"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
Expand Down Expand Up @@ -117,7 +116,6 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
MaxBlockDuration: *tsdbBlockDuration,
Retention: *tsdbRetention,
NoLockfile: true,
WALFlushInterval: 30 * time.Second,
}

lookupQueries := map[string]struct{}{}
Expand Down Expand Up @@ -290,7 +288,7 @@ func runRule(
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
// Only send actually firing alerts.
Expand All @@ -309,17 +307,18 @@ func runRule(
res = append(res, a)
}
alertQ.Push(res)

return nil
}

st := tsdb.Adapter(db, 0)
mgr = rules.NewManager(&rules.ManagerOptions{
Context: ctx,
QueryFunc: queryFn,
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
Appendable: tsdb.Adapter(db, 0),
Appendable: st,
Registerer: reg,
ExternalURL: nil,
TSDB: st,
})
g.Add(func() error {
mgr.Run()
Expand Down Expand Up @@ -579,7 +578,7 @@ func runRule(
}
}()

s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource)
s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, metadata.RulerSource)

ctx, cancel := context.WithCancel(context.Background())

Expand Down
36 changes: 18 additions & 18 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/reloader"
Expand Down Expand Up @@ -102,7 +102,7 @@ func runSidecar(
reloader *reloader.Reloader,
component string,
) error {
var metadata = &metadata{
var m = &promMetadata{
promURL: promURL,

// Start out with the full time range. The shipper will constrain it later.
Expand All @@ -128,7 +128,7 @@ func runSidecar(
// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := metadata.UpdateLabels(ctx, logger); err != nil {
if err := m.UpdateLabels(ctx, logger); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
Expand All @@ -145,14 +145,14 @@ func runSidecar(
return errors.Wrap(err, "initial external labels query")
}

if len(metadata.Labels()) == 0 {
if len(m.Labels()) == 0 {
return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured")
}

// New gossip cluster.
mint, maxt := metadata.Timestamps()
mint, maxt := m.Timestamps()
if err = peer.Join(cluster.PeerTypeSource, cluster.PeerMetadata{
Labels: metadata.LabelsPB(),
Labels: m.LabelsPB(),
MinTime: mint,
MaxTime: maxt,
}); err != nil {
Expand All @@ -165,12 +165,12 @@ func runSidecar(
iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer iterCancel()

if err := metadata.UpdateLabels(iterCtx, logger); err != nil {
if err := m.UpdateLabels(iterCtx, logger); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
promUp.Set(0)
} else {
// Update gossip.
peer.SetLabels(metadata.LabelsPB())
peer.SetLabels(m.LabelsPB())

promUp.Set(1)
lastHeartbeat.Set(float64(time.Now().UnixNano()) / 1e9)
Expand Down Expand Up @@ -204,7 +204,7 @@ func runSidecar(
var client http.Client

promStore, err := store.NewPrometheusStore(
logger, &client, promURL, metadata.Labels, metadata.Timestamps)
logger, &client, promURL, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func runSidecar(
}
}()

s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource)
s := shipper.New(logger, nil, dataDir, bkt, m.Labels, metadata.SidecarSource)
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
Expand All @@ -265,9 +265,9 @@ func runSidecar(
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
} else {
metadata.UpdateTimestamps(minTime, math.MaxInt64)
m.UpdateTimestamps(minTime, math.MaxInt64)

mint, maxt := metadata.Timestamps()
mint, maxt := m.Timestamps()
peer.SetTimestamps(mint, maxt)
}
return nil
Expand All @@ -281,7 +281,7 @@ func runSidecar(
return nil
}

type metadata struct {
type promMetadata struct {
promURL *url.URL

mtx sync.Mutex
Expand All @@ -290,7 +290,7 @@ type metadata struct {
labels labels.Labels
}

func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
elset, err := queryExternalLabels(ctx, logger, s.promURL)
if err != nil {
return err
Expand All @@ -303,22 +303,22 @@ func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
return nil
}

func (s *metadata) UpdateTimestamps(mint int64, maxt int64) {
func (s *promMetadata) UpdateTimestamps(mint int64, maxt int64) {
s.mtx.Lock()
defer s.mtx.Unlock()

s.mint = mint
s.maxt = maxt
}

func (s *metadata) Labels() labels.Labels {
func (s *promMetadata) Labels() labels.Labels {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.labels
}

func (s *metadata) LabelsPB() []storepb.Label {
func (s *promMetadata) LabelsPB() []storepb.Label {
s.mtx.Lock()
defer s.mtx.Unlock()

Expand All @@ -332,7 +332,7 @@ func (s *metadata) LabelsPB() []storepb.Label {
return lset
}

func (s *metadata) Timestamps() (mint int64, maxt int64) {
func (s *promMetadata) Timestamps() (mint int64, maxt int64) {
s.mtx.Lock()
defer s.mtx.Unlock()

Expand Down
Loading