Skip to content

Commit

Permalink
Send metrics to cloudwatch
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed Dec 17, 2024
1 parent 841af35 commit d49541d
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 83 deletions.
4 changes: 4 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/urns"
)
Expand Down Expand Up @@ -105,6 +106,9 @@ type Backend interface {

// RedisPool returns the redisPool for this backend
RedisPool() *redis.Pool

// CloudWatch return the CloudWatch service for this backend
CloudWatch() *cwatch.Service
}

// Media is a resolved media object that can be used as a message attachment
Expand Down
36 changes: 34 additions & 2 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/s3/types"
cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/aws/s3x"
"github.com/nyaruka/gocommon/cache"
Expand Down Expand Up @@ -70,6 +72,7 @@ type backend struct {
rp *redis.Pool
dynamo *dynamo.Service
s3 *s3x.Service
cw *cwatch.Service

channelsByUUID *cache.Local[courier.ChannelUUID, *Channel]
channelsByAddr *cache.Local[courier.ChannelAddress, *Channel]
Expand Down Expand Up @@ -190,6 +193,12 @@ func (b *backend) Start() error {
return err
}

b.cw, err = cwatch.NewService(b.config.AWSAccessKeyID, b.config.AWSSecretAccessKey, b.config.AWSRegion, b.config.CloudwatchNamespace, b.config.DeploymentID)
if err != nil {
return err
}

Check warning on line 199 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L198-L199

Added lines #L198 - L199 were not covered by tests
b.cw.StartQueue(time.Second * 3)

// check attachment bucket access
if err := b.s3.Test(ctx, b.config.S3AttachmentsBucket); err != nil {
log.Error("attachments bucket not accessible", "error", err)
Expand Down Expand Up @@ -246,6 +255,9 @@ func (b *backend) Stop() error {

// wait for our threads to exit
b.waitGroup.Wait()

// stop cloudwatch service
b.cw.StopQueue()
return nil
}

Expand Down Expand Up @@ -635,7 +647,7 @@ func (b *backend) SaveAttachment(ctx context.Context, ch courier.Channel, conten

path := filepath.Join("attachments", strconv.FormatInt(int64(orgID), 10), filename[:4], filename[4:8], filename)

storageURL, err := b.s3.PutObject(ctx, b.config.S3AttachmentsBucket, path, contentType, data, types.ObjectCannedACLPublicRead)
storageURL, err := b.s3.PutObject(ctx, b.config.S3AttachmentsBucket, path, contentType, data, s3types.ObjectCannedACLPublicRead)
if err != nil {
return "", fmt.Errorf("error saving attachment to storage (bytes=%d): %w", len(data), err)
}
Expand Down Expand Up @@ -775,6 +787,21 @@ func (b *backend) Heartbeat() error {
b.stats.redisWaitDuration = redisStats.WaitDuration
b.stats.redisWaitCount = redisStats.WaitCount

hostDim := cwatch.Dimension("Host", b.config.InstanceID)
appDim := cwatch.Dimension("App", "courier")

b.CloudWatch().Queue(
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim, appDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim, appDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim, appDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim, appDim),
)

b.CloudWatch().Queue(
cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")),
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
)

analytics.Gauge("courier.db_busy", float64(dbStats.InUse))
analytics.Gauge("courier.db_idle", float64(dbStats.Idle))
analytics.Gauge("courier.db_wait_ms", float64(dbWaitDurationInPeriod/time.Millisecond))
Expand Down Expand Up @@ -873,3 +900,8 @@ func (b *backend) Status() string {
func (b *backend) RedisPool() *redis.Pool {
return b.rp
}

// CloudWatch return the cloudwatch service
func (b *backend) CloudWatch() *cwatch.Service {
return b.cw
}
11 changes: 11 additions & 0 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"time"
"unicode/utf8"

cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
Expand Down Expand Up @@ -220,6 +222,15 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// log that we created a new contact to librato
analytics.Gauge("courier.new_contact", float64(1))

b.cw.Queue(
cwatch.Datum(
"NewContact",
float64(1),
cwtypes.StandardUnitCount,
cwatch.Dimension("ChannelType", string(channel.ChannelType())),
),
)

// and return it
return contact, nil
}
10 changes: 10 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log"
"log/slog"
"net"
"os"
"strings"

"github.com/nyaruka/courier/utils"
Expand All @@ -29,6 +30,10 @@ type Config struct {
AWSSecretAccessKey string `help:"secret access key to use for AWS services"`
AWSRegion string `help:"region to use for AWS services, e.g. us-east-1"`

CloudwatchNamespace string `help:"the namespace to use for cloudwatch metrics"`
DeploymentID string `help:"the deployment identifier to use for metrics"`
InstanceID string `help:"the instance identifier to use for metrics"`

DynamoEndpoint string `help:"DynamoDB service endpoint, e.g. https://dynamodb.us-east-1.amazonaws.com"`
DynamoTablePrefix string `help:"prefix to use for DynamoDB tables"`

Expand Down Expand Up @@ -60,6 +65,7 @@ type Config struct {

// NewDefaultConfig returns a new default configuration object
func NewDefaultConfig() *Config {
hostname, _ := os.Hostname()
return &Config{
Backend: "rapidpro",
Domain: "localhost",
Expand All @@ -73,6 +79,10 @@ func NewDefaultConfig() *Config {
AWSSecretAccessKey: "",
AWSRegion: "us-east-1",

CloudwatchNamespace: "Temba/Courier",
DeploymentID: "dev",
InstanceID: hostname,

DynamoEndpoint: "", // let library generate it
DynamoTablePrefix: "Temba",

Expand Down
55 changes: 28 additions & 27 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ go 1.23

require (
github.com/antchfx/xmlquery v1.4.2
github.com/aws/aws-sdk-go-v2 v1.32.4
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.16
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.37.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.67.0
github.com/aws/aws-sdk-go-v2 v1.32.6
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.17
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.37.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.69.0
github.com/buger/jsonparser v1.1.1
github.com/dghubble/oauth1 v0.7.3
github.com/getsentry/sentry-go v0.29.1
github.com/go-chi/chi/v5 v5.1.0
github.com/golang-jwt/jwt/v5 v5.2.1
Expand All @@ -19,13 +18,13 @@ require (
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
github.com/nyaruka/ezconf v0.3.0
github.com/nyaruka/gocommon v1.59.2
github.com/nyaruka/gocommon v1.60.5
github.com/nyaruka/null/v3 v3.0.0
github.com/nyaruka/redisx v0.8.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/samber/slog-multi v1.2.4
github.com/samber/slog-sentry v1.2.2
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
golang.org/x/mod v0.22.0
golang.org/x/oauth2 v0.24.0
gopkg.in/go-playground/validator.v9 v9.31.0
Expand All @@ -34,30 +33,32 @@ require (
require (
cloud.google.com/go/compute/metadata v0.5.2 // indirect
github.com/antchfx/xpath v1.3.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
github.com/aws/aws-sdk-go-v2/config v1.28.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.45 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.19 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect
github.com/aws/aws-sdk-go v1.55.5
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7 // indirect
github.com/aws/aws-sdk-go-v2/config v1.28.5 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.46 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.23 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.0 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.24 // indirect
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.43.3
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.6 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.1 // indirect
github.com/aws/smithy-go v1.22.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/go-playground/validator/v10 v10.23.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
Expand All @@ -69,7 +70,7 @@ require (
github.com/naoina/toml v0.1.1 // indirect
github.com/nyaruka/librato v1.1.1 // indirect
github.com/nyaruka/null/v2 v2.0.3 // indirect
github.com/nyaruka/phonenumbers v1.4.2 // indirect
github.com/nyaruka/phonenumbers v1.4.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samber/lo v1.47.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
Expand Down
Loading

0 comments on commit d49541d

Please sign in to comment.