Skip to content

Commit

Permalink
pass the connection pool on initialise the adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Jun 27, 2024
1 parent f413153 commit 8f096af
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 23 deletions.
8 changes: 6 additions & 2 deletions lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ def connection=(connection)
self.adapter =
if connection.to_s == "ActiveRecord"
Adapters::ActiveRecord.new
elsif connection.to_s == "Que::Adapters::Yugabyte"
Adapters::Yugabyte.new
else
case connection.class.to_s
when "Que::Adapters::ActiveRecordWithLock" then

Adapters::ActiveRecordWithLock.new(
job_connection_pool: connection.job_connection_pool,
lock_connection_pool: connection.lock_connection_pool
)
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
when "ConnectionPool" then Adapters::ConnectionPool.new(connection)
when "PG::Connection" then Adapters::PG.new(connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,39 @@
# https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb
module Que
module Adapters
class Yugabyte < Que::Adapters::ActiveRecord
def initialize
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
attr_accessor :job_connection_pool, :lock_connection_pool
def initialize(job_connection_pool:, lock_connection_pool:)
@job_connection_pool = job_connection_pool
@lock_connection_pool = lock_connection_pool
super
end

def checkout_activerecord_adapter(&block)
YugabyteRecord.connection_pool.with_connection(&block)
@job_connection_pool.with_connection(&block)
end

def checkout_lock_database_connection
# when multiple threads are running we need to make sure
# the acquiring and releasing of advisory locks is done by the
# same connection
Thread.current[:db_connection] ||= LockDatabaseRecord.connection_pool.checkout
Thread.current[:db_connection] ||= lock_connection_pool.checkout
end

def lock_database_connection
Thread.current[:db_connection]
end

def release_lock_database_connection
LockDatabaseRecord.connection_pool.checkin(Thread.current[:db_connection])
@lock_connection_pool.checkin(Thread.current[:db_connection])
end

def execute(command, params=[])
if command == :lock_job
case command
when :lock_job then
queue, cursor = params
lock_job_with_lock_database(queue, cursor)
elsif command == :unlock_job
when :unlock_job then
job_id = params[0]
unlock_job(job_id)
else
Expand All @@ -43,7 +47,7 @@ def lock_job_with_lock_database(queue, cursor)
result = Que.execute(:find_job_to_lock, [queue, cursor])
return result if result.empty?

if locked?(result.first['job_id'])
if pg_try_advisory_lock?(result.first['job_id'])
return result
end

Expand All @@ -52,11 +56,11 @@ def lock_job_with_lock_database(queue, cursor)
end

def cleanup!
YugabyteRecord.connection_pool.release_connection
LockDatabaseRecord.connection_pool.release_connection
@job_connection_pool.release_connection
@lock_connection_pool.release_connection
end

def locked?(job_id)
def pg_try_advisory_lock?(job_id)
lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").try(:first)&.fetch('pg_try_advisory_lock')
end

Expand Down
2 changes: 1 addition & 1 deletion lib/que/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Adapters
autoload :PG, "que/adapters/pg"
autoload :Pond, "que/adapters/pond"
autoload :Sequel, "que/adapters/sequel"
autoload :Yugabyte, "que/adapters/yugabyte"
autoload :ActiveRecordWithLock, "que/adapters/active_record_with_lock"

class UnavailableConnection < StandardError; end

Expand Down
2 changes: 1 addition & 1 deletion lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def lock_job_query(queue, cursor)

def handle_expired_cursors!
@consolidated_queues.each do |queue|
queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now)
queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now)
reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now
end
end
Expand Down
13 changes: 5 additions & 8 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
require_relative "./helpers/sleep_job"
require_relative "./helpers/interruptible_sleep_job"
require_relative "./helpers/user"
require_relative "../lib/que/adapters/yugabyte"


def postgres_now
ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"]
Expand Down Expand Up @@ -44,23 +44,21 @@ class LockDatabaseRecord < ActiveRecord::Base
end

class YugabyteRecord < ActiveRecord::Base
def self.establish_lock_database_connection
establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end
def self.connection
establish_lock_database_connection.connection
end
end

# Make sure our test database is prepared to run Que
if ENV['YUGABYTE_QUE_WORKER_ENABLED']
Que.connection = Que::Adapters::Yugabyte
Que.connection = Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: YugabyteRecord.connection_pool,
lock_connection_pool: LockDatabaseRecord.connection_pool,
)
else
Que.connection = ActiveRecord
end
Expand Down Expand Up @@ -91,7 +89,6 @@ def self.connection
end

config.before do
LockDatabaseRecord.connection_pool.release_connection
QueJob.delete_all
FakeJob.log = []
ExceptionalJob.log = []
Expand Down

0 comments on commit 8f096af

Please sign in to comment.