Description
I have some load on SolidQueue in my production app.
Our workers run in an environment where a worker can be killed at any time (cloud-type infrastructure). Because of this, over time we accumulate jobs that show as "in process" even though the worker that was running them has died. Some of them have been "in process" for 13 days, for example.
I'm able to query for the jobs that are in process but not assigned to any currently active worker:
current_worker_ids = SolidQueue::Process.where(kind: "Worker").pluck(:id)
SolidQueue::Job.joins(:claimed_execution).where(finished_at: nil).where.not(claimed_execution: {process_id: current_worker_ids}).where_assoc_not_exists(:failed_execution)
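For anyone reading without a database at hand, the selection rule in that query can be sketched in plain Ruby. This is only an in-memory illustration: the `Claim` struct and the `abandoned_claims` helper are hypothetical stand-ins, not part of SolidQueue, and the sketch ignores the `finished_at` and failed-execution filters.

```ruby
require "set"

# Hypothetical in-memory stand-in for a claimed execution row.
Claim = Struct.new(:job_id, :process_id, keyword_init: true)

# Returns the claims whose owning process is not among the live workers.
def abandoned_claims(claims, live_worker_ids)
  live = live_worker_ids.to_set
  claims.reject { |claim| live.include?(claim.process_id) }
end

claims = [
  Claim.new(job_id: 1, process_id: 10), # worker 10 is still registered
  Claim.new(job_id: 2, process_id: 11)  # worker 11 has died
]

abandoned = abandoned_claims(claims, [10])
puts abandoned.map(&:job_id).inspect
```

Only the claim held by the missing worker is selected, which is the set of jobs the recovery method is meant to requeue.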
I've built a method to try to recover the jobs.
def requeue_abandoned!
  count = 0
  total_to_queue = abandoned_in_progress_jobs_count
  logger.info "Requeuing #{total_to_queue} abandoned jobs"
  abandoned_in_progress_jobs.find_each do |job|
    # Drop the stale claim, then make the job due immediately.
    job.claimed_execution.delete
    schedule = SolidQueue::ScheduledExecution.create_or_find_by!(job_id: job.id)
    schedule.update!(scheduled_at: Time.zone.now)
    count += 1
    # Report progress every 100 jobs.
    logger.info "Requeued #{count} of #{total_to_queue} jobs" if count % 100 == 0
  end
  logger.info "Requeued #{count} of #{total_to_queue} jobs"
  true
end
As you can see, it deletes the claimed execution. Then it finds (or creates) the scheduled execution and sets its time to now so the job is ready to run.
This seems to work, BUT it throws a nasty error and none of my workers are working now.
2024-02-21 07:28:01.643 | DETAIL: Key (job_id)=(109233) already exists.
2024-02-21 07:28:01.643 | /usr/local/bundle/ruby/3.2.0/gems/activerecord-7.1.3/lib/active_record/connection_adapters/postgresql_adapter.rb:894:in `exec_params': ERROR: duplicate key value violates unique constraint "index_solid_queue_claimed_executions_on_job_id" (PG::UniqueViolation)
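Read literally, the error says that something tried to insert a claimed execution for job 109233 while a claimed execution row for that job already existed. The toy model below shows only that constraint in plain Ruby. It is not SolidQueue code: `ClaimedTable` and `UniqueViolation` are hypothetical names, and the sequence of events is an assumption made for illustration, not a diagnosis of what happened in production.

```ruby
# Toy model of a table with a unique index on job_id (not SolidQueue code).
class UniqueViolation < StandardError; end

class ClaimedTable
  def initialize
    @rows = {} # job_id => process_id
  end

  # Mimics an INSERT guarded by a unique index on job_id.
  def insert!(job_id:, process_id:)
    raise UniqueViolation, "Key (job_id)=(#{job_id}) already exists." if @rows.key?(job_id)

    @rows[job_id] = process_id
  end

  def delete(job_id)
    @rows.delete(job_id)
  end
end

claimed = ClaimedTable.new
claimed.insert!(job_id: 109_233, process_id: 1) # claim held by a worker that later died

rejected = false
begin
  # Assumed scenario: another worker claims the job while the stale row is still present.
  claimed.insert!(job_id: 109_233, process_id: 2)
rescue UniqueViolation => e
  rejected = true
  puts "rejected: #{e.message}"
end

claimed.delete(109_233)                                       # stale claim removed first
reclaimed = claimed.insert!(job_id: 109_233, process_id: 2)   # now the claim is accepted
```

In this model a job can be claimed again only after its previous claim row is gone, so any path that makes a job runnable while a claim row for it still exists would hit the same kind of rejection.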
So, I'm wondering whether I've misunderstood, from reading the docs, how these executions work. This error COULD be unrelated to what I did above (which I ran for about 5,000 jobs), but my guess is that it is related.
The documentation on executions is pretty sparse, and I'm not sure I really "got" how this works. Any pointers to more documentation would help me get to the bottom of this.
I'd appreciate it.