Custom enumerators #944

Merged: 1 commit, Jan 23, 2024
29 changes: 29 additions & 0 deletions README.md
@@ -299,6 +299,35 @@ module Maintenance
end
```

### Tasks with Custom Enumerators

If you have a special use case requiring iteration over an unsupported
collection type, such as external resources fetched from some API, you can
implement the `enumerator_builder(cursor:)` method in your task.

This method should return an `Enumerator` that yields `[item, cursor]` pairs.
Maintenance Tasks persists the current cursor position and passes it back as
the `cursor` argument when your task resumes after an interruption; on the
initial call `cursor` is `nil`. The cursor is stored as a `String`, so your
custom enumerator must serialize and deserialize the value if required.

```ruby
# app/tasks/maintenance/custom_enumerator_task.rb

module Maintenance
  class CustomEnumeratorTask < MaintenanceTasks::Task
    def enumerator_builder(cursor:)
      after_id = cursor&.to_i
      PostAPI.index(after_id: after_id).map { |post| [post, post.id] }.to_enum
    end

    def process(post)
      Post.create!(post)
    end
  end
end
```
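
As a further illustration of the serialization note above, here is a minimal sketch (not part of this PR) of a cursor that is not a plain integer. It assumes a hypothetical paginated source, `PAGES`, and a standalone `build_enumerator(cursor:)` helper standing in for a real task; the cursor is encoded as a JSON string holding the page and offset of the item it accompanies.

```ruby
require "json"

# Hypothetical in-memory stand-in for a paginated external API.
PAGES = {
  1 => ["a", "b"],
  2 => ["c"],
}.freeze

# Returns an Enumerator yielding [item, cursor] pairs. The cursor is a JSON
# String recording the page and offset of the item it accompanies.
def build_enumerator(cursor:)
  last = cursor && JSON.parse(cursor)

  Enumerator.new do |yielder|
    PAGES.each do |page, items|
      items.each_with_index do |item, offset|
        # Skip everything up to and including the persisted position.
        next if last && ([page, offset] <=> [last["page"], last["offset"]]) <= 0

        yielder << [item, JSON.generate("page" => page, "offset" => offset)]
      end
    end
  end
end

# Take the first pair, then resume from its cursor as a restarted run would.
first_item, first_cursor = build_enumerator(cursor: nil).first
remaining = build_enumerator(cursor: first_cursor).map { |item, _cursor| item }
```

Because the cursor round-trips through a `String`, the enumerator alone decides what it encodes; the consumer only needs to store it and hand it back.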

### Throttling

Maintenance tasks often modify a lot of data and can be taxing on your database.
40 changes: 1 addition & 39 deletions app/jobs/concerns/maintenance_tasks/task_job_concern.rb
@@ -32,45 +32,7 @@ def retry_on(*, **)

     def build_enumerator(_run, cursor:)
       cursor ||= @run.cursor
-      collection = @task.collection
-      @enumerator = nil
-
-      @collection_enum = case collection
-      when :no_collection
-        enumerator_builder.build_once_enumerator(cursor: nil)
-      when ActiveRecord::Relation
-        enumerator_builder.active_record_on_records(collection, cursor: cursor)
-      when ActiveRecord::Batches::BatchEnumerator
-        if collection.start || collection.finish
-          raise ArgumentError, <<~MSG.squish
-            #{@task.class.name}#collection cannot support
-            a batch enumerator with the "start" or "finish" options.
-          MSG
-        end
-
-        # For now, only support automatic count based on the enumerator for
-        # batches
-        enumerator_builder.active_record_on_batch_relations(
-          collection.relation,
-          cursor: cursor,
-          batch_size: collection.batch_size,
-        )
-      when Array
-        enumerator_builder.build_array_enumerator(collection, cursor: cursor&.to_i)
-      when BatchCsvCollectionBuilder::BatchCsv
-        JobIteration::CsvEnumerator.new(collection.csv).batches(
-          batch_size: collection.batch_size,
-          cursor: cursor&.to_i,
-        )
-      when CSV
-        JobIteration::CsvEnumerator.new(collection).rows(cursor: cursor&.to_i)
-      else
-        raise ArgumentError, <<~MSG.squish
-          #{@task.class.name}#collection must be either an
-          Active Record Relation, ActiveRecord::Batches::BatchEnumerator,
-          Array, or CSV.
-        MSG
-      end
+      @collection_enum = @task.enumerator_builder(cursor: cursor)
       throttle_enumerator(@collection_enum)
     end

50 changes: 50 additions & 0 deletions app/models/maintenance_tasks/task.rb
@@ -247,5 +247,55 @@ def process(_item)
    def count
      self.class.collection_builder_strategy.count(self)
    end

    # Default enumeration builder. You may override this method to return any
    # Enumerator yielding pairs of `[item, item_cursor]`.
    #
    # @param cursor [String, nil] cursor position to resume from, or nil on
    #   initial call.
    #
    # @return [Enumerator]
    def enumerator_builder(cursor:)
      collection = self.collection

      job_iteration_builder = JobIteration::EnumeratorBuilder.new(nil)

Review comment from the PR author on `JobIteration::EnumeratorBuilder.new(nil)`:

It's a bit icky to pass in `nil` here - it's for the `job` argument.

`@job` is only referenced by `EnumeratorBuilder#build_throttle_enumerator`, which isn't called here, so there's no immediate issue with it being `nil`.

Alternatively, `job` could be provided as an additional argument to `enumerator_builder`, but leaking the job instance into the Task interface feels worse than passing in `nil`.

      case collection
      when :no_collection
        job_iteration_builder.build_once_enumerator(cursor: nil)
      when ActiveRecord::Relation
        job_iteration_builder.active_record_on_records(collection, cursor: cursor)
      when ActiveRecord::Batches::BatchEnumerator
        if collection.start || collection.finish
          raise ArgumentError, <<~MSG.squish
            #{self.class.name}#collection cannot support
            a batch enumerator with the "start" or "finish" options.
          MSG
        end

        # For now, only support automatic count based on the enumerator for
        # batches
        job_iteration_builder.active_record_on_batch_relations(
          collection.relation,
          cursor: cursor,
          batch_size: collection.batch_size,
        )
      when Array
        job_iteration_builder.build_array_enumerator(collection, cursor: cursor&.to_i)
      when BatchCsvCollectionBuilder::BatchCsv
        JobIteration::CsvEnumerator.new(collection.csv).batches(
          batch_size: collection.batch_size,
          cursor: cursor&.to_i,
        )
      when CSV
        JobIteration::CsvEnumerator.new(collection).rows(cursor: cursor&.to_i)
      else
        raise ArgumentError, <<~MSG.squish
          #{self.class.name}#collection must be either an
          Active Record Relation, ActiveRecord::Batches::BatchEnumerator,
          Array, or CSV.
        MSG
      end
    end
  end
end
18 changes: 18 additions & 0 deletions test/dummy/app/tasks/maintenance/custom_enumerating_task.rb
@@ -0,0 +1,18 @@
# frozen_string_literal: true

module Maintenance
  class CustomEnumeratingTask < MaintenanceTasks::Task
    def enumerator_builder(cursor:)
      drop = cursor.nil? ? 0 : cursor.to_i + 1

      [:a, :b, :c].lazy.with_index.drop(drop)
    end

    def count
      3
    end

    def process(_)
    end
  end
end
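
To make the dummy task's behaviour concrete, the sketch below reproduces its enumerator outside of Rails. `enumerator_for` is a hypothetical standalone helper mirroring the body of `enumerator_builder(cursor:)` above; it assumes Ruby 2.7 or later, where `Enumerator::Lazy#with_index` is available. Each element is an `[item, index]` pair, so the index doubles as the cursor.

```ruby
# Hypothetical standalone copy of CustomEnumeratingTask#enumerator_builder.
def enumerator_for(cursor)
  # A nil cursor means a fresh run; otherwise skip past the stored index.
  drop = cursor.nil? ? 0 : cursor.to_i + 1

  [:a, :b, :c].lazy.with_index.drop(drop)
end

fresh = enumerator_for(nil).to_a   # [[:a, 0], [:b, 1], [:c, 2]]
resumed = enumerator_for("0").to_a # [[:b, 1], [:c, 2]]
```

The `+ 1` matters: the stored cursor identifies the last item already processed, so resuming has to start at the following one.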
31 changes: 31 additions & 0 deletions test/jobs/maintenance_tasks/task_job_test.rb
@@ -617,5 +617,36 @@ class << self

      assert_equal 2, @run.reload.tick_total
    end

    test ".perform_now accepts custom enumerated tasks" do
      run = Run.create!(task_name: "Maintenance::CustomEnumeratingTask")

      [:a, :b, :c].each do |item|
        Maintenance::CustomEnumeratingTask.any_instance
          .expects(:process).with(item).once
      end

      TaskJob.perform_now(run)
    end

    test ".perform_now handles cursors provided by custom enumerated tasks" do
      run = Run.create!(task_name: "Maintenance::CustomEnumeratingTask")

      TaskJob.perform_now(run)

      assert_equal "2", run.reload.cursor
    end

    test ".perform_now starts custom enumerated tasks from cursor position when job resumes" do
      run = Run.create!(task_name: "Maintenance::CustomEnumeratingTask")
      run.update!(cursor: "0")

      [:b, :c].each do |item|
        Maintenance::CustomEnumeratingTask.any_instance
          .expects(:process).with(item).once
      end

      TaskJob.perform_now(run)
    end
  end
end
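
The three tests above cover a full run, the final persisted cursor, and resumption from a stored cursor. The following is a rough plain-Ruby simulation of that cycle, not the gem's actual job code: `FakeRun`, `pairs_from`, and `perform` are hypothetical names, and the `limit:` argument merely imitates an interruption.

```ruby
# Hypothetical stand-in for the persisted Run record; only the cursor matters.
FakeRun = Struct.new(:cursor)

ITEMS = [:a, :b, :c].freeze

# Same contract as enumerator_builder(cursor:): yields [item, cursor] pairs,
# skipping everything up to and including the stored cursor.
def pairs_from(cursor)
  start = cursor.nil? ? 0 : cursor.to_i + 1
  ITEMS.each_with_index.drop(start).each
end

# Processes at most `limit` items, storing the cursor as a String after each.
def perform(run, limit:)
  processed = []
  pairs_from(run.cursor).each do |item, cursor|
    break if processed.size >= limit # simulated interruption

    processed << item
    run.cursor = cursor.to_s
  end
  processed
end

run = FakeRun.new(nil)
first_pass = perform(run, limit: 1)   # interrupted after one item
second_pass = perform(run, limit: 10) # resumes from the stored cursor
```

After both passes every item has been processed exactly once and the stored cursor is `"2"`, which is the value the second test asserts.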
1 change: 1 addition & 0 deletions test/models/maintenance_tasks/task_data_index_test.rb
@@ -9,6 +9,7 @@ class TaskDataIndexTest < ActiveSupport::TestCase
"Maintenance::BatchImportPostsTask",
"Maintenance::CallbackTestTask",
"Maintenance::CancelledEnqueueTask",
"Maintenance::CustomEnumeratingTask",
"Maintenance::EnqueueErrorTask",
"Maintenance::ErrorTask",
"Maintenance::ImportPostsTask",
1 change: 1 addition & 0 deletions test/models/maintenance_tasks/task_test.rb
@@ -9,6 +9,7 @@ class TaskTest < ActiveSupport::TestCase
"Maintenance::BatchImportPostsTask",
"Maintenance::CallbackTestTask",
"Maintenance::CancelledEnqueueTask",
"Maintenance::CustomEnumeratingTask",
"Maintenance::EnqueueErrorTask",
"Maintenance::ErrorTask",
"Maintenance::ImportPostsTask",
1 change: 1 addition & 0 deletions test/system/maintenance_tasks/tasks_test.rb
@@ -25,6 +25,7 @@ class TasksTest < ApplicationSystemTestCase
"Maintenance::BatchImportPostsTask\nNew",
"Maintenance::CallbackTestTask\nNew",
"Maintenance::CancelledEnqueueTask\nNew",
"Maintenance::CustomEnumeratingTask\nNew",
"Maintenance::EnqueueErrorTask\nNew",
"Maintenance::ErrorTask\nNew",
"Maintenance::Nested::NestedMore::NestedMoreTask\nNew",