Skip to content

Commit

Permalink
Use std library errors, part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed May 30, 2024
1 parent 43f290b commit 2fe6ac9
Show file tree
Hide file tree
Showing 24 changed files with 98 additions and 79 deletions.
39 changes: 20 additions & 19 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"sync"
"time"

"errors"

"github.com/aws/aws-sdk-go/service/s3"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
Expand All @@ -33,7 +35,6 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/redisx"
"github.com/pkg/errors"
)

// the name for our message queue
Expand Down Expand Up @@ -359,15 +360,15 @@ func (b *backend) DeleteMsgByExternalID(ctx context.Context, channel courier.Cha
var msgID courier.MsgID
var contactID ContactID
if err := row.Scan(&msgID, &contactID); err != nil && err != sql.ErrNoRows {
return errors.Wrap(err, "error querying deleted msg")
return fmt.Errorf("error querying deleted msg: %w", err)
}

if msgID != courier.NilMsgID && contactID != NilContactID {
rc := b.redisPool.Get()
defer rc.Close()

if err := queueMsgDeleted(rc, ch, msgID, contactID); err != nil {
return errors.Wrap(err, "error queuing message deleted task")
return fmt.Errorf("error queuing message deleted task: %w", err)
}
}

Expand Down Expand Up @@ -416,7 +417,7 @@ func (b *backend) PopNextOutgoingMsg(ctx context.Context) (courier.MsgOut, error
err = json.Unmarshal([]byte(msgJSON), dbMsg)
if err != nil {
queue.MarkComplete(rc, msgQueueName, token)
return nil, errors.Wrapf(err, "unable to unmarshal message: %s", string(msgJSON))
return nil, fmt.Errorf("unable to unmarshal message: %s: %w", string(msgJSON), err)
}

// populate the channel on our db msg
Expand Down Expand Up @@ -536,7 +537,7 @@ func (b *backend) WriteStatusUpdate(ctx context.Context, status courier.StatusUp
if oldURN != urns.NilURN && newURN != urns.NilURN {
err := b.updateContactURN(ctx, status)
if err != nil {
return errors.Wrap(err, "error updating contact URN")
return fmt.Errorf("error updating contact URN: %w", err)
}
}

Expand Down Expand Up @@ -575,7 +576,7 @@ func (b *backend) updateContactURN(ctx context.Context, status courier.StatusUpd
// retrieve channel
channel, err := b.GetChannel(ctx, courier.AnyChannelType, status.ChannelUUID())
if err != nil {
return errors.Wrap(err, "error retrieving channel")
return fmt.Errorf("error retrieving channel: %w", err)
}
dbChannel := channel.(*Channel)
tx, err := b.db.BeginTxx(ctx, nil)
Expand All @@ -585,7 +586,7 @@ func (b *backend) updateContactURN(ctx context.Context, status courier.StatusUpd
// retrieve the old URN
oldContactURN, err := getContactURNByIdentity(tx, dbChannel.OrgID(), old)
if err != nil {
return errors.Wrap(err, "error retrieving old contact URN")
return fmt.Errorf("error retrieving old contact URN: %w", err)
}
// retrieve the new URN
newContactURN, err := getContactURNByIdentity(tx, dbChannel.OrgID(), new)
Expand All @@ -598,11 +599,11 @@ func (b *backend) updateContactURN(ctx context.Context, status courier.StatusUpd
err = fullyUpdateContactURN(tx, oldContactURN)
if err != nil {
tx.Rollback()
return errors.Wrap(err, "error updating old contact URN")
return fmt.Errorf("error updating old contact URN: %w", err)
}
return tx.Commit()
}
return errors.Wrap(err, "error retrieving new contact URN")
return fmt.Errorf("error retrieving new contact URN: %w", err)
}

// only update the new URN if it doesn't have an associated contact
Expand All @@ -616,12 +617,12 @@ func (b *backend) updateContactURN(ctx context.Context, status courier.StatusUpd
err = fullyUpdateContactURN(tx, newContactURN)
if err != nil {
tx.Rollback()
return errors.Wrap(err, "error updating new contact URN")
return fmt.Errorf("error updating new contact URN: %w", err)
}
err = fullyUpdateContactURN(tx, oldContactURN)
if err != nil {
tx.Rollback()
return errors.Wrap(err, "error updating old contact URN")
return fmt.Errorf("error updating old contact URN: %w", err)
}
return tx.Commit()
}
Expand Down Expand Up @@ -663,7 +664,7 @@ func (b *backend) SaveAttachment(ctx context.Context, ch courier.Channel, conten

storageURL, err := b.attachmentStorage.Put(ctx, path, contentType, data)
if err != nil {
return "", errors.Wrapf(err, "error saving attachment to storage (bytes=%d)", len(data))
return "", fmt.Errorf("error saving attachment to storage (bytes=%d): %w", len(data), err)
}

return storageURL, nil
Expand All @@ -673,7 +674,7 @@ func (b *backend) SaveAttachment(ctx context.Context, ch courier.Channel, conten
func (b *backend) ResolveMedia(ctx context.Context, mediaUrl string) (courier.Media, error) {
u, err := url.Parse(mediaUrl)
if err != nil {
return nil, errors.Wrapf(err, "error parsing media URL")
return nil, fmt.Errorf("error parsing media URL: %w", err)
}

mediaUUID := uuidRegex.FindString(u.Path)
Expand All @@ -692,15 +693,15 @@ func (b *backend) ResolveMedia(ctx context.Context, mediaUrl string) (courier.Me
var media *Media
mediaJSON, err := b.mediaCache.Get(rc, mediaUUID)
if err != nil {
return nil, errors.Wrap(err, "error looking up cached media")
return nil, fmt.Errorf("error looking up cached media: %w", err)
}
if mediaJSON != "" {
jsonx.MustUnmarshal([]byte(mediaJSON), &media)
} else {
// lookup media in our database
media, err = lookupMediaFromUUID(ctx, b.db, uuids.UUID(mediaUUID))
if err != nil {
return nil, errors.Wrap(err, "error looking up media")
return nil, fmt.Errorf("error looking up media: %w", err)
}

// cache it for future requests
Expand Down Expand Up @@ -756,11 +757,11 @@ func (b *backend) Heartbeat() error {

active, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:active", msgQueueName), "0", "-1"))
if err != nil {
return errors.Wrapf(err, "error getting active queues")
return fmt.Errorf("error getting active queues: %w", err)
}
throttled, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:throttled", msgQueueName), "0", "-1"))
if err != nil {
return errors.Wrapf(err, "error getting throttled queues")
return fmt.Errorf("error getting throttled queues: %w", err)
}
queues := append(active, throttled...)

Expand All @@ -770,14 +771,14 @@ func (b *backend) Heartbeat() error {
q := fmt.Sprintf("%s/1", queue)
count, err := redis.Int(rc.Do("ZCARD", q))
if err != nil {
return errors.Wrapf(err, "error getting size of priority queue: %s", q)
return fmt.Errorf("error getting size of priority queue: %s: %w", q, err)
}
prioritySize += count

q = fmt.Sprintf("%s/0", queue)
count, err = redis.Int(rc.Do("ZCARD", q))
if err != nil {
return errors.Wrapf(err, "error getting size of bulk queue: %s", q)
return fmt.Errorf("error getting size of bulk queue: %s: %w", q, err)
}
bulkSize += count
}
Expand Down
16 changes: 8 additions & 8 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"log/slog"
"strconv"
"time"
Expand All @@ -16,7 +17,6 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/null/v3"
"github.com/pkg/errors"
)

// used by unit tests to slow down urn operations to test races
Expand Down Expand Up @@ -110,23 +110,23 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
err := b.db.GetContext(ctx, contact, lookupContactFromURNSQL, urn.Identity(), org)
if err != nil && err != sql.ErrNoRows {
log.Error("error looking up contact by URN", "error", err)
return nil, errors.Wrap(err, "error looking up contact by URN")
return nil, fmt.Errorf("error looking up contact by URN: %w", err)
}

// we found it, return it
if err != sql.ErrNoRows {
tx, err := b.db.BeginTxx(ctx, nil)
if err != nil {
log.Error("error beginning transaction", "error", err)
return nil, errors.Wrap(err, "error beginning transaction")
return nil, fmt.Errorf("error beginning transaction: %w", err)
}

// update contact's URNs so this URN has priority
err = setDefaultURN(tx, channel, contact, urn, authTokens)
if err != nil {
log.Error("error updating default URN for contact", "error", err)
tx.Rollback()
return nil, errors.Wrap(err, "error setting default URN for contact")
return nil, fmt.Errorf("error setting default URN for contact: %w", err)
}
return contact, tx.Commit()
}
Expand Down Expand Up @@ -174,13 +174,13 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// insert it
tx, err := b.db.BeginTxx(ctx, nil)
if err != nil {
return nil, errors.Wrap(err, "error beginning transaction")
return nil, fmt.Errorf("error beginning transaction: %w", err)
}

err = insertContact(tx, contact)
if err != nil {
tx.Rollback()
return nil, errors.Wrap(err, "error inserting contact")
return nil, fmt.Errorf("error inserting contact: %w", err)
}

// used for unit testing contact races
Expand All @@ -199,7 +199,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// if this was a duplicate URN, start over with a contact lookup
return contactForURN(ctx, b, org, channel, urn, authTokens, name, clog)
}
return nil, errors.Wrap(err, "error getting URN for contact")
return nil, fmt.Errorf("error getting URN for contact: %w", err)
}

// we stole the URN from another contact, roll back and start over
Expand All @@ -211,7 +211,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// all is well, we created the new contact, commit and move forward
err = tx.Commit()
if err != nil {
return nil, errors.Wrap(err, "error commiting transaction")
return nil, fmt.Errorf("error commiting transaction: %w", err)
}

// store this URN on our contact
Expand Down
9 changes: 4 additions & 5 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
"github.com/nyaruka/null/v3"
"github.com/pkg/errors"
)

// MsgDirection is the direction of a message
Expand Down Expand Up @@ -204,7 +203,7 @@ func writeMsg(ctx context.Context, b *backend, msg courier.MsgIn, clog *courier.
attData, err := base64.StdEncoding.DecodeString(attURL[5:])
if err != nil {
clog.Error(courier.ErrorAttachmentNotDecodable())
return errors.Wrap(err, "unable to decode attachment data")
return fmt.Errorf("unable to decode attachment data: %w", err)
}

var contentType, extension string
Expand Down Expand Up @@ -257,7 +256,7 @@ func writeMsgToDB(ctx context.Context, b *backend, m *Msg, clog *courier.Channel

// our db is down, write to the spool, we will write/queue this later
if err != nil {
return errors.Wrap(err, "error getting contact for message")
return fmt.Errorf("error getting contact for message: %w", err)
}

// set our contact and urn id
Expand All @@ -266,14 +265,14 @@ func writeMsgToDB(ctx context.Context, b *backend, m *Msg, clog *courier.Channel

rows, err := b.db.NamedQueryContext(ctx, sqlInsertMsg, m)
if err != nil {
return errors.Wrap(err, "error inserting message")
return fmt.Errorf("error inserting message: %w", err)
}
defer rows.Close()

rows.Next()
err = rows.Scan(&m.ID_)
if err != nil {
return errors.Wrap(err, "error scanning for inserted message id")
return fmt.Errorf("error scanning for inserted message id: %w", err)
}

// queue this up to be handled by RapidPro
Expand Down
7 changes: 4 additions & 3 deletions backends/rapidpro/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"sync"
"time"

"errors"

"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/syncx"
"github.com/nyaruka/gocommon/urns"
"github.com/pkg/errors"
)

// StatusUpdate represents a status update on a message
Expand Down Expand Up @@ -244,7 +245,7 @@ func (b *backend) writeStatusUpdatesToDB(ctx context.Context, statuses []*Status

err := dbutil.BulkQuery(ctx, b.db, sqlUpdateMsgByID, resolved)
if err != nil {
return nil, errors.Wrap(err, "error updating status")
return nil, fmt.Errorf("error updating status: %w", err)
}

return unresolved, nil
Expand Down Expand Up @@ -313,7 +314,7 @@ func (b *backend) resolveStatusUpdateMsgIDs(ctx context.Context, statuses []*Sta

for rows.Next() {
if err := rows.Scan(&msgID, &channelID, &externalID); err != nil {
return errors.Wrap(err, "error scanning rows")
return fmt.Errorf("error scanning rows: %w", err)
}

// find the status with this channel ID and external ID and update its msg ID
Expand Down
13 changes: 7 additions & 6 deletions backends/rapidpro/urn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/nyaruka/courier/utils"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/null/v3"
"github.com/pkg/errors"
)

// ContactURNID represents a contact urn's id
Expand Down Expand Up @@ -178,14 +177,14 @@ func getOrCreateContactURN(db *sqlx.Tx, channel *Channel, contactID ContactID, u
}
err := db.Get(contactURN, sqlSelectURNByIdentity, channel.OrgID(), urn.Identity())
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrap(err, "error looking up URN by identity")
return nil, fmt.Errorf("error looking up URN by identity: %w", err)
}

// we didn't find it, let's insert it
if err == sql.ErrNoRows {
err = insertContactURN(db, contactURN)
if err != nil {
return nil, errors.Wrap(err, "error inserting URN")
return nil, fmt.Errorf("error inserting URN: %w", err)
}
}

Expand All @@ -201,7 +200,7 @@ func getOrCreateContactURN(db *sqlx.Tx, channel *Channel, contactID ContactID, u
contactURN.Display = display
err = updateContactURN(db, contactURN)
if err != nil {
return nil, errors.Wrap(err, "error updating URN")
return nil, fmt.Errorf("error updating URN: %w", err)
}
}

Expand All @@ -211,8 +210,10 @@ func getOrCreateContactURN(db *sqlx.Tx, channel *Channel, contactID ContactID, u

err = updateContactURN(db, contactURN)
}

return contactURN, errors.Wrap(err, "error updating URN auth")
if err != nil {
return contactURN, fmt.Errorf("error updating URN auth: %w", err)
}
return contactURN, nil
}

const sqlInsertURN = `
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ require (
github.com/nyaruka/null/v3 v3.0.0
github.com/nyaruka/redisx v0.8.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/samber/slog-multi v1.0.2
github.com/samber/slog-sentry v1.2.2
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -49,6 +48,7 @@ require (
github.com/nyaruka/librato v1.1.1 // indirect
github.com/nyaruka/null/v2 v2.0.3 // indirect
github.com/nyaruka/phonenumbers v1.3.5 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
Expand Down
Loading

0 comments on commit 2fe6ac9

Please sign in to comment.