Skip to content

Commit

Permalink
Merge pull request #11 from nyaruka/sending
Browse files Browse the repository at this point in the history
Add sending support to courier, plug in sentry, log request and respo…
  • Loading branch information
nicpottier authored Jul 5, 2017
2 parents 7b13ed7 + 7b2ea69 commit d1c351f
Show file tree
Hide file tree
Showing 100 changed files with 11,491 additions and 582 deletions.
9 changes: 6 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
language: go

addons:
apt:
packages:
- redis-server
postgresql: '9.3'

go:
- 1.8

services:
- redis-server

addons:
postgresql: '9.3'

before_script:
- psql -U postgres -c "CREATE USER courier WITH PASSWORD 'courier';"
- psql -U postgres -c "ALTER ROLE courier WITH SUPERUSER;"
Expand Down
4 changes: 4 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type Backend interface {
GetChannel(ChannelType, ChannelUUID) (Channel, error)
WriteMsg(*Msg) error
WriteMsgStatus(*MsgStatusUpdate) error
WriteChannelLogs([]*ChannelLog) error

PopNextOutgoingMsg() (*Msg, error)
MarkOutgoingMsgComplete(*Msg)

Health() string
}
Expand Down
73 changes: 73 additions & 0 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rapidpro

import (
"bytes"
"encoding/json"
"fmt"
"net/url"
"path"
Expand All @@ -17,10 +18,14 @@ import (
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/courier/utils"
"github.com/sirupsen/logrus"
)

// the name for our message queue
const msgQueueName = "msgs"

func init() {
courier.RegisterBackend("rapidpro", newBackend)
}
Expand All @@ -30,6 +35,46 @@ func (b *backend) GetChannel(ct courier.ChannelType, uuid courier.ChannelUUID) (
return getChannel(b, ct, uuid)
}

// PopNextOutgoingMsg pops the next message that needs to be sent
func (b *backend) PopNextOutgoingMsg() (*courier.Msg, error) {
// pop the next message off our queue
rc := b.redisPool.Get()
defer rc.Close()

token, msgJSON, err := queue.PopFromQueue(rc, msgQueueName)
for token == queue.Retry {
token, msgJSON, err = queue.PopFromQueue(rc, msgQueueName)
}

var msg *courier.Msg
if msgJSON != "" {
dbMsg := &DBMsg{}
err = json.Unmarshal([]byte(msgJSON), dbMsg)
if err != nil {
return nil, err
}

// load our channel
channel, err := b.GetChannel(courier.AnyChannelType, dbMsg.ChannelUUID)
if err != nil {
return nil, err
}

// then create our outgoing msg
msg = courier.NewOutgoingMsg(channel, dbMsg.URN, dbMsg.Text)
msg.WorkerToken = token
}

return msg, nil
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(msg *courier.Msg) {
rc := b.redisPool.Get()
defer rc.Close()
queue.MarkComplete(rc, msgQueueName, msg.WorkerToken)
}

// WriteMsg writes the passed in message to our store
func (b *backend) WriteMsg(m *courier.Msg) error {
return writeMsg(b, m)
Expand All @@ -40,6 +85,17 @@ func (b *backend) WriteMsgStatus(status *courier.MsgStatusUpdate) error {
return writeMsgStatus(b, status)
}

// WriteChannelLogs persists the passed in logs to our database, for rapidpro we swallow all errors, logging isn't critical
func (b *backend) WriteChannelLogs(logs []*courier.ChannelLog) error {
for _, l := range logs {
err := writeChannelLog(b, l)
if err != nil {
logrus.WithError(err).Error("error writing channel log")
}
}
return nil
}

// Health returns the health of this backend as a string, returning "" if all is well
func (b *backend) Health() string {
// test redis
Expand Down Expand Up @@ -124,6 +180,9 @@ func (b *backend) Start() error {
log.Info("redis ok")
}

// initialize our pop script
b.popScript = redis.NewScript(3, luaPopScript)

// create our s3 client
s3Session, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(b.config.AWSAccessKeyID, b.config.AWSSecretAccessKey, ""),
Expand Down Expand Up @@ -204,8 +263,22 @@ type backend struct {
s3Client *s3.S3
awsCreds *credentials.Credentials

popScript *redis.Script

notifier *notifier

stopChan chan bool
waitGroup *sync.WaitGroup
}

var luaPopScript = `
local val = redis.call('zrange', ARGV[2], 0, 0);
if not next(val) then
redis.call('zrem', ARGV[1], ARGV[3]);
return nil;
else
redis.call('zincrby', ARGV[1], 1, ARGV[3]);
redis.call('zremrangebyrank', ARGV[2], 0, 0);
return val[1];
end
`
45 changes: 45 additions & 0 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (
"testing"
"time"

"encoding/json"

"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/nyaruka/courier/queue"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -160,6 +163,48 @@ func (ts *MsgTestSuite) TestStatus() {
ts.Error(err)
}

func (ts *MsgTestSuite) TestHealth() {
// all should be well in test land
ts.Equal(ts.b.Health(), "")
}

func (ts *MsgTestSuite) TestOutgoingQueue() {
// add one of our outgoing messages to the queue
r := ts.b.redisPool.Get()
defer r.Close()

dbMsg, err := readMsgFromDB(ts.b, courier.NewMsgID(10000))
dbMsg.ChannelUUID, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d")
ts.NoError(err)
ts.NotNil(dbMsg)

// serialize our message
msgJSON, err := json.Marshal(dbMsg)
ts.NoError(err)

err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 1, string(msgJSON), queue.DefaultPriority)
ts.NoError(err)

// pop a message off our queue
msg, err := ts.b.PopNextOutgoingMsg()
ts.NoError(err)
ts.NotNil(msg)

// make sure it is the message we just added
ts.Equal(dbMsg.ID, msg.ID)

// and that it has the appropriate text
ts.Equal(msg.Text, "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(msg)

// pop another message off, shouldn't get anything
msg, err = ts.b.PopNextOutgoingMsg()
ts.Nil(msg)
ts.Nil(err)
}

func (ts *MsgTestSuite) TestChannel() {
knChannel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d")

Expand Down
27 changes: 23 additions & 4 deletions backends/rapidpro/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ func getChannel(b *backend, channelType courier.ChannelType, channelUUID courier
}

const lookupChannelFromUUIDSQL = `
SELECT org_id, id, uuid, channel_type, address, country, config
SELECT org_id, id, uuid, channel_type, scheme, address, country, config
FROM channels_channel
WHERE channel_type = $1 AND uuid = $2 AND is_active = true`
WHERE uuid = $1 AND is_active = true`

// ChannelForUUID attempts to look up the channel with the passed in UUID, returning it
func loadChannelFromDB(b *backend, channel *DBChannel, channelType courier.ChannelType, uuid courier.ChannelUUID) error {
// select just the fields we need
err := b.db.Get(channel, lookupChannelFromUUIDSQL, channelType, uuid)
err := b.db.Get(channel, lookupChannelFromUUIDSQL, uuid)

// we didn't find a match
if err == sql.ErrNoRows {
Expand All @@ -72,6 +72,11 @@ func loadChannelFromDB(b *backend, channel *DBChannel, channelType courier.Chann
return err
}

// is it the right type?
if channelType != courier.AnyChannelType && channelType != channel.ChannelType() {
return courier.ErrChannelWrongType
}

// found it, return it
return nil
}
Expand All @@ -85,7 +90,7 @@ func getLocalChannel(channelType courier.ChannelType, uuid courier.ChannelUUID)

if found {
// if it was found but the type is wrong, that's an error
if channel.ChannelType() != channelType {
if channelType != courier.AnyChannelType && channel.ChannelType() != channelType {
return &DBChannel{ChannelType_: channelType, UUID_: uuid}, courier.ErrChannelWrongType
}

Expand Down Expand Up @@ -130,6 +135,7 @@ type DBChannel struct {
OrgID_ OrgID `db:"org_id"`
ID_ ChannelID `db:"id"`
ChannelType_ courier.ChannelType `db:"channel_type"`
Scheme_ string `db:"scheme"`
UUID_ courier.ChannelUUID `db:"uuid"`
Address_ sql.NullString `db:"address"`
Country_ sql.NullString `db:"country"`
Expand All @@ -144,6 +150,9 @@ func (c *DBChannel) OrgID() OrgID { return c.OrgID_ }
// ChannelType returns the type of this channel
func (c *DBChannel) ChannelType() courier.ChannelType { return c.ChannelType_ }

// Scheme returns the scheme of the URNs this channel deals with
func (c *DBChannel) Scheme() string { return c.Scheme_ }

// ID returns the id of this channel
func (c *DBChannel) ID() ChannelID { return c.ID_ }

Expand All @@ -169,3 +178,13 @@ func (c *DBChannel) ConfigForKey(key string, defaultValue interface{}) interface
}
return value
}

// StringConfigForKey returns the config value for the passed in key, or defaultValue if it isn't found
func (c *DBChannel) StringConfigForKey(key string, defaultValue string) string {
val := c.ConfigForKey(key, defaultValue)
str, isStr := val.(string)
if !isStr {
return defaultValue
}
return str
}
33 changes: 33 additions & 0 deletions backends/rapidpro/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package rapidpro

import (
"fmt"

"time"

"github.com/nyaruka/courier"
)

const insertLogSQL = `
INSERT INTO channels_channellog("channel_id", "msg_id", "description", "is_error", "url", "request", "response", "response_status", "created_on", "request_time")
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`

// WriteChannelLog writes the passed in channel log to the database, we do not queue on errors but instead just throw away the log
func writeChannelLog(b *backend, log *courier.ChannelLog) error {
// cast our channel to our own channel type
dbChan, isChan := log.Channel.(*DBChannel)
if !isChan {
return fmt.Errorf("unable to write non-rapidpro channel logs")
}

description := "Success"
if log.Error != "" {
description = fmt.Sprintf("Error: %s", log.Error)
}

_, err := b.db.Exec(insertLogSQL, dbChan.ID(), log.MsgID, description, log.Error != "", log.URL,
log.Request, log.Response, log.StatusCode, log.CreatedOn, log.Elapsed/time.Millisecond)

return err
}
28 changes: 8 additions & 20 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rapidpro
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
Expand Down Expand Up @@ -108,28 +109,10 @@ func newDBMsgFromMsg(m *courier.Msg) *DBMsg {
CreatedOn: now,
ModifiedOn: now,
QueuedOn: now,
SentOn: m.ReceivedOn,
SentOn: *m.ReceivedOn,
}
}

// adds the message to a redis queue for handling. CURRENTLY UNUSED
func addToHandleQueue(b *backend, m *DBMsg) error {
// write it to redis
r := b.redisPool.Get()
defer r.Close()

// we push to two different queues, one that is URN specific and the other that is our global queue (and to this only the URN)
r.Send("MULTI")
r.Send("RPUSH", fmt.Sprintf("c:u:%s", m.URN), m.ID)
r.Send("RPUSH", "c:msgs", m.URN)
_, err := r.Do("EXEC")
if err != nil {
return err
}

return nil
}

const insertMsgSQL = `
INSERT INTO msgs_msg(org_id, direction, has_template_error, text, msg_count, error_count, priority, status,
visibility, external_id, channel_id, contact_id, contact_urn_id, created_on, modified_on, next_attempt, queued_on, sent_on)
Expand Down Expand Up @@ -195,7 +178,12 @@ func downloadMediaToS3(b *backend, msgUUID courier.MsgUUID, mediaURL string) (st
if err != nil {
return "", err
}
resp, body, err := utils.MakeHTTPRequest(req)
resp, err := utils.GetHTTPClient().Do(req)
if err != nil {
return "", err
}
body, err := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return "", err
}
Expand Down
Loading

0 comments on commit d1c351f

Please sign in to comment.