Skip to content

Commit

Permalink
Merged Experimental Federated Rules API feature branch to master. (#2200)
Browse files Browse the repository at this point in the history

* Added RulesAPI.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Added warnings.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Added Type to rules requests as it is on HTTP API. (#2201)

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* pkg/store/storepb: fix wrong rule reference (#2237)

* pkg/store/storepb: fix wrong rule reference

Currently we recursively reference rules instead of recording rules.

Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>

* proto: regenerate

Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>

* Made storepb.RuleGroups a source of truth for rules API (Go, JSON, proto). Added tests. (#2242)

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Use proto rules API instead of struct; Moved as much as possible to promclient; Added rulesAPI RPC to sidecar. (#2243)

* Use proto rules API instead of struct; Added rulesAPI RPC to sidecar.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed broken test.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Use proto rules API instead of struct; Moved as much as possible to promclient; Added rulesAPI RPC to sidecar. (#2291)

* rules_custom_test: fix asserting labels

Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>

* TestPrometheusStore_Rules_e2e: fix test fixture

Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>

Co-authored-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>

* cmd/thanos/query: add initial rules support (#2240)

* cmd/thanos/query: add initial rules support

Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>

* pkg/query/api/v1: initial implementation

Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>

* e2e: initial implementation and fixes

Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>

* pkg/query: fix racy access to assert rules API store

Signed-off-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>

* Refactored proto generation and separated store from rules APIs. (#2558)

* Refactored proto generation and separate store from rules APIs.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed proto gen.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Addressed Serg comments.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Added Ruler support for RulesAPI; Refactored Manager. (#2562)

As per: https://thanos.io/proposals/202003_thanos_rules_federation.md/

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Small fixes to changelog and flags. Do not add any.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed after rebase.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

Co-authored-by: Sergiusz Urbaniak <sergiusz.urbaniak@gmail.com>
  • Loading branch information
bwplotka and s-urbaniak authored May 25, 2020
1 parent 079ad42 commit c733564
Show file tree
Hide file tree
Showing 68 changed files with 7,086 additions and 2,016 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version: 2
defaults: &defaults
docker:
# Built by Thanos make docker-ci
- image: quay.io/thanos/thanos-ci:go1.14.2-node
- image: quay.io/thanos/thanos-ci:v1.1-go1.14.2-node
jobs:
test:
<<: *defaults
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ ME ?= $(shell whoami)
# Referenced by github.com/thanos-io/thanos/blob/master/docs/getting_started.md#prometheus

# Limited prom version, because testing was not possible. This should fix it: https://github.com/thanos-io/thanos/issues/758
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0 v2.18.1
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0 $(GOBIN)/prometheus-v2.18.1

ALERTMANAGER_VERSION ?= v0.20.0
ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION)
Expand Down
93 changes: 72 additions & 21 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
kingpin "gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
"github.com/thanos-io/thanos/pkg/discovery/dns"
Expand All @@ -34,12 +32,14 @@ import (
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/query"
v1 "github.com/thanos-io/thanos/pkg/query/api"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/tls"
"github.com/thanos-io/thanos/pkg/ui"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

// registerQuery registers a query command.
Expand All @@ -66,7 +66,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
Strings()

instantDefaultMaxSourceResolution := modelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())
Expand All @@ -77,6 +77,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
stores := cmd.Flag("store", "Addresses of statically configured store API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.").
PlaceHolder("<store>").Strings()

rules := cmd.Flag("rule", "Addresses of statically configured rules API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect rule API servers through respective DNS lookups.").
Hidden().PlaceHolder("<rule>").Strings()

strictStores := cmd.Flag("store-strict", "Addresses of only statically configured store API servers that are always used, even if the health check fails. Useful if you have a caching layer on top.").
PlaceHolder("<staticstore>").Strings()

Expand Down Expand Up @@ -111,13 +114,12 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
return errors.Wrap(err, "parse federation labels")
}

lookupStores := map[string]struct{}{}
for _, s := range *stores {
if _, ok := lookupStores[s]; ok {
return errors.Errorf("Address %s is duplicated for --store flag.", s)
}
if dup := firstDuplicate(*stores); dup != "" {
return errors.Errorf("Address %s is duplicated for --store flag.", dup)
}

lookupStores[s] = struct{}{}
if dup := firstDuplicate(*rules); dup != "" {
return errors.Errorf("Address %s is duplicated for --rule flag.", dup)
}

var fileSD *file.Discovery
Expand Down Expand Up @@ -154,9 +156,10 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*maxConcurrentQueries,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*replicaLabels,
*queryReplicaLabels,
selectorLset,
*stores,
*rules,
*enableAutodownsampling,
*enablePartialResponse,
fileSD,
Expand Down Expand Up @@ -195,9 +198,10 @@ func runQuery(
maxConcurrentQueries int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
replicaLabels []string,
queryReplicaLabels []string,
selectorLset labels.Labels,
storeAddrs []string,
ruleAddrs []string,
enableAutodownsampling bool,
enablePartialResponse bool,
fileSD *file.Discovery,
Expand All @@ -220,7 +224,7 @@ func runQuery(
}

fileSDCache := cache.New()
dnsProvider := dns.NewProvider(
dnsStoreProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_store_apis_", reg),
dns.ResolverType(dnsSDResolver),
Expand All @@ -232,28 +236,43 @@ func runQuery(
}
}

dnsRuleProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_rule_apis_", reg),
dns.ResolverType(dnsSDResolver),
)

var (
stores = query.NewStoreSet(
logger,
reg,
func() (specs []query.StoreSpec) {
// Add DNS resolved addresses.
for _, addr := range dnsProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

// Add strict & static nodes.
for _, addr := range strictStores {
specs = append(specs, query.NewGRPCStoreSpec(addr, true))
}
// Add DNS resolved addresses from static flags and file SD.
for _, addr := range dnsStoreProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}
return removeDuplicateStoreSpecs(logger, duplicatedStores, specs)
},
func() (specs []query.RuleSpec) {
for _, addr := range dnsRuleProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}

specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs)
// NOTE(s-urbaniak): No need to remove duplicates, as rule APIs are a subset of store APIs;
// hence, any duplicates will be tracked in the store API set.

return specs
},
dialOpts,
unhealthyStoreTimeout,
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down Expand Up @@ -303,9 +322,11 @@ func runQuery(
}
fileSDCache.Update(update)
stores.Update(ctxUpdate)
if err := dnsProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {

if err := dnsStoreProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}
// Rules apis do not support file service discovery as of now.
case <-ctxUpdate.Done():
return nil
}
Expand All @@ -320,9 +341,12 @@ func runQuery(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
if err := dnsProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
if err := dnsStoreProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...)); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for storeAPIs", "err", err)
}
if err := dnsRuleProvider.Resolve(ctx, ruleAddrs); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for rulesAPIs", "err", err)
}
return nil
})
}, func(error) {
Expand Down Expand Up @@ -357,7 +381,18 @@ func runQuery(
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, reg, stores, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

api := v1.NewAPI(logger, reg, stores, engine, queryableCreator, enableAutodownsampling, enablePartialResponse, replicaLabels, instantDefaultMaxSourceResolution)
api := v1.NewAPI(
logger,
reg,
stores,
engine,
queryableCreator,
enableAutodownsampling,
enablePartialResponse,
queryReplicaLabels,
instantDefaultMaxSourceResolution,
rules.NewGRPCClient(rulesProxy),
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins)

Expand Down Expand Up @@ -385,7 +420,7 @@ func runQuery(
return errors.Wrap(err, "setup gRPC server")
}

s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy,
s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, proxy, rulesProxy,
grpcserver.WithListen(grpcBindAddr),
grpcserver.WithGracePeriod(grpcGracePeriod),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down Expand Up @@ -420,3 +455,19 @@ func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Co
}
return deduplicated
}

// firstDuplicate walks ss in order and reports the first element that has
// already been seen earlier in the slice. If every element is unique, or the
// slice is empty, it returns the empty string.
func firstDuplicate(ss []string) string {
	seen := make(map[string]struct{})

	for i := 0; i < len(ss); i++ {
		candidate := ss[i]
		if _, repeated := seen[candidate]; repeated {
			return candidate
		}
		seen[candidate] = struct{}{}
	}

	return ""
}
Loading

0 comments on commit c733564

Please sign in to comment.