Skip to content

Commit

Permalink
Package alertmanager sets and add tests.
Browse files Browse the repository at this point in the history
Signed-off-by: johncming <johncming@yahoo.com>
  • Loading branch information
johncming committed Dec 14, 2019
1 parent a0e1771 commit 1269404
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 89 deletions.
93 changes: 4 additions & 89 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"net"
"net/http"
"net/url"
"os"
Expand All @@ -13,7 +12,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -33,6 +31,7 @@ import (
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/util/strutil"
"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/alertmanager"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
Expand Down Expand Up @@ -288,7 +287,7 @@ func runRule(

// Run rule evaluation and alert notifications.
var (
alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver))
alertmgrs = alertmanager.NewAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver))
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosrule.NewManager(dataDir)
)
Expand Down Expand Up @@ -353,7 +352,7 @@ func runRule(
}
{
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
sdr := alert.NewSender(logger, reg, alertmgrs.get, nil, alertmgrsTimeout)
sdr := alert.NewSender(logger, reg, alertmgrs.Get, nil, alertmgrsTimeout)
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
Expand All @@ -375,7 +374,7 @@ func runRule(

g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if err := alertmgrs.update(ctx); err != nil {
if err := alertmgrs.Update(ctx); err != nil {
level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err)
alertMngrAddrResolutionErrors.Inc()
}
Expand Down Expand Up @@ -615,90 +614,6 @@ func runRule(
return nil
}

// alertmanagerSet keeps the raw Alertmanager addresses together with the URLs
// produced from them by the latest successful update.
type alertmanagerSet struct {
	// resolver performs the DNS lookups for addresses that carry a lookup prefix.
	resolver dns.Resolver
	// addrs are the raw addresses handed to newAlertmanagerSet.
	addrs []string
	// mtx guards current.
	mtx sync.Mutex
	// current holds the URLs stored by the last successful update.
	current []*url.URL
}

// newAlertmanagerSet creates a set for the given raw Alertmanager addresses.
// Nothing is parsed or resolved here; DNS lookups performed later go through a
// resolver built from dnsSDResolver.
func newAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet {
	set := &alertmanagerSet{addrs: addrs}
	set.resolver = dns.NewResolver(dnsSDResolver.ToResolver(logger))
	return set
}

// get returns the URLs stored by the most recent successful update.
func (s *alertmanagerSet) get() []*url.URL {
	s.mtx.Lock()
	urls := s.current
	s.mtx.Unlock()

	return urls
}

// defaultAlertmanagerPort is the port appended to a host that has no port of
// its own before it is resolved with a dns.A query.
const defaultAlertmanagerPort = 9093

// parseAlertmanagerAddress parses addr as a URL and separates an optional DNS
// lookup prefix from its scheme. Everything before the last "+" of the scheme
// is returned as qType, and the returned URL keeps only the part after it.
// qType is empty when the scheme contains no "+".
func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) {
	parsedUrl, err = url.Parse(addr)
	if err != nil {
		return "", nil, err
	}

	// A scheme such as "<lookup>+<scheme>" carries the DNS resolver type in
	// front of the real scheme.
	parts := strings.Split(parsedUrl.Scheme, "+")
	if last := len(parts) - 1; last > 0 {
		parsedUrl.Scheme = parts[last]
		qType = dns.QType(strings.Join(parts[:last], "+"))
	}
	return qType, parsedUrl, nil
}

// update parses every configured address, resolves those that carry a DNS
// lookup prefix and stores the resulting URLs as the current set. It returns
// on the first address that fails to parse or resolve, leaving the current
// set unchanged.
func (s *alertmanagerSet) update(ctx context.Context) error {
	var urls []*url.URL
	for _, rawAddr := range s.addrs {
		qtype, parsed, err := parseAlertmanagerAddress(rawAddr)
		if err != nil {
			return errors.Wrapf(err, "parse URL %q", rawAddr)
		}

		// Without a DNS lookup prefix the host is used verbatim.
		hosts := []string{parsed.Host}
		if qtype != "" {
			target := parsed.Host
			if qtype == dns.A {
				// The host could be missing a port. Append the defaultAlertmanagerPort.
				if _, _, splitErr := net.SplitHostPort(target); splitErr != nil {
					target += ":" + strconv.Itoa(defaultAlertmanagerPort)
				}
			}
			if hosts, err = s.resolver.Resolve(ctx, target, qtype); err != nil {
				return errors.Wrap(err, "alertmanager resolve")
			}
		}

		for _, h := range hosts {
			urls = append(urls, &url.URL{
				Scheme: parsed.Scheme,
				Host:   h,
				Path:   parsed.Path,
				User:   parsed.User,
			})
		}
	}

	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.current = urls

	return nil
}

func parseFlagLabels(s []string) (labels.Labels, error) {
var lset labels.Labels
for _, l := range s {
Expand Down
127 changes: 127 additions & 0 deletions pkg/alertmanager/alertmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package alertmanager

import (
"context"
"net"
"net/url"
"strconv"
"strings"
"sync"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/discovery/dns"
)

const (
	// defaultAlertmanagerPort is the port appended to a host that has no port
	// of its own before it is resolved with a dns.A query.
	defaultAlertmanagerPort = 9093
)

// AlertManager gives access to the Alertmanager replica URLs to push firing
// alerts to.
//
// Ruler claims success if the push to at least one of the discovered
// Alertmanagers succeeds. The scheme should not be empty, e.g. `http` might be
// used. The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect
// Alertmanager IPs through respective DNS lookups. The port defaults to 9093
// or the SRV record's value. The URL path is used as a prefix for the regular
// Alertmanager API path.
type AlertManager interface {
	// Get returns the addresses of the configured Alertmanagers.
	Get() []*url.URL

	// Update parses the raw URLs and updates the addresses.
	Update(ctx context.Context) error
}

// alertmanagerSet keeps the raw Alertmanager addresses together with the URLs
// produced from them by the latest successful Update.
type alertmanagerSet struct {
	// resolver performs the DNS lookups for addresses that carry a lookup prefix.
	resolver dns.Resolver
	// addrs are the raw addresses handed to NewAlertmanagerSet.
	addrs []string
	// mtx guards current.
	mtx sync.Mutex
	// current holds the URLs stored by the last successful Update.
	current []*url.URL
}

// NewAlertmanagerSet creates a set for the given raw Alertmanager addresses.
// Nothing is parsed or resolved here; DNS lookups performed later go through a
// resolver built from dnsSDResolver.
func NewAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet {
	set := &alertmanagerSet{addrs: addrs}
	set.resolver = dns.NewResolver(dnsSDResolver.ToResolver(logger))
	return set
}

// Get returns the Alertmanager URLs stored by the most recent successful Update.
func (s *alertmanagerSet) Get() []*url.URL {
	s.mtx.Lock()
	urls := s.current
	s.mtx.Unlock()

	return urls
}

// Update parses every configured address, resolves those that carry a DNS
// lookup prefix and stores the resulting URLs as the current set. It returns
// on the first address that fails to parse or resolve, leaving the current
// set unchanged.
func (s *alertmanagerSet) Update(ctx context.Context) error {
	var urls []*url.URL
	for _, rawAddr := range s.addrs {
		qtype, parsed, err := parseAlertmanagerAddress(rawAddr)
		if err != nil {
			return errors.Wrapf(err, "parse URL %q", rawAddr)
		}

		// Without a DNS lookup prefix the host is used verbatim.
		hosts := []string{parsed.Host}
		if qtype != "" {
			target := parsed.Host
			if qtype == dns.A {
				// The host could be missing a port. Append the defaultAlertmanagerPort.
				if _, _, splitErr := net.SplitHostPort(target); splitErr != nil {
					target += ":" + strconv.Itoa(defaultAlertmanagerPort)
				}
			}
			if hosts, err = s.resolver.Resolve(ctx, target, qtype); err != nil {
				return errors.Wrap(err, "alertmanager resolve")
			}
		}

		for _, h := range hosts {
			urls = append(urls, &url.URL{
				Scheme: parsed.Scheme,
				Host:   h,
				Path:   parsed.Path,
				User:   parsed.User,
			})
		}
	}

	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.current = urls

	return nil
}

// parseAlertmanagerAddress parses addr as a URL and separates an optional DNS
// lookup prefix from its scheme.
//
// The scheme may carry a DNS resolver type in front of the actual scheme,
// separated by "+". Everything before the last "+" is returned as qType and
// the returned URL keeps only the part after it; qType is empty when the
// scheme contains no "+". Only the `http` and `https` schemes are accepted:
// an empty or any other scheme yields an error, as does an address that
// url.Parse rejects. On error the returned qType is empty and the URL is nil.
func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedURL *url.URL, err error) {
	parsedURL, err = url.Parse(addr)
	if err != nil {
		return "", nil, err
	}

	// The scheme might contain a DNS resolver type separated by "+", so we split it apart.
	if i := strings.LastIndex(parsedURL.Scheme, "+"); i >= 0 {
		qType = dns.QType(parsedURL.Scheme[:i])
		parsedURL.Scheme = parsedURL.Scheme[i+1:]
	}

	switch parsedURL.Scheme {
	case "http", "https":
		// Supported schemes.
	case "":
		return "", nil, errors.New("scheme should not be empty, e.g. `http` or `https`")
	default:
		return "", nil, errors.Errorf("unsupported scheme %q, should be `http` or `https`", parsedURL.Scheme)
	}

	return qType, parsedURL, nil
}
45 changes: 45 additions & 0 deletions pkg/alertmanager/alertmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package alertmanager

import (
"net/url"
"reflect"
"testing"

"github.com/thanos-io/thanos/pkg/discovery/dns"
)

// TestParseAlertmanagerAddress checks that parseAlertmanagerAddress rejects
// addresses with a missing or unsupported scheme and that it splits the DNS
// lookup prefix from the scheme of valid addresses.
func TestParseAlertmanagerAddress(t *testing.T) {
	type expected struct {
		hasErr bool
		qtype  dns.QType
		url    *url.URL
	}
	tests := []struct {
		addr     string
		expected expected
	}{
		// No scheme or unsupported scheme.
		{"alertmanager", expected{hasErr: true}},
		{"alertmanager:9093", expected{hasErr: true}},
		{"tcp://alertmanager:9093", expected{hasErr: true}},

		// Correct cases.
		{"http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType(""), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}},
		{"dns+http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType("dns"), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}},
		{"dnssrv+http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType("dnssrv"), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}},
		{"dnssrvnoa+http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType("dnssrvnoa"), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}},
	}
	for _, tt := range tests {
		gotQType, gotParsedURL, err := parseAlertmanagerAddress(tt.addr)
		if (err != nil) != tt.expected.hasErr {
			t.Errorf("%q: got error %v, expected error %v", tt.addr, err, tt.expected.hasErr)
			// Keep going so that one failing case does not hide the remaining ones.
			continue
		}
		if gotQType != tt.expected.qtype {
			t.Errorf("%q: got query type %v, expected %v", tt.addr, gotQType, tt.expected.qtype)
		}
		if !reflect.DeepEqual(gotParsedURL, tt.expected.url) {
			t.Errorf("%q: got URL %v, expected %v", tt.addr, gotParsedURL, tt.expected.url)
		}
	}
}

0 comments on commit 1269404

Please sign in to comment.