From e1c7884e89fa9fba7e75be732bbea4a1ec8ca0d3 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Fri, 3 Nov 2023 21:44:59 +0100 Subject: [PATCH 01/32] Implement a very basic concurrency control API for ActiveJob For now this just allows us to specify a Proc to build the key and a limit for the concurrent execution. --- lib/active_job/concurrency_controls.rb | 32 ++++++++++++++++++++++++ lib/solid_queue.rb | 1 + test/dummy/app/jobs/update_result_job.rb | 7 ++++++ test/unit/concurrency_controls_test.rb | 17 +++++++++++++ 4 files changed, 57 insertions(+) create mode 100644 lib/active_job/concurrency_controls.rb create mode 100644 test/dummy/app/jobs/update_result_job.rb create mode 100644 test/unit/concurrency_controls_test.rb diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb new file mode 100644 index 00000000..edb9785e --- /dev/null +++ b/lib/active_job/concurrency_controls.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module ActiveJob + module ConcurrencyControls + extend ActiveSupport::Concern + + DEFAULT_CONCURRENCY_KEY = ->(*) { self.name } + + included do + class_attribute :concurrency_limit, default: 1 + class_attribute :concurrency_key, default: DEFAULT_CONCURRENCY_KEY, instance_accessor: false + end + + class_methods do + def limit_concurrency(limit: 1, key: DEFAULT_CONCURRENCY_KEY) + self.concurrency_limit = limit + self.concurrency_key = key + end + end + + def concurrency_key + param = self.class.concurrency_key.call(*arguments) + + case param + when ActiveRecord::Base + [ self.class.name, param.class.name, param.id ] + else + [ self.class.name, param ] + end.compact.join("/") + end + end +end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index eb36e274..ed077fa8 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -2,6 +2,7 @@ require "solid_queue/engine" require "active_job/queue_adapters/solid_queue_adapter" +require "active_job/concurrency_controls" require "solid_queue/app_executor" 
require "solid_queue/interruptible" diff --git a/test/dummy/app/jobs/update_result_job.rb b/test/dummy/app/jobs/update_result_job.rb new file mode 100644 index 00000000..c2a4459f --- /dev/null +++ b/test/dummy/app/jobs/update_result_job.rb @@ -0,0 +1,7 @@ +class UpdateResultJob < ApplicationJob + def perform(job_result, name:, pause: 0.1) + job_result.update!(status: "started_#{name}") + sleep(pause) + job_result.update!(status: "completed_#{name}") + end +end diff --git a/test/unit/concurrency_controls_test.rb b/test/unit/concurrency_controls_test.rb new file mode 100644 index 00000000..a4b4f04b --- /dev/null +++ b/test/unit/concurrency_controls_test.rb @@ -0,0 +1,17 @@ +require "test_helper" + +class ConcurrencyControlsTest < ActiveSupport::TestCase + class NonOverlappingJob < UpdateResultJob + include ActiveJob::ConcurrencyControls + + limit_concurrency limit: 1, key: ->(job_result, **) { job_result } + end + + test "enqueue jobs with concurrency controls" do + @result = JobResult.create!(queue_name: "default") + + job = NonOverlappingJob.perform_later(@result, name: "A") + assert_equal 1, job.concurrency_limit + assert_equal "ConcurrencyControlsTest::NonOverlappingJob/JobResult/#{@result.id}", job.concurrency_key + end +end From f405340246eedd3672cadc689dfabccc03076ed9 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 5 Nov 2023 12:44:01 +0100 Subject: [PATCH 02/32] Implement basic concurrency control on enqueuing using semaphores At the time of enqueuing a job and creating a ready execution for it, we check if the job has concurrency control settings. If it has, we use a semaphore to determine whether the job can go to the ready_executions table or whether it needs to wait in the new blocked_executions table. The semaphore is created for the job concurrency key and has just a value that gets decremented avoiding race conditions, only if it's still > 0. 
This needs to be paired with jobs releasing/signaling the semaphore when they finish, and the worker doing a pass over blocked executions in addition to polling. --- app/models/solid_queue/blocked_execution.rb | 10 ++ app/models/solid_queue/job.rb | 4 +- .../solid_queue/job/concurrency_controls.rb | 26 +++++ app/models/solid_queue/job/executable.rb | 109 +++++++++++------- app/models/solid_queue/semaphore.rb | 20 ++++ ...create_solid_queue_concurrency_controls.rb | 26 +++++ test/dummy/db/schema.rb | 21 ++++ test/unit/concurrency_controls_test.rb | 35 +++++- 8 files changed, 202 insertions(+), 49 deletions(-) create mode 100644 app/models/solid_queue/blocked_execution.rb create mode 100644 app/models/solid_queue/job/concurrency_controls.rb create mode 100644 app/models/solid_queue/semaphore.rb create mode 100644 db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb new file mode 100644 index 00000000..1dff8bd7 --- /dev/null +++ b/app/models/solid_queue/blocked_execution.rb @@ -0,0 +1,10 @@ +class SolidQueue::BlockedExecution < SolidQueue::Execution + before_create :assume_attributes_from_job + + private + def assume_attributes_from_job + super + self.concurrency_limit ||= job.concurrency_limit + self.concurrency_key ||= job.concurrency_key + end +end diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index e1d889c5..ac22c126 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -18,7 +18,9 @@ def enqueue_active_job(active_job, scheduled_at: Time.current) priority: active_job.priority, scheduled_at: scheduled_at, class_name: active_job.class.name, - arguments: active_job.serialize + arguments: active_job.serialize, + concurrency_limit: active_job.try(:concurrency_limit), + concurrency_key: active_job.try(:concurrency_key) end def enqueue(**kwargs) diff --git 
a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb new file mode 100644 index 00000000..e8b191ed --- /dev/null +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -0,0 +1,26 @@ +module SolidQueue + class Job + module ConcurrencyControls + extend ActiveSupport::Concern + + included do + has_one :blocked_execution, dependent: :destroy + end + + private + def acquire_concurrency_lock + return true unless concurrency_limited? + + Semaphore.wait_for(concurrency_key, concurrency_limit) + end + + def block + BlockedExecution.create_or_find_by!(job_id: id) + end + + def concurrency_limited? + concurrency_limit.to_i > 0 && concurrency_key.present? + end + end + end +end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index 27f17463..de4552cf 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -1,64 +1,85 @@ module SolidQueue - module Job::Executable - extend ActiveSupport::Concern + class Job + module Executable + extend ActiveSupport::Concern - included do - has_one :ready_execution, dependent: :destroy - has_one :claimed_execution, dependent: :destroy - has_one :failed_execution, dependent: :destroy + included do + include ConcurrencyControls - has_one :scheduled_execution, dependent: :destroy + has_one :ready_execution, dependent: :destroy + has_one :claimed_execution, dependent: :destroy + has_one :failed_execution, dependent: :destroy - after_create :prepare_for_execution + has_one :scheduled_execution, dependent: :destroy - scope :finished, -> { where.not(finished_at: nil) } - end - - STATUSES = %w[ ready claimed failed scheduled ] + after_create :prepare_for_execution - STATUSES.each do |status| - define_method("#{status}?") { public_send("#{status}_execution").present? } - end + scope :finished, -> { where.not(finished_at: nil) } + end - def prepare_for_execution - if due? 
- ReadyExecution.create_or_find_by!(job_id: id) - else - ScheduledExecution.create_or_find_by!(job_id: id) + %w[ ready claimed failed scheduled ].each do |status| + define_method("#{status}?") { public_send("#{status}_execution").present? } end - end - def finished! - if delete_finished_jobs? - destroy! - else - touch(:finished_at) + def prepare_for_execution + if due? then dispatch + else + schedule + end end - end - def finished? - finished_at.present? - end + def finished! + if delete_finished_jobs? + destroy! + else + touch(:finished_at) + end + end - def failed_with(exception) - FailedExecution.create_or_find_by!(job_id: id, exception: exception) - end + def finished? + finished_at.present? + end - def discard - destroy unless claimed? - end + def failed_with(exception) + FailedExecution.create_or_find_by!(job_id: id, exception: exception) + end - def retry - failed_execution&.retry - end + def discard + destroy unless claimed? + end - private - def due? - scheduled_at.nil? || scheduled_at <= Time.current + def retry + failed_execution&.retry end - def delete_finished_jobs? - SolidQueue.delete_finished_jobs + def failed_with(exception) + FailedExecution.create_or_find_by!(job_id: id, exception: exception) end + + private + def due? + scheduled_at.nil? || scheduled_at <= Time.current + end + + def dispatch + if acquire_concurrency_lock then ready + else + block + end + end + + def schedule + ScheduledExecution.create_or_find_by!(job_id: id) + end + + def ready + ReadyExecution.create_or_find_by!(job_id: id) + end + + + def delete_finished_jobs? 
+ SolidQueue.delete_finished_jobs + end + end end end diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb new file mode 100644 index 00000000..8a6f24c6 --- /dev/null +++ b/app/models/solid_queue/semaphore.rb @@ -0,0 +1,20 @@ +class SolidQueue::Semaphore < SolidQueue::Record + def self.wait_for(identifier, limit) + if semaphore = find_by(identifier: identifier) + semaphore.value > 0 && attempt_to_update(identifier) + else + attempt_to_create(identifier, limit) + end + end + + def self.attempt_to_create(identifier, limit) + create!(identifier: identifier, value: limit - 1) + true + rescue ActiveRecord::RecordNotUnique + attempt_to_update(identifier) + end + + def self.attempt_to_update(identifier) + where(identifier: identifier).where("value > 0").update_all("value = COALESCE(value, 1) - 1") > 0 + end +end diff --git a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb new file mode 100644 index 00000000..c1b9ec28 --- /dev/null +++ b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb @@ -0,0 +1,26 @@ +class CreateSolidQueueConcurrencyControls < ActiveRecord::Migration[7.1] + def change + change_table :solid_queue_jobs do |t| + t.integer :concurrency_limit + t.string :concurrency_key + end + + create_table :solid_queue_blocked_executions do |t| + t.references :job, index: { unique: true } + t.string :queue_name, null: false + t.integer :priority, default: 0, null: false + + t.integer :concurrency_limit, null: false + t.string :concurrency_key, null: false, index: true + + t.datetime :created_at, null: false + end + + create_table :solid_queue_semaphores do |t| + t.string :identifier, null: false, index: { unique: true } + t.integer :value, null: false, default: 1 + + t.timestamps + end + end +end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index eb2d08d7..d319a7ad 100644 --- a/test/dummy/db/schema.rb +++ 
b/test/dummy/db/schema.rb @@ -19,6 +19,17 @@ t.datetime "updated_at", null: false end + create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id" + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.integer "concurrency_limit", null: false + t.string "concurrency_key", null: false + t.datetime "created_at", null: false + t.index ["concurrency_key"], name: "index_solid_queue_blocked_executions_on_concurrency_key" + t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + end + create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.bigint "job_id", null: false t.bigint "process_id" @@ -44,6 +55,8 @@ t.datetime "finished_at" t.datetime "created_at", null: false t.datetime "updated_at", null: false + t.integer "concurrency_limit" + t.string "concurrency_key" t.index ["active_job_id"], name: "index_solid_queue_jobs_on_job_id" t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" t.index ["queue_name", "scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" @@ -84,4 +97,12 @@ t.index ["scheduled_at", "priority"], name: "index_solid_queue_scheduled_executions" end + create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "identifier", null: false + t.integer "value", default: 1, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["identifier"], name: "index_solid_queue_semaphores_on_identifier", unique: true + end + end diff --git a/test/unit/concurrency_controls_test.rb b/test/unit/concurrency_controls_test.rb index a4b4f04b..3138f001 100644 --- a/test/unit/concurrency_controls_test.rb +++ b/test/unit/concurrency_controls_test.rb @@ -7,11 +7,38 @@ class NonOverlappingJob < UpdateResultJob 
limit_concurrency limit: 1, key: ->(job_result, **) { job_result } end - test "enqueue jobs with concurrency controls" do + setup do @result = JobResult.create!(queue_name: "default") + end - job = NonOverlappingJob.perform_later(@result, name: "A") - assert_equal 1, job.concurrency_limit - assert_equal "ConcurrencyControlsTest::NonOverlappingJob/JobResult/#{@result.id}", job.concurrency_key + test "enqueue jobs with concurrency controls" do + active_job = NonOverlappingJob.perform_later(@result, name: "A") + assert_equal 1, active_job.concurrency_limit + assert_equal "ConcurrencyControlsTest::NonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key + + job = SolidQueue::Job.last + assert_equal active_job.concurrency_limit, job.concurrency_limit + assert_equal active_job.concurrency_key, job.concurrency_key end + + test "blocks jobs when concurrency limits are reached" do + assert_ready do + NonOverlappingJob.perform_later(@result, name: "A") + end + + assert_blocked do + NonOverlappingJob.perform_later(@result, name: "B") + end + end + + private + def assert_ready(&block) + assert_difference -> { SolidQueue::ReadyExecution.count }, +1, &block + end + + def assert_blocked(&block) + assert_no_difference -> { SolidQueue::ReadyExecution.count } do + assert_difference -> { SolidQueue::BlockedExecution.count }, +1, &block + end + end end From 4ce223a7fe11df3ee66be74540287cc3b8133ecc Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 5 Nov 2023 12:52:56 +0100 Subject: [PATCH 03/32] Move concurrency control unit tests to job model test They fit better there, as I'll write integration concurrency control tests for the whole thing. 
--- test/models/solid_queue/job_test.rb | 51 +++++++++++++++++++++++++- test/unit/concurrency_controls_test.rb | 44 ---------------------- 2 files changed, 49 insertions(+), 46 deletions(-) delete mode 100644 test/unit/concurrency_controls_test.rb diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 14c69139..d834ac7a 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -1,10 +1,16 @@ require "test_helper" class SolidQueue::JobTest < ActiveSupport::TestCase + class NonOverlappingJob < UpdateResultJob + include ActiveJob::ConcurrencyControls + + limit_concurrency limit: 1, key: ->(job_result, **) { job_result } + end + test "enqueue active job to be executed right away" do active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") - assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ReadyExecution.count } => +1 do + assert_ready do SolidQueue::Job.enqueue_active_job(active_job) end @@ -24,7 +30,7 @@ class SolidQueue::JobTest < ActiveSupport::TestCase test "enqueue active job to be scheduled in the future" do active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") - assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ScheduledExecution.count } => +1 do + assert_scheduled do SolidQueue::Job.enqueue_active_job(active_job, scheduled_at: 5.minutes.from_now) end @@ -41,4 +47,45 @@ class SolidQueue::JobTest < ActiveSupport::TestCase assert_equal 8, execution.priority assert Time.now < execution.scheduled_at end + + setup do + @result = JobResult.create!(queue_name: "default") + end + + test "enqueue jobs with concurrency controls" do + active_job = NonOverlappingJob.perform_later(@result, name: "A") + assert_equal 1, active_job.concurrency_limit + assert_equal "SolidQueue::JobTest::NonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key + + job = SolidQueue::Job.last + assert_equal active_job.concurrency_limit, 
job.concurrency_limit + assert_equal active_job.concurrency_key, job.concurrency_key + end + + test "block jobs when concurrency limits are reached" do + assert_ready do + NonOverlappingJob.perform_later(@result, name: "A") + end + + assert_blocked do + NonOverlappingJob.perform_later(@result, name: "B") + end + end + + private + def assert_ready(&block) + assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ReadyExecution.count } => +1, &block + end + + def assert_scheduled(&block) + assert_no_difference -> { SolidQueue::ReadyExecution.count } do + assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ScheduledExecution.count } => +1, &block + end + end + + def assert_blocked(&block) + assert_no_difference -> { SolidQueue::ReadyExecution.count } do + assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::BlockedExecution.count } => +1, &block + end + end end diff --git a/test/unit/concurrency_controls_test.rb b/test/unit/concurrency_controls_test.rb deleted file mode 100644 index 3138f001..00000000 --- a/test/unit/concurrency_controls_test.rb +++ /dev/null @@ -1,44 +0,0 @@ -require "test_helper" - -class ConcurrencyControlsTest < ActiveSupport::TestCase - class NonOverlappingJob < UpdateResultJob - include ActiveJob::ConcurrencyControls - - limit_concurrency limit: 1, key: ->(job_result, **) { job_result } - end - - setup do - @result = JobResult.create!(queue_name: "default") - end - - test "enqueue jobs with concurrency controls" do - active_job = NonOverlappingJob.perform_later(@result, name: "A") - assert_equal 1, active_job.concurrency_limit - assert_equal "ConcurrencyControlsTest::NonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key - - job = SolidQueue::Job.last - assert_equal active_job.concurrency_limit, job.concurrency_limit - assert_equal active_job.concurrency_key, job.concurrency_key - end - - test "blocks jobs when concurrency limits are reached" do - assert_ready do - 
NonOverlappingJob.perform_later(@result, name: "A") - end - - assert_blocked do - NonOverlappingJob.perform_later(@result, name: "B") - end - end - - private - def assert_ready(&block) - assert_difference -> { SolidQueue::ReadyExecution.count }, +1, &block - end - - def assert_blocked(&block) - assert_no_difference -> { SolidQueue::ReadyExecution.count } do - assert_difference -> { SolidQueue::BlockedExecution.count }, +1, &block - end - end -end From 6050088d33e8a04b044417c0edaa8748a0a3899a Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 5 Nov 2023 13:12:58 +0100 Subject: [PATCH 04/32] Refactor slightly how executions assume attributes from job Just to test how it feels to do it in this way. --- app/models/solid_queue/blocked_execution.rb | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 1dff8bd7..8b0928bc 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -1,10 +1,3 @@ class SolidQueue::BlockedExecution < SolidQueue::Execution - before_create :assume_attributes_from_job - - private - def assume_attributes_from_job - super - self.concurrency_limit ||= job.concurrency_limit - self.concurrency_key ||= job.concurrency_key - end + assume_attributes_from_job :concurrency_limit, :concurrency_key end From 5c1427761ee3b553447c6a3f9f51206d9a728722 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 5 Nov 2023 22:54:38 +0100 Subject: [PATCH 05/32] Dispatch blocked executions when a concurrency-limited job finishes Either successfully or failing, and also release the semaphore. 
--- app/models/solid_queue/blocked_execution.rb | 11 +++++ .../solid_queue/job/concurrency_controls.rb | 11 +++++ app/models/solid_queue/job/executable.rb | 8 ++++ app/models/solid_queue/semaphore.rb | 34 ++++++++------- lib/active_job/concurrency_controls.rb | 2 +- test/dummy/app/jobs/update_result_job.rb | 16 +++++-- test/integration/concurrency_controls_test.rb | 42 +++++++++++++++++++ test/integration/jobs_lifecycle_test.rb | 2 +- test/integration/processes_lifecycle_test.rb | 13 ++---- test/models/solid_queue/job_test.rb | 13 +++--- 10 files changed, 118 insertions(+), 34 deletions(-) create mode 100644 test/integration/concurrency_controls_test.rb diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 8b0928bc..7282ab6d 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -1,3 +1,14 @@ class SolidQueue::BlockedExecution < SolidQueue::Execution assume_attributes_from_job :concurrency_limit, :concurrency_key + + def self.release(concurrency_key) + where(concurrency_key: concurrency_key).order(:priority).first&.release + end + + def release + transaction do + job.prepare_for_execution + destroy! + end + end end diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index e8b191ed..8bd4af1d 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -14,13 +14,24 @@ def acquire_concurrency_lock Semaphore.wait_for(concurrency_key, concurrency_limit) end + def release_concurrency_lock + return false unless concurrency_limited? + + Semaphore.release(concurrency_key) + end + def block BlockedExecution.create_or_find_by!(job_id: id) end + def release_next_blocked_job + BlockedExecution.release(concurrency_key) + end + def concurrency_limited? concurrency_limit.to_i > 0 && concurrency_key.present? 
end + end end end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index de4552cf..a9118584 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -34,6 +34,7 @@ def finished! else touch(:finished_at) end + dispatch_blocked_jobs end def finished? @@ -42,6 +43,7 @@ def finished? def failed_with(exception) FailedExecution.create_or_find_by!(job_id: id, exception: exception) + dispatch_blocked_jobs end def discard @@ -68,6 +70,12 @@ def dispatch end end + def dispatch_blocked_jobs + if release_concurrency_lock + release_next_blocked_job + end + end + def schedule ScheduledExecution.create_or_find_by!(job_id: id) end diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 8a6f24c6..d969cd64 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -1,20 +1,26 @@ class SolidQueue::Semaphore < SolidQueue::Record - def self.wait_for(identifier, limit) - if semaphore = find_by(identifier: identifier) - semaphore.value > 0 && attempt_to_update(identifier) - else - attempt_to_create(identifier, limit) + class << self + def wait_for(identifier, limit) + if semaphore = find_by(identifier: identifier) + semaphore.value > 0 && attempt_to_update(identifier) + else + attempt_to_create(identifier, limit) + end end - end - def self.attempt_to_create(identifier, limit) - create!(identifier: identifier, value: limit - 1) - true - rescue ActiveRecord::RecordNotUnique - attempt_to_update(identifier) - end + def attempt_to_create(identifier, limit) + create!(identifier: identifier, value: limit - 1) + true + rescue ActiveRecord::RecordNotUnique + attempt_to_update(identifier) + end - def self.attempt_to_update(identifier) - where(identifier: identifier).where("value > 0").update_all("value = COALESCE(value, 1) - 1") > 0 + def attempt_to_update(identifier) + where(identifier: identifier).where("value > 
0").update_all("value = COALESCE(value, 1) - 1") > 0 + end + + def release(identifier) + where(identifier: identifier).update_all("value = COALESCE(value, 0) + 1") > 0 + end end end diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index edb9785e..b73d6f17 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -7,7 +7,7 @@ module ConcurrencyControls DEFAULT_CONCURRENCY_KEY = ->(*) { self.name } included do - class_attribute :concurrency_limit, default: 1 + class_attribute :concurrency_limit, default: 0 # No limit class_attribute :concurrency_key, default: DEFAULT_CONCURRENCY_KEY, instance_accessor: false end diff --git a/test/dummy/app/jobs/update_result_job.rb b/test/dummy/app/jobs/update_result_job.rb index c2a4459f..055d5c2a 100644 --- a/test/dummy/app/jobs/update_result_job.rb +++ b/test/dummy/app/jobs/update_result_job.rb @@ -1,7 +1,15 @@ class UpdateResultJob < ApplicationJob - def perform(job_result, name:, pause: 0.1) - job_result.update!(status: "started_#{name}") - sleep(pause) - job_result.update!(status: "completed_#{name}") + include ActiveJob::ConcurrencyControls + + limit_concurrency limit: 1, key: ->(job_result, **) { job_result } + + def perform(job_result, name:, pause: nil) + job_result.status += "s#{name}" + job_result.save! + + sleep(pause) if pause + + job_result.status += "c#{name}" + job_result.save! 
end end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb new file mode 100644 index 00000000..06519ae1 --- /dev/null +++ b/test/integration/concurrency_controls_test.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true +require "test_helper" + +class ConcurrencyControlsTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + setup do + SolidQueue::Job.delete_all + + @result = JobResult.create!(queue_name: "default", status: "seq: ") + + default_worker = { queues: "default", polling_interval: 1, processes: 3 } + @pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ] }) + + wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor + end + + teardown do + terminate_process(@pid) if process_exists?(@pid) + end + + test "run several conflicting jobs and prevent overlapping" do + ("A".."F").each do |name| + UpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds) + end + + ("G".."K").each do |name| + UpdateResultJob.perform_later(@result, name: name) + end + + wait_for_jobs_to_finish_for(4.seconds) + assert_stored_sequence @result, ("A".."K").to_a + end + + private + def assert_stored_sequence(result, sequence) + expected = "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join + skip_active_record_query_cache do + assert_equal expected, result.reload.status + end + end +end diff --git a/test/integration/jobs_lifecycle_test.rb b/test/integration/jobs_lifecycle_test.rb index 6cf9cbdb..a5be505e 100644 --- a/test/integration/jobs_lifecycle_test.rb +++ b/test/integration/jobs_lifecycle_test.rb @@ -4,7 +4,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase setup do @worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.5) - @scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 0.5) + @scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 1) 
end teardown do diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 1a436025..d3f5dbc9 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -13,7 +13,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase end teardown do - terminate_supervisor if process_exists?(@pid) + terminate_process(@pid) if process_exists?(@pid) end test "enqueue jobs in multiple queues" do @@ -26,7 +26,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase 6.times { |i| assert_completed_job_results("job_#{i}", :background) } 6.times { |i| assert_completed_job_results("job_#{i}", :default) } - terminate_supervisor + terminate_process(@pid) assert_clean_termination end @@ -150,7 +150,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase assert_job_status(job, :failed) end - terminate_supervisor + terminate_process(@pid) assert_clean_termination end @@ -175,17 +175,12 @@ class ProcessLifecycleTest < ActiveSupport::TestCase end assert process_exists?(@pid) - - terminate_supervisor + terminate_process(@pid) assert_clean_termination end private - def terminate_supervisor - terminate_process(@pid) - end - def terminate_registered_processes skip_active_record_query_cache do SolidQueue::Process.find_each do |process| diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index d834ac7a..1dee0e82 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -1,10 +1,17 @@ require "test_helper" class SolidQueue::JobTest < ActiveSupport::TestCase - class NonOverlappingJob < UpdateResultJob + class NonOverlappingJob < ApplicationJob include ActiveJob::ConcurrencyControls limit_concurrency limit: 1, key: ->(job_result, **) { job_result } + + def perform(job_result) + end + end + + setup do + @result = JobResult.create!(queue_name: "default") end test "enqueue active job to be executed right away" do @@ -48,10 +55,6 
@@ class NonOverlappingJob < UpdateResultJob assert Time.now < execution.scheduled_at end - setup do - @result = JobResult.create!(queue_name: "default") - end - test "enqueue jobs with concurrency controls" do active_job = NonOverlappingJob.perform_later(@result, name: "A") assert_equal 1, active_job.concurrency_limit From e217ae08a9cc4b3dea75c90ac8837a75819af8f0 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 6 Nov 2023 10:24:19 +0100 Subject: [PATCH 06/32] Test concurrency limited to more than 1 job of a kind --- .../app/jobs/sequential_update_result_job.rb | 5 +++ .../app/jobs/throttled_update_result_job.rb | 5 +++ test/dummy/app/jobs/update_result_job.rb | 2 - test/integration/concurrency_controls_test.rb | 40 +++++++++++++++++-- 4 files changed, 46 insertions(+), 6 deletions(-) create mode 100644 test/dummy/app/jobs/sequential_update_result_job.rb create mode 100644 test/dummy/app/jobs/throttled_update_result_job.rb diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/sequential_update_result_job.rb new file mode 100644 index 00000000..279fafab --- /dev/null +++ b/test/dummy/app/jobs/sequential_update_result_job.rb @@ -0,0 +1,5 @@ +class SequentialUpdateResultJob < UpdateResultJob + include ActiveJob::ConcurrencyControls + + limit_concurrency limit: 1, key: ->(job_result, **) { job_result } +end diff --git a/test/dummy/app/jobs/throttled_update_result_job.rb b/test/dummy/app/jobs/throttled_update_result_job.rb new file mode 100644 index 00000000..8db632db --- /dev/null +++ b/test/dummy/app/jobs/throttled_update_result_job.rb @@ -0,0 +1,5 @@ +class ThrottledUpdateResultJob < UpdateResultJob + include ActiveJob::ConcurrencyControls + + limit_concurrency limit: 3, key: ->(job_result, **) { job_result } +end diff --git a/test/dummy/app/jobs/update_result_job.rb b/test/dummy/app/jobs/update_result_job.rb index 055d5c2a..f43d2d87 100644 --- a/test/dummy/app/jobs/update_result_job.rb +++ 
b/test/dummy/app/jobs/update_result_job.rb @@ -1,8 +1,6 @@ class UpdateResultJob < ApplicationJob include ActiveJob::ConcurrencyControls - limit_concurrency limit: 1, key: ->(job_result, **) { job_result } - def perform(job_result, name:, pause: nil) job_result.status += "s#{name}" job_result.save! diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 06519ae1..20d2e105 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -9,7 +9,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase @result = JobResult.create!(queue_name: "default", status: "seq: ") - default_worker = { queues: "default", polling_interval: 1, processes: 3 } + default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 } @pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ] }) wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor @@ -19,19 +19,51 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase terminate_process(@pid) if process_exists?(@pid) end - test "run several conflicting jobs and prevent overlapping" do + test "run several conflicting jobs over the same record sequentially" do ("A".."F").each do |name| - UpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds) + SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds) end ("G".."K").each do |name| - UpdateResultJob.perform_later(@result, name: name) + SequentialUpdateResultJob.perform_later(@result, name: name) end wait_for_jobs_to_finish_for(4.seconds) + assert_no_pending_jobs + assert_stored_sequence @result, ("A".."K").to_a end + test "run several jobs over the same record limiting concurrency" do + incr = 0 + # C is the last one to update the record + # A: 0 to 0.5 + # B: 0 to 1.0 + # C: 0 to 1.5 + assert_no_difference -> { SolidQueue::BlockedExecution.count } do 
+ ("A".."C").each do |name| + ThrottledUpdateResultJob.perform_later(@result, name: name, pause: (0.5 + incr).seconds) + incr += 0.5 + end + end + + sleep(0.01) # To ensure these aren't picked up before ABC + # D to H: 0.51 to 0.76 (starting after A finishes, and in order, 5 * 0.05 = 0.25) + # These would finish all before B and C + assert_difference -> { SolidQueue::BlockedExecution.count }, +5 do + ("D".."H").each do |name| + ThrottledUpdateResultJob.perform_later(@result, name: name, pause: 0.05.seconds) + end + end + + wait_for_jobs_to_finish_for(3.seconds) + assert_no_pending_jobs + + # C would have started in the beginning, seeing the status empty, and would finish after + # all other jobs, so it'll do the last update with only itself + assert_stored_sequence(@result, [ "C" ]) + end + private def assert_stored_sequence(result, sequence) expected = "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join From 873760de46a233b6c3b865c33891d0bf1f14410d Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 6 Nov 2023 10:40:34 +0100 Subject: [PATCH 07/32] Dispatch blocked jobs after job perform finishes outside that transaction Otherwise, if a job finishes just fine or fails, and something goes wrong when dispatching blocked jobs, we'll fail to mark the job as finished or failed just because we couldn't dispatch blocked jobs, but not because anything went wrong with the job itself. Also: test sequential jobs when some of them fail. We must continue processing jobs normally. 
--- app/models/solid_queue/claimed_execution.rb | 2 ++ .../solid_queue/job/concurrency_controls.rb | 6 ++++++ app/models/solid_queue/job/executable.rb | 8 -------- test/dummy/app/jobs/update_result_job.rb | 4 ++-- test/integration/concurrency_controls_test.rb | 16 ++++++++++++++++ 5 files changed, 26 insertions(+), 10 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 63d62571..05af5257 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -31,6 +31,8 @@ def perform else failed_with(result.error) end + ensure + job.dispatch_blocked_jobs end def release diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 8bd4af1d..71f126f6 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -7,6 +7,12 @@ module ConcurrencyControls has_one :blocked_execution, dependent: :destroy end + def dispatch_blocked_jobs + if release_concurrency_lock + release_next_blocked_job + end + end + private def acquire_concurrency_lock return true unless concurrency_limited? diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index a9118584..de4552cf 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -34,7 +34,6 @@ def finished! else touch(:finished_at) end - dispatch_blocked_jobs end def finished? @@ -43,7 +42,6 @@ def finished? 
def failed_with(exception) FailedExecution.create_or_find_by!(job_id: id, exception: exception) - dispatch_blocked_jobs end def discard @@ -70,12 +68,6 @@ def dispatch end end - def dispatch_blocked_jobs - if release_concurrency_lock - release_next_blocked_job - end - end - def schedule ScheduledExecution.create_or_find_by!(job_id: id) end diff --git a/test/dummy/app/jobs/update_result_job.rb b/test/dummy/app/jobs/update_result_job.rb index f43d2d87..3c9de511 100644 --- a/test/dummy/app/jobs/update_result_job.rb +++ b/test/dummy/app/jobs/update_result_job.rb @@ -1,11 +1,11 @@ class UpdateResultJob < ApplicationJob include ActiveJob::ConcurrencyControls - def perform(job_result, name:, pause: nil) + def perform(job_result, name:, pause: nil, exception: nil) job_result.status += "s#{name}" - job_result.save! sleep(pause) if pause + raise exception.new if exception job_result.status += "c#{name}" job_result.save! diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 20d2e105..b3ebee66 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -64,6 +64,22 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence(@result, [ "C" ]) end + test "run several jobs over the same record sequentially, with some of them failing" do + ("A".."F").each_with_index do |name, i| + # A, C, E will fail, for i= 0, 2, 4 + SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (RuntimeError if i.even?)) + end + + ("G".."K").each do |name| + SequentialUpdateResultJob.perform_later(@result, name: name) + end + + wait_for_jobs_to_finish_for(4.seconds) + assert_equal 3, SolidQueue::FailedExecution.count + + assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a + end + private def assert_stored_sequence(result, sequence) expected = "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join From 
d0eeed2e695f1aa723a0ca85d24f42bd73dd35a9 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 6 Nov 2023 17:05:46 +0100 Subject: [PATCH 08/32] Avoid race-conditions when releasing blocked executions If two processes try to release blocked executions for the same key, for example, if the concurrency limit is 2, and 2 jobs finish at the same time, both would try to release the first one, and one of them would fail to do that. To avoid that, select the first one but lock the record, and have the SELECT ... FOR UPDATE to use SKIP LOCKED so we don't have to wait on the lock in other processes. --- app/models/solid_queue/blocked_execution.rb | 27 ++++++++++++------- .../solid_queue/job/concurrency_controls.rb | 1 - test/integration/concurrency_controls_test.rb | 4 +-- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 7282ab6d..69bb2ab9 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -1,14 +1,23 @@ -class SolidQueue::BlockedExecution < SolidQueue::Execution - assume_attributes_from_job :concurrency_limit, :concurrency_key +module SolidQueue + class BlockedExecution < SolidQueue::Execution + assume_attributes_from_job :concurrency_limit, :concurrency_key - def self.release(concurrency_key) - where(concurrency_key: concurrency_key).order(:priority).first&.release - end + has_one :semaphore, foreign_key: :identifier, primary_key: :concurrency_key + + scope :releasable, -> { joins(:semaphore).merge(Semaphore.available) } + scope :ordered, -> { order(priority: :asc) } + + class << self + def release(concurrency_key) + ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release) + end + end - def release - transaction do - job.prepare_for_execution - destroy! + def release + transaction do + job.prepare_for_execution + destroy! 
+ end end end end diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 71f126f6..4e49b2bf 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -37,7 +37,6 @@ def release_next_blocked_job def concurrency_limited? concurrency_limit.to_i > 0 && concurrency_key.present? end - end end end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index b3ebee66..b85acd66 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -28,7 +28,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase SequentialUpdateResultJob.perform_later(@result, name: name) end - wait_for_jobs_to_finish_for(4.seconds) + wait_for_jobs_to_finish_for(3.seconds) assert_no_pending_jobs assert_stored_sequence @result, ("A".."K").to_a @@ -74,7 +74,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase SequentialUpdateResultJob.perform_later(@result, name: name) end - wait_for_jobs_to_finish_for(4.seconds) + wait_for_jobs_to_finish_for(3.seconds) assert_equal 3, SolidQueue::FailedExecution.count assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a From 9fa0aa21c582925752dc38e47b9baec21f96b9db Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 6 Nov 2023 20:50:28 +0100 Subject: [PATCH 09/32] Correct transactions to release blocked executions When a job finishes or fails, we need to: - Increment the semaphore if any, in its own transaction. This needs to happen regardless of whether other blocked executions are unblocked by this semaphore change. - Try to unblock the next blocked execution. For this, we need to acquire the semaphore again, as it's possible another job part of the same concurrency group is just being enqueued at the same time. 
Then, we need to move the blocked execution to "ready", and then delete the blocked execution. This all needs to happen in the same transaction, without going through the job. The previous implementation could very well update the existing blocked execution and then delete it, leaving the job in limbo. --- app/models/solid_queue/blocked_execution.rb | 15 +++++++-- app/models/solid_queue/claimed_execution.rb | 2 +- .../solid_queue/job/concurrency_controls.rb | 4 +-- app/models/solid_queue/semaphore.rb | 33 +++++++++++-------- 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 69bb2ab9..9898d029 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -15,9 +15,20 @@ def release(concurrency_key) def release transaction do - job.prepare_for_execution - destroy! + if acquire_concurrency_lock + promote_to_ready + destroy! + end end end + + private + def acquire_concurrency_lock + Semaphore.wait_for(concurrency_key, concurrency_limit) + end + + def promote_to_ready + ReadyExecution.create_or_find_by!(job_id: job_id) + end end end diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 05af5257..d2844ce4 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -32,7 +32,7 @@ def perform failed_with(result.error) end ensure - job.dispatch_blocked_jobs + job.unblock_blocked_jobs end def release diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 4e49b2bf..277da578 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -7,7 +7,7 @@ module ConcurrencyControls has_one :blocked_execution, dependent: :destroy end - def dispatch_blocked_jobs + def unblock_blocked_jobs if
release_concurrency_lock release_next_blocked_job end @@ -23,7 +23,7 @@ def acquire_concurrency_lock def release_concurrency_lock return false unless concurrency_limited? - Semaphore.release(concurrency_key) + Semaphore.release(concurrency_key, concurrency_limit) end def block diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index d969cd64..22d308e5 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -1,26 +1,33 @@ class SolidQueue::Semaphore < SolidQueue::Record + scope :available, -> { where("value > 0") } + class << self def wait_for(identifier, limit) if semaphore = find_by(identifier: identifier) - semaphore.value > 0 && attempt_to_update(identifier) + semaphore.value > 0 && attempt_decrement(identifier) else - attempt_to_create(identifier, limit) + attempt_creation(identifier, limit) end end - def attempt_to_create(identifier, limit) - create!(identifier: identifier, value: limit - 1) - true - rescue ActiveRecord::RecordNotUnique - attempt_to_update(identifier) + def release(identifier, concurrency_limit) + attempt_increment(identifier, concurrency_limit) end - def attempt_to_update(identifier) - where(identifier: identifier).where("value > 0").update_all("value = COALESCE(value, 1) - 1") > 0 - end + private + def attempt_creation(identifier, limit) + create!(identifier: identifier, value: limit - 1) + true + rescue ActiveRecord::RecordNotUnique + attempt_decrement(identifier) + end - def release(identifier) - where(identifier: identifier).update_all("value = COALESCE(value, 0) + 1") > 0 - end + def attempt_decrement(identifier) + available.where(identifier: identifier).update_all("value = value - 1") > 0 + end + + def attempt_increment(identifier, limit) + where("value < ?", limit).where(identifier: identifier).update_all("value = value + 1") > 0 + end end end From 7ab243d11f94d6f2086d28ce992b25f1018caaf7 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 6 Nov 2023 
12:31:46 +0100 Subject: [PATCH 10/32] Unblock releasable blocked executions before polling That is, blocked executions with an available semaphore. Try to unblock at most one per concurrency key, to unblock the whole group. --- app/models/solid_queue/blocked_execution.rb | 14 ++++++++- app/models/solid_queue/execution.rb | 18 ++++++++---- .../solid_queue/job/concurrency_controls.rb | 2 +- lib/solid_queue/worker.rb | 9 ++++++ .../solid_queue/blocked_executions.yml | 0 test/fixtures/solid_queue/semaphores.yml | 0 test/integration/concurrency_controls_test.rb | 29 +++++++++++++++++++ 7 files changed, 65 insertions(+), 7 deletions(-) create mode 100644 test/fixtures/solid_queue/blocked_executions.yml create mode 100644 test/fixtures/solid_queue/semaphores.yml diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 9898d029..0a059b0c 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -8,7 +8,17 @@ class BlockedExecution < SolidQueue::Execution scope :ordered, -> { order(priority: :asc) } class << self - def release(concurrency_key) + def unblock(count) + release_many releasable.select(:concurrency_key).distinct.limit(count).pluck(:concurrency_key) + end + + def release_many(concurrency_keys) + # We want to release exactly one blocked execution for each concurrency key, and we need to do it + # one by one, locking each record and acquiring the semaphore individually for each of them: + Array(concurrency_keys).each { |concurrency_key| release_one(concurrency_key) } + end + + def release_one(concurrency_key) ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release) end end @@ -18,6 +28,8 @@ def release if acquire_concurrency_lock promote_to_ready destroy!
+ + SolidQueue.logger.info("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}") end end end diff --git a/app/models/solid_queue/execution.rb b/app/models/solid_queue/execution.rb index e4925bc3..56768882 100644 --- a/app/models/solid_queue/execution.rb +++ b/app/models/solid_queue/execution.rb @@ -1,9 +1,17 @@ -class SolidQueue::Execution < SolidQueue::Record - include JobAttributes +module SolidQueue + class Execution < SolidQueue::Record + include JobAttributes - self.abstract_class = true + self.abstract_class = true - belongs_to :job + belongs_to :job - alias_method :discard, :destroy + alias_method :discard, :destroy + + class << self + def queued_as(queues) + QueueParser.new(queues, self).scoped_relation + end + end + end end diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 277da578..39108424 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -31,7 +31,7 @@ def block end def release_next_blocked_job - BlockedExecution.release(concurrency_key) + BlockedExecution.release_one(concurrency_key) end def concurrency_limited? 
diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 9ed69931..7bbd7858 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -16,6 +16,15 @@ def initialize(**options) private def run + unblock_executions + poll_and_dispatch_executions + end + + def unblock_executions + SolidQueue::BlockedExecution.queued_as(queues).unblock(pool.size) + end + + def poll_and_dispatch_executions claimed_executions = with_polling_volume do SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id) end diff --git a/test/fixtures/solid_queue/blocked_executions.yml b/test/fixtures/solid_queue/blocked_executions.yml new file mode 100644 index 00000000..e69de29b diff --git a/test/fixtures/solid_queue/semaphores.yml b/test/fixtures/solid_queue/semaphores.yml new file mode 100644 index 00000000..e69de29b diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index b85acd66..2b443990 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -80,6 +80,35 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a end + test "rely on worker to unblock blocked executions" do + # Simulate a scenario where we got an available semaphore and some stuck jobs + job = SequentialUpdateResultJob.perform_later(@result, name: "A") + wait_for_jobs_to_finish_for(2.seconds) + assert_no_pending_jobs + + # Lock the semaphore so we can enqueue jobs and leave them blocked + skip_active_record_query_cache do + assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit) + end + + # Now enqueue more jobs under that same key. They'll be all locked. Use priorities + # to ensure order. 
+ assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do + ("B".."K").each_with_index do |name, i| + SequentialUpdateResultJob.set(priority: i).perform_later(@result, name: name) + end + end + + # Then unlock the semaphore: this would be as if the first job had released + # the semaphore but hadn't unblocked any jobs + assert SolidQueue::Semaphore.release(job.concurrency_key, job.concurrency_limit) + + wait_for_jobs_to_finish_for(2.seconds) + assert_no_pending_jobs + + assert_stored_sequence @result, ("A".."K").to_a + end + private def assert_stored_sequence(result, sequence) expected = "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join From 04dd571bcf214daefc825b3976099f2c9effbf5b Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 6 Nov 2023 22:55:59 +0100 Subject: [PATCH 11/32] Consider blocked executions without a semaphore as releasable We'll have to clear semaphores eventually because they might get stuck, and the simplest way is to delete them. Then, we might end up with blocked executions without semaphore, so we also need to unblock these. 
--- app/models/solid_queue/blocked_execution.rb | 2 +- app/models/solid_queue/semaphore.rb | 1 + test/integration/concurrency_controls_test.rb | 47 ++++++++++++++++--- 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 0a059b0c..0de67a1d 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -4,7 +4,7 @@ class BlockedExecution < SolidQueue::Execution has_one :semaphore, foreign_key: :identifier, primary_key: :concurrency_key - scope :releasable, -> { joins(:semaphore).merge(Semaphore.available) } + scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } scope :ordered, -> { order(priority: :asc) } class << self diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 22d308e5..d58c3cdf 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -1,5 +1,6 @@ class SolidQueue::Semaphore < SolidQueue::Record scope :available, -> { where("value > 0") } + scope :locked, -> { where(value: 0) } class << self def wait_for(identifier, limit) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 2b443990..c8495c8c 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -80,7 +80,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a end - test "rely on worker to unblock blocked executions" do + test "rely on worker to unblock blocked executions with an available semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") wait_for_jobs_to_finish_for(2.seconds) @@ -94,8 +94,8 @@ class ConcurrencyControlsTest < 
ActiveSupport::TestCase # Now enqueue more jobs under that same key. They'll be all locked. Use priorities # to ensure order. assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do - ("B".."K").each_with_index do |name, i| - SequentialUpdateResultJob.set(priority: i).perform_later(@result, name: name) + ("B".."K").each do |name| + SequentialUpdateResultJob.perform_later(@result, name: name) end end @@ -103,17 +103,50 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # the semaphore but hadn't unblocked any jobs assert SolidQueue::Semaphore.release(job.concurrency_key, job.concurrency_limit) + # And wait for workers to release the jobs wait_for_jobs_to_finish_for(2.seconds) assert_no_pending_jobs - assert_stored_sequence @result, ("A".."K").to_a + # We can't ensure the order between B and C, because it depends on which worker wins when + # unblocking, as one will try to unblock B and another C + assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a + end + + test "rely on worker to unblock blocked executions with a missing semaphore" do + # Simulate a scenario where we got an available semaphore and some stuck jobs + job = SequentialUpdateResultJob.perform_later(@result, name: "A") + wait_for_jobs_to_finish_for(2.seconds) + assert_no_pending_jobs + + # Lock the semaphore so we can enqueue jobs and leave them blocked + skip_active_record_query_cache do + assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit) + end + + # Now enqueue more jobs under that same key. They'll be all locked + assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do + ("B".."K").each do |name| + SequentialUpdateResultJob.perform_later(@result, name: name) + end + end + + # Then delete the semaphore, as if we had cleared it + SolidQueue::Semaphore.find_by(identifier: job.concurrency_key).destroy! 
+ + # And wait for workers to release the jobs + wait_for_jobs_to_finish_for(2.seconds) + assert_no_pending_jobs + + # We can't ensure the order between B and C, because it depends on which worker wins when + # unblocking, as one will try to unblock B and another C + assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a end private - def assert_stored_sequence(result, sequence) - expected = "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join + def assert_stored_sequence(result, *sequences) + expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join } skip_active_record_query_cache do - assert_equal expected, result.reload.status + assert_includes expected, result.reload.status end end end From 7814338da8bfd164db5b4421b1a93ae8e6d35369 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 7 Nov 2023 11:55:59 +0100 Subject: [PATCH 12/32] Tweak concurrency related tables Add `expires_at` to semaphores, so we can easily expire them. 
--- app/models/solid_queue/blocked_execution.rb | 4 +-- app/models/solid_queue/semaphore.rb | 26 +++++++++---------- ...create_solid_queue_concurrency_controls.rb | 7 +++-- test/dummy/db/schema.rb | 8 +++--- test/integration/concurrency_controls_test.rb | 2 +- 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 0de67a1d..777858c1 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -2,9 +2,9 @@ module SolidQueue class BlockedExecution < SolidQueue::Execution assume_attributes_from_job :concurrency_limit, :concurrency_key - has_one :semaphore, foreign_key: :identifier, primary_key: :concurrency_key + has_one :semaphore, foreign_key: :concurrency_key, primary_key: :concurrency_key - scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } + scope :releasable, -> { left_outer_joins(:execution_semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } scope :ordered, -> { order(priority: :asc) } class << self diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index d58c3cdf..e17ab88c 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -3,32 +3,32 @@ class SolidQueue::Semaphore < SolidQueue::Record scope :locked, -> { where(value: 0) } class << self - def wait_for(identifier, limit) - if semaphore = find_by(identifier: identifier) - semaphore.value > 0 && attempt_decrement(identifier) + def wait_for(concurrency_key, limit) + if semaphore = find_by(concurrency_key: concurrency_key) + semaphore.value > 0 && attempt_decrement(concurrency_key) else - attempt_creation(identifier, limit) + attempt_creation(concurrency_key, limit) end end - def release(identifier, concurrency_limit) - attempt_increment(identifier, concurrency_limit) + def release(concurrency_key, 
concurrency_limit) + attempt_increment(concurrency_key, concurrency_limit) end private - def attempt_creation(identifier, limit) - create!(identifier: identifier, value: limit - 1) + def attempt_creation(concurrency_key, limit) + create!(concurrency_key: concurrency_key, value: limit - 1) true rescue ActiveRecord::RecordNotUnique - attempt_decrement(identifier) + attempt_decrement(concurrency_key) end - def attempt_decrement(identifier) - available.where(identifier: identifier).update_all("value = value - 1") > 0 + def attempt_decrement(concurrency_key) + available.where(concurrency_key: concurrency_key).update_all("value = value - 1") > 0 end - def attempt_increment(identifier, limit) - where("value < ?", limit).where(identifier: identifier).update_all("value = value + 1") > 0 + def attempt_increment(concurrency_key, limit) + where("value < ?", limit).where(concurrency_key: concurrency_key).update_all("value = value + 1") > 0 end end end diff --git a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb index c1b9ec28..25464f4c 100644 --- a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb +++ b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb @@ -11,14 +11,17 @@ def change t.integer :priority, default: 0, null: false t.integer :concurrency_limit, null: false - t.string :concurrency_key, null: false, index: true + t.string :concurrency_key, null: false t.datetime :created_at, null: false + + t.index [ :priority, :concurrency_key, :queue_name, :job_id ], name: "index_solid_queue_blocked_executions_for_release" end create_table :solid_queue_semaphores do |t| - t.string :identifier, null: false, index: { unique: true } + t.string :concurrency_key, null: false, index: { unique: true } t.integer :value, null: false, default: 1 + t.datetime :expires_at t.timestamps end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 
d319a7ad..c61d7b90 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -26,8 +26,8 @@ t.integer "concurrency_limit", null: false t.string "concurrency_key", null: false t.datetime "created_at", null: false - t.index ["concurrency_key"], name: "index_solid_queue_blocked_executions_on_concurrency_key" t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + t.index ["priority", "concurrency_key", "queue_name", "job_id"], name: "index_solid_queue_blocked_executions_for_release" end create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| @@ -98,11 +98,13 @@ end create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.string "identifier", null: false + t.string "concurrency_key", null: false t.integer "value", default: 1, null: false + t.datetime "expires_at", null: false t.datetime "created_at", null: false t.datetime "updated_at", null: false - t.index ["identifier"], name: "index_solid_queue_semaphores_on_identifier", unique: true + t.index ["concurrency_key"], name: "index_solid_queue_semaphores_on_concurrency_key", unique: true + t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" end end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index c8495c8c..7d003fec 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -131,7 +131,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end # Then delete the semaphore, as if we had cleared it - SolidQueue::Semaphore.find_by(identifier: job.concurrency_key).destroy! + SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).destroy! 
# And wait for workers to release the jobs wait_for_jobs_to_finish_for(2.seconds) From 016674b80a2c4e217d177e58e1bf32ca49a77251 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 16 Nov 2023 19:29:25 +0100 Subject: [PATCH 13/32] Delegate concurrency limit and duration to job class and store `expires_at` We don't need to store the concurrency limit and max duration as the Solid Queue job has the job class name available to constantize it and get them from there. Also, stop unblocking executions in the worker before polling, keep that to polling only. We'll have the supervisor do this, as well as expiring semaphores. --- app/models/solid_queue/blocked_execution.rb | 6 ++--- app/models/solid_queue/job.rb | 1 - .../solid_queue/job/concurrency_controls.rb | 10 ++++++-- app/models/solid_queue/semaphore.rb | 24 +++++++++--------- ...create_solid_queue_concurrency_controls.rb | 4 +-- lib/active_job/concurrency_controls.rb | 7 ++++-- lib/solid_queue.rb | 1 + lib/solid_queue/worker.rb | 25 ++++++++----------- .../app/jobs/sequential_update_result_job.rb | 2 +- .../app/jobs/throttled_update_result_job.rb | 2 +- test/dummy/db/schema.rb | 2 -- test/integration/concurrency_controls_test.rb | 11 +++++--- test/models/solid_queue/job_test.rb | 2 +- 13 files changed, 51 insertions(+), 46 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 777858c1..5b3bccef 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -1,6 +1,6 @@ module SolidQueue class BlockedExecution < SolidQueue::Execution - assume_attributes_from_job :concurrency_limit, :concurrency_key + assume_attributes_from_job :concurrency_key has_one :semaphore, foreign_key: :concurrency_key, primary_key: :concurrency_key @@ -36,11 +36,11 @@ def release private def acquire_concurrency_lock - Semaphore.wait_for(concurrency_key, concurrency_limit) + Semaphore.wait_for(concurrency_key, 
job.concurrency_limit, job.concurrency_limit_duration) end def promote_to_ready - ReadyExecution.create_or_find_by!(job_id: job_id) + ReadyExecution.create!(job_id: job_id, queue_name: queue_name, priority: priority) end end end diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index ac22c126..0cf0e798 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -19,7 +19,6 @@ def enqueue_active_job(active_job, scheduled_at: Time.current) scheduled_at: scheduled_at, class_name: active_job.class.name, arguments: active_job.serialize, - concurrency_limit: active_job.try(:concurrency_limit), concurrency_key: active_job.try(:concurrency_key) end diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 39108424..021ed5bc 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -5,6 +5,8 @@ module ConcurrencyControls included do has_one :blocked_execution, dependent: :destroy + + delegate :concurrency_limit, :concurrency_limit_duration, to: :job_class end def unblock_blocked_jobs @@ -17,13 +19,13 @@ def unblock_blocked_jobs def acquire_concurrency_lock return true unless concurrency_limited? - Semaphore.wait_for(concurrency_key, concurrency_limit) + Semaphore.wait_for(concurrency_key, concurrency_limit, concurrency_limit_duration) end def release_concurrency_lock return false unless concurrency_limited? - Semaphore.release(concurrency_key, concurrency_limit) + Semaphore.release(concurrency_key, concurrency_limit, concurrency_limit_duration) end def block @@ -37,6 +39,10 @@ def release_next_blocked_job def concurrency_limited? concurrency_limit.to_i > 0 && concurrency_key.present? 
end + + def job_class + @job_class ||= class_name.safe_constantize + end end end end diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index e17ab88c..461c01d8 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -3,32 +3,32 @@ class SolidQueue::Semaphore < SolidQueue::Record scope :locked, -> { where(value: 0) } class << self - def wait_for(concurrency_key, limit) + def wait_for(concurrency_key, limit, duration) if semaphore = find_by(concurrency_key: concurrency_key) - semaphore.value > 0 && attempt_decrement(concurrency_key) + semaphore.value > 0 && attempt_decrement(concurrency_key, duration) else - attempt_creation(concurrency_key, limit) + attempt_creation(concurrency_key, limit, duration) end end - def release(concurrency_key, concurrency_limit) - attempt_increment(concurrency_key, concurrency_limit) + def release(concurrency_key, limit, duration) + attempt_increment(concurrency_key, limit, duration) end private - def attempt_creation(concurrency_key, limit) - create!(concurrency_key: concurrency_key, value: limit - 1) + def attempt_creation(concurrency_key, limit, duration) + create!(concurrency_key: concurrency_key, value: limit - 1, expires_at: duration.from_now) true rescue ActiveRecord::RecordNotUnique - attempt_decrement(concurrency_key) + attempt_decrement(concurrency_key, duration) end - def attempt_decrement(concurrency_key) - available.where(concurrency_key: concurrency_key).update_all("value = value - 1") > 0 + def attempt_decrement(concurrency_key, duration) + available.where(concurrency_key: concurrency_key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0 end - def attempt_increment(concurrency_key, limit) - where("value < ?", limit).where(concurrency_key: concurrency_key).update_all("value = value + 1") > 0 + def attempt_increment(concurrency_key, limit, duration) + where("value < ?", limit).where(concurrency_key: 
concurrency_key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0 end end end diff --git a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb index 25464f4c..050b0e41 100644 --- a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb +++ b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb @@ -1,7 +1,6 @@ class CreateSolidQueueConcurrencyControls < ActiveRecord::Migration[7.1] def change change_table :solid_queue_jobs do |t| - t.integer :concurrency_limit t.string :concurrency_key end @@ -10,7 +9,6 @@ def change t.string :queue_name, null: false t.integer :priority, default: 0, null: false - t.integer :concurrency_limit, null: false t.string :concurrency_key, null: false t.datetime :created_at, null: false @@ -21,7 +19,7 @@ def change create_table :solid_queue_semaphores do |t| t.string :concurrency_key, null: false, index: { unique: true } t.integer :value, null: false, default: 1 - t.datetime :expires_at + t.datetime :expires_at, null: false, index: true t.timestamps end diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index b73d6f17..bed96af1 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -7,14 +7,17 @@ module ConcurrencyControls DEFAULT_CONCURRENCY_KEY = ->(*) { self.name } included do - class_attribute :concurrency_limit, default: 0 # No limit class_attribute :concurrency_key, default: DEFAULT_CONCURRENCY_KEY, instance_accessor: false + + class_attribute :concurrency_limit, default: 0 # No limit + class_attribute :concurrency_limit_duration, default: SolidQueue.default_concurrency_control_period end class_methods do - def limit_concurrency(limit: 1, key: DEFAULT_CONCURRENCY_KEY) + def restrict_concurrency_with(limit: 1, key: DEFAULT_CONCURRENCY_KEY, duration: SolidQueue.default_concurrency_control_period) 
self.concurrency_limit = limit self.concurrency_key = key + self.concurrency_limit_duration = duration end end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index ed077fa8..45855449 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -34,6 +34,7 @@ module SolidQueue mattr_accessor :supervisor, default: false mattr_accessor :delete_finished_jobs, default: true + mattr_accessor :default_concurrency_control_period, default: 15.minutes def self.supervisor? supervisor diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 7bbd7858..69b4fc55 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -16,23 +16,12 @@ def initialize(**options) private def run - unblock_executions - poll_and_dispatch_executions - end - - def unblock_executions - SolidQueue::BlockedExecution.queued_as(queues).unblock(pool.size) - end - - def poll_and_dispatch_executions - claimed_executions = with_polling_volume do - SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id) - end + polled_executions = poll - if claimed_executions.size > 0 - procline "performing #{claimed_executions.count} jobs" + if polled_executions.size > 0 + procline "performing #{polled_executions.count} jobs" - claimed_executions.each do |execution| + polled_executions.each do |execution| pool.post(execution) end else @@ -41,6 +30,12 @@ def poll_and_dispatch_executions end end + def poll + with_polling_volume do + SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id) + end + end + def shutdown super diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/sequential_update_result_job.rb index 279fafab..bd43b46b 100644 --- a/test/dummy/app/jobs/sequential_update_result_job.rb +++ b/test/dummy/app/jobs/sequential_update_result_job.rb @@ -1,5 +1,5 @@ class SequentialUpdateResultJob < UpdateResultJob include ActiveJob::ConcurrencyControls - limit_concurrency limit: 1, key: ->(job_result, **) { job_result } + 
restrict_concurrency_with limit: 1, key: ->(job_result, **) { job_result } end diff --git a/test/dummy/app/jobs/throttled_update_result_job.rb b/test/dummy/app/jobs/throttled_update_result_job.rb index 8db632db..e0a1f004 100644 --- a/test/dummy/app/jobs/throttled_update_result_job.rb +++ b/test/dummy/app/jobs/throttled_update_result_job.rb @@ -1,5 +1,5 @@ class ThrottledUpdateResultJob < UpdateResultJob include ActiveJob::ConcurrencyControls - limit_concurrency limit: 3, key: ->(job_result, **) { job_result } + restrict_concurrency_with limit: 3, key: ->(job_result, **) { job_result } end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index c61d7b90..287d1bb9 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -23,7 +23,6 @@ t.bigint "job_id" t.string "queue_name", null: false t.integer "priority", default: 0, null: false - t.integer "concurrency_limit", null: false t.string "concurrency_key", null: false t.datetime "created_at", null: false t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true @@ -55,7 +54,6 @@ t.datetime "finished_at" t.datetime "created_at", null: false t.datetime "updated_at", null: false - t.integer "concurrency_limit" t.string "concurrency_key" t.index ["active_job_id"], name: "index_solid_queue_jobs_on_job_id" t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 7d003fec..0a7f9f5d 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -81,14 +81,17 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "rely on worker to unblock blocked executions with an available semaphore" do + skip "Moving this task to the supervisor" + # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") + 
wait_for_jobs_to_finish_for(2.seconds) assert_no_pending_jobs # Lock the semaphore so we can enqueue jobs and leave them blocked skip_active_record_query_cache do - assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit) + assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) end # Now enqueue more jobs under that same key. They'll be all locked. Use priorities @@ -101,7 +104,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Then unlock the semaphore: this would be as if the first job had released # the semaphore but hadn't unblocked any jobs - assert SolidQueue::Semaphore.release(job.concurrency_key, job.concurrency_limit) + assert SolidQueue::Semaphore.release(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) # And wait for workers to release the jobs wait_for_jobs_to_finish_for(2.seconds) @@ -113,6 +116,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "rely on worker to unblock blocked executions with a missing semaphore" do + skip "Moving this task to the supervisor" + # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") wait_for_jobs_to_finish_for(2.seconds) @@ -120,7 +125,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Lock the semaphore so we can enqueue jobs and leave them blocked skip_active_record_query_cache do - assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit) + assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) end # Now enqueue more jobs under that same key. 
They'll be all locked diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 1dee0e82..bda03c90 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -4,7 +4,7 @@ class SolidQueue::JobTest < ActiveSupport::TestCase class NonOverlappingJob < ApplicationJob include ActiveJob::ConcurrencyControls - limit_concurrency limit: 1, key: ->(job_result, **) { job_result } + restrict_concurrency_with limit: 1, key: ->(job_result, **) { job_result } def perform(job_result) end From f22a46ce08cc8d36e3726cad02c042480d5f90b1 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 16 Nov 2023 20:21:20 +0100 Subject: [PATCH 14/32] Include concurrency controls extension directly in ActiveJob --- lib/solid_queue/engine.rb | 6 ++++++ test/dummy/app/jobs/sequential_update_result_job.rb | 2 -- test/dummy/app/jobs/throttled_update_result_job.rb | 2 -- test/dummy/app/jobs/update_result_job.rb | 2 -- test/models/solid_queue/job_test.rb | 2 -- 5 files changed, 6 insertions(+), 8 deletions(-) diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index 9ba7f37a..72d7889e 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -27,5 +27,11 @@ class Engine < ::Rails::Engine self.logger = app.logger end end + + initializer "solid_queue.active_job.extensions" do + ActiveSupport.on_load :active_job do + include ActiveJob::ConcurrencyControls + end + end end end diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/sequential_update_result_job.rb index bd43b46b..ccfbcf21 100644 --- a/test/dummy/app/jobs/sequential_update_result_job.rb +++ b/test/dummy/app/jobs/sequential_update_result_job.rb @@ -1,5 +1,3 @@ class SequentialUpdateResultJob < UpdateResultJob - include ActiveJob::ConcurrencyControls - restrict_concurrency_with limit: 1, key: ->(job_result, **) { job_result } end diff --git a/test/dummy/app/jobs/throttled_update_result_job.rb 
b/test/dummy/app/jobs/throttled_update_result_job.rb index e0a1f004..2be0d96b 100644 --- a/test/dummy/app/jobs/throttled_update_result_job.rb +++ b/test/dummy/app/jobs/throttled_update_result_job.rb @@ -1,5 +1,3 @@ class ThrottledUpdateResultJob < UpdateResultJob - include ActiveJob::ConcurrencyControls - restrict_concurrency_with limit: 3, key: ->(job_result, **) { job_result } end diff --git a/test/dummy/app/jobs/update_result_job.rb b/test/dummy/app/jobs/update_result_job.rb index 3c9de511..04571eb6 100644 --- a/test/dummy/app/jobs/update_result_job.rb +++ b/test/dummy/app/jobs/update_result_job.rb @@ -1,6 +1,4 @@ class UpdateResultJob < ApplicationJob - include ActiveJob::ConcurrencyControls - def perform(job_result, name:, pause: nil, exception: nil) job_result.status += "s#{name}" diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index bda03c90..34175e30 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -2,8 +2,6 @@ class SolidQueue::JobTest < ActiveSupport::TestCase class NonOverlappingJob < ApplicationJob - include ActiveJob::ConcurrencyControls - restrict_concurrency_with limit: 1, key: ->(job_result, **) { job_result } def perform(job_result) From ede2bfbe37e40701f61243342f558d7db7242b2f Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 16 Nov 2023 22:55:56 +0100 Subject: [PATCH 15/32] Use a concurrent timer task to expire semaphores and unblock executions This task is managed by the scheduler and uses its same polling interval to run. 
--- app/models/solid_queue/blocked_execution.rb | 2 +- app/models/solid_queue/semaphore.rb | 1 + lib/solid_queue/process_registration.rb | 4 +- lib/solid_queue/scheduler.rb | 70 +++++++++++++------ test/integration/concurrency_controls_test.rb | 18 +++-- 5 files changed, 60 insertions(+), 35 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 5b3bccef..296d504c 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -4,7 +4,7 @@ class BlockedExecution < SolidQueue::Execution has_one :semaphore, foreign_key: :concurrency_key, primary_key: :concurrency_key - scope :releasable, -> { left_outer_joins(:execution_semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } + scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } scope :ordered, -> { order(priority: :asc) } class << self diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 461c01d8..e55c67b3 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -1,6 +1,7 @@ class SolidQueue::Semaphore < SolidQueue::Record scope :available, -> { where("value > 0") } scope :locked, -> { where(value: 0) } + scope :expired, -> { where(expires_at: ...Time.current)} class << self def wait_for(concurrency_key, limit, duration) diff --git a/lib/solid_queue/process_registration.rb b/lib/solid_queue/process_registration.rb index e0e9f076..2e7fe0ad 100644 --- a/lib/solid_queue/process_registration.rb +++ b/lib/solid_queue/process_registration.rb @@ -9,7 +9,7 @@ module ProcessRegistration define_callbacks :start, :run, :shutdown set_callback :start, :before, :register - set_callback :start, :before, :start_heartbeat + set_callback :start, :before, :launch_heartbeat set_callback :run, :after, -> { stop unless registered? } @@ -43,7 +43,7 @@ def registered? 
process.persisted? end - def start_heartbeat + def launch_heartbeat @heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) { heartbeat } @heartbeat_task.execute end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index e122e3fb..dc6b5f7e 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -1,34 +1,60 @@ # frozen_string_literal: true -class SolidQueue::Scheduler - include SolidQueue::Runner +module SolidQueue + class Scheduler + include Runner - attr_accessor :batch_size, :polling_interval + attr_accessor :batch_size, :polling_interval - def initialize(**options) - options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS) + set_callback :start, :before, :launch_concurrency_maintenance + set_callback :shutdown, :before, :stop_concurrency_maintenance - @batch_size = options[:batch_size] - @polling_interval = options[:polling_interval] - end + def initialize(**options) + options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS) + + @batch_size = options[:batch_size] + @polling_interval = options[:polling_interval] + end - private - def run - with_polling_volume do - batch = SolidQueue::ScheduledExecution.next_batch(batch_size) + private + def run + with_polling_volume do + batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load) - if batch.size > 0 - procline "preparing #{batch.size} jobs for execution" + if batch.size > 0 + procline "preparing #{batch.size} jobs for execution" - SolidQueue::ScheduledExecution.prepare_batch(batch) - else - procline "waiting" - interruptible_sleep(polling_interval) + SolidQueue::ScheduledExecution.prepare_batch(batch) + else + procline "waiting" + interruptible_sleep(polling_interval) + end end end - end - def metadata - super.merge(batch_size: batch_size, polling_interval: polling_interval) - end + def launch_concurrency_maintenance + @concurrency_maintenance_task = 
Concurrent::TimerTask.new(run_now: true, execution_interval: polling_interval) do + expire_semaphores + unblock_blocked_executions + end + + @concurrency_maintenance_task.execute + end + + def stop_concurrency_maintenance + @concurrency_maintenance_task.shutdown + end + + def expire_semaphores + Semaphore.expired.in_batches(of: batch_size, &:delete_all) + end + + def unblock_blocked_executions + BlockedExecution.unblock(batch_size) + end + + def metadata + super.merge(batch_size: batch_size, polling_interval: polling_interval) + end + end end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 0a7f9f5d..ba1a1183 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -10,7 +10,9 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase @result = JobResult.create!(queue_name: "default", status: "seq: ") default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 } - @pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ] }) + scheduler = { polling_interval: 1, batch_size: 200 } + + @pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler }) wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor end @@ -80,9 +82,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a end - test "rely on worker to unblock blocked executions with an available semaphore" do - skip "Moving this task to the supervisor" - + test "rely on scheduler to unblock blocked executions with an available semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") @@ -115,9 +115,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase 
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a end - test "rely on worker to unblock blocked executions with a missing semaphore" do - skip "Moving this task to the supervisor" - + test "rely on scheduler to unblock blocked executions with an expired semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") wait_for_jobs_to_finish_for(2.seconds) @@ -135,10 +133,10 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end - # Then delete the semaphore, as if we had cleared it - SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).destroy! + # Simulate semaphore expiration + SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).update(expires_at: 1.hour.ago) - # And wait for workers to release the jobs + # And wait for scheduler to release the jobs wait_for_jobs_to_finish_for(2.seconds) assert_no_pending_jobs From 8484ca89d7f5c83c1515c3a89fc8ca8560d459a0 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 20 Nov 2023 13:30:06 +0100 Subject: [PATCH 16/32] Don't instantiate job class to check concurrency limit if the key is not set --- app/models/solid_queue/job/concurrency_controls.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 021ed5bc..8ffab81b 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -37,7 +37,7 @@ def release_next_blocked_job end def concurrency_limited? - concurrency_limit.to_i > 0 && concurrency_key.present? + concurrency_key.present? 
&& concurrency_limit.to_i > 0 end def job_class From 91256ee432907fe7e9ff9ccf9a77a7e37d5290c6 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 20 Nov 2023 17:33:49 +0100 Subject: [PATCH 17/32] Rename Semaphore.concurrency_key to Semaphore.key and simplify method signatures We can simply pass a job, since both Active Job and Solid Queue's Job respond to the same methods, required for the semaphore to be waited on, or signaled. The "concurrency" part of the "concurrency_key" attribute is redundant. --- app/models/solid_queue/blocked_execution.rb | 4 +-- .../solid_queue/job/concurrency_controls.rb | 4 +-- app/models/solid_queue/semaphore.rb | 26 +++++++++---------- ...create_solid_queue_concurrency_controls.rb | 2 +- test/dummy/db/schema.rb | 4 +-- test/integration/concurrency_controls_test.rb | 8 +++--- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 296d504c..397591a7 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -2,7 +2,7 @@ module SolidQueue class BlockedExecution < SolidQueue::Execution assume_attributes_from_job :concurrency_key - has_one :semaphore, foreign_key: :concurrency_key, primary_key: :concurrency_key + has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } scope :ordered, -> { order(priority: :asc) } @@ -36,7 +36,7 @@ def release private def acquire_concurrency_lock - Semaphore.wait_for(concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) + Semaphore.wait(job) end def promote_to_ready diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 8ffab81b..e22d60c1 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++
b/app/models/solid_queue/job/concurrency_controls.rb @@ -19,13 +19,13 @@ def unblock_blocked_jobs def acquire_concurrency_lock return true unless concurrency_limited? - Semaphore.wait_for(concurrency_key, concurrency_limit, concurrency_limit_duration) + Semaphore.wait(self) end def release_concurrency_lock return false unless concurrency_limited? - Semaphore.release(concurrency_key, concurrency_limit, concurrency_limit_duration) + Semaphore.signal(self) end def block diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index e55c67b3..7bb3dc05 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -4,32 +4,32 @@ class SolidQueue::Semaphore < SolidQueue::Record scope :expired, -> { where(expires_at: ...Time.current)} class << self - def wait_for(concurrency_key, limit, duration) - if semaphore = find_by(concurrency_key: concurrency_key) - semaphore.value > 0 && attempt_decrement(concurrency_key, duration) + def wait(job) + if semaphore = find_by(key: job.concurrency_key) + semaphore.value > 0 && attempt_decrement(job.concurrency_key, job.concurrency_limit_duration) else - attempt_creation(concurrency_key, limit, duration) + attempt_creation(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) end end - def release(concurrency_key, limit, duration) - attempt_increment(concurrency_key, limit, duration) + def signal(job) + attempt_increment(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) end private - def attempt_creation(concurrency_key, limit, duration) - create!(concurrency_key: concurrency_key, value: limit - 1, expires_at: duration.from_now) + def attempt_creation(key, limit, duration) + create!(key: key, value: limit - 1, expires_at: duration.from_now) true rescue ActiveRecord::RecordNotUnique - attempt_decrement(concurrency_key, duration) + attempt_decrement(key, duration) end - def attempt_decrement(concurrency_key, duration) - 
available.where(concurrency_key: concurrency_key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0 + def attempt_decrement(key, duration) + available.where(key: key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0 end - def attempt_increment(concurrency_key, limit, duration) - where("value < ?", limit).where(concurrency_key: concurrency_key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0 + def attempt_increment(key, limit, duration) + where("value < ?", limit).where(key: key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0 end end end diff --git a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb index 050b0e41..4c6a48e1 100644 --- a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb +++ b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb @@ -17,7 +17,7 @@ def change end create_table :solid_queue_semaphores do |t| - t.string :concurrency_key, null: false, index: { unique: true } + t.string :key, null: false, index: { unique: true } t.integer :value, null: false, default: 1 t.datetime :expires_at, null: false, index: true diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 287d1bb9..54252c8d 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -96,13 +96,13 @@ end create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| - t.string "concurrency_key", null: false + t.string "key", null: false t.integer "value", default: 1, null: false t.datetime "expires_at", null: false t.datetime "created_at", null: false t.datetime "updated_at", null: false - t.index ["concurrency_key"], name: "index_solid_queue_semaphores_on_concurrency_key", unique: true t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" + t.index ["key"], name: 
"index_solid_queue_semaphores_on_key", unique: true end end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index ba1a1183..10f82b03 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -91,7 +91,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Lock the semaphore so we can enqueue jobs and leave them blocked skip_active_record_query_cache do - assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) + assert SolidQueue::Semaphore.wait(job) end # Now enqueue more jobs under that same key. They'll be all locked. Use priorities @@ -104,7 +104,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Then unlock the semaphore: this would be as if the first job had released # the semaphore but hadn't unblocked any jobs - assert SolidQueue::Semaphore.release(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) + assert SolidQueue::Semaphore.signal(job) # And wait for workers to release the jobs wait_for_jobs_to_finish_for(2.seconds) @@ -123,7 +123,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Lock the semaphore so we can enqueue jobs and leave them blocked skip_active_record_query_cache do - assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) + assert SolidQueue::Semaphore.wait(job) end # Now enqueue more jobs under that same key. 
They'll be all locked @@ -134,7 +134,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end # Simulate semaphore expiration - SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).update(expires_at: 1.hour.ago) + SolidQueue::Semaphore.find_by(key: job.concurrency_key).update(expires_at: 1.hour.ago) # And wait for scheduler to release the jobs wait_for_jobs_to_finish_for(2.seconds) From 9f80e5f5fc0c31bb18390fcb85e2d399343f407c Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 20 Nov 2023 17:36:00 +0100 Subject: [PATCH 18/32] Improve a bit some execution methods (DRY and order) --- app/models/solid_queue/blocked_execution.rb | 2 +- app/models/solid_queue/execution.rb | 6 ++---- app/models/solid_queue/ready_execution.rb | 4 ++++ app/models/solid_queue/scheduled_execution.rb | 6 +----- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 397591a7..40bdfb38 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -40,7 +40,7 @@ def acquire_concurrency_lock end def promote_to_ready - ReadyExecution.create!(job_id: job_id, queue_name: queue_name, priority: priority) + ReadyExecution.create!(ready_attributes) end end end diff --git a/app/models/solid_queue/execution.rb b/app/models/solid_queue/execution.rb index 56768882..366c4e24 100644 --- a/app/models/solid_queue/execution.rb +++ b/app/models/solid_queue/execution.rb @@ -8,10 +8,8 @@ class Execution < SolidQueue::Record alias_method :discard, :destroy - class << self - def queued_as(queues) - QueueParser.new(queues, self).scoped_relation - end + def ready_attributes + attributes.slice("job_id", "queue_name", "priority") end end end diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 7701d815..243c4f7e 100644 --- a/app/models/solid_queue/ready_execution.rb +++ 
b/app/models/solid_queue/ready_execution.rb @@ -14,6 +14,10 @@ def claim(queue_list, limit, process_id) end end + def queued_as(queues) + QueueParser.new(queues, self).scoped_relation + end + private def select_and_lock(queue_relation, process_id, limit) return [] if limit <= 0 diff --git a/app/models/solid_queue/scheduled_execution.rb b/app/models/solid_queue/scheduled_execution.rb index 2045d1e7..4f65e9db 100644 --- a/app/models/solid_queue/scheduled_execution.rb +++ b/app/models/solid_queue/scheduled_execution.rb @@ -10,7 +10,7 @@ def prepare_batch(batch) prepared_at = Time.current rows = batch.map do |scheduled_execution| - scheduled_execution.execution_ready_attributes.merge(created_at: prepared_at) + scheduled_execution.ready_attributes.merge(created_at: prepared_at) end if rows.any? @@ -23,8 +23,4 @@ def prepare_batch(batch) SolidQueue.logger.info("[SolidQueue] Prepared scheduled batch with #{rows.size} jobs at #{prepared_at}") end end - - def execution_ready_attributes - attributes.slice("job_id", "queue_name", "priority") - end end From 8993dd9cfb70fb1fe352c10351312d201dcdeb0f Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 20 Nov 2023 18:01:15 +0100 Subject: [PATCH 19/32] Rename `concurrency_limit_duration` to `concurrency_duration` --- app/models/solid_queue/job/concurrency_controls.rb | 2 +- app/models/solid_queue/semaphore.rb | 6 +++--- lib/active_job/concurrency_controls.rb | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index e22d60c1..8fe3a31a 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -6,7 +6,7 @@ module ConcurrencyControls included do has_one :blocked_execution, dependent: :destroy - delegate :concurrency_limit, :concurrency_limit_duration, to: :job_class + delegate :concurrency_limit, :concurrency_duration, to: :job_class end def 
unblock_blocked_jobs diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 7bb3dc05..a4bb9be4 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -6,14 +6,14 @@ class SolidQueue::Semaphore < SolidQueue::Record class << self def wait(job) if semaphore = find_by(key: job.concurrency_key) - semaphore.value > 0 && attempt_decrement(job.concurrency_key, job.concurrency_limit_duration) + semaphore.value > 0 && attempt_decrement(job.concurrency_key, job.concurrency_duration) else - attempt_creation(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) + attempt_creation(job.concurrency_key, job.concurrency_limit, job.concurrency_duration) end end def signal(job) - attempt_increment(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration) + attempt_increment(job.concurrency_key, job.concurrency_limit, job.concurrency_duration) end private diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index bed96af1..895d7cfb 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -10,14 +10,14 @@ module ConcurrencyControls class_attribute :concurrency_key, default: DEFAULT_CONCURRENCY_KEY, instance_accessor: false class_attribute :concurrency_limit, default: 0 # No limit - class_attribute :concurrency_limit_duration, default: SolidQueue.default_concurrency_control_period + class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period end class_methods do def restrict_concurrency_with(limit: 1, key: DEFAULT_CONCURRENCY_KEY, duration: SolidQueue.default_concurrency_control_period) self.concurrency_limit = limit self.concurrency_key = key - self.concurrency_limit_duration = duration + self.concurrency_duration = duration end end From cfaf02f0c29b39c87a23b60044952fc7cbe35221 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 20 Nov 2023 18:08:53 
+0100 Subject: [PATCH 20/32] Remove unused scope --- app/models/solid_queue/semaphore.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index a4bb9be4..57b76556 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -1,6 +1,5 @@ class SolidQueue::Semaphore < SolidQueue::Record scope :available, -> { where("value > 0") } - scope :locked, -> { where(value: 0) } scope :expired, -> { where(expires_at: ...Time.current)} class << self From 6e14ea0696a15044e40618894557a480e9e2d1f6 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 20 Nov 2023 18:18:02 +0100 Subject: [PATCH 21/32] Rename concurrency limit spec to `limits_concurrency to: limit ...` Thanks to @jorgemanrubia for the suggestion! --- lib/active_job/concurrency_controls.rb | 4 ++-- test/dummy/app/jobs/sequential_update_result_job.rb | 2 +- test/dummy/app/jobs/throttled_update_result_job.rb | 2 +- test/models/solid_queue/job_test.rb | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index 895d7cfb..ea3f3ee9 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -14,8 +14,8 @@ module ConcurrencyControls end class_methods do - def restrict_concurrency_with(limit: 1, key: DEFAULT_CONCURRENCY_KEY, duration: SolidQueue.default_concurrency_control_period) - self.concurrency_limit = limit + def limits_concurrency(to: 1, key: DEFAULT_CONCURRENCY_KEY, duration: SolidQueue.default_concurrency_control_period) + self.concurrency_limit = to self.concurrency_key = key self.concurrency_duration = duration end diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/sequential_update_result_job.rb index ccfbcf21..a3afa33f 100644 --- a/test/dummy/app/jobs/sequential_update_result_job.rb +++ 
b/test/dummy/app/jobs/sequential_update_result_job.rb @@ -1,3 +1,3 @@ class SequentialUpdateResultJob < UpdateResultJob - restrict_concurrency_with limit: 1, key: ->(job_result, **) { job_result } + limits_concurrency key: ->(job_result, **) { job_result } end diff --git a/test/dummy/app/jobs/throttled_update_result_job.rb b/test/dummy/app/jobs/throttled_update_result_job.rb index 2be0d96b..c6deb5b4 100644 --- a/test/dummy/app/jobs/throttled_update_result_job.rb +++ b/test/dummy/app/jobs/throttled_update_result_job.rb @@ -1,3 +1,3 @@ class ThrottledUpdateResultJob < UpdateResultJob - restrict_concurrency_with limit: 3, key: ->(job_result, **) { job_result } + limits_concurrency to: 3, key: ->(job_result, **) { job_result } end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 34175e30..2ad8ad67 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -2,7 +2,7 @@ class SolidQueue::JobTest < ActiveSupport::TestCase class NonOverlappingJob < ApplicationJob - restrict_concurrency_with limit: 1, key: ->(job_result, **) { job_result } + limits_concurrency key: ->(job_result, **) { job_result } def perform(job_result) end From 7c3007ab9734f02e736cf8fee673dbd136e3b0db Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 22 Nov 2023 20:30:47 +0100 Subject: [PATCH 22/32] Remove unused method, replaced by a scope --- app/models/solid_queue/ready_execution.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 243c4f7e..7701d815 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -14,10 +14,6 @@ def claim(queue_list, limit, process_id) end end - def queued_as(queues) - QueueParser.new(queues, self).scoped_relation - end - private def select_and_lock(queue_relation, process_id, limit) return [] if limit <= 0 From 
363ae88bebd1f15b2fe3d87b6214849dd8b8d2c6 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 22 Nov 2023 20:41:07 +0100 Subject: [PATCH 23/32] Fix index on `blocked_executions` for releasing and add index on semaphores Also, include `job_id` in the ORDER when releasing blocked executions. Thanks to @djmb for the suggestions. --- app/models/solid_queue/blocked_execution.rb | 1 - app/models/solid_queue/execution.rb | 2 ++ app/models/solid_queue/ready_execution.rb | 1 - .../20231103204612_create_solid_queue_concurrency_controls.rb | 4 +++- test/dummy/db/schema.rb | 3 ++- 5 files changed, 7 insertions(+), 4 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 40bdfb38..58fcb7a3 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -5,7 +5,6 @@ class BlockedExecution < SolidQueue::Execution has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } - scope :ordered, -> { order(priority: :asc) } class << self def unblock(count) diff --git a/app/models/solid_queue/execution.rb b/app/models/solid_queue/execution.rb index 366c4e24..76b9a8a9 100644 --- a/app/models/solid_queue/execution.rb +++ b/app/models/solid_queue/execution.rb @@ -4,6 +4,8 @@ class Execution < SolidQueue::Record self.abstract_class = true + scope :ordered, -> { order(priority: :asc, job_id: :asc) } + belongs_to :job alias_method :discard, :destroy diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 7701d815..6928781b 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -1,7 +1,6 @@ module SolidQueue class ReadyExecution < Execution scope :queued_as, ->(queue_name) { where(queue_name: queue_name) } - scope :ordered, -> { order(priority: :asc, job_id: 
:asc) } assume_attributes_from_job diff --git a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb index 4c6a48e1..8b94c08f 100644 --- a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb +++ b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb @@ -13,7 +13,7 @@ def change t.datetime :created_at, null: false - t.index [ :priority, :concurrency_key, :queue_name, :job_id ], name: "index_solid_queue_blocked_executions_for_release" + t.index [ :concurrency_key, :priority, :job_id ], name: "index_solid_queue_blocked_executions_for_release" end create_table :solid_queue_semaphores do |t| @@ -22,6 +22,8 @@ def change t.datetime :expires_at, null: false, index: true t.timestamps + + t.index [ :key, :value ] end end end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 54252c8d..bf80c155 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -26,7 +26,7 @@ t.string "concurrency_key", null: false t.datetime "created_at", null: false t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true - t.index ["priority", "concurrency_key", "queue_name", "job_id"], name: "index_solid_queue_blocked_executions_for_release" + t.index ["priority", "concurrency_key", "job_id"], name: "index_solid_queue_blocked_executions_for_release" end create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| @@ -102,6 +102,7 @@ t.datetime "created_at", null: false t.datetime "updated_at", null: false t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" + t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true end From fbef301b7d793e9e3802dde029e36e4fcf8038fa Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 22 Nov 2023 22:15:26 
+0100 Subject: [PATCH 24/32] Use intermediate Proxy object for the Semaphore low-level actions Building on a suggestion from @jorgemanrubia. --- app/models/solid_queue/semaphore.rb | 53 ++++++++++++++----- test/integration/concurrency_controls_test.rb | 8 +-- 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 57b76556..6b0a1c00 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -4,31 +4,60 @@ class SolidQueue::Semaphore < SolidQueue::Record class << self def wait(job) - if semaphore = find_by(key: job.concurrency_key) - semaphore.value > 0 && attempt_decrement(job.concurrency_key, job.concurrency_duration) + Proxy.new(job, self).wait + end + + def signal(job) + Proxy.new(job, self).signal + end + end + + class Proxy + def initialize(job, proxied_class) + @job = job + @proxied_class = proxied_class + end + + def wait + if semaphore = proxied_class.find_by(key: key) + semaphore.value > 0 && attempt_decrement else - attempt_creation(job.concurrency_key, job.concurrency_limit, job.concurrency_duration) + attempt_creation end end - def signal(job) - attempt_increment(job.concurrency_key, job.concurrency_limit, job.concurrency_duration) + def signal + attempt_increment end private - def attempt_creation(key, limit, duration) - create!(key: key, value: limit - 1, expires_at: duration.from_now) + attr_reader :job, :proxied_class + + def attempt_creation + proxied_class.create!(key: key, value: limit - 1, expires_at: expires_at) true rescue ActiveRecord::RecordNotUnique - attempt_decrement(key, duration) + attempt_decrement + end + + def attempt_decrement + proxied_class.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0 + end + + def attempt_increment + proxied_class.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0 + end + + def key + 
job.concurrency_key end - def attempt_decrement(key, duration) - available.where(key: key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0 + def expires_at + job.concurrency_duration.from_now end - def attempt_increment(key, limit, duration) - where("value < ?", limit).where(key: key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0 + def limit + job.concurrency_limit end end end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 10f82b03..651fbf5a 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -86,7 +86,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") - wait_for_jobs_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(3.seconds) assert_no_pending_jobs # Lock the semaphore so we can enqueue jobs and leave them blocked @@ -107,7 +107,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert SolidQueue::Semaphore.signal(job) # And wait for workers to release the jobs - wait_for_jobs_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(3.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when @@ -118,7 +118,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase test "rely on scheduler to unblock blocked executions with an expired semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") - wait_for_jobs_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(3.seconds) assert_no_pending_jobs # Lock the semaphore so we can enqueue jobs and leave them blocked @@ -137,7 +137,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase 
SolidQueue::Semaphore.find_by(key: job.concurrency_key).update(expires_at: 1.hour.ago) # And wait for scheduler to release the jobs - wait_for_jobs_to_finish_for(2.seconds) + wait_for_jobs_to_finish_for(3.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when From 427482ee4636a19b0948e62a7bcebc051bbc0f3a Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 23 Nov 2023 15:47:01 +0100 Subject: [PATCH 25/32] Add support for concurrency controls across jobs (aka. concurrency "groups") That is, if we wanted, for example, to limit concurrency for a group of jobs together because they act on the same kind of record, we couldn't have the job class be part of the concurrency key, as that would keep them all separated. With this change, we have two parts to build the concurrency key: - The concurrency group, which defaults to the job class name - And the concurrency key itself, which defaults to the first argument This also fixes the previous calculation of the key, which was working by chance because it was being executed in the context of the included module, not the job class. 
--- lib/active_job/concurrency_controls.rb | 27 +++++++++++++++++++++----- test/dummy/db/schema.rb | 2 +- test/models/solid_queue/job_test.rb | 24 ++++++++++++++++++++++- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index ea3f3ee9..e17edd13 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -4,32 +4,49 @@ module ActiveJob module ConcurrencyControls extend ActiveSupport::Concern - DEFAULT_CONCURRENCY_KEY = ->(*) { self.name } + DEFAULT_CONCURRENCY_KEY = ->(*args) { args&.first } + DEFAULT_CONCURRENCY_GROUP = ->(*) { self.class.name } included do class_attribute :concurrency_key, default: DEFAULT_CONCURRENCY_KEY, instance_accessor: false + class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false class_attribute :concurrency_limit, default: 0 # No limit class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period end class_methods do - def limits_concurrency(to: 1, key: DEFAULT_CONCURRENCY_KEY, duration: SolidQueue.default_concurrency_control_period) + def limits_concurrency(to: 1, key: DEFAULT_CONCURRENCY_KEY, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period) self.concurrency_limit = to self.concurrency_key = key self.concurrency_duration = duration + self.concurrency_group = group end end def concurrency_key - param = self.class.concurrency_key.call(*arguments) + param = compute_concurrency_parameter(self.class.concurrency_key) case param when ActiveRecord::Base - [ self.class.name, param.class.name, param.id ] + [ concurrency_group, param.class.name, param.id ] else - [ self.class.name, param ] + [ concurrency_group, param ] end.compact.join("/") end + + private + def concurrency_group + compute_concurrency_parameter(self.class.concurrency_group) + end + + def compute_concurrency_parameter(option) + case 
option + when String, Symbol + option.to_s + when Proc + instance_exec(*arguments, &option) + end + end end end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index bf80c155..77f06a26 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -25,8 +25,8 @@ t.integer "priority", default: 0, null: false t.string "concurrency_key", null: false t.datetime "created_at", null: false + t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true - t.index ["priority", "concurrency_key", "job_id"], name: "index_solid_queue_blocked_executions_for_release" end create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 2ad8ad67..a936a9c7 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -2,12 +2,20 @@ class SolidQueue::JobTest < ActiveSupport::TestCase class NonOverlappingJob < ApplicationJob - limits_concurrency key: ->(job_result, **) { job_result } + limits_concurrency # Using all defaults def perform(job_result) end end + class NonOverlappingGroupedJob1 < NonOverlappingJob + limits_concurrency group: "MyGroup" + end + + class NonOverlappingGroupedJob2 < NonOverlappingJob + limits_concurrency group: "MyGroup" + end + setup do @result = JobResult.create!(queue_name: "default") end @@ -63,6 +71,20 @@ def perform(job_result) assert_equal active_job.concurrency_key, job.concurrency_key end + test "enqueue jobs with concurrency controls in the same concurrency group" do + assert_ready do + active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A") + assert_equal 1, active_job.concurrency_limit + assert_equal "MyGroup/JobResult/#{@result.id}", active_job.concurrency_key + end + + assert_blocked do + 
active_job = NonOverlappingGroupedJob2.perform_later(@result, name: "B") + assert_equal 1, active_job.concurrency_limit + assert_equal "MyGroup/JobResult/#{@result.id}", active_job.concurrency_key + end + end + test "block jobs when concurrency limits are reached" do assert_ready do NonOverlappingJob.perform_later(@result, name: "A") From 93eb7f8313312b91c7c63ababba8157109da7fe8 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 23 Nov 2023 15:59:03 +0100 Subject: [PATCH 26/32] Don't have a default key for concurrency controls, force always a choice there This is less surprising and helps making it clear what the key is. --- lib/active_job/concurrency_controls.rb | 9 ++++----- test/models/solid_queue/job_test.rb | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb index e17edd13..5caaff0b 100644 --- a/lib/active_job/concurrency_controls.rb +++ b/lib/active_job/concurrency_controls.rb @@ -4,11 +4,10 @@ module ActiveJob module ConcurrencyControls extend ActiveSupport::Concern - DEFAULT_CONCURRENCY_KEY = ->(*args) { args&.first } DEFAULT_CONCURRENCY_GROUP = ->(*) { self.class.name } included do - class_attribute :concurrency_key, default: DEFAULT_CONCURRENCY_KEY, instance_accessor: false + class_attribute :concurrency_key, instance_accessor: false class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false class_attribute :concurrency_limit, default: 0 # No limit @@ -16,11 +15,11 @@ module ConcurrencyControls end class_methods do - def limits_concurrency(to: 1, key: DEFAULT_CONCURRENCY_KEY, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period) - self.concurrency_limit = to + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period) self.concurrency_key = key - self.concurrency_duration = duration + self.concurrency_limit = 
to self.concurrency_group = group + self.concurrency_duration = duration end end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index a936a9c7..d73bff41 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -2,18 +2,18 @@ class SolidQueue::JobTest < ActiveSupport::TestCase class NonOverlappingJob < ApplicationJob - limits_concurrency # Using all defaults + limits_concurrency key: ->(job_result, **) { job_result } def perform(job_result) end end class NonOverlappingGroupedJob1 < NonOverlappingJob - limits_concurrency group: "MyGroup" + limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end class NonOverlappingGroupedJob2 < NonOverlappingJob - limits_concurrency group: "MyGroup" + limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" end setup do From ee5fcc68c3c2871dec8b6ec9ecae823d9a3097ed Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 23 Nov 2023 20:10:21 +0100 Subject: [PATCH 27/32] Use a separate config option for concurrency maintenance task interval --- lib/solid_queue/configuration.rb | 3 ++- lib/solid_queue/scheduler.rb | 5 +++-- test/integration/concurrency_controls_test.rb | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index e1a857f2..64f69056 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -10,7 +10,8 @@ class Configuration SCHEDULER_DEFAULTS = { batch_size: 500, - polling_interval: 300 + polling_interval: 300, + concurrency_maintenance_interval: 600 } def initialize(mode: :work, load_from: nil) diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index dc6b5f7e..ab11ef09 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -4,7 +4,7 @@ module SolidQueue class Scheduler include Runner - attr_accessor :batch_size, :polling_interval + attr_accessor 
:batch_size, :polling_interval, :concurrency_maintenance_interval set_callback :start, :before, :launch_concurrency_maintenance set_callback :shutdown, :before, :stop_concurrency_maintenance @@ -14,6 +14,7 @@ def initialize(**options) @batch_size = options[:batch_size] @polling_interval = options[:polling_interval] + @concurrency_maintenance_interval = options[:concurrency_maintenance_interval] end private @@ -33,7 +34,7 @@ def run end def launch_concurrency_maintenance - @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: polling_interval) do + @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: concurrency_maintenance_interval) do expire_semaphores unblock_blocked_executions end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 651fbf5a..2dbbb586 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -10,7 +10,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase @result = JobResult.create!(queue_name: "default", status: "seq: ") default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 } - scheduler = { polling_interval: 1, batch_size: 200 } + scheduler = { polling_interval: 1, batch_size: 200, concurrency_maintenance_interval: 1 } @pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler }) From feef30b5b18987718bf092f1b50ecabced4a868e Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 23 Nov 2023 20:24:11 +0100 Subject: [PATCH 28/32] Remove unused option in test helper and unused test helper --- test/integration/processes_lifecycle_test.rb | 8 -------- test/test_helper.rb | 17 +++++------------ 2 files changed, 5 insertions(+), 20 deletions(-) diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 
d3f5dbc9..65a2fddb 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -181,14 +181,6 @@ class ProcessLifecycleTest < ActiveSupport::TestCase end private - def terminate_registered_processes - skip_active_record_query_cache do - SolidQueue::Process.find_each do |process| - terminate_process(process.metadata["pid"], from_parent: false) - end - end - end - def assert_clean_termination wait_for_registered_processes 0, timeout: 0.2.second assert_no_registered_processes diff --git a/test/test_helper.rb b/test/test_helper.rb index 63470368..61482adf 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -79,22 +79,15 @@ def find_processes_registered_as(kind) end end - def terminate_process(pid, timeout: 10, signal: :TERM, from_parent: true) + def terminate_process(pid, timeout: 10, signal: :TERM) signal_process(pid, signal) - wait_for_process_termination_with_timeout(pid, timeout: timeout, from_parent: from_parent) + wait_for_process_termination_with_timeout(pid, timeout: timeout) end - def wait_for_process_termination_with_timeout(pid, timeout: 10, from_parent: true, exitstatus: 0) + def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0) Timeout.timeout(timeout) do - if from_parent - Process.waitpid(pid) - assert exitstatus, $?.exitstatus - else - loop do - break unless process_exists?(pid) - sleep 0.05 - end - end + Process.waitpid(pid) + assert exitstatus, $?.exitstatus end rescue Timeout::Error signal_process(pid, :KILL) From 9d0da034f945a4cf4f8e460a321f2486cfbf06d5 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 23 Nov 2023 20:45:11 +0100 Subject: [PATCH 29/32] Store expires_at time in blocked_executions We'll use this to improve the queries performed by the concurrency maintenance task. 
--- app/models/solid_queue/blocked_execution.rb | 5 +++++ ...20231103204612_create_solid_queue_concurrency_controls.rb | 2 ++ test/dummy/db/schema.rb | 2 ++ test/models/solid_queue/job_test.rb | 3 +++ 4 files changed, 12 insertions(+) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 58fcb7a3..8db7b03a 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -1,6 +1,7 @@ module SolidQueue class BlockedExecution < SolidQueue::Execution assume_attributes_from_job :concurrency_key + before_create :set_expires_at has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key @@ -34,6 +35,10 @@ def release end private + def set_expires_at + self.expires_at = job.concurrency_duration.from_now + end + def acquire_concurrency_lock Semaphore.wait(job) end diff --git a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb index 8b94c08f..dec8d5a6 100644 --- a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb +++ b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb @@ -12,8 +12,10 @@ def change t.string :concurrency_key, null: false t.datetime :created_at, null: false + t.datetime :expires_at, null: false t.index [ :concurrency_key, :priority, :job_id ], name: "index_solid_queue_blocked_executions_for_release" + t.index [ :expires_at, :concurrency_key ], name: "index_solid_queue_blocked_executions_for_maintenance" end create_table :solid_queue_semaphores do |t| diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 77f06a26..1ef94a31 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -25,7 +25,9 @@ t.integer "priority", default: 0, null: false t.string "concurrency_key", null: false t.datetime "created_at", null: false + t.datetime "expires_at", null: false t.index ["concurrency_key", "priority", "job_id"], 
name: "index_solid_queue_blocked_executions_for_release" + t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index d73bff41..de265b44 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -93,6 +93,9 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob assert_blocked do NonOverlappingJob.perform_later(@result, name: "B") end + + blocked_execution = SolidQueue::BlockedExecution.last + assert blocked_execution.expires_at <= SolidQueue.default_concurrency_control_period.from_now end private From 49f7d1352cf6ee456dcd8765525b562439c73e9d Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 23 Nov 2023 21:11:13 +0100 Subject: [PATCH 30/32] Add a few style related fixes across the board --- app/models/solid_queue/blocked_execution.rb | 2 +- app/models/solid_queue/claimed_execution.rb | 2 +- app/models/solid_queue/job/concurrency_controls.rb | 2 +- app/models/solid_queue/scheduled_execution.rb | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index 8db7b03a..dab37a7b 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -9,7 +9,7 @@ class BlockedExecution < SolidQueue::Execution class << self def unblock(count) - release_many releasable.select(:concurrency_key).distinct.limit(count).pluck(:concurrency_key) + release_many releasable.distinct.limit(count).pluck(:concurrency_key) end def release_many(concurrency_keys) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index d2844ce4..369ed0ee 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ 
b/app/models/solid_queue/claimed_execution.rb @@ -32,7 +32,7 @@ def perform failed_with(result.error) end ensure - job.unblock_blocked_jobs + job.unblock_next_blocked_job end def release diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb index 8fe3a31a..c949f37b 100644 --- a/app/models/solid_queue/job/concurrency_controls.rb +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -9,7 +9,7 @@ module ConcurrencyControls delegate :concurrency_limit, :concurrency_duration, to: :job_class end - def unblock_blocked_jobs + def unblock_next_blocked_job if release_concurrency_lock release_next_blocked_job end diff --git a/app/models/solid_queue/scheduled_execution.rb b/app/models/solid_queue/scheduled_execution.rb index 4f65e9db..acd5966d 100644 --- a/app/models/solid_queue/scheduled_execution.rb +++ b/app/models/solid_queue/scheduled_execution.rb @@ -1,5 +1,5 @@ class SolidQueue::ScheduledExecution < SolidQueue::Execution - scope :due, -> { where("scheduled_at <= ?", Time.current) } + scope :due, -> { where(scheduled_at: ..Time.current) } scope :ordered, -> { order(scheduled_at: :asc, priority: :asc) } scope :next_batch, ->(batch_size) { due.ordered.limit(batch_size) } From 77b67b0501ca5dc71e558a647ebebf4f3834879a Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sun, 26 Nov 2023 13:35:10 +0100 Subject: [PATCH 31/32] Rely on blocked_executions.expires_at to select candidates to be released Semaphores' expiry times are bumped when new jobs are successfully queued for execution or when jobs finish and release the semaphores. This means that if a blocked execution's expiry time is in the past, the semaphore's expiry time is most likely in the past too. Here's why: we know that if we still have a blocked job execution after its expiry time has passed, it's because: 1.
A job holding the semaphore hasn't finished yet, and in that case, the semaphore's expiry time would have expired as well and would be cleared up right before checking blocked jobs. 2. The job holding the semaphore finished and released the semaphore but failed to unblock the next job. In that case, when we inspect the blocked job's concurrency key, we'll see the semaphore released. a. It's possible a new job is enqueued in the meantime and claims the semaphore, so we wouldn't be able to unblock that blocked job. However, if this happens, it's also more likely that this new job will succeed at unblocking it when it is finished. The more jobs that are enqueued and run, bumping the semaphore's expiry time, the more likely we are to unblock the blocked jobs via the normal method. 3. The job holding the semaphore finished but failed to release the semaphore: this case is the same as 1, the semaphore will be cleared before unblocking the execution. We take advantage of this to select X (scheduler's batch size) distinct concurrency keys from expired blocked executions, and for that we can use the index on (expires_at, concurrency_key), that filters the elements, even if we have to scan all of them to find the distinct concurrency keys using a temporary table that would be at most X items long. Then, we'll check whether these (up to) X concurrency keys are releasable and try to release them. A potential problem would be if we happen to select X concurrency keys that are expired but turn out not to be releasable. I think this should be very unlikely because for this to happen, we'd have to have failed to unblock X jobs via the regular method, and other jobs using the same concurrency keys would have to have been enqueued, claiming the semaphore (case 2.a. described above), before we had the chance to unblock them, for all of them. We'd need two exceptional things to happen at once: a large backlog of concurrent jobs (using different keys) and a large number of jobs that failed to be unblocked.
Thanks to @djmb for thinking through this with me and all the help! --- app/models/solid_queue/blocked_execution.rb | 14 ++++++++++++-- app/models/solid_queue/semaphore.rb | 2 +- lib/solid_queue.rb | 2 +- .../dummy/app/jobs/sequential_update_result_job.rb | 2 +- test/integration/concurrency_controls_test.rb | 9 +++------ 5 files changed, 18 insertions(+), 11 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index dab37a7b..bf77be4e 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -5,11 +5,13 @@ class BlockedExecution < SolidQueue::Execution has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key - scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) } + scope :expired, -> { where(expires_at: ...Time.current) } class << self def unblock(count) - release_many releasable.distinct.limit(count).pluck(:concurrency_key) + expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys| + release_many releasable(concurrency_keys) + end end def release_many(concurrency_keys) @@ -21,6 +23,14 @@ def release_many(concurrency_keys) def release_one(concurrency_key) ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release) end + + private + def releasable(concurrency_keys) + semaphores = Semaphore.where(key: concurrency_keys).pluck(:key, :value).index_by(&:key) + + # Concurrency keys without semaphore + concurrency keys with open semaphore + (concurrency_keys - semaphores.keys) | semaphores.select { |key, value| value > 0 }.map(&:first) + end end def release diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 6b0a1c00..97d178c2 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -1,6 +1,6 @@ class SolidQueue::Semaphore < SolidQueue::Record scope 
:available, -> { where("value > 0") } - scope :expired, -> { where(expires_at: ...Time.current)} + scope :expired, -> { where(expires_at: ...Time.current) } class << self def wait(job) diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 45855449..46ff42e6 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -34,7 +34,7 @@ module SolidQueue mattr_accessor :supervisor, default: false mattr_accessor :delete_finished_jobs, default: true - mattr_accessor :default_concurrency_control_period, default: 15.minutes + mattr_accessor :default_concurrency_control_period, default: 3.minutes def self.supervisor? supervisor diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/sequential_update_result_job.rb index a3afa33f..8cf91a9b 100644 --- a/test/dummy/app/jobs/sequential_update_result_job.rb +++ b/test/dummy/app/jobs/sequential_update_result_job.rb @@ -1,3 +1,3 @@ class SequentialUpdateResultJob < UpdateResultJob - limits_concurrency key: ->(job_result, **) { job_result } + limits_concurrency key: ->(job_result, **) { job_result }, duration: 2.seconds end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 2dbbb586..0bbcc167 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -106,8 +106,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # the semaphore but hadn't unblocked any jobs assert SolidQueue::Semaphore.signal(job) - # And wait for workers to release the jobs - wait_for_jobs_to_finish_for(3.seconds) + # And wait for the scheduler to release the jobs + wait_for_jobs_to_finish_for(5.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when @@ -133,11 +133,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end - # Simulate semaphore expiration - SolidQueue::Semaphore.find_by(key: 
job.concurrency_key).update(expires_at: 1.hour.ago) - # And wait for scheduler to release the jobs - wait_for_jobs_to_finish_for(3.seconds) + wait_for_jobs_to_finish_for(5.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when From 8a5de868cbb13d0fc951a2858b9b5d2ccabac1f2 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 27 Nov 2023 14:24:39 +0100 Subject: [PATCH 32/32] Fix bug when selecting releasable executions and make sure these are reported Via SolidQueue.on_thread_error, in test environment. --- app/models/solid_queue/blocked_execution.rb | 4 ++-- lib/solid_queue/scheduler.rb | 4 ++++ .../app/jobs/sequential_update_result_job.rb | 2 +- test/dummy/config/application.rb | 2 -- test/dummy/config/environments/development.rb | 2 ++ test/dummy/config/environments/test.rb | 4 ++++ test/integration/concurrency_controls_test.rb | 16 ++++++++++------ 7 files changed, 23 insertions(+), 11 deletions(-) diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb index bf77be4e..b469f685 100644 --- a/app/models/solid_queue/blocked_execution.rb +++ b/app/models/solid_queue/blocked_execution.rb @@ -26,10 +26,10 @@ def release_one(concurrency_key) private def releasable(concurrency_keys) - semaphores = Semaphore.where(key: concurrency_keys).pluck(:key, :value).index_by(&:key) + semaphores = Semaphore.where(key: concurrency_keys).select(:key, :value).index_by(&:key) # Concurrency keys without semaphore + concurrency keys with open semaphore - (concurrency_keys - semaphores.keys) | semaphores.select { |key, value| value > 0 }.map(&:first) + (concurrency_keys - semaphores.keys) | semaphores.select { |key, semaphore| semaphore.value > 0 }.map(&:first) end end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index ab11ef09..22ac9f2f 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -39,6 +39,10 @@ def 
launch_concurrency_maintenance unblock_blocked_executions end + @concurrency_maintenance_task.add_observer do |_, _, error| + handle_thread_error(error) if error + end + @concurrency_maintenance_task.execute end diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/sequential_update_result_job.rb index 8cf91a9b..a3afa33f 100644 --- a/test/dummy/app/jobs/sequential_update_result_job.rb +++ b/test/dummy/app/jobs/sequential_update_result_job.rb @@ -1,3 +1,3 @@ class SequentialUpdateResultJob < UpdateResultJob - limits_concurrency key: ->(job_result, **) { job_result }, duration: 2.seconds + limits_concurrency key: ->(job_result, **) { job_result } end diff --git a/test/dummy/config/application.rb b/test/dummy/config/application.rb index b58b0e4d..36568a49 100644 --- a/test/dummy/config/application.rb +++ b/test/dummy/config/application.rb @@ -28,8 +28,6 @@ class Application < Rails::Application # config.eager_load_paths << Rails.root.join("extras") config.active_job.queue_adapter = :solid_queue - - config.solid_queue.logger = ActiveSupport::Logger.new(nil) config.solid_queue.delete_finished_jobs = false end end diff --git a/test/dummy/config/environments/development.rb b/test/dummy/config/environments/development.rb index f811f685..e3714846 100644 --- a/test/dummy/config/environments/development.rb +++ b/test/dummy/config/environments/development.rb @@ -56,4 +56,6 @@ # Uncomment if you wish to allow Action Cable access from any origin. # config.action_cable.disable_request_forgery_protection = true + + config.solid_queue.logger = ActiveSupport::Logger.new(nil) end diff --git a/test/dummy/config/environments/test.rb b/test/dummy/config/environments/test.rb index eb2f1716..1fc968f6 100644 --- a/test/dummy/config/environments/test.rb +++ b/test/dummy/config/environments/test.rb @@ -47,4 +47,8 @@ # Annotate rendered view with file names. 
# config.action_view.annotate_rendered_view_with_filenames = true + + logger = ActiveSupport::Logger.new(STDOUT) + config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{exception.backtrace.join("\n")}") } + config.solid_queue.logger = ActiveSupport::Logger.new(nil) end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 0bbcc167..6cf9d6d9 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -94,20 +94,20 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert SolidQueue::Semaphore.wait(job) end - # Now enqueue more jobs under that same key. They'll be all locked. Use priorities - # to ensure order. + # Now enqueue more jobs under that same key. They'll be all locked assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do ("B".."K").each do |name| SequentialUpdateResultJob.perform_later(@result, name: name) end end - # Then unlock the semaphore: this would be as if the first job had released - # the semaphore but hadn't unblocked any jobs + # Then unlock the semaphore and expire the jobs: this would be as if the first job had + # released the semaphore but hadn't unblocked any jobs + SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago) assert SolidQueue::Semaphore.signal(job) # And wait for the scheduler to release the jobs - wait_for_jobs_to_finish_for(5.seconds) + wait_for_jobs_to_finish_for(3.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when @@ -133,8 +133,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end + # Simulate expiration of semaphore and executions + SolidQueue::Semaphore.where(key: job.concurrency_key).update_all(expires_at: 15.minutes.ago) + SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago) + # And wait for scheduler to release 
the jobs - wait_for_jobs_to_finish_for(5.seconds) + wait_for_jobs_to_finish_for(3.seconds) assert_no_pending_jobs # We can't ensure the order between B and C, because it depends on which worker wins when