Skip to content

Commit

Permalink
Refine metrics that are generated
Browse files Browse the repository at this point in the history
  • Loading branch information
reidmorrison committed Jun 28, 2024
1 parent 114f914 commit 70722e4
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 83 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ Gemfile.lock
TODO.md

.tool-versions
test/dummy/db/test.sqlite3
2 changes: 1 addition & 1 deletion lib/rails_semantic_logger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def self.subscriber_patterns(subscriber)
require("rails_semantic_logger/extensions/active_support/log_subscriber") if defined?(ActiveSupport::LogSubscriber)

begin
require 'rackup'
require "rackup"
rescue LoadError
# No need to do anything, will fall back to Rack
end
Expand Down
149 changes: 90 additions & 59 deletions lib/rails_semantic_logger/extensions/sidekiq/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,26 @@ module Sidekiq
if defined?(::Sidekiq::JobLogger)
# Let Semantic Logger handle duration logging
class JobLogger
def call(item, queue)
def call(item, queue, &block)
klass = item["wrapped"] || item["class"]
metric = "Sidekiq/#{klass}/perform" if klass
logger = klass ? SemanticLogger[klass] : Sidekiq.logger
logger.info("Start #perform")
logger.measure_info(
"Completed #perform",
on_exception_level: :error,
log_exception: :full,
metric: metric
) do
yield

SemanticLogger.tagged(queue: queue) do
# Latency is the time between when the job was enqueued and when it started executing.
logger.info(
"Start #perform",
metric: "sidekiq.queue.latency",
metric_amount: job_latency_ms(item)
)

# Measure the duration of running the job
logger.measure_info(
"Completed #perform",
on_exception_level: :error,
log_exception: :full,
metric: "sidekiq.job.perform",
&block
)
end
end

Expand All @@ -60,14 +68,18 @@ def prepare(job_hash, &block)
end

def job_hash_context(job_hash)
h = {
class: job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"],
jid: job_hash["jid"]
}
h[:bid] = job_hash["bid"] if job_hash["bid"]
h[:tags] = job_hash["tags"] if job_hash["tags"]
h = {jid: job_hash["jid"]}
h[:bid] = job_hash["bid"] if job_hash["bid"]
h[:tags] = job_hash["tags"] if job_hash["tags"]
h[:queue] = job_hash["queue"] if job_hash["queue"]
h
end

def job_latency_ms(job)
return unless job && job["enqueued_at"]

(Time.now.to_f - job["enqueued_at"].to_f) * 1000
end
end
end

Expand All @@ -80,10 +92,10 @@ def self.with_context(msg, &block)
end

def self.job_hash_context(job_hash)
klass = job_hash["wrapped"] || job_hash["class"]
event = { class: klass, jid: job_hash["jid"] }
event[:bid] = job_hash["bid"] if job_hash["bid"]
event
h = {jid: job_hash["jid"]}
h[:bid] = job_hash["bid"] if job_hash["bid"]
h[:queue] = job_hash["queue"] if job_hash["queue"]
h
end
end
end
Expand All @@ -93,49 +105,51 @@ def self.job_hash_context(job_hash)
if defined?(::Sidekiq::ExceptionHandler)
module ExceptionHandler
class Logger
def call(ex, ctx)
unless ctx.empty?
job_hash = ctx[:job] || {}
klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"]
logger = klass ? SemanticLogger[klass] : Sidekiq.logger
ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx)
end
def call(_exception, ctx)
return if ctx.empty?

job_hash = ctx[:job] || {}
klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"]
logger = klass ? SemanticLogger[klass] : Sidekiq.logger
ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx)
end
end
end
# Sidekiq >= v7
# Sidekiq >= v7
elsif defined?(::Sidekiq::Config)
class Config
remove_const :ERROR_HANDLER

ERROR_HANDLER = ->(ex, ctx, cfg = Sidekiq.default_configuration) {
ERROR_HANDLER = lambda { |_ex, ctx, _cfg = Sidekiq.default_configuration|
unless ctx.empty?
job_hash = ctx[:job] || {}
klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"]
logger = klass ? SemanticLogger[klass] : Sidekiq.logger
klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"]
logger = klass ? SemanticLogger[klass] : Sidekiq.logger
ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx)
end
}
end
else
# Sidekiq >= 6.5
# TODO: Not taking effect. See test/sidekiq_test.rb
def self.default_error_handler(ex, ctx)
binding.irb
unless ctx.empty?
job_hash = ctx[:job] || {}
klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"]
logger = klass ? SemanticLogger[klass] : Sidekiq.logger
ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx)
end
def self.default_error_handler(_exception, ctx)
return if ctx.empty?

