diff --git a/backend.go b/backend.go index da4626c4e..eb949794b 100644 --- a/backend.go +++ b/backend.go @@ -46,9 +46,14 @@ type Backend interface { // returned message when they have dealt with the message (regardless of whether it was sent or not) PopNextOutgoingMsg() (Msg, error) + // WasMsgSent returns whether the backend thinks the passed in message was already sent. This can be used in cases where + // a backend wants to implement a failsafe against double sending messages (say if they were double queued) + WasMsgSent(msg Msg) (bool, error) + // MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case - // of errors during sending as it will manage the number of active workers per channel - MarkOutgoingMsgComplete(Msg) + // of errors during sending as it will manage the number of active workers per channel. The optional status parameter can be + // used to determine any sort of deduping of msg sends + MarkOutgoingMsgComplete(Msg, MsgStatus) // Health returns a string describing any health problems the backend has, or empty string if all is well Health() string diff --git a/backends/rapidpro/backend.go b/backends/rapidpro/backend.go index e815faa16..eab11cb93 100644 --- a/backends/rapidpro/backend.go +++ b/backends/rapidpro/backend.go @@ -26,6 +26,9 @@ import ( // the name for our message queue const msgQueueName = "msgs" +// the name of our set for tracking sends +const sentSetName = "msgs_sent_%s" + func init() { courier.RegisterBackend("rapidpro", newBackend) } @@ -46,7 +49,7 @@ func (b *backend) NewIncomingMsg(channel courier.Channel, urn courier.URN, text // have we seen this msg in the past period? prevUUID := checkMsgSeen(b, msg) if prevUUID != courier.NilMsgUUID { - // if so, use its UUID and mark that we've been written + // if so, use its UUID and that we've been written msg.UUID_ = prevUUID msg.AlreadyWritten_ = true } @@ -89,13 +92,38 @@ func (b *backend) PopNextOutgoingMsg() (courier.Msg, error) { return nil, nil } +// WasMsgSent returns whether the passed in message has already been sent +func (b *backend) WasMsgSent(msg courier.Msg) (bool, error) { + rc := b.redisPool.Get() + defer rc.Close() + + dateKey := fmt.Sprintf(sentSetName, time.Now().In(time.UTC).Format("2006_01_02")) + found, err := redis.Bool(rc.Do("sismember", dateKey, msg.ID().Int64)) + if err != nil { + return false, err + } + if found { + return true, nil + } + + dateKey = fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).In(time.UTC).Format("2006_01_02")) + found, err = redis.Bool(rc.Do("sismember", dateKey, msg.ID().Int64)) + return found, err +} + // MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel -func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg) { +func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg, status courier.MsgStatus) { rc := b.redisPool.Get() defer rc.Close() dbMsg := msg.(*DBMsg) queue.MarkComplete(rc, msgQueueName, dbMsg.WorkerToken_) + + // mark as sent in redis as well if this was actually wired or sent + if status != nil && (status.Status() == courier.MsgSent || status.Status() == courier.MsgWired) { + dateKey := fmt.Sprintf(sentSetName, time.Now().In(time.UTC).Format("2006_01_02")) + rc.Do("sadd", dateKey, msg.ID().Int64) + } } // WriteMsg writes the passed in message to our store diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 29cbd1eaf..99ef614c5 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -143,6 +143,7 @@ func (ts *MsgTestSuite) TestContactURN() { func (ts *MsgTestSuite) TestStatus() { channel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d") now := time.Now().In(time.UTC) + time.Sleep(2 * time.Millisecond) // update by id status := ts.b.NewMsgStatusForID(channel, courier.NewMsgID(10001), courier.MsgSent) @@ -169,6 +170,7 @@ func (ts *MsgTestSuite) TestStatus() { // error our msg now = time.Now().In(time.UTC) + time.Sleep(2 * time.Millisecond) status = ts.b.NewMsgStatusForExternalID(channel, "ext1", courier.MsgErrored) err = ts.b.WriteMsgStatus(status) ts.NoError(err) @@ -232,12 +234,24 @@ func (ts *MsgTestSuite) TestOutgoingQueue() { ts.Equal(msg.Text(), "test message") // mark this message as dealt with - ts.b.MarkOutgoingMsgComplete(msg) + ts.b.MarkOutgoingMsgComplete(msg, ts.b.NewMsgStatusForID(msg.Channel(), msg.ID(), courier.MsgWired)) + + // this message should now be marked as sent + sent, err := ts.b.WasMsgSent(msg) + ts.NoError(err) + ts.True(sent) // pop another message off, shouldn't get anything msg, err = ts.b.PopNextOutgoingMsg() ts.Nil(msg) ts.Nil(err) + + // checking another message should show unsent + msg, err = readMsgFromDB(ts.b, courier.NewMsgID(10001)) + ts.NoError(err) + sent, err = ts.b.WasMsgSent(msg) + ts.NoError(err) + ts.False(sent) } func (ts *MsgTestSuite) TestChannel() { @@ -290,6 +304,7 @@ func (ts *MsgTestSuite) TestWriteMsg() { ts.NoError(err) // make sure our values are set appropriately + ts.Equal(msg.ID(), m.ID()) ts.Equal(knChannel.ID_, m.ChannelID_) ts.Equal(knChannel.OrgID_, m.OrgID_) ts.Equal(contactURN.ContactID, m.ContactID_) diff --git a/backends/rapidpro/msg.go b/backends/rapidpro/msg.go index 82d023052..a3bc5d0a4 100644 --- a/backends/rapidpro/msg.go +++ b/backends/rapidpro/msg.go @@ -158,7 +158,9 @@ WHERE id = $1 ` func readMsgFromDB(b *backend, id courier.MsgID) (*DBMsg, error) { - m := &DBMsg{} + m := &DBMsg{ + ID_: id, + } err := b.db.Get(m, selectMsgSQL, id) return m, err } diff --git a/sender.go b/sender.go index 774c24b0b..bf875845e 100644 --- a/sender.go +++ b/sender.go @@ -138,6 +138,8 @@ func (w *Sender) Send() { server := w.foreman.server backend := server.Backend() + var status MsgStatus + for true { // list ourselves as available for work w.foreman.availableSenders <- w @@ -151,20 +153,37 @@ func (w *Sender) Send() { return } - status, err := server.SendMsg(msg) + // was this msg already sent? (from a double queue?) + sent, err := backend.WasMsgSent(msg) + + // failing on a lookup isn't a halting problem but we should log it if err != nil { - log.WithField("msgID", msg.ID().Int64).WithError(err).Info("msg errored") + log.WithField("msgID", msg.ID().Int64).WithError(err).Warning("error looking up msg was sent") + } + + if sent { + // if this message was already sent, create a wired status for it + status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgWired) + log.WithField("msgID", msg.ID().Int64).Warning("duplicate send, marking as wired") } else { - log.WithField("msgID", msg.ID().Int64).Info("msg sent") + // send our message + status, err = server.SendMsg(msg) + if err != nil { + log.WithField("msgID", msg.ID().Int64).WithError(err).Info("msg errored") + } else { + log.WithField("msgID", msg.ID().Int64).Info("msg sent") + } } - // record our status - err = backend.WriteMsgStatus(status) - if err != nil { - log.WithField("msgID", msg.ID().Int64).WithError(err).Info("error writing msg status") + // record our status if we have one + if status != nil { + err = backend.WriteMsgStatus(status) + if err != nil { + log.WithField("msgID", msg.ID().Int64).WithError(err).Info("error writing msg status") + } } // mark our send task as complete - backend.MarkOutgoingMsgComplete(msg) + backend.MarkOutgoingMsgComplete(msg, status) } } diff --git a/test.go b/test.go index c81ef6919..353a7e817 100644 --- a/test.go +++ b/test.go @@ -48,8 +48,13 @@ func (mb *MockBackend) PopNextOutgoingMsg() (Msg, error) { return nil, nil } +// WasMsgSent returns whether the passed in msg was already sent +func (mb *MockBackend) WasMsgSent(msg Msg) (bool, error) { + return false, nil +} + // MarkOutgoingMsgComplete marks the passed msg as having been dealt with -func (mb *MockBackend) MarkOutgoingMsgComplete(m Msg) { +func (mb *MockBackend) MarkOutgoingMsgComplete(m Msg, s MsgStatus) { } // WriteChannelLogs writes the passed in channel logs to the DB