Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rely on ActiveSupport::Notifications and ActiveSupport::LogSubscriber for logging and instrumentation #208

Merged
merged 12 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions app/models/solid_queue/blocked_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,25 @@ class BlockedExecution < Execution
scope :expired, -> { where(expires_at: ...Time.current) }

class << self
def unblock(count)
expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys|
release_many releasable(concurrency_keys)
def unblock(limit)
SolidQueue.instrument(:release_many_blocked, limit: limit) do |payload|
expired.distinct.limit(limit).pluck(:concurrency_key).then do |concurrency_keys|
payload[:size] = release_many releasable(concurrency_keys)
end
end
end

def release_many(concurrency_keys)
# We want to release exactly one blocked execution for each concurrency key, and we need to do it
# one by one, locking each record and acquiring the semaphore individually for each of them:
Array(concurrency_keys).each { |concurrency_key| release_one(concurrency_key) }
Array(concurrency_keys).count { |concurrency_key| release_one(concurrency_key) }
end

def release_one(concurrency_key)
transaction do
ordered.where(concurrency_key: concurrency_key).limit(1).non_blocking_lock.each(&:release)
if execution = ordered.where(concurrency_key: concurrency_key).limit(1).non_blocking_lock.first
execution.release
end
end
end

Expand All @@ -38,12 +42,14 @@ def releasable(concurrency_keys)
end

def release
transaction do
if acquire_concurrency_lock
promote_to_ready
destroy!
SolidQueue.instrument(:release_blocked, job_id: job.id, concurrency_key: concurrency_key, released: false) do |payload|
transaction do
if acquire_concurrency_lock
promote_to_ready
destroy!

SolidQueue.logger.debug("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
payload[:released] = true
end
end
end
end
Expand Down
15 changes: 11 additions & 4 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ def claiming(job_ids, process_id, &block)
end

def release_all
includes(:job).each(&:release)
SolidQueue.instrument(:release_many_claimed) do |payload|
includes(:job).tap do |executions|
payload[:size] = executions.size
executions.each(&:release)
end
end
end

def discard_all_in_batches(*)
Expand All @@ -45,9 +50,11 @@ def perform
end

def release
transaction do
job.dispatch_bypassing_concurrency_limits
destroy!
SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do
transaction do
job.dispatch_bypassing_concurrency_limits
destroy!
end
end
end

Expand Down
47 changes: 32 additions & 15 deletions app/models/solid_queue/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ class UndiscardableError < StandardError; end
belongs_to :job

class << self
def type
model_name.element.sub("_execution", "").to_sym
end

def create_all_from_jobs(jobs)
insert_all execution_data_from_jobs(jobs)
end
Expand All @@ -27,25 +31,32 @@ def discard_all_in_batches(batch_size: 500)
pending = count
discarded = 0

loop do
transaction do
job_ids = limit(batch_size).order(:job_id).lock.pluck(:job_id)
SolidQueue.instrument(:discard_all, batch_size: batch_size, status: type, batches: 0, size: 0) do |payload|
loop do
transaction do
job_ids = limit(batch_size).order(:job_id).lock.pluck(:job_id)
discarded = discard_jobs job_ids

discard_jobs job_ids
discarded = where(job_id: job_ids).delete_all
pending -= discarded
end
where(job_id: job_ids).delete_all
pending -= discarded

payload[:size] += discarded
payload[:batches] += 1
end

break if pending <= 0 || discarded == 0
break if pending <= 0 || discarded == 0
end
end
end

def discard_all_from_jobs(jobs)
transaction do
job_ids = lock_all_from_jobs(jobs)
SolidQueue.instrument(:discard_all, jobs_size: jobs.size, status: type) do |payload|
transaction do
job_ids = lock_all_from_jobs(jobs)

discard_jobs job_ids
where(job_id: job_ids).delete_all
payload[:size] = discard_jobs job_ids
where(job_id: job_ids).delete_all
end
end
end

Expand All @@ -59,10 +70,16 @@ def discard_jobs(job_ids)
end
end

def type
self.class.type
end

def discard
with_lock do
job.destroy
destroy
SolidQueue.instrument(:discard, job_id: job_id, status: type) do
with_lock do
job.destroy
destroy
end
end
end
end
Expand Down
8 changes: 5 additions & 3 deletions app/models/solid_queue/execution/dispatching.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ module Dispatching
def dispatch_jobs(job_ids)
jobs = Job.where(id: job_ids)

Job.dispatch_all(jobs).map(&:id).tap do |dispatched_job_ids|
where(job_id: dispatched_job_ids).order(:job_id).delete_all
SolidQueue.logger.info("[SolidQueue] Dispatched #{dispatched_job_ids.size} jobs")
Job.dispatch_all(jobs).map(&:id).then do |dispatched_job_ids|
if dispatched_job_ids.none? then 0
else
where(job_id: dispatched_job_ids).order(:job_id).delete_all
end
end
end
end
Expand Down
14 changes: 9 additions & 5 deletions app/models/solid_queue/failed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@ class FailedExecution < Execution
attr_accessor :exception

def self.retry_all(jobs)
transaction do
dispatch_jobs lock_all_from_jobs(jobs)
SolidQueue.instrument(:retry_all, jobs_size: jobs.size) do |payload|
transaction do
payload[:size] = dispatch_jobs lock_all_from_jobs(jobs)
end
end
end

def retry
with_lock do
job.prepare_for_execution
destroy!
SolidQueue.instrument(:retry, job_id: job.id) do
with_lock do
job.prepare_for_execution
destroy!
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def status
if finished?
:finished
elsif execution.present?
execution.model_name.element.sub("_execution", "").to_sym
execution.type
end
end

Expand Down
19 changes: 13 additions & 6 deletions app/models/solid_queue/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@ class SolidQueue::Process < SolidQueue::Record
after_destroy -> { claimed_executions.release_all }

def self.register(**attributes)
create!(attributes.merge(last_heartbeat_at: Time.current))
SolidQueue.instrument :register_process, **attributes do
create!(attributes.merge(last_heartbeat_at: Time.current))
end
rescue Exception => error
SolidQueue.instrument :register_process, **attributes.merge(error: error)
raise
end

def heartbeat
touch(:last_heartbeat_at)
end

def deregister
destroy!
rescue Exception
SolidQueue.logger.error("[SolidQueue] Error deregistering process #{id} - #{metadata}")
raise
def deregister(pruned: false)
SolidQueue.instrument :deregister_process, process: self, pruned: pruned, claimed_size: claimed_executions.size do |payload|
destroy!
rescue Exception => error
payload[:error] = error
raise
end
end
end
9 changes: 5 additions & 4 deletions app/models/solid_queue/process/prunable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ module SolidQueue::Process::Prunable

class_methods do
def prune
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
batch.each do |process|
SolidQueue.logger.info("[SolidQueue] Pruning dead process #{process.id} - #{process.metadata}")
process.deregister
SolidQueue.instrument :prune_processes, size: 0 do |payload|
prunable.non_blocking_lock.find_in_batches(batch_size: 50) do |batch|
payload[:size] += batch.size

batch.each { |process| process.deregister(pruned: true) }
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions app/models/solid_queue/recurring_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ class RecurringExecution < Execution
class << self
def record(task_key, run_at, &block)
transaction do
if job_id = block.call
create!(job_id: job_id, task_key: task_key, run_at: run_at)
block.call.tap do |active_job|
create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at)
end
end
rescue ActiveRecord::RecordNotUnique
SolidQueue.logger.info("[SolidQueue] Skipped recurring task #{task_key} at #{run_at} — already dispatched")
# Task already dispatched
end

