Skip to content

Commit

Permalink
new backend architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed May 16, 2017
1 parent cd8c402 commit 37b888c
Show file tree
Hide file tree
Showing 41 changed files with 1,631 additions and 1,242 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ before_script:
- psql -U postgres -c "CREATE USER courier WITH PASSWORD 'courier';"
- psql -U postgres -c "ALTER ROLE courier WITH SUPERUSER;"
- psql -U postgres -c "CREATE DATABASE courier_test;"
- psql -U postgres -d courier_test -f schema.sql
- psql -U postgres -d courier_test -f backends/rapidpro/schema.sql
- go get github.com/mattn/goveralls
script:
- $HOME/gopath/bin/goveralls -service=travis-ci
39 changes: 39 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package courier

import (
"fmt"
"strings"

"github.com/nyaruka/courier/config"
)

// BackendConstructorFunc defines a function to create a particular backend type
type BackendConstructorFunc func(*config.Courier) Backend

// Backend represents the part of Courier that deals with looking up and writing channels and results
type Backend interface {
Start() error
Stop() error

GetChannel(ChannelType, ChannelUUID) (Channel, error)
WriteMsg(*Msg) error
WriteMsgStatus(*MsgStatusUpdate) error

Health() string
}

// NewBackend creates the type of backend passed in
func NewBackend(config *config.Courier) (Backend, error) {
backendFunc, found := registeredBackends[strings.ToLower(config.Backend)]
if !found {
return nil, fmt.Errorf("no such backend type: '%s'", config.Backend)
}
return backendFunc(config), nil
}

// RegisterBackend adds a new backend, called by individual backends in their init() func
func RegisterBackend(backendType string, constructorFunc BackendConstructorFunc) {
registeredBackends[strings.ToLower(backendType)] = constructorFunc
}

var registeredBackends = make(map[string]BackendConstructorFunc)
173 changes: 173 additions & 0 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package rapidpro

import (
"bytes"
"fmt"
"log"
"net/url"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/garyburd/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/nyaruka/courier/utils"
)

func init() {
courier.RegisterBackend("rapidpro", newBackend)
}

// GetChannel returns the channel for the passed in type and UUID
func (b *backend) GetChannel(ct courier.ChannelType, uuid courier.ChannelUUID) (courier.Channel, error) {
return getChannel(b, ct, uuid)
}

// WriteMsg writes the passed in message to our store
func (b *backend) WriteMsg(m *courier.Msg) error {
return writeMsg(b, m)
}

// WriteMsgStatus writes the passed in MsgStatus to our store
func (b *backend) WriteMsgStatus(status *courier.MsgStatusUpdate) error {
return writeMsgStatus(b, status)
}

// Health returns the health of this backend as a string, returning "" if all is well
func (b *backend) Health() string {
// test redis
rc := b.redisPool.Get()
_, redisErr := rc.Do("PING")
defer rc.Close()

// test our db
_, dbErr := b.db.Exec("SELECT 1")

health := bytes.Buffer{}

if redisErr != nil {
health.WriteString(fmt.Sprintf("\n% 16s: %v", "redis err", redisErr))
}
if dbErr != nil {
health.WriteString(fmt.Sprintf("\n% 16s: %v", "db err", dbErr))
}

return health.String()
}

// Start starts our RapidPro backend, this tests our various connections and starts our spool flushers
func (b *backend) Start() error {
// parse and test our db config
dbURL, err := url.Parse(b.config.DB)
if err != nil {
return fmt.Errorf("unable to parse DB URL '%s': %s", b.config.DB, err)
}

if dbURL.Scheme != "postgres" {
return fmt.Errorf("invalid DB URL: '%s', only postgres is supported", b.config.DB)
}

// test our db connection
db, err := sqlx.Connect("postgres", b.config.DB)
if err != nil {
log.Printf("[ ] DB: error connecting: %s\n", err)
} else {
log.Println("[X] DB: connection ok")
}
b.db = db

// parse and test our redis config
redisURL, err := url.Parse(b.config.Redis)
if err != nil {
return fmt.Errorf("unable to parse Redis URL '%s': %s", b.config.Redis, err)
}

// create our pool
redisPool := &redis.Pool{
Wait: true, // makes callers wait for a connection
MaxActive: 5, // only open this many concurrent connections at once
MaxIdle: 2, // only keep up to 2 idle
IdleTimeout: 240 * time.Second, // how long to wait before reaping a connection
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", fmt.Sprintf("%s", redisURL.Host))
if err != nil {
return nil, err
}

// switch to the right DB
_, err = conn.Do("SELECT", strings.TrimLeft(redisURL.Path, "/"))
return conn, err
},
}
b.redisPool = redisPool

