Skip to content

Commit e369f05

Browse files
committed
Fail messages without loading them
1 parent 4897d39 commit e369f05

File tree

4 files changed

+27
-110
lines changed

4 files changed

+27
-110
lines changed

core/models/msgs.go

+18-62
Original file line numberDiff line numberDiff line change
@@ -539,45 +539,6 @@ func GetMessagesForRetry(ctx context.Context, db Queryer) ([]*Msg, error) {
539539
return loadMessages(ctx, db, loadMessagesForRetrySQL)
540540
}
541541

542-
var loadMessagesForInterruptChannelSQL = `
543-
SELECT
544-
id,
545-
broadcast_id,
546-
uuid,
547-
text,
548-
created_on,
549-
direction,
550-
status,
551-
visibility,
552-
msg_count,
553-
error_count,
554-
next_attempt,
555-
failed_reason,
556-
coalesce(high_priority, FALSE) as high_priority,
557-
external_id,
558-
attachments,
559-
metadata,
560-
channel_id,
561-
contact_id,
562-
contact_urn_id,
563-
org_id,
564-
topup_id
565-
FROM
566-
msgs_msg
567-
WHERE
568-
org_id = $1 AND
569-
direction = 'O' AND
570-
channel_id = $2 AND
571-
status = ANY($3)
572-
ORDER BY
573-
id ASC
574-
`
575-
576-
func GetChannelMessagesToInterrupt(ctx context.Context, db Queryer, orgID OrgID, channelID ChannelID) ([]*Msg, error) {
577-
statuses := []MsgStatus{MsgStatusPending, MsgStatusErrored, MsgStatusQueued}
578-
return loadMessages(ctx, db, loadMessagesForInterruptChannelSQL, orgID, channelID, pq.Array(statuses))
579-
}
580-
581542
func loadMessages(ctx context.Context, db Queryer, sql string, params ...interface{}) ([]*Msg, error) {
582543
rows, err := db.QueryxContext(ctx, sql, params...)
583544
if err != nil {
@@ -1312,36 +1273,31 @@ func ResendMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAsse
13121273
return resent, nil
13131274
}
13141275

1315-
const sqlUpdateMsgStatusFailed = `
1316-
UPDATE msgs_msg m
1317-
SET status = 'F', modified_on = NOW()
1318-
WHERE id = ANY($1)`
1319-
1320-
// FailMessages prepares messages for failing and marking them as FAILED
1321-
func FailMessages(ctx context.Context, db Queryer, rp *redis.Pool, oa *OrgAssets, msgs []*Msg) ([]*Msg, error) {
1322-
1323-
// for the bulk db updates
1324-
fails := make([]MsgID, 0, len(msgs))
1325-
failedMsgs := make([]*Msg, 0, len(msgs))
1276+
const sqlFailChannelMessages = `
1277+
WITH rows AS (
1278+
SELECT id FROM msgs_msg
1279+
WHERE org_id = $1 AND direction = 'O' AND channel_id = $2 AND status = ANY($3) LIMIT 1000
1280+
)
1281+
UPDATE msgs_msg SET status = 'F', modified_on = NOW() WHERE id IN (SELECT id FROM rows)`
13261282

1327-
for _, msg := range msgs {
1328-
if msg.m.Status == MsgStatusQueued || msg.m.Status == MsgStatusPending || msg.m.Status == MsgStatusErrored {
1329-
msg.m.Status = MsgStatusFailed
1330-
fails = append(fails, MsgID(msg.m.ID))
1331-
failedMsgs = append(failedMsgs, msg)
1332-
}
1283+
func FailChannelMessages(ctx context.Context, db Queryer, orgID OrgID, channelID ChannelID) error {
1284+
if channelID == NilChannelID {
1285+
return nil
13331286
}
13341287

1335-
for _, idBatch := range chunkSlice(fails, 1000) {
1288+
statuses := []MsgStatus{MsgStatusPending, MsgStatusErrored, MsgStatusQueued}
1289+
for {
13361290
// and update the messages as FAILED
1337-
_, err := db.ExecContext(ctx, sqlUpdateMsgStatusFailed, pq.Array(idBatch))
1291+
res, err := db.ExecContext(ctx, sqlFailChannelMessages, orgID, channelID, pq.Array(statuses))
13381292
if err != nil {
1339-
return nil, errors.Wrapf(err, "error failing messages")
1293+
return err
1294+
}
1295+
rows, _ := res.RowsAffected()
1296+
if rows == 0 {
1297+
break
13401298
}
13411299
}
1342-
1343-
return failedMsgs, nil
1344-
1300+
return nil
13451301
}
13461302

13471303
// MarkBroadcastSent marks the passed in broadcast as sent

core/models/msgs_test.go

+6-35
Original file line numberDiff line numberDiff line change
@@ -431,29 +431,6 @@ func TestGetMessagesByID(t *testing.T) {
431431
assert.Equal(t, "in 1", msgs[0].Text())
432432
}
433433

434-
func TestGetChannelMessagesForInterrupt(t *testing.T) {
435-
ctx, _, db, _ := testsuite.Get()
436-
437-
defer testsuite.Reset(testsuite.ResetData)
438-
439-
testdata.InsertIncomingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "in 1", models.MsgStatusHandled)
440-
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "out 1", []utils.Attachment{"image/jpeg:hi.jpg"}, models.MsgStatusSent, false)
441-
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "out 2", nil, models.MsgStatusSent, false)
442-
testdata.InsertOutgoingMsg(db, testdata.Org2, testdata.Org2Channel, testdata.Org2Contact, "out 3", nil, models.MsgStatusSent, false)
443-
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "out 3", nil, models.MsgStatusErrored, false)
444-
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "out 4", nil, models.MsgStatusPending, false)
445-
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "out 5", nil, models.MsgStatusQueued, false)
446-
447-
msgs, err := models.GetChannelMessagesToInterrupt(ctx, db, testdata.Org1.ID, testdata.TwilioChannel.ID)
448-
449-
// should only return the outgoing messages for this org
450-
require.NoError(t, err)
451-
assert.Equal(t, 3, len(msgs))
452-
assert.Equal(t, "out 3", msgs[0].Text())
453-
assert.Equal(t, "out 4", msgs[1].Text())
454-
assert.Equal(t, "out 5", msgs[2].Text())
455-
}
456-
457434
func TestResendMessages(t *testing.T) {
458435
ctx, rt, db, rp := testsuite.Get()
459436

@@ -512,35 +489,29 @@ func TestResendMessages(t *testing.T) {
512489
}
513490

514491
func TestFailMessages(t *testing.T) {
515-
ctx, rt, db, rp := testsuite.Get()
492+
ctx, _, db, _ := testsuite.Get()
516493

517494
defer testsuite.Reset(testsuite.ResetAll)
518495

519-
oa, err := models.GetOrgAssets(ctx, rt, testdata.Org1.ID)
520-
require.NoError(t, err)
521-
522496
out1 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "hi", nil, models.MsgStatusPending, false)
523497
out2 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Bob, "hi", nil, models.MsgStatusErrored, false)
524498
out3 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "hi", nil, models.MsgStatusFailed, false)
525499
out4 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.Cathy, "hi", nil, models.MsgStatusQueued, false)
526-
out5 := testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.George, "hi", nil, models.MsgStatusQueued, false)
500+
testdata.InsertOutgoingMsg(db, testdata.Org1, testdata.TwilioChannel, testdata.George, "hi", nil, models.MsgStatusQueued, false)
527501

