Skip to content

Commit

Permalink
Merge pull request #390 from delner/active_support_notifications_ensu…
Browse files Browse the repository at this point in the history
…re_clean_context

Extract #ensure_clean_context! from ActiveSupport::Notifications subscription
  • Loading branch information
Emanuele Palazzetti authored Apr 5, 2018
2 parents f1681f1 + c65431b commit 9dfc349
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 8 deletions.
26 changes: 20 additions & 6 deletions lib/ddtrace/contrib/active_support/notifications/subscription.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,20 @@ def initialize(tracer, span_name, options, &block)
@span_name = span_name
@options = options
@block = block
@before_trace_callbacks = []
@after_trace_callbacks = []
end

def before_trace(&block)
@before_trace_callbacks << block if block_given?
end

def after_trace(&block)
@after_trace_callbacks << block if block_given?
end

def start(_name, _id, _payload)
ensure_clean_context!
run_callbacks(@before_trace_callbacks)
tracer.trace(@span_name, @options)
end

Expand All @@ -28,6 +38,7 @@ def finish(name, id, payload)
return nil if span.nil?
block.call(span, name, id, payload)
span.finish
run_callbacks(@after_trace_callbacks)
end
end

Expand Down Expand Up @@ -57,11 +68,14 @@ def subscribers
@subscribers ||= {}
end

private

def ensure_clean_context!
return unless tracer.call_context.current_span
tracer.provider.context = Context.new
def run_callbacks(callbacks)
callbacks.each do |callback|
begin
callback.call
rescue StandardError => e
Datadog::Tracer.log.debug("ActiveSupport::Notifications callback failed: #{e.message}")
end
end
end
end
end
Expand Down
22 changes: 20 additions & 2 deletions lib/ddtrace/contrib/racecar/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,17 @@ module Patcher
option :service_name, default: 'racecar'

on_subscribe do
subscribe('process_message.racecar', self::NAME_MESSAGE, {}, configuration[:tracer], &method(:process))
subscribe('process_batch.racecar', self::NAME_BATCH, {}, configuration[:tracer], &method(:process))
# Subscribe to single messages
subscription(self::NAME_MESSAGE, {}, configuration[:tracer], &method(:process)).tap do |subscription|
subscription.before_trace(&method(:ensure_clean_context!))
subscription.subscribe('process_message.racecar')
end

# Subscribe to batch messages
subscription(self::NAME_BATCH, {}, configuration[:tracer], &method(:process)).tap do |subscription|
subscription.before_trace(&method(:ensure_clean_context!))
subscription.subscribe('process_batch.racecar')
end
end

class << self
Expand Down Expand Up @@ -61,6 +70,15 @@ def configuration
def compatible?
defined?(::Racecar) && defined?(::ActiveSupport::Notifications)
end

# Context objects are thread-bound.
# If Racecar re-uses threads, context from a previous trace
# could leak into the new trace. This "cleans" current context,
# preventing such a leak.
def ensure_clean_context!
return unless tracer.call_context.current_span
tracer.provider.context = Context.new
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,66 @@
end
end

describe '#before_trace' do
context 'given a block' do
let(:callback_block) { Proc.new { callback_spy.call } }
let(:callback_spy) { double('callback spy') }
before(:each) { subscription.before_trace(&callback_block) }

shared_examples_for 'a before_trace callback' do
context 'on #start' do
it do
expect(callback_spy).to receive(:call).ordered
expect(tracer).to receive(:trace).ordered
subscription.start(double('name'), double('id'), double('payload'))
end
end
end

context 'that doesn\'t raise an error' do
let(:callback_block) { Proc.new { callback_spy.call } }
it_behaves_like 'a before_trace callback'
end

context 'that raises an error' do
let(:callback_block) { Proc.new { callback_spy.call; raise ArgumentError.new('Fail!') } }
it_behaves_like 'a before_trace callback'
end
end
end

describe '#after_trace' do
context 'given a block' do
let(:callback_block) { Proc.new { callback_spy.call } }
let(:callback_spy) { double('callback spy') }
before(:each) { subscription.after_trace(&callback_block) }

shared_examples_for 'an after_trace callback' do
context 'on #finish' do
let(:span) { instance_double(Datadog::Span) }

it do
expect(tracer).to receive(:active_span).and_return(span).ordered
expect(spy).to receive(:call).ordered
expect(span).to receive(:finish).ordered
expect(callback_spy).to receive(:call).ordered
subscription.finish(double('name'), double('id'), double('payload'))
end
end
end

context 'that doesn\'t raise an error' do
let(:callback_block) { Proc.new { callback_spy.call } }
it_behaves_like 'an after_trace callback'
end

context 'that raises an error' do
let(:callback_block) { Proc.new { callback_spy.call; raise ArgumentError.new('Fail!') } }
it_behaves_like 'an after_trace callback'
end
end
end

describe '#subscribe' do
subject(:result) { subscription.subscribe(pattern) }
let(:pattern) { double('pattern') }
Expand Down

0 comments on commit 9dfc349

Please sign in to comment.