diff --git a/CHANGELOG.md b/CHANGELOG.md index c98a47ac..6da445d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ - [241](https://github.com/Shopify/job-iteration/pull/241) - Require Ruby 2.7+, dropping 2.6 support - [241](https://github.com/Shopify/job-iteration/pull/241) - Require Rails 6.0+, dropping 5.2 support - [240](https://github.com/Shopify/job-iteration/pull/240) - Allow setting inheritable per-job `job_iteration_max_job_runtime` +- [80](https://github.com/Shopify/job-iteration/pull/80) - Serialize cursors using ActiveJob::Arguments +- [80](https://github.com/Shopify/job-iteration/pull/80) - Deprecate un(de)serializable cursors +- [80](https://github.com/Shopify/job-iteration/pull/80) - Add `enforce_serializable_cursors` config ## v1.3.6 (Mar 9, 2022) diff --git a/README.md b/README.md index 5ff6ee10..7cc976ea 100644 --- a/README.md +++ b/README.md @@ -193,6 +193,8 @@ class MyJob < ApplicationJob end ``` +See [the guide on Custom Enumerators](guides/custom-enumerator.md) for details. + ## Credits This project would not be possible without these individuals (in alphabetical order): diff --git a/guides/custom-enumerator.md b/guides/custom-enumerator.md index e0778996..d2733a97 100644 --- a/guides/custom-enumerator.md +++ b/guides/custom-enumerator.md @@ -1,5 +1,9 @@ +# Custom Enumerator + Iteration leverages the [Enumerator](http://ruby-doc.org/core-2.5.1/Enumerator.html) pattern from the Ruby standard library, which allows us to use almost any resource as a collection to iterate. +## Cursorless Enumerator + Consider a custom Enumerator that takes items from a Redis list. Because a Redis List is essentially a queue, we can ignore the cursor: ```ruby @@ -19,6 +23,8 @@ class ListJob < ActiveJob::Base end ``` +## Enumerator with cursor + But what about iterating based on a cursor? 
Consider this Enumerator that wraps third party API (Stripe) for paginated iteration: ```ruby @@ -82,6 +88,58 @@ class StripeJob < ActiveJob::Base end ``` +## Notes + We recommend that you read the implementation of the other enumerators that come with the library (`CsvEnumerator`, `ActiveRecordEnumerator`) to gain a better understanding of building Enumerator objects. +### Post-`yield` code + Code that is written after the `yield` in a custom enumerator is not guaranteed to execute. In the case that a job is forced to exit ie `job_should_exit?` is true, then the job is re-enqueued during the yield and the rest of the code in the enumerator does not run. You can follow that logic [here](https://github.com/Shopify/job-iteration/blob/9641f455b9126efff2214692c0bef423e0d12c39/lib/job-iteration/iteration.rb#L128-L131). + +### Cursor types + +Cursors should be of a [type that Active Job can serialize](https://guides.rubyonrails.org/active_job_basics.html#supported-types-for-arguments). + +For example, consider: + +```ruby +FancyCursor = Struct.new(:wrapped_value) do + def to_s + wrapped_value + end +end +``` + +```ruby +def build_enumerator(cursor:) + Enumerator.new do |yielder| + # ...something with fancy cursor... + yielder.yield 123, FancyCursor.new(:abc) + end +end +``` + +If this job was interrupted, Active Job would be unable to serialize +`FancyCursor`, and Job Iteration would fall back to the legacy behavior of not +serializing the cursor. This would typically result in the queue adapter +eventually serializing the cursor as JSON by calling `.to_s` on it. Upon +deserialization, the cursor would be corrupted as `:abc`, rather than +`FancyCursor.new(:abc)`. + +To avoid this, job authors should take care to ensure that their cursor is +serializable by Active Job. 
This can be done in a couple of ways, such as: +- [adding GlobalID support to the cursor class](https://guides.rubyonrails.org/active_job_basics.html#globalid) +- [implementing a custom Active Job argument serializer for the cursor class](https://guides.rubyonrails.org/active_job_basics.html#serializers) +- handling (de)serialization in the job/enumerator itself + ```ruby + def build_enumerator(cursor:) + fancy_cursor = FancyCursor.new(cursor) unless cursor.nil? + Enumerator.new do |yielder| + # ...something with fancy cursor... + yielder.yield 123, FancyCursor.new(:abc).wrapped_value + end + end + ``` + +Note that starting in 2.0, Job Iteration will stop supporting fallback behavior +and raise when it encounters an unserializable cursor. diff --git a/lib/job-iteration.rb b/lib/job-iteration.rb index 290c0c5c..beca36ff 100644 --- a/lib/job-iteration.rb +++ b/lib/job-iteration.rb @@ -9,6 +9,8 @@ module JobIteration INTEGRATIONS = [:resque, :sidekiq] + Deprecation = ActiveSupport::Deprecation.new("2.0", "JobIteration") + extend self # Use this to _always_ interrupt the job after it's been running for more than N seconds. @@ -29,6 +31,24 @@ module JobIteration # # ... attr_accessor :max_job_runtime + # Set this to `true` to enforce that cursors be composed of objects capable + # of built-in (de)serialization by Active Job. + # + # JobIteration.enforce_serializable_cursors = true + # + # For more granular control, this can also be configured per job class, and + # is inherited by child classes. + # + # class MyJob < ActiveJob::Base + # include JobIteration::Iteration + # self.job_iteration_enforce_serializable_cursors = false + # # ... + # end + # + # Note that non-enforcement is deprecated and enforcement will be mandatory + # in version 2.0, at which point this config will go away. + attr_accessor :enforce_serializable_cursors + # Used internally for hooking into job processing frameworks like Sidekiq and Resque. 
attr_accessor :interruption_adapter diff --git a/lib/job-iteration/iteration.rb b/lib/job-iteration/iteration.rb index ca8c9957..567eec7f 100644 --- a/lib/job-iteration/iteration.rb +++ b/lib/job-iteration/iteration.rb @@ -13,28 +13,6 @@ module Iteration :total_time, ) - class CursorError < ArgumentError - attr_reader :cursor - - def initialize(message, cursor:) - super(message) - @cursor = cursor - end - - def message - "#{super} (#{inspected_cursor})" - end - - private - - def inspected_cursor - cursor.inspect - rescue NoMethodError - # For those brave enough to try to use BasicObject as cursor. Nice try. - Object.instance_method(:inspect).bind(cursor).call - end - end - included do |_base| define_callbacks :start define_callbacks :shutdown @@ -47,6 +25,13 @@ def inspected_cursor default: JobIteration.max_job_runtime, ) + class_attribute( + :job_iteration_enforce_serializable_cursors, + instance_writer: false, + instance_predicate: false, + default: JobIteration.enforce_serializable_cursors, + ) + singleton_class.prepend(PrependedClassMethods) end @@ -104,16 +89,25 @@ def initialize(*arguments) ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true) def serialize # @private - super.merge( - "cursor_position" => cursor_position, + iteration_job_data = { + "cursor_position" => cursor_position, # Backwards compatibility "times_interrupted" => times_interrupted, "total_time" => total_time, - ) + } + + begin + iteration_job_data["serialized_cursor_position"] = serialize_cursor(cursor_position) + rescue ActiveJob::SerializationError + raise if job_iteration_enforce_serializable_cursors + # No point in duplicating the deprecation warning from assert_valid_cursor! 
+ end + + super.merge(iteration_job_data) end def deserialize(job_data) # @private super - self.cursor_position = job_data["cursor_position"] + self.cursor_position = cursor_position_from_job_data(job_data) self.times_interrupted = job_data["times_interrupted"] || 0 self.total_time = job_data["total_time"] || 0 end @@ -176,8 +170,7 @@ def iterate_with_enumerator(enumerator, arguments) @needs_reenqueue = false enumerator.each do |object_from_enumerator, index| - # Deferred until 2.0.0 - # assert_valid_cursor!(index) + assert_valid_cursor!(index) record_unit_of_work do found_record = true @@ -236,16 +229,21 @@ def build_enumerator(params, cursor:) EOS end - # The adapter must be able to serialize and deserialize the cursor back into an equivalent object. - # https://github.com/mperham/sidekiq/wiki/Best-Practices#1-make-your-job-parameters-small-and-simple def assert_valid_cursor!(cursor) - return if serializable?(cursor) + serialize_cursor(cursor) + true + rescue ActiveJob::SerializationError + raise if job_iteration_enforce_serializable_cursors - raise CursorError.new( - "Cursor must be composed of objects capable of built-in (de)serialization: " \ - "Strings, Integers, Floats, Arrays, Hashes, true, false, or nil.", - cursor: cursor, - ) + Deprecation.warn(<<~DEPRECATION_MESSAGE, caller_locations(3)) + The Enumerator returned by #{self.class.name}#build_enumerator yielded a cursor which is unsafe to serialize. + + See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types + + This will raise starting in version #{Deprecation.deprecation_horizon} of #{Deprecation.gem_name}! + DEPRECATION_MESSAGE + + false end def assert_implements_methods! 
@@ -315,6 +313,25 @@ def handle_completed(completed) raise "Unexpected thrown value: #{completed.inspect}" end + def cursor_position_from_job_data(job_data) + if job_data.key?("serialized_cursor_position") + deserialize_cursor(job_data.fetch("serialized_cursor_position")) + else + # Backwards compatibility for + # - jobs interrupted before cursor serialization feature shipped, or + # - jobs whose cursor could not be serialized + job_data.fetch("cursor_position", nil) + end + end + + def serialize_cursor(cursor) + ActiveJob::Arguments.serialize([cursor]).first + end + + def deserialize_cursor(cursor) + ActiveJob::Arguments.deserialize([cursor]).first + end + def valid_cursor_parameter?(parameters) # this condition is when people use the splat operator. # def build_enumerator(*) @@ -326,21 +343,5 @@ def valid_cursor_parameter?(parameters) end false end - - SIMPLE_SERIALIZABLE_CLASSES = [String, Integer, Float, NilClass, TrueClass, FalseClass].freeze - private_constant :SIMPLE_SERIALIZABLE_CLASSES - def serializable?(object) - # Subclasses must be excluded, hence not using is_a? or ===. - if object.instance_of?(Array) - object.all? { |element| serializable?(element) } - elsif object.instance_of?(Hash) - object.all? { |key, value| serializable?(key) && serializable?(value) } - else - SIMPLE_SERIALIZABLE_CLASSES.any? { |klass| object.instance_of?(klass) } - end - rescue NoMethodError - # BasicObject doesn't respond to instance_of, but we can't serialize it anyway - false - end end end diff --git a/test/integration/integration_behaviour.rb b/test/integration/integration_behaviour.rb index 89dd3602..9ac0662e 100644 --- a/test/integration/integration_behaviour.rb +++ b/test/integration/integration_behaviour.rb @@ -36,17 +36,12 @@ module IntegrationBehaviour test "unserializable corruption is prevented" do skip_until_version("2.0.0") - # Cursors are serialized as JSON, but not all objects are serializable. 
- # time = Time.at(0).utc # => 1970-01-01 00:00:00 UTC - # json = JSON.dump(time) # => "\"1970-01-01 00:00:00 UTC\"" - # string = JSON.parse(json) # => "1970-01-01 00:00:00 UTC" - # We serialized a Time, but it was deserialized as a String. - TimeCursorJob.perform_later + UnserializableCursorJob.perform_later TerminateJob.perform_later start_worker_and_wait assert_equal( - JobIteration::Iteration::CursorError.name, + ActiveJob::SerializationError.name, failed_job_error_class_name, ) end diff --git a/test/support/jobs.rb b/test/support/jobs.rb index d56a808e..2000aff1 100644 --- a/test/support/jobs.rb +++ b/test/support/jobs.rb @@ -15,11 +15,12 @@ def each_iteration(omg) end end -class TimeCursorJob < ActiveJob::Base +class UnserializableCursorJob < ActiveJob::Base include JobIteration::Iteration + UnserializableCursor = Class.new def build_enumerator(cursor:) - return [["item", Time.now]].to_enum if cursor.nil? + return [["item", UnserializableCursor.new]].to_enum if cursor.nil? raise "This should never run; cursor is unserializable!" end diff --git a/test/unit/iteration_test.rb b/test/unit/iteration_test.rb index 275f5ea8..7f0012d4 100644 --- a/test/unit/iteration_test.rb +++ b/test/unit/iteration_test.rb @@ -63,64 +63,6 @@ def each_iteration(*) end end - class InvalidCursorJob < ActiveJob::Base - include JobIteration::Iteration - def each_iteration(*) - raise "Cursor invalid. This should never run!" 
- end - end - - class JobWithTimeCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || Time.now]].to_enum - end - end - - class JobWithSymbolCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || :symbol]].to_enum - end - end - - class JobWithActiveRecordCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || Product.first]].to_enum - end - end - - class JobWithStringSubclassCursor < InvalidCursorJob - StringSubClass = Class.new(String) - - def build_enumerator(cursor:) - [["item", cursor || StringSubClass.new]].to_enum - end - end - - class JobWithBasicObjectCursor < InvalidCursorJob - def build_enumerator(cursor:) - [["item", cursor || BasicObject.new]].to_enum - end - end - - class JobWithComplexCursor < ActiveJob::Base - include JobIteration::Iteration - def build_enumerator(cursor:) - [[ - "item", - cursor || [{ - "string" => "abc", - "integer" => 123, - "float" => 4.56, - "booleans" => [true, false], - "null" => nil, - }], - ]].to_enum - end - - def each_iteration(*) - end - end - class JobThatCompletesAfter3Seconds < ActiveJob::Base include JobIteration::Iteration include ActiveSupport::Testing::TimeHelpers @@ -138,6 +80,23 @@ def each_iteration(*) end end + class InfiniteCursorLoggingJob < ActiveJob::Base + include JobIteration::Iteration + class << self + def cursors + @cursors ||= [] + end + end + + def build_enumerator(cursor:) + self.class.cursors << cursor + ["VALUE", "CURSOR"].cycle + end + + def each_iteration(*) + end + end + def test_jobs_that_define_build_enumerator_and_each_iteration_will_not_raise push(JobWithRightMethods, "walrus" => "best") work_one_job @@ -199,41 +158,169 @@ def foo assert_includes(methods_added, :foo) end - def test_jobs_using_time_cursor_will_raise - skip_until_version("2.0.0") - push(JobWithTimeCursor) - assert_raises_cursor_error { work_one_job } + UnserializableCursor = Class.new + SerializableCursor = Class.new + + class 
SerializableCursorSerializer < ActiveJob::Serializers::ObjectSerializer + def serialize(_) + super({}) + end + + def deserialize(_) + klass.new + end + + private + + def klass + SerializableCursor + end end + ActiveJob::Serializers.add_serializers(SerializableCursorSerializer) - def test_jobs_using_active_record_cursor_will_raise + def test_jobs_using_unserializable_cursor_will_raise skip_until_version("2.0.0") - refute_nil(Product.first) - push(JobWithActiveRecordCursor) - assert_raises_cursor_error { work_one_job } + + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + + assert_raises(ActiveJob::SerializationError) do + job_class.perform_now + end end - def test_jobs_using_symbol_cursor_will_raise - skip_until_version("2.0.0") - push(JobWithSymbolCursor) - assert_raises_cursor_error { work_one_job } + def test_jobs_using_unserializable_cursor_is_deprecated + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + + assert_cursor_deprecation_warning_on_perform(job_class) end - def test_jobs_using_string_subclass_cursor_will_raise - skip_until_version("2.0.0") - push(JobWithStringSubclassCursor) - assert_raises_cursor_error { work_one_job } + def test_jobs_using_serializable_cursor_is_not_deprecated + job_class = build_invalid_cursor_job(cursor: SerializableCursor.new) + + assert_no_cursor_deprecation_warning_on_perform(job_class) end - def test_jobs_using_basic_object_cursor_will_raise - skip_until_version("2.0.0") - push(JobWithBasicObjectCursor) - assert_raises_cursor_error { work_one_job } + def test_jobs_using_complex_but_serializable_cursor_is_not_deprecated + job_class = build_invalid_cursor_job(cursor: [{ + "string" => "abc", + "integer" => 123, + "float" => 4.56, + "booleans" => [true, false], + "null" => nil, + }]) + + assert_no_cursor_deprecation_warning_on_perform(job_class) end - def test_jobs_using_complex_but_serializable_cursor_will_not_raise - skip_until_version("2.0.0") - push(JobWithComplexCursor) - 
work_one_job + def test_jobs_using_unserializable_cursor_will_raise_if_enforce_serializable_cursors_globally_enabled + with_global_enforce_serializable_cursors(true) do + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + + assert_raises(ActiveJob::SerializationError) do + job_class.perform_now + end + end + end + + def test_jobs_using_unserializable_cursor_will_raise_if_enforce_serializable_cursors_set_per_class + with_global_enforce_serializable_cursors(false) do + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + job_class.job_iteration_enforce_serializable_cursors = true + + assert_raises(ActiveJob::SerializationError) do + job_class.perform_now + end + end + end + + def test_jobs_using_unserializable_cursor_will_raise_if_enforce_serializable_cursors_set_in_parent + with_global_enforce_serializable_cursors(false) do + parent = build_invalid_cursor_job(cursor: UnserializableCursor.new) + parent.job_iteration_enforce_serializable_cursors = true + child = Class.new(parent) + + assert_raises(ActiveJob::SerializationError) do + child.perform_now + end + end + end + + def test_jobs_using_unserializable_cursor_will_not_raise_if_enforce_serializable_cursors_unset_per_class + with_global_enforce_serializable_cursors(true) do + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + job_class.job_iteration_enforce_serializable_cursors = false + + assert_cursor_deprecation_warning_on_perform(job_class) + end + end + + def test_jobs_using_unserializable_cursor_when_interrupted_should_only_store_the_cursor_and_no_serialized_cursor + job_class = build_invalid_cursor_job(cursor: UnserializableCursor.new) + with_interruption do + assert_cursor_deprecation_warning_on_perform(job_class) + end + + job_data = ActiveJob::Base.queue_adapter.enqueued_jobs.last + refute_nil(job_data, "interrupted job expected in queue") + assert_instance_of(UnserializableCursor, job_data.fetch("cursor_position")) + refute_includes(job_data, 
"serialized_cursor_position") + end + + def test_jobs_using_serializable_cursor_when_interrupted_should_store_both_legacy_cursor_and_serialized_cursor + job_class = build_invalid_cursor_job(cursor: SerializableCursor.new) + with_interruption do + assert_no_cursor_deprecation_warning_on_perform(job_class) + end + + job_data = ActiveJob::Base.queue_adapter.enqueued_jobs.last + refute_nil(job_data, "interrupted job expected in queue") + assert_instance_of(SerializableCursor, job_data.fetch("cursor_position")) + assert_equal(ActiveJob::Arguments.serialize([SerializableCursor.new]).first, + job_data.fetch("serialized_cursor_position")) + end + + def test_job_interrupted_with_only_cursor_position_should_resume + with_interruption do + InfiniteCursorLoggingJob.perform_now + + work_one_job do |job_data| + job_data["cursor_position"] = "raw cursor" + job_data.delete("serialized_cursor_position") + end + + assert_equal("raw cursor", InfiniteCursorLoggingJob.cursors.last) + end + ensure + InfiniteCursorLoggingJob.cursors.clear + end + + def test_job_interrupted_with_serialized_cursor_position_should_ignore_unserialized_cursor_position + with_interruption do + InfiniteCursorLoggingJob.perform_now + + work_one_job do |job_data| + job_data["cursor_position"] = "should be ignored" + job_data["serialized_cursor_position"] = ActiveJob::Arguments.serialize([SerializableCursor.new]).first + end + + assert_instance_of(SerializableCursor, InfiniteCursorLoggingJob.cursors.last) + end + ensure + InfiniteCursorLoggingJob.cursors.clear + end + + def test_job_resuming_with_invalid_serialized_cursor_position_should_raise + with_interruption do + InfiniteCursorLoggingJob.perform_now + assert_raises(ActiveJob::DeserializationError) do + work_one_job do |job_data| + job_data["cursor_position"] = "should be ignored" + job_data["serialized_cursor_position"] = UnserializableCursor.new # cannot be deserialized + end + end + end + ensure + InfiniteCursorLoggingJob.cursors.clear end def 
test_jobs_using_on_complete_have_accurate_total_time @@ -379,7 +466,6 @@ def assert_raises_cursor_error(&block) rescue NoMethodError Object.instance_method(:inspect).bind(error.cursor).call end - assert_equal( "Cursor must be composed of objects capable of built-in (de)serialization: " \ "Strings, Integers, Floats, Arrays, Hashes, true, false, or nil. " \ @@ -388,6 +474,56 @@ def assert_raises_cursor_error(&block) ) end + # This helper allows us to create a class that reads config at test time, instead of when this file is loaded + def build_invalid_cursor_job(cursor:) + test_cursor = cursor # so we don't collide with the method param below + Class.new(ActiveJob::Base) do + include JobIteration::Iteration + define_method(:build_enumerator) do |cursor:| + current_cursor = cursor + [["item", current_cursor || test_cursor]].to_enum + end + define_method(:each_iteration) do |*| + return if Gem::Version.new(JobIteration::VERSION) < Gem::Version.new("2.0") + + raise "Cursor invalid. Starting in version 2.0, this should never run!" + end + singleton_class.define_method(:name) do + "InvalidCursorJob (#{cursor.class})" + end + end + end + + def assert_cursor_deprecation_warning_on_perform(job_class) + expected_message = <<~MESSAGE.chomp + DEPRECATION WARNING: The Enumerator returned by #{job_class.name}#build_enumerator yielded a cursor which is unsafe to serialize. + + See https://github.com/Shopify/job-iteration/blob/main/guides/custom-enumerator.md#cursor-types + + This will raise starting in version #{JobIteration::Deprecation.deprecation_horizon} of #{JobIteration::Deprecation.gem_name}! + MESSAGE + + warned = false + with_deprecation_behavior( + lambda do |message, *| + flunk("expected only one deprecation warning") if warned + warned = true + assert( + message.start_with?(expected_message), + "expected deprecation warning \n#{message.inspect}\n to start_with? 
\n#{expected_message.inspect}", + ) + end, + ) { job_class.perform_now } + + assert(warned, "expected deprecation warning") + end + + def assert_no_cursor_deprecation_warning_on_perform(job_class) + with_deprecation_behavior( + ->(message, *) { flunk("Expected no deprecation warning: #{message}") }, + ) { job_class.perform_now } + end + def assert_partially_completed_job(cursor_position:) message = "Expected to find partially completed job enqueued with cursor_position: #{cursor_position}" enqueued_job = ActiveJob::Base.queue_adapter.enqueued_jobs.first @@ -400,8 +536,37 @@ def push(job, *args) end def work_one_job - job = ActiveJob::Base.queue_adapter.enqueued_jobs.pop - ActiveJob::Base.execute(job) + job_data = ActiveJob::Base.queue_adapter.enqueued_jobs.pop + yield job_data if block_given? + ActiveJob::Base.execute(job_data) + end + + def with_deprecation_behavior(behavior) + original_behaviour = JobIteration::Deprecation.behavior + JobIteration::Deprecation.behavior = behavior + yield + ensure + JobIteration::Deprecation.behavior = original_behaviour + end + + def with_global_enforce_serializable_cursors(temp) + original = JobIteration.enforce_serializable_cursors + JobIteration.enforce_serializable_cursors = temp + yield + ensure + JobIteration.enforce_serializable_cursors = original + end + + def with_interruption(&block) + with_interruption_adapter(-> { true }, &block) + end + + def with_interruption_adapter(temp) + original = JobIteration.interruption_adapter + JobIteration.interruption_adapter = temp + yield + ensure + JobIteration.interruption_adapter = original end def with_global_max_job_runtime(new)