Skip to content

Commit

Permalink
Merge branch 'master' into stop-contacts
Browse files Browse the repository at this point in the history
  • Loading branch information
nicpottier authored Jul 17, 2017
2 parents a9e0424 + a8daf8b commit 4cefc7f
Show file tree
Hide file tree
Showing 19 changed files with 261 additions and 93 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ before_script:
- go get github.com/mattn/goveralls

script:
- go test github.com/nyaruka/courier/... -p=1
- go test github.com/nyaruka/courier/... -p=1 -bench=.
- $HOME/gopath/bin/goveralls -service=travis-ci -v

after_success:
Expand Down
6 changes: 3 additions & 3 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (b *backend) WasMsgSent(msg courier.Msg) (bool, error) {
defer rc.Close()

dateKey := fmt.Sprintf(sentSetName, time.Now().In(time.UTC).Format("2006_01_02"))
found, err := redis.Bool(rc.Do("sismember", dateKey, msg.ID().Int64))
found, err := redis.Bool(rc.Do("sismember", dateKey, msg.ID()))
if err != nil {
return false, err
}
Expand All @@ -107,7 +107,7 @@ func (b *backend) WasMsgSent(msg courier.Msg) (bool, error) {
}

dateKey = fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).In(time.UTC).Format("2006_01_02"))
found, err = redis.Bool(rc.Do("sismember", dateKey, msg.ID().Int64))
found, err = redis.Bool(rc.Do("sismember", dateKey, msg.ID()))
return found, err
}

Expand All @@ -122,7 +122,7 @@ func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg, status courier.MsgSta
// mark as sent in redis as well if this was actually wired or sent
if status != nil && (status.Status() == courier.MsgSent || status.Status() == courier.MsgWired) {
dateKey := fmt.Sprintf(sentSetName, time.Now().In(time.UTC).Format("2006_01_02"))
rc.Do("sadd", dateKey, msg.ID().Int64)
rc.Do("sadd", dateKey, msg.ID())
}
}

Expand Down
4 changes: 4 additions & 0 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/nyaruka/courier/queue"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
)

Expand All @@ -28,6 +29,9 @@ func testConfig() *config.Courier {
}

