From 70722e4eba760b4d1ee068ab7e22f5d4263ea064 Mon Sep 17 00:00:00 2001 From: Reid Morrison Date: Fri, 28 Jun 2024 16:27:55 -0400 Subject: [PATCH] Refine metrics that are generated --- .gitignore | 1 + lib/rails_semantic_logger.rb | 2 +- .../extensions/sidekiq/sidekiq.rb | 149 +++++++++++------- test/dummy/config/application.rb | 4 +- test/sidekiq_test.rb | 48 +++--- 5 files changed, 121 insertions(+), 83 deletions(-) diff --git a/.gitignore b/.gitignore index 50f6afc..5204e2b 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ Gemfile.lock TODO.md .tool-versions +test/dummy/db/test.sqlite3 diff --git a/lib/rails_semantic_logger.rb b/lib/rails_semantic_logger.rb index c6dc90f..876016b 100644 --- a/lib/rails_semantic_logger.rb +++ b/lib/rails_semantic_logger.rb @@ -69,7 +69,7 @@ def self.subscriber_patterns(subscriber) require("rails_semantic_logger/extensions/active_support/log_subscriber") if defined?(ActiveSupport::LogSubscriber) begin - require 'rackup' + require "rackup" rescue LoadError # No need to do anything, will fall back to Rack end diff --git a/lib/rails_semantic_logger/extensions/sidekiq/sidekiq.rb b/lib/rails_semantic_logger/extensions/sidekiq/sidekiq.rb index 7a38c3b..522ccd6 100644 --- a/lib/rails_semantic_logger/extensions/sidekiq/sidekiq.rb +++ b/lib/rails_semantic_logger/extensions/sidekiq/sidekiq.rb @@ -33,18 +33,26 @@ module Sidekiq if defined?(::Sidekiq::JobLogger) # Let Semantic Logger handle duration logging class JobLogger - def call(item, queue) + def call(item, queue, &block) klass = item["wrapped"] || item["class"] - metric = "Sidekiq/#{klass}/perform" if klass logger = klass ? SemanticLogger[klass] : Sidekiq.logger - logger.info("Start #perform") - logger.measure_info( - "Completed #perform", - on_exception_level: :error, - log_exception: :full, - metric: metric - ) do - yield + + SemanticLogger.tagged(queue: queue) do + # Latency is the time between when the job was enqueued and when it started executing. + logger.info( + "Start #perform", + metric: "sidekiq.queue.latency", + metric_amount: job_latency_ms(item) + ) + + # Measure the duration of running the job + logger.measure_info( + "Completed #perform", + on_exception_level: :error, + log_exception: :full, + metric: "sidekiq.job.perform", + &block + ) end end @@ -60,14 +68,18 @@ def prepare(job_hash, &block) end def job_hash_context(job_hash) - h = { - class: job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"], - jid: job_hash["jid"] - } - h[:bid] = job_hash["bid"] if job_hash["bid"] - h[:tags] = job_hash["tags"] if job_hash["tags"] + h = {jid: job_hash["jid"]} + h[:bid] = job_hash["bid"] if job_hash["bid"] + h[:tags] = job_hash["tags"] if job_hash["tags"] + h[:queue] = job_hash["queue"] if job_hash["queue"] h end + + def job_latency_ms(job) + return unless job && job["enqueued_at"] + + (Time.now.to_f - job["enqueued_at"].to_f) * 1000 + end end end @@ -80,10 +92,10 @@ def self.with_context(msg, &block) end def self.job_hash_context(job_hash) - klass = job_hash["wrapped"] || job_hash["class"] - event = { class: klass, jid: job_hash["jid"] } - event[:bid] = job_hash["bid"] if job_hash["bid"] - event + h = {jid: job_hash["jid"]} + h[:bid] = job_hash["bid"] if job_hash["bid"] + h[:queue] = job_hash["queue"] if job_hash["queue"] + h end end end @@ -93,26 +105,26 @@ def self.job_hash_context(job_hash) if defined?(::Sidekiq::ExceptionHandler) module ExceptionHandler class Logger - def call(ex, ctx) - unless ctx.empty? - job_hash = ctx[:job] || {} - klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"] - logger = klass ? SemanticLogger[klass] : Sidekiq.logger - ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx) - end + def call(_exception, ctx) + return if ctx.empty? + + job_hash = ctx[:job] || {} + klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"] + logger = klass ? SemanticLogger[klass] : Sidekiq.logger + ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx) end end end - # Sidekiq >= v7 + # Sidekiq >= v7 elsif defined?(::Sidekiq::Config) class Config remove_const :ERROR_HANDLER - ERROR_HANDLER = ->(ex, ctx, cfg = Sidekiq.default_configuration) { + ERROR_HANDLER = lambda { |_ex, ctx, _cfg = Sidekiq.default_configuration| unless ctx.empty? job_hash = ctx[:job] || {} - klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"] - logger = klass ? SemanticLogger[klass] : Sidekiq.logger + klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"] + logger = klass ? SemanticLogger[klass] : Sidekiq.logger ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx) end } @@ -120,22 +132,24 @@ class Config else # Sidekiq >= 6.5 # TODO: Not taking effect. See test/sidekiq_test.rb - def self.default_error_handler(ex, ctx) - binding.irb - unless ctx.empty? - job_hash = ctx[:job] || {} - klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"] - logger = klass ? SemanticLogger[klass] : Sidekiq.logger - ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx) - end + def self.default_error_handler(_exception, ctx) + return if ctx.empty? + + job_hash = ctx[:job] || {} + klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"] + logger = klass ? SemanticLogger[klass] : Sidekiq.logger + ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx) end end # Logging within each worker should use its own logger - if Sidekiq::VERSION.to_i == 4 + case Sidekiq::VERSION.to_i + when 4 module Worker def self.included(base) - raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" } + raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}" if base.ancestors.any? do |c| + c.name == "ActiveJob::Base" + end base.extend(ClassMethods) base.include(SemanticLogger::Loggable) @@ -144,10 +158,12 @@ def self.included(base) base.class_attribute :sidekiq_retries_exhausted_block end end - elsif Sidekiq::VERSION.to_i == 5 + when 5 module Worker def self.included(base) - raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" } + raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}" if base.ancestors.any? do |c| + c.name == "ActiveJob::Base" + end base.extend(ClassMethods) base.include(SemanticLogger::Loggable) @@ -156,10 +172,12 @@ def self.included(base) base.sidekiq_class_attribute :sidekiq_retries_exhausted_block end end - elsif Sidekiq::VERSION.to_i == 6 + when 6 module Worker def self.included(base) - raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" } + raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? do |c| + c.name == "ActiveJob::Base" + end base.include(Options) base.extend(ClassMethods) @@ -169,7 +187,9 @@ def self.included(base) else module Job def self.included(base) - raise ArgumentError, "Sidekiq::Job cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" } + raise ArgumentError, "Sidekiq::Job cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? do |c| + c.name == "ActiveJob::Base" + end base.include(Options) base.extend(ClassMethods) @@ -182,10 +202,10 @@ def self.included(base) # Convert string to machine readable format class Processor def log_context(job_hash) - klass = job_hash["wrapped"] || job_hash["class"] - event = { class: klass, jid: job_hash["jid"] } - event[:bid] = job_hash["bid"] if job_hash["bid"] - event + h = {jid: job_hash["jid"]} + h[:bid] = job_hash["bid"] if job_hash["bid"] + h[:queue] = job_hash["queue"] if job_hash["queue"] + h end end @@ -193,17 +213,28 @@ def log_context(job_hash) module Middleware module Server class Logging - def call(worker, item, queue) - worker.logger.info("Start #perform") - worker.logger.measure_info( - "Completed #perform", - on_exception_level: :error, - log_exception: :full, - metric: "Sidekiq/#{worker.class.name}/perform" - ) do - yield + def call(worker, item, queue, &block) + SemanticLogger.named_tags(queue: queue) do + worker.logger.info( + "Start #perform", + metric: "sidekiq.queue.latency", + metric_amount: job_latency_ms(item) + ) + worker.logger.measure_info( + "Completed #perform", + on_exception_level: :error, + log_exception: :full, + metric: "sidekiq.job.perform", + &block + ) end end + + def job_latency_ms(job) + return unless job && job["enqueued_at"] + + (Time.now.to_f - job["enqueued_at"]) * 1000 + end end end end diff --git a/test/dummy/config/application.rb b/test/dummy/config/application.rb index cc5ab4e..5dab859 100644 --- a/test/dummy/config/application.rb +++ b/test/dummy/config/application.rb @@ -6,9 +6,7 @@ module Dummy class Application < Rails::Application - if config.respond_to?(:load_defaults) - config.load_defaults "#{Rails::VERSION::MAJOR}.#{Rails::VERSION::MINOR}" - end + config.load_defaults "#{Rails::VERSION::MAJOR}.#{Rails::VERSION::MINOR}" if config.respond_to?(:load_defaults) # Configure sensitive parameters which will be filtered from the log file. config.filter_parameters += [:password] diff --git a/test/sidekiq_test.rb b/test/sidekiq_test.rb index bc30fe4..ffa264a 100644 --- a/test/sidekiq_test.rb +++ b/test/sidekiq_test.rb @@ -17,37 +17,37 @@ class SidekiqTest < Minitest::Test describe "#perform" do let(:config) { Sidekiq::Config.new(error_handlers: []) } - let(:msg) { Sidekiq.dump_json({ "class" => job.to_s, "args" => args }) } + let(:msg) { Sidekiq.dump_json({"class" => job.to_s, "args" => args, "enqueued_at" => 1.minute.ago}) } let(:uow) { Sidekiq::BasicFetch::UnitOfWork.new("queue:default", msg) } if Sidekiq::VERSION.to_i == 6 && Sidekiq::VERSION.to_f < 6.5 let(:processor) do - mgr = Minitest::Mock.new - opts = {:queues => ['default']} + mgr = Minitest::Mock.new + opts = {queues: ["default"]} opts[:fetch] = Sidekiq::BasicFetch.new(opts) Sidekiq::Processor.new(mgr, opts) end elsif Sidekiq::VERSION.to_i == 6 let(:processor) do # TODO: Sidekiq 6.5 default handler is not accepting the patch - handler = ->(ex, ctx) { + handler = lambda { |_ex, ctx| unless ctx.empty? job_hash = ctx[:job] || {} - klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"] - logger = klass ? SemanticLogger[klass] : Sidekiq.logger + klass = job_hash["display_class"] || job_hash["wrapped"] || job_hash["class"] + logger = klass ? SemanticLogger[klass] : Sidekiq.logger ctx[:context] ? logger.warn(ctx[:context], ctx) : logger.warn(ctx) end } - config = Sidekiq + config = Sidekiq config[:error_handlers] = [handler] - config[:fetch] = Sidekiq::BasicFetch.new(config) + config[:fetch] = Sidekiq::BasicFetch.new(config) Sidekiq::Processor.new(config) { |*args| } end elsif Sidekiq::VERSION.to_i < 7 let(:processor) do mgr = Minitest::Mock.new - mgr.expect(:options, {:queues => ['default']}) - mgr.expect(:options, {:queues => ['default']}) - mgr.expect(:options, {:queues => ['default']}) + mgr.expect(:options, {queues: ["default"]}) + mgr.expect(:options, {queues: ["default"]}) + mgr.expect(:options, {queues: ["default"]}) Sidekiq::Processor.new(mgr) end else @@ -67,17 +67,20 @@ class SidekiqTest < Minitest::Test level: :info, name: "SimpleJob", message_includes: "Start #perform", - named_tags: { class: "SimpleJob", jid: nil } + metric: "sidekiq.queue.latency", + named_tags: {jid: nil, queue: "default"} ) + assert messages[0].metric_amount.is_a?(Float) assert_semantic_logger_event( messages[1], level: :info, name: "SimpleJob", message_includes: "Completed #perform", - metric: "Sidekiq/SimpleJob/perform", - named_tags: { class: "SimpleJob", jid: nil } + metric: "sidekiq.job.perform", + named_tags: {jid: nil, queue: "default"} ) + assert messages[1].duration.is_a?(Float) end describe "with Bad Job" do @@ -98,27 +101,32 @@ class SidekiqTest < Minitest::Test level: :info, name: "BadJob", message: "Start #perform", - named_tags: { class: "BadJob", jid: nil } + metric: "sidekiq.queue.latency", + named_tags: {jid: nil, queue: "default"} ) + assert messages[0].metric_amount.is_a?(Float) assert_semantic_logger_event( messages[1], level: :error, name: "BadJob", message: "Completed #perform", - metric: "Sidekiq/BadJob/perform", - named_tags: { class: "BadJob", jid: nil } - # exception: { name: "ArgumentError", message: "This is a bad job" } + metric: "sidekiq.job.perform", + named_tags: {jid: nil, queue: "default"} + # exception: { name: "ArgumentError", message: "This is a bad job" } ) + assert messages[1].duration.is_a?(Float) assert_semantic_logger_event( messages[2], level: :warn, name: "BadJob", message: "Job raised exception", - payload_includes: { context: "Job raised exception", job: { "class" => "BadJob", "args" => [] } } - # exception: { name: "ArgumentError", message: "This is a bad job" } + payload_includes: {context: "Job raised exception"} + # exception: { name: "ArgumentError", message: "This is a bad job" } ) + assert_equal messages[2].payload[:job]["class"], "BadJob" + assert_equal messages[2].payload[:job]["args"], [] end end end