Skip to content

Commit

Permalink
tests of our backend
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Jul 5, 2017
1 parent c157b9f commit 7b2ea69
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 63 deletions.
45 changes: 45 additions & 0 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (
"testing"
"time"

"encoding/json"

"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/nyaruka/courier/queue"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -160,6 +163,48 @@ func (ts *MsgTestSuite) TestStatus() {
ts.Error(err)
}

func (ts *MsgTestSuite) TestHealth() {
// all should be well in test land
ts.Equal(ts.b.Health(), "")
}

func (ts *MsgTestSuite) TestOutgoingQueue() {
// add one of our outgoing messages to the queue
r := ts.b.redisPool.Get()
defer r.Close()

dbMsg, err := readMsgFromDB(ts.b, courier.NewMsgID(10000))
dbMsg.ChannelUUID, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d")
ts.NoError(err)
ts.NotNil(dbMsg)

// serialize our message
msgJSON, err := json.Marshal(dbMsg)
ts.NoError(err)

err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 1, string(msgJSON), queue.DefaultPriority)
ts.NoError(err)

// pop a message off our queue
msg, err := ts.b.PopNextOutgoingMsg()
ts.NoError(err)
ts.NotNil(msg)

// make sure it is the message we just added
ts.Equal(dbMsg.ID, msg.ID)

// and that it has the appropriate text
ts.Equal(msg.Text, "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(msg)

// pop another message off, shouldn't get anything
msg, err = ts.b.PopNextOutgoingMsg()
ts.Nil(msg)
ts.Nil(err)
}

func (ts *MsgTestSuite) TestChannel() {
knChannel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d")

Expand Down
18 changes: 0 additions & 18 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,6 @@ func newDBMsgFromMsg(m *courier.Msg) *DBMsg {
}
}

// adds the message to a redis queue for handling. CURRENTLY UNUSED
func addToHandleQueue(b *backend, m *DBMsg) error {
// write it to redis
r := b.redisPool.Get()
defer r.Close()

// we push to two different queues, one that is URN specific and the other that is our global queue (and to this only the URN)
r.Send("MULTI")
r.Send("RPUSH", fmt.Sprintf("c:u:%s", m.URN), m.ID)
r.Send("RPUSH", "c:msgs", m.URN)
_, err := r.Do("EXEC")
if err != nil {
return err
}

return nil
}

const insertMsgSQL = `
INSERT INTO msgs_msg(org_id, direction, has_template_error, text, msg_count, error_count, priority, status,
visibility, external_id, channel_id, contact_id, contact_urn_id, created_on, modified_on, next_attempt, queued_on, sent_on)
Expand Down
4 changes: 2 additions & 2 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ var luaPop = redis.NewScript(2, `-- KEYS: [Epoch QueueType]

// PopFromQueue pops the next available message from the passed in queue. If QueueRetry
// is returned the caller should immediately make another call to get the next value. A
// value of QueueEmpty there are no more items to retrive. Otherwise QueueResult
// should be saved in order to mark the task as complete
// worker token of EmptyQueue will be returned if there are no more items to retrive.
// Otherwise the WorkerToken should be saved in order to mark the task as complete later.
func PopFromQueue(conn redis.Conn, qType string) (WorkerToken, string, error) {
epoch := time.Now().Unix()
values, err := redis.Strings(luaPop.Do(conn, strconv.FormatInt(epoch, 10), qType))
Expand Down
87 changes: 44 additions & 43 deletions worker.go → sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,39 @@ import (
// Foreman takes care of managing our set of sending workers and assigns msgs for each to send
type Foreman struct {
server Server
workers []*Worker
availableWorkers chan *Worker
senders []*Sender
availableSenders chan *Sender
quit chan bool
}

// NewForeman creates a new Foreman for the passed in server with the number of max workers
func NewForeman(server Server, maxWorkers int) *Foreman {
// NewForeman creates a new Foreman for the passed in server with the number of max senders
func NewForeman(server Server, maxSenders int) *Foreman {
foreman := &Foreman{
server: server,
workers: make([]*Worker, maxWorkers),
availableWorkers: make(chan *Worker, maxWorkers),
senders: make([]*Sender, maxSenders),
availableSenders: make(chan *Sender, maxSenders),
quit: make(chan bool),
}

for i := 0; i < maxWorkers; i++ {
foreman.workers[i] = NewWorker(foreman, i)
for i := 0; i < maxSenders; i++ {
foreman.senders[i] = NewSender(foreman, i)
}

return foreman
}

// Start starts the foreman and all its workers, assigning jobs while there are some
// Start starts the foreman and all its senders, assigning jobs while there are some
func (f *Foreman) Start() {
for _, worker := range f.workers {
worker.Start()
for _, sender := range f.senders {
sender.Start()
}
go f.Assign()
}

// Stop stops the foreman and all its workers, the wait group of the server can be used to track progress
// Stop stops the foreman and all its senders, the wait group of the server can be used to track progress
func (f *Foreman) Stop() {
for _, worker := range f.workers {
worker.Stop()
for _, sender := range f.senders {
sender.Stop()
}
close(f.quit)
logrus.WithField("comp", "foreman").WithField("state", "stopping").Info("foreman stopping")
Expand All @@ -56,8 +56,8 @@ func (f *Foreman) Assign() {

log.WithFields(logrus.Fields{
"state": "started",
"workers": len(f.workers),
}).Info("sending workers started and waiting")
"senders": len(f.senders),
}).Info("senders started and waiting")

backend := f.server.Backend()
lastSleep := false
Expand All @@ -69,76 +69,77 @@ func (f *Foreman) Assign() {
log.WithField("state", "stopped").Info("foreman stopped")
return

// otherwise, grab the next msg and assign it to a worker
// otherwise, grab the next msg and assign it to a sender
default:
// get the next worker that is ready
worker := <-f.availableWorkers
// get the next sender that is ready
sender := <-f.availableSenders

// see if we have a message to work on
msg, err := backend.PopNextOutgoingMsg()
if err != nil {
log.WithError(err).Error("error popping outgoing msg")
break
}

if msg != nil {
// if so, assign it to our worker
worker.job <- msg
if err == nil && msg != nil {
// if so, assign it to our sender
sender.job <- msg
lastSleep = false
} else {
// otherwise, add our worker back to our queue and sleep a bit
// we received an error getting the next message, log it
if err != nil {
log.WithError(err).Error("error popping outgoing msg")
}

// add our sender back to our queue and sleep a bit
if !lastSleep {
log.Info("sleeping, no messages")
lastSleep = true
}
f.availableWorkers <- worker
f.availableSenders <- sender
time.Sleep(250 * time.Millisecond)
}
}
}
}

// Worker is our type for a single goroutine that is sending messages
type Worker struct {
// Sender is our type for a single goroutine that is sending messages
type Sender struct {
id int
foreman *Foreman
job chan *Msg
}

// NewWorker creates a new worker responsible for sending messages
func NewWorker(foreman *Foreman, id int) *Worker {
worker := &Worker{
// NewSender creates a new sender responsible for sending messages
func NewSender(foreman *Foreman, id int) *Sender {
sender := &Sender{
id: id,
foreman: foreman,
job: make(chan *Msg, 1),
}
return worker
return sender
}

// Start starts our Worker's goroutine and has it start waiting for tasks from the foreman
func (w *Worker) Start() {
go w.Work()
// Start starts our Sender's goroutine and has it start waiting for tasks from the foreman
func (w *Sender) Start() {
go w.Send()
}

// Stop stops our workers, callers can use the server's wait group to track progress
func (w *Worker) Stop() {
// Stop stops our senders, callers can use the server's wait group to track progress
func (w *Sender) Stop() {
close(w.job)
}

// Work is our main work loop for our worker. The Worker marks itself as available for work
// Send is our main work loop for our worker. The Worker marks itself as available for work
// to the foreman, then waits for the next job
func (w *Worker) Work() {
func (w *Sender) Send() {
w.foreman.server.WaitGroup().Add(1)
defer w.foreman.server.WaitGroup().Done()

log := logrus.WithField("comp", "worker").WithField("workerID", w.id)
log := logrus.WithField("comp", "sender").WithField("senderID", w.id)
log.Debug("started")

server := w.foreman.server

for true {
// list ourselves as available for work
w.foreman.availableWorkers <- w
w.foreman.availableSenders <- w

// grab our next piece of work
msg := <-w.job
Expand Down

0 comments on commit 7b2ea69

Please sign in to comment.