def clear_in_batches(batch_size: 500)
Expand Down
4 changes: 3 additions & 1 deletion app/models/solid_queue/scheduled_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ def dispatch_next_batch(batch_size)
job_ids = next_batch(batch_size).non_blocking_lock.pluck(:job_id)
if job_ids.empty? then []
else
dispatch_jobs(job_ids)
SolidQueue.instrument(:dispatch_scheduled, batch_size: batch_size) do |payload|
payload[:size] = dispatch_jobs(job_ids)
end
end
end
end
Expand Down
24 changes: 14 additions & 10 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
loader.setup

module SolidQueue
extend self

mattr_accessor :logger, default: ActiveSupport::Logger.new($stdout)
mattr_accessor :app_executor, :on_thread_error, :connects_to

Expand All @@ -39,17 +41,19 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

class << self
def supervisor?
supervisor
end
def supervisor?
supervisor
end

def silence_polling?
silence_polling
end
def silence_polling?
silence_polling
end

def preserve_finished_jobs?
preserve_finished_jobs
end

def preserve_finished_jobs?
preserve_finished_jobs
end
def instrument(channel, **options, &block)
ActiveSupport::Notifications.instrument("#{channel}.solid_queue", **options, &block)
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/app_executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def wrap_in_app_executor(&block)
end

def handle_thread_error(error)
SolidQueue.logger.error("[SolidQueue] #{error}")
SolidQueue.instrument(:thread_error, error: error)

if SolidQueue.on_thread_error
SolidQueue.on_thread_error.call(error)
Expand Down
8 changes: 4 additions & 4 deletions lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def initialize(**options)
@recurring_schedule = RecurringSchedule.new(options[:recurring_tasks])
end

def metadata
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence)
end

private
def poll
batch = dispatch_next_batch
Expand Down Expand Up @@ -50,9 +54,5 @@ def unload_recurring_schedule
def set_procline
procline "waiting"
end

def metadata
super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence)
end
end
end
16 changes: 11 additions & 5 deletions lib/solid_queue/dispatcher/recurring_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,16 @@ def next_time
end

def enqueue(at:)
if using_solid_queue_adapter?
perform_later_and_record(run_at: at)
else
perform_later
SolidQueue.instrument(:enqueue_recurring_task, task: key, at: at) do |payload|
if using_solid_queue_adapter?
perform_later_and_record(run_at: at)
else
payload[:other_adapter] = true

perform_later
end.tap do |active_job|
payload[:active_job_id] = active_job&.job_id
end
end
end

Expand All @@ -59,7 +65,7 @@ def using_solid_queue_adapter?
end

def perform_later_and_record(run_at:)
RecurringExecution.record(key, run_at) { perform_later.provider_job_id }
RecurringExecution.record(key, run_at) { perform_later }
end

def perform_later
Expand Down
4 changes: 3 additions & 1 deletion lib/solid_queue/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ class Engine < ::Rails::Engine

initializer "solid_queue.logger" do |app|
ActiveSupport.on_load(:solid_queue) do
self.logger = app.logger
self.logger ||= app.logger
end

SolidQueue::LogSubscriber.attach_to :solid_queue
end

initializer "solid_queue.active_job.extensions" do
Expand Down
Loading
Loading