Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infer interruption handler from job #96

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion job-iteration.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

lib = File.expand_path("../lib", __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
require "job-iteration/version"
require "job_iteration/version"

Gem::Specification.new do |spec|
spec.name = "job-iteration"
Expand Down
64 changes: 0 additions & 64 deletions lib/job-iteration.rb

This file was deleted.

18 changes: 0 additions & 18 deletions lib/job-iteration/integrations/resque.rb

This file was deleted.

15 changes: 0 additions & 15 deletions lib/job-iteration/integrations/sidekiq.rb

This file was deleted.

35 changes: 35 additions & 0 deletions lib/job_iteration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

require_relative "./job_iteration/version"
require_relative "./job_iteration/enumerator_builder"
require_relative "./job_iteration/iteration"
require_relative "./job_iteration/integrations"

module JobIteration
extend self

# Use this to _always_ interrupt the job after it's been running for more than N seconds.
# @example
#
# JobIteration.max_job_runtime = 5.minutes
#
# This setting will make it to always interrupt a job after it's been iterating for 5 minutes.
# Defaults to nil which means that jobs will not be interrupted except on termination signal.
attr_accessor :max_job_runtime

# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
# @example
#
# class MyOwnBuilder < JobIteration::EnumeratorBuilder
# # ...
# end
#
# JobIteration.enumerator_builder = MyOwnBuilder
attr_accessor :enumerator_builder
self.enumerator_builder = JobIteration::EnumeratorBuilder

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
def self.load_interruption_integration(integration)
JobIteration::Integrations.load(integration)
end
end
21 changes: 21 additions & 0 deletions lib/job_iteration/integrations.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

module JobIteration
module Integrations
IntegrationLoadError = Class.new(StandardError)

extend ActiveSupport::Autoload

autoload :SidekiqIntegration
autoload :ResqueIntegration
Comment on lines +7 to +10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For what it's worth, this could just use Ruby's built-in autoloading, which wouldn't require the rename.

Suggested change
extend ActiveSupport::Autoload
autoload :SidekiqIntegration
autoload :ResqueIntegration
autoload :SidekiqIntegration, "#{__dir__}/integrations/sidekiq_integration.rb"
autoload :ResqueIntegration, "#{__dir__}/integrations/resque_integration.rb"

From there, a separate PR could propose the rename and use of ActiveSupport::Autoload, though I'm not sure there is much benefit, since the autoloading is so limited.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh — I didn’t even consider this, thanks


class << self
def load(integration)
integration = const_get(integration.to_s.camelize << "Integration")
integration.new
rescue NameError
raise IntegrationLoadError, "#{integration} integration is not supported."
end
end
end
end
23 changes: 23 additions & 0 deletions lib/job_iteration/integrations/resque_integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# frozen_string_literal: true

require "resque"

module JobIteration
module Integrations
class ResqueIntegration
module ResqueIterationExtension # @private
def initialize(*) # @private
$resque_worker = self
super
end
end

# The patch is required in order to call shutdown? on a Resque::Worker instance
Resque::Worker.prepend(ResqueIterationExtension)

def stopping?
$resque_worker.try!(:shutdown?)
end
end
end
end
17 changes: 17 additions & 0 deletions lib/job_iteration/integrations/sidekiq_integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

require "sidekiq"

module JobIteration
module Integrations
class SidekiqIntegration
def stopping?
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
Sidekiq::CLI.instance.launcher.stopping?
else
false
end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,16 @@ def output_interrupt_summary
logger.info(Kernel.format(message, times_interrupted, total_time))
end

def interruption_integration
JobIteration.load_interruption_integration(self.class.queue_adapter_name)
end

def job_should_exit?
if ::JobIteration.max_job_runtime && start_time && (Time.now.utc - start_time) > ::JobIteration.max_job_runtime
return true
end

JobIteration.interruption_adapter.call || (defined?(super) && super)
interruption_integration.stopping? || (defined?(super) && super)
end

def run_complete_callbacks?(completed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def initialize(stop_after_count)
@calls = 0
end

def call
def stopping?
@calls += 1
(@calls % @stop_after_count) == 0
end
Expand All @@ -23,7 +23,7 @@ def call
# MyJob.perform_now
# end
def iterate_exact_times(n_times)
JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size))
JobIteration.stubs(:load_interruption_integration).returns(StoppingSupervisor.new(n_times.size))
end

