Skip to content

Commit

Permalink
Merge branch 'master' of github.com:nyaruka/courier into SQ-handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed Aug 22, 2017
2 parents 16c0715 + 59a06f8 commit d886012
Show file tree
Hide file tree
Showing 332 changed files with 60,848 additions and 28,260 deletions.
2 changes: 2 additions & 0 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,5 @@ type backend struct {
stopChan chan bool
waitGroup *sync.WaitGroup
}


21 changes: 14 additions & 7 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (ts *MsgTestSuite) TestMsgUnmarshal() {
msgJSON := `{
"status": "P",
"direction": "O",
"attachments": null,
"attachments": ["https://foo.bar/image.jpg"],
"queued_on": null,
"text": "Test message 21",
"contact_id": 30,
Expand All @@ -120,7 +120,8 @@ func (ts *MsgTestSuite) TestMsgUnmarshal() {
err := json.Unmarshal([]byte(msgJSON), &msg)
ts.NoError(err)
ts.Equal(msg.ChannelUUID_.String(), "f3ad3eb6-d00d-4dc3-92e9-9f34f32940ba")
ts.Equal(msg.ChannelID_, courier.ChannelID(11))
ts.Equal(msg.ChannelID_, courier.NewChannelID(11))
ts.Equal([]string{"https://foo.bar/image.jpg"}, msg.Attachments())
ts.Equal(msg.ExternalID_, "")
}

Expand Down Expand Up @@ -442,6 +443,12 @@ func (ts *MsgTestSuite) TestWriteAttachment() {
ts.NoError(err)
ts.True(strings.HasPrefix(msg.Attachments()[0], "image/png:"))
ts.True(strings.HasSuffix(msg.Attachments()[0], ".png"))

// load it back from the id
m, err := readMsgFromDB(ts.b, msg.ID())
ts.NoError(err)
ts.True(strings.HasPrefix(m.Attachments()[0], "image/png:"))
ts.True(strings.HasSuffix(m.Attachments()[0], ".png"))
}

func (ts *MsgTestSuite) TestWriteMsg() {
Expand All @@ -458,6 +465,10 @@ func (ts *MsgTestSuite) TestWriteMsg() {
err := ts.b.WriteMsg(msg)
ts.NoError(err)

// creating the incoming msg again should give us the same UUID and have the msg set as not to write
msg2 := ts.b.NewIncomingMsg(knChannel, urn, "test123").(*DBMsg)
ts.Equal(msg2.UUID(), msg.UUID())

// check we had an id set
ts.NotZero(msg.ID)

Expand All @@ -480,7 +491,7 @@ func (ts *MsgTestSuite) TestWriteMsg() {
ts.Equal(DefaultPriority, m.Priority_)
ts.Equal("ext123", m.ExternalID_)
ts.Equal("test123", m.Text_)
ts.Equal([]string(nil), m.Attachments_)
ts.Equal(0, len(m.Attachments()))
ts.Equal(1, m.MessageCount_)
ts.Equal(0, m.ErrorCount_)
ts.Equal(now, m.SentOn_.In(time.UTC))
Expand All @@ -496,10 +507,6 @@ func (ts *MsgTestSuite) TestWriteMsg() {
ts.NotNil(contact.UUID)
ts.NotNil(contact.ID)

// creating the incoming msg again should give us the same UUID and have the msg set as not to write
msg2 := ts.b.NewIncomingMsg(knChannel, urn, "test123").(*DBMsg)
ts.Equal(msg2.UUID(), msg.UUID())

// waiting 5 seconds should let us write it successfully
time.Sleep(5 * time.Second)
msg3 := ts.b.NewIncomingMsg(knChannel, urn, "test123").(*DBMsg)
Expand Down
51 changes: 2 additions & 49 deletions backends/rapidpro/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@ package rapidpro

import (
"database/sql"
"database/sql/driver"
"encoding/csv"
"errors"
"regexp"
"strings"
"sync"
"time"

"github.com/lib/pq"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/utils"
)
Expand Down Expand Up @@ -132,7 +128,7 @@ type DBChannel struct {
OrgID_ OrgID `db:"org_id"`
ID_ courier.ChannelID `db:"id"`
ChannelType_ courier.ChannelType `db:"channel_type"`
Schemes_ StringSlice `db:"schemes"`
Schemes_ pq.StringArray `db:"schemes"`
UUID_ courier.ChannelUUID `db:"uuid"`
Address_ sql.NullString `db:"address"`
Country_ sql.NullString `db:"country"`
Expand Down Expand Up @@ -195,46 +191,3 @@ func (c *DBChannel) supportsScheme(scheme string) bool {
}
return false
}

// StringSlice is our custom implementation of a string array to support Postgres coding / encoding of schemes
type StringSlice []string

var quoteEscapeRegex = regexp.MustCompile(`([^\\]([\\]{2})*)\\"`)

// Scan convert a SQL value into our string array
// http://www.postgresql.org/docs/9.1/static/arrays.html#ARRAYS-IO
func (s *StringSlice) Scan(src interface{}) error {
asBytes, ok := src.([]byte)
if !ok {
return error(errors.New("Scan source was not []bytes"))
}
str := string(asBytes)

// change quote escapes for csv parser
str = quoteEscapeRegex.ReplaceAllString(str, `$1""`)
str = strings.Replace(str, `\\`, `\`, -1)
// remove braces
str = str[1 : len(str)-1]
csvReader := csv.NewReader(strings.NewReader(str))

slice, err := csvReader.Read()

if err != nil {
return err
}

(*s) = StringSlice(slice)

return nil
}

// Value returns the SQL encoded version of our StringSlice
func (s StringSlice) Value() (driver.Value, error) {
// string escapes.
// \ => \\\
// " => \"
for i, elem := range s {
s[i] = `"` + strings.Replace(strings.Replace(elem, `\`, `\\\`, -1), `"`, `\"`, -1) + `"`
}
return "{" + strings.Join(s, ",") + "}", nil
}
26 changes: 15 additions & 11 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import (
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"mime"

"github.com/garyburd/redigo/redis"
"github.com/lib/pq"
"github.com/nyaruka/courier"
"github.com/nyaruka/courier/queue"
"github.com/nyaruka/courier/utils"
"github.com/sirupsen/logrus"
filetype "gopkg.in/h2non/filetype.v1"
)

Expand Down Expand Up @@ -63,7 +66,7 @@ func writeMsg(b *backend, msg courier.Msg) error {
// if we have media, go download it to S3
for i, attachment := range m.Attachments_ {
if strings.HasPrefix(attachment, "http") {
url, err := downloadMediaToS3(b, m.UUID_, attachment)
url, err := downloadMediaToS3(b, m.OrgID_, m.UUID_, attachment)
if err != nil {
return err
}
Expand All @@ -76,6 +79,7 @@ func writeMsg(b *backend, msg courier.Msg) error {

// fail? spool for later
if err != nil {
logrus.WithError(err).WithField("msg", m.UUID().String()).Error("error writing to db")
return courier.WriteToSpool(b.config.SpoolDir, "msgs", m)
}

Expand Down Expand Up @@ -116,9 +120,9 @@ func newMsg(direction MsgDirection, channel courier.Channel, urn courier.URN, te
}

const insertMsgSQL = `
INSERT INTO msgs_msg(org_id, direction, has_template_error, text, msg_count, error_count, priority, status,
INSERT INTO msgs_msg(org_id, direction, has_template_error, text, attachments, msg_count, error_count, priority, status,
visibility, external_id, channel_id, contact_id, contact_urn_id, created_on, modified_on, next_attempt, queued_on, sent_on)
VALUES(:org_id, :direction, FALSE, :text, :msg_count, :error_count, :priority, :status,
VALUES(:org_id, :direction, FALSE, :text, :attachments, :msg_count, :error_count, :priority, :status,
:visibility, :external_id, :channel_id, :contact_id, :contact_urn_id, :created_on, :modified_on, :next_attempt, :queued_on, :sent_on)
RETURNING id
`
Expand Down Expand Up @@ -153,7 +157,7 @@ func writeMsgToDB(b *backend, m *DBMsg) error {
}

const selectMsgSQL = `
SELECT org_id, direction, text, msg_count, error_count, priority, status,
SELECT org_id, direction, text, attachments, msg_count, error_count, priority, status,
visibility, external_id, channel_id, contact_id, contact_urn_id, created_on, modified_on, next_attempt, queued_on, sent_on
FROM msgs_msg
WHERE id = $1
Expand All @@ -171,7 +175,7 @@ func readMsgFromDB(b *backend, id courier.MsgID) (*DBMsg, error) {
// Media download and classification
//-----------------------------------------------------------------------------

func downloadMediaToS3(b *backend, msgUUID courier.MsgUUID, mediaURL string) (string, error) {
func downloadMediaToS3(b *backend, orgID OrgID, msgUUID courier.MsgUUID, mediaURL string) (string, error) {
parsedURL, err := url.Parse(mediaURL)
if err != nil {
return "", err
Expand Down Expand Up @@ -230,7 +234,7 @@ func downloadMediaToS3(b *backend, msgUUID courier.MsgUUID, mediaURL string) (st
if extension != "" {
filename = fmt.Sprintf("%s.%s", msgUUID, extension)
}
path := filepath.Join(b.config.S3MediaPrefix, filename[:4], filename)
path := filepath.Join(b.config.S3MediaPrefix, strconv.FormatInt(orgID.Int64, 10), filename[:4], filename[4:8], filename)
if !strings.HasPrefix(path, "/") {
path = fmt.Sprintf("/%s", path)
}
Expand Down Expand Up @@ -335,7 +339,7 @@ type DBMsg struct {
Priority_ MsgPriority `json:"priority" db:"priority"`
URN_ courier.URN `json:"urn"`
Text_ string `json:"text" db:"text"`
Attachments_ []string `json:"attachments"`
Attachments_ pq.StringArray `json:"attachments" db:"attachments"`
ExternalID_ string `json:"external_id" db:"external_id"`

ChannelID_ courier.ChannelID `json:"channel_id" db:"channel_id"`
Expand All @@ -354,16 +358,16 @@ type DBMsg struct {
QueuedOn_ time.Time `json:"queued_on" db:"queued_on"`
SentOn_ time.Time `json:"sent_on" db:"sent_on"`

Channel_ courier.Channel
WorkerToken_ queue.WorkerToken
AlreadyWritten_ bool
Channel_ courier.Channel `json:"-"`
WorkerToken_ queue.WorkerToken `json:"-"`
AlreadyWritten_ bool `json:"-"`
}

func (m *DBMsg) Channel() courier.Channel { return m.Channel_ }
func (m *DBMsg) ID() courier.MsgID { return m.ID_ }
func (m *DBMsg) UUID() courier.MsgUUID { return m.UUID_ }
func (m *DBMsg) Text() string { return m.Text_ }
func (m *DBMsg) Attachments() []string { return m.Attachments_ }
func (m *DBMsg) Attachments() []string { return []string(m.Attachments_) }
func (m *DBMsg) ExternalID() string { return m.ExternalID_ }
func (m *DBMsg) URN() courier.URN { return m.URN_ }
func (m *DBMsg) ContactName() string { return m.ContactName_ }
Expand Down
2 changes: 1 addition & 1 deletion backends/rapidpro/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ CREATE TABLE msgs_msg (
error_count integer NOT NULL,
next_attempt timestamp with time zone NOT NULL,
external_id character varying(255),
attachments character varying(255),
attachments character varying(255)[],
channel_id integer references channels_channel(id) on delete cascade,
contact_id integer NOT NULL references contacts_contact(id) on delete cascade,
contact_urn_id integer references contacts_contacturn(id) on delete cascade,
Expand Down
45 changes: 11 additions & 34 deletions channel.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package courier

import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"

null "gopkg.in/guregu/null.v3"

uuid "github.com/satori/go.uuid"
)

Expand Down Expand Up @@ -42,6 +41,10 @@ type ChannelType string
// AnyChannelType is our empty channel type used when doing lookups without channel type assertions
var AnyChannelType = ChannelType("")

func (ct ChannelType) String() string {
return string(ct)
}

// ChannelUUID is our typing of a channel's UUID
type ChannelUUID struct {
uuid.UUID
Expand All @@ -60,43 +63,17 @@ func NewChannelUUID(u string) (ChannelUUID, error) {
}

// ChannelID is our SQL type for a channel's id
type ChannelID int64
type ChannelID struct {
null.Int
}

// NewChannelID creates a new ChannelID for the passed in int64
func NewChannelID(id int64) ChannelID {
return ChannelID(id)
}

// UnmarshalText satisfies text unmarshalling so ids can be decoded from forms
func (i *ChannelID) UnmarshalText(text []byte) (err error) {
id, err := strconv.ParseInt(string(text), 10, 64)
*i = ChannelID(id)
if err != nil {
return err
}
return err
}

// UnmarshalJSON satisfies json unmarshalling so ids can be decoded from JSON
func (i *ChannelID) UnmarshalJSON(bytes []byte) (err error) {
var id int64
err = json.Unmarshal(bytes, &id)
*i = ChannelID(id)
return err
}

// MarshalJSON satisfies json marshalling so ids can be encoded to JSON
func (i *ChannelID) MarshalJSON() ([]byte, error) {
return json.Marshal(int64(*i))
}

// String satisfies the Stringer interface
func (i *ChannelID) String() string {
return fmt.Sprintf("%d", i)
return ChannelID{null.NewInt(id, true)}
}

// NilChannelID is our nil value for ChannelIDs
var NilChannelID = ChannelID(0)
var NilChannelID = ChannelID{null.NewInt(0, false)}

// ErrChannelExpired is returned when our cached channel has outlived it's TTL
var ErrChannelExpired = errors.New("channel expired")
Expand Down
13 changes: 6 additions & 7 deletions cmd/courier/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"log"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -31,7 +30,7 @@ func main() {
config := &config.Courier{}
err := m.Load(config)
if err != nil {
log.Fatalf("Error loading configuration: %s", err)
logrus.Fatalf("Error loading configuration: %s", err)
}

// if we have a custom version, use it
Expand All @@ -43,7 +42,7 @@ func main() {
logrus.SetOutput(os.Stdout)
level, err := logrus.ParseLevel(config.LogLevel)
if err != nil {
log.Fatalf("Invalid log level '%s'", level)
logrus.Fatalf("Invalid log level '%s'", level)
}
logrus.SetLevel(level)

Expand All @@ -55,26 +54,26 @@ func main() {
hook.StacktraceConfiguration.Skip = 4
hook.StacktraceConfiguration.Context = 5
if err != nil {
log.Fatalf("Invalid sentry DSN: '%s': %s", config.SentryDSN, err)
logrus.Fatalf("Invalid sentry DSN: '%s': %s", config.SentryDSN, err)
}
logrus.StandardLogger().Hooks.Add(hook)
}

// load our backend
backend, err := courier.NewBackend(config)
if err != nil {
log.Fatalf("Error creating backend: %s", err)
logrus.Fatalf("Error creating backend: %s", err)
}

server := courier.NewServer(config, backend)
err = server.Start()
if err != nil {
log.Fatalf("Error starting server: %s", err)
logrus.Fatalf("Error starting server: %s", err)
}

ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
log.Println(<-ch)
logrus.WithField("comp", "main").WithField("signal", <-ch).Info("stopping")

server.Stop()
}
3 changes: 3 additions & 0 deletions config/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Courier struct {

MaxWorkers int `default:"32"`

LibratoUsername string `default:""`
LibratoToken string `default:""`

RapidproHandleURL string `default:"https://app.rapidpro.io/handlers/mage/handle_message"`
RapidproToken string `default:"missing_rapidpro_token"`

Expand Down
Loading

0 comments on commit d886012

Please sign in to comment.