From 59dac3c031d1935ec0795a443620f70620ca5297 Mon Sep 17 00:00:00 2001
From: Rowan Seymour
Date: Wed, 18 Dec 2024 10:42:15 -0500
Subject: [PATCH] Tweak metric reporter so all errors are logged

---
 backends/rapidpro/backend.go | 39 +++++++++++++++++++++---------------
 backends/rapidpro/stats.go   |  8 ++++----
 2 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go
index a02e7ce7..3864a1bb 100644
--- a/backends/rapidpro/backend.go
+++ b/backends/rapidpro/backend.go
@@ -246,6 +246,17 @@ func (b *backend) Start() error {
 func (b *backend) startMetricsReporter(interval time.Duration) {
 	b.waitGroup.Add(1)
 
+	report := func() {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+		count, err := b.reportMetrics(ctx)
+		cancel()
+		if err != nil {
+			slog.Error("error reporting metrics", "error", err)
+		} else {
+			slog.Info("sent metrics to cloudwatch", "count", count)
+		}
+	}
+
 	go func() {
 		defer func() {
 			slog.Info("metrics reporter exiting")
@@ -255,10 +266,10 @@ func (b *backend) startMetricsReporter(interval time.Duration) {
 		for {
 			select {
 			case <-b.stopChan:
-				b.reportMetrics()
+				report()
 				return
 			case <-time.After(interval):
-				b.reportMetrics()
+				report()
 			}
 		}
 	}()
@@ -764,19 +775,19 @@ func (b *backend) Health() string {
 	return health.String()
 }
 
-func (b *backend) reportMetrics() error {
-	rc := b.rp.Get()
-	defer rc.Close()
-
+func (b *backend) reportMetrics(ctx context.Context) (int, error) {
 	metrics := b.stats.Extract().ToMetrics()
 
+	// get queue sizes
+	rc := b.rp.Get()
+	defer rc.Close()
 	active, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:active", msgQueueName), "0", "-1"))
 	if err != nil {
-		return fmt.Errorf("error getting active queues: %w", err)
+		return 0, fmt.Errorf("error getting active queues: %w", err)
 	}
 	throttled, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:throttled", msgQueueName), "0", "-1"))
 	if err != nil {
-		return fmt.Errorf("error getting throttled queues: %w", err)
+		return 0, fmt.Errorf("error getting throttled queues: %w", err)
 	}
 	queues := append(active, throttled...)
 
@@ -786,14 +797,14 @@ func (b *backend) reportMetrics() error {
 		q := fmt.Sprintf("%s/1", queue)
 		count, err := redis.Int(rc.Do("ZCARD", q))
 		if err != nil {
-			return fmt.Errorf("error getting size of priority queue: %s: %w", q, err)
+			return 0, fmt.Errorf("error getting size of priority queue: %s: %w", q, err)
 		}
 		prioritySize += count
 
 		q = fmt.Sprintf("%s/0", queue)
 		count, err = redis.Int(rc.Do("ZCARD", q))
 		if err != nil {
-			return fmt.Errorf("error getting size of bulk queue: %s: %w", q, err)
+			return 0, fmt.Errorf("error getting size of bulk queue: %s: %w", q, err)
 		}
 		bulkSize += count
 	}
@@ -816,15 +827,11 @@ func (b *backend) reportMetrics() error {
 		cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
 	)
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 	if err := b.cw.Send(ctx, metrics...); err != nil {
-		slog.Error("error sending metrics", "error", err)
-	} else {
-		slog.Info("sent metrics to cloudwatch", "metrics", len(metrics))
+		return 0, fmt.Errorf("error sending metrics: %w", err)
 	}
-	cancel()
 
-	return nil
+	return len(metrics), nil
 }
 
 // Status returns information on our queue sizes, number of workers etc..
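
For context, the shape of the loop above — the per-tick work pulled into a closure that owns its own timeout and logs either outcome, with one final run on shutdown — can be sketched standalone roughly as follows (runReporter and the task function are illustrative names, not part of the patch):

package main

import (
	"context"
	"errors"
	"log/slog"
	"time"
)

// runReporter mirrors the loop in the patch: each tick (and one final run on
// stop) is wrapped in a closure that owns a timeout and logs either outcome.
func runReporter(stop <-chan struct{}, interval time.Duration, task func(context.Context) (int, error)) {
	report := func() {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if count, err := task(ctx); err != nil {
			slog.Error("error reporting metrics", "error", err)
		} else {
			slog.Info("sent metrics", "count", count)
		}
	}

	for {
		select {
		case <-stop:
			report() // flush once more before exiting
			return
		case <-time.After(interval):
			report()
		}
	}
}

func main() {
	stop := make(chan struct{})
	go func() {
		time.Sleep(350 * time.Millisecond)
		close(stop)
	}()

	calls := 0
	runReporter(stop, 100*time.Millisecond, func(ctx context.Context) (int, error) {
		calls++
		if calls%2 == 0 {
			return 0, errors.New("simulated send failure") // exercises the error path
		}
		return 5, nil
	})
}
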
diff --git a/backends/rapidpro/stats.go b/backends/rapidpro/stats.go
index cd026d02..2cde3330 100644
--- a/backends/rapidpro/stats.go
+++ b/backends/rapidpro/stats.go
@@ -63,16 +63,16 @@ func (s *Stats) ToMetrics() []types.MetricDatum {
 	metrics = append(metrics, s.IncomingIgnored.metrics("IncomingIgnored")...)
 
 	for typ, d := range s.IncomingDuration { // convert to averages
-		avgTime := float64(d) / float64(s.IncomingRequests[typ])
-		metrics = append(metrics, cwatch.Datum("IncomingDuration", avgTime/float64(time.Second), types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
+		avgTime := d / time.Duration(s.IncomingRequests[typ])
+		metrics = append(metrics, cwatch.Datum("IncomingDuration", float64(avgTime)/float64(time.Second), types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
 	}
 
 	metrics = append(metrics, s.OutgoingSends.metrics("OutgoingSends")...)
 	metrics = append(metrics, s.OutgoingErrors.metrics("OutgoingErrors")...)
 
 	for typ, d := range s.OutgoingDuration { // convert to averages
-		avgTime := float64(d) / float64(s.OutgoingSends[typ]+s.OutgoingErrors[typ])
-		metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgTime/float64(time.Second), types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
+		avgTime := d / time.Duration(s.OutgoingSends[typ]+s.OutgoingErrors[typ])
+		metrics = append(metrics, cwatch.Datum("OutgoingDuration", float64(avgTime)/float64(time.Second), types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
 	}
 
 	metrics = append(metrics, cwatch.Datum("ContactsCreated", float64(s.ContactsCreated), types.StandardUnitCount))
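
A subtlety worth noting in the averaging above: avgTime is now a time.Duration, and a plain avgTime / time.Second would be integer division, truncating any sub-second average to zero; the conversion to float64 therefore has to happen before dividing by time.Second. A minimal sketch with hypothetical numbers:

package main

import (
	"fmt"
	"time"
)

func main() {
	// hypothetical totals: 3 requests taking 1.2s altogether
	total := 1200 * time.Millisecond
	requests := 3

	avg := total / time.Duration(requests) // 400ms, still a time.Duration

	// integer division by time.Second truncates the sub-second average to 0
	fmt.Println(float64(avg / time.Second)) // 0

	// converting to float64 first keeps the fractional seconds
	fmt.Println(float64(avg) / float64(time.Second)) // 0.4
}
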