Skip to content

Commit

Permalink
Fix stale job scheduling
Browse files Browse the repository at this point in the history
Our remote stale job was not enqueuing correctly from the scheduler. This is because we had no way to supply arguments to scheduled jobs and my own silly job argument validator could not handle that.

Now we can supply arguments
  • Loading branch information
atruskie committed Jan 14, 2025
1 parent e0fa818 commit bdf451e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 14 deletions.
17 changes: 13 additions & 4 deletions lib/gems/baw-workers/lib/baw_workers/active_job/recurring.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ module ActiveJob
# @example
# class MyJob < BawWorkers::Jobs::ApplicationJob
# queue_as :default
# recurring_at '0 0 * * *' # every day at midnight
# def perform(*args)
# # do something
# recurring_at '0 0 * * *', args: [ 1 ]
#
# def perform(an_argument)
# # prints `1` every day at midnight
# puts an_argument
# end
# end
#
Expand All @@ -32,14 +34,20 @@ module ClassMethods
# @!attribute [rw] recurring_cron_schedule
# @return [string] The cron schedule for this recurring job.

# @!attribute [rw] recurring_cron_schedule_args
# @return [Array] The arguments the job will be scheduled with

# Sets the cron schedule for this job.
# @param [String] cron_schedule the cron schedule for this job. This is a 6-star schedule.
# @param [Array] args the arguments the job will be scheduled with
# @raise [ArgumentError] if cron_schedule is not a string
# @return [void]
def recurring_at(cron_schedule)
def recurring_at(cron_schedule, args: [])
raise ArgumentError, 'cron_schedule must be a string' unless cron_schedule.is_a?(String)
raise ArgumentError, 'args must be an array' unless args.is_a?(Array)

self.recurring_cron_schedule = cron_schedule
self.recurring_cron_schedule_args = args
end

# override resque-scheduler's default behaviour to adapt it to work with active job.
Expand All @@ -54,6 +62,7 @@ def scheduled(_queue, _klass, *args)

def __setup_recurring
class_attribute :recurring_cron_schedule, instance_accessor: false
class_attribute :recurring_cron_schedule_args, instance_accessor: false
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class RemoteStaleCheckJob < BawWorkers::Jobs::ApplicationJob
queue_as Settings.actions.analysis_stale_check.queue
perform_expects [NilClass, Integer]

recurring_at Settings.actions.analysis_stale_check.schedule
recurring_at Settings.actions.analysis_stale_check.schedule, args: [nil]

# only allow one of these to run at once.
# constrained resource: don't want two of these running at once otherwise
Expand All @@ -29,7 +29,7 @@ class RemoteStaleCheckJob < BawWorkers::Jobs::ApplicationJob
push_message(error.message)
end

def perform(min_age_seconds = nil)
def perform(min_age_seconds)
# first check if we can contact the remote queue
failed!('Could not connect to remote queue.') unless batch.remote_connected?

Expand Down
15 changes: 8 additions & 7 deletions lib/gems/baw-workers/lib/baw_workers/resque_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def create_all_schedules
BawWorkers::Config.logger_worker.info(
'rake_task:baw:worker:run_scheduler adding recurring job',
job_class: job_class.name,
schedule: job_class.recurring_cron_schedule
schedule: job_class.recurring_cron_schedule,
args: job_class.recurring_cron_schedule_args
)

# add the job to the resque scheduler
Expand All @@ -51,7 +52,8 @@ def create_all_schedules
{
class: job_class.name,
cron: job_class.recurring_cron_schedule,
queue: job_class.queue_name
queue: job_class.queue_name,
args: job_class.recurring_cron_schedule_args
# We don't persist the schedule because we set it every time
# we start up the scheduler (this very process).
#persist:
Expand All @@ -70,10 +72,9 @@ def clear_all_schedules
# Get all currently queued jobs.
# @return [Array<Hash>]
def jobs_queued
jobs = []
Resque.queues.each do |queue|
jobs.push(jobs_queued_in(queue))
end
jobs = Resque.queues.map { |queue|
jobs_queued_in(queue)
}
jobs.flatten
end

Expand Down Expand Up @@ -258,7 +259,7 @@ def queues_being_worked_on

def queue_names(env = BawApp.env)
env_regex = Regexp.new(env)
Resque.queues.filter { |queue| queue =~ env_regex }
Resque.queues.grep(env_regex)
end

def clear_queues(env = BawApp.env)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
)

pause_all_jobs

submit_pbs_jobs_as_held

def get_last_status(expected_count)
Expand All @@ -23,6 +22,16 @@ def get_last_status(expected_count)
).max_by(&:time)
end

it 'can be performed later' do
BawWorkers::Jobs::Analysis::RemoteStaleCheckJob.perform_later!(nil)

expect_enqueued_jobs(1, of_class: BawWorkers::Jobs::Analysis::RemoteStaleCheckJob)

perform_jobs(count: 1)

expect_performed_jobs(1, of_class: BawWorkers::Jobs::Analysis::RemoteStaleCheckJob)
end

stepwise 'can resolve stale jobs' do
after do
Timecop.return
Expand Down

0 comments on commit bdf451e

Please sign in to comment.