Skip to content

Commit

Permalink
change Msg to be an interface implemented by Backends, remove passthr…
Browse files Browse the repository at this point in the history
…oughs on Server to go straight to Backend
  • Loading branch information
nicpottier committed Jul 10, 2017
1 parent 59d21a9 commit d48a80d
Show file tree
Hide file tree
Showing 25 changed files with 438 additions and 356 deletions.
28 changes: 25 additions & 3 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,39 @@ type BackendConstructorFunc func(*config.Courier) Backend

// Backend represents the part of Courier that deals with looking up and writing channels and results
type Backend interface {
// Start starts the backend and opens any db connections it needs
Start() error

// Stop stops the backend closing any db connections it has open
Stop() error

// GetChannel returns the channel with the passed in type and UUID
GetChannel(ChannelType, ChannelUUID) (Channel, error)
WriteMsg(*Msg) error

// NewIncomingMsg creates a new message from the given params
NewIncomingMsg(channel Channel, urn URN, text string) Msg

// NewOutgoingMsg creates a new outgoing message from the given params
NewOutgoingMsg(channel Channel, urn URN, text string) Msg

// WriteMsg writes the passed in message to our backend
WriteMsg(Msg) error

// WriteMsgStatus writes the passed in status update to our backend
WriteMsgStatus(*MsgStatusUpdate) error

// WriteChannelLogs writes the passed in channel logs to our backend
WriteChannelLogs([]*ChannelLog) error

PopNextOutgoingMsg() (*Msg, error)
MarkOutgoingMsgComplete(*Msg)
// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the
// returned message when they have dealt with the message (regardless of whether it was sent or not)
PopNextOutgoingMsg() (Msg, error)

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel
MarkOutgoingMsgComplete(Msg)

// Health returns a string describing any health problems the backend has, or empty string if all is well
Health() string
}

Expand Down
37 changes: 23 additions & 14 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,18 @@ func (b *backend) GetChannel(ct courier.ChannelType, uuid courier.ChannelUUID) (
return getChannel(b, ct, uuid)
}

// NewIncomingMsg creates a new message from the given params
func (b *backend) NewIncomingMsg(channel courier.Channel, urn courier.URN, text string) courier.Msg {
return newMsg(MsgIncoming, channel, urn, text)
}

// NewOutgoingMsg creates a new outgoing message from the given params
func (b *backend) NewOutgoingMsg(channel courier.Channel, urn courier.URN, text string) courier.Msg {
return newMsg(MsgOutgoing, channel, urn, text)
}

