Skip to content

Commit

Permalink
alerting: revert and remove warning alerters
Browse files Browse the repository at this point in the history
  • Loading branch information
darwinz committed Jun 15, 2022
1 parent 32a91ce commit 0130dd3
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 78 deletions.
9 changes: 0 additions & 9 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,6 @@ ACHGateway:
ChannelID: <string>
Mock:
Enabled: <boolean>
Warnings:
PagerDuty:
ApiKey: <string>
RoutingKey: <string>
Slack:
AccessToken: <string>
ChannelID: <string>
Mock:
Enabled: <boolean>
```
---
Expand Down
40 changes: 17 additions & 23 deletions internal/incoming/odfi/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ type PeriodicScheduler struct {
downloader Downloader
processors Processors

errorAlerters []alerting.Alerter
warningAlerters []alerting.Alerter
alerters []alerting.Alerter
}

func NewPeriodicScheduler(logger log.Logger, cfg *service.Config, consul *consul.Client, processors Processors) (Scheduler, error) {
Expand All @@ -66,31 +65,26 @@ func NewPeriodicScheduler(logger log.Logger, cfg *service.Config, consul *consul
return nil, err
}

errorAlerters, err := alerting.NewAlerters(cfg.Errors)
alerters, err := alerting.NewAlerters(cfg.Errors)
if err != nil {
return nil, fmt.Errorf("ERROR creating error alerters: %v", err)
}
warningAlerters, err := alerting.NewAlerters(cfg.Warnings)
if err != nil {
return nil, fmt.Errorf("ERROR creating warning alerters: %v", err)
return nil, fmt.Errorf("ERROR creating alerters: %v", err)
}

ctx, cancelFunc := context.WithCancel(context.Background())

return &PeriodicScheduler{
logger: logger,
odfi: cfg.Inbound.ODFI,
sharding: cfg.Sharding,
uploadAgents: cfg.Upload,
ticker: time.NewTicker(cfg.Inbound.ODFI.Interval),
inboundTrigger: make(chan manuallyTriggeredInbound, 1),
consul: consul,
downloader: dl,
processors: processors,
shutdown: ctx,
shutdownFunc: cancelFunc,
errorAlerters: errorAlerters,
warningAlerters: warningAlerters,
logger: logger,
odfi: cfg.Inbound.ODFI,
sharding: cfg.Sharding,
uploadAgents: cfg.Upload,
ticker: time.NewTicker(cfg.Inbound.ODFI.Interval),
inboundTrigger: make(chan manuallyTriggeredInbound, 1),
consul: consul,
downloader: dl,
processors: processors,
shutdown: ctx,
shutdownFunc: cancelFunc,
alerters: alerters,
}, nil
}

Expand Down Expand Up @@ -194,14 +188,14 @@ func (s *PeriodicScheduler) tick(shard *service.Shard) error {
}

func (s *PeriodicScheduler) alertOnError(err error) {
if s == nil || len(s.errorAlerters) == 0 {
if s == nil || len(s.alerters) == 0 {
return
}
if err == nil {
return
}

for _, alerter := range s.errorAlerters {
for _, alerter := range s.alerters {
if err := alerter.AlertError(err); err != nil {
s.logger.LogErrorf("ERROR sending alert: %v", err)
}
Expand Down
43 changes: 8 additions & 35 deletions internal/pipeline/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ type aggregator struct {
auditStorage audittrail.Storage
preuploadTransformers []transform.PreUpload
outputFormatter output.Formatter
errorAlerters []alerting.Alerter
warningAlerters []alerting.Alerter
alerters []alerting.Alerter
}

func newAggregator(
Expand All @@ -67,7 +66,6 @@ func newAggregator(
shard service.Shard,
uploadAgents service.UploadAgents,
errorAlerting service.ErrorAlerting,
warningAlerting service.ErrorAlerting,
) (*aggregator, error) {
merger, err := NewMerging(logger, consul, shard, uploadAgents)
if err != nil {
Expand Down Expand Up @@ -103,13 +101,9 @@ func newAggregator(
return nil, fmt.Errorf("error creating cutoffs: %v", err)
}

errorAlerters, err := alerting.NewAlerters(errorAlerting)
alerters, err := alerting.NewAlerters(errorAlerting)
if err != nil {
return nil, fmt.Errorf("error setting up error alerters: %v", err)
}
warningAlerters, err := alerting.NewAlerters(errorAlerting)
if err != nil {
return nil, fmt.Errorf("error setting up warning alerters: %v", err)
return nil, fmt.Errorf("error setting up alerters: %v", err)
}

return &aggregator{
Expand All @@ -123,8 +117,7 @@ func newAggregator(
auditStorage: auditStorage,
preuploadTransformers: preuploadTransformers,
outputFormatter: outputFormatter,
errorAlerters: errorAlerters,
warningAlerters: warningAlerters,
alerters: alerters,
}, nil
}

Expand Down Expand Up @@ -335,23 +328,18 @@ func (xfagg *aggregator) notifyAfterUpload(filename string, file *ach.File, agen
}

func (xfagg *aggregator) notifyAboutHoliday(day *schedule.Day) {
var msg string
logger := xfagg.logger.With(log.Fields{
"shard": log.String(xfagg.shard.Name),
})

if !day.FirstWindow {
msg = "skipping holiday notification"
xfagg.alertOnWarning(errors.New(msg))
logger.Info().Logf(msg)
logger.Info().Log("skipping holiday notification")
return
}

uploadAgent := xfagg.uploadAgents.Find(xfagg.shard.UploadAgent)
if uploadAgent == nil {
msg = fmt.Sprintf("skipping holiday log for %v", day.Time.Format("2006-01-02"))
xfagg.alertOnWarning(errors.New(msg))
logger.Warn().Logf(msg)
logger.Warn().Logf("skipping holiday log for %v", day.Time.Format("2006-01-02"))
return
}

Expand All @@ -374,31 +362,16 @@ func (xfagg *aggregator) notifyAboutHoliday(day *schedule.Day) {
}

func (xfagg *aggregator) alertOnError(err error) {
if xfagg == nil || len(xfagg.errorAlerters) == 0 {
if xfagg == nil || len(xfagg.alerters) == 0 {
return
}
if err == nil {
return
}

for _, alerter := range xfagg.errorAlerters {
for _, alerter := range xfagg.alerters {
if err := alerter.AlertError(err); err != nil {
xfagg.logger.LogErrorf("ERROR sending alert: %v", err)
}
}
}

func (xfagg *aggregator) alertOnWarning(err error) {
if xfagg == nil || len(xfagg.warningAlerters) == 0 {
return
}
if err == nil {
return
}

for _, alerter := range xfagg.warningAlerters {
if err := alerter.AlertError(err); err != nil {
xfagg.logger.LogErrorf("ERROR sending warning alert: %v", err)
}
}
}
9 changes: 3 additions & 6 deletions internal/pipeline/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ func TestAggregateACHFile(t *testing.T) {
DefaultAgentID: "ftp-live",
}
var errorAlerting service.ErrorAlerting
var warningAlerting service.ErrorAlerting

xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting, warningAlerting)
xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting)
require.NoError(t, err)

merge := &MockXferMerging{}
Expand Down Expand Up @@ -104,9 +103,8 @@ func TestAggregate_notifyAfterUpload(t *testing.T) {
DefaultAgentID: "mock-agent",
}
var errorAlerting service.ErrorAlerting
var warningAlerting service.ErrorAlerting

xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting, warningAlerting)
xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting)
require.NoError(t, err)

require.NotPanics(t, func() {
Expand Down Expand Up @@ -139,9 +137,8 @@ func TestAggregate_notifyAfterUploadErr(t *testing.T) {
DefaultAgentID: "mock-agent",
}
var errorAlerting service.ErrorAlerting
var warningAlerting service.ErrorAlerting

xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting, warningAlerting)
xfagg, err := newAggregator(log.NewNopLogger(), nil, &events.MockEmitter{}, shard, uploadAgents, errorAlerting)
require.NoError(t, err)

require.NotPanics(t, func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Start(
// register each shard's aggregator
shardAggregators := make(map[string]*aggregator)
for i := range cfg.Sharding.Shards {
xfagg, err := newAggregator(logger, consul, eventEmitter, cfg.Sharding.Shards[i], cfg.Upload, cfg.Errors, cfg.Warnings)
xfagg, err := newAggregator(logger, consul, eventEmitter, cfg.Sharding.Shards[i], cfg.Upload, cfg.Errors)
if err != nil {
return nil, fmt.Errorf("problem starting shard=%s: %v", cfg.Sharding.Shards[i].Name, err)
}
Expand Down
4 changes: 0 additions & 4 deletions internal/service/model_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type Config struct {
Events *EventsConfig
Sharding Sharding
Upload UploadAgents
Warnings ErrorAlerting
Errors ErrorAlerting
}

Expand All @@ -61,9 +60,6 @@ func (cfg *Config) Validate() error {
if err := cfg.Upload.Validate(); err != nil {
return fmt.Errorf("upload: %v", err)
}
if err := cfg.Warnings.Validate(); err != nil {
return fmt.Errorf("warnings: %v", err)
}
if err := cfg.Errors.Validate(); err != nil {
return fmt.Errorf("errors: %v", err)
}
Expand Down

0 comments on commit 0130dd3

Please sign in to comment.