Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce concurrency controls #38

Merged
merged 32 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e1c7884
Implement a very basic concurrency control API for ActiveJob
rosa Nov 3, 2023
f405340
Implement basic concurrency control on enqueuing using semaphores
rosa Nov 5, 2023
4ce223a
Move concurrency control unit tests to job model test
rosa Nov 5, 2023
6050088
Refactor slightly how executions assume attributes from job
rosa Nov 5, 2023
5c14277
Dispatch blocked executions when a concurrency-limited job finishes
rosa Nov 5, 2023
e217ae0
Test concurrency limited to more than 1 job of a kind
rosa Nov 6, 2023
873760d
Dispatch blocked jobs after job perform finishes outside that transac…
rosa Nov 6, 2023
d0eeed2
Avoid race-conditions when releasing blocked executions
rosa Nov 6, 2023
9fa0aa2
Correct transactions to release blocked executions
rosa Nov 6, 2023
7ab243d
Unblock releasable blocked executions before polling
rosa Nov 6, 2023
04dd571
Consider blocked executions without a semaphore as releasable
rosa Nov 6, 2023
7814338
Tweak concurrency related tables
rosa Nov 7, 2023
016674b
Delegate concurrency limit and duration to job class and store `expir…
rosa Nov 16, 2023
f22a46c
Include concurrency controls extension directly in ActiveJob
rosa Nov 16, 2023
ede2bfb
Use a concurrent timer task to expire semaphores and unblock executions
rosa Nov 16, 2023
8484ca8
Don't instantiate job class to check concurrency limit if the key is …
rosa Nov 20, 2023
91256ee
Rename Semaphore.concurrency_key to Semaphore.key and simplify method…
rosa Nov 20, 2023
9f80e5f
Improve a bit some execution methods (DRY and order)
rosa Nov 20, 2023
8993dd9
Rename `concurrency_limit_duration` to `concurrency_duration`
rosa Nov 20, 2023
cfaf02f
Remove unused scope
rosa Nov 20, 2023
6e14ea0
Rename concurrency limit spec to `limits_concurrency to: limit ...`
rosa Nov 20, 2023
7c3007a
Remove unused method, replaced by a scope
rosa Nov 22, 2023
363ae88
Fix index on `blocked_executions` for releasing and add index on sema…
rosa Nov 22, 2023
fbef301
Use intermediate Proxy object for the Semaphore low-level actions
rosa Nov 22, 2023
427482e
Add support for concurrency controls across jobs (aka. concurrency "g…
rosa Nov 23, 2023
93eb7f8
Don't have a default key for concurrency controls, force always a cho…
rosa Nov 23, 2023
ee5fcc6
Use a separate config option for concurrency maintenance task interval
rosa Nov 23, 2023
feef30b
Remove unused option in test helper and unused test helper
rosa Nov 23, 2023
9d0da03
Store expires_at time in blocked_executions
rosa Nov 23, 2023
49f7d13
Add a few style related fixes across the board
rosa Nov 23, 2023
77b67b0
Rely on blocked_executions.expires_at to select candidates to be rele…
rosa Nov 26, 2023
8a5de86
Fix bug when selecting releasable executions and make sure these are …
rosa Nov 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions app/models/solid_queue/blocked_execution.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
module SolidQueue
class BlockedExecution < SolidQueue::Execution
assume_attributes_from_job :concurrency_key
before_create :set_expires_at

has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key

scope :expired, -> { where(expires_at: ...Time.current) }

class << self
def unblock(count)
expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys|
release_many releasable(concurrency_keys)
end
end

def release_many(concurrency_keys)
# We want to release exactly one blocked execution for each concurrency key, and we need to do it
# one by one, locking each record and acquiring the semaphore individually for each of them:
Array(concurrency_keys).each { |concurrency_key| release_one(concurrency_key) }
end

def release_one(concurrency_key)
ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release)
rosa marked this conversation as resolved.
Show resolved Hide resolved
end

private
def releasable(concurrency_keys)
semaphores = Semaphore.where(key: concurrency_keys).select(:key, :value).index_by(&:key)

# Concurrency keys without semaphore + concurrency keys with open semaphore
(concurrency_keys - semaphores.keys) | semaphores.select { |key, semaphore| semaphore.value > 0 }.map(&:first)
end
end

def release
transaction do
if acquire_concurrency_lock
promote_to_ready
destroy!

SolidQueue.logger.info("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
end
end
end

private
def set_expires_at
self.expires_at = job.concurrency_duration.from_now
end

def acquire_concurrency_lock
Semaphore.wait(job)
end

def promote_to_ready
ReadyExecution.create!(ready_attributes)
end
end
end
2 changes: 2 additions & 0 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def perform
else
failed_with(result.error)
end
ensure
job.unblock_next_blocked_job
end

def release
Expand Down
18 changes: 13 additions & 5 deletions app/models/solid_queue/execution.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
class SolidQueue::Execution < SolidQueue::Record
include JobAttributes
module SolidQueue
class Execution < SolidQueue::Record
include JobAttributes

self.abstract_class = true
self.abstract_class = true

belongs_to :job
scope :ordered, -> { order(priority: :asc, job_id: :asc) }

alias_method :discard, :destroy
belongs_to :job

alias_method :discard, :destroy

def ready_attributes
attributes.slice("job_id", "queue_name", "priority")
end
end
end
3 changes: 2 additions & 1 deletion app/models/solid_queue/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def enqueue_active_job(active_job, scheduled_at: Time.current)
priority: active_job.priority,
scheduled_at: scheduled_at,
class_name: active_job.class.name,
arguments: active_job.serialize
arguments: active_job.serialize,
concurrency_key: active_job.try(:concurrency_key)
end

def enqueue(**kwargs)
Expand Down
48 changes: 48 additions & 0 deletions app/models/solid_queue/job/concurrency_controls.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
module SolidQueue
class Job
module ConcurrencyControls
extend ActiveSupport::Concern

included do
has_one :blocked_execution, dependent: :destroy

delegate :concurrency_limit, :concurrency_duration, to: :job_class
end

def unblock_next_blocked_job
if release_concurrency_lock
release_next_blocked_job
end
end

private
def acquire_concurrency_lock
return true unless concurrency_limited?

Semaphore.wait(self)
end

def release_concurrency_lock
return false unless concurrency_limited?

Semaphore.signal(self)
end

def block
BlockedExecution.create_or_find_by!(job_id: id)
end

def release_next_blocked_job
BlockedExecution.release_one(concurrency_key)
end

def concurrency_limited?
concurrency_key.present? && concurrency_limit.to_i > 0
end

def job_class
@job_class ||= class_name.safe_constantize
end
end
end
end
109 changes: 65 additions & 44 deletions app/models/solid_queue/job/executable.rb
Original file line number Diff line number Diff line change
@@ -1,64 +1,85 @@
module SolidQueue
module Job::Executable
extend ActiveSupport::Concern
class Job
module Executable
extend ActiveSupport::Concern

included do
has_one :ready_execution, dependent: :destroy
has_one :claimed_execution, dependent: :destroy
has_one :failed_execution, dependent: :destroy
included do
include ConcurrencyControls

has_one :scheduled_execution, dependent: :destroy
has_one :ready_execution, dependent: :destroy
has_one :claimed_execution, dependent: :destroy
has_one :failed_execution, dependent: :destroy

after_create :prepare_for_execution
has_one :scheduled_execution, dependent: :destroy

scope :finished, -> { where.not(finished_at: nil) }
end

STATUSES = %w[ ready claimed failed scheduled ]
after_create :prepare_for_execution

STATUSES.each do |status|
define_method("#{status}?") { public_send("#{status}_execution").present? }
end
scope :finished, -> { where.not(finished_at: nil) }
end

def prepare_for_execution
if due?
ReadyExecution.create_or_find_by!(job_id: id)
else
ScheduledExecution.create_or_find_by!(job_id: id)
%w[ ready claimed failed scheduled ].each do |status|
define_method("#{status}?") { public_send("#{status}_execution").present? }
end
end

def finished!
if delete_finished_jobs?
destroy!
else
touch(:finished_at)
def prepare_for_execution
if due? then dispatch
else
schedule
end
end
end

def finished?
finished_at.present?
end
def finished!
if delete_finished_jobs?
destroy!
else
touch(:finished_at)
end
end

def failed_with(exception)
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
end
def finished?
finished_at.present?
end

def discard
destroy unless claimed?
end
def failed_with(exception)
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
end

def retry
failed_execution&.retry
end
def discard
destroy unless claimed?
end

private
def due?
scheduled_at.nil? || scheduled_at <= Time.current
def retry
failed_execution&.retry
end

def delete_finished_jobs?
SolidQueue.delete_finished_jobs
def failed_with(exception)
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
end

private
def due?
scheduled_at.nil? || scheduled_at <= Time.current
end

def dispatch
if acquire_concurrency_lock then ready
else
block
end
end

def schedule
ScheduledExecution.create_or_find_by!(job_id: id)
end

def ready
ReadyExecution.create_or_find_by!(job_id: id)
end


def delete_finished_jobs?
SolidQueue.delete_finished_jobs
end
end
end
end
1 change: 0 additions & 1 deletion app/models/solid_queue/ready_execution.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module SolidQueue
class ReadyExecution < Execution
scope :queued_as, ->(queue_name) { where(queue_name: queue_name) }
scope :ordered, -> { order(priority: :asc, job_id: :asc) }

assume_attributes_from_job

Expand Down
8 changes: 2 additions & 6 deletions app/models/solid_queue/scheduled_execution.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
class SolidQueue::ScheduledExecution < SolidQueue::Execution
scope :due, -> { where("scheduled_at <= ?", Time.current) }
scope :due, -> { where(scheduled_at: ..Time.current) }
scope :ordered, -> { order(scheduled_at: :asc, priority: :asc) }
scope :next_batch, ->(batch_size) { due.ordered.limit(batch_size) }

Expand All @@ -10,7 +10,7 @@ def prepare_batch(batch)
prepared_at = Time.current

rows = batch.map do |scheduled_execution|
scheduled_execution.execution_ready_attributes.merge(created_at: prepared_at)
scheduled_execution.ready_attributes.merge(created_at: prepared_at)
end

if rows.any?
Expand All @@ -23,8 +23,4 @@ def prepare_batch(batch)
SolidQueue.logger.info("[SolidQueue] Prepared scheduled batch with #{rows.size} jobs at #{prepared_at}")
end
end

def execution_ready_attributes
attributes.slice("job_id", "queue_name", "priority")
end
end
63 changes: 63 additions & 0 deletions app/models/solid_queue/semaphore.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
class SolidQueue::Semaphore < SolidQueue::Record
scope :available, -> { where("value > 0") }
scope :expired, -> { where(expires_at: ...Time.current) }

class << self
def wait(job)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For your consideration: the semaphore static interface where it receives a job and then pass several jobs' attributes around feels like it could benefit from an extracting a method object. I understand why you did this, since it's a special class that acts on group of records, not a regular active record object. Still you may consider to use an additional inner class with a name like Control or Proxy that exposes the interface you want for a given job. I haven't tested this, but it could look something like:

# When using a semaphore

   private
      def acquire_concurrency_lock
        semaphore.wait
      end

      def semaphore
        Semaphore.for(job)
      end

# The exposed proxy

class SolidQueue::Semaphore < SolidQueue::Record
  scope :available, -> { where("value > 0") }
  scope :expired, -> { where(expires_at: ...Time.current)}

  class << self
    def for(job)
      Control.new(job)
    end
  end
  
  class Control
    attr_reader :job
    
    def initialize(job)
      @job = job
    end
    
    def wait
      if semaphore = self.class.find_by(key: key)
        semaphore.value > 0 && attempt_decrement
      else
        attempt_creation
      end
    end
    
    def signal
      attempt_increment
    end
    
    private
      def key
        job.concurrency_key
      end
      
      def limit
        job.concurrency_limit
      end
      
      def duration
        job.concurrency_duration
      end

      def attempt_creation
        self.class.create!(key: key, value: limit - 1, expires_at: duration.from_now)
        true
      rescue ActiveRecord::RecordNotUnique
        self.class.attempt_decrement(key, duration)
      end

      def attempt_decrement
        self.class.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0
      end

      def attempt_increment
        where("value < ?", limit).where(key: key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0
      end
  end
end

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jorgemanrubia! I'll think about it and see how it could look like. I agree with you, passing the job attributes around doesn't look so nice, and you're spot on about why I did it this way.

Thanks so much for your suggestion, I'll play with it 🙏

Copy link
Member Author

@rosa rosa Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jorgemanrubia, I took a stab at it in 61fec0d, what do you think? I think I like it! I chose to keep the methods Semaphore.signal(job) and Semaphore.wait(job), but encapsulate the rest. I like to use the static interface from outside, although I don't feel strongly.

Proxy.new(job, self).wait
end

def signal(job)
Proxy.new(job, self).signal
end
end

class Proxy
def initialize(job, proxied_class)
@job = job
@proxied_class = proxied_class
end

def wait
if semaphore = proxied_class.find_by(key: key)
semaphore.value > 0 && attempt_decrement
else
attempt_creation
end
end

def signal
attempt_increment
end

private
attr_reader :job, :proxied_class

def attempt_creation
proxied_class.create!(key: key, value: limit - 1, expires_at: expires_at)
true
rescue ActiveRecord::RecordNotUnique
attempt_decrement
end

def attempt_decrement
proxied_class.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
end

def attempt_increment
proxied_class.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0
end

def key
job.concurrency_key
end

def expires_at
job.concurrency_duration.from_now
end

def limit
job.concurrency_limit
end
end
end
Loading