Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use proto rules API instead of struct; Moved as much as possible to promclient; Added rulesAPI RPC to sidecar. #2243

Merged
merged 3 commits into from
Mar 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ func runQuery(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy,
// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, nil,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
5 changes: 3 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ func runRule(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store,
// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, nil,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -709,7 +710,7 @@ func queryFunc(

promClients := make([]*promclient.Client, 0, len(queriers))
for _, q := range queriers {
promClients = append(promClients, promclient.NewClient(logger, q))
promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule"))
}

return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
Expand Down
19 changes: 11 additions & 8 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func runSidecar(
maxt: math.MaxInt64,

limitMinTime: limitMinTime,
client: promclient.NewWithTracingClient(logger, "thanos-sidecar"),
}

confContentYaml, err := objStoreConfig.Content()
Expand Down Expand Up @@ -198,15 +199,15 @@ func runSidecar(
// Only check Prometheus's flags when upload is enabled.
if uploads {
// Check prometheus's flags to ensure sane sidecar flags.
if err := validatePrometheus(ctx, logger, ignoreBlockSize, m); err != nil {
if err := validatePrometheus(ctx, m.client, logger, ignoreBlockSize, m); err != nil {
return errors.Wrap(err, "validate Prometheus flags")
}
}

// Blocking query of external labels before joining as a Source Peer into gossip.
// We retry infinitely until we reach and fetch labels from our Prometheus.
err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if err := m.UpdateLabels(ctx, logger); err != nil {
if err := m.UpdateLabels(ctx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
"err", err,
Expand Down Expand Up @@ -239,7 +240,7 @@ func runSidecar(
iterCtx, iterCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer iterCancel()

if err := m.UpdateLabels(iterCtx, logger); err != nil {
if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
promUp.Set(0)
} else {
Expand Down Expand Up @@ -278,7 +279,7 @@ func runSidecar(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, promStore,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -356,14 +357,14 @@ func runSidecar(
return nil
}

func validatePrometheus(ctx context.Context, logger log.Logger, ignoreBlockSize bool, m *promMetadata) error {
func validatePrometheus(ctx context.Context, client *promclient.Client, logger log.Logger, ignoreBlockSize bool, m *promMetadata) error {
var (
flagErr error
flags promclient.Flags
)

if err := runutil.Retry(2*time.Second, ctx.Done(), func() error {
if flags, flagErr = promclient.ConfiguredFlags(ctx, logger, m.promURL); flagErr != nil && flagErr != promclient.ErrFlagEndpointNotFound {
if flags, flagErr = client.ConfiguredFlags(ctx, m.promURL); flagErr != nil && flagErr != promclient.ErrFlagEndpointNotFound {
level.Warn(logger).Log("msg", "failed to get Prometheus flags. Is Prometheus running? Retrying", "err", flagErr)
return errors.Wrapf(flagErr, "fetch Prometheus flags")
}
Expand Down Expand Up @@ -402,10 +403,12 @@ type promMetadata struct {
labels labels.Labels

limitMinTime thanosmodel.TimeOrDurationValue

client *promclient.Client
}

func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
elset, err := promclient.ExternalLabels(ctx, logger, s.promURL)
func (s *promMetadata) UpdateLabels(ctx context.Context) error {
elset, err := s.client.ExternalLabels(ctx, s.promURL)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func runStore(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, component, grpcProbe, bs,
s := grpcserver.New(logger, reg, tracer, component, grpcProbe, bs, nil,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/go-kit/kit v0.9.0
github.com/go-openapi/strfmt v0.19.2
github.com/gogo/protobuf v1.3.1
github.com/gogo/status v1.0.3
github.com/golang/snappy v0.0.1
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/gophercloud/gophercloud v0.6.0
Expand Down
10 changes: 5 additions & 5 deletions pkg/store/matchers.go → pkg/promclient/matchers.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store
package promclient

import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

func translateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) {
func TranslateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) {
switch m.Type {
case storepb.LabelMatcher_EQ:
return labels.NewMatcher(labels.MatchEqual, m.Name, m.Value)
Expand All @@ -26,9 +26,9 @@ func translateMatcher(m storepb.LabelMatcher) (*labels.Matcher, error) {
return nil, errors.Errorf("unknown label matcher type %d", m.Type)
}

func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err error) {
func TranslateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err error) {
for _, m := range ms {
r, err := translateMatcher(m)
r, err := TranslateMatcher(m)
if err != nil {
return nil, err
}
Expand All @@ -40,7 +40,7 @@ func translateMatchers(ms []storepb.LabelMatcher) (res []*labels.Matcher, err er
// matchersToString converts label matchers to string format.
func matchersToString(ms []storepb.LabelMatcher) (string, error) {
var res string
matchers, err := translateMatchers(ms)
matchers, err := TranslateMatchers(ms)
if err != nil {
return "", err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store
package promclient

import (
"testing"
Expand Down
Loading