// test our redis connection
conn := redisPool.Get()
defer conn.Close()
_, err = conn.Do("PING")
if err != nil {
log.Printf("[ ] Redis: error connecting: %s\n", err)
} else {
log.Println("[X] Redis: connection ok")
}

// create our s3 client
s3Session, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(b.config.AWSAccessKeyID, b.config.AWSSecretAccessKey, ""),
Region: aws.String(b.config.S3Region),
})
if err != nil {
return err
}
b.s3Client = s3.New(s3Session)

// test out our S3 credentials
err = utils.TestS3(b.s3Client, b.config.S3MediaBucket)
if err != nil {
log.Printf("[ ] S3: bucket inaccessible, media may not save: %s\n", err)
} else {
log.Println("[X] S3: bucket accessible")
}

// make sure our spool dirs are writable
err = courier.EnsureSpoolDirPresent(b.config.SpoolDir, "msgs")
if err != nil {
log.Printf("[ ] Spool: spool directories not present, spooling may fail: %s\n", err)
} else {
log.Println("[X] Spool: spool directories present")
}

// register and start our msg spool flushers
courier.RegisterFlusher("msgs", b.flushMsgFile)
courier.RegisterFlusher("statuses", b.flushStatusFile)
return nil
}

// Stop stops our RapidPro backend, closing our db and redis connections
func (b *backend) Stop() error {
if b.db != nil {
b.db.Close()
}

b.redisPool.Close()
return nil
}

// NewBackend creates a new RapidPro backend
func newBackend(config *config.Courier) courier.Backend {
return &backend{config: config}
}

type backend struct {
config *config.Courier

db *sqlx.DB
redisPool *redis.Pool
s3Client *s3.S3
awsCreds *credentials.Credentials
}
100 changes: 100 additions & 0 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package rapidpro

import (
"fmt"
"io/ioutil"
"log"
"testing"

"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/stretchr/testify/suite"
)

var testConfig = config.Courier{
Backend: "rapidpro",
DB: "postgres://courier@localhost/courier_test?sslmode=disable",
Redis: "redis://localhost:6379/10",
}

type MsgTestSuite struct {
suite.Suite
b *backend
}

func (ts *MsgTestSuite) SetupSuite() {
b, _ := courier.NewBackend(&testConfig)
ts.b = b.(*backend)

err := ts.b.Start()
if err != nil {
log.Fatalf("unable to start backend for testing: %v", err)
}

// read our testdata sql
sql, err := ioutil.ReadFile("testdata.sql")
if err != nil {
panic(fmt.Errorf("Unable to read testdata.sql: %s", err))
}
ts.b.db.MustExec(string(sql))
}

func (ts *MsgTestSuite) TearDownSuite() {
ts.b.Stop()
}

func (ts *MsgTestSuite) TestCheckMsgExists() {
channelUUID, _ := courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d")
channel, err := ts.b.GetChannel(courier.ChannelType("KN"), channelUUID)
if err != nil {
ts.FailNow("Error getting channel: ", err.Error())
}

// check with invalid message id
err = checkMsgExists(ts.b, courier.NewStatusUpdateForID(channel, courier.NewMsgID(-1), courier.MsgStatus("S")))
ts.Equal(err, courier.ErrMsgNotFound)

// check with valid message id
err = checkMsgExists(ts.b, courier.NewStatusUpdateForID(channel, courier.NewMsgID(104), courier.MsgStatus("S")))
ts.Nil(err)

// check with invalid external id
err = checkMsgExists(ts.b, courier.NewStatusUpdateForExternalID(channel, "ext-invalid", courier.MsgStatus("S")))
ts.Equal(err, courier.ErrMsgNotFound)

// check with valid external id
status := courier.NewStatusUpdateForExternalID(channel, "ext1", courier.MsgStatus("S"))
err = checkMsgExists(ts.b, status)
ts.Nil(err)
}

func TestMsgSuite(t *testing.T) {
suite.Run(t, new(MsgTestSuite))
}

var invalidConfigTestCases = []struct {
config config.Courier
expectedError string
}{
{config: config.Courier{DB: ":foo"}, expectedError: "unable to parse DB URL"},
{config: config.Courier{DB: "mysql:test"}, expectedError: "only postgres is supported"},
{config: config.Courier{DB: "postgres://courier@localhost/courier", Redis: ":foo"}, expectedError: "unable to parse Redis URL"},
}

func (ts *ServerTestSuite) TestInvalidConfigs() {
for _, testCase := range invalidConfigTestCases {
config := &testCase.config
config.Backend = "rapidpro"
backend := newBackend(config)
err := backend.Start()
ts.Contains(err.Error(), testCase.expectedError)
}
}

func TestBackendSuite(t *testing.T) {
suite.Run(t, new(ServerTestSuite))
}

type ServerTestSuite struct {
suite.Suite
}
Loading

0 comments on commit 37b888c

Please sign in to comment.