diff --git a/AUTHORS.md b/AUTHORS.md index 23d81409..2bd235de 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -10,6 +10,7 @@ Resque Scheduler authors - Brian Landau - Brian P O'Rourke - Carlos Antonio da Silva +- Chris Bisnett - Chris Kampmeier - DJ Hartman - Damon P. Cortesi diff --git a/README.md b/README.md index a92a8820..c996e3bd 100644 --- a/README.md +++ b/README.md @@ -213,7 +213,7 @@ since the jobs are stored in a redis sorted set (zset). I can't imagine this being an issue for someone since redis is stupidly fast even at log(n), but full disclosure is always best. -#### Removing Delayed jobs +#### Removing Delayed Jobs If you have the need to cancel a delayed job, you can do like so: @@ -257,6 +257,47 @@ Resque.enqueue_delayed_selection { |args| args[0]['account_id'] == current_accou Resque.enqueue_delayed_selection { |args| args[0]['user_id'] == current_user.id } ``` +#### Updating Delayed Jobs + +Previously delayed jobs may be delayed even further into the future like so: + +```ruby +# after you've enqueued a job like: +Resque.enqueue_at(1.minute.from_now, SendNotifications, :user_id => current_user.id) +# delay running the job until two minutes from now +Resque.delay_or_enqueue_at(2.minutes.from_now, SendNotifications, :user_id => current_user.id) +``` + +You don't need to worry if a matching job has already been queued, because if no matching jobs are found a new job is created and enqueued as if you had called `enqueue_at`. This means you don't need any special conditionals to know if a job has already been queued. You simply create the job like so: + +```ruby +Resque.delay_or_enqueue_at(1.minute.from_now, SendNotifications, :user_id => current_user.id) +``` + +If multiple matching jobs are found, all of the matching jobs will be updated to have the same timestamp even if their original timestamps were not the same. + +```ruby +# enqueue multiple jobs with different delay timestamps +Resque.enqueue_at(1.minute.from_now, SendNotifications, :user_id => current_user.id) +Resque.enqueue_at(2.minutes.from_now, SendNotifications, :user_id => current_user.id) + +# delay running the two jobs until 5 minutes from now +Resque.delay_or_enqueue_at(5.minutes.from_now, SendNotifications, :user_id => current_user.id) +``` + +The most useful case for increasing the delay of an already delayed job is to batch together work based on multiple events. For example, if you wanted to send a notification email to a user when an event triggers but didn't want to send 10 emails if many events happened within a short period, you could use this technique to delay the noficication email until no events have triggered for a period of time. This way you could send 1 email containing the 10 notifications once no events have triggered for 2 minutes. You could implement this like so: + +```ruby +# Send a notification when an event is created. +# app/models/event.rb +after_commit on: :create do + Resque.delay_or_enqueue_in(2.minutes, SendNotifications, :user_id => user.id) +end +``` + +When the first event is created a job will be scheduled to send unsent notifications to the associated user. If another event is created within the 2 minute window, the timer will be reset to 2 minutes. This will continue as long as new events are created for the specific user before the 2 minute timer expires. Once the timer expires and the job is scheduled any new events that are created will schedule a new job and start the process over. By adjusting the window you can tweak the trade-off between sending notification emails quickly after an event happens and sending fewer emails. + + ### Scheduled Jobs (Recurring Jobs) Scheduled (or recurring) jobs are logically no different than a standard cron diff --git a/lib/resque/scheduler/delaying_extensions.rb b/lib/resque/scheduler/delaying_extensions.rb index ee000296..b4b4a661 100644 --- a/lib/resque/scheduler/delaying_extensions.rb +++ b/lib/resque/scheduler/delaying_extensions.rb @@ -63,6 +63,24 @@ def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) end + # Update the delayed timestamp of any matching delayed jobs or enqueue a + # new job if no matching jobs are found. Returns the number of delayed or + # enqueued jobs. + def delay_or_enqueue_at(timestamp, klass, *args) + count = remove_delayed(klass, *args) + count = 1 if count == 0 + + count.times do + enqueue_at(timestamp, klass, *args) + end + end + + # Identical to +delay_or_enqueue_at+, except it takes + # number_of_seconds_from_now instead of a timestamp + def delay_or_enqueue_in(number_of_seconds_from_now, klass, *args) + delay_or_enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) + end + # Used internally to stuff the item into the schedule sorted list. # +timestamp+ can be either in seconds or a datetime object Insertion # if O(log(n)). Returns true if it's the first job to be scheduled at diff --git a/test/delayed_queue_test.rb b/test/delayed_queue_test.rb index 8a63db50..8cd6e2ec 100644 --- a/test/delayed_queue_test.rb +++ b/test/delayed_queue_test.rb @@ -150,6 +150,80 @@ 'job timestamps should have one entry now') end + test 'delay_or_enqueue_at enqueues a job when none match' do + timestamp = Time.now + 600 # 10 minutes from now + encoded_job = Resque.encode( + class: SomeIvarJob.to_s, + args: ['path'], + queue: Resque.queue_from_class(SomeIvarJob) + ) + + assert_equal(0, Resque.redis.llen("delayed:#{timestamp.to_i}").to_i, + 'delayed queue should be empty to start') + assert_equal(0, Resque.redis.scard("timestamps:#{encoded_job}"), + 'job timestamps set should be empty to start') + + Resque.delay_or_enqueue_at(timestamp, SomeIvarJob, 'path') + + # Confirm the correct keys were added + assert_equal(1, Resque.redis.llen("delayed:#{timestamp.to_i}").to_i, + 'delayed queue should have one entry now') + assert_equal(1, Resque.redis.scard("timestamps:#{encoded_job}"), + 'job timestamps should have one entry now') + assert_equal(1, Resque.redis.zcard(:delayed_queue_schedule), + 'The delayed_queue_schedule should have 1 entry now') + end + + test 'delay_or_enqueue_at updates the timestamp for a matching job' do + timestamp = Time.now + 600 # 10 minutes from now + encoded_job = Resque.encode( + class: SomeIvarJob.to_s, + args: ['path'], + queue: Resque.queue_from_class(SomeIvarJob) + ) + + Resque.enqueue_at(timestamp, SomeIvarJob, 'path') + + assert_equal(1, Resque.redis.llen("delayed:#{timestamp.to_i}").to_i, + 'delayed queue should have one entry now') + assert_equal(1, Resque.redis.scard("timestamps:#{encoded_job}"), + 'job timestamps should have one entry now') + + new_timestamp = Time.now + 700 + assert_equal(1, + Resque.delay_or_enqueue_at(new_timestamp, SomeIvarJob, 'path')) + assert_equal(1, Resque.redis.scard("timestamps:#{encoded_job}"), + 'job timestamps should still have only 1 entry') + + assert_equal(0, Resque.redis.llen("delayed:#{timestamp.to_i}").to_i, + 'delayed queue no longer has old entry') + assert_equal(1, Resque.redis.llen("delayed:#{new_timestamp.to_i}").to_i, + 'delayed queue should have new entry') + end + + test 'delay_or_enqueue_at updates multiple matching jobs' do + encoded_job = Resque.encode( + class: SomeIvarJob.to_s, + args: ['path'], + queue: Resque.queue_from_class(SomeIvarJob) + ) + + Resque.enqueue_at(Time.now + 600, SomeIvarJob, 'path') + Resque.enqueue_at(Time.now + 660, SomeIvarJob, 'path') + + assert_equal(2, Resque.redis.scard("timestamps:#{encoded_job}"), + 'job timestamps should have two entries now') + + new_timestamp = Time.now + 1200 + assert_equal(2, + Resque.delay_or_enqueue_at(new_timestamp, SomeIvarJob, 'path'), + 'should have updated two jobs') + assert_equal(1, Resque.redis.scard("timestamps:#{encoded_job}"), + 'job timestamps should have one entry now') + assert_equal(2, Resque.redis.llen("delayed:#{new_timestamp.to_i}").to_i, + 'delayed queue should have two entries') + end + test 'empty delayed_queue_peek returns empty array' do assert_equal([], Resque.delayed_queue_peek(0, 20)) end