func (ts *MsgTestSuite) SetupSuite() {
// turn off logging
logrus.SetOutput(ioutil.Discard)

b, err := courier.NewBackend(testConfig())
if err != nil {
log.Fatalf("unable to create rapidpro backend: %v", err)
Expand Down
12 changes: 2 additions & 10 deletions backends/rapidpro/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,6 @@ import (
"github.com/nyaruka/courier/utils"
)

// ChannelID is our SQL type for a channel's id
type ChannelID struct {
sql.NullInt64
}

// NilChannelID is our nil value for ChannelIDs
var NilChannelID = ChannelID{sql.NullInt64{Int64: 0, Valid: false}}

// getChannelFromUUID will look up the channel with the passed in UUID and channel type.
// It will return an error if the channel does not exist or is not active.
func getChannel(b *backend, channelType courier.ChannelType, channelUUID courier.ChannelUUID) (courier.Channel, error) {
Expand Down Expand Up @@ -133,7 +125,7 @@ var channelCache = make(map[courier.ChannelUUID]*DBChannel)
// DBChannel is the RapidPro specific concrete type satisfying the courier.Channel interface
type DBChannel struct {
OrgID_ OrgID `db:"org_id"`
ID_ ChannelID `db:"id"`
ID_ courier.ChannelID `db:"id"`
ChannelType_ courier.ChannelType `db:"channel_type"`
Scheme_ string `db:"scheme"`
UUID_ courier.ChannelUUID `db:"uuid"`
Expand All @@ -154,7 +146,7 @@ func (c *DBChannel) ChannelType() courier.ChannelType { return c.ChannelType_ }
func (c *DBChannel) Scheme() string { return c.Scheme_ }

// ID returns the id of this channel
func (c *DBChannel) ID() ChannelID { return c.ID_ }
func (c *DBChannel) ID() courier.ChannelID { return c.ID_ }

// UUID returns the UUID of this channel
func (c *DBChannel) UUID() courier.ChannelUUID { return c.UUID_ }
Expand Down
2 changes: 1 addition & 1 deletion backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ WHERE u.urn = $1 AND u.contact_id = c.id AND u.org_id = $2 AND c.is_active = TRU
`

// contactForURN first tries to look up a contact for the passed in URN, if not finding one then creating one
func contactForURN(db *sqlx.DB, org OrgID, channelID ChannelID, urn courier.URN, name string) (*DBContact, error) {
func contactForURN(db *sqlx.DB, org OrgID, channelID courier.ChannelID, urn courier.URN, name string) (*DBContact, error) {
// try to look up our contact by URN
contact := DBContact{}
err := db.Get(&contact, lookupContactFromURNSQL, urn, org)
Expand Down
6 changes: 3 additions & 3 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,9 @@ type DBMsg struct {
Attachments_ []string `json:"attachments"`
ExternalID_ string `json:"external_id" db:"external_id"`

ChannelID_ ChannelID `json:"channel_id" db:"channel_id"`
ContactID_ ContactID `json:"contact_id" db:"contact_id"`
ContactURNID_ ContactURNID `json:"contact_urn_id" db:"contact_urn_id"`
ChannelID_ courier.ChannelID `json:"channel_id" db:"channel_id"`
ContactID_ ContactID `json:"contact_id" db:"contact_id"`
ContactURNID_ ContactURNID `json:"contact_urn_id" db:"contact_urn_id"`

MessageCount_ int `json:"msg_count" db:"msg_count"`
ErrorCount_ int `json:"error_count" db:"error_count"`
Expand Down
20 changes: 10 additions & 10 deletions backends/rapidpro/urn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type ContactURNID struct {
var NilContactURNID = ContactURNID{sql.NullInt64{Int64: 0, Valid: false}}

// NewDBContactURN returns a new ContactURN object for the passed in org, contact and string urn, this is not saved to the DB yet
func newDBContactURN(org OrgID, channelID ChannelID, contactID ContactID, urn courier.URN) *DBContactURN {
func newDBContactURN(org OrgID, channelID courier.ChannelID, contactID ContactID, urn courier.URN) *DBContactURN {
offset := strings.Index(string(urn), ":")
scheme := string(urn)[:offset]
path := string(urn)[offset+1:]
Expand All @@ -34,7 +34,7 @@ ORDER BY priority desc LIMIT 1

// contactURNForURN returns the ContactURN for the passed in org and URN, creating and associating
// it with the passed in contact if necessary
func contactURNForURN(db *sqlx.DB, org OrgID, channelID ChannelID, contactID ContactID, urn courier.URN) (*DBContactURN, error) {
func contactURNForURN(db *sqlx.DB, org OrgID, channelID courier.ChannelID, contactID ContactID, urn courier.URN) (*DBContactURN, error) {
contactURN := newDBContactURN(org, channelID, contactID, urn)
err := db.Get(contactURN, selectOrgURN, org, urn)
if err != nil && err != sql.ErrNoRows {
Expand Down Expand Up @@ -98,12 +98,12 @@ func updateContactURN(db *sqlx.DB, urn *DBContactURN) error {

// DBContactURN is our struct to map to database level URNs
type DBContactURN struct {
OrgID OrgID `db:"org_id"`
ID ContactURNID `db:"id"`
URN courier.URN `db:"urn"`
Scheme string `db:"scheme"`
Path string `db:"path"`
Priority int `db:"priority"`
ChannelID ChannelID `db:"channel_id"`
ContactID ContactID `db:"contact_id"`
OrgID OrgID `db:"org_id"`
ID ContactURNID `db:"id"`
URN courier.URN `db:"urn"`
Scheme string `db:"scheme"`
Path string `db:"path"`
Priority int `db:"priority"`
ChannelID courier.ChannelID `db:"channel_id"`
ContactID ContactID `db:"contact_id"`
}
42 changes: 42 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package courier

import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"

uuid "github.com/satori/go.uuid"
Expand Down Expand Up @@ -56,6 +59,45 @@ func NewChannelUUID(u string) (ChannelUUID, error) {
return ChannelUUID{channelUUID}, nil
}

// ChannelID is our SQL type for a channel's id
type ChannelID int64

// NewChannelID creates a new ChannelID for the passed in int64
func NewChannelID(id int64) ChannelID {
return ChannelID(id)
}

// UnmarshalText satisfies text unmarshalling so ids can be decoded from forms
func (i *ChannelID) UnmarshalText(text []byte) (err error) {
id, err := strconv.ParseInt(string(text), 10, 64)
*i = ChannelID(id)
if err != nil {
return err
}
return err
}

// UnmarshalJSON satisfies json unmarshalling so ids can be decoded from JSON
func (i *ChannelID) UnmarshalJSON(bytes []byte) (err error) {
var id int64
err = json.Unmarshal(bytes, &id)
*i = ChannelID(id)
return err
}

// MarshalJSON satisfies json marshalling so ids can be encoded to JSON
func (i *ChannelID) MarshalJSON() ([]byte, error) {
return json.Marshal(int64(*i))
}

// String satisfies the Stringer interface
func (i *ChannelID) String() string {
return fmt.Sprintf("%d", i)
}

// NilChannelID is our nil value for ChannelIDs
var NilChannelID = ChannelID(0)

// ErrChannelExpired is returned when our cached channel has outlived it's TTL
var ErrChannelExpired = errors.New("channel expired")

Expand Down
1 change: 0 additions & 1 deletion cmd/courier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
// load channel handler packages
_ "github.com/nyaruka/courier/handlers/africastalking"
_ "github.com/nyaruka/courier/handlers/blackmyna"
_ "github.com/nyaruka/courier/handlers/dummy"
_ "github.com/nyaruka/courier/handlers/kannel"
_ "github.com/nyaruka/courier/handlers/telegram"
_ "github.com/nyaruka/courier/handlers/twilio"
Expand Down
30 changes: 30 additions & 0 deletions handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package courier

func init() {
RegisterHandler(NewHandler())
}

type dummyHandler struct {
server Server
backend Backend
}

// NewHandler returns a new Dummy handler
func NewHandler() ChannelHandler {
return &dummyHandler{}
}

func (h *dummyHandler) ChannelName() string { return "Dummy Handler" }
func (h *dummyHandler) ChannelType() ChannelType { return ChannelType("DM") }

// Initialize is called by the engine once everything is loaded
func (h *dummyHandler) Initialize(s Server) error {
h.server = s
h.backend = s.Backend()
return nil
}

// SendMsg sends the passed in message, returning any error
func (h *dummyHandler) SendMsg(msg Msg) (MsgStatus, error) {
return h.backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgSent), nil
}
33 changes: 0 additions & 33 deletions handlers/dummy/dummy.go

This file was deleted.

3 changes: 1 addition & 2 deletions handlers/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"time"

"strings"
Expand Down Expand Up @@ -184,7 +183,7 @@ func (h *handler) SendMsg(msg courier.Msg) (courier.MsgStatus, error) {

// build our request
form := map[string]string{
"id": strconv.FormatInt(msg.ID().Int64, 10),
"id": fmt.Sprintf("%d", msg.ID()),
"text": courier.GetTextAndAttachments(msg),
"to": msg.URN().Path(),
"to_no_plus": strings.TrimPrefix(msg.URN().Path(), "+"),
Expand Down
2 changes: 1 addition & 1 deletion handlers/kannel/kannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (h *handler) SendMsg(msg courier.Msg) (courier.MsgStatus, error) {
return nil, fmt.Errorf("no send url set for KN channel")
}

dlrURL := fmt.Sprintf("%s%s%s/?id=%d&status=%%d", h.Server().Config().BaseURL, "/c/kn/", msg.Channel().UUID(), msg.ID().Int64)
dlrURL := fmt.Sprintf("%s%s%s/?id=%d&status=%%d", h.Server().Config().BaseURL, "/c/kn/", msg.Channel().UUID(), msg.ID())

// build our request
form := url.Values{
Expand Down
19 changes: 16 additions & 3 deletions handlers/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
_ "github.com/lib/pq" // postgres driver
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/config"
"github.com/pressly/lg"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -125,10 +127,21 @@ func testHandlerRequest(tb testing.TB, s courier.Server, url string, data string
return body
}

func newServer(backend courier.Backend) courier.Server {
// for benchmarks, log to null
logger := logrus.New()
logger.Out = ioutil.Discard
lg.RedirectStdlogOutput(logger)
lg.DefaultLogger = logger
logrus.SetOutput(ioutil.Discard)

return courier.NewServerWithLogger(config.NewTest(), backend, logger)
}

// RunChannelSendTestCases runs all the passed in test cases against the channel
func RunChannelSendTestCases(t *testing.T, channel courier.Channel, handler courier.ChannelHandler, testCases []ChannelSendTestCase) {
mb := courier.NewMockBackend()
s := courier.NewServer(config.NewTest(), mb)
s := newServer(mb)
mb.AddChannel(channel)
handler.Initialize(s)

Expand Down Expand Up @@ -215,7 +228,7 @@ func RunChannelSendTestCases(t *testing.T, channel courier.Channel, handler cour
// RunChannelTestCases runs all the passed in tests cases for the passed in channel configurations
func RunChannelTestCases(t *testing.T, channels []courier.Channel, handler courier.ChannelHandler, testCases []ChannelHandleTestCase) {
mb := courier.NewMockBackend()
s := courier.NewServer(config.NewTest(), mb)
s := newServer(mb)

for _, ch := range channels {
mb.AddChannel(ch)
Expand Down Expand Up @@ -281,7 +294,7 @@ func RunChannelTestCases(t *testing.T, channels []courier.Channel, handler couri
// RunChannelBenchmarks runs all the passed in test cases for the passed in channels
func RunChannelBenchmarks(b *testing.B, channels []courier.Channel, handler courier.ChannelHandler, testCases []ChannelHandleTestCase) {
mb := courier.NewMockBackend()
s := courier.NewServer(config.NewTest(), mb)
s := newServer(mb)

for _, ch := range channels {
mb.AddChannel(ch)
Expand Down
Loading

0 comments on commit 4cefc7f

Please sign in to comment.