Skip to content

Commit

Permalink
Use Rails.logger and ActiveSupport::Notifications for logging instead…
Browse files Browse the repository at this point in the history
… of puts (#10)
  • Loading branch information
bensheldon authored Mar 6, 2020
1 parent fa161ab commit 889d036
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 34 deletions.
6 changes: 5 additions & 1 deletion lib/good_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require "rails"
require 'good_job/railtie'

require 'good_job/logging'
require 'good_job/lockable'
require 'good_job/job'
require 'good_job/inline_scheduler'
Expand All @@ -8,5 +10,7 @@
require 'good_job/adapter'

module GoodJob
# Your code goes here...
include Logging

ActiveSupport.run_load_hooks(:good_job, self)
end
5 changes: 4 additions & 1 deletion lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ def enqueue_at(job, timestamp)
good_job = GoodJob::Job.create(params)
job.provider_job_id = good_job.id

@scheduler.enqueue(good_job) if inline?
GoodJob.tag_logger do
ActiveSupport::Notifications.instrument("create.good_job", { good_job: good_job, job: job })
@scheduler.enqueue(good_job) if inline?
end

good_job
end
Expand Down
60 changes: 60 additions & 0 deletions lib/good_job/logging.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
module GoodJob::Logging
extend ActiveSupport::Concern

included do
cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))

def self.tag_logger(*tags)
if logger.respond_to?(:tagged)
tags.unshift "GoodJob" unless logger.formatter.current_tags.include?("GoodJob")
logger.tagged(*tags) { yield }
else
yield
end
end
end

class LogSubscriber < ActiveSupport::LogSubscriber
def create(event)
good_job = event.payload[:good_job]

info do
"Created GoodJob resource with id #{good_job.id}"
end
end

def timer_task_finished(event)
result = event.payload[:result]
exception = event.payload[:error]

if exception
error do
"ERROR: #{exception}\n #{exception.backtrace}"
end
end
end

def job_finished(event)
result = event.payload[:result]
exception = event.payload[:error]

if exception
error do
"ERROR: #{exception}\n #{exception.backtrace}"
end
end
end

private

def logger
GoodJob.logger
end

def thread_name
Thread.current.name || Thread.current.object_id
end
end
end

GoodJob::Logging::LogSubscriber.attach_to :good_job
3 changes: 3 additions & 0 deletions lib/good_job/railtie.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
module GoodJob
class Railtie < ::Rails::Railtie
initializer "good_job.logger" do
ActiveSupport.on_load(:good_job) { self.logger = ::Rails.logger }
end
end
end
52 changes: 22 additions & 30 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,22 @@

module GoodJob
class Scheduler
MINIMUM_EXECUTION_INTERVAL = 0.1
MAX_THREADS = Concurrent.processor_count

DEFAULT_TIMER_OPTIONS = {
execution_interval: 1,
timeout_interval: 1,
run_now: true
}.freeze

MAX_THREADS = Concurrent.processor_count

DEFAULT_POOL_OPTIONS = {
name: 'good_job',
min_threads: 0,
max_threads: MAX_THREADS,
auto_terminate: true,
idletime: 0,
max_queue: 0,
fallback_policy: :abort # shouldn't matter -- 0 max queue
name: 'good_job',
min_threads: 0,
max_threads: MAX_THREADS,
auto_terminate: true,
idletime: 0,
max_queue: 0,
fallback_policy: :abort # shouldn't matter -- 0 max queue
}.freeze

def initialize(query = GoodJob::Job.all, **options)
Expand All @@ -30,10 +28,9 @@ def initialize(query = GoodJob::Job.all, **options)
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS)
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do
idle_threads = @pool.max_length - @pool.length
puts "There are idle_threads: #{idle_threads}"
create_thread if idle_threads.positive?
true
end
@timer.add_observer(TimerObserver.new)
@timer.execute
end

Expand All @@ -59,34 +56,29 @@ def shutdown(wait: true)
def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
Rails.application.executor.wrap do
thread_name = Thread.current.name || Thread.current.object_id
while job = query.with_advisory_lock.first
puts "Executing job #{job.id} in thread #{thread_name}"
while good_job = query.with_advisory_lock.first
ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })

JobWrapper.new(job).perform
JobWrapper.new(good_job).perform

job.advisory_unlock
good_job.advisory_unlock
end
true
end
true
end
future.add_observer(TaskObserver.new(self))
future.add_observer(TaskObserver.new)
future.execute
end

class TaskObserver
def initialize(scheduler)
@scheduler = scheduler
class TimerObserver
def update(time, result, error)
ActiveSupport::Notifications.instrument("timer_task_finished.good_job", { result: result, error: error, time: time })
end
end

def update(time, result, ex)
if result
puts "(#{time}) Execution successfully returned #{result}\n"
elsif ex.is_a?(Concurrent::TimeoutError)
puts "(#{time}) Execution timed out\n"
else
puts "(#{time}) Execution failed with error #{result} #{ex}\n"
end
class TaskObserver
def update(time, result, error)
ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time })
end
end
end
Expand Down
7 changes: 7 additions & 0 deletions spec/good_job/railtie_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require 'rails_helper'

RSpec.describe GoodJob::Railtie do
it 'copies over the Rails logger by default' do
expect(GoodJob.logger).to eq Rails.logger
end
end
3 changes: 1 addition & 2 deletions spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ def perform(*args, **kwargs)
end
end)

stub_const 'RetryableError', Class.new(StandardError)
stub_const 'ErrorJob', (Class.new(ApplicationJob) do
RetryableError = Class.new(StandardError)

self.queue_name = 'test'
self.priority = 50
retry_on(RetryableError, wait: 0, attempts: 3) do |job, error|
Expand Down

0 comments on commit 889d036

Please sign in to comment.