Skip to content

Commit

Permalink
Merge pull request rapidpro#320 from nyaruka/throttle_queue_fix
Browse files Browse the repository at this point in the history
Fix throttle queue task
  • Loading branch information
rowanseymour authored Sep 11, 2024
2 parents 83d8388 + 15cf85b commit 2ce6757
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
12 changes: 6 additions & 6 deletions core/tasks/starts/throttle_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ const (
)

func init() {
tasks.RegisterCron("trottle_queue", &ThrottleQueueCron{queue: tasks.StartsQueue})
tasks.RegisterCron("throttle_queue", &ThrottleQueueCron{Queue: tasks.StartsQueue})
}

type ThrottleQueueCron struct {
queue *queues.FairSorted
Queue *queues.FairSorted
}

func (c *ThrottleQueueCron) Next(last time.Time) time.Time {
Expand All @@ -36,24 +36,24 @@ func (c *ThrottleQueueCron) Run(ctx context.Context, rt *runtime.Runtime) (map[s
rc := rt.RP.Get()
defer rc.Close()

owners, err := c.queue.Owners(rc)
owners, err := c.Queue.Owners(rc)
if err != nil {
return nil, fmt.Errorf("error getting task owners: %w", err)
}

numPaused, numResumed := 0, 0

for ownerID := range owners {
for _, ownerID := range owners {
oa, err := models.GetOrgAssets(ctx, rt, models.OrgID(ownerID))
if err != nil {
return nil, fmt.Errorf("error org assets for org #%d: %w", ownerID, err)
}

if oa.Org().OutboxCount() >= outboxThreshold {
c.queue.Pause(rc, ownerID)
c.Queue.Pause(rc, ownerID)
numPaused++
} else {
c.queue.Resume(rc, ownerID)
c.Queue.Resume(rc, ownerID)
numResumed++
}
}
Expand Down
50 changes: 50 additions & 0 deletions core/tasks/starts/throttle_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package starts_test

import (
"testing"

"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/core/tasks/starts"
"github.com/nyaruka/mailroom/testsuite"
"github.com/nyaruka/mailroom/utils/queues"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestThrottleQueue(t *testing.T) {
ctx, rt := testsuite.Runtime()
rc := rt.RP.Get()
defer rc.Close()

defer testsuite.Reset(testsuite.ResetRedis | testsuite.ResetData)

queue := queues.NewFairSorted("test")
cron := &starts.ThrottleQueueCron{Queue: queue}
res, err := cron.Run(ctx, rt)
require.NoError(t, err)
assert.Equal(t, map[string]any{"paused": 0, "resumed": 0}, res)

queue.Push(rc, "type1", 1, "task1", queues.DefaultPriority)

res, err = cron.Run(ctx, rt)
require.NoError(t, err)
assert.Equal(t, map[string]any{"paused": 0, "resumed": 1}, res)

// make it look like org 1 has 20,000 messages in its outbox
rt.DB.MustExec(`INSERT INTO msgs_systemlabelcount(org_id, label_type, count, is_squashed) VALUES (1, 'O', 10050, FALSE)`)

models.FlushCache()

res, err = cron.Run(ctx, rt)
require.NoError(t, err)
assert.Equal(t, map[string]any{"paused": 1, "resumed": 0}, res)

// make it look like most of the inbox has cleared
rt.DB.MustExec(`INSERT INTO msgs_systemlabelcount(org_id, label_type, count, is_squashed) VALUES (1, 'O', -10000, FALSE)`)

models.FlushCache()

res, err = cron.Run(ctx, rt)
require.NoError(t, err)
assert.Equal(t, map[string]any{"paused": 0, "resumed": 1}, res)
}
1 change: 1 addition & 0 deletions testsuite/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ DELETE FROM flows_flowrevision WHERE flow_id >= 30000;
DELETE FROM flows_flow WHERE id >= 30000;
DELETE FROM ivr_call;
DELETE FROM campaigns_eventfire;
DELETE FROM msgs_systemlabelcount;
DELETE FROM msgs_msg_labels;
DELETE FROM msgs_msg;
DELETE FROM msgs_broadcast_groups;
Expand Down

0 comments on commit 2ce6757

Please sign in to comment.