notify: notify resolved alerts properly
PR prometheus#1205, while fixing an existing issue, introduced another
bug when the send_resolved flag of the integration is set to true.

With send_resolved set to false, the semantics remain the same:
AlertManager generates a notification when new firing alerts are added
to the alert group. The notification only carries firing alerts.

With send_resolved set to true, AlertManager generates a notification
when new firing or resolved alerts are added to the alert group. The
notification carries both the firing and resolved alerts.
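
In practice the difference boils down to how the alert group is filtered
before the notifier is called. A minimal sketch of that semantics
(filterForNotify is a hypothetical helper, not code from this commit; it
assumes the types and model packages already imported by the notify
package):

	// filterForNotify returns the alerts an integration should be told
	// about: everything when sendResolved is true, only the still-firing
	// alerts otherwise.
	func filterForNotify(alerts []*types.Alert, sendResolved bool) []*types.Alert {
		if sendResolved {
			return alerts
		}
		var firing []*types.Alert
		for _, a := range alerts {
			if a.Status() != model.AlertResolved {
				firing = append(firing, a)
			}
		}
		return firing
	}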

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
simonpasquier committed Jun 7, 2018
1 parent db4af95 commit 3761608
Showing 6 changed files with 240 additions and 79 deletions.
15 changes: 15 additions & 0 deletions nflog/nflogpb/set.go
@@ -21,6 +21,21 @@ func (m *Entry) IsFiringSubset(subset map[uint64]struct{}) bool {
set[m.FiringAlerts[i]] = struct{}{}
}

return isSubset(set, subset)
}

// IsResolvedSubset returns whether the given subset is a subset of the alerts
// that were resolved at the time of the last notification.
func (m *Entry) IsResolvedSubset(subset map[uint64]struct{}) bool {
set := map[uint64]struct{}{}
for i := range m.ResolvedAlerts {
set[m.ResolvedAlerts[i]] = struct{}{}
}

return isSubset(set, subset)
}

func isSubset(set, subset map[uint64]struct{}) bool {
for k := range subset {
_, exists := set[k]
if !exists {
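
IsResolvedSubset mirrors the existing IsFiringSubset, with both now
delegating to the shared isSubset helper. A self-contained illustration
of the subset semantics (standalone copy of the helper, invented alert
IDs):

	package main

	import "fmt"

	// isSubset reports whether every element of subset is also present
	// in set, as in the helper factored out above.
	func isSubset(set, subset map[uint64]struct{}) bool {
		for k := range subset {
			if _, exists := set[k]; !exists {
				return false
			}
		}
		return true
	}

	func main() {
		logged := map[uint64]struct{}{1: {}, 2: {}, 3: {}} // alerts in the log entry
		fmt.Println(isSubset(logged, map[uint64]struct{}{2: {}})) // true
		fmt.Println(isSubset(logged, map[uint64]struct{}{4: {}})) // false
		fmt.Println(isSubset(logged, map[uint64]struct{}{}))      // true: the empty set is a subset of anything
	}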
28 changes: 28 additions & 0 deletions nflog/nflogpb/set_test.go
@@ -45,6 +45,34 @@ func TestIsFiringSubset(t *testing.T) {
}
}

func TestIsResolvedSubset(t *testing.T) {
e := &Entry{
ResolvedAlerts: []uint64{1, 2, 3},
}

tests := []struct {
subset map[uint64]struct{}
expected bool
}{
{newSubset(), true}, // empty subset
{newSubset(1), true},
{newSubset(2), true},
{newSubset(3), true},
{newSubset(1, 2), true},
{newSubset(2, 3), true},
{newSubset(1, 2, 3), true},
{newSubset(4), false},
{newSubset(1, 5), false},
{newSubset(1, 2, 3, 6), false},
}

for _, test := range tests {
if result := e.IsResolvedSubset(test.subset); result != test.expected {
t.Errorf("Expected %t, got %t for subset %v", test.expected, result, elements(test.subset))
}
}
}

func newSubset(elements ...uint64) map[uint64]struct{} {
subset := make(map[uint64]struct{})
for _, el := range elements {
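
The two helpers cut off by the diff view presumably complete as follows
(a reconstruction for readability, not part of the visible hunk):

	func newSubset(elements ...uint64) map[uint64]struct{} {
		subset := make(map[uint64]struct{})
		for _, el := range elements {
			subset[el] = struct{}{}
		}
		return subset
	}

	// elements lists the keys of a subset map, used only in test
	// failure output.
	func elements(m map[uint64]struct{}) []uint64 {
		els := make([]uint64, 0, len(m))
		for el := range m {
			els = append(els, el)
		}
		return els
	}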
23 changes: 1 addition & 22 deletions notify/impl.go
@@ -45,10 +45,6 @@ import (
"github.com/prometheus/alertmanager/types"
)

type notifierConfig interface {
SendResolved() bool
}

// A Notifier notifies about alerts under constraints of the given context.
// It returns an error if unsuccessful and a flag whether the error is
// recoverable. This information is useful for a retry logic.
@@ -67,24 +63,7 @@ type Integration struct {

// Notify implements the Notifier interface.
func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
var res []*types.Alert

// Resolved alerts have to be filtered only at this point, because they need
// to end up unfiltered in the SetNotifiesStage.
if i.conf.SendResolved() {
res = alerts
} else {
for _, a := range alerts {
if a.Status() != model.AlertResolved {
res = append(res, a)
}
}
}
if len(res) == 0 {
return false, nil
}

return i.notifier.Notify(ctx, res...)
return i.notifier.Notify(ctx, alerts...)
}

// BuildReceiverIntegrations builds a list of integration notifiers off of a
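
The net effect in impl.go is that Integration.Notify becomes a plain
pass-through: the resolved-alert filtering (and the notifierConfig
interface it relied on) moves into the notify pipeline, so DedupStage
and SetNotifiesStage keep seeing the unfiltered alert group. For
orientation, the per-receiver pipeline this commit assumes, as built by
createStage in notify.go below (simplified):

	// WaitStage        -> wait for HA peers
	// DedupStage       -> drop the group when the notification log shows
	//                     nothing newly firing or (if send_resolved) resolved
	// RetryStage       -> filter resolved alerts when send_resolved is
	//                     false, then call the notifier with retries
	// SetNotifiesStage -> record firing and resolved alerts in the log
	var s MultiStage
	s = append(s, NewWaitStage(wait))
	s = append(s, NewDedupStage(i, notificationLog, recv))
	s = append(s, NewRetryStage(i, rc.Name))
	s = append(s, NewSetNotifiesStage(notificationLog, recv))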
55 changes: 37 additions & 18 deletions notify/notify.go
@@ -91,6 +91,10 @@ func init() {
prometheus.Register(notificationLatencySeconds)
}

type notifierConfig interface {
SendResolved() bool
}

// MinTimeout is the minimum timeout that is set for the context of a call
// to a notification pipeline.
const MinTimeout = 10 * time.Second
@@ -262,7 +266,7 @@ func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.
}
var s MultiStage
s = append(s, NewWaitStage(wait))
s = append(s, NewDedupStage(notificationLog, recv))
s = append(s, NewDedupStage(i, notificationLog, recv))
s = append(s, NewRetryStage(i, rc.Name))
s = append(s, NewSetNotifiesStage(notificationLog, recv))

@@ -452,16 +456,18 @@ func (ws *WaitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al
type DedupStage struct {
nflog NotificationLog
recv *nflogpb.Receiver
conf notifierConfig

now func() time.Time
hash func(*types.Alert) uint64
}

// NewDedupStage wraps a DedupStage that runs against the given notification log.
func NewDedupStage(l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
func NewDedupStage(i Integration, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
return &DedupStage{
nflog: l,
recv: recv,
conf: i.conf,
now: utcNow,
hash: hashAlert,
}
@@ -511,28 +517,34 @@ func hashAlert(a *types.Alert) uint64 {
return hash
}

func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) (bool, error) {
func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
// If we haven't notified about the alert group before, notify right away
// unless we only have resolved alerts.
if entry == nil {
return len(firing) > 0, nil
return len(firing) > 0
}

if !entry.IsFiringSubset(firing) {
return true, nil
return true
}

// Notify about all alerts being resolved. If the current alert group and
// last notification contain no firing alert, it means that some alerts
// have been fired and resolved during the last group_wait interval. In
// this case, there is no need to notify the receiver since it doesn't know
// about them.
// Notify about all alerts being resolved.
// This is done irrespective of the send_resolved flag to make sure that
// the firing alerts are cleared from the notification log.
if len(firing) == 0 {
return len(entry.FiringAlerts) > 0, nil
// If the current alert group and last notification contain no firing
// alert, it means that some alerts have been fired and resolved during the
// last interval. In this case, there is no need to notify the receiver
// since it doesn't know about them.
return len(entry.FiringAlerts) > 0
}

if n.conf.SendResolved() && !entry.IsResolvedSubset(resolved) {
return true
}

// Nothing changed, only notify if the repeat interval has passed.
return entry.Timestamp.Before(n.now().Add(-repeat)), nil
return entry.Timestamp.Before(n.now().Add(-repeat))
}

// Exec implements the Stage interface.
@@ -580,9 +592,7 @@ func (n *DedupStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al
case 2:
return ctx, nil, fmt.Errorf("unexpected entry result size %d", len(entries))
}
if ok, err := n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval); err != nil {
return ctx, nil, err
} else if ok {
if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
return ctx, alerts, nil
}
return ctx, nil, nil
@@ -605,17 +615,26 @@ func NewRetryStage(i Integration, groupName string) *RetryStage {

// Exec implements the Stage interface.
func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
var sent []*types.Alert

// If we shouldn't send notifications for resolved alerts, but there are only
// resolved alerts, report them all as successfully notified (we still want the
// notification log to log them).
// notification log to log them for the next run of DedupStage).
if !r.integration.conf.SendResolved() {
firing, ok := FiringAlerts(ctx)
if !ok {
return ctx, alerts, fmt.Errorf("firing alerts missing")
return ctx, nil, fmt.Errorf("firing alerts missing")
}
if len(firing) == 0 {
return ctx, alerts, nil
}
for _, a := range alerts {
if a.Status() != model.AlertResolved {
sent = append(sent, a)
}
}
} else {
sent = alerts
}

var (
@@ -642,7 +661,7 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
select {
case <-tick.C:
now := time.Now()
retry, err := r.integration.Notify(ctx, alerts...)
retry, err := r.integration.Notify(ctx, sent...)
notificationLatencySeconds.WithLabelValues(r.integration.name).Observe(time.Since(now).Seconds())
if err != nil {
numFailedNotifications.WithLabelValues(r.integration.name).Inc()
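
Putting the DedupStage and RetryStage changes together, here is a
standalone sketch of the needsUpdate decision logic and how the
send_resolved flag affects it (reimplemented for illustration with
simplified types; it mirrors the code above, with invented IDs and
timestamps):

	package main

	import (
		"fmt"
		"time"
	)

	type entry struct {
		firing, resolved map[uint64]struct{}
		timestamp        time.Time
	}

	func isSubset(set, subset map[uint64]struct{}) bool {
		for k := range subset {
			if _, ok := set[k]; !ok {
				return false
			}
		}
		return true
	}

	func needsUpdate(e *entry, firing, resolved map[uint64]struct{}, repeat time.Duration, sendResolved bool, now time.Time) bool {
		if e == nil {
			// Never notified before: only notify if something is firing.
			return len(firing) > 0
		}
		if !isSubset(e.firing, firing) {
			// A new alert started firing since the last notification.
			return true
		}
		if len(firing) == 0 {
			// Everything resolved; notify only if the last notification
			// actually reported firing alerts.
			return len(e.firing) > 0
		}
		if sendResolved && !isSubset(e.resolved, resolved) {
			// New resolved alerts, and the receiver wants to hear about them.
			return true
		}
		// Nothing changed: honor repeat_interval.
		return e.timestamp.Before(now.Add(-repeat))
	}

	func main() {
		last := &entry{
			firing:    map[uint64]struct{}{1: {}, 2: {}},
			resolved:  map[uint64]struct{}{},
			timestamp: time.Now(),
		}
		firing := map[uint64]struct{}{1: {}}   // alert 2 has since resolved
		resolved := map[uint64]struct{}{2: {}}
		fmt.Println(needsUpdate(last, firing, resolved, 4*time.Hour, true, time.Now()))  // true: resolved set grew
		fmt.Println(needsUpdate(last, firing, resolved, 4*time.Hour, false, time.Now())) // false: deduplicated
	}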
(diff for the remaining 2 changed files not rendered)
