Skip to content

Commit

Permalink
Tweak metric reporter so all errors are logged
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Dec 18, 2024
1 parent 4699fa6 commit 59dac3c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 20 deletions.
39 changes: 23 additions & 16 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,17 @@ func (b *backend) Start() error {
func (b *backend) startMetricsReporter(interval time.Duration) {
b.waitGroup.Add(1)

report := func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
count, err := b.reportMetrics(ctx)
cancel()
if err != nil {
slog.Error("error reporting metrics", "error", err)
} else {
slog.Info("sent metrics to cloudwatch", "count", count)
}
}

go func() {
defer func() {
slog.Info("metrics reporter exiting")
Expand All @@ -255,10 +266,10 @@ func (b *backend) startMetricsReporter(interval time.Duration) {
for {
select {
case <-b.stopChan:
b.reportMetrics()
report()
return
case <-time.After(interval):
b.reportMetrics()
report()
}
}
}()
Expand Down Expand Up @@ -764,19 +775,19 @@ func (b *backend) Health() string {
return health.String()
}

func (b *backend) reportMetrics() error {
rc := b.rp.Get()
defer rc.Close()

func (b *backend) reportMetrics(ctx context.Context) (int, error) {
metrics := b.stats.Extract().ToMetrics()

// get queue sizes
rc := b.rp.Get()
defer rc.Close()
active, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:active", msgQueueName), "0", "-1"))
if err != nil {
return fmt.Errorf("error getting active queues: %w", err)
return 0, fmt.Errorf("error getting active queues: %w", err)
}
throttled, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:throttled", msgQueueName), "0", "-1"))
if err != nil {
return fmt.Errorf("error getting throttled queues: %w", err)
return 0, fmt.Errorf("error getting throttled queues: %w", err)
}
queues := append(active, throttled...)

Expand All @@ -786,14 +797,14 @@ func (b *backend) reportMetrics() error {
q := fmt.Sprintf("%s/1", queue)
count, err := redis.Int(rc.Do("ZCARD", q))
if err != nil {
return fmt.Errorf("error getting size of priority queue: %s: %w", q, err)
return 0, fmt.Errorf("error getting size of priority queue: %s: %w", q, err)
}
prioritySize += count

q = fmt.Sprintf("%s/0", queue)
count, err = redis.Int(rc.Do("ZCARD", q))
if err != nil {
return fmt.Errorf("error getting size of bulk queue: %s: %w", q, err)
return 0, fmt.Errorf("error getting size of bulk queue: %s: %w", q, err)
}
bulkSize += count
}
Expand All @@ -816,15 +827,11 @@ func (b *backend) reportMetrics() error {
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := b.cw.Send(ctx, metrics...); err != nil {
slog.Error("error sending metrics", "error", err)
} else {
slog.Info("sent metrics to cloudwatch", "metrics", len(metrics))
return 0, fmt.Errorf("error sending metrics: %w", err)
}
cancel()

return nil
return len(metrics), nil
}

// Status returns information on our queue sizes, number of workers etc..
Expand Down
8 changes: 4 additions & 4 deletions backends/rapidpro/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,16 @@ func (s *Stats) ToMetrics() []types.MetricDatum {
metrics = append(metrics, s.IncomingIgnored.metrics("IncomingIgnored")...)

for typ, d := range s.IncomingDuration { // convert to averages
avgTime := float64(d) / float64(s.IncomingRequests[typ])
metrics = append(metrics, cwatch.Datum("IncomingDuration", avgTime/float64(time.Second), types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
avgTime := d / time.Duration(s.IncomingRequests[typ])
metrics = append(metrics, cwatch.Datum("IncomingDuration", float64(avgTime/time.Second), types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
}

metrics = append(metrics, s.OutgoingSends.metrics("OutgoingSends")...)
metrics = append(metrics, s.OutgoingErrors.metrics("OutgoingErrors")...)

for typ, d := range s.OutgoingDuration { // convert to averages
avgTime := float64(d) / float64(s.OutgoingSends[typ]+s.OutgoingErrors[typ])
metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgTime/float64(time.Second), types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
avgTime := d / time.Duration(s.OutgoingSends[typ]+s.OutgoingErrors[typ])
metrics = append(metrics, cwatch.Datum("OutgoingDuration", float64(avgTime/time.Second), types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
}

metrics = append(metrics, cwatch.Datum("ContactsCreated", float64(s.ContactsCreated), types.StandardUnitCount))
Expand Down

0 comments on commit 59dac3c

Please sign in to comment.