Skip to content

Commit 75b8954

Browse files
committed
Merge branch 'main' into fetch_attachments
2 parents 7c90d90 + a6d5d38 commit 75b8954

File tree

6 files changed

+282
-1
lines changed

6 files changed

+282
-1
lines changed

core/models/msgs.go

+27
Original file line numberDiff line numberDiff line change
@@ -1275,6 +1275,33 @@ func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAsse
12751275
return resent, nil
12761276
}
12771277

1278+
const sqlFailChannelMessages = `
1279+
WITH rows AS (
1280+
SELECT id FROM msgs_msg
1281+
WHERE org_id = $1 AND direction = 'O' AND channel_id = $2 AND status = ANY($3) LIMIT 1000
1282+
)
1283+
UPDATE msgs_msg SET status = 'F', modified_on = NOW() WHERE id IN (SELECT id FROM rows)`
1284+
1285+
func FailChannelMessages(ctx context.Context, db Queryer, orgID OrgID, channelID ChannelID) error {
1286+
if channelID == NilChannelID {
1287+
return nil
1288+
}
1289+
1290+
statuses := []MsgStatus{MsgStatusPending, MsgStatusErrored, MsgStatusQueued}
1291+
for {
1292+
// and update the messages as FAILED
1293+
res, err := db.ExecContext(ctx, sqlFailChannelMessages, orgID, channelID, pq.Array(statuses))
1294+
if err != nil {
1295+
return err
1296+
}
1297+
rows, _ := res.RowsAffected()
1298+
if rows == 0 {
1299+
break
1300+
}
1301+
}
1302+
return nil
1303+
}
1304+
12781305
// MarkBroadcastSent marks the passed in broadcast as sent
12791306
func MarkBroadcastSent(ctx context.Context, db Queryer, id BroadcastID) error {
12801307
// noop if it is a nil id

core/models/msgs_test.go

+27
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,33 @@ func TestResendMessages(t *testing.T) {
488488
assertdb.Query(t, db, `SELECT status, failed_reason FROM msgs_msg WHERE id = $1`, out5.ID()).Columns(map[string]interface{}{"status": "F", "failed_reason": "D"})
489489
}
490490

491+
func TestFailMessages(t *testing.T) {
492+
ctx, _, db, _ := testsuite.Get()
493+
494+
defer testsuite.Reset(testsuite.ResetAll)
495+
496+
out1 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "hi", nil, models.MsgStatusPending, false)
497+
out2 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Bob, "hi", nil, models.MsgStatusErrored, false)
498+
out3 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "hi", nil, models.MsgStatusFailed, false)
499+
out4 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "hi", nil, models.MsgStatusQueued, false)
500+
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.George, "hi", nil, models.MsgStatusQueued, false)
501+
502+
ids := []models.MsgID{models.MsgID(out1.ID()), models.MsgID(out2.ID()), models.MsgID(out3.ID()), models.MsgID(out4.ID())}
503+
println(ids)
504+
505+
now := dates.Now()
506+
507+
// fail the msgs
508+
err := models.FailChannelMessages(ctx, db, testdata.Org1.ID, testdata.TwilioChannel.ID)
509+
require.NoError(t, err)
510+
511+
//assert.Len(t, failedMsgs, 3)
512+
513+
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'F' AND modified_on > $1`, now).Returns(4)
514+
assertdb.Query(t, db, `SELECT status FROM msgs_msg WHERE id = $1`, out3.ID()).Columns(map[string]interface{}{"status": "F"})
515+
516+
}
517+
491518
func TestGetMsgRepetitions(t *testing.T) {
492519
_, rt, db, rp := testsuite.Get()
493520

core/msgio/courier.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,27 @@ func QueueCourierMessages(rc redis.Conn, contactID models.ContactID, msgs []*mod
130130
return commitBatch()
131131
}
132132

133+
var clearChannelQueueScript = redis.NewScript(3, `
134+
-- KEYS: [QueueType, QueueName, TPS]
135+
local queueType, queueName, tps = KEYS[1], KEYS[2], tonumber(KEYS[3])
136+
137+
-- first construct the base key for this queue from the type + name + tps, e.g. "msgs:0a77a158-1dcb-4c06-9aee-e15bdf64653e|10"
138+
local queueKey = queueType .. ":" .. queueName .. "|" .. tps
139+
140+
-- clear the sorted sets for the key
141+
redis.call("DEL", queueKey .. "/1")
142+
redis.call("DEL", queueKey .. "/0")
143+
144+
-- reset queue to zero
145+
redis.call("ZADD", queueType .. ":active", 0, queueKey)
146+
147+
`)
148+
149+
func ClearChannelCourierQueue(rc redis.Conn, ch *models.Channel) error {
150+
_, err := clearChannelQueueScript.Do(rc, "msgs", ch.UUID(), ch.TPS())
151+
return err
152+
}
153+
133154
// see https://github.com/nyaruka/courier/blob/main/attachments.go#L23
134155
type fetchAttachmentRequest struct {
135156
ChannelType models.ChannelType `json:"channel_type"`
@@ -165,4 +186,3 @@ func FetchAttachment(ctx context.Context, rt *runtime.Runtime, ch *models.Channe
165186
}
166187

167188
return utils.Attachment(fmt.Sprintf("%s:%s", fa.Attachment.ContentType, fa.Attachment.URL)), models.ChannelLogUUID(fa.LogUUID), nil
168-
}

core/msgio/courier_test.go

+39
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,45 @@ func TestQueueCourierMessages(t *testing.T) {
6161
})
6262
}
6363

64+
func TestClearChannelCourierQueue(t *testing.T) {
65+
ctx, rt, _, rp := testsuite.Get()
66+
rc := rp.Get()
67+
defer rc.Close()
68+
69+
defer testsuite.Reset(testsuite.ResetData | testsuite.ResetRedis)
70+
71+
oa, err := models.GetOrgAssetsWithRefresh(ctx, rt, testdata.Org1.ID, models.RefreshOrg|models.RefreshChannels)
72+
require.NoError(t, err)
73+
74+
// queue 3 messages for Cathy..
75+
msgs := []*models.Msg{
76+
(&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa),
77+
(&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa),
78+
(&msgSpec{Channel: testdata.TwilioChannel, Contact: testdata.Cathy, HighPriority: true}).createMsg(t, rt, oa),
79+
(&msgSpec{Channel: testdata.VonageChannel, Contact: testdata.Cathy}).createMsg(t, rt, oa),
80+
}
81+
82+
msgio.QueueCourierMessages(rc, testdata.Cathy.ID, msgs)
83+
84+
testsuite.AssertCourierQueues(t, map[string][]int{
85+
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {2}, // twilio, bulk priority
86+
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/1": {1}, // twilio, high priority
87+
"msgs:19012bfd-3ce3-4cae-9bb9-76cf92c73d49|10/0": {1}, // vonage, bulk priority
88+
})
89+
90+
twilioChannel := oa.ChannelByID(testdata.TwilioChannel.ID)
91+
msgio.ClearChannelCourierQueue(rc, twilioChannel)
92+
93+
testsuite.AssertCourierQueues(t, map[string][]int{
94+
"msgs:19012bfd-3ce3-4cae-9bb9-76cf92c73d49|10/0": {1}, // vonage, bulk priority
95+
})
96+
97+
vonageChannel := oa.ChannelByID(testdata.VonageChannel.ID)
98+
msgio.ClearChannelCourierQueue(rc, vonageChannel)
99+
testsuite.AssertCourierQueues(t, map[string][]int{})
100+
101+
}
102+
64103
func TestPushCourierBatch(t *testing.T) {
65104
ctx, rt, _, rp := testsuite.Get()
66105
rc := rp.Get()
+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package channels
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/nyaruka/mailroom/core/models"
8+
"github.com/nyaruka/mailroom/core/msgio"
9+
"github.com/nyaruka/mailroom/core/tasks"
10+
"github.com/nyaruka/mailroom/runtime"
11+
"github.com/pkg/errors"
12+
)
13+
14+
// TypeInterruptChannel is the type of the interruption of a channel
15+
const TypeInterruptChannel = "interrupt_channel"
16+
17+
func init() {
18+
tasks.RegisterType(TypeInterruptChannel, func() tasks.Task { return &InterruptChannelTask{} })
19+
}
20+
21+
// InterruptChannelTask is our task to interrupt a channel
22+
type InterruptChannelTask struct {
23+
ChannelID models.ChannelID `json:"channel_id"`
24+
}
25+
26+
// Perform implements tasks.Task
27+
func (t *InterruptChannelTask) Perform(ctx context.Context, rt *runtime.Runtime, orgID models.OrgID) error {
28+
db := rt.DB
29+
rc := rt.RP.Get()
30+
defer rc.Close()
31+
32+
channelIDs := []models.ChannelID{t.ChannelID}
33+
34+
channels, err := models.GetChannelsByID(ctx, db, channelIDs)
35+
if err != nil {
36+
return errors.Wrapf(err, "error getting channels")
37+
}
38+
39+
channel := channels[0]
40+
41+
if err := models.InterruptSessionsForChannels(ctx, db, channelIDs); err != nil {
42+
return errors.Wrapf(err, "error interrupting sessions")
43+
}
44+
45+
err = msgio.ClearChannelCourierQueue(rc, channel)
46+
if err != nil {
47+
return errors.Wrapf(err, "error clearing courier queues")
48+
}
49+
50+
err = models.FailChannelMessages(ctx, db, orgID, t.ChannelID)
51+
if err != nil {
52+
return errors.Wrapf(err, "error failing channel messages")
53+
}
54+
55+
return nil
56+
57+
}
58+
59+
// Timeout is the maximum amount of time the task can run for
60+
func (*InterruptChannelTask) Timeout() time.Duration {
61+
return time.Hour
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package channels
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/nyaruka/gocommon/dbutil/assertdb"
8+
"github.com/nyaruka/mailroom/core/models"
9+
"github.com/nyaruka/mailroom/core/tasks/msgs"
10+
"github.com/nyaruka/mailroom/testsuite"
11+
"github.com/nyaruka/mailroom/testsuite/testdata"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestInterruptChannel(t *testing.T) {
17+
ctx, rt, db, rp := testsuite.Get()
18+
rc := rp.Get()
19+
defer rc.Close()
20+
21+
defer testsuite.Reset(testsuite.ResetData)
22+
23+
insertSession := func(org *testdata.Org, contact *testdata.Contact, flow *testdata.Flow, connectionID models.CallID) models.SessionID {
24+
sessionID := testdata.InsertWaitingSession(db, org, contact, models.FlowTypeMessaging, flow, connectionID, time.Now(), time.Now(), false, nil)
25+
26+
// give session one waiting run too
27+
testdata.InsertFlowRun(db, org, sessionID, contact, flow, models.RunStatusWaiting)
28+
return sessionID
29+
}
30+
31+
// twilio call
32+
twilioCallID := testdata.InsertCall(db, testdata.Org1, testdata.TwilioChannel, testdata.Alexandria)
33+
34+
// vonage call
35+
vonageCallID := testdata.InsertCall(db, testdata.Org1, testdata.VonageChannel, testdata.George)
36+
37+
sessionID1 := insertSession(testdata.Org1, testdata.Cathy, testdata.Favorites, models.NilCallID)
38+
sessionID2 := insertSession(testdata.Org1, testdata.George, testdata.Favorites, vonageCallID)
39+
sessionID3 := insertSession(testdata.Org1, testdata.Alexandria, testdata.Favorites, twilioCallID)
40+
41+
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "how can we help", nil, models.MsgStatusPending, false)
42+
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.VonageChannel, testdata.Bob, "this failed", nil, models.MsgStatusQueued, false)
43+
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.VonageChannel, testdata.George, "no URN", nil, models.MsgStatusPending, false)
44+
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.VonageChannel, testdata.George, "no URN", nil, models.MsgStatusErrored, false)
45+
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.VonageChannel, testdata.George, "no URN", nil, models.MsgStatusFailed, false)
46+
47+
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1`, sessionID1).Returns("W")
48+
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1`, sessionID2).Returns("W")
49+
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1`, sessionID3).Returns("W")
50+
51+
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.VonageChannel.ID).Returns(1)
52+
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.TwilioChannel.ID).Returns(0)
53+
54+
// twilio channel task
55+
task := &InterruptChannelTask{
56+
ChannelID: testdata.TwilioChannel.ID,
57+
}
58+
59+
// execute it
60+
err := task.Perform(ctx, rt, testdata.Org1.ID)
61+
assert.NoError(t, err)
62+
63+
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.VonageChannel.ID).Returns(1)
64+
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.TwilioChannel.ID).Returns(1)
65+
66+
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1`, sessionID1).Returns("W")
67+
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1`, sessionID2).Returns("W")
68+
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1`, sessionID3).Returns("I")
69+
70+
testdata.InsertErroredOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "Hi", 1, time.Now().Add(-time.Hour), false)
71+
testdata.InsertErroredOutgoingMsg(db, testdata.Org1, testdata.VonageChannel, testdata.Bob, "Hi", 2, time.Now().Add(-time.Minute), false)
72+
testdata.InsertErroredOutgoingMsg(db, testdata.Org1, testdata.VonageChannel, testdata.Bob, "Hi", 2, time.Now().Add(-time.Minute), false)
73+
testdata.InsertErroredOutgoingMsg(db, testdata.Org1, testdata.VonageChannel, testdata.Bob, "Hi", 2, time.Now().Add(-time.Minute), true) // high priority
74+
75+
// just to create courier queues
76+
err = msgs.RetryErroredMessages(ctx, rt)
77+
require.NoError(t, err)
78+
79+
testsuite.AssertCourierQueues(t, map[string][]int{
80+
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {1}, // twilio, bulk priority
81+
"msgs:19012bfd-3ce3-4cae-9bb9-76cf92c73d49|10/0": {2}, // vonage, bulk priority
82+
"msgs:19012bfd-3ce3-4cae-9bb9-76cf92c73d49|10/1": {1}, // vonage, high priority
83+
})
84+
85+
// vonage channel task
86+
task = &InterruptChannelTask{
87+
ChannelID: testdata.VonageChannel.ID,
88+
}
89+
90+
// execute it
91+
err = task.Perform(ctx, rt, testdata.Org1.ID)
92+
assert.NoError(t, err)
93+
94+
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.VonageChannel.ID).Returns(7)
95+
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'F' and channel_id = $1`, testdata.TwilioChannel.ID).Returns(1)
96+
97+
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1`, sessionID1).Returns("W")
98+
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1`, sessionID2).Returns("I")
99+
assertdb.Query(t, db, `SELECT status FROM flows_flowsession WHERE id = $1`, sessionID3).Returns("I")
100+
101+
// vonage queues should be cleared
102+
testsuite.AssertCourierQueues(t, map[string][]int{
103+
"msgs:74729f45-7f29-4868-9dc4-90e491e3c7d8|10/0": {1}, // twilio, bulk priority
104+
})
105+
106+
}

0 commit comments

Comments
 (0)