Skip to content

Commit

Permalink
Merge pull request #814 from nyaruka/remove-librato
Browse files Browse the repository at this point in the history
Remove librato
  • Loading branch information
rowanseymour authored Dec 17, 2024
2 parents e9b8206 + c5d6e08 commit a9ed817
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 38 deletions.
14 changes: 1 addition & 13 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/aws/s3x"
Expand Down Expand Up @@ -802,18 +801,7 @@ func (b *backend) Heartbeat() error {
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
)

analytics.Gauge("courier.db_busy", float64(dbStats.InUse))
analytics.Gauge("courier.db_idle", float64(dbStats.Idle))
analytics.Gauge("courier.db_wait_ms", float64(dbWaitDurationInPeriod/time.Millisecond))
analytics.Gauge("courier.db_wait_count", float64(dbWaitCountInPeriod))
analytics.Gauge("courier.redis_active", float64(redisStats.ActiveCount))
analytics.Gauge("courier.redis_idle", float64(redisStats.IdleCount))
analytics.Gauge("courier.redis_wait_ms", float64(redisWaitDurationInPeriod/time.Millisecond))
analytics.Gauge("courier.redis_wait_count", float64(redisWaitCountInPeriod))
analytics.Gauge("courier.bulk_queue", float64(bulkSize))
analytics.Gauge("courier.priority_queue", float64(prioritySize))

slog.Info("current analytics", "db_busy", dbStats.InUse,
slog.Info("current metrics", "db_busy", dbStats.InUse,
"db_idle", dbStats.Idle,
"db_wait_time", dbWaitDurationInPeriod,
"db_wait_count", dbWaitCountInPeriod,
Expand Down
2 changes: 1 addition & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func (ts *BackendTestSuite) TestHealth() {
}

func (ts *BackendTestSuite) TestHeartbeat() {
// TODO make analytics abstraction layer so we can test what we report
// TODO make metrics abstraction layer so we can test what we report
ts.NoError(ts.b.Heartbeat())
}

Expand Down
3 changes: 0 additions & 3 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/urns"
Expand Down Expand Up @@ -220,8 +219,6 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
contact.URNID_ = contactURN.ID

// log that we created a new contact to librato
analytics.Gauge("courier.new_contact", float64(1))

b.cw.Queue(
cwatch.Datum(
"NewContact",
Expand Down
3 changes: 0 additions & 3 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/courier/utils/clogs"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/urns"
)
Expand Down Expand Up @@ -339,13 +338,11 @@ func (w *Sender) sendMessage(msg MsgOut) {

// report to librato
if status.Status() == MsgStatusErrored || status.Status() == MsgStatusFailed {
analytics.Gauge(fmt.Sprintf("courier.msg_send_error_%s", msg.Channel().ChannelType()), secondDuration)
backend.CloudWatch().Queue(
cwatch.Datum("MsgSendError", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)

} else {
analytics.Gauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration)
backend.CloudWatch().Queue(
cwatch.Datum("MsgSend", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
Expand Down
19 changes: 1 addition & 18 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"log"
"log/slog"
"net/http"
"os"
"runtime/debug"
"slices"
"sort"
Expand All @@ -21,7 +20,6 @@ import (
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/nyaruka/courier/utils/clogs"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/jsonx"
Expand Down Expand Up @@ -94,14 +92,6 @@ func NewServerWithLogger(config *Config, backend Backend, logger *slog.Logger) S
// if it encounters any unrecoverable (or ignorable) error, though its bias is to move forward despite
// connection errors
func (s *server) Start() error {
// configure librato if we have configuration options for it
host, _ := os.Hostname()
if s.config.LibratoUsername != "" {
analytics.RegisterBackend(analytics.NewLibrato(s.config.LibratoUsername, s.config.LibratoToken, host, time.Second, s.waitGroup))
}

analytics.Start()

// start our backend
err := s.backend.Start()
if err != nil {
Expand Down Expand Up @@ -197,8 +187,6 @@ func (s *server) Stop() error {
return err
}

analytics.Stop()

// wait for everything to stop
s.waitGroup.Wait()

Expand Down Expand Up @@ -318,20 +306,18 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe
}

if channel != nil {
// if we have a channel but no events were created, we still log this to analytics
// if we have a channel but no events were created, we still log this to metrics
channelTypeDim := cwatch.Dimension("ChannelType", string(channel.ChannelType()))

if len(events) == 0 {
if hErr != nil {
s.Backend().CloudWatch().Queue(
cwatch.Datum("ChannelError", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
analytics.Gauge(fmt.Sprintf("courier.channel_error_%s", channel.ChannelType()), secondDuration)
} else {
s.Backend().CloudWatch().Queue(
cwatch.Datum("ChannelIgnored", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
analytics.Gauge(fmt.Sprintf("courier.channel_ignored_%s", channel.ChannelType()), secondDuration)
}
}

Expand All @@ -342,20 +328,17 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe
s.Backend().CloudWatch().Queue(
cwatch.Datum("MsgReceive", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
analytics.Gauge(fmt.Sprintf("courier.msg_receive_%s", channel.ChannelType()), secondDuration)
LogMsgReceived(r, e)
case StatusUpdate:
clog.SetAttached(true)
s.Backend().CloudWatch().Queue(
cwatch.Datum("MsgStatus", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
analytics.Gauge(fmt.Sprintf("courier.msg_status_%s", channel.ChannelType()), secondDuration)
LogMsgStatusReceived(r, e)
case ChannelEvent:
s.Backend().CloudWatch().Queue(
cwatch.Datum("EventReceive", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
analytics.Gauge(fmt.Sprintf("courier.evt_receive_%s", channel.ChannelType()), secondDuration)
LogChannelEventReceived(r, e)
}
}
Expand Down

0 comments on commit a9ed817

Please sign in to comment.