Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ActiveRecord usage for advisory locking #24

Merged
merged 1 commit into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 47 additions & 39 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,66 @@ module Lockable
RecordAlreadyAdvisoryLockedError = Class.new(StandardError)

included do
scope :advisory_lock, (lambda do
original_query = self

cte_table = Arel::Table.new(:rows)
composed_cte = Arel::Nodes::As.new(cte_table, original_query.select(primary_key).except(:limit).arel)

query = cte_table.project(cte_table[:id])
.with(composed_cte)
.where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x'||substr(md5(:table_name || \"#{cte_table.name}\".\"#{primary_key}\"::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))

limit = original_query.arel.ast.limit
query.limit = limit.value if limit.present?

unscoped.where(arel_table[:id].in(query)).merge(original_query.only(:order))
end)

scope :joins_advisory_locks, (lambda do
joins(<<~SQL)
join_sql = <<~SQL
LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x'||substr(md5(good_jobs.id::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x'||substr(md5(good_jobs.id::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.classid = ('x'||substr(md5(:table_name || "#{table_name}"."#{primary_key}"::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x'||substr(md5(:table_name || "#{table_name}"."#{primary_key}"::text), 1, 16))::bit(64) << 32)::bit(32)::int
SQL

joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }]))
end)

scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) }
scope :advisory_locked, -> { joins_advisory_locks.where.not(pg_locks: { locktype: nil }) }
scope :owns_advisory_locked, -> { joins_advisory_locks.where('"pg_locks"."pid" = pg_backend_pid()') }

attr_accessor :create_with_advisory_lock

after_create -> { advisory_lock }, if: :create_with_advisory_lock
end

class_methods do
def first_advisory_locked_row(query)
find_by_sql(<<~SQL).first
WITH rows AS (#{query.to_sql})
SELECT rows.*
FROM rows
WHERE pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint)
LIMIT 1
SQL
def with_advisory_lock(&block)
records = advisory_lock.to_a
begin
block.call(records)
ensure
records.each(&:advisory_unlock)
end
end
end

def advisory_lock
self.class.connection.execute(sanitize_sql_for_conditions(["SELECT 1 as one WHERE pg_try_advisory_lock(('x'||substr(md5(?), 1, 16))::bit(64)::bigint)", id])).ntuples.positive?
query = <<~SQL
SELECT 1 AS one
WHERE pg_try_advisory_lock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint)
SQL
self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive?
end

def advisory_unlock
query = <<~SQL
SELECT 1 AS one
WHERE pg_advisory_unlock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint)
SQL
self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive?
end

def advisory_lock!
Expand All @@ -45,38 +75,16 @@ def advisory_lock!
def with_advisory_lock
advisory_lock!
yield
rescue StandardError => e
advisory_unlock unless e.is_a? RecordAlreadyAdvisoryLockedError
raise
ensure
advisory_unlock unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError
end

def advisory_locked?
self.class.connection.execute(<<~SQL).ntuples.positive?
SELECT 1 as one
FROM pg_locks
WHERE
locktype = 'advisory'
AND objsubid = 1
AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int
AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int
SQL
self.class.advisory_locked.where(id: send(self.class.primary_key)).any?
end

def owns_advisory_lock?
self.class.connection.execute(<<~SQL).ntuples.positive?
SELECT 1 as one
FROM pg_locks
WHERE
locktype = 'advisory'
AND objsubid = 1
AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int
AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int
AND pid = pg_backend_pid()
SQL
end

def advisory_unlock
self.class.connection.execute("SELECT pg_advisory_unlock(('x'||substr(md5('#{id}'), 1, 16))::bit(64)::bigint)").first["pg_advisory_unlock"]
self.class.owns_advisory_locked.where(id: send(self.class.primary_key)).any?
end

def advisory_unlock!
Expand Down
20 changes: 10 additions & 10 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ def shutdown?

def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
executed_job = false
good_job = nil

Rails.application.executor.wrap do
good_job = GoodJob::Job.first_advisory_locked_row(query)
break unless good_job
query.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
break unless good_job

executed_job = true
good_job.perform
good_job.advisory_unlock
good_job.perform
end
end

executed_job
good_job
end
future.add_observer(self, :task_observer)
future.execute
Expand All @@ -83,9 +83,9 @@ def timer_observer(time, executed_task, error)
ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: error, time: time })
end

def task_observer(time, executed_task, error)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: executed_task, error: error, time: time })
create_thread if executed_task
def task_observer(time, performed_job, error)
ActiveSupport::Notifications.instrument("finished_job_task.good_job", { good_job: performed_job, error: error, time: time })
create_thread if performed_job
end
end
end
51 changes: 49 additions & 2 deletions spec/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,62 @@ def perform(*_args, **_kwargs)
end

describe 'lockable' do
describe '.first_advisory_locked_row' do
describe '.advisory_lock' do
around do |example|
RSpec.configure do |config|
config.expect_with :rspec do |c|
original_max_formatted_output_length = c.instance_variable_get(:@max_formatted_output_length)

c.max_formatted_output_length = 1000000
example.run

c.max_formatted_output_length = original_max_formatted_output_length
end
end
end

it 'generates appropriate SQL' do
query = described_class.where(priority: 99).order(priority: :desc).limit(2).advisory_lock

expect(normalize_sql(query.to_sql)).to eq normalize_sql(<<~SQL)
SELECT "good_jobs".*
FROM "good_jobs"
WHERE "good_jobs"."id" IN (
WITH "rows" AS (
SELECT "good_jobs"."id"
FROM "good_jobs"
WHERE "good_jobs"."priority" = 99
ORDER BY "good_jobs"."priority" DESC
)
SELECT "rows"."id"
FROM "rows"
WHERE pg_try_advisory_lock(('x'||substr(md5('good_jobs' || "rows"."id"::text), 1, 16))::bit(64)::bigint)
LIMIT 2
)
ORDER BY "good_jobs"."priority" DESC
SQL
end

it 'returns first row of the query with a lock' do
expect(job).not_to be_advisory_locked
result_job = described_class.first_advisory_locked_row(described_class.all)
result_job = described_class.advisory_lock.first
expect(result_job).to eq job
expect(job).to be_advisory_locked
end
end

describe '.with_advisory_lock' do
it 'opens a block with a lock' do
records = nil
described_class.limit(2).with_advisory_lock do |results|
records = results
expect(records).to all be_advisory_locked
end

expect(records).to all be_advisory_unlocked
end
end

describe '#advisory_lock' do
it 'results in a locked record' do
job.advisory_lock!
Expand Down
7 changes: 7 additions & 0 deletions spec/support/sql_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module SqlHelper
def normalize_sql(sql)
sql.gsub(/\s/, ' ').gsub(/([\(\)])/, ' \1 ').squish
end
end

RSpec.configure { |c| c.include SqlHelper }