Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create propagate span event and publish it in to_digest #4193

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions benchmarks/tracing_trace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ def trace(x, depth)
end
end

def benchmark_to_digest
def benchmark_propagate!
Datadog::Tracing.trace('op.name') do |span, trace|
Benchmark.ips do |x|
x.config(**benchmark_time)

x.report("trace.to_digest") do
trace.to_digest
x.report("trace.propagate!") do
trace.propagate!
end

x.save! "#{File.basename(__FILE__)}-results.json" unless VALIDATE_BENCHMARK_MODE
Expand All @@ -108,13 +108,13 @@ def benchmark_log_correlation
end
end

def benchmark_to_digest_continue
def benchmark_propagate_continue
Datadog::Tracing.trace('op.name') do |span, trace|
Benchmark.ips do |x|
x.config(**benchmark_time)

x.report("trace.to_digest - Continue") do
digest = trace.to_digest
x.report("trace.propagate! - Continue") do
digest = trace.propagate!
Datadog::Tracing.continue_trace!(digest)
end

Expand All @@ -136,7 +136,7 @@ def benchmark_propagation_datadog
end

Datadog::Tracing.trace('op.name') do |span, trace|
injected_trace_digest = trace.to_digest
injected_trace_digest = trace.propagate!
Benchmark.ips do |x|
x.config(**benchmark_time)

Expand All @@ -159,7 +159,7 @@ def benchmark_propagation_trace_context
end

Datadog::Tracing.trace('op.name') do |span, trace|
injected_trace_digest = trace.to_digest
injected_trace_digest = trace.propagate!
Benchmark.ips do |x|
x.config(**benchmark_time)

Expand Down Expand Up @@ -190,9 +190,9 @@ def run_benchmark(&block)
TracingTraceBenchmark.new.instance_exec do
run_benchmark { benchmark_no_writer }
run_benchmark { benchmark_no_network }
run_benchmark { benchmark_to_digest }
run_benchmark { benchmark_propagate! }
run_benchmark { benchmark_log_correlation }
run_benchmark { benchmark_to_digest_continue }
run_benchmark { benchmark_propagate_continue }
run_benchmark { benchmark_propagation_datadog }
run_benchmark { benchmark_propagation_trace_context }
end
2 changes: 1 addition & 1 deletion docs/GettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -2429,7 +2429,7 @@ On the client:
```ruby
Datadog::Tracing.trace('web.call') do |span, trace|
# Inject trace headers into request headers (`env` must be a Hash)
Datadog::Tracing::Contrib::HTTP.inject(trace.to_digest, env)
Datadog::Tracing::Contrib::HTTP.inject(trace.propagate!, env)
end
```

Expand Down
5 changes: 3 additions & 2 deletions lib/datadog/opentelemetry/trace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def start_trace_copy(trace, parent_span: nil)
digest = if parent_span
digest_with_parent_span(trace, parent_span)
else
trace.to_digest
trace.to_digest_no_propagation
end

# Create a new TraceOperation, attached to the current Datadog Tracer.
Expand All @@ -30,7 +30,8 @@ def start_trace_copy(trace, parent_span: nil)
# This supports the implementation of `OpenTelemetry::Trace.context_with_span`,
# which allows you to specific any span as the arbitrary parent of a new span.
def digest_with_parent_span(trace, parent_span)
digest = trace.to_digest
# sampling_priority added here right upon otel span creation
digest = trace.to_digest_no_propagation

Tracing::TraceDigest.new(
span_id: parent_span.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def initialize(composited_executor)
# post method runs the task within composited executor - in a different thread. The original arguments are
# captured to be propagated to the composited executor post method
def post(*args, &task)
digest = Tracing.active_trace && Tracing.active_trace.to_digest
digest = Tracing.active_trace && Tracing.active_trace.propagate!
executor = @composited_executor.is_a?(Symbol) ? Concurrent.executor(@composited_executor) : @composited_executor

# Pass the original arguments to the composited executor, which
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/contrib/ethon/multi_patch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def datadog_multi_span
Ext::SPAN_MULTI_REQUEST,
service: datadog_configuration[:service_name]
)
@datadog_multi_trace_digest = Tracing.active_trace.to_digest
@datadog_multi_trace_digest = Tracing.active_trace.propagate!

