Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Rails.logger and ActiveSupport::Notifications for logging instead of puts #10

Merged
merged 1 commit into from
Mar 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/good_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
require "rails"
require 'good_job/railtie'

require 'good_job/logging'
require 'good_job/lockable'
require 'good_job/job'
require 'good_job/inline_scheduler'
Expand All @@ -8,5 +10,7 @@
require 'good_job/adapter'

module GoodJob
# Your code goes here...
include Logging

ActiveSupport.run_load_hooks(:good_job, self)
end
5 changes: 4 additions & 1 deletion lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ def enqueue_at(job, timestamp)
good_job = GoodJob::Job.create(params)
job.provider_job_id = good_job.id

@scheduler.enqueue(good_job) if inline?
GoodJob.tag_logger do
ActiveSupport::Notifications.instrument("create.good_job", { good_job: good_job, job: job })
@scheduler.enqueue(good_job) if inline?
end

good_job
end
Expand Down
60 changes: 60 additions & 0 deletions lib/good_job/logging.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
module GoodJob::Logging
extend ActiveSupport::Concern

included do
cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))

def self.tag_logger(*tags)
if logger.respond_to?(:tagged)
tags.unshift "GoodJob" unless logger.formatter.current_tags.include?("GoodJob")
logger.tagged(*tags) { yield }
else
yield
end
end
end

class LogSubscriber < ActiveSupport::LogSubscriber
def create(event)
good_job = event.payload[:good_job]

info do
"Created GoodJob resource with id #{good_job.id}"
end
end

def timer_task_finished(event)
result = event.payload[:result]
exception = event.payload[:error]

if exception
error do
"ERROR: #{exception}\n #{exception.backtrace}"
end
end
end

def job_finished(event)
result = event.payload[:result]
exception = event.payload[:error]

if exception
error do
"ERROR: #{exception}\n #{exception.backtrace}"
end
end
end

private

def logger
GoodJob.logger
end

def thread_name
Thread.current.name || Thread.current.object_id
end
end
end

GoodJob::Logging::LogSubscriber.attach_to :good_job
3 changes: 3 additions & 0 deletions lib/good_job/railtie.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
module GoodJob
class Railtie < ::Rails::Railtie
initializer "good_job.logger" do
ActiveSupport.on_load(:good_job) { self.logger = ::Rails.logger }
end
end
end
52 changes: 22 additions & 30 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,22 @@

module GoodJob
class Scheduler
MINIMUM_EXECUTION_INTERVAL = 0.1
MAX_THREADS = Concurrent.processor_count

DEFAULT_TIMER_OPTIONS = {
execution_interval: 1,
timeout_interval: 1,
run_now: true
}.freeze

MAX_THREADS = Concurrent.processor_count

DEFAULT_POOL_OPTIONS = {
name: 'good_job',
min_threads: 0,
max_threads: MAX_THREADS,
auto_terminate: true,
idletime: 0,
max_queue: 0,
fallback_policy: :abort # shouldn't matter -- 0 max queue
name: 'good_job',
min_threads: 0,
max_threads: MAX_THREADS,
auto_terminate: true,
idletime: 0,
max_queue: 0,
fallback_policy: :abort # shouldn't matter -- 0 max queue
}.freeze

def initialize(query = GoodJob::Job.all, **options)
Expand All @@ -30,10 +28,9 @@ def initialize(query = GoodJob::Job.all, **options)
@pool = Concurrent::ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS)
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS) do
idle_threads = @pool.max_length - @pool.length
puts "There are idle_threads: #{idle_threads}"
create_thread if idle_threads.positive?
true
end
@timer.add_observer(TimerObserver.new)
@timer.execute
end

Expand All @@ -59,34 +56,29 @@ def shutdown(wait: true)
def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
Rails.application.executor.wrap do
thread_name = Thread.current.name || Thread.current.object_id
while job = query.with_advisory_lock.first
puts "Executing job #{job.id} in thread #{thread_name}"
while good_job = query.with_advisory_lock.first
ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })

JobWrapper.new(job).perform
JobWrapper.new(good_job).perform

job.advisory_unlock
good_job.advisory_unlock
end
true
end
true
end
future.add_observer(TaskObserver.new(self))
future.add_observer(TaskObserver.new)
future.execute
end

class TaskObserver
def initialize(scheduler)
@scheduler = scheduler
class TimerObserver
def update(time, result, error)
ActiveSupport::Notifications.instrument("timer_task_finished.good_job", { result: result, error: error, time: time })
end
end

def update(time, result, ex)
if result
puts "(#{time}) Execution successfully returned #{result}\n"
elsif ex.is_a?(Concurrent::TimeoutError)
puts "(#{time}) Execution timed out\n"
else
puts "(#{time}) Execution failed with error #{result} #{ex}\n"
end
class TaskObserver
def update(time, result, error)
ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time })
end
end
end
Expand Down
7 changes: 7 additions & 0 deletions spec/good_job/railtie_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require 'rails_helper'

RSpec.describe GoodJob::Railtie do
it 'copies over the Rails logger by default' do
expect(GoodJob.logger).to eq Rails.logger
end
end
3 changes: 1 addition & 2 deletions spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ def perform(*args, **kwargs)
end
end)

stub_const 'RetryableError', Class.new(StandardError)
stub_const 'ErrorJob', (Class.new(ApplicationJob) do
RetryableError = Class.new(StandardError)

self.queue_name = 'test'
self.priority = 50
retry_on(RetryableError, wait: 0, attempts: 3) do |job, error|
Expand Down