diff --git a/lib/good_job/lockable.rb b/lib/good_job/lockable.rb index 59eb67737..80d1f7a7f 100644 --- a/lib/good_job/lockable.rb +++ b/lib/good_job/lockable.rb @@ -5,36 +5,66 @@ module Lockable RecordAlreadyAdvisoryLockedError = Class.new(StandardError) included do + scope :advisory_lock, (lambda do + original_query = self + + cte_table = Arel::Table.new(:rows) + composed_cte = Arel::Nodes::As.new(cte_table, original_query.select(primary_key).except(:limit).arel) + + query = cte_table.project(cte_table[:id]) + .with(composed_cte) + .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x'||substr(md5(:table_name || \"#{cte_table.name}\".\"#{primary_key}\"::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + + limit = original_query.arel.ast.limit + query.limit = limit.value if limit.present? + + unscoped.where(arel_table[:id].in(query)).merge(original_query.only(:order)) + end) + scope :joins_advisory_locks, (lambda do - joins(<<~SQL) + join_sql = <<~SQL LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x'||substr(md5(good_jobs.id::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x'||substr(md5(good_jobs.id::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x'||substr(md5(:table_name || "#{table_name}"."#{primary_key}"::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x'||substr(md5(:table_name || "#{table_name}"."#{primary_key}"::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL + + joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }])) end) scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) } + scope :advisory_locked, -> { joins_advisory_locks.where.not(pg_locks: { locktype: nil }) } + scope :owns_advisory_locked, -> { joins_advisory_locks.where('"pg_locks"."pid" = pg_backend_pid()') } attr_accessor :create_with_advisory_lock - after_create -> { advisory_lock }, if: :create_with_advisory_lock end class_methods do - def first_advisory_locked_row(query) - find_by_sql(<<~SQL).first - WITH rows AS (#{query.to_sql}) - SELECT rows.* - FROM rows - WHERE pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint) - LIMIT 1 - SQL + def with_advisory_lock(&block) + records = advisory_lock.to_a + begin + block.call(records) + ensure + records.each(&:advisory_unlock) + end end end def advisory_lock - self.class.connection.execute(sanitize_sql_for_conditions(["SELECT 1 as one WHERE pg_try_advisory_lock(('x'||substr(md5(?), 1, 16))::bit(64)::bigint)", id])).ntuples.positive? + query = <<~SQL + SELECT 1 AS one + WHERE pg_try_advisory_lock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) + SQL + self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive? + end + + def advisory_unlock + query = <<~SQL + SELECT 1 AS one + WHERE pg_advisory_unlock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) + SQL + self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive? end def advisory_lock! @@ -45,38 +75,16 @@ def advisory_lock! def with_advisory_lock advisory_lock! yield - rescue StandardError => e - advisory_unlock unless e.is_a? RecordAlreadyAdvisoryLockedError - raise + ensure + advisory_unlock unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError end def advisory_locked? - self.class.connection.execute(<<~SQL).ntuples.positive? - SELECT 1 as one - FROM pg_locks - WHERE - locktype = 'advisory' - AND objsubid = 1 - AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int - AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int - SQL + self.class.advisory_locked.where(id: send(self.class.primary_key)).any? end def owns_advisory_lock? - self.class.connection.execute(<<~SQL).ntuples.positive? - SELECT 1 as one - FROM pg_locks - WHERE - locktype = 'advisory' - AND objsubid = 1 - AND classid = ('x'||substr(md5('#{id}'), 1, 16))::bit(32)::int - AND objid = (('x'||substr(md5('#{id}'), 1, 16))::bit(64) << 32)::bit(32)::int - AND pid = pg_backend_pid() - SQL - end - - def advisory_unlock - self.class.connection.execute("SELECT pg_advisory_unlock(('x'||substr(md5('#{id}'), 1, 16))::bit(64)::bigint)").first["pg_advisory_unlock"] + self.class.owns_advisory_locked.where(id: send(self.class.primary_key)).any? end def advisory_unlock! diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index 89eb49aa1..921f6fa27 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -62,18 +62,18 @@ def shutdown? def create_thread future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query| - executed_job = false + good_job = nil Rails.application.executor.wrap do - good_job = GoodJob::Job.first_advisory_locked_row(query) - break unless good_job + query.limit(1).with_advisory_lock do |good_jobs| + good_job = good_jobs.first + break unless good_job - executed_job = true - good_job.perform - good_job.advisory_unlock + good_job.perform + end end - executed_job + good_job end future.add_observer(self, :task_observer) future.execute @@ -83,9 +83,9 @@ def timer_observer(time, executed_task, error) ActiveSupport::Notifications.instrument("finished_timer_task.good_job", { result: executed_task, error: error, time: time }) end - def task_observer(time, executed_task, error) - ActiveSupport::Notifications.instrument("finished_job_task.good_job", { result: executed_task, error: error, time: time }) - create_thread if executed_task + def task_observer(time, performed_job, error) + ActiveSupport::Notifications.instrument("finished_job_task.good_job", { good_job: performed_job, error: error, time: time }) + create_thread if performed_job end end end diff --git a/spec/good_job/job_spec.rb b/spec/good_job/job_spec.rb index 1802aa8c4..77b8063c7 100644 --- a/spec/good_job/job_spec.rb +++ b/spec/good_job/job_spec.rb @@ -15,15 +15,62 @@ def perform(*_args, **_kwargs) end describe 'lockable' do - describe '.first_advisory_locked_row' do + describe '.advisory_lock' do + around do |example| + RSpec.configure do |config| + config.expect_with :rspec do |c| + original_max_formatted_output_length = c.instance_variable_get(:@max_formatted_output_length) + + c.max_formatted_output_length = 1000000 + example.run + + c.max_formatted_output_length = original_max_formatted_output_length + end + end + end + + it 'generates appropriate SQL' do + query = described_class.where(priority: 99).order(priority: :desc).limit(2).advisory_lock + + expect(normalize_sql(query.to_sql)).to eq normalize_sql(<<~SQL) + SELECT "good_jobs".* + FROM "good_jobs" + WHERE "good_jobs"."id" IN ( + WITH "rows" AS ( + SELECT "good_jobs"."id" + FROM "good_jobs" + WHERE "good_jobs"."priority" = 99 + ORDER BY "good_jobs"."priority" DESC + ) + SELECT "rows"."id" + FROM "rows" + WHERE pg_try_advisory_lock(('x'||substr(md5('good_jobs' || "rows"."id"::text), 1, 16))::bit(64)::bigint) + LIMIT 2 + ) + ORDER BY "good_jobs"."priority" DESC + SQL + end + it 'returns first row of the query with a lock' do expect(job).not_to be_advisory_locked - result_job = described_class.first_advisory_locked_row(described_class.all) + result_job = described_class.advisory_lock.first expect(result_job).to eq job expect(job).to be_advisory_locked end end + describe '.with_advisory_lock' do + it 'opens a block with a lock' do + records = nil + described_class.limit(2).with_advisory_lock do |results| + records = results + expect(records).to all be_advisory_locked + end + + expect(records).to all be_advisory_unlocked + end + end + describe '#advisory_lock' do it 'results in a locked record' do job.advisory_lock! diff --git a/spec/support/sql_helper.rb b/spec/support/sql_helper.rb new file mode 100644 index 000000000..21e87b7ba --- /dev/null +++ b/spec/support/sql_helper.rb @@ -0,0 +1,7 @@ +module SqlHelper + def normalize_sql(sql) + sql.gsub(/\s/, ' ').gsub(/([\(\)])/, ' \1 ').squish + end +end + +RSpec.configure { |c| c.include SqlHelper }