Skip to content

Commit

Permalink
Fix issue when pruning a supervisor and its supervisees via callbacks
Browse files Browse the repository at this point in the history
In that case we weren't correctly failing executions as a consequence of
the prune because the processes would get deleted via a callback without
going through the regular prune. Leave this to the prune itself instead
of relying on the callbacks and only deregister supervised processes
when not pruning. This also saves a query because we check whether we
have a supervisor or not before trying to find supervisees.
  • Loading branch information
rosa committed Aug 27, 2024
1 parent 3a6eec8 commit 7901a8e
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 14 deletions.
9 changes: 5 additions & 4 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ def claiming(job_ids, process_id, &block)
def release_all
SolidQueue.instrument(:release_many_claimed) do |payload|
includes(:job).tap do |executions|
payload[:size] = executions.size
executions.each(&:release)

payload[:size] = executions.size
end
end
end

def fail_all_with(error)
SolidQueue.instrument(:fail_many_claimed) do |payload|
includes(:job).tap do |executions|
payload[:size] = executions.size
executions.each { |execution| execution.failed_with(error) }

payload[:process_ids] = executions.map(&:process_id).uniq
payload[:job_ids] = executions.map(&:job_id).uniq

executions.each { |execution| execution.failed_with(error) }
payload[:size] = executions.size
end
end
end
Expand Down
11 changes: 10 additions & 1 deletion app/models/solid_queue/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ class SolidQueue::Process < SolidQueue::Record
include Executor, Prunable

belongs_to :supervisor, class_name: "SolidQueue::Process", optional: true, inverse_of: :supervisees
has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, dependent: :destroy
has_many :supervisees, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id

store :metadata, coder: JSON

Expand All @@ -26,9 +26,18 @@ def heartbeat
def deregister(pruned: false)
SolidQueue.instrument :deregister_process, process: self, pruned: pruned do |payload|
destroy!

unless supervised? || pruned
supervisees.each(&:deregister)
end
rescue Exception => error
payload[:error] = error
raise
end
end

private
def supervised?
supervisor_id.present?
end
end
8 changes: 7 additions & 1 deletion app/models/solid_queue/process/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module Executor
included do
has_many :claimed_executions

after_destroy -> { claimed_executions.release_all }, if: :claims_executions?
after_destroy :release_all_claimed_executions
end

def fail_all_claimed_executions_with(error)
Expand All @@ -17,6 +17,12 @@ def fail_all_claimed_executions_with(error)
end
end

def release_all_claimed_executions
if claims_executions?
claimed_executions.release_all
end
end

private
def claims_executions?
kind == "Worker"
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/supervisor/maintenance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def prune_dead_processes

def fail_orphaned_executions
wrap_in_app_executor do
SolidQueue::ClaimedExecution.orphaned.fail_all_with(ProcessMissingError.new)
ClaimedExecution.orphaned.fail_all_with(ProcessMissingError.new)
end
end
end
Expand Down
16 changes: 9 additions & 7 deletions test/integration/forked_processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
@pid = run_supervisor_as_fork(load_configuration_from: config_as_hash)

wait_for_registered_processes(3, timeout: 3.second)
assert_registered_workers_for(:background, :default)
assert_registered_workers_for(:background, :default, supervisor_pid: @pid)
end

teardown do
Expand Down Expand Up @@ -49,7 +49,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
assert_job_status(pause, :finished)

# Termination is almost clean, but the supervisor remains
assert_registered_supervisor
assert_registered_supervisor_with(@pid)
assert_no_registered_workers
assert_no_claimed_jobs
end
Expand Down Expand Up @@ -217,7 +217,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase

# And there's a new worker that has been registered for that queue:
wait_for_registered_processes(3, timeout: 3.second)
assert_registered_workers_for(:background, :default)
assert_registered_workers_for(:background, :default, supervisor_pid: @pid)

# And they can process jobs just fine
enqueue_store_result_job("no_pause")
Expand Down Expand Up @@ -272,17 +272,19 @@ def assert_clean_termination
assert_not process_exists?(@pid)
end

def assert_registered_workers_for(*queues)
def assert_registered_workers_for(*queues, supervisor_pid: nil)
workers = find_processes_registered_as("Worker")
registered_queues = workers.map { |process| process.metadata["queues"] }.compact
assert_equal queues.map(&:to_s).sort, registered_queues.sort
assert_equal [ @pid ], workers.map { |process| process.supervisor.pid }.uniq
if supervisor_pid
assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq
end
end

def assert_registered_supervisor
def assert_registered_supervisor_with(pid)
processes = find_processes_registered_as("Supervisor(fork)")
assert_equal 1, processes.count
assert_equal @pid, processes.first.pid
assert_equal pid, processes.first.pid
end

def assert_no_registered_workers
Expand Down
22 changes: 22 additions & 0 deletions test/models/solid_queue/process_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,28 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase
assert jobs.all?(&:failed?)
end

test "prune processes including their supervisor with expired heartbeats and fail claimed executions" do
supervisor = SolidQueue::Process.register(kind: "Supervisor(fork)", pid: 42, name: "supervisor-42")
process = SolidQueue::Process.register(kind: "Worker", pid: 43, name: "worker-43", supervisor_id: supervisor.id)
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
jobs = SolidQueue::Job.last(3)

SolidQueue::ReadyExecution.claim("*", 5, process.id)

travel_to 10.minutes.from_now

assert_difference -> { SolidQueue::Process.count }, -2 do
assert_difference -> { SolidQueue::FailedExecution.count }, 3 do
assert_difference -> { SolidQueue::ClaimedExecution.count }, -3 do
SolidQueue::Process.prune
end
end
end

jobs.each(&:reload)
assert jobs.all?(&:failed?)
end

test "hostname's with special characters are properly loaded" do
worker = SolidQueue::Worker.new(queues: "*", threads: 3, polling_interval: 0.2)
hostname = "Basecamp’s-Computer"
Expand Down

0 comments on commit 7901a8e

Please sign in to comment.