Skip to content

Commit

Permalink
dedupe incoming messages in a 4 second window
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier committed Jul 10, 2017
1 parent d48a80d commit ca8f4e5
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 7 deletions.
11 changes: 10 additions & 1 deletion backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,16 @@ func (b *backend) GetChannel(ct courier.ChannelType, uuid courier.ChannelUUID) (

// NewIncomingMsg creates a new message from the given params
func (b *backend) NewIncomingMsg(channel courier.Channel, urn courier.URN, text string) courier.Msg {
return newMsg(MsgIncoming, channel, urn, text)
msg := newMsg(MsgIncoming, channel, urn, text)

// have we seen this msg in the past period?
prevUUID := checkMsgSeen(b, msg)
if prevUUID != courier.NilMsgUUID {
// if so, use its UUID and mark that we've been written
msg.UUID_ = prevUUID
msg.AlreadyWritten_ = true
}
return msg
}

// NewOutgoingMsg creates a new outgoing message from the given params
Expand Down
11 changes: 10 additions & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (ts *MsgTestSuite) TestWriteMsg() {

// create a new courier msg
urn := courier.NewTelURNForChannel("12065551212", knChannel)
msg := newMsg(MsgIncoming, knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact")
msg := ts.b.NewIncomingMsg(knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact").(*DBMsg)

// try to write it to our db
err := ts.b.WriteMsg(msg)
Expand Down Expand Up @@ -284,6 +284,15 @@ func (ts *MsgTestSuite) TestWriteMsg() {
ts.Equal(m.ContactID_, contact.ID)
ts.NotNil(contact.UUID)
ts.NotNil(contact.ID)

// creating the incoming msg again should give us the same UUID and have the msg set as not to write
msg2 := ts.b.NewIncomingMsg(knChannel, urn, "test123").(*DBMsg)
ts.Equal(msg2.UUID(), msg.UUID())

// waiting 5 seconds should let us write it successfully
time.Sleep(5 * time.Second)
msg3 := ts.b.NewIncomingMsg(knChannel, urn, "test123").(*DBMsg)
ts.NotEqual(msg3.UUID(), msg.UUID())
}

func TestMsgSuite(t *testing.T) {
Expand Down
87 changes: 82 additions & 5 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"time"

"github.com/garyburd/redigo/redis"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/courier/utils"
Expand Down Expand Up @@ -52,6 +53,11 @@ const (
func writeMsg(b *backend, msg courier.Msg) error {
m := msg.(*DBMsg)

// this msg has already been written (we received it twice), we are a no op
if m.AlreadyWritten_ {
return nil
}

// if we have media, go download it to S3
for i, attachment := range m.Attachments_ {
if strings.HasPrefix(attachment, "http") {
Expand All @@ -70,17 +76,18 @@ func writeMsg(b *backend, msg courier.Msg) error {
if err != nil {
return courier.WriteToSpool(b.config.SpoolDir, "msgs", m)
}

// mark this msg as having been seen
writeMsgSeen(b, m)
return err
}

// newMsg creates a new DBMsg object with the passed in parameters
func newMsg(direction MsgDirection, channel courier.Channel, urn courier.URN, text string) courier.Msg {
func newMsg(direction MsgDirection, channel courier.Channel, urn courier.URN, text string) *DBMsg {
now := time.Now()
dbChannel := channel.(*DBChannel)

return &DBMsg{
Channel_: channel,

OrgID_: dbChannel.OrgID(),
UUID_: courier.NewMsgUUID(),
Direction_: direction,
Expand All @@ -99,6 +106,10 @@ func newMsg(direction MsgDirection, channel courier.Channel, urn courier.URN, te
CreatedOn_: now,
ModifiedOn_: now,
QueuedOn_: now,

Channel_: channel,
WorkerToken_: "",
AlreadyWritten_: false,
}
}

Expand Down Expand Up @@ -226,6 +237,66 @@ func (b *backend) flushMsgFile(filename string, contents []byte) error {
return err
}

//-----------------------------------------------------------------------------
// Deduping utility methods
//-----------------------------------------------------------------------------

var luaMsgSeen = redis.NewScript(3, `-- KEYS: [Window, PrevWindow, Fingerprint]
-- try to look up in window
local uuid = redis.call("hget", KEYS[1], KEYS[3])
-- didn't find it, try in our previous window
if not uuid then
uuid = redis.call("hget", KEYS[2], KEYS[3])
end
-- return the uuid found if any
return uuid
`)

// checkMsgSeen tries to look up whether a msg with the fingerprint passed in was seen in window or prevWindow. If
// found returns the UUID of that msg, if not returns empty string
func checkMsgSeen(b *backend, msg *DBMsg) courier.MsgUUID {
r := b.redisPool.Get()
defer r.Close()

fingerprint := msg.fingerprint()

now := time.Now().In(time.UTC)
prev := now.Add(time.Second * -2)
windowKey := fmt.Sprintf("seen:msgs:%s:%02d", now.Format("2006-01-02-15:05"), now.Second()/2*2)
prevWindowKey := fmt.Sprintf("seen:msgs:%s:%02d", prev.Format("2006-01-02-15:05"), prev.Second()/2*2)

// try to look up our UUID from either window or prev window
foundUUID, _ := redis.String(luaMsgSeen.Do(r, windowKey, prevWindowKey, fingerprint))
if foundUUID != "" {
return courier.NewMsgUUIDFromString(foundUUID)
}
return courier.NilMsgUUID
}

var luaWriteMsgSeen = redis.NewScript(3, `-- KEYS: [Window, Fingerprint, UUID]
redis.call("hset", KEYS[1], KEYS[2], KEYS[3])
redis.call("pexpire", KEYS[1], 5000)
`)

// writeMsgSeen writes that the message with the passed in fingerprint and UUID was seen in the
// passed in window
func writeMsgSeen(b *backend, msg *DBMsg) {
r := b.redisPool.Get()
defer r.Close()

fingerprint := msg.fingerprint()
now := time.Now().In(time.UTC)
windowKey := fmt.Sprintf("seen:msgs:%s:%02d", now.Format("2006-01-02-15:05"), now.Second()/2*2)

luaWriteMsgSeen.Do(r, windowKey, fingerprint, msg.UUID().String())
}

//-----------------------------------------------------------------------------
// Our implementation of Msg interface
//-----------------------------------------------------------------------------

// DBMsg is our base struct to represent msgs both in our JSON and db representations
type DBMsg struct {
OrgID_ OrgID `json:"org_id" db:"org_id"`
Expand Down Expand Up @@ -256,8 +327,9 @@ type DBMsg struct {
QueuedOn_ time.Time `json:"queued_on" db:"queued_on"`
SentOn_ time.Time `json:"sent_on" db:"sent_on"`

Channel_ courier.Channel
WorkerToken_ queue.WorkerToken
Channel_ courier.Channel
WorkerToken_ queue.WorkerToken
AlreadyWritten_ bool
}

func (m *DBMsg) Channel() courier.Channel { return m.Channel_ }
Expand All @@ -272,6 +344,11 @@ func (m *DBMsg) ContactName() string { return m.ContactName_ }
func (m *DBMsg) ReceivedOn() *time.Time { return &m.SentOn_ }
func (m *DBMsg) SentOn() *time.Time { return &m.SentOn_ }

// fingerprint returns a fingerprint for this msg, suitable for figuring out if this is a dupe
func (m *DBMsg) fingerprint() string {
return fmt.Sprintf("%s:%s:%s", m.Channel_.UUID(), m.URN_, m.Text_)
}

// WithContactName can be used to set the contact name on a msg
func (m *DBMsg) WithContactName(name string) courier.Msg { m.ContactName_ = name; return m }

Expand Down
6 changes: 6 additions & 0 deletions msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ func NewMsgUUID() MsgUUID {
return MsgUUID{uuid.NewV4()}
}

// NewMsgUUIDFromString creates a new message UUID for the passed in string
func NewMsgUUIDFromString(uuidString string) MsgUUID {
uuid, _ := uuid.FromString(uuidString)
return MsgUUID{uuid}
}

//-----------------------------------------------------------------------------
// Msg interface
//-----------------------------------------------------------------------------
Expand Down

0 comments on commit ca8f4e5

Please sign in to comment.