Skip to content

Commit

Permalink
courier side of stopping contacts
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Jul 17, 2017
1 parent 1e9a3ab commit a9e0424
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 22 deletions.
3 changes: 3 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Backend interface {
// used to determine any sort of deduping of msg sends
MarkOutgoingMsgComplete(Msg, MsgStatus)

// StopMsgContact marks the contact for the passed in msg as stopped
StopMsgContact(Msg)

// Health returns a string describing any health problems the backend has, or empty string if all is well
Health() string
}
Expand Down
6 changes: 6 additions & 0 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg, status courier.MsgSta
}
}

// StopMsgContact marks the contact for the passed in msg as stopped, that is they no longer want to receive messages
func (b *backend) StopMsgContact(m courier.Msg) {
dbMsg := m.(*DBMsg)
b.notifier.addStopContactNotification(dbMsg.ContactID_)
}

// WriteMsg writes the passed in message to our store
func (b *backend) WriteMsg(m courier.Msg) error {
return writeMsg(b, m)
Expand Down
2 changes: 1 addition & 1 deletion backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func writeMsgToDB(b *backend, m *DBMsg) error {
}

// queue this up to be handled by RapidPro
b.notifier.addMsg(m.ID_)
b.notifier.addHandleMsgNotification(m.ID_)

return err
}
Expand Down
44 changes: 24 additions & 20 deletions backends/rapidpro/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,11 @@ import (
"github.com/sirupsen/logrus"
)

