diff --git a/README.md b/README.md index 40cea400..4f8c3adb 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Solid Queue is a DB-based queuing backend for [Active Job](https://edgeguides.rubyonrails.org/active_job_basics.html), designed with simplicity and performance in mind. -Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, recurring jobs, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). +Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, batch processing, recurring jobs, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`). Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails's multi-threading. @@ -503,6 +503,74 @@ production: Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues). +## Batch processing + +Solid Queue supports grouping jobs into batches, allowing you to track their collective progress, run callbacks when all jobs complete, and manage complex workflows. This is useful for processing large datasets in parallel, importing files, or any scenario where you need to coordinate multiple jobs. + +To create a batch, use `perform_batch_later`: + +```ruby +# Simple batch +batch = MyJob.perform_batch_later([ + { user_id: 1, action: "update" }, + { user_id: 2, action: "update" }, + { user_id: 3, action: "update" } +]) + +puts batch.batch_id # => "550e8400-e29b-41d4-a716..." +puts batch.total_jobs # => 3 +``` + +You can specify callbacks to run when the batch completes: + +```ruby +batch = DataImportJob.perform_batch_later( + import_rows, + on_success: { job: ImportSuccessJob, args: { email: "admin@example.com" } }, + on_failure: { job: ImportFailureJob, args: { email: "admin@example.com" } }, + on_complete: { job: ImportCompleteJob }, + metadata: { source: "api", imported_by: current_user.id } +) +``` + +- `on_success`: Runs when all jobs complete successfully +- `on_failure`: Runs if any job fails +- `on_complete`: Always runs when the batch finishes + +Jobs can check if they're part of a batch: + +```ruby +class DataImportJob < ApplicationJob + def perform(row_data) + if in_batch? + Rails.logger.info "Processing row as part of batch #{batch_id}" + Rails.logger.info "Batch progress: #{batch_progress}%" + end + + # Process the row... + end +end +``` + +You can query and monitor batches: + +```ruby +# Find a batch +batch = SolidQueue::Batch.find_by(batch_id: batch_id) + +# Check progress +batch.pending_jobs # => 10 +batch.completed_jobs # => 85 +batch.failed_jobs # => 5 +batch.progress_percentage # => 90.0 +batch.finished? # => false + +# Query batches by status +SolidQueue::Batch.pending +SolidQueue::Batch.completed +SolidQueue::Batch.failed +``` + ## Failed jobs and retries Solid Queue doesn't include any automatic retry mechanism, it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for these. The job will stay there until manually discarded or re-enqueued. You can do this in a console as: diff --git a/app/jobs/solid_queue/batch_update_job.rb b/app/jobs/solid_queue/batch_update_job.rb new file mode 100644 index 00000000..7df4632b --- /dev/null +++ b/app/jobs/solid_queue/batch_update_job.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module SolidQueue + class BatchUpdateJob < ActiveJob::Base + queue_as :default + + discard_on ActiveRecord::RecordNotFound + + def perform(batch_id, job_id) + batch = Batch.find_by!(batch_id: batch_id) + job = Job.find_by!(id: job_id) + + # Only process if the job is actually finished and belongs to this batch + return unless job.finished? && job.batch_id == batch_id + + batch.job_finished!(job) + rescue => e + Rails.logger.error "[SolidQueue] BatchUpdateJob failed for batch #{batch_id}, job #{job_id}: #{e.message}" + raise + end + end +end diff --git a/app/models/solid_queue/batch.rb b/app/models/solid_queue/batch.rb new file mode 100644 index 00000000..bd3d2490 --- /dev/null +++ b/app/models/solid_queue/batch.rb @@ -0,0 +1,152 @@ +# frozen_string_literal: true + +module SolidQueue + class Batch < Record + serialize :on_complete_job_args, coder: JSON + serialize :on_success_job_args, coder: JSON + serialize :on_failure_job_args, coder: JSON + serialize :metadata, coder: JSON + + STATUSES = %w[pending processing completed failed] + + validates :batch_id, uniqueness: true + validates :status, inclusion: { in: STATUSES } + + has_many :jobs, foreign_key: :batch_id, primary_key: :batch_id, dependent: :nullify + + scope :pending, -> { where(status: "pending") } + scope :processing, -> { where(status: "processing") } + scope :completed, -> { where(status: "completed") } + scope :failed, -> { where(status: "failed") } + scope :finished, -> { where(status: %w[completed failed]) } + scope :unfinished, -> { where(status: %w[pending processing]) } + + before_create :set_batch_id + + class << self + def enqueue(job_instances, on_complete: nil, on_success: nil, on_failure: nil, metadata: {}) + return 0 if job_instances.empty? + + batch = create!( + on_complete_job_class: on_complete&.dig(:job)&.to_s, + on_complete_job_args: on_complete&.dig(:args), + on_success_job_class: on_success&.dig(:job)&.to_s, + on_success_job_args: on_success&.dig(:args), + on_failure_job_class: on_failure&.dig(:job)&.to_s, + on_failure_job_args: on_failure&.dig(:args), + metadata: metadata, + total_jobs: job_instances.size, + pending_jobs: job_instances.size + ) + + # Add batch_id to each job + job_instances.each do |job| + job.batch_id = batch.batch_id + end + + # Use SolidQueue's bulk enqueue + enqueued_count = SolidQueue::Job.enqueue_all(job_instances) + + # Update pending count if some jobs failed to enqueue + if enqueued_count < job_instances.size + batch.update!(pending_jobs: enqueued_count) + end + + batch + end + end + + def add_jobs(job_instances) + return 0 if job_instances.empty? || finished? + + job_instances.each do |job| + job.batch_id = batch_id + end + + enqueued_count = SolidQueue::Job.enqueue_all(job_instances) + + increment!(:total_jobs, job_instances.size) + increment!(:pending_jobs, enqueued_count) + + enqueued_count + end + + def job_finished!(job) + return if finished? + + transaction do + if job.failed_execution.present? + increment!(:failed_jobs) + else + increment!(:completed_jobs) + end + + decrement!(:pending_jobs) + + check_completion! + end + end + + def check_completion! + return if finished? + + if pending_jobs <= 0 + if failed_jobs > 0 + mark_as_failed! + else + mark_as_completed! + end + elsif status == "pending" + update!(status: "processing") + end + end + + def finished? + status.in?(%w[completed failed]) + end + + def processing? + status == "processing" + end + + def pending? + status == "pending" + end + + def progress_percentage + return 0 if total_jobs == 0 + ((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2) + end + + private + def set_batch_id + self.batch_id ||= SecureRandom.uuid + end + + def mark_as_completed! + update!(status: "completed", completed_at: Time.current) + enqueue_callback(:on_success) + enqueue_callback(:on_complete) + end + + def mark_as_failed! + update!(status: "failed", completed_at: Time.current) + enqueue_callback(:on_failure) + enqueue_callback(:on_complete) + end + + def enqueue_callback(callback_type) + job_class = public_send("#{callback_type}_job_class") + job_args = public_send("#{callback_type}_job_args") + + return unless job_class.present? + + job_class.constantize.perform_later( + batch_id: batch_id, + **(job_args || {}).symbolize_keys + ) + rescue => e + Rails.logger.error "[SolidQueue] Failed to enqueue #{callback_type} callback for batch #{batch_id}: #{e.message}" + end + end +end diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index 8574c1ec..7402d0c1 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -4,7 +4,7 @@ module SolidQueue class Job < Record class EnqueueError < StandardError; end - include Executable, Clearable, Recurrable + include Executable, Clearable, Recurrable, Batchable serialize :arguments, coder: JSON @@ -60,7 +60,8 @@ def attributes_from_active_job(active_job) scheduled_at: active_job.scheduled_at, class_name: active_job.class.name, arguments: active_job.serialize, - concurrency_key: active_job.concurrency_key + concurrency_key: active_job.concurrency_key, + batch_id: active_job.respond_to?(:batch_id) ? active_job.batch_id : nil } end end diff --git a/app/models/solid_queue/job/batchable.rb b/app/models/solid_queue/job/batchable.rb new file mode 100644 index 00000000..f3e7dcdd --- /dev/null +++ b/app/models/solid_queue/job/batchable.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +module SolidQueue + class Job + module Batchable + extend ActiveSupport::Concern + + included do + belongs_to :batch, foreign_key: :batch_id, primary_key: :batch_id, optional: true, class_name: "SolidQueue::Batch" + + scope :in_batch, ->(batch_id) { where(batch_id: batch_id) } + scope :without_batch, -> { where(batch_id: nil) } + scope :batch_pending, -> { in_batch.where(finished_at: nil) } + scope :batch_finished, -> { in_batch.where.not(finished_at: nil) } + + after_update :notify_batch_if_finished, if: :batch_id? + end + + class_methods do + def enqueue_batch(active_jobs, **batch_options) + return 0 if active_jobs.empty? + + Batch.enqueue(active_jobs, **batch_options) + end + + def create_all_from_active_jobs_with_batch(active_jobs, batch_id = nil) + if batch_id.present? + job_rows = active_jobs.map do |job| + attributes_from_active_job(job).merge(batch_id: batch_id) + end + insert_all(job_rows) + where(active_job_id: active_jobs.map(&:job_id)) + else + create_all_from_active_jobs_without_batch(active_jobs) + end + end + end + + def in_batch? + batch_id.present? + end + + def batch_siblings + return Job.none unless in_batch? + + self.class.in_batch(batch_id).where.not(id: id) + end + + def batch_position + return nil unless in_batch? + + batch.jobs.where("id <= ?", id).count + end + + private + def notify_batch_if_finished + return unless saved_change_to_finished_at? && finished_at.present? + return unless batch.present? + + # Use perform_later to avoid holding locks + BatchUpdateJob.perform_later(batch_id, id) + rescue => e + Rails.logger.error "[SolidQueue] Failed to notify batch #{batch_id} about job #{id} completion: #{e.message}" + end + end + end +end diff --git a/lib/active_job/batches.rb b/lib/active_job/batches.rb new file mode 100644 index 00000000..adff1448 --- /dev/null +++ b/lib/active_job/batches.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +module ActiveJob + module Batches + extend ActiveSupport::Concern + + included do + attr_accessor :batch_id + end + + class_methods do + def perform_batch(job_args_array, **batch_options) + return if job_args_array.empty? + + jobs = job_args_array.map do |args| + # Handle both array and hash arguments + if args.is_a?(Hash) + new(**args) + else + new(*Array(args)) + end + end + + SolidQueue::Batch.enqueue(jobs, **batch_options) + end + + def perform_batch_later(job_args_array, **batch_options) + perform_batch(job_args_array, **batch_options) + end + + def perform_batch_at(scheduled_at, job_args_array, **batch_options) + return if job_args_array.empty? + + jobs = job_args_array.map do |args| + job = if args.is_a?(Hash) + new(**args) + else + new(*Array(args)) + end + job.scheduled_at = scheduled_at + job + end + + SolidQueue::Batch.enqueue(jobs, **batch_options) + end + end + + def batch + return nil unless batch_id.present? + @batch ||= SolidQueue::Batch.find_by(batch_id: batch_id) + end + + def in_batch? + batch_id.present? + end + + def batch_siblings + return self.class.none unless in_batch? + + batch.jobs.map do |job| + ActiveJob::Base.deserialize(job.arguments) + rescue + nil + end.compact + end + + def batch_progress + batch&.progress_percentage || 0 + end + + def batch_status + batch&.status + end + + def batch_finished? + batch&.finished? || false + end + + def serialize + super.tap do |job_data| + job_data["batch_id"] = batch_id if batch_id.present? + end + end + + def deserialize(job_data) + super + self.batch_id = job_data["batch_id"] + end + end + + class Base + include Batches + end +end diff --git a/lib/generators/solid_queue/install/templates/db/queue_schema.rb b/lib/generators/solid_queue/install/templates/db/queue_schema.rb index 85194b6a..b1c087e2 100644 --- a/lib/generators/solid_queue/install/templates/db/queue_schema.rb +++ b/lib/generators/solid_queue/install/templates/db/queue_schema.rb @@ -35,9 +35,12 @@ t.datetime "scheduled_at" t.datetime "finished_at" t.string "concurrency_key" + t.string "batch_id" t.datetime "created_at", null: false t.datetime "updated_at", null: false t.index [ "active_job_id" ], name: "index_solid_queue_jobs_on_active_job_id" + t.index [ "batch_id" ], name: "index_solid_queue_jobs_on_batch_id" + t.index [ "batch_id", "finished_at" ], name: "index_solid_queue_jobs_on_batch_id_and_finished_at" t.index [ "class_name" ], name: "index_solid_queue_jobs_on_class_name" t.index [ "finished_at" ], name: "index_solid_queue_jobs_on_finished_at" t.index [ "queue_name", "finished_at" ], name: "index_solid_queue_jobs_for_filtering" @@ -120,10 +123,33 @@ t.index [ "key" ], name: "index_solid_queue_semaphores_on_key", unique: true end + create_table "solid_queue_batches", force: :cascade do |t| + t.string "batch_id", null: false + t.string "on_complete_job_class" + t.text "on_complete_job_args" + t.string "on_success_job_class" + t.text "on_success_job_args" + t.string "on_failure_job_class" + t.text "on_failure_job_args" + t.text "metadata" + t.integer "total_jobs", default: 0, null: false + t.integer "pending_jobs", default: 0, null: false + t.integer "completed_jobs", default: 0, null: false + t.integer "failed_jobs", default: 0, null: false + t.string "status", default: "pending", null: false + t.datetime "completed_at" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index [ "batch_id" ], name: "index_solid_queue_batches_on_batch_id", unique: true + t.index [ "status" ], name: "index_solid_queue_batches_on_status" + t.index [ "status", "created_at" ], name: "index_solid_queue_batches_on_status_and_created_at" + end + add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_jobs", "solid_queue_batches", column: "batch_id", primary_key: "batch_id", on_delete: :nullify end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e0d51c8c..d52d585c 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -5,6 +5,7 @@ require "active_job" require "active_job/queue_adapters" +require "active_job/batches" require "active_support" require "active_support/core_ext/numeric/time" diff --git a/test/dummy/db/queue_schema.rb b/test/dummy/db/queue_schema.rb index 697c2e92..c94b0d6a 100644 --- a/test/dummy/db/queue_schema.rb +++ b/test/dummy/db/queue_schema.rb @@ -47,9 +47,12 @@ t.datetime "scheduled_at" t.datetime "finished_at" t.string "concurrency_key" + t.string "batch_id" t.datetime "created_at", null: false t.datetime "updated_at", null: false t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id" + t.index ["batch_id"], name: "index_solid_queue_jobs_on_batch_id" + t.index ["batch_id", "finished_at"], name: "index_solid_queue_jobs_on_batch_id_and_finished_at" t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at" t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering" @@ -132,10 +135,33 @@ t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true end + create_table "solid_queue_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "batch_id", null: false + t.string "on_complete_job_class" + t.text "on_complete_job_args" + t.string "on_success_job_class" + t.text "on_success_job_args" + t.string "on_failure_job_class" + t.text "on_failure_job_args" + t.text "metadata" + t.integer "total_jobs", default: 0, null: false + t.integer "pending_jobs", default: 0, null: false + t.integer "completed_jobs", default: 0, null: false + t.integer "failed_jobs", default: 0, null: false + t.string "status", default: "pending", null: false + t.datetime "completed_at" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["batch_id"], name: "index_solid_queue_batches_on_batch_id", unique: true + t.index ["status"], name: "index_solid_queue_batches_on_status" + t.index ["status", "created_at"], name: "index_solid_queue_batches_on_status_and_created_at" + end + add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade + add_foreign_key "solid_queue_jobs", "solid_queue_batches", column: "batch_id", primary_key: "batch_id", on_delete: :nullify end diff --git a/test/models/solid_queue/batch_test.rb b/test/models/solid_queue/batch_test.rb new file mode 100644 index 00000000..7e2bb148 --- /dev/null +++ b/test/models/solid_queue/batch_test.rb @@ -0,0 +1,311 @@ +# frozen_string_literal: true + +require "test_helper" + +class SolidQueue::BatchTest < ActiveSupport::TestCase + class TestJob < ApplicationJob + def perform(value) + # Simple test job + end + end + + class CallbackJob < ApplicationJob + def perform(batch_id:, **options) + # Callback test job + end + end + + setup do + @job_args = [ + { value: 1 }, + { value: 2 }, + { value: 3 } + ] + end + + test "creates batch with multiple jobs" do + jobs = @job_args.map { |args| TestJob.new(**args) } + + assert_difference -> { SolidQueue::Batch.count } => 1, -> { SolidQueue::Job.count } => 3 do + batch = SolidQueue::Batch.enqueue(jobs) + assert_not_nil batch.batch_id + assert_equal 3, batch.total_jobs + assert_equal 3, batch.pending_jobs + assert_equal 0, batch.completed_jobs + assert_equal 0, batch.failed_jobs + assert_equal "pending", batch.status + end + end + + test "creates batch with callbacks" do + jobs = @job_args.map { |args| TestJob.new(**args) } + + batch = SolidQueue::Batch.enqueue( + jobs, + on_complete: { job: CallbackJob, args: { type: "complete" } }, + on_success: { job: CallbackJob, args: { type: "success" } }, + on_failure: { job: CallbackJob, args: { type: "failure" } } + ) + + assert_equal "SolidQueue::BatchTest::CallbackJob", batch.on_complete_job_class + assert_equal({ "type" => "complete" }, batch.on_complete_job_args) + assert_equal "SolidQueue::BatchTest::CallbackJob", batch.on_success_job_class + assert_equal({ "type" => "success" }, batch.on_success_job_args) + assert_equal "SolidQueue::BatchTest::CallbackJob", batch.on_failure_job_class + assert_equal({ "type" => "failure" }, batch.on_failure_job_args) + end + + test "creates batch with metadata" do + jobs = @job_args.map { |args| TestJob.new(**args) } + + batch = SolidQueue::Batch.enqueue( + jobs, + metadata: { source: "test", priority: "high", user_id: 123 } + ) + + assert_equal "test", batch.metadata["source"] + assert_equal "high", batch.metadata["priority"] + assert_equal 123, batch.metadata["user_id"] + end + + test "adds jobs to existing batch" do + jobs = @job_args.first(2).map { |args| TestJob.new(**args) } + batch = SolidQueue::Batch.enqueue(jobs) + + assert_equal 2, batch.total_jobs + + additional_job = TestJob.new(value: 4) + assert_difference -> { SolidQueue::Job.count } => 1 do + added_count = batch.add_jobs([ additional_job ]) + assert_equal 1, added_count + end + + batch.reload + assert_equal 3, batch.total_jobs + assert_equal 3, batch.pending_jobs + end + + test "does not add jobs to finished batch" do + batch = SolidQueue::Batch.create!( + status: "completed", + completed_at: Time.current, + total_jobs: 1, + completed_jobs: 1 + ) + + additional_job = TestJob.new(value: 4) + assert_no_difference -> { SolidQueue::Job.count } do + added_count = batch.add_jobs([ additional_job ]) + assert_equal 0, added_count + end + end + + test "tracks job completion" do + jobs = @job_args.map { |args| TestJob.new(**args) } + batch = SolidQueue::Batch.enqueue(jobs) + + job = batch.jobs.first + job.finished! + + batch.job_finished!(job) + batch.reload + + assert_equal 2, batch.pending_jobs + assert_equal 1, batch.completed_jobs + assert_equal 0, batch.failed_jobs + assert_equal "processing", batch.status + end + + test "tracks job failure" do + jobs = @job_args.map { |args| TestJob.new(**args) } + batch = SolidQueue::Batch.enqueue(jobs) + + job = batch.jobs.first + SolidQueue::FailedExecution.create!(job: job, error: "Test error") + job.finished! + + batch.job_finished!(job) + batch.reload + + assert_equal 2, batch.pending_jobs + assert_equal 0, batch.completed_jobs + assert_equal 1, batch.failed_jobs + assert_equal "processing", batch.status + end + + test "completes batch when all jobs succeed" do + jobs = @job_args.map { |args| TestJob.new(**args) } + batch = SolidQueue::Batch.enqueue( + jobs, + on_complete: { job: CallbackJob }, + on_success: { job: CallbackJob } + ) + + # Simulate all jobs completing successfully + assert_difference -> { SolidQueue::Job.count } => 2 do # 2 callback jobs + batch.jobs.each do |job| + job.finished! + batch.job_finished!(job) + end + end + + batch.reload + assert_equal "completed", batch.status + assert_not_nil batch.completed_at + assert_equal 0, batch.pending_jobs + assert_equal 3, batch.completed_jobs + assert_equal 0, batch.failed_jobs + + # Check callbacks were enqueued + callback_jobs = SolidQueue::Job.where(class_name: "SolidQueue::BatchTest::CallbackJob") + assert_equal 2, callback_jobs.count # on_complete and on_success + end + + test "fails batch when any job fails" do + jobs = @job_args.map { |args| TestJob.new(**args) } + batch = SolidQueue::Batch.enqueue( + jobs, + on_complete: { job: CallbackJob }, + on_failure: { job: CallbackJob } + ) + + # Complete 2 jobs successfully, fail 1 + assert_difference -> { SolidQueue::Job.count } => 2 do # 2 callback jobs + batch.jobs.first(2).each do |job| + job.finished! + batch.job_finished!(job) + end + + failed_job = batch.jobs.last + SolidQueue::FailedExecution.create!(job: failed_job, error: "Test error") + failed_job.finished! + batch.job_finished!(failed_job) + end + + batch.reload + assert_equal "failed", batch.status + assert_not_nil batch.completed_at + assert_equal 0, batch.pending_jobs + assert_equal 2, batch.completed_jobs + assert_equal 1, batch.failed_jobs + + # Check callbacks were enqueued + callback_jobs = SolidQueue::Job.where(class_name: "SolidQueue::BatchTest::CallbackJob") + assert_equal 2, callback_jobs.count # on_complete and on_failure + end + + test "calculates progress percentage" do + jobs = @job_args.map { |args| TestJob.new(**args) } + batch = SolidQueue::Batch.enqueue(jobs) + + assert_equal 0.0, batch.progress_percentage + + # Complete one job + job = batch.jobs.first + job.finished! + batch.job_finished!(job) + + batch.reload + assert_in_delta 33.33, batch.progress_percentage, 0.01 + + # Complete remaining jobs + batch.jobs.where.not(id: job.id).each do |j| + j.finished! + batch.job_finished!(j) + end + + batch.reload + assert_equal 100.0, batch.progress_percentage + end + + test "batch scopes" do + pending_batch = SolidQueue::Batch.create!(status: "pending") + processing_batch = SolidQueue::Batch.create!(status: "processing") + completed_batch = SolidQueue::Batch.create!(status: "completed", completed_at: Time.current) + failed_batch = SolidQueue::Batch.create!(status: "failed", completed_at: Time.current) + + assert_includes SolidQueue::Batch.pending, pending_batch + assert_includes SolidQueue::Batch.processing, processing_batch + assert_includes SolidQueue::Batch.completed, completed_batch + assert_includes SolidQueue::Batch.failed, failed_batch + + assert_includes SolidQueue::Batch.finished, completed_batch + assert_includes SolidQueue::Batch.finished, failed_batch + assert_not_includes SolidQueue::Batch.finished, pending_batch + assert_not_includes SolidQueue::Batch.finished, processing_batch + + assert_includes SolidQueue::Batch.unfinished, pending_batch + assert_includes SolidQueue::Batch.unfinished, processing_batch + assert_not_includes SolidQueue::Batch.unfinished, completed_batch + assert_not_includes SolidQueue::Batch.unfinished, failed_batch + end + + test "batch relationships" do + batch = SolidQueue::Batch.create! + job1 = SolidQueue::Job.create!( + queue_name: "default", + class_name: "TestJob", + batch_id: batch.batch_id + ) + job2 = SolidQueue::Job.create!( + queue_name: "default", + class_name: "TestJob", + batch_id: batch.batch_id + ) + + assert_equal 2, batch.jobs.count + assert_includes batch.jobs, job1 + assert_includes batch.jobs, job2 + assert_equal batch.batch_id, job1.batch_id + assert_equal batch.batch_id, job2.batch_id + end + + test "perform_batch_later creates batch" do + assert_difference -> { SolidQueue::Batch.count } => 1, -> { SolidQueue::Job.count } => 3 do + batch = TestJob.perform_batch_later(@job_args) + assert_kind_of SolidQueue::Batch, batch + assert_equal 3, batch.total_jobs + end + end + + test "perform_batch_at creates scheduled batch" do + scheduled_time = 1.hour.from_now + + assert_difference -> { SolidQueue::Batch.count } => 1, -> { SolidQueue::Job.count } => 3 do + batch = TestJob.perform_batch_at(scheduled_time, @job_args) + assert_kind_of SolidQueue::Batch, batch + assert_equal 3, batch.total_jobs + + batch.jobs.each do |job| + assert_in_delta scheduled_time.to_f, job.scheduled_at.to_f, 1.0 + end + end + end + + test "empty batch creation" do + assert_no_difference -> { SolidQueue::Batch.count } do + result = SolidQueue::Batch.enqueue([]) + assert_equal 0, result + end + end + + test "batch with mixed argument types" do + # Test with both hash and array arguments + mixed_args = [ + { value: 1 }, + [ 2 ], + { value: 3, extra: "data" } + ] + + jobs = [ + TestJob.new(value: 1), + TestJob.new(2), + TestJob.new(value: 3, extra: "data") + ] + + assert_difference -> { SolidQueue::Job.count } => 3 do + batch = SolidQueue::Batch.enqueue(jobs) + assert_equal 3, batch.total_jobs + end + end +end