From 8150c9f5729d7968c9a5313e482d83c17fc0de86 Mon Sep 17 00:00:00 2001
From: Simon Pasquier <spasquie@redhat.com>
Date: Mon, 16 Dec 2019 15:55:58 +0100
Subject: [PATCH] Address Bartek's comments

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
---
 CHANGELOG.md             |  5 ++---
 cmd/thanos/rule.go       | 16 +++++++--------
 docs/components/rule.md  |  4 +++-
 pkg/alert/alert.go       | 19 ++++++++++--------
 pkg/alert/alert_test.go  | 19 +++++++++---------
 pkg/alert/client.go      | 11 ++++------
 pkg/alert/client_test.go |  4 ++++
 test/e2e/rule_test.go    | 43 ++++++++++++++++------------------------
 8 files changed, 59 insertions(+), 62 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4c66b9a5445..35cdc6f7fd2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,11 +18,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 ### Added
 - [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44
 - [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering.
-
-### Added
-
 - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
 
+- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI flag to specify the interval between DNS resolutions of Alertmanager hosts.
+
 ## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03
 
 ### Added
diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go
index 0b756e5f25e..2a0e9e9969f 100644
--- a/cmd/thanos/rule.go
+++ b/cmd/thanos/rule.go
@@ -296,14 +296,14 @@ func runRule(
 		return err
 	}
 	var (
-		alertingcfg alert.AlertingConfig
+		alertingCfg alert.AlertingConfig
 		alertmgrs   []*alert.Alertmanager
 	)
 	if len(alertmgrsConfigYAML) > 0 {
 		if len(alertmgrURLs) != 0 {
 			return errors.New("--alertmanagers.url and --alertmanagers.config* flags cannot be defined at the same time")
 		}
-		alertingcfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML)
+		alertingCfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML)
 		if err != nil {
 			return err
 		}
@@ -314,13 +314,13 @@ func runRule(
 			if err != nil {
 				return err
 			}
-			alertingcfg.Alertmanagers = append(alertingcfg.Alertmanagers, cfg)
+			alertingCfg.Alertmanagers = append(alertingCfg.Alertmanagers, cfg)
 		}
 	}
-	if len(alertingcfg.Alertmanagers) == 0 {
+	if len(alertingCfg.Alertmanagers) == 0 {
 		level.Warn(logger).Log("msg", "no alertmanager configured")
 	}
-	for _, cfg := range alertingcfg.Alertmanagers {
+	for _, cfg := range alertingCfg.Alertmanagers {
 		am, err := alert.NewAlertmanager(logger, cfg)
 		if err != nil {
 			return err
@@ -421,11 +421,11 @@ func runRule(
 	}
 	// Run the alert sender.
 	{
-		doers := make([]alert.AlertmanagerDoer, len(alertmgrs))
+		clients := make([]alert.AlertmanagerClient, len(alertmgrs))
 		for i := range alertmgrs {
-			doers[i] = alertmgrs[i]
+			clients[i] = alertmgrs[i]
 		}
-		sdr := alert.NewSender(logger, reg, doers)
+		sdr := alert.NewSender(logger, reg, clients)
 		ctx, cancel := context.WithCancel(context.Background())
 
 		g.Add(func() error {
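A side note on the `clients := make(...)` loop in the final hunk above: Go has no covariance between `[]*alert.Alertmanager` and `[]alert.AlertmanagerClient`, so the element-by-element copy is required. A minimal, self-contained sketch of the same pattern (types reduced to essentials, all names hypothetical):

```go
package main

import "fmt"

// Client stands in for alert.AlertmanagerClient in this sketch.
type Client interface{ Name() string }

type alertmanager struct{ host string }

func (a *alertmanager) Name() string { return a.host }

func main() {
	concrete := []*alertmanager{{host: "am1:9093"}, {host: "am2:9093"}}

	// A []*alertmanager value cannot be assigned to []Client directly;
	// each element must be converted through the interface type.
	clients := make([]Client, len(concrete))
	for i := range concrete {
		clients[i] = concrete[i]
	}
	fmt.Println(len(clients), "clients wired")
}
```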
diff --git a/docs/components/rule.md b/docs/components/rule.md
index 565223681fd..7f8807a9062 100644
--- a/docs/components/rule.md
+++ b/docs/components/rule.md
@@ -290,7 +290,9 @@ Flags:
 
 ### Alertmanager
 
-The configuration format supported by the `--alertmanagers.config` and `--alertmanagers.config-file` flags is the following:
+The `--alertmanagers.config` and `--alertmanagers.config-file` flags allow specifying multiple Alertmanagers. These entries are treated as a single HA group: an alert send failure is reported only when the Ruler fails to deliver the alert to all instances.
+
+The configuration format is the following:
 
 [embedmd]:# (../flags/config_rule_alerting.txt yaml)
 ```yaml
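To make the HA-group semantics above concrete, here is a hedged sketch of loading a two-instance group through `alert.LoadAlertingConfig` (the call and the `Alertmanagers` field appear in `cmd/thanos/rule.go`; the YAML field names `static_configs`, `scheme`, and `timeout` are assumptions, not a copy of the embedded `config_rule_alerting.txt`):

```go
package alert_test

import (
	"testing"

	"github.com/thanos-io/thanos/pkg/alert"
	"github.com/thanos-io/thanos/pkg/testutil"
)

func TestLoadTwoInstanceHAGroup(t *testing.T) {
	// Hedged example: the field names below are assumed, not verbatim docs.
	cfgYAML := []byte(`
alertmanagers:
- static_configs: ["am1.example.com:9093", "am2.example.com:9093"]
  scheme: http
  timeout: 10s
`)
	cfg, err := alert.LoadAlertingConfig(cfgYAML)
	testutil.Ok(t, err)

	// Both addresses live in a single entry, i.e. one HA group: the Ruler
	// reports a send failure only if delivery to both instances fails.
	testutil.Equals(t, 1, len(cfg.Alertmanagers))
}
```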
diff --git a/pkg/alert/alert.go b/pkg/alert/alert.go
index c443f715558..2a9e2e8ab85 100644
--- a/pkg/alert/alert.go
+++ b/pkg/alert/alert.go
@@ -2,9 +2,11 @@
 package alert
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/url"
 	"sync"
 	"sync/atomic"
@@ -238,15 +240,15 @@ func (q *Queue) Push(alerts []*Alert) {
 	}
 }
 
-type AlertmanagerDoer interface {
+type AlertmanagerClient interface {
 	Endpoints() []*url.URL
-	Do(context.Context, *url.URL, []byte) error
+	Do(context.Context, *url.URL, io.Reader) error
 }
 
 // Sender sends notifications to a dynamic set of alertmanagers.
 type Sender struct {
 	logger        log.Logger
-	alertmanagers []AlertmanagerDoer
+	alertmanagers []AlertmanagerClient
 
 	sent    *prometheus.CounterVec
 	errs    *prometheus.CounterVec
@@ -259,7 +261,7 @@ type Sender struct {
 func NewSender(
 	logger log.Logger,
 	reg prometheus.Registerer,
-	alertmanagers []AlertmanagerDoer,
+	alertmanagers []AlertmanagerClient,
 ) *Sender {
 	if logger == nil {
 		logger = log.NewNopLogger()
@@ -294,7 +296,7 @@ func NewSender(
 	return s
 }
 
-// Send an alert batch to all given Alertmanager client.
+// Send an alert batch to all given Alertmanager clients.
 // TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
 func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
 	if len(alerts) == 0 {
@@ -313,17 +315,18 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
 	for _, amc := range s.alertmanagers {
 		for _, u := range amc.Endpoints() {
 			wg.Add(1)
-			go func(amc AlertmanagerDoer, u *url.URL) {
+			go func(amc AlertmanagerClient, u *url.URL) {
 				defer wg.Done()
 
 				level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts))
 				start := time.Now()
-				if err := amc.Do(ctx, u, b); err != nil {
+				if err := amc.Do(ctx, u, bytes.NewReader(b)); err != nil {
 					level.Warn(s.logger).Log(
 						"msg", "sending alerts failed",
 						"alertmanager", u.Host,
 						"numAlerts", len(alerts),
-						"err", err)
+						"err", err,
+					)
 					s.errs.WithLabelValues(u.Host).Inc()
 					return
 				}
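Two notes on the hunk above. First, `Send` builds a fresh `bytes.NewReader(b)` for every endpoint because a reader is consumed on first use; sharing one reader across the goroutines would leave every request after the first with an empty body. Second, the `AlertmanagerClient` interface is easy to implement outside the `Alertmanager` type — a hypothetical client, shown only to illustrate the contract:

```go
package alert

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// loggingClient is a hypothetical AlertmanagerClient implementation;
// only the Endpoints/Do contract comes from this package.
type loggingClient struct {
	urls   []*url.URL
	client *http.Client
}

// Compile-time check that the sketch satisfies the interface.
var _ AlertmanagerClient = (*loggingClient)(nil)

func (c *loggingClient) Endpoints() []*url.URL { return c.urls }

func (c *loggingClient) Do(ctx context.Context, u *url.URL, r io.Reader) error {
	req, err := http.NewRequest(http.MethodPost, u.String(), r)
	if err != nil {
		return err
	}
	resp, err := c.client.Do(req.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("bad response status %v from %v", resp.Status, u.Host)
	}
	return nil
}
```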
diff --git a/pkg/alert/alert_test.go b/pkg/alert/alert_test.go
index 8797d9f8f2d..7e1e851d80c 100644
--- a/pkg/alert/alert_test.go
+++ b/pkg/alert/alert_test.go
@@ -2,6 +2,7 @@ package alert
 
 import (
 	"context"
+	"io"
 	"net/url"
 	"sync"
 	"testing"
@@ -46,18 +47,18 @@ func assertSameHosts(t *testing.T, expected []*url.URL, found []*url.URL) {
 	}
 }
 
-type fakeDoer struct {
+type fakeClient struct {
 	urls  []*url.URL
 	postf func(u *url.URL) error
 	mtx   sync.Mutex
 	seen  []*url.URL
 }
 
-func (f *fakeDoer) Endpoints() []*url.URL {
+func (f *fakeClient) Endpoints() []*url.URL {
 	return f.urls
 }
 
-func (f *fakeDoer) Do(ctx context.Context, u *url.URL, b []byte) error {
+func (f *fakeClient) Do(ctx context.Context, u *url.URL, r io.Reader) error {
 	f.mtx.Lock()
 	defer f.mtx.Unlock()
 	f.seen = append(f.seen, u)
@@ -68,10 +69,10 @@ func (f *fakeDoer) Do(ctx context.Context, u *url.URL, b []byte) error {
 }
 
 func TestSenderSendsOk(t *testing.T) {
-	poster := &fakeDoer{
+	poster := &fakeClient{
 		urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}},
 	}
-	s := NewSender(nil, nil, []AlertmanagerDoer{poster})
+	s := NewSender(nil, nil, []AlertmanagerClient{poster})
 
 	s.Send(context.Background(), []*Alert{{}, {}})
 
@@ -86,7 +87,7 @@ func TestSenderSendsOk(t *testing.T) {
 }
 
 func TestSenderSendsOneFails(t *testing.T) {
-	poster := &fakeDoer{
+	poster := &fakeClient{
 		urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}},
 		postf: func(u *url.URL) error {
 			if u.Host == "am1:9090" {
@@ -95,7 +96,7 @@ func TestSenderSendsOneFails(t *testing.T) {
 			return nil
 		},
 	}
-	s := NewSender(nil, nil, []AlertmanagerDoer{poster})
+	s := NewSender(nil, nil, []AlertmanagerClient{poster})
 
 	s.Send(context.Background(), []*Alert{{}, {}})
 
@@ -110,13 +111,13 @@ func TestSenderSendsOneFails(t *testing.T) {
 }
 
 func TestSenderSendsAllFail(t *testing.T) {
-	poster := &fakeDoer{
+	poster := &fakeClient{
 		urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}},
 		postf: func(u *url.URL) error {
 			return errors.New("no such host")
 		},
 	}
-	s := NewSender(nil, nil, []AlertmanagerDoer{poster})
+	s := NewSender(nil, nil, []AlertmanagerClient{poster})
 
 	s.Send(context.Background(), []*Alert{{}, {}})
 
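Since `Do` now receives an `io.Reader`, the fake can also capture the marshaled payload when a test needs to assert on it. A hedged sketch of such an extension (the `bodies [][]byte` field would have to be added to `fakeClient`, and `io/ioutil` imported; neither exists in the patch):

```go
func (f *fakeClient) Do(ctx context.Context, u *url.URL, r io.Reader) error {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	f.seen = append(f.seen, u)

	// Drain the body so tests can inspect what the Sender marshaled.
	// bodies [][]byte is a hypothetical extra field on fakeClient.
	b, err := ioutil.ReadAll(r)
	if err != nil {
		return err
	}
	f.bodies = append(f.bodies, b)

	if f.postf != nil {
		return f.postf(u)
	}
	return nil
}
```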
diff --git a/pkg/alert/client.go b/pkg/alert/client.go
index 9e7ddad67ca..3361a40d43d 100644
--- a/pkg/alert/client.go
+++ b/pkg/alert/client.go
@@ -1,8 +1,8 @@
 package alert
 
 import (
-	"bytes"
 	"context"
+	"io"
 	"net"
 	"net/http"
 	"net/url"
@@ -157,10 +157,7 @@ func DefaultAlertmanagerConfig() AlertmanagerConfig {
 func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
 	*c = DefaultAlertmanagerConfig()
 	type plain AlertmanagerConfig
-	if err := unmarshal((*plain)(c)); err != nil {
-		return err
-	}
-	return nil
+	return unmarshal((*plain)(c))
 }
 
 // Alertmanager represents an HTTP client that can send alerts to a cluster of Alertmanager endpoints.
@@ -278,8 +275,8 @@ func (a *Alertmanager) Endpoints() []*url.URL {
 }
 
-// Post sends a POST request to the given URL.
+// Do sends a POST request to the given URL.
-func (a *Alertmanager) Do(ctx context.Context, u *url.URL, b []byte) error {
-	req, err := http.NewRequest("POST", u.String(), bytes.NewReader(b))
+func (a *Alertmanager) Do(ctx context.Context, u *url.URL, r io.Reader) error {
+	req, err := http.NewRequest("POST", u.String(), r)
 	if err != nil {
 		return err
 	}
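Since `*Alertmanager` is expected to keep satisfying the `AlertmanagerClient` interface from `pkg/alert/alert.go`, a compile-time assertion is a cheap guard against the two drifting apart. This is not part of the patch, just the usual Go idiom:

```go
// Fails the build if *Alertmanager stops implementing AlertmanagerClient;
// the blank identifier keeps the check free of any runtime cost.
var _ AlertmanagerClient = (*Alertmanager)(nil)
```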
diff --git a/pkg/alert/client_test.go b/pkg/alert/client_test.go
index 060e218e051..3fd94c9404b 100644
--- a/pkg/alert/client_test.go
+++ b/pkg/alert/client_test.go
@@ -75,6 +75,10 @@ func TestBuildAlertmanagerConfiguration(t *testing.T) {
 				Scheme:          "http",
 			},
 		},
+		{
+			address: "://user:pass@localhost:9093",
+			err:     true,
+		},
 	} {
 		t.Run(tc.address, func(t *testing.T) {
 			cfg, err := BuildAlertmanagerConfig(nil, tc.address, time.Duration(0))
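The new test case covers an address whose scheme is empty, which must be rejected. A hedged usage sketch of the same call path (the argument roles — logger, address, timeout — are inferred from the test above, not from documentation):

```go
package main

import (
	"log"
	"time"

	"github.com/thanos-io/thanos/pkg/alert"
)

func main() {
	// "://..." has an empty scheme, so BuildAlertmanagerConfig is expected
	// to fail before any HTTP client is constructed.
	_, err := alert.BuildAlertmanagerConfig(nil, "://user:pass@localhost:9093", 10*time.Second)
	if err != nil {
		log.Printf("invalid alertmanager address: %v", err)
	}
}
```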
diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go
index de28a5aa9ad..89f1a4ee49a 100644
--- a/test/e2e/rule_test.go
+++ b/test/e2e/rule_test.go
@@ -82,28 +82,6 @@ func serializeAlertingConfiguration(t *testing.T, cfg ...alert.AlertmanagerConfi
 	return b
 }
 
-func writeAlertmanagerFileSD(t *testing.T, path string, addrs ...string) {
-	group := targetgroup.Group{Targets: []model.LabelSet{}}
-	for _, addr := range addrs {
-		group.Targets = append(group.Targets, model.LabelSet{model.LabelName(model.AddressLabel): model.LabelValue(addr)})
-	}
-
-	b, err := yaml.Marshal([]*targetgroup.Group{&group})
-	if err != nil {
-		t.Errorf("failed to serialize file SD configuration: %v", err)
-		return
-	}
-
-	err = ioutil.WriteFile(path+".tmp", b, 0660)
-	if err != nil {
-		t.Errorf("failed to write file SD configuration: %v", err)
-		return
-	}
-
-	err = os.Rename(path+".tmp", path)
-	testutil.Ok(t, err)
-}
-
 type mockAlertmanager struct {
 	path      string
 	token     string
@@ -232,7 +210,7 @@ func TestRuleAlertmanagerHTTPClient(t *testing.T) {
 	r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil)
 	q := querier(qAddr, a.New(), []address{r.GRPC}, nil)
 
-	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
 	exit, err := e2eSpinup(t, ctx, q, r)
 	if err != nil {
 		t.Errorf("spinup failed: %v", err)
@@ -306,7 +284,7 @@ func TestRuleAlertmanagerFileSD(t *testing.T) {
 		<-exit
 	}()
 
-	// Wait for a couple of evaluations.
+	// Wait for a couple of evaluations and make sure that Alertmanager didn't receive anything.
 	testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) {
 		select {
 		case <-exit:
@@ -340,8 +318,23 @@
 		return nil
 	}))
 
-	// Update the Alertmanager file service discovery configuration.
-	writeAlertmanagerFileSD(t, filepath.Join(amDir, "targets.yaml"), am.HTTP.HostPort())
+	// Add the Alertmanager address to the file SD directory.
+	fileSDPath := filepath.Join(amDir, "targets.yaml")
+	b, err := yaml.Marshal([]*targetgroup.Group{
+		{
+			Targets: []model.LabelSet{
+				{
+					model.LabelName(model.AddressLabel): model.LabelValue(am.HTTP.HostPort()),
+				},
+			},
+		},
+	})
+	testutil.Ok(t, err)
+
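+	// Write to a temporary file first and rename it into place so the
+	// Ruler never observes a partially written target file.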
+	testutil.Ok(t, ioutil.WriteFile(fileSDPath+".tmp", b, 0660))
+	testutil.Ok(t, os.Rename(fileSDPath+".tmp", fileSDPath))
 
 	// Verify that alerts are received by Alertmanager.
 	testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) {