job_hash = ctx[:job] || {}
klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"]
logger = klass ? SemanticLogger[klass] : Sidekiq.logger
ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx)
end
end

# Logging within each worker should use its own logger
if Sidekiq::VERSION.to_i == 4
case Sidekiq::VERSION.to_i
when 4
module Worker
def self.included(base)
raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }
raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}" if base.ancestors.any? do |c|
c.name == "ActiveJob::Base"
end

base.extend(ClassMethods)
base.include(SemanticLogger::Loggable)
Expand All @@ -144,10 +158,12 @@ def self.included(base)
base.class_attribute :sidekiq_retries_exhausted_block
end
end
elsif Sidekiq::VERSION.to_i == 5
when 5
module Worker
def self.included(base)
raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }
raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}" if base.ancestors.any? do |c|
c.name == "ActiveJob::Base"
end

base.extend(ClassMethods)
base.include(SemanticLogger::Loggable)
Expand All @@ -156,10 +172,12 @@ def self.included(base)
base.sidekiq_class_attribute :sidekiq_retries_exhausted_block
end
end
elsif Sidekiq::VERSION.to_i == 6
when 6
module Worker
def self.included(base)
raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }
raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? do |c|
c.name == "ActiveJob::Base"
end

base.include(Options)
base.extend(ClassMethods)
Expand All @@ -169,7 +187,9 @@ def self.included(base)
else
module Job
def self.included(base)
raise ArgumentError, "Sidekiq::Job cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }
raise ArgumentError, "Sidekiq::Job cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? do |c|
c.name == "ActiveJob::Base"
end

base.include(Options)
base.extend(ClassMethods)
Expand All @@ -182,28 +202,39 @@ def self.included(base)
# Convert string to machine readable format
class Processor
def log_context(job_hash)
klass = job_hash["wrapped"] || job_hash["class"]
event = { class: klass, jid: job_hash["jid"] }
event[:bid] = job_hash["bid"] if job_hash["bid"]
event
h = {jid: job_hash["jid"]}
h[:bid] = job_hash["bid"] if job_hash["bid"]
h[:queue] = job_hash["queue"] if job_hash["queue"]
h
end
end

# Let Semantic Logger handle duration logging
module Middleware
module Server
class Logging
def call(worker, item, queue)
worker.logger.info("Start #perform")
worker.logger.measure_info(
"Completed #perform",
on_exception_level: :error,
log_exception: :full,
metric: "Sidekiq/#{worker.class.name}/perform"
) do
yield
def call(worker, item, queue, &block)
SemanticLogger.named_tags(queue: queue) do
worker.logger.info(
"Start #perform",
metric: "sidekiq.queue.latency",
metric_amount: job_latency_ms(item)
)
worker.logger.measure_info(
"Completed #perform",
on_exception_level: :error,
log_exception: :full,
metric: "sidekiq.job.perform",
&block
)
end
end

def job_latency_ms(job)
return unless job && job["enqueued_at"]

(Time.now.to_f - job["enqueued_at"]) * 1000
end
end
end
end
Expand Down
4 changes: 1 addition & 3 deletions test/dummy/config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

module Dummy
class Application < Rails::Application
if config.respond_to?(:load_defaults)
config.load_defaults "#{Rails::VERSION::MAJOR}.#{Rails::VERSION::MINOR}"
end
config.load_defaults "#{Rails::VERSION::MAJOR}.#{Rails::VERSION::MINOR}" if config.respond_to?(:load_defaults)

# Configure sensitive parameters which will be filtered from the log file.
config.filter_parameters += [:password]
Expand Down
48 changes: 28 additions & 20 deletions test/sidekiq_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,37 @@ class SidekiqTest < Minitest::Test