// PopNextOutgoingMsg pops the next message that needs to be sent
func (b *backend) PopNextOutgoingMsg() (*courier.Msg, error) {
func (b *backend) PopNextOutgoingMsg() (courier.Msg, error) {
// pop the next message off our queue
rc := b.redisPool.Get()
defer rc.Close()
Expand All @@ -47,37 +57,36 @@ func (b *backend) PopNextOutgoingMsg() (*courier.Msg, error) {
}

if msgJSON != "" {
dbMsg := DBMsg{}
err = json.Unmarshal([]byte(msgJSON), &dbMsg)
dbMsg := &DBMsg{}
err = json.Unmarshal([]byte(msgJSON), dbMsg)
if err != nil {
return nil, err
}

// create courier msg from our db msg
channel, err := b.GetChannel(courier.AnyChannelType, dbMsg.ChannelUUID)
// populate the channel on our db msg
channel, err := b.GetChannel(courier.AnyChannelType, dbMsg.ChannelUUID_)
if err != nil {
return nil, err
}

// TODO: what other attributes are needed here?
msg := courier.NewOutgoingMsg(channel, dbMsg.URN, dbMsg.Text).WithID(dbMsg.ID).WithExternalID(dbMsg.ExternalID)
msg.WorkerToken = token

return msg, nil
dbMsg.Channel_ = channel
dbMsg.WorkerToken_ = token
return dbMsg, nil
}

return nil, nil
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(msg *courier.Msg) {
func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg) {
rc := b.redisPool.Get()
defer rc.Close()
queue.MarkComplete(rc, msgQueueName, msg.WorkerToken)

dbMsg := msg.(*DBMsg)
queue.MarkComplete(rc, msgQueueName, dbMsg.WorkerToken_)
}

// WriteMsg writes the passed in message to our store
func (b *backend) WriteMsg(m *courier.Msg) error {
func (b *backend) WriteMsg(m courier.Msg) error {
return writeMsg(b, m)
}

Expand Down
67 changes: 36 additions & 31 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (ts *MsgTestSuite) SetupSuite() {
panic(fmt.Errorf("Unable to read testdata.sql: %s", err))
}
ts.b.db.MustExec(string(sql))

// clear redis
r := ts.b.redisPool.Get()
defer r.Close()
r.Do("FLUSHDB")
}

func (ts *MsgTestSuite) TearDownSuite() {
Expand Down Expand Up @@ -145,17 +150,17 @@ func (ts *MsgTestSuite) TestStatus() {
ts.NoError(err)
m, err := readMsgFromDB(ts.b, courier.NewMsgID(10001))
ts.NoError(err)
ts.Equal(m.Status, courier.MsgSent)
ts.True(m.ModifiedOn.After(now))
ts.Equal(m.Status_, courier.MsgSent)
ts.True(m.ModifiedOn_.After(now))

// update by external id
status = courier.NewStatusUpdateForExternalID(channel, "ext1", courier.MsgFailed)
err = ts.b.WriteMsgStatus(status)
ts.NoError(err)
m, err = readMsgFromDB(ts.b, courier.NewMsgID(10000))
ts.NoError(err)
ts.Equal(m.Status, courier.MsgFailed)
ts.True(m.ModifiedOn.After(now))
ts.Equal(m.Status_, courier.MsgFailed)
ts.True(m.ModifiedOn_.After(now))

// no such external id
status = courier.NewStatusUpdateForExternalID(channel, "ext2", courier.MsgSent)
Expand All @@ -174,7 +179,7 @@ func (ts *MsgTestSuite) TestOutgoingQueue() {
defer r.Close()

dbMsg, err := readMsgFromDB(ts.b, courier.NewMsgID(10000))
dbMsg.ChannelUUID, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d")
dbMsg.ChannelUUID_, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d")
ts.NoError(err)
ts.NotNil(dbMsg)

Expand All @@ -191,10 +196,10 @@ func (ts *MsgTestSuite) TestOutgoingQueue() {
ts.NotNil(msg)

// make sure it is the message we just added
ts.Equal(dbMsg.ID, msg.ID)
ts.Equal(dbMsg.ID(), msg.ID())

// and that it has the appropriate text
ts.Equal(msg.Text, "test message")
ts.Equal(msg.Text(), "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(msg)
Expand Down Expand Up @@ -237,7 +242,7 @@ func (ts *MsgTestSuite) TestWriteMsg() {

// create a new courier msg
urn := courier.NewTelURNForChannel("12065551212", knChannel)
msg := courier.NewIncomingMsg(knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact")
msg := newMsg(MsgIncoming, knChannel, urn, "test123").WithExternalID("ext123").WithReceivedOn(now).WithContactName("test contact")

// try to write it to our db
err := ts.b.WriteMsg(msg)
Expand All @@ -247,36 +252,36 @@ func (ts *MsgTestSuite) TestWriteMsg() {
ts.NotZero(msg.ID)

// load it back from the id
m, err := readMsgFromDB(ts.b, msg.ID)
m, err := readMsgFromDB(ts.b, msg.ID())
ts.NoError(err)

// load our URN
contactURN, err := contactURNForURN(ts.b.db, m.OrgID, m.ChannelID, m.ContactID, urn)
contactURN, err := contactURNForURN(ts.b.db, m.OrgID_, m.ChannelID_, m.ContactID_, urn)
ts.NoError(err)

// make sure our values are set appropriately
ts.Equal(knChannel.ID_, m.ChannelID)
ts.Equal(knChannel.OrgID_, m.OrgID)
ts.Equal(contactURN.ContactID, m.ContactID)
ts.Equal(contactURN.ID, m.ContactURNID)
ts.Equal(MsgIncoming, m.Direction)
ts.Equal(courier.MsgPending, m.Status)
ts.Equal(DefaultPriority, m.Priority)
ts.Equal("ext123", m.ExternalID)
ts.Equal("test123", m.Text)
ts.Equal([]string(nil), m.Attachments)
ts.Equal(1, m.MessageCount)
ts.Equal(0, m.ErrorCount)
ts.Equal(now, m.SentOn.In(time.UTC))
ts.NotNil(m.NextAttempt)
ts.NotNil(m.CreatedOn)
ts.NotNil(m.ModifiedOn)
ts.NotNil(m.QueuedOn)

contact, err := contactForURN(ts.b.db, m.OrgID, m.ChannelID, urn, "")
ts.Equal(knChannel.ID_, m.ChannelID_)
ts.Equal(knChannel.OrgID_, m.OrgID_)
ts.Equal(contactURN.ContactID, m.ContactID_)
ts.Equal(contactURN.ID, m.ContactURNID_)
ts.Equal(MsgIncoming, m.Direction_)
ts.Equal(courier.MsgPending, m.Status_)
ts.Equal(DefaultPriority, m.Priority_)
ts.Equal("ext123", m.ExternalID_)
ts.Equal("test123", m.Text_)
ts.Equal([]string(nil), m.Attachments_)
ts.Equal(1, m.MessageCount_)
ts.Equal(0, m.ErrorCount_)
ts.Equal(now, m.SentOn_.In(time.UTC))
ts.NotNil(m.NextAttempt_)
ts.NotNil(m.CreatedOn_)
ts.NotNil(m.ModifiedOn_)
ts.NotNil(m.QueuedOn_)

contact, err := contactForURN(ts.b.db, m.OrgID_, m.ChannelID_, urn, "")
ts.Equal("test contact", contact.Name)
ts.Equal(m.OrgID, contact.OrgID)
ts.Equal(m.ContactID, contact.ID)
ts.Equal(m.OrgID_, contact.OrgID)
ts.Equal(m.ContactID_, contact.ID)
ts.NotNil(contact.UUID)
ts.NotNil(contact.ID)
}
Expand Down
Loading

0 comments on commit d48a80d

Please sign in to comment.