Skip to content

Commit

Permalink
[file-sd-part-3] Added file sd to rule (#547)
Browse files Browse the repository at this point in the history
* Added file sd to rule
  • Loading branch information
Ivan Valkov authored Oct 16, 2018
1 parent 49718a7 commit 1ec8639
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 53 deletions.
4 changes: 2 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func runQuery(
specs = append(specs, query.NewGRPCStoreSpec(addr))
}

specs = removeDuplicates(logger, duplicatedStores, specs)
specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs)

return specs
},
Expand Down Expand Up @@ -412,7 +412,7 @@ func runQuery(
return nil
}

func removeDuplicates(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.StoreSpec) []query.StoreSpec {
func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.StoreSpec) []query.StoreSpec {
set := make(map[string]query.StoreSpec)
for _, spec := range specs {
addr := spec.Addr()
Expand Down
119 changes: 116 additions & 3 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/improbable-eng/thanos/pkg/alert"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
Expand All @@ -37,6 +38,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/targetgroup"
promlabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
Expand Down Expand Up @@ -75,6 +78,15 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)

objStoreConfig := regCommonObjStoreFlags(cmd, "")

queries := cmd.Flag("query", "Addresses of statically configured query API servers (repeatable).").
PlaceHolder("<query>").Strings()

fileSDFiles := cmd.Flag("query.file-sd-config.files", "Path to file that contain addresses of query peers. The path can be a glob pattern (repeatable).").
PlaceHolder("<path>").Strings()

fileSDInterval := modelDuration(cmd.Flag("query.file-sd-config.interval", "Refresh interval to re-read file SD files. (used as a fallback)").
Default("5m"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
Expand All @@ -96,6 +108,25 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
NoLockfile: true,
WALFlushInterval: 30 * time.Second,
}

lookupQueries := map[string]struct{}{}
for _, q := range *queries {
if _, ok := lookupQueries[q]; ok {
return errors.Errorf("Address %s is duplicated for --query flag.", q)
}

lookupQueries[q] = struct{}{}
}

var fileSD *file.Discovery
if len(*fileSDFiles) > 0 {
conf := &file.SDConfig{
Files: *fileSDFiles,
RefreshInterval: *fileSDInterval,
}
fileSD = file.NewDiscovery(conf, logger)
}

return runRule(g,
logger,
reg,
Expand All @@ -115,6 +146,8 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
tsdbOpts,
name,
alertQueryURL,
*queries,
fileSD,
)
}
}
Expand All @@ -141,6 +174,8 @@ func runRule(
tsdbOpts *tsdb.Options,
component string,
alertQueryURL *url.URL,
queryAddrs []string,
fileSD *file.Discovery,
) error {
configSuccess := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_config_last_reload_successful",
Expand All @@ -150,9 +185,19 @@ func runRule(
Name: "thanos_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
})

duplicatedQuery := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_rule_duplicated_query_address",
Help: "The number of times a duplicated query addresses is detected from the different configs in rule",
})
reg.MustRegister(configSuccess)
reg.MustRegister(configSuccessTime)
reg.MustRegister(duplicatedQuery)

for _, addr := range queryAddrs {
if addr == "" {
return errors.New("static querier address cannot be empty")
}
}

db, err := tsdb.Open(dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts)
if err != nil {
Expand All @@ -168,9 +213,17 @@ func runRule(
})
}

// FileSD query addresses
fileSDCache := cache.New()

// Hit the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
queryFn := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
var addrs []string
// Add addresses from static flag
addrs = append(addrs, queryAddrs...)

// Add addresses from gossip
peers := peer.PeerStates(cluster.PeerTypeQuery)
var ids []string
for id := range peers {
Expand All @@ -179,9 +232,19 @@ func runRule(
sort.Slice(ids, func(i int, j int) bool {
return strings.Compare(ids[i], ids[j]) < 0
})
for _, id := range ids {
addrs = append(addrs, peers[id].QueryAPIAddr)
}

// Add addresses from file sd
for _, addr := range fileSDCache.Addresses() {
addrs = append(addrs, addr)
}

for _, i := range rand.Perm(len(ids)) {
vec, err := queryPrometheusInstant(ctx, logger, peers[ids[i]].QueryAPIAddr, q, t)
removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
vec, err := queryPrometheusInstant(ctx, logger, addrs[i], q, t)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -301,6 +364,39 @@ func runRule(
cancel()
})
}
// Run File Service Discovery and update the query addresses when the files are modified
if fileSD != nil {
var fileSDUpdates chan []*targetgroup.Group
ctxRun, cancelRun := context.WithCancel(context.Background())

fileSDUpdates = make(chan []*targetgroup.Group)

g.Add(func() error {
fileSD.Run(ctxRun, fileSDUpdates)
return nil
}, func(error) {
cancelRun()
})

ctxUpdate, cancelUpdate := context.WithCancel(context.Background())
g.Add(func() error {
for {
select {
case update := <-fileSDUpdates:
// Discoverers sometimes send nil updates so need to check for it to avoid panics
if update == nil {
continue
}
fileSDCache.Update(update)
case <-ctxUpdate.Done():
return nil
}
}
}, func(error) {
cancelUpdate()
close(fileSDUpdates)
})
}

// Handle reload and termination interrupts.
reload := make(chan struct{}, 1)
Expand Down Expand Up @@ -649,3 +745,20 @@ func labelsTSDBToProm(lset labels.Labels) (res promlabels.Labels) {
}
return res
}

