Skip to content

Commit

Permalink
Fix further compatibility issues with ruby < 2.5
Browse files Browse the repository at this point in the history
  • Loading branch information
sudoremo committed May 7, 2024
1 parent 63dd22e commit 0e2bbb7
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 86 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Workhorse Changelog

## 1.2.19 - 2024-05-07

* Fix further compatibility issues with `ruby < 2.5`

Sitrox reference: #124538.

## 1.2.18 - 2024-05-07

* Fix compatibility with `ruby < 2.5`
Expand Down
52 changes: 28 additions & 24 deletions lib/workhorse/performer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,41 +9,45 @@ def initialize(db_job_id, worker)
end

def perform
fail 'Performer can only run once.' if @started
@started = true
perform!
rescue Exception => e
Workhorse.on_exception.call(e)
begin
fail 'Performer can only run once.' if @started
@started = true
perform!
rescue Exception => e
Workhorse.on_exception.call(e)
end
end

private

def perform!
Thread.current[:workhorse_current_performer] = self
begin
Thread.current[:workhorse_current_performer] = self

ActiveRecord::Base.connection_pool.with_connection do
if defined?(Rails) && Rails.respond_to?(:application) && Rails.application && Rails.application.respond_to?(:executor)
Rails.application.executor.wrap do
ActiveRecord::Base.connection_pool.with_connection do
if defined?(Rails) && Rails.respond_to?(:application) && Rails.application && Rails.application.respond_to?(:executor)
Rails.application.executor.wrap do
perform_wrapped
end
else
perform_wrapped
end
else
perform_wrapped
end
end
rescue Exception => e
# ---------------------------------------------------------------
# Mark job as failed
# ---------------------------------------------------------------
log %(#{e.message}\n#{e.backtrace.join("\n")}), :error
rescue Exception => e
# ---------------------------------------------------------------
# Mark job as failed
# ---------------------------------------------------------------
log %(#{e.message}\n#{e.backtrace.join("\n")}), :error

Workhorse.tx_callback.call do
log 'Mark failed', :debug
@db_job.mark_failed!(e)
end
Workhorse.tx_callback.call do
log 'Mark failed', :debug
@db_job.mark_failed!(e)
end

fail e
ensure
Thread.current[:workhorse_current_performer] = nil
fail e
ensure
Thread.current[:workhorse_current_performer] = nil
end
end

def perform_wrapped
Expand Down
96 changes: 49 additions & 47 deletions lib/workhorse/poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,60 +141,62 @@ def sleep
end

def with_global_lock(name: :workhorse, timeout: 2, &_block)
if @is_oracle
result = Workhorse::DbJob.connection.select_all(
"SELECT DBMS_LOCK.REQUEST(#{ORACLE_LOCK_HANDLE}, #{ORACLE_LOCK_MODE}, #{timeout}) FROM DUAL"
).first.values.last
begin
if @is_oracle
result = Workhorse::DbJob.connection.select_all(
"SELECT DBMS_LOCK.REQUEST(#{ORACLE_LOCK_HANDLE}, #{ORACLE_LOCK_MODE}, #{timeout}) FROM DUAL"
).first.values.last

success = result == 0
else
result = Workhorse::DbJob.connection.select_all(
"SELECT GET_LOCK(CONCAT(DATABASE(), '_#{name}'), #{timeout})"
).first.values.last
success = result == 1
end
success = result == 0
else
result = Workhorse::DbJob.connection.select_all(
"SELECT GET_LOCK(CONCAT(DATABASE(), '_#{name}'), #{timeout})"
).first.values.last
success = result == 1
end

if success
@global_lock_fails = 0
@max_global_lock_fails_reached = false
else
@global_lock_fails += 1
if success
@global_lock_fails = 0
@max_global_lock_fails_reached = false
else
@global_lock_fails += 1

unless @max_global_lock_fails_reached
worker.log 'Could not obtain global lock, retrying with next poll.', :warn
end
unless @max_global_lock_fails_reached
worker.log 'Could not obtain global lock, retrying with next poll.', :warn
end

if @global_lock_fails > Workhorse.max_global_lock_fails && !@max_global_lock_fails_reached
@max_global_lock_fails_reached = true

worker.log 'Could not obtain global lock, retrying with next poll. ' \
'This will be the last such message for this worker until ' \
'the issue is resolved.', :warn

message = "Worker reached maximum number of consecutive times (#{Workhorse.max_global_lock_fails}) " \
"where the global lock could no be acquired within the specified timeout (#{timeout}). " \
'A worker that obtained this lock may have crashed without ending the database ' \
'connection properly. On MySQL, use "show processlist;" to see which connection(s) ' \
'is / are holding the lock for a long period of time and consider killing them using ' \
"MySQL's \"kill <Id>\" command. This message will be issued only once per worker " \
'and may only be re-triggered if the error happens again *after* the lock has ' \
'been solved in the meantime.'

worker.log message
exception = StandardError.new(message)
Workhorse.on_exception.call(exception)
if @global_lock_fails > Workhorse.max_global_lock_fails && !@max_global_lock_fails_reached
@max_global_lock_fails_reached = true

worker.log 'Could not obtain global lock, retrying with next poll. ' \
'This will be the last such message for this worker until ' \
'the issue is resolved.', :warn

message = "Worker reached maximum number of consecutive times (#{Workhorse.max_global_lock_fails}) " \
"where the global lock could no be acquired within the specified timeout (#{timeout}). " \
'A worker that obtained this lock may have crashed without ending the database ' \
'connection properly. On MySQL, use "show processlist;" to see which connection(s) ' \
'is / are holding the lock for a long period of time and consider killing them using ' \
"MySQL's \"kill <Id>\" command. This message will be issued only once per worker " \
'and may only be re-triggered if the error happens again *after* the lock has ' \
'been solved in the meantime.'

worker.log message
exception = StandardError.new(message)
Workhorse.on_exception.call(exception)
end
end
end

return unless success
return unless success

yield
ensure
if success
if @is_oracle
Workhorse::DbJob.connection.execute("SELECT DBMS_LOCK.RELEASE(#{ORACLE_LOCK_HANDLE}) FROM DUAL")
else
Workhorse::DbJob.connection.execute("SELECT RELEASE_LOCK(CONCAT(DATABASE(), '_#{name}'))")
yield
ensure
if success
if @is_oracle
Workhorse::DbJob.connection.execute("SELECT DBMS_LOCK.RELEASE(#{ORACLE_LOCK_HANDLE}) FROM DUAL")
else
Workhorse::DbJob.connection.execute("SELECT RELEASE_LOCK(CONCAT(DATABASE(), '_#{name}'))")
end
end
end
end
Expand Down
10 changes: 6 additions & 4 deletions lib/workhorse/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ def post
active_threads.increment

@executor.post do
yield
ensure
active_threads.decrement
@on_idle.try(:call)
begin
yield
ensure
active_threads.decrement
@on_idle.try(:call)
end
end
end
end
Expand Down
26 changes: 15 additions & 11 deletions lib/workhorse/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,23 @@ def idle
end

def perform(db_job_id)
mutex.synchronize do
assert_state! :running
log "Posting job #{db_job_id} to thread pool"

@pool.post do
Workhorse::Performer.new(db_job_id, self).perform
rescue Exception => e
log %(#{e.message}\n#{e.backtrace.join("\n")}), :error
Workhorse.on_exception.call(e)
begin
mutex.synchronize do
assert_state! :running
log "Posting job #{db_job_id} to thread pool"

@pool.post do
begin
Workhorse::Performer.new(db_job_id, self).perform
rescue Exception => e
log %(#{e.message}\n#{e.backtrace.join("\n")}), :error
Workhorse.on_exception.call(e)
end
end
end
rescue Exception => e
Workhorse.on_exception.call(e)
end
rescue Exception => e
Workhorse.on_exception.call(e)
end

private
Expand Down

0 comments on commit 0e2bbb7

Please sign in to comment.