func notifyRapidPro(config *config.Courier, msgID courier.MsgID) error {
// our form is just the id of the message to handle
body := url.Values{}
body.Add("message_id", msgID.String())

func notifyRapidPro(config *config.Courier, body url.Values) error {
// build our request
req, err := http.NewRequest("POST", config.RapidproHandleURL, strings.NewReader(body.Encode()))

// this really should never happen, but if it does we ignore it
// this really should never happen, but if it does we only log it
if err != nil {
logrus.WithField("comp", "notifier").WithError(err).Error("error creating request")
return nil
Expand All @@ -36,13 +32,21 @@ func notifyRapidPro(config *config.Courier, msgID courier.MsgID) error {

func newNotifier(config *config.Courier) *notifier {
return &notifier{
config: config,
msgIDChan: make(chan courier.MsgID, 100000), // TODO: is 100k enough?
config: config,
notifications: make(chan url.Values, 100000), // TODO: is 100k enough?
}
}

func (n *notifier) addMsg(msgID courier.MsgID) {
n.msgIDChan <- msgID
func (n *notifier) addHandleMsgNotification(msgID courier.MsgID) {
body := url.Values{}
body.Add("message_id", msgID.String())
n.notifications <- body
}

func (n *notifier) addStopContactNotification(contactID ContactID) {
body := url.Values{}
body.Add("contact_id", fmt.Sprintf("%d", contactID.Int64))
n.notifications <- body
}

func (n *notifier) start(backend *backend) {
Expand All @@ -57,16 +61,16 @@ func (n *notifier) start(backend *backend) {

for {
select {
case msgID := <-n.msgIDChan:
// if this failed, rapidpro is likely down, push it onto our retry list
err := notifyRapidPro(n.config, msgID)
case body := <-n.notifications:
// try to notify rapidpro
err := notifyRapidPro(n.config, body)

// we failed, append it to our retries
if err != nil {
if !lastError {
log.WithError(err).Error("error notifying rapidpro")
}
n.retries = append(n.retries, msgID)
n.retries = append(n.retries, body)
lastError = true
} else {
lastError = false
Expand All @@ -83,15 +87,15 @@ func (n *notifier) start(backend *backend) {
// if we are quiet for 500ms, try to send some retries
retried := 0
for retried < 10 && retried < len(n.retries) {
msgID := n.retries[0]
body := n.retries[0]
n.retries = n.retries[1:]

err := notifyRapidPro(n.config, msgID)
err := notifyRapidPro(n.config, body)
if err != nil {
if !lastError {
log.WithError(err).Error("error notifying rapidpro")
}
n.retries = append(n.retries, msgID)
n.retries = append(n.retries, body)
lastError = true
} else {
lastError = false
Expand All @@ -105,7 +109,7 @@ func (n *notifier) start(backend *backend) {
}

type notifier struct {
config *config.Courier
msgIDChan chan courier.MsgID
retries []courier.MsgID
config *config.Courier
notifications chan url.Values
retries []url.Values
}
6 changes: 6 additions & 0 deletions handlers/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type ChannelSendTestCase struct {
Status string
ExternalID string

Stopped bool

SendPrep SendPrepFunc
}

Expand Down Expand Up @@ -201,6 +203,10 @@ func RunChannelSendTestCases(t *testing.T, channel courier.Channel, handler cour
if testCase.Status != "" {
require.Equal(testCase.Status, string(status.Status()))
}

if testCase.Stopped {
require.Equal(msg, mb.GetLastStoppedMsgContact())
}
})
}

Expand Down
8 changes: 7 additions & 1 deletion handlers/twilio/twilio.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const twSignatureHeader = "X-Twilio-Signature"

var sendURL = "https://api.twilio.com/2010-04-01/Accounts"

// error code twilio returns when a contact has sent "stop"
const errorStopped = 21610

type handler struct {
handlers.BaseHandler
}
Expand Down Expand Up @@ -209,7 +212,10 @@ func (h *handler) SendMsg(msg courier.Msg) (courier.MsgStatus, error) {
// was this request successful?
errorCode, _ := jsonparser.GetInt([]byte(rr.Body), "error_code")
if errorCode != 0 {
// TODO: Notify RapidPro of blocked contacts (code 21610)
if errorCode == errorStopped {
status.SetStatus(courier.MsgFailed)
h.Backend().StopMsgContact(msg)
}
return status, errors.Errorf("received error code from twilio '%d'", errorCode)
}

Expand Down
8 changes: 8 additions & 0 deletions handlers/twilio/twilio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ var defaultSendTestCases = []ChannelSendTestCase{
Error: "received error code from twilio '1001'",
PostParams: map[string]string{"Body": "Error Code", "To": "+250788383383"},
SendPrep: setSendURL},
{Label: "Stopped Contact Code",
Text: "Stopped Contact", URN: "tel:+250788383383",
Status: "F",
ResponseBody: `{ "error_code": 21610 }`, ResponseStatus: 200,
Error: "received error code from twilio '21610'",
PostParams: map[string]string{"Body": "Stopped Contact", "To": "+250788383383"},
SendPrep: setSendURL,
Stopped: true},
{Label: "No SID",
Text: "No SID", URN: "tel:+250788383383",
Status: "E",
Expand Down
22 changes: 22 additions & 0 deletions test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package courier

import (
"errors"
"sync"

"time"

Expand All @@ -18,6 +19,14 @@ type MockBackend struct {
channels map[ChannelUUID]Channel
queueMsgs []Msg
errorOnQueue bool

mutex sync.RWMutex
outgoingMsgs []Msg
msgStatuses []MsgStatus

stoppedMsgContacts []Msg

sentMsgs map[MsgID]bool
}

// NewMockBackend returns a new mock backend suitable for testing
Expand Down Expand Up @@ -53,6 +62,19 @@ func (mb *MockBackend) WasMsgSent(msg Msg) (bool, error) {
return false, nil
}

// StopMsgContact stops the contact for the passed in msg
func (mb *MockBackend) StopMsgContact(msg Msg) {
mb.stoppedMsgContacts = append(mb.stoppedMsgContacts, msg)
}

// GetLastStoppedMsgContact returns the last msg contact
func (mb *MockBackend) GetLastStoppedMsgContact() Msg {
if len(mb.stoppedMsgContacts) > 0 {
return mb.stoppedMsgContacts[len(mb.stoppedMsgContacts)-1]
}
return nil
}

// MarkOutgoingMsgComplete marks the passed msg as having been dealt with
func (mb *MockBackend) MarkOutgoingMsgComplete(m Msg, s MsgStatus) {
}
Expand Down

0 comments on commit a9e0424

Please sign in to comment.