diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index c1313601..d4abf45a 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -29,8 +29,9 @@ def claiming(job_ids, process_id, &block) def release_all SolidQueue.instrument(:release_many_claimed) do |payload| includes(:job).tap do |executions| - payload[:size] = executions.size executions.each(&:release) + + payload[:size] = executions.size end end end @@ -38,11 +39,11 @@ def release_all def fail_all_with(error) SolidQueue.instrument(:fail_many_claimed) do |payload| includes(:job).tap do |executions| - payload[:size] = executions.size + executions.each { |execution| execution.failed_with(error) } + payload[:process_ids] = executions.map(&:process_id).uniq payload[:job_ids] = executions.map(&:job_id).uniq - - executions.each { |execution| execution.failed_with(error) } + payload[:size] = executions.size end end end diff --git a/app/models/solid_queue/process.rb b/app/models/solid_queue/process.rb index f6513912..4ab4bbb0 100644 --- a/app/models/solid_queue/process.rb +++ b/app/models/solid_queue/process.rb @@ -4,7 +4,7 @@ class SolidQueue::Process < SolidQueue::Record include Executor, Prunable belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :supervisees - has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, dependent: :destroy + has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id store :metadata, coder: JSON @@ -26,9 +26,18 @@ def heartbeat def deregister(pruned: false) SolidQueue.instrument :deregister_process, process: self, pruned: pruned do |payload| destroy! + + unless supervised? || pruned + supervisees.each(&:deregister) + end rescue Exception => error payload[:error] = error raise end end + + private + def supervised? + supervisor_id.present? + end end diff --git a/app/models/solid_queue/process/executor.rb b/app/models/solid_queue/process/executor.rb index 99919dbd..8dcd12aa 100644 --- a/app/models/solid_queue/process/executor.rb +++ b/app/models/solid_queue/process/executor.rb @@ -8,7 +8,7 @@ module Executor included do has_many :claimed_executions - after_destroy -> { claimed_executions.release_all }, if: :claims_executions? + after_destroy :release_all_claimed_executions end def fail_all_claimed_executions_with(error) @@ -17,6 +17,12 @@ def fail_all_claimed_executions_with(error) end end + def release_all_claimed_executions + if claims_executions? + claimed_executions.release_all + end + end + private def claims_executions? kind == "Worker" diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 6af8384f..d9bf97b4 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -35,7 +35,7 @@ def prune_dead_processes def fail_orphaned_executions wrap_in_app_executor do - SolidQueue::ClaimedExecution.orphaned.fail_all_with(ProcessMissingError.new) + ClaimedExecution.orphaned.fail_all_with(ProcessMissingError.new) end end end diff --git a/test/integration/forked_processes_lifecycle_test.rb b/test/integration/forked_processes_lifecycle_test.rb index e0e8b245..19862066 100644 --- a/test/integration/forked_processes_lifecycle_test.rb +++ b/test/integration/forked_processes_lifecycle_test.rb @@ -10,7 +10,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase @pid = run_supervisor_as_fork(load_configuration_from: config_as_hash) wait_for_registered_processes(3, timeout: 3.second) - assert_registered_workers_for(:background, :default) + assert_registered_workers_for(:background, :default, supervisor_pid: @pid) end teardown do @@ -49,7 +49,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase assert_job_status(pause, :finished) # Termination is almost clean, but the supervisor remains - assert_registered_supervisor + assert_registered_supervisor_with(@pid) assert_no_registered_workers assert_no_claimed_jobs end @@ -217,7 +217,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase # And there's a new worker that has been registered for that queue: wait_for_registered_processes(3, timeout: 3.second) - assert_registered_workers_for(:background, :default) + assert_registered_workers_for(:background, :default, supervisor_pid: @pid) # And they can process jobs just fine enqueue_store_result_job("no_pause") @@ -272,17 +272,19 @@ def assert_clean_termination assert_not process_exists?(@pid) end - def assert_registered_workers_for(*queues) + def assert_registered_workers_for(*queues, supervisor_pid: nil) workers = find_processes_registered_as("Worker") registered_queues = workers.map { |process| process.metadata["queues"] }.compact assert_equal queues.map(&:to_s).sort, registered_queues.sort - assert_equal [ @pid ], workers.map { |process| process.supervisor.pid }.uniq + if supervisor_pid + assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq + end end - def assert_registered_supervisor + def assert_registered_supervisor_with(pid) processes = find_processes_registered_as("Supervisor(fork)") assert_equal 1, processes.count - assert_equal @pid, processes.first.pid + assert_equal pid, processes.first.pid end def assert_no_registered_workers diff --git a/test/models/solid_queue/process_test.rb b/test/models/solid_queue/process_test.rb index d95d2d72..80bd04a2 100644 --- a/test/models/solid_queue/process_test.rb +++ b/test/models/solid_queue/process_test.rb @@ -34,6 +34,28 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase assert jobs.all?(&:failed?) end + test "prune processes including their supervisor with expired heartbeats and fail claimed executions" do + supervisor = SolidQueue::Process.register(kind: "Supervisor(fork)", pid: 42, name: "supervisor-42") + process = SolidQueue::Process.register(kind: "Worker", pid: 43, name: "worker-43", supervisor_id: supervisor.id) + 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } + jobs = SolidQueue::Job.last(3) + + SolidQueue::ReadyExecution.claim("*", 5, process.id) + + travel_to 10.minutes.from_now + + assert_difference -> { SolidQueue::Process.count }, -2 do + assert_difference -> { SolidQueue::FailedExecution.count }, 3 do + assert_difference -> { SolidQueue::ClaimedExecution.count }, -3 do + SolidQueue::Process.prune + end + end + end + + jobs.each(&:reload) + assert jobs.all?(&:failed?) + end + test "hostname's with special characters are properly loaded" do worker = SolidQueue::Worker.new(queues: "*", threads: 3, polling_interval: 0.2) hostname = "Basecamp’s-Computer"