Skip to content

Commit

Permalink
add yugabyte adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Jun 25, 2024
1 parent 55741c3 commit 5c6353d
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 8 deletions.
2 changes: 2 additions & 0 deletions lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def connection=(connection)
self.adapter =
if connection.to_s == "ActiveRecord"
Adapters::ActiveRecord.new
elsif connection.to_s == "Que::Adapters::Yugabyte"
Adapters::Yugabyte.new
else
case connection.class.to_s
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
Expand Down
1 change: 1 addition & 0 deletions lib/que/adapters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module Adapters
autoload :PG, "que/adapters/pg"
autoload :Pond, "que/adapters/pond"
autoload :Sequel, "que/adapters/sequel"
autoload :Yugabyte, "que/adapters/yugabyte"

class UnavailableConnection < StandardError; end

Expand Down
75 changes: 75 additions & 0 deletions lib/que/adapters/yugabyte.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# frozen_string_literal: true

# https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb
module Que
module Adapters
class Yugabyte < Que::Adapters::ActiveRecord
def initialize
super
end

def checkout_activerecord_adapter(&block)
YugabyteRecord.connection_pool.with_connection(&block)
end

# def establish_lock_database_connection
# Thread.current["lock_database_connection_#{Thread.current.__id__}"] = LockDatabaseRecord.connection
# end

# def lock_database_connection
# # connection = @lock_database_connection[Thread.current.name]
# # return connection unless connection.nil?
# # @lock_database_connection[Thread.current.name] = LockDatabaseRecord.connection
# @lock_database_connection ||= LockDatabaseRecord.connection
# end

def setup_lock_database_connection
::LockDatabaseRecord.connection
end

# def execute(command, params=[])
# if command == :lock_job
# queue, cursor, lock_database_connection = params
# lock_job_with_lock_database(queue, cursor, lock_database_connection)
# elsif command == :unlock_job
# job_id, lock_database_connection = params
# unlock_job(job_id, lock_database_connection)
# else
# super(command, params)
# end
# end

def lock_job_with_lock_database(queue, cursor, lock_database_connection)
query = QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
.select("extract(epoch from (now() - run_at)) as latency")
.where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
.where(retryable: true)
.order(:priority, :run_at, :job_id)
.limit(1).to_sql

result = Que.execute(query)
return result if result.empty?

if locked?(result.first['job_id'], lock_database_connection)
return result
end

# continue the recursion to fetch the next available job
lock_job_with_lock_database(queue, result.first['job_id'], lock_database_connection)
end

def cleanup!
YugabyteRecord.connection_pool.release_connection
LockDatabaseRecord.connection_pool.release_connection
end

def locked?(job_id, lock_database_connection)
lock_database_connection.execute("SELECT pg_try_advisory_lock(#{job_id})").first["pg_try_advisory_lock"]
end

def unlock_job(job_id, lock_database_connection)
lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end
end
end
48 changes: 40 additions & 8 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
require_relative "./helpers/sleep_job"
require_relative "./helpers/interruptible_sleep_job"
require_relative "./helpers/user"
require_relative "../lib/que/adapters/yugabyte"

def postgres_now
ActiveRecord::Base.connection.execute("SELECT NOW();")[0]["now"]
Expand All @@ -22,20 +23,15 @@ def establish_database_connection
ActiveRecord::Base.establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "postgres"),
password: ENV.fetch("PGPASSWORD", ""),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end

establish_database_connection

# Make sure our test database is prepared to run Que
Que.connection = ActiveRecord
Que.migrate!


class LockDataBaseRecord < ActiveRecord::Base
class LockDatabaseRecord < ActiveRecord::Base
def self.establish_lock_database_connection
establish_connection(
adapter: "postgresql",
Expand All @@ -51,10 +47,46 @@ def self.connection
end
end

class YugabyteRecord < ActiveRecord::Base
def self.establish_lock_database_connection
establish_connection(
adapter: "postgresql",
host: ENV.fetch("PGHOST", "localhost"),
user: ENV.fetch("PGUSER", "ubuntu"),
password: ENV.fetch("PGPASSWORD", "password"),
database: ENV.fetch("PGDATABASE", "que-test"),
)
end
def self.connection
establish_lock_database_connection.connection
end
end

# Make sure our test database is prepared to run Que
if ENV['YUGABYTE_QUE_WORKER_ENABLED']
Que.connection = Que::Adapters::Yugabyte
else
Que.connection = ActiveRecord
end

Que.migrate!

# Ensure we have a logger, so that we can test the code paths that log
Que.logger = Logger.new("/dev/null")


RSpec.configure do |config|
# config.before(:each, :with_yugabyte_adapter) do
# Que.adapter.cleanup!
# Que.connection = Que::Adapters::Yugabyte
# end

# config.after(:each, :with_yugabyte_adapter) do
# Que.adapter.cleanup!
# Que.connection = ActiveRecord
# end
config.filter_run_when_matching :conditional_test if ENV['YUGABYTE_QUE_WORKER_ENABLED']

config.before do
QueJob.delete_all
FakeJob.log = []
Expand Down

0 comments on commit 5c6353d

Please sign in to comment.