diff --git a/app/models/solid_queue/blocked_execution.rb b/app/models/solid_queue/blocked_execution.rb new file mode 100644 index 00000000..b469f685 --- /dev/null +++ b/app/models/solid_queue/blocked_execution.rb @@ -0,0 +1,60 @@ +module SolidQueue + class BlockedExecution < SolidQueue::Execution + assume_attributes_from_job :concurrency_key + before_create :set_expires_at + + has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key + + scope :expired, -> { where(expires_at: ...Time.current) } + + class << self + def unblock(count) + expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys| + release_many releasable(concurrency_keys) + end + end + + def release_many(concurrency_keys) + # We want to release exactly one blocked execution for each concurrency key, and we need to do it + # one by one, locking each record and acquiring the semaphore individually for each of them: + Array(concurrency_keys).each { |concurrency_key| release_one(concurrency_key) } + end + + def release_one(concurrency_key) + ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release) + end + + private + def releasable(concurrency_keys) + semaphores = Semaphore.where(key: concurrency_keys).select(:key, :value).index_by(&:key) + + # Concurrency keys without semaphore + concurrency keys with open semaphore + (concurrency_keys - semaphores.keys) | semaphores.select { |key, semaphore| semaphore.value > 0 }.map(&:first) + end + end + + def release + transaction do + if acquire_concurrency_lock + promote_to_ready + destroy! + + SolidQueue.logger.info("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}") + end + end + end + + private + def set_expires_at + self.expires_at = job.concurrency_duration.from_now + end + + def acquire_concurrency_lock + Semaphore.wait(job) + end + + def promote_to_ready + ReadyExecution.create!(ready_attributes) + end + end +end diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 63d62571..369ed0ee 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -31,6 +31,8 @@ def perform else failed_with(result.error) end + ensure + job.unblock_next_blocked_job end def release diff --git a/app/models/solid_queue/execution.rb b/app/models/solid_queue/execution.rb index e4925bc3..76b9a8a9 100644 --- a/app/models/solid_queue/execution.rb +++ b/app/models/solid_queue/execution.rb @@ -1,9 +1,17 @@ -class SolidQueue::Execution < SolidQueue::Record - include JobAttributes +module SolidQueue + class Execution < SolidQueue::Record + include JobAttributes - self.abstract_class = true + self.abstract_class = true - belongs_to :job + scope :ordered, -> { order(priority: :asc, job_id: :asc) } - alias_method :discard, :destroy + belongs_to :job + + alias_method :discard, :destroy + + def ready_attributes + attributes.slice("job_id", "queue_name", "priority") + end + end end diff --git a/app/models/solid_queue/job.rb b/app/models/solid_queue/job.rb index e1d889c5..0cf0e798 100644 --- a/app/models/solid_queue/job.rb +++ b/app/models/solid_queue/job.rb @@ -18,7 +18,8 @@ def enqueue_active_job(active_job, scheduled_at: Time.current) priority: active_job.priority, scheduled_at: scheduled_at, class_name: active_job.class.name, - arguments: active_job.serialize + arguments: active_job.serialize, + concurrency_key: active_job.try(:concurrency_key) end def enqueue(**kwargs) diff --git a/app/models/solid_queue/job/concurrency_controls.rb b/app/models/solid_queue/job/concurrency_controls.rb new file mode 100644 index 00000000..c949f37b --- /dev/null +++ b/app/models/solid_queue/job/concurrency_controls.rb @@ -0,0 +1,48 @@ +module SolidQueue + class Job + module ConcurrencyControls + extend ActiveSupport::Concern + + included do + has_one :blocked_execution, dependent: :destroy + + delegate :concurrency_limit, :concurrency_duration, to: :job_class + end + + def unblock_next_blocked_job + if release_concurrency_lock + release_next_blocked_job + end + end + + private + def acquire_concurrency_lock + return true unless concurrency_limited? + + Semaphore.wait(self) + end + + def release_concurrency_lock + return false unless concurrency_limited? + + Semaphore.signal(self) + end + + def block + BlockedExecution.create_or_find_by!(job_id: id) + end + + def release_next_blocked_job + BlockedExecution.release_one(concurrency_key) + end + + def concurrency_limited? + concurrency_key.present? && concurrency_limit.to_i > 0 + end + + def job_class + @job_class ||= class_name.safe_constantize + end + end + end +end diff --git a/app/models/solid_queue/job/executable.rb b/app/models/solid_queue/job/executable.rb index 27f17463..de4552cf 100644 --- a/app/models/solid_queue/job/executable.rb +++ b/app/models/solid_queue/job/executable.rb @@ -1,64 +1,85 @@ module SolidQueue - module Job::Executable - extend ActiveSupport::Concern + class Job + module Executable + extend ActiveSupport::Concern - included do - has_one :ready_execution, dependent: :destroy - has_one :claimed_execution, dependent: :destroy - has_one :failed_execution, dependent: :destroy + included do + include ConcurrencyControls - has_one :scheduled_execution, dependent: :destroy + has_one :ready_execution, dependent: :destroy + has_one :claimed_execution, dependent: :destroy + has_one :failed_execution, dependent: :destroy - after_create :prepare_for_execution + has_one :scheduled_execution, dependent: :destroy - scope :finished, -> { where.not(finished_at: nil) } - end - - STATUSES = %w[ ready claimed failed scheduled ] + after_create :prepare_for_execution - STATUSES.each do |status| - define_method("#{status}?") { public_send("#{status}_execution").present? } - end + scope :finished, -> { where.not(finished_at: nil) } + end - def prepare_for_execution - if due? - ReadyExecution.create_or_find_by!(job_id: id) - else - ScheduledExecution.create_or_find_by!(job_id: id) + %w[ ready claimed failed scheduled ].each do |status| + define_method("#{status}?") { public_send("#{status}_execution").present? } end - end - def finished! - if delete_finished_jobs? - destroy! - else - touch(:finished_at) + def prepare_for_execution + if due? then dispatch + else + schedule + end end - end - def finished? - finished_at.present? - end + def finished! + if delete_finished_jobs? + destroy! + else + touch(:finished_at) + end + end - def failed_with(exception) - FailedExecution.create_or_find_by!(job_id: id, exception: exception) - end + def finished? + finished_at.present? + end - def discard - destroy unless claimed? - end + def failed_with(exception) + FailedExecution.create_or_find_by!(job_id: id, exception: exception) + end - def retry - failed_execution&.retry - end + def discard + destroy unless claimed? + end - private - def due? - scheduled_at.nil? || scheduled_at <= Time.current + def retry + failed_execution&.retry end - def delete_finished_jobs? - SolidQueue.delete_finished_jobs + def failed_with(exception) + FailedExecution.create_or_find_by!(job_id: id, exception: exception) end + + private + def due? + scheduled_at.nil? || scheduled_at <= Time.current + end + + def dispatch + if acquire_concurrency_lock then ready + else + block + end + end + + def schedule + ScheduledExecution.create_or_find_by!(job_id: id) + end + + def ready + ReadyExecution.create_or_find_by!(job_id: id) + end + + + def delete_finished_jobs? + SolidQueue.delete_finished_jobs + end + end end end diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 7701d815..6928781b 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -1,7 +1,6 @@ module SolidQueue class ReadyExecution < Execution scope :queued_as, ->(queue_name) { where(queue_name: queue_name) } - scope :ordered, -> { order(priority: :asc, job_id: :asc) } assume_attributes_from_job diff --git a/app/models/solid_queue/scheduled_execution.rb b/app/models/solid_queue/scheduled_execution.rb index 2045d1e7..acd5966d 100644 --- a/app/models/solid_queue/scheduled_execution.rb +++ b/app/models/solid_queue/scheduled_execution.rb @@ -1,5 +1,5 @@ class SolidQueue::ScheduledExecution < SolidQueue::Execution - scope :due, -> { where("scheduled_at <= ?", Time.current) } + scope :due, -> { where(scheduled_at: ..Time.current) } scope :ordered, -> { order(scheduled_at: :asc, priority: :asc) } scope :next_batch, ->(batch_size) { due.ordered.limit(batch_size) } @@ -10,7 +10,7 @@ def prepare_batch(batch) prepared_at = Time.current rows = batch.map do |scheduled_execution| - scheduled_execution.execution_ready_attributes.merge(created_at: prepared_at) + scheduled_execution.ready_attributes.merge(created_at: prepared_at) end if rows.any? @@ -23,8 +23,4 @@ def prepare_batch(batch) SolidQueue.logger.info("[SolidQueue] Prepared scheduled batch with #{rows.size} jobs at #{prepared_at}") end end - - def execution_ready_attributes - attributes.slice("job_id", "queue_name", "priority") - end end diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb new file mode 100644 index 00000000..97d178c2 --- /dev/null +++ b/app/models/solid_queue/semaphore.rb @@ -0,0 +1,63 @@ +class SolidQueue::Semaphore < SolidQueue::Record + scope :available, -> { where("value > 0") } + scope :expired, -> { where(expires_at: ...Time.current) } + + class << self + def wait(job) + Proxy.new(job, self).wait + end + + def signal(job) + Proxy.new(job, self).signal + end + end + + class Proxy + def initialize(job, proxied_class) + @job = job + @proxied_class = proxied_class + end + + def wait + if semaphore = proxied_class.find_by(key: key) + semaphore.value > 0 && attempt_decrement + else + attempt_creation + end + end + + def signal + attempt_increment + end + + private + attr_reader :job, :proxied_class + + def attempt_creation + proxied_class.create!(key: key, value: limit - 1, expires_at: expires_at) + true + rescue ActiveRecord::RecordNotUnique + attempt_decrement + end + + def attempt_decrement + proxied_class.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0 + end + + def attempt_increment + proxied_class.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0 + end + + def key + job.concurrency_key + end + + def expires_at + job.concurrency_duration.from_now + end + + def limit + job.concurrency_limit + end + end +end diff --git a/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb new file mode 100644 index 00000000..dec8d5a6 --- /dev/null +++ b/db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb @@ -0,0 +1,31 @@ +class CreateSolidQueueConcurrencyControls < ActiveRecord::Migration[7.1] + def change + change_table :solid_queue_jobs do |t| + t.string :concurrency_key + end + + create_table :solid_queue_blocked_executions do |t| + t.references :job, index: { unique: true } + t.string :queue_name, null: false + t.integer :priority, default: 0, null: false + + t.string :concurrency_key, null: false + + t.datetime :created_at, null: false + t.datetime :expires_at, null: false + + t.index [ :concurrency_key, :priority, :job_id ], name: "index_solid_queue_blocked_executions_for_release" + t.index [ :expires_at, :concurrency_key ], name: "index_solid_queue_blocked_executions_for_maintenance" + end + + create_table :solid_queue_semaphores do |t| + t.string :key, null: false, index: { unique: true } + t.integer :value, null: false, default: 1 + t.datetime :expires_at, null: false, index: true + + t.timestamps + + t.index [ :key, :value ] + end + end +end diff --git a/lib/active_job/concurrency_controls.rb b/lib/active_job/concurrency_controls.rb new file mode 100644 index 00000000..5caaff0b --- /dev/null +++ b/lib/active_job/concurrency_controls.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +module ActiveJob + module ConcurrencyControls + extend ActiveSupport::Concern + + DEFAULT_CONCURRENCY_GROUP = ->(*) { self.class.name } + + included do + class_attribute :concurrency_key, instance_accessor: false + class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false + + class_attribute :concurrency_limit, default: 0 # No limit + class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period + end + + class_methods do + def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period) + self.concurrency_key = key + self.concurrency_limit = to + self.concurrency_group = group + self.concurrency_duration = duration + end + end + + def concurrency_key + param = compute_concurrency_parameter(self.class.concurrency_key) + + case param + when ActiveRecord::Base + [ concurrency_group, param.class.name, param.id ] + else + [ concurrency_group, param ] + end.compact.join("/") + end + + private + def concurrency_group + compute_concurrency_parameter(self.class.concurrency_group) + end + + def compute_concurrency_parameter(option) + case option + when String, Symbol + option.to_s + when Proc + instance_exec(*arguments, &option) + end + end + end +end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index eb36e274..46ff42e6 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -2,6 +2,7 @@ require "solid_queue/engine" require "active_job/queue_adapters/solid_queue_adapter" +require "active_job/concurrency_controls" require "solid_queue/app_executor" require "solid_queue/interruptible" @@ -33,6 +34,7 @@ module SolidQueue mattr_accessor :supervisor, default: false mattr_accessor :delete_finished_jobs, default: true + mattr_accessor :default_concurrency_control_period, default: 3.minutes def self.supervisor? supervisor diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index e1a857f2..64f69056 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -10,7 +10,8 @@ class Configuration SCHEDULER_DEFAULTS = { batch_size: 500, - polling_interval: 300 + polling_interval: 300, + concurrency_maintenance_interval: 600 } def initialize(mode: :work, load_from: nil) diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index 9ba7f37a..72d7889e 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -27,5 +27,11 @@ class Engine < ::Rails::Engine self.logger = app.logger end end + + initializer "solid_queue.active_job.extensions" do + ActiveSupport.on_load :active_job do + include ActiveJob::ConcurrencyControls + end + end end end diff --git a/lib/solid_queue/process_registration.rb b/lib/solid_queue/process_registration.rb index e0e9f076..2e7fe0ad 100644 --- a/lib/solid_queue/process_registration.rb +++ b/lib/solid_queue/process_registration.rb @@ -9,7 +9,7 @@ module ProcessRegistration define_callbacks :start, :run, :shutdown set_callback :start, :before, :register - set_callback :start, :before, :start_heartbeat + set_callback :start, :before, :launch_heartbeat set_callback :run, :after, -> { stop unless registered? } @@ -43,7 +43,7 @@ def registered? process.persisted? end - def start_heartbeat + def launch_heartbeat @heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) { heartbeat } @heartbeat_task.execute end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index e122e3fb..22ac9f2f 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -1,34 +1,65 @@ # frozen_string_literal: true -class SolidQueue::Scheduler - include SolidQueue::Runner +module SolidQueue + class Scheduler + include Runner - attr_accessor :batch_size, :polling_interval + attr_accessor :batch_size, :polling_interval, :concurrency_maintenance_interval - def initialize(**options) - options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS) + set_callback :start, :before, :launch_concurrency_maintenance + set_callback :shutdown, :before, :stop_concurrency_maintenance - @batch_size = options[:batch_size] - @polling_interval = options[:polling_interval] - end + def initialize(**options) + options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS) + + @batch_size = options[:batch_size] + @polling_interval = options[:polling_interval] + @concurrency_maintenance_interval = options[:concurrency_maintenance_interval] + end - private - def run - with_polling_volume do - batch = SolidQueue::ScheduledExecution.next_batch(batch_size) + private + def run + with_polling_volume do + batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load) - if batch.size > 0 - procline "preparing #{batch.size} jobs for execution" + if batch.size > 0 + procline "preparing #{batch.size} jobs for execution" - SolidQueue::ScheduledExecution.prepare_batch(batch) - else - procline "waiting" - interruptible_sleep(polling_interval) + SolidQueue::ScheduledExecution.prepare_batch(batch) + else + procline "waiting" + interruptible_sleep(polling_interval) + end end end - end - def metadata - super.merge(batch_size: batch_size, polling_interval: polling_interval) - end + def launch_concurrency_maintenance + @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: concurrency_maintenance_interval) do + expire_semaphores + unblock_blocked_executions + end + + @concurrency_maintenance_task.add_observer do |_, _, error| + handle_thread_error(error) if error + end + + @concurrency_maintenance_task.execute + end + + def stop_concurrency_maintenance + @concurrency_maintenance_task.shutdown + end + + def expire_semaphores + Semaphore.expired.in_batches(of: batch_size, &:delete_all) + end + + def unblock_blocked_executions + BlockedExecution.unblock(batch_size) + end + + def metadata + super.merge(batch_size: batch_size, polling_interval: polling_interval) + end + end end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index 9ed69931..69b4fc55 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -16,14 +16,12 @@ def initialize(**options) private def run - claimed_executions = with_polling_volume do - SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id) - end + polled_executions = poll - if claimed_executions.size > 0 - procline "performing #{claimed_executions.count} jobs" + if polled_executions.size > 0 + procline "performing #{polled_executions.count} jobs" - claimed_executions.each do |execution| + polled_executions.each do |execution| pool.post(execution) end else @@ -32,6 +30,12 @@ def run end end + def poll + with_polling_volume do + SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id) + end + end + def shutdown super diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/sequential_update_result_job.rb new file mode 100644 index 00000000..a3afa33f --- /dev/null +++ b/test/dummy/app/jobs/sequential_update_result_job.rb @@ -0,0 +1,3 @@ +class SequentialUpdateResultJob < UpdateResultJob + limits_concurrency key: ->(job_result, **) { job_result } +end diff --git a/test/dummy/app/jobs/throttled_update_result_job.rb b/test/dummy/app/jobs/throttled_update_result_job.rb new file mode 100644 index 00000000..c6deb5b4 --- /dev/null +++ b/test/dummy/app/jobs/throttled_update_result_job.rb @@ -0,0 +1,3 @@ +class ThrottledUpdateResultJob < UpdateResultJob + limits_concurrency to: 3, key: ->(job_result, **) { job_result } +end diff --git a/test/dummy/app/jobs/update_result_job.rb b/test/dummy/app/jobs/update_result_job.rb new file mode 100644 index 00000000..04571eb6 --- /dev/null +++ b/test/dummy/app/jobs/update_result_job.rb @@ -0,0 +1,11 @@ +class UpdateResultJob < ApplicationJob + def perform(job_result, name:, pause: nil, exception: nil) + job_result.status += "s#{name}" + + sleep(pause) if pause + raise exception.new if exception + + job_result.status += "c#{name}" + job_result.save! + end +end diff --git a/test/dummy/config/application.rb b/test/dummy/config/application.rb index b58b0e4d..36568a49 100644 --- a/test/dummy/config/application.rb +++ b/test/dummy/config/application.rb @@ -28,8 +28,6 @@ class Application < Rails::Application # config.eager_load_paths << Rails.root.join("extras") config.active_job.queue_adapter = :solid_queue - - config.solid_queue.logger = ActiveSupport::Logger.new(nil) config.solid_queue.delete_finished_jobs = false end end diff --git a/test/dummy/config/environments/development.rb b/test/dummy/config/environments/development.rb index f811f685..e3714846 100644 --- a/test/dummy/config/environments/development.rb +++ b/test/dummy/config/environments/development.rb @@ -56,4 +56,6 @@ # Uncomment if you wish to allow Action Cable access from any origin. # config.action_cable.disable_request_forgery_protection = true + + config.solid_queue.logger = ActiveSupport::Logger.new(nil) end diff --git a/test/dummy/config/environments/test.rb b/test/dummy/config/environments/test.rb index eb2f1716..1fc968f6 100644 --- a/test/dummy/config/environments/test.rb +++ b/test/dummy/config/environments/test.rb @@ -47,4 +47,8 @@ # Annotate rendered view with file names. # config.action_view.annotate_rendered_view_with_filenames = true + + logger = ActiveSupport::Logger.new(STDOUT) + config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{exception.backtrace.join("\n")}") } + config.solid_queue.logger = ActiveSupport::Logger.new(nil) end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index eb2d08d7..1ef94a31 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -19,6 +19,18 @@ t.datetime "updated_at", null: false end + create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.bigint "job_id" + t.string "queue_name", null: false + t.integer "priority", default: 0, null: false + t.string "concurrency_key", null: false + t.datetime "created_at", null: false + t.datetime "expires_at", null: false + t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" + t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" + t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true + end + create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.bigint "job_id", null: false t.bigint "process_id" @@ -44,6 +56,7 @@ t.datetime "finished_at" t.datetime "created_at", null: false t.datetime "updated_at", null: false + t.string "concurrency_key" t.index ["active_job_id"], name: "index_solid_queue_jobs_on_job_id" t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name" t.index ["queue_name", "scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting" @@ -84,4 +97,15 @@ t.index ["scheduled_at", "priority"], name: "index_solid_queue_scheduled_executions" end + create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "key", null: false + t.integer "value", default: 1, null: false + t.datetime "expires_at", null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at" + t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value" + t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true + end + end diff --git a/test/fixtures/solid_queue/blocked_executions.yml b/test/fixtures/solid_queue/blocked_executions.yml new file mode 100644 index 00000000..e69de29b diff --git a/test/fixtures/solid_queue/semaphores.yml b/test/fixtures/solid_queue/semaphores.yml new file mode 100644 index 00000000..e69de29b diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb new file mode 100644 index 00000000..6cf9d6d9 --- /dev/null +++ b/test/integration/concurrency_controls_test.rb @@ -0,0 +1,156 @@ +# frozen_string_literal: true +require "test_helper" + +class ConcurrencyControlsTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + setup do + SolidQueue::Job.delete_all + + @result = JobResult.create!(queue_name: "default", status: "seq: ") + + default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 } + scheduler = { polling_interval: 1, batch_size: 200, concurrency_maintenance_interval: 1 } + + @pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler }) + + wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor + end + + teardown do + terminate_process(@pid) if process_exists?(@pid) + end + + test "run several conflicting jobs over the same record sequentially" do + ("A".."F").each do |name| + SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds) + end + + ("G".."K").each do |name| + SequentialUpdateResultJob.perform_later(@result, name: name) + end + + wait_for_jobs_to_finish_for(3.seconds) + assert_no_pending_jobs + + assert_stored_sequence @result, ("A".."K").to_a + end + + test "run several jobs over the same record limiting concurrency" do + incr = 0 + # C is the last one to update the record + # A: 0 to 0.5 + # B: 0 to 1.0 + # C: 0 to 1.5 + assert_no_difference -> { SolidQueue::BlockedExecution.count } do + ("A".."C").each do |name| + ThrottledUpdateResultJob.perform_later(@result, name: name, pause: (0.5 + incr).seconds) + incr += 0.5 + end + end + + sleep(0.01) # To ensure these aren't picked up before ABC + # D to H: 0.51 to 0.76 (starting after A finishes, and in order, 5 * 0.05 = 0.25) + # These would finish all before B and C + assert_difference -> { SolidQueue::BlockedExecution.count }, +5 do + ("D".."H").each do |name| + ThrottledUpdateResultJob.perform_later(@result, name: name, pause: 0.05.seconds) + end + end + + wait_for_jobs_to_finish_for(3.seconds) + assert_no_pending_jobs + + # C would have started in the beginning, seeing the status empty, and would finish after + # all other jobs, so it'll do the last update with only itself + assert_stored_sequence(@result, [ "C" ]) + end + + test "run several jobs over the same record sequentially, with some of them failing" do + ("A".."F").each_with_index do |name, i| + # A, C, E will fail, for i= 0, 2, 4 + SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (RuntimeError if i.even?)) + end + + ("G".."K").each do |name| + SequentialUpdateResultJob.perform_later(@result, name: name) + end + + wait_for_jobs_to_finish_for(3.seconds) + assert_equal 3, SolidQueue::FailedExecution.count + + assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a + end + + test "rely on scheduler to unblock blocked executions with an available semaphore" do + # Simulate a scenario where we got an available semaphore and some stuck jobs + job = SequentialUpdateResultJob.perform_later(@result, name: "A") + + wait_for_jobs_to_finish_for(3.seconds) + assert_no_pending_jobs + + # Lock the semaphore so we can enqueue jobs and leave them blocked + skip_active_record_query_cache do + assert SolidQueue::Semaphore.wait(job) + end + + # Now enqueue more jobs under that same key. They'll be all locked + assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do + ("B".."K").each do |name| + SequentialUpdateResultJob.perform_later(@result, name: name) + end + end + + # Then unlock the semaphore and expire the jobs: this would be as if the first job had + # released the semaphore but hadn't unblocked any jobs + SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago) + assert SolidQueue::Semaphore.signal(job) + + # And wait for the scheduler to release the jobs + wait_for_jobs_to_finish_for(3.seconds) + assert_no_pending_jobs + + # We can't ensure the order between B and C, because it depends on which worker wins when + # unblocking, as one will try to unblock B and another C + assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a + end + + test "rely on scheduler to unblock blocked executions with an expired semaphore" do + # Simulate a scenario where we got an available semaphore and some stuck jobs + job = SequentialUpdateResultJob.perform_later(@result, name: "A") + wait_for_jobs_to_finish_for(3.seconds) + assert_no_pending_jobs + + # Lock the semaphore so we can enqueue jobs and leave them blocked + skip_active_record_query_cache do + assert SolidQueue::Semaphore.wait(job) + end + + # Now enqueue more jobs under that same key. They'll be all locked + assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do + ("B".."K").each do |name| + SequentialUpdateResultJob.perform_later(@result, name: name) + end + end + + # Simulate expiration of semaphore and executions + SolidQueue::Semaphore.where(key: job.concurrency_key).update_all(expires_at: 15.minutes.ago) + SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago) + + # And wait for scheduler to release the jobs + wait_for_jobs_to_finish_for(3.seconds) + assert_no_pending_jobs + + # We can't ensure the order between B and C, because it depends on which worker wins when + # unblocking, as one will try to unblock B and another C + assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a + end + + private + def assert_stored_sequence(result, *sequences) + expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join } + skip_active_record_query_cache do + assert_includes expected, result.reload.status + end + end +end diff --git a/test/integration/jobs_lifecycle_test.rb b/test/integration/jobs_lifecycle_test.rb index 6cf9cbdb..a5be505e 100644 --- a/test/integration/jobs_lifecycle_test.rb +++ b/test/integration/jobs_lifecycle_test.rb @@ -4,7 +4,7 @@ class JobsLifecycleTest < ActiveSupport::TestCase setup do @worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.5) - @scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 0.5) + @scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 1) end teardown do diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 1a436025..65a2fddb 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -13,7 +13,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase end teardown do - terminate_supervisor if process_exists?(@pid) + terminate_process(@pid) if process_exists?(@pid) end test "enqueue jobs in multiple queues" do @@ -26,7 +26,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase 6.times { |i| assert_completed_job_results("job_#{i}", :background) } 6.times { |i| assert_completed_job_results("job_#{i}", :default) } - terminate_supervisor + terminate_process(@pid) assert_clean_termination end @@ -150,7 +150,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase assert_job_status(job, :failed) end - terminate_supervisor + terminate_process(@pid) assert_clean_termination end @@ -175,25 +175,12 @@ class ProcessLifecycleTest < ActiveSupport::TestCase end assert process_exists?(@pid) - - terminate_supervisor + terminate_process(@pid) assert_clean_termination end private - def terminate_supervisor - terminate_process(@pid) - end - - def terminate_registered_processes - skip_active_record_query_cache do - SolidQueue::Process.find_each do |process| - terminate_process(process.metadata["pid"], from_parent: false) - end - end - end - def assert_clean_termination wait_for_registered_processes 0, timeout: 0.2.second assert_no_registered_processes diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 14c69139..de265b44 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -1,10 +1,29 @@ require "test_helper" class SolidQueue::JobTest < ActiveSupport::TestCase + class NonOverlappingJob < ApplicationJob + limits_concurrency key: ->(job_result, **) { job_result } + + def perform(job_result) + end + end + + class NonOverlappingGroupedJob1 < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" + end + + class NonOverlappingGroupedJob2 < NonOverlappingJob + limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup" + end + + setup do + @result = JobResult.create!(queue_name: "default") + end + test "enqueue active job to be executed right away" do active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") - assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ReadyExecution.count } => +1 do + assert_ready do SolidQueue::Job.enqueue_active_job(active_job) end @@ -24,7 +43,7 @@ class SolidQueue::JobTest < ActiveSupport::TestCase test "enqueue active job to be scheduled in the future" do active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test") - assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ScheduledExecution.count } => +1 do + assert_scheduled do SolidQueue::Job.enqueue_active_job(active_job, scheduled_at: 5.minutes.from_now) end @@ -41,4 +60,58 @@ class SolidQueue::JobTest < ActiveSupport::TestCase assert_equal 8, execution.priority assert Time.now < execution.scheduled_at end + + test "enqueue jobs with concurrency controls" do + active_job = NonOverlappingJob.perform_later(@result, name: "A") + assert_equal 1, active_job.concurrency_limit + assert_equal "SolidQueue::JobTest::NonOverlappingJob/JobResult/#{@result.id}", active_job.concurrency_key + + job = SolidQueue::Job.last + assert_equal active_job.concurrency_limit, job.concurrency_limit + assert_equal active_job.concurrency_key, job.concurrency_key + end + + test "enqueue jobs with concurrency controls in the same concurrency group" do + assert_ready do + active_job = NonOverlappingGroupedJob1.perform_later(@result, name: "A") + assert_equal 1, active_job.concurrency_limit + assert_equal "MyGroup/JobResult/#{@result.id}", active_job.concurrency_key + end + + assert_blocked do + active_job = NonOverlappingGroupedJob2.perform_later(@result, name: "B") + assert_equal 1, active_job.concurrency_limit + assert_equal "MyGroup/JobResult/#{@result.id}", active_job.concurrency_key + end + end + + test "block jobs when concurrency limits are reached" do + assert_ready do + NonOverlappingJob.perform_later(@result, name: "A") + end + + assert_blocked do + NonOverlappingJob.perform_later(@result, name: "B") + end + + blocked_execution = SolidQueue::BlockedExecution.last + assert blocked_execution.expires_at <= SolidQueue.default_concurrency_control_period.from_now + end + + private + def assert_ready(&block) + assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ReadyExecution.count } => +1, &block + end + + def assert_scheduled(&block) + assert_no_difference -> { SolidQueue::ReadyExecution.count } do + assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::ScheduledExecution.count } => +1, &block + end + end + + def assert_blocked(&block) + assert_no_difference -> { SolidQueue::ReadyExecution.count } do + assert_difference -> { SolidQueue::Job.count } => +1, -> { SolidQueue::BlockedExecution.count } => +1, &block + end + end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 63470368..61482adf 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -79,22 +79,15 @@ def find_processes_registered_as(kind) end end - def terminate_process(pid, timeout: 10, signal: :TERM, from_parent: true) + def terminate_process(pid, timeout: 10, signal: :TERM) signal_process(pid, signal) - wait_for_process_termination_with_timeout(pid, timeout: timeout, from_parent: from_parent) + wait_for_process_termination_with_timeout(pid, timeout: timeout) end - def wait_for_process_termination_with_timeout(pid, timeout: 10, from_parent: true, exitstatus: 0) + def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0) Timeout.timeout(timeout) do - if from_parent - Process.waitpid(pid) - assert exitstatus, $?.exitstatus - else - loop do - break unless process_exists?(pid) - sleep 0.05 - end - end + Process.waitpid(pid) + assert exitstatus, $?.exitstatus end rescue Timeout::Error signal_process(pid, :KILL)