528502
ids := []models.MsgID{models.MsgID(out1.ID()), models.MsgID(out2.ID()), models.MsgID(out3.ID()), models.MsgID(out4.ID())}
529-
530-
msgs, err := models.GetMessagesByID(ctx, db, testdata.Org1.ID, models.DirectionOut, ids)
531-
require.NoError(t, err)
503+
println(ids)
532504

533505
now := dates.Now()
534506

535507
// fail the msgs
536-
failedMsgs, err := models.FailMessages(ctx, db, rp, oa, msgs)
508+
err := models.FailChannelMessages(ctx, db, testdata.Org1.ID, testdata.TwilioChannel.ID)
537509
require.NoError(t, err)
538510

539-
assert.Len(t, failedMsgs, 3)
511+
//assert.Len(t, failedMsgs, 3)
540512

541-
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'F' AND modified_on > $1`, now).Returns(3)
513+
assertdb.Query(t, db, `SELECT count(*) FROM msgs_msg WHERE status = 'F' AND modified_on > $1`, now).Returns(4)
542514
assertdb.Query(t, db, `SELECT status FROM msgs_msg WHERE id = $1`, out3.ID()).Columns(map[string]interface{}{"status": "F"})
543-
assertdb.Query(t, db, `SELECT status FROM msgs_msg WHERE id = $1`, out5.ID()).Columns(map[string]interface{}{"status": "Q"})
544515

545516
}
546517

core/msgio/courier.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ local queueType, queueName, tps = KEYS[1], KEYS[2], tonumber(KEYS[3])
125125
local queueKey = queueType .. ":" .. queueName .. "|" .. tps
126126
127127
-- clear the sorted sets for the key
128-
redis.call("ZREMRANGEBYSCORE", queueKey .. "/1", "-inf", "inf")
129-
redis.call("ZREMRANGEBYSCORE", queueKey .. "/0", "-inf", "inf")
128+
redis.call("DEL", queueKey .. "/1")
129+
redis.call("DEL", queueKey .. "/0")
130130
131131
-- reset queue to zero
132132
redis.call("ZADD", queueType .. ":active", 0, queueKey)

core/tasks/channels/interrupt_channel.go

+1-11
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ func (t *InterruptChannelTask) Perform(ctx context.Context, rt *runtime.Runtime,
2828
rc := rt.RP.Get()
2929
defer rc.Close()
3030

31-
oa, err := models.GetOrgAssets(ctx, rt, orgID)
32-
if err != nil {
33-
return err
34-
}
35-
3631
channelIDs := []models.ChannelID{t.ChannelID}
3732

3833
channels, err := models.GetChannelsByID(ctx, db, channelIDs)
@@ -46,17 +41,12 @@ func (t *InterruptChannelTask) Perform(ctx context.Context, rt *runtime.Runtime,
4641
return err
4742
}
4843

49-
msgs, err := models.GetChannelMessagesToInterrupt(ctx, db, orgID, t.ChannelID)
50-
if err != nil {
51-
return err
52-
}
53-
5444
err = msgio.ClearChannelCourierQueue(rc, channel)
5545
if err != nil {
5646
return err
5747
}
5848

59-
_, err = models.FailMessages(ctx, db, rt.RP, oa, msgs)
49+
err = models.FailChannelMessages(ctx, db, orgID, t.ChannelID)
6050

6151
return err
6252

0 commit comments

Comments
 (0)