@datadog_multi_span.set_tag(Tracing::Metadata::Ext::TAG_COMPONENT, Ext::TAG_COMPONENT)
@datadog_multi_span.set_tag(Tracing::Metadata::Ext::TAG_OPERATION, Ext::TAG_OPERATION_MULTI_REQUEST)
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/contrib/propagation/sql_comment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def self.prepend_comment(sql, span_op, trace_op, mode)
# When tracing is disabled, trace_operation is a dummy object that does not contain data to build traceparent
if config.tracing.enabled
tags[Ext::KEY_TRACEPARENT] =
Tracing::Distributed::TraceContext.new(fetcher: nil).send(:build_traceparent, trace_op.to_digest)
Tracing::Distributed::TraceContext.new(fetcher: nil).send(:build_traceparent, trace_op.propagate!)
else
Datadog.logger.warn(
'Sql comment propagation with `full` mode is aborted, because tracing is disabled. '\
Expand Down
2 changes: 1 addition & 1 deletion lib/datadog/tracing/distributed/propagation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def inject!(digest, data)
return nil
end

digest = digest.to_digest if digest.respond_to?(:to_digest)
digest = digest.propagate! if digest.respond_to?(:propagate!)

if digest.trace_id.nil?
::Datadog.logger.debug('Cannot inject distributed trace data: digest.trace_id is nil.')
Expand Down
50 changes: 45 additions & 5 deletions lib/datadog/tracing/trace_operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -301,17 +301,48 @@ def flush!
build_trace(spans, !finished)
end

# DEV-3.0: Sampling is a side effect of generating the digest in to_digest. With 3.0 we should remove the side effect
# and push users to use propagate! instead of to_digest. With that we can remove this method.
def to_digest_no_propagation
span_id = @active_span && @active_span.id
span_id ||= @parent_span_id unless finished?
TraceDigest.new(
span_id: span_id,
span_name: (@active_span && @active_span.name),
span_resource: (@active_span && @active_span.resource),
span_service: (@active_span && @active_span.service),
span_type: (@active_span && @active_span.type),
trace_distributed_tags: distributed_tags,
trace_hostname: @hostname,
trace_id: @id,
trace_name: name,
trace_origin: @origin,
trace_process_id: Core::Environment::Identity.pid,
trace_resource: resource,
trace_runtime_id: Core::Environment::Identity.id,
trace_sampling_priority: @sampling_priority,
trace_service: service,
trace_state: @trace_state,
trace_state_unknown_fields: @trace_state_unknown_fields,
span_remote: (@remote_parent && @active_span.nil?),
).freeze
end

def propagate!
to_digest
end

# Returns a set of trace headers used for continuing traces.
# Used for propagation across execution contexts.
# Data should reflect the active state of the trace.
# DEV-3.0: Sampling is a side effect of generating the digest.
# We should move the sample call to inject and right before moving to new contexts(threads, forking etc.)
# DEV-3.0: Sampling is a side effect of generating the digest in to_digest.
# With 3.0 we should remove the side effect and push users to use propagate! instead of to_digest.
def to_digest
# Resolve current span ID
span_id = @active_span && @active_span.id
span_id ||= @parent_span_id unless finished?
# sample the trace_operation with the tracer
@tracer&.sample_trace(self) unless sampling_priority
events.trace_propagated.publish(self)

TraceDigest.new(
span_id: span_id,
Expand Down Expand Up @@ -380,12 +411,14 @@ class Events
attr_reader \
:span_before_start,
:span_finished,
:trace_finished
:trace_finished,
:trace_propagated

def initialize
@span_before_start = SpanBeforeStart.new
@span_finished = SpanFinished.new
@trace_finished = TraceFinished.new
@trace_propagated = TracePropagated.new
end

# Triggered before a span starts.
Expand All @@ -402,6 +435,13 @@ def initialize
end
end

# Triggered when trace is being propagated between applications or contexts
class TracePropagated < Tracing::Event
def initialize
super(:trace_propagated)
end
end

# Triggered when the trace finishes, regardless of error.
class TraceFinished < Tracing::Event
def initialize
Expand Down Expand Up @@ -514,7 +554,7 @@ def build_trace(spans, partial = false)
end

# Returns tracer tags that will be propagated if this span's context
# is exported through {.to_digest}.
# is exported through {.propagate!}.
# @return [Hash] key value pairs of distributed tags
def distributed_tags
meta.select { |name, _| name.start_with?(Metadata::Ext::Distributed::TAGS_PREFIX) }
Expand Down
8 changes: 6 additions & 2 deletions lib/datadog/tracing/tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def continue_trace!(digest, key = nil, &block)
# Sample a span, tagging the trace as appropriate.
def sample_trace(trace_op)
begin
@sampler.sample!(trace_op)
@sampler.sample!(trace_op) if trace_op.sampling_priority.nil?
rescue StandardError => e
SAMPLE_TRACE_LOG_ONLY_ONCE.run do
Datadog.logger.warn { "Failed to sample trace: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
Expand Down Expand Up @@ -362,7 +362,12 @@ def bind_trace_events!(trace_op)
event_span_op.service ||= @default_service
end

events.trace_propagated.subscribe do |event_trace_op|
sample_trace(event_trace_op)
end

events.span_finished.subscribe do |event_span, event_trace_op|
sample_trace(trace_op)
sample_span(event_trace_op, event_span)
flush_trace(event_trace_op)
end
Expand Down Expand Up @@ -498,7 +503,6 @@ def sample_span(trace_op, span)

# Flush finished spans from the trace buffer, send them to writer.
def flush_trace(trace_op)
sample_trace(trace_op) unless trace_op.sampling_priority
begin
trace = @trace_flush.consume!(trace_op)
write(trace) if trace && !trace.empty?
Expand Down
2 changes: 2 additions & 0 deletions sig/datadog/tracing/trace_operation.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ module Datadog
def build_span: (untyped op_name, ?events: untyped?, ?on_error: untyped?, ?resource: untyped?, ?service: untyped?, ?start_time: untyped?, ?tags: untyped?, ?type: untyped?) -> untyped
def flush!: () { (untyped) -> untyped } -> untyped
def to_digest: () -> untyped
def propagate!: () -> untyped
def to_digest_no_propagation: () -> untyped
def fork_clone: () -> untyped

class Events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
end
let(:trace_op) do
double(
to_digest: Datadog::Tracing::TraceDigest.new(
propagate!: Datadog::Tracing::TraceDigest.new(
trace_id: 0xC0FFEE,
span_id: 0xBEE,
trace_flags: 0xFE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
expect(Datadog::Tracing::Contrib::Propagation::SqlComment).to have_received(:prepend_comment).with(
sql_statement,
a_span_operation_with(service: service_name),
duck_type(:to_digest),
duck_type(:propagate!),
propagation_mode
)
end
Expand All @@ -90,7 +90,7 @@
expect(Datadog::Tracing::Contrib::Propagation::SqlComment).to have_received(:prepend_comment).with(
sql_statement,
a_span_operation_with(service: service_name),
duck_type(:to_digest),
duck_type(:propagate!),
propagation_mode
)
end
Expand Down
37 changes: 19 additions & 18 deletions spec/datadog/tracing/trace_operation_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,18 @@
end

context 'given' do
context ':trace_operation_samples' do
let(:tracer) { instance_double(Datadog::Tracing::Tracer) }
let(:trace_op) { described_class.new(tracer: tracer) }

describe '#to_digest' do
before do
allow(tracer).to receive(:sample_trace)
end

it 'calls tracer.sample_trace' do
expect(tracer).to receive(:sample_trace).with(trace_op)
trace_op.to_digest
context ':trace_operation_triggers_trace_propagated_event' do
let(:events) { instance_double('Events') }
let(:trace_propagated) { instance_double('TracePropagated') }
let(:trace_op) { described_class.new(events: events) }
before do
allow(events).to receive(:trace_propagated).and_return(trace_propagated)
allow(trace_propagated).to receive(:publish)
end
describe '#propagate!' do
it 'calls events.trace_propagated.publish with self' do
expect(trace_propagated).to receive(:publish).with(trace_op)
trace_op.propagate!
end
end
end
Expand Down Expand Up @@ -1868,9 +1868,9 @@ def span
end
end

describe '#to_digest' do
subject(:to_digest) { trace_op.to_digest }
let(:digest) { to_digest }
describe '#propagate!' do
subject(:propagate!) { trace_op.propagate! }
let(:digest) { propagate! }

context 'when the trace' do
context 'is empty' do
Expand Down Expand Up @@ -1955,7 +1955,7 @@ def span
) do |parent, trace|
@parent = parent
trace.set_tag('_dd.p.test', 'value')
to_digest
propagate!
end
end
end
Expand Down Expand Up @@ -2326,7 +2326,8 @@ def span
[
:span_before_start,
:span_finished,
:trace_finished
:trace_finished,
:trace_propagated,
].each do |event|
expect(new_events.send(event).subscriptions).to eq(old_events.send(event).subscriptions)
end
Expand Down Expand Up @@ -2688,7 +2689,7 @@ def span

workers = nil
trace.measure('start_inserts', resource: 'inventory', service: 'job-worker') do
trace_digest = trace.to_digest
trace_digest = trace.propagate!

workers = Array.new(5) do |index|
Thread.new do
Expand Down
4 changes: 2 additions & 2 deletions spec/datadog/tracing/tracer_integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def lang_tag(span)

workers = nil
tracer.trace('start_inserts', resource: 'inventory') do
trace_digest = trace.to_digest
trace_digest = trace.propagate!

workers = Array.new(5) do |index|
Thread.new do
Expand Down Expand Up @@ -305,7 +305,7 @@ def lang_tag(span)
describe 'distributed trace' do
let(:extract) { Datadog::Tracing::Contrib::HTTP.extract(rack_headers) }
let(:trace) { Datadog::Tracing.continue_trace!(extract) }
let(:inject) { {}.tap { |env| Datadog::Tracing::Contrib::HTTP.inject(trace.to_digest, env) } }
let(:inject) { {}.tap { |env| Datadog::Tracing::Contrib::HTTP.inject(trace.propagate!, env) } }

let(:rack_headers) { headers.map { |k, v| [RackSupport.header_to_rack(k), v] }.to_h }

Expand Down
2 changes: 1 addition & 1 deletion spec/datadog/tracing/tracer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@
trace_state_unknown_fields: 'any;field',
)
expect(digest.span_remote).to be true
expect(trace.to_digest.span_remote).to be false
expect(trace.propagate!.span_remote).to be false

expect(trace.send(:distributed_tags)).to eq('_dd.p.test' => 'value')

Expand Down
Loading