describe "#perform" do
let(:config) { Sidekiq::Config.new(error_handlers: []) }
let(:msg) { Sidekiq.dump_json({ "class" => job.to_s, "args" => args }) }
let(:msg) { Sidekiq.dump_json({"class" => job.to_s, "args" => args, "enqueued_at" => 1.minute.ago}) }
let(:uow) { Sidekiq::BasicFetch::UnitOfWork.new("queue:default", msg) }
if Sidekiq::VERSION.to_i == 6 && Sidekiq::VERSION.to_f < 6.5
let(:processor) do
mgr = Minitest::Mock.new
opts = {:queues => ['default']}
mgr = Minitest::Mock.new
opts = {queues: ["default"]}
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
Sidekiq::Processor.new(mgr, opts)
end
elsif Sidekiq::VERSION.to_i == 6
let(:processor) do
# TODO: Sidekiq 6.5 default handler is not accepting the patch
handler = ->(ex, ctx) {
handler = lambda { |_ex, ctx|
unless ctx.empty?
job_hash = ctx[:job] || {}
klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"]
logger = klass ? SemanticLogger[klass] : Sidekiq.logger
klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"]
logger = klass ? SemanticLogger[klass] : Sidekiq.logger
ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx)
end
}
config = Sidekiq
config = Sidekiq
config[:error_handlers] = [handler]
config[:fetch] = Sidekiq::BasicFetch.new(config)
config[:fetch] = Sidekiq::BasicFetch.new(config)
Sidekiq::Processor.new(config) { |*args| }
end
elsif Sidekiq::VERSION.to_i < 7
let(:processor) do
mgr = Minitest::Mock.new
mgr.expect(:options, {:queues => ['default']})
mgr.expect(:options, {:queues => ['default']})
mgr.expect(:options, {:queues => ['default']})
mgr.expect(:options, {queues: ["default"]})
mgr.expect(:options, {queues: ["default"]})
mgr.expect(:options, {queues: ["default"]})
Sidekiq::Processor.new(mgr)
end
else
Expand All @@ -67,17 +67,20 @@ class SidekiqTest < Minitest::Test
level: :info,
name: "SimpleJob",
message_includes: "Start #perform",
named_tags: { class: "SimpleJob", jid: nil }
metric: "sidekiq.queue.latency",
named_tags: {jid: nil, queue: "default"}
)
assert messages[0].metric_amount.is_a?(Float)

assert_semantic_logger_event(
messages[1],
level: :info,
name: "SimpleJob",
message_includes: "Completed #perform",
metric: "Sidekiq/SimpleJob/perform",
named_tags: { class: "SimpleJob", jid: nil }
metric: "sidekiq.job.perform",
named_tags: {jid: nil, queue: "default"}
)
assert messages[1].duration.is_a?(Float)
end

describe "with Bad Job" do
Expand All @@ -98,27 +101,32 @@ class SidekiqTest < Minitest::Test
level: :info,
name: "BadJob",
message: "Start #perform",
named_tags: { class: "BadJob", jid: nil }
metric: "sidekiq.queue.latency",
named_tags: {jid: nil, queue: "default"}
)
assert messages[0].metric_amount.is_a?(Float)

assert_semantic_logger_event(
messages[1],
level: :error,
name: "BadJob",
message: "Completed #perform",
metric: "Sidekiq/BadJob/perform",
named_tags: { class: "BadJob", jid: nil }
# exception: { name: "ArgumentError", message: "This is a bad job" }
metric: "sidekiq.job.perform",
named_tags: {jid: nil, queue: "default"}
# exception: { name: "ArgumentError", message: "This is a bad job" }
)
assert messages[1].duration.is_a?(Float)

assert_semantic_logger_event(
messages[2],
level: :warn,
name: "BadJob",
message: "Job raised exception",
payload_includes: { context: "Job raised exception", job: { "class" => "BadJob", "args" => [] } }
# exception: { name: "ArgumentError", message: "This is a bad job" }
payload_includes: {context: "Job raised exception"}
# exception: { name: "ArgumentError", message: "This is a bad job" }
)
assert_equal messages[2].payload[:job]["class"], "BadJob"
assert_equal messages[2].payload[:job]["args"], []
end
end
end
Expand Down

0 comments on commit 70722e4

Please sign in to comment.