func removeDuplicateQueryAddrs(logger log.Logger, duplicatedQueriers prometheus.Counter, addrs []string) []string {
set := make(map[string]struct{})
for _, addr := range addrs {
if _, ok := set[addr]; ok {
level.Warn(logger).Log("msg", "Duplicate query address is provided - %v", addr)
duplicatedQueriers.Inc()
}
set[addr] = struct{}{}
}

deduplicated := make([]string, 0, len(set))
for key := range set {
deduplicated = append(deduplicated, key)
}
return deduplicated
}
9 changes: 9 additions & 0 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,14 @@ Flags:
--objstore.config=<bucket.config-yaml>
Alternative to 'objstore.config-file' flag.
Object store configuration in YAML.
--query=<query> ... Addresses of statically configured query API
servers (repeatable).
--query.file-sd-config.files=<path> ...
Path to file that contain addresses of query
peers. The path can be a glob pattern
(repeatable).
--query.file-sd-config.interval=5m
Refresh interval to re-read file SD files.
(used as a fallback)

```
40 changes: 20 additions & 20 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,41 @@ type testConfig struct {
var (
firstPromPort = promHTTPPort(1)

gossipSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), true)).
Add(scraper(2, defaultPromConfig("prom-ha", 0), true)).
Add(scraper(3, defaultPromConfig("prom-ha", 1), true)).
Add(querier(1, "replica")).
Add(querier(2, "replica"))

staticFlagsSuite = newSpinupSuite().
queryGossipSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), true)).
Add(scraper(2, defaultPromConfig("prom-ha", 0), true)).
Add(scraper(3, defaultPromConfig("prom-ha", 1), true)).
Add(querier(1, "replica")).
Add(querier(2, "replica"))

queryStaticFlagsSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), false)).
Add(scraper(2, defaultPromConfig("prom-ha", 0), false)).
Add(scraper(3, defaultPromConfig("prom-ha", 1), false)).
Add(querierWithStoreFlags(1, "replica", []string{sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)})).
Add(querierWithStoreFlags(2, "replica", []string{sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)}))

fileSDSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), false)).
Add(scraper(2, defaultPromConfig("prom-ha", 0), false)).
Add(scraper(3, defaultPromConfig("prom-ha", 1), false)).
Add(querierWithFileSD(1, "replica", []string{sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)})).
Add(querierWithFileSD(2, "replica", []string{sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)}))
Add(querierWithStoreFlags(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3))).
Add(querierWithStoreFlags(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)))

queryFileSDSuite = newSpinupSuite().
Add(scraper(1, defaultPromConfig("prom-"+firstPromPort, 0), false)).
Add(scraper(2, defaultPromConfig("prom-ha", 0), false)).
Add(scraper(3, defaultPromConfig("prom-ha", 1), false)).
Add(querierWithFileSD(1, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3))).
Add(querierWithFileSD(2, "replica", sidecarGRPC(1), sidecarGRPC(2), sidecarGRPC(3)))
)

func TestQuery(t *testing.T) {
for _, tt := range []testConfig{
{
"gossip",
gossipSuite,
queryGossipSuite,
},
{
"staticFlag",
staticFlagsSuite,
queryStaticFlagsSuite,
},
{
"fileSD",
fileSDSuite,
queryFileSDSuite,
},
} {
t.Run(tt.name, func(t *testing.T) {
Expand Down
57 changes: 49 additions & 8 deletions test/e2e/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,7 @@ import (
"github.com/prometheus/prometheus/pkg/timestamp"
)

// TestRuleComponent tests the basic interaction between the rule component
// and the querying layer.
// Rules are evaluated against the query layer and the query layer in return
// can access data written by the rules.
func TestRuleComponent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)

const alwaysFireRule = `
const alwaysFireRule = `
groups:
- name: example
rules:
Expand All @@ -34,6 +27,54 @@ groups:
summary: "I always complain"
`

var (
ruleGossipSuite = newSpinupSuite().
Add(querier(1, "")).
Add(ruler(1, alwaysFireRule)).
Add(ruler(2, alwaysFireRule)).
Add(alertManager(1))

ruleStaticFlagsSuite = newSpinupSuite().
Add(querierWithStoreFlags(1, "", rulerGRPC(1), rulerGRPC(2))).
Add(rulerWithQueryFlags(1, alwaysFireRule, queryHTTP(1))).
Add(rulerWithQueryFlags(2, alwaysFireRule, queryHTTP(1))).
Add(alertManager(1))

ruleFileSDSuite = newSpinupSuite().
Add(querierWithFileSD(1, "", rulerGRPC(1), rulerGRPC(2))).
Add(rulerWithFileSD(1, alwaysFireRule, queryHTTP(1))).
Add(rulerWithFileSD(2, alwaysFireRule, queryHTTP(1))).
Add(alertManager(1))
)

func TestRule(t *testing.T) {
for _, tt := range []testConfig{
{
"gossip",
ruleGossipSuite,
},
{
"staticFlag",
ruleStaticFlagsSuite,
},
{
"fileSD",
ruleFileSDSuite,
},
} {
t.Run(tt.name, func(t *testing.T) {
testRuleComponent(t, tt)
})
}
}

// testRuleComponent tests the basic interaction between the rule component
// and the querying layer.
// Rules are evaluated against the query layer and the query layer in return
// can access data written by the rules.
func testRuleComponent(t *testing.T, conf testConfig) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)

exit, err := newSpinupSuite().
Add(querier(1, "")).
Add(ruler(1, alwaysFireRule)).
Expand Down
Loading

0 comments on commit 1ec8639

Please sign in to comment.