Skip to content

Commit

Permalink
Delay an existing job or schedule a new job
Browse files Browse the repository at this point in the history
Add a helper function that will remove any matching delayed jobs and
reschedule them with the specified delay or schedule a new job if there
are no matching delayed jobs.
  • Loading branch information
cbisnett committed May 17, 2018
1 parent 78c5325 commit cdca208
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 1 deletion.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Resque Scheduler authors
- Brian Landau
- Brian P O'Rourke
- Carlos Antonio da Silva
- Chris Bisnett
- Chris Kampmeier
- DJ Hartman
- Damon P. Cortesi
Expand Down
43 changes: 42 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ since the jobs are stored in a redis sorted set (zset). I can't imagine this
being an issue for someone since redis is stupidly fast even at log(n), but full
disclosure is always best.

#### Removing Delayed jobs
#### Removing Delayed Jobs

If you have the need to cancel a delayed job, you can do like so:

Expand Down Expand Up @@ -257,6 +257,47 @@ Resque.enqueue_delayed_selection { |args| args[0]['account_id'] == current_accou
Resque.enqueue_delayed_selection { |args| args[0]['user_id'] == current_user.id }
```

#### Updating Delayed Jobs

Previously delayed jobs may be delayed even further into the future like so:

```ruby
# after you've enqueued a job like:
Resque.enqueue_at(1.minute.from_now, SendNotifications, :user_id => current_user.id)
# delay running the job until two minutes from now
Resque.delay_or_enqueue_at(2.minutes.from_now, SendNotifications, :user_id => current_user.id)
```

You don't need to worry if a matching job has already been queued, because if no matching jobs are found a new job is created and enqueued as if you had called `enqueue_at`. This means you don't need any special conditionals to know if a job has already been queued. You simply create the job like so:

```ruby
Resque.delay_or_enqueue_at(1.minute.from_now, SendNotifications, :user_id => current_user.id)
```

If multiple matching jobs are found, all of the matching jobs will be updated to have the same timestamp even if their original timestamps were not the same.

```ruby
# enqueue multiple jobs with different delay timestamps
Resque.enqueue_at(1.minute.from_now, SendNotifications, :user_id => current_user.id)
Resque.enqueue_at(2.minutes.from_now, SendNotifications, :user_id => current_user.id)

# delay running the two jobs until 5 minutes from now
Resque.delay_or_enqueue_at(5.minutes.from_now, SendNotifications, :user_id => current_user.id)
```

The most useful case for increasing the delay of an already delayed job is to batch together work based on multiple events. For example, if you wanted to send a notification email to a user when an event triggers but didn't want to send 10 emails if many events happened within a short period, you could use this technique to delay the noficication email until no events have triggered for a period of time. This way you could send 1 email containing the 10 notifications once no events have triggered for 2 minutes. You could implement this like so:

```ruby
# Send a notification when an event is created.
# app/models/event.rb
after_commit on: :create do
Resque.delay_or_enqueue_in(2.minutes, SendNotifications, :user_id => user.id)
end
```

When the first event is created a job will be scheduled to send unsent notifications to the associated user. If another event is created within the 2 minute window, the timer will be reset to 2 minutes. This will continue as long as new events are created for the specific user before the 2 minute timer expires. Once the timer expires and the job is scheduled any new events that are created will schedule a new job and start the process over. By adjusting the window you can tweak the trade-off between sending notification emails quickly after an event happens and sending fewer emails.


### Scheduled Jobs (Recurring Jobs)

Scheduled (or recurring) jobs are logically no different than a standard cron
Expand Down
18 changes: 18 additions & 0 deletions lib/resque/scheduler/delaying_extensions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ def enqueue_in_with_queue(queue, number_of_seconds_from_now,
klass, *args)
end

# Update the delayed timestamp of any matching delayed jobs or enqueue a
# new job if no matching jobs are found. Returns the number of delayed or
# enqueued jobs.
def delay_or_enqueue_at(timestamp, klass, *args)
count = remove_delayed(klass, *args)
count = 1 if count == 0

count.times do
enqueue_at(timestamp, klass, *args)
end
end

# Identical to +delay_or_enqueue_at+, except it takes
# number_of_seconds_from_now instead of a timestamp
def delay_or_enqueue_in(number_of_seconds_from_now, klass, *args)
delay_or_enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end

# Used internally to stuff the item into the schedule sorted list.
# +timestamp+ can be either in seconds or a datetime object Insertion
# if O(log(n)). Returns true if it's the first job to be scheduled at
Expand Down
74 changes: 74 additions & 0 deletions test/delayed_queue_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,80 @@
'job timestamps should have one entry now')
end

test 'delay_or_enqueue_at enqueues a job when none match' do
timestamp = Time.now + 600 # 10 minutes from now
encoded_job = Resque.encode(
class: SomeIvarJob.to_s,
args: ['path'],
queue: Resque.queue_from_class(SomeIvarJob)
)

assert_equal(0, Resque.redis.llen("delayed:#{timestamp.to_i}").to_i,
'delayed queue should be empty to start')
assert_equal(0, Resque.redis.scard("timestamps:#{encoded_job}"),
'job timestamps set should be empty to start')

Resque.delay_or_enqueue_at(timestamp, SomeIvarJob, 'path')

# Confirm the correct keys were added
assert_equal(1, Resque.redis.llen("delayed:#{timestamp.to_i}").to_i,
'delayed queue should have one entry now')
assert_equal(1, Resque.redis.scard("timestamps:#{encoded_job}"),
'job timestamps should have one entry now')
assert_equal(1, Resque.redis.zcard(:delayed_queue_schedule),
'The delayed_queue_schedule should have 1 entry now')
end

test 'delay_or_enqueue_at updates the timestamp for a matching job' do
timestamp = Time.now + 600 # 10 minutes from now
encoded_job = Resque.encode(
class: SomeIvarJob.to_s,
args: ['path'],
queue: Resque.queue_from_class(SomeIvarJob)
)

Resque.enqueue_at(timestamp, SomeIvarJob, 'path')

assert_equal(1, Resque.redis.llen("delayed:#{timestamp.to_i}").to_i,
'delayed queue should have one entry now')
assert_equal(1, Resque.redis.scard("timestamps:#{encoded_job}"),
'job timestamps should have one entry now')

new_timestamp = Time.now + 700
assert_equal(1,
Resque.delay_or_enqueue_at(new_timestamp, SomeIvarJob, 'path'))
assert_equal(1, Resque.redis.scard("timestamps:#{encoded_job}"),
'job timestamps should still have only 1 entry')

assert_equal(0, Resque.redis.llen("delayed:#{timestamp.to_i}").to_i,
'delayed queue no longer has old entry')
assert_equal(1, Resque.redis.llen("delayed:#{new_timestamp.to_i}").to_i,
'delayed queue should have new entry')
end

test 'delay_or_enqueue_at updates multiple matching jobs' do
encoded_job = Resque.encode(
class: SomeIvarJob.to_s,
args: ['path'],
queue: Resque.queue_from_class(SomeIvarJob)
)

Resque.enqueue_at(Time.now + 600, SomeIvarJob, 'path')
Resque.enqueue_at(Time.now + 660, SomeIvarJob, 'path')

assert_equal(2, Resque.redis.scard("timestamps:#{encoded_job}"),
'job timestamps should have two entries now')

new_timestamp = Time.now + 1200
assert_equal(2,
Resque.delay_or_enqueue_at(new_timestamp, SomeIvarJob, 'path'),
'should have updated two jobs')
assert_equal(1, Resque.redis.scard("timestamps:#{encoded_job}"),
'job timestamps should have one entry now')
assert_equal(2, Resque.redis.llen("delayed:#{new_timestamp.to_i}").to_i,
'delayed queue should have two entries')
end

test 'empty delayed_queue_peek returns empty array' do
assert_equal([], Resque.delayed_queue_peek(0, 20))
end
Expand Down

0 comments on commit cdca208

Please sign in to comment.