# Stubs interruption adapter to interrupt the job after every sing iteration.
Expand All @@ -46,8 +46,8 @@ def mark_job_worker_as_interrupted

def stub_shutdown_adapter_to_return(value)
adapter = mock
adapter.stubs(:call).returns(value)
JobIteration.stubs(:interruption_adapter).returns(adapter)
adapter.stubs(:stopping?).returns(value)
JobIteration.stubs(:load_interruption_integration).returns(adapter)
end
end
end
File renamed without changes.
15 changes: 1 addition & 14 deletions test/integration/integrations_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,13 @@
require "open3"

class IntegrationsTest < ActiveSupport::TestCase
test "will prevent loading two integrations" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
require 'job-iteration'
RUBBY
_stdout, stderr, status = run_ruby(rubby)

assert_equal false, status.success?
assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr)
end
end

test "successfully loads one (resque) integration" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
# Remove sidekiq, only resque will be left
$LOAD_PATH.delete_if { |p| p =~ /sidekiq/ }
require 'job-iteration'
require 'job_iteration'
RUBBY
_stdout, _stderr, status = run_ruby(rubby)

Expand Down
14 changes: 14 additions & 0 deletions test/support/async_integration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true

module JobIteration
module Integrations
# https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html
module AsyncIntegration
class << self
def interruption_adapter
false
end
end
end
end
end
5 changes: 3 additions & 2 deletions test/support/resque/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
# $LOAD_PATH.unshift File.dirname(__FILE__) unless $LOAD_PATH.include?(File.dirname(__FILE__))
require "resque/tasks"

require "job-iteration"
require "job-iteration/integrations/resque"
require "job_iteration"
require "job_iteration/integrations/resque_integration"
require "active_job"
require "i18n"

require_relative "../async_integration"
require_relative "../jobs"

redis_url = if ENV["USING_DEV"] == "1"
Expand Down
5 changes: 3 additions & 2 deletions test/support/sidekiq/init.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# frozen_string_literal: true

require "job-iteration"
require "job-iteration/integrations/sidekiq"
require "job_iteration"
require "job_iteration/integrations/sidekiq_integration"

require "active_job"
require "i18n"

require_relative "../async_integration"
require_relative "../jobs"

redis_host = if ENV["USING_DEV"] == "1"
Expand Down
24 changes: 20 additions & 4 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "minitest/autorun"

ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true"

require "job-iteration"
require "job-iteration/test_helper"
require "job_iteration"
require "job_iteration/test_helper"

require "globalid"
require "sidekiq"
Expand Down Expand Up @@ -40,6 +38,24 @@ def enqueue_at(job, _delay)
end
end

module JobIteration
module Integrations
class IterationTestIntegration
def stopping?
false
end
end

# https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html
class AsyncIntegration
def stopping?
false
end
end
end
end


ActiveJob::Base.queue_adapter = :iteration_test

class Product < ActiveRecord::Base
Expand Down
4 changes: 2 additions & 2 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,11 @@ def test_respects_job_should_exit_from_parent_class
def test_mark_job_worker_as_interrupted
mark_job_worker_as_interrupted

assert_equal(true, JobIteration.interruption_adapter.call)
assert_equal(true, JobIteration.load_interruption_integration().stopping?)

continue_iterating

assert_equal(false, JobIteration.interruption_adapter.call)
assert_equal(false, JobIteration.load_interruption_integration().stopping?)
end

def test_reenqueue_self
Expand Down