Skip to content

Commit

Permalink
Lock GoodJob::Job on active_job_id instead of row id; add separator h…
Browse files Browse the repository at this point in the history
…yphen to lock key
  • Loading branch information
bensheldon committed Aug 18, 2021
1 parent 299ec19 commit d16079b
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 28 deletions.
2 changes: 1 addition & 1 deletion lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class Job < Object.const_get(GoodJob.active_record_parent_class)
DEFAULT_PRIORITY = 0

self.table_name = 'good_jobs'
self.advisory_lockable_column = 'id'
self.advisory_lockable_column = 'active_job_id'

attr_readonly :serialized_params

Expand Down
8 changes: 4 additions & 4 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ module Lockable
composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' ')))
query = cte_table.project(cte_table[:id])
.with(composed_cte)
.where(Arel.sql(sanitize_sql_for_conditions(["#{function}(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))
.where(Arel.sql(sanitize_sql_for_conditions(["#{function}(('x' || substr(md5(:table_name || '-' || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))

limit = original_query.arel.ast.limit
query.limit = limit.value if limit.present?
Expand All @@ -75,8 +75,8 @@ module Lockable
join_sql = <<~SQL.squish
LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.classid = ('x' || substr(md5(:table_name || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5(:table_name || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int
SQL

joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }]))
Expand Down Expand Up @@ -315,7 +315,7 @@ def advisory_unlock!(key: lockable_key, function: self.class.advisory_unlockable
# Default Advisory Lock key
# @return [String]
def lockable_key
[self.class.table_name, self[self.class._advisory_lockable_column]].join
"#{self.class.table_name}-#{self[self.class._advisory_lockable_column]}"
end

delegate :pg_or_jdbc_query, to: :class
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def perform(result_value = nil, raise_error: false)
end

describe '#executable?' do
let(:good_job) { described_class.create! }
let(:good_job) { described_class.create!(active_job_id: SecureRandom.uuid) }

it 'is true when locked' do
good_job.with_advisory_lock do
Expand Down
50 changes: 28 additions & 22 deletions spec/lib/good_job/lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

RSpec.describe GoodJob::Lockable do
let(:model_class) { GoodJob::Job }
let(:job) { model_class.create(queue_name: "default") }
let(:job) { model_class.create(active_job_id: SecureRandom.uuid, queue_name: "default") }

describe '.advisory_lock' do
around do |example|
Expand All @@ -19,26 +19,32 @@
end
end

it 'generates appropriate SQL' do
query = model_class.where(priority: 99).order(priority: :desc).limit(2).advisory_lock
context 'when using default lockable column' do
before do
allow(model_class).to receive(:advisory_lockable_column).and_return(:id)
end

expect(normalize_sql(query.to_sql)).to eq normalize_sql(<<~SQL.squish)
SELECT "good_jobs".*
FROM "good_jobs"
WHERE "good_jobs"."id" IN (
WITH "rows" AS #{'MATERIALIZED' if model_class.supports_cte_materialization_specifiers?} (
SELECT "good_jobs"."id"
FROM "good_jobs"
WHERE "good_jobs"."priority" = 99
ORDER BY "good_jobs"."priority" DESC
it 'generates appropriate SQL' do
query = model_class.where(priority: 99).order(priority: :desc).limit(2).advisory_lock

expect(normalize_sql(query.to_sql)).to eq normalize_sql(<<~SQL.squish)
SELECT "good_jobs".*
FROM "good_jobs"
WHERE "good_jobs"."id" IN (
WITH "rows" AS #{'MATERIALIZED' if model_class.supports_cte_materialization_specifiers?} (
SELECT "good_jobs"."id", "good_jobs"."id"
FROM "good_jobs"
WHERE "good_jobs"."priority" = 99
ORDER BY "good_jobs"."priority" DESC
)
SELECT "rows"."id"
FROM "rows"
WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || '-' || "rows"."id"::text), 1, 16))::bit(64)::bigint)
LIMIT 2
)
SELECT "rows"."id"
FROM "rows"
WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."id"::text), 1, 16))::bit(64)::bigint)
LIMIT 2
)
ORDER BY "good_jobs"."priority" DESC
SQL
ORDER BY "good_jobs"."priority" DESC
SQL
end
end

it 'can be customized with `lockable_column`' do
Expand All @@ -56,7 +62,7 @@
)
SELECT "rows"."id"
FROM "rows"
WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."queue_name"::text), 1, 16))::bit(64)::bigint)
WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || '-' || "rows"."queue_name"::text), 1, 16))::bit(64)::bigint)
LIMIT 2
)
ORDER BY "good_jobs"."priority" DESC
Expand All @@ -76,10 +82,10 @@
expect(job).not_to be_advisory_locked
result_job = model_class.advisory_lock(column: :queue_name).first
expect(result_job).to eq job
expect(job).to be_advisory_locked(key: "good_jobsdefault")
expect(job).to be_advisory_locked(key: "good_jobs-default")
expect(job).not_to be_advisory_locked # on default key

job.advisory_unlock(key: "good_jobsdefault")
job.advisory_unlock(key: "good_jobs-default")
end
end

Expand Down

0 comments on commit d16079b

Please sign in to comment.