Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add child_spans for Sidekiq Queue instrumentation #2403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
### Features

- Add `include_sentry_event` matcher for RSpec [#2424](https://github.com/getsentry/sentry-ruby/pull/2424)
- Add support for Sentry Cache instrumentation, when using Rails.cache [#2380](https://github.com/getsentry/sentry-ruby/pull/2380)
- Add support for Sentry Cache instrumentation, when using Rails.cache ([#2380](https://github.com/getsentry/sentry-ruby/pull/2380))
- Add support for Queue Instrumentation for Sidekiq ([#2403](https://github.com/getsentry/sentry-ruby/pull/2403))

Note: MemoryStore and FileStore require Rails 8.0+

Expand Down
2 changes: 2 additions & 0 deletions sentry-sidekiq/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ end

gem "rails", "> 5.0.0"

gem "timecop"

eval_gemfile File.expand_path("../Gemfile", __dir__)
35 changes: 30 additions & 5 deletions sentry-sidekiq/lib/sentry/sidekiq/sentry_context_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@

module Sentry
module Sidekiq
# Shared helpers mixed into the Sidekiq client and server middlewares.
module Helpers
  # Records messaging attributes on +span+ via +span.set_data+.
  #
  # The message id and destination name are always written; latency and
  # retry count are written only when a truthy value is supplied (note that
  # +0+ is truthy in Ruby, so a zero latency or retry count is still recorded).
  #
  # @param span [#set_data, nil] span to annotate; nothing happens when it is nil/false
  # @param id [Object] stored under "messaging.message.id"
  # @param queue [Object] stored under "messaging.destination.name"
  # @param latency [Object, nil] stored under "messaging.message.receive.latency"
  # @param retry_count [Object, nil] stored under "messaging.message.retry.count"
  # @return [Object, nil] result of the last +set_data+ call made for the retry count, otherwise nil
  def set_span_data(span, id:, queue:, latency: nil, retry_count: nil)
    # No active span means there is nothing to annotate.
    return unless span

    span.set_data("messaging.message.id", id)
    span.set_data("messaging.destination.name", queue)

    # Optional attributes: skipped entirely when not provided.
    span.set_data("messaging.message.receive.latency", latency) if latency
    span.set_data("messaging.message.retry.count", retry_count) if retry_count
  end
end

class SentryContextServerMiddleware
OP_NAME = "queue.sidekiq"
include Sentry::Sidekiq::Helpers

OP_NAME = "queue.process"
SPAN_ORIGIN = "auto.queue.sidekiq"

def call(_worker, job, queue)
def call(worker, job, queue)
return yield unless Sentry.initialized?

context_filter = Sentry::Sidekiq::ContextFilter.new(job)
Expand All @@ -23,7 +36,12 @@ def call(_worker, job, queue)
scope.set_contexts(sidekiq: job.merge("queue" => queue))
scope.set_transaction_name(context_filter.transaction_name, source: :task)
transaction = start_transaction(scope, job["trace_propagation_headers"])
scope.set_span(transaction) if transaction

if transaction
scope.set_span(transaction)

set_span_data(transaction, id: job["jid"], queue: queue, latency: ((Time.now.to_f - job["enqueued_at"]) * 1000).to_i, retry_count: job["retry_count"] || 0)
end

begin
yield
Expand Down Expand Up @@ -63,13 +81,20 @@ def finish_transaction(transaction, status)
end

class SentryContextClientMiddleware
def call(_worker_class, job, _queue, _redis_pool)
include Sentry::Sidekiq::Helpers

def call(worker_class, job, queue, _redis_pool)
return yield unless Sentry.initialized?

user = Sentry.get_current_scope.user
job["sentry_user"] = user unless user.empty?
job["trace_propagation_headers"] ||= Sentry.get_trace_propagation_headers
yield

Sentry.with_child_span(op: "queue.publish", description: worker_class.to_s) do |span|
set_span_data(span, id: job["jid"], queue: queue)

yield
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "spec_helper"
require "timecop"

RSpec.shared_context "sidekiq", shared_context: :metadata do
let(:user) { { "id" => rand(10_000) } }
Expand Down Expand Up @@ -65,6 +66,30 @@
expect(transaction.contexts.dig(:trace, :origin)).to eq('auto.queue.sidekiq')
end

it "adds a queue.process spans" do
Timecop.freeze do
execute_worker(processor, HappyWorker)
execute_worker(processor, HappyWorker, jid: '123456', timecop_delay: Time.now + 86400)

expect(transport.events.count).to eq(2)

transaction = transport.events[0]
expect(transaction).not_to be_nil
expect(transaction.spans.count).to eq(0)
expect(transaction.contexts[:trace][:data]['messaging.message.id']).to eq('123123') # Default defined in #execute_worker
expect(transaction.contexts[:trace][:data]['messaging.destination.name']).to eq('default')
expect(transaction.contexts[:trace][:data]['messaging.message.retry.count']).to eq(0)
expect(transaction.contexts[:trace][:data]['messaging.message.receive.latency']).to eq(0)

transaction = transport.events[1]
expect(transaction).not_to be_nil
expect(transaction.spans.count).to eq(0)
expect(transaction.contexts[:trace][:data]['messaging.message.id']).to eq('123456') # Explicitly set above.
expect(transaction.contexts[:trace][:data]['messaging.destination.name']).to eq('default')
expect(transaction.contexts[:trace][:data]['messaging.message.receive.latency']).to eq(86400000)
end
end

context "with trace_propagation_headers" do
let(:parent_transaction) { Sentry.start_transaction(op: "sidekiq") }

Expand All @@ -73,6 +98,7 @@
execute_worker(processor, HappyWorker, trace_propagation_headers: trace_propagation_headers)

expect(transport.events.count).to eq(1)

transaction = transport.events[0]
expect(transaction).not_to be_nil
expect(transaction.contexts.dig(:trace, :trace_id)).to eq(parent_transaction.trace_id)
Expand Down Expand Up @@ -156,5 +182,18 @@
expect(second_headers["sentry-trace"]).to eq(transaction.to_sentry_trace)
expect(second_headers["baggage"]).to eq(transaction.to_baggage)
end

it "has a queue.publish span" do
message_id = client.push('queue' => 'default', 'class' => HappyWorker, 'args' => [])

transaction.finish

expect(transport.events.count).to eq(1)
event = transport.events.last
expect(event.spans.count).to eq(1)
expect(event.spans[0][:op]).to eq("queue.publish")
expect(event.spans[0][:data]['messaging.message.id']).to eq(message_id)
expect(event.spans[0][:data]['messaging.destination.name']).to eq('default')
end
end
end
2 changes: 1 addition & 1 deletion sentry-sidekiq/spec/sentry/sidekiq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def retry_last_failed_job
expect(transaction.contexts.dig(:trace, :trace_id)).to be_a(String)
expect(transaction.contexts.dig(:trace, :span_id)).to be_a(String)
expect(transaction.contexts.dig(:trace, :status)).to eq("ok")
expect(transaction.contexts.dig(:trace, :op)).to eq("queue.sidekiq")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@frederikspang Did this break sampling? In the docs you show that you can do

case op
when /sidekiq/
  0.01
[...]

But this is always set to queue.process now so we are sampling a lot more than we should which is costing us money...

Was this intentional or am I misunderstanding something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is very much not intentional. I will submit a PR for docs as well. I was unaware of this example being there.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we've been running out of performance traces more often recently and were confused what had happened. And then I found that our sampler was not running correctly :-/

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this is not all about the docs. It's like a public API and can't just be changed like this without proper deprecations etc. It must be costing your customers a lot of money in both extra Sentry charges as well as debugging time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get what you're saying - except it's not my customers. I've done this as an OSS contribution to help the Sentry Ruby SDK conform to the guidelines and "predefined operations" as defined by the Sentry core system - i.e., making the Queues page work properly for Sidekiq jobs.

Deprecating and semantic versioning of what goes into which release version is very much not my job.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha I thought you worked for Sentry :)
Sorry if my tone was harsh, I didn’t mean to offend. I know mistakes can be made.
My message was more directed to Sentry the business.

Also the feature is great and many thanks for implementing it! 🙏

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Linuus I'm one of the maintainers and I approved this PR. It didn't occur to me that transaction's op should be treated as a public interface here. Thanks for bringing this up and I'm very sorry that it caused such issues for you. I updated both the CHANGELOG and the release page with a warning that explains this change.

expect(transaction.contexts.dig(:trace, :op)).to eq("queue.process")
end

it "records transaction with exception" do
Expand Down
9 changes: 7 additions & 2 deletions sentry-sidekiq/spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,20 @@ def sidekiq_config(opts)

# Serializes a job for +klass+ and runs it through +processor+ on the
# 'queue:default' queue.
#
# Recognized entries in +options+ (both are removed before serialization):
# * +:jid+           - job id to use; defaults to "123123"
# * +:timecop_delay+ - when given, passed to +Timecop.freeze+ after the job
#                      payload has been built and before it is processed, so
#                      the job is processed at a later frozen time than it
#                      was enqueued. +Timecop.return+ is called afterwards.
#
# Any remaining options, plus the class-level +sidekiq_options_hash+, are
# merged into the job payload.
def execute_worker(processor, klass, **options)
  klass_options = klass.sidekiq_options_hash || {}

  # for Ruby < 2.6
  klass_options.each do |k, v|
    options[k.to_sym] = v
  end

  jid = options.delete(:jid) || "123123"
  timecop_delay = options.delete(:timecop_delay)

  # Read the clock once so created_at and enqueued_at are the same instant.
  now = Time.now.to_f
  msg = Sidekiq.dump_json(created_at: now, enqueued_at: now, jid: jid, class: klass, args: [], **options)

  # Move the clock only after the payload is built, so enqueued_at stays at the earlier time.
  Timecop.freeze(timecop_delay) if timecop_delay
  work = Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg)
  process_work(processor, work)
ensure
  # Undo the freeze applied above, even if processing raised.
  Timecop.return if timecop_delay
end

def process_work(processor, work)
Expand Down
Loading