Skip to content

Commit

Permalink
Notify rapidpro of messages to handle, add logrus
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed May 18, 2017
1 parent 86779a5 commit 7d528bd
Show file tree
Hide file tree
Showing 248 changed files with 129,385 additions and 43 deletions.
59 changes: 47 additions & 12 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package rapidpro
import (
"bytes"
"fmt"
"log"
"net/url"
"path"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/nyaruka/courier/utils"
"github.com/sirupsen/logrus"
)

func init() {
Expand Down Expand Up @@ -62,6 +64,12 @@ func (b *backend) Health() string {

// Start starts our RapidPro backend, this tests our various connections and starts our spool flushers
func (b *backend) Start() error {
log := logrus.WithFields(logrus.Fields{
"comp": "backend",
"state": "starting",
})
log.Info("starting backend")

// parse and test our db config
dbURL, err := url.Parse(b.config.DB)
if err != nil {
Expand All @@ -75,9 +83,9 @@ func (b *backend) Start() error {
// test our db connection
db, err := sqlx.Connect("postgres", b.config.DB)
if err != nil {
log.Printf("[ ] DB: error connecting: %s\n", err)
log.Error("db not reachable")
} else {
log.Println("[X] DB: connection ok")
log.Info("db ok")
}
b.db = db

Expand Down Expand Up @@ -111,9 +119,9 @@ func (b *backend) Start() error {
defer conn.Close()
_, err = conn.Do("PING")
if err != nil {
log.Printf("[ ] Redis: error connecting: %s\n", err)
log.WithError(err).Error("redis not reachable")
} else {
log.Println("[X] Redis: connection ok")
log.Info("redis ok")
}

// create our s3 client
Expand All @@ -129,9 +137,9 @@ func (b *backend) Start() error {
// test out our S3 credentials
err = utils.TestS3(b.s3Client, b.config.S3MediaBucket)
if err != nil {
log.Printf("[ ] S3: bucket inaccessible, media may not save: %s\n", err)
log.WithError(err).Error("s3 bucket not reachable")
} else {
log.Println("[X] S3: bucket accessible")
log.Info("s3 bucket ok")
}

// make sure our spool dirs are writable
Expand All @@ -140,14 +148,24 @@ func (b *backend) Start() error {
err = courier.EnsureSpoolDirPresent(b.config.SpoolDir, "statuses")
}
if err != nil {
log.Printf("[ ] Spool: spool directories not present, spooling may fail: %s\n", err)
log.WithError(err).Error("spool directories not writable")
} else {
log.Println("[X] Spool: spool directories present")
log.Info("spool directories ok")
}

// start our rapidpro notifier
b.notifier = newNotifier(b.config)
b.notifier.start(b)

// register and start our msg spool flushers
courier.RegisterFlusher("msgs", b.flushMsgFile)
courier.RegisterFlusher("statuses", b.flushStatusFile)
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "msgs"), b.flushMsgFile)
courier.RegisterFlusher(path.Join(b.config.SpoolDir, "statuses"), b.flushStatusFile)

logrus.WithFields(logrus.Fields{
"comp": "backend",
"state": "started",
}).Info("backend started")

return nil
}

Expand All @@ -158,12 +176,24 @@ func (b *backend) Stop() error {
}

b.redisPool.Close()

// close our stop channel
close(b.stopChan)

// wait for our threads to exit
b.waitGroup.Wait()

return nil
}

// NewBackend creates a new RapidPro backend
func newBackend(config *config.Courier) courier.Backend {
return &backend{config: config}
return &backend{
config: config,

stopChan: make(chan bool),
waitGroup: &sync.WaitGroup{},
}
}

type backend struct {
Expand All @@ -173,4 +203,9 @@ type backend struct {
redisPool *redis.Pool
s3Client *s3.S3
awsCreds *credentials.Credentials

notifier *notifier

stopChan chan bool
waitGroup *sync.WaitGroup
}
18 changes: 7 additions & 11 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ func writeMsg(b *backend, msg *courier.Msg) error {
return courier.WriteToSpool(b.config.SpoolDir, "msgs", m)
}

// finally try to add this message to our handling queue
err = addToHandleQueue(b, m)

// set the id on the message returned (could be 0, that's ok)
msg.ID = m.ID

Expand Down Expand Up @@ -115,6 +112,7 @@ func newDBMsgFromMsg(m *courier.Msg) *DBMsg {
}
}

// adds the message to a redis queue for handling. CURRENTLY UNUSED
func addToHandleQueue(b *backend, m *DBMsg) error {
// write it to redis
r := b.redisPool.Get()
Expand Down Expand Up @@ -146,7 +144,7 @@ func writeMsgToDB(b *backend, m *DBMsg) error {

// our db is down, write to the spool, we will write/queue this later
if err != nil {
return courier.WriteToSpool(b.config.SpoolDir, "msgs", m)
return err
}

// set our contact and urn ids from our contact
Expand All @@ -162,6 +160,10 @@ func writeMsgToDB(b *backend, m *DBMsg) error {
if err != nil {
return err
}

// queue this up to be handled by RapidPro
b.notifier.addMsg(m.ID)

return err
}

Expand Down Expand Up @@ -244,13 +246,7 @@ func (b *backend) flushMsgFile(filename string, contents []byte) error {
err = writeMsgToDB(b, msg)

// fail? oh well, we'll try again later
if err != nil {
return err
}

// finally try to add this message to our handling queue
// TODO: if we fail here how do we avoid double inserts above?
return addToHandleQueue(b, msg)
return err
}

// DBMsg is our base struct to represent msgs both in our JSON and db representations
Expand Down
98 changes: 98 additions & 0 deletions backends/rapidpro/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package rapidpro

import (
"fmt"
"net/http"
"net/url"
"strings"
"time"

"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/nyaruka/courier/utils"
"github.com/sirupsen/logrus"
)

func notifyRapidPro(config *config.Courier, msgID courier.MsgID) error {
// our form is just the id of the message to handle
body := url.Values{}
body.Add("message_id", msgID.String())

// build our request
req, err := http.NewRequest("POST", config.RapidproHandleURL, strings.NewReader(body.Encode()))

// this really should never happen, but if it does we ignore it
if err != nil {
logrus.WithField("comp", "notifier").WithError(err).Error("error creating request")
return nil
}

req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("AUTHORIZATION", fmt.Sprintf("Token %s", config.RapidproToken))
_, _, err = utils.MakeHTTPRequest(req)

return err
}

func newNotifier(config *config.Courier) *notifier {
return &notifier{
config: config,
msgIDChan: make(chan courier.MsgID, 100000), // TODO: is 100k enough?
}
}

func (n *notifier) addMsg(msgID courier.MsgID) {
n.msgIDChan <- msgID
}

func (n *notifier) start(backend *backend) {
go func() {
backend.waitGroup.Add(1)
defer backend.waitGroup.Done()

log := logrus.WithField("comp", "notifier")
log.WithField("state", "started").Info("notifier started")

for {
select {
case msgID := <-n.msgIDChan:
// if this failed, rapidpro is likely down, push it onto our retry list
err := notifyRapidPro(n.config, msgID)

// we failed, append it to our retries
if err != nil {
log.WithError(err).Error("error notifying rapidpro")
n.retries = append(n.retries, msgID)
}

// otherwise, all is well, move onto the next

case <-backend.stopChan:
// we are being stopped, exit
log.WithField("state", "stopped").Info("notifier stopped")
return

case <-time.After(500 * time.Millisecond):
// if we are quiet for 500ms, try to send some retries
retried := 0
for retried < 10 && retried < len(n.retries) {
msgID := n.retries[0]
n.retries = n.retries[1:]

err := notifyRapidPro(n.config, msgID)
if err != nil {
log.WithError(err).Error("error notifying rapidpro")
n.retries = append(n.retries, msgID)
}
retried++
}
}
}
}()
}

type notifier struct {
config *config.Courier
msgIDChan chan courier.MsgID
retries []courier.MsgID
}
10 changes: 10 additions & 0 deletions cmd/courier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
_ "github.com/lib/pq"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/sirupsen/logrus"

// load channel handler packages
_ "github.com/nyaruka/courier/handlers/africastalking"
Expand All @@ -30,6 +31,15 @@ func main() {
log.Fatalf("Error loading configuration: %s", err)
}

// configure our logger
//logrus.SetFormatter(&logrus.JSONFormatter{})
logrus.SetOutput(os.Stdout)
level, err := logrus.ParseLevel(config.LogLevel)
if err != nil {
log.Fatalf("Invalid log level '%s'", level)
}
logrus.SetLevel(level)

// load our backend
backend, err := courier.NewBackend(config)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions config/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ type Courier struct {
AWSAccessKeyID string `default:"missing_aws_access_key_id"`
AWSSecretAccessKey string `default:"missing_aws_secret_access_key"`

RapidproHandleURL string `default:"https://app.rapidpro.io/handlers/mage/handle_message/"`
RapidproToken string `default:"missing_rapidpro_token"`

LogLevel string `default:"error"`

IncludeChannels []string
ExcludeChannels []string
}
Expand Down
9 changes: 9 additions & 0 deletions courier.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#
#-------------------------------------------------------------------------------------

# What backend to use for looking up and writing data
backend = "rapidpro"

# The externally accessible base URL of the server
base_url = "https://localhost:8080"

Expand Down Expand Up @@ -36,3 +39,9 @@ s3_media_bucket = "courier-test"

# prefix to our filenames for media (files will be named after the msg uuid)
s3_media_prefix = "media"

# the URL that should be hit with new msgs on RapidPro
rapidpro_handle_url = "https://app.rapidpro.io/handlers/mage/handle_message/"

# the token used for authentication agains the RapidPro instance
rapidpro_token = "missing_rapidpro_token"
12 changes: 12 additions & 0 deletions msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package courier

import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

Expand Down Expand Up @@ -32,6 +34,16 @@ func (i *MsgID) UnmarshalText(text []byte) (err error) {
return err
}

// UnmarshalJSON satisfies json unmarshalling so ids can be decoded from JSON
func (i *MsgID) UnmarshalJSON(bytes []byte) (err error) {
return json.Unmarshal(bytes, &i.NullInt64)
}

// String satisfies the Stringer interface
func (i *MsgID) String() string {
return fmt.Sprintf("%d", i.Int64)
}

// NilMsgID is our nil value for MsgID
var NilMsgID = MsgID{sql.NullInt64{Int64: 0, Valid: false}}

Expand Down
Loading

0 comments on commit 7d528bd

Please sign in to comment.