Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
marcotc committed Sep 18, 2024
1 parent 70054de commit 72d8d17
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 132 deletions.
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,4 @@ end
# TODO: Remove this once the issue is resolved: https://github.com/ffi/ffi/issues/1107
gem 'ffi', '~> 1.16.3', require: false

gem 'aws-sdk'
# gem 'aws-sdk'
35 changes: 28 additions & 7 deletions lib/datadog/tracing/contrib/aws/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,45 @@ def add_handlers(handlers, _)

# Generates Spans for all interactions with AWS
class Handler < Seahorse::Client::Handler
# Some services contain trace propagation information (e.g. SQS) that affect what active trace
# we'll use for the AWS span.
# But because this information is only available after the request is made, we need to make the AWS
# request first, then create the trace and span with correct distributed trace parenting.
def call(context)
Tracing.trace(Ext::SPAN_COMMAND) do |span, trace|
@handler.call(context).tap do
annotate!(span, trace, ParsedContext.new(context))
end
config = configuration

# Find the AWS service instrumentation
parsed_context = ParsedContext.new(context)
aws_service = parsed_context.safely(:resource).split('.')[0]
handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service]

# Execute handler stack, to ensure we have the response object before the trace and span are created
start_time = Core::Utils::Time.now.utc # Save the start time as the span creation is delayed
begin
response = @handler.call(context)
rescue Exception => e
# Catch exception to reraise it inside the trace block, to ensure the span has correct error information
end

Tracing.trace(Ext::SPAN_COMMAND, start_time: start_time) do |span, trace|
handler.before_span(config, context, response) if handler

annotate!(config, span, trace, parsed_context, aws_service)

raise e if e
end

response
end

private

# rubocop:disable Metrics/AbcSize
def annotate!(span, trace, context)
config = configuration
def annotate!(config, span, trace, context, aws_service)
span.service = config[:service_name]
span.type = Tracing::Metadata::Ext::HTTP::TYPE_OUTBOUND
span.name = Ext::SPAN_COMMAND
span.resource = context.safely(:resource)
aws_service = span.resource.split('.')[0]
span.set_tag(Ext::TAG_AWS_SERVICE, aws_service)
params = context.safely(:params)
if (handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service])
Expand Down
14 changes: 11 additions & 3 deletions lib/datadog/tracing/contrib/aws/service/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@ module Aws
module Service
# Base class for all AWS service-specific tag handlers.
class Base
def before_span(config, context, response); end
def process(config, trace, context); end
def add_tags(span, params); end

MESSAGE_ATTRIBUTES_LIMIT = 10 # Can't set more than 10 message attributes

def extract_propagation!(context)
message_attributes = context.params[:message_attributes]
# Extract the `_datadog` message attribute and decode its JSON content.
def extract_propagation!(response, data_type)
messages = response.data.messages

return unless (message = messages[0])

message_attributes = message.message_attributes

return unless message_attributes && (datadog = message_attributes['_datadog'])

Tracing.continue_trace!(Contrib.extract(datadog))
if ((data = datadog[data_type]) && (parsed_data = JSON.parse(data)))
Tracing.continue_trace!(Contrib.extract(parsed_data))
end
end

def inject_propagation(trace, params, data_type)
Expand Down
9 changes: 4 additions & 5 deletions lib/datadog/tracing/contrib/aws/service/sns.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ module Aws
module Service
# SNS tag handlers.
class SNS < Base
PROPAGATION_DATATYPE = 'Binary'

def process(config, trace, context)
return unless config[:propagation]

case context.operation
when :publish
inject_propagation(trace, context.params, 'Binary')
when :publish_batch
context.params[:publish_batch_request_entries].each do |entry|
inject_propagation(trace, entry, 'Binary')
end
inject_propagation(trace, context.params, PROPAGATION_DATATYPE)
# TODO: when :publish_batch # Future support for batch publishing
end
end

Expand Down
23 changes: 8 additions & 15 deletions lib/datadog/tracing/contrib/aws/service/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,21 @@ module Aws
module Service
# SQS tag handlers.
class SQS < Base
def before_span(config, context)
# DEV: Because we only support tracing propagation today, having separate `propagation and `propagation_style`
# options seems redundant. But when the DSM propagation is introduced, it's possible for `propagation` to be
# enable and `propagation_style` to disable, while DSM propagation is still enabled, as its data is not
# directly related to tracing parentage.
if config[:propagation] && config[:parentage_style] == 'distributed' && context.operation == :receive_message
extract_propagation!(context)
end
DATATYPE = 'String'
def before_span(config, context, response)
return unless context.operation == :receive_message && config[:propagation]

# Parent the current trace based on distributed message attributes
extract_propagation!(response, 'string_value') if config[:parentage_style] == 'distributed'
end

def process(config, trace, context)
return unless config[:propagation]

case context.operation
when :send_message
inject_propagation(trace, context, 'String')
when :send_message_batch
if config[:batch_propagation]
inject_propagation(trace, context, 'String')
else
inject_propagation(trace, context, 'String')
end
inject_propagation(trace, context.params, 'String')
# TODO when :send_message_batch # Future support for batch sending
end
end

Expand Down
52 changes: 52 additions & 0 deletions lib/datadog/tracing/trace_digest.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,58 @@ def merge(field_value_pairs)
}.merge!(field_value_pairs)
)
end

def ==(other)
self.class == other.class &&
span_id == other.span_id &&
span_name == other.span_name &&
span_resource == other.span_resource &&
span_service == other.span_service &&
span_type == other.span_type &&
trace_distributed_tags == other.trace_distributed_tags &&
trace_hostname == other.trace_hostname &&
trace_id == other.trace_id &&
trace_name == other.trace_name &&
trace_origin == other.trace_origin &&
trace_process_id == other.trace_process_id &&
trace_resource == other.trace_resource &&
trace_runtime_id == other.trace_runtime_id &&
trace_sampling_priority == other.trace_sampling_priority &&
trace_service == other.trace_service &&
trace_distributed_id == other.trace_distributed_id &&
trace_flags == other.trace_flags &&
trace_state == other.trace_state &&
trace_state_unknown_fields == other.trace_state_unknown_fields &&
span_remote == other.span_remote
end

alias eql? ==

def hash
[
self.class,
span_id,
span_name,
span_resource,
span_service,
span_type,
trace_distributed_tags,
trace_hostname,
trace_id,
trace_name,
trace_origin,
trace_process_id,
trace_resource,
trace_runtime_id,
trace_sampling_priority,
trace_service,
trace_distributed_id,
trace_flags,
trace_state,
trace_state_unknown_fields,
span_remote
].hash
end
end
end
end
12 changes: 10 additions & 2 deletions spec/datadog/tracing/contrib/aws/instrumentation_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,16 @@

let(:responses) do
{ receive_message: {
messages: []
} }
messages: [
message_attributes: {
'_datadog' => {
string_value: 'String',
data_type: 'String'
}
}
]
}
}
end

it 'generates a span' do
Expand Down
165 changes: 165 additions & 0 deletions spec/datadog/tracing/contrib/aws/service/shared_examples.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
require 'datadog/tracing/contrib/aws/parsed_context'

require 'aws-sdk-sqs'

RSpec.shared_examples 'injects AWS attribute propagation' do
subject(:inject_propagation) { service.process(config, trace, context) }

let(:config) { { propagation: true } }
let(:trace) { Datadog::Tracing::TraceOperation.new(id: trace_id, parent_span_id: span_id) }
let(:context) { instance_double(Datadog::Tracing::Contrib::Aws::ParsedContext, params: params, operation: operation) }
let(:params) { {} }
let(:trace_id) { 1 }
let(:span_id) { 2 }

before { Datadog.configure { |c| c.tracing.instrument :aws } }

context 'without preexisting message attributes' do
it 'adds a propagation attribute' do
inject_propagation
expect(params[:message_attributes]).to eq(
'_datadog' => {
binary_value:
'{"x-datadog-trace-id":"1","x-datadog-parent-id":"2",' \
'"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \
'"tracestate":"dd=p:0000000000000002"}',
data_type: data_type
}
)
end
end

context 'with existing message attributes' do
let(:params) { { message_attributes: message_attributes } }
let(:message_attributes) { { 'existing' => { data_type: 'Number', string_value: 1 } } }

it 'adds a propagation attribute' do
expect { inject_propagation }.to change { message_attributes.keys }.from(['existing']).to(['existing', '_datadog'])
end
end

context 'with 10 message attributes already set' do
let(:params) { { message_attributes: message_attributes } }
let(:message_attributes) do
Array.new(10) do |i|
["attr#{i}", { data_type: 'Number', string_value: i }]
end.to_h
end

it 'does not add a propagation attribute' do
expect { inject_propagation }.to_not(change { params })
end
end

context 'disabled' do
let(:config) { { propagation: false } }

it 'does not add a propagation attribute' do
expect { inject_propagation }.to_not(change { params })
end
end
end

RSpec.shared_examples 'extract AWS attribute propagation' do
subject(:extract_propagation) { service.before_span(config, context, response) }

let(:config) { { propagation: true, parentage_style: parentage_style } }
let(:parentage_style) { 'distributed' }
let(:trace) do
Datadog::Tracing::TraceOperation.new(
id: 1,
parent_span_id: 2,
remote_parent: true,
trace_state: 'unrelated=state',
sampling_priority: 0
)
end
let(:context) { instance_double(Datadog::Tracing::Contrib::Aws::ParsedContext, operation: operation) }
let(:response) do
result = Aws::SQS::Types::ReceiveMessageResult.new(messages: messages)
Seahorse::Client::Response.new(data: result)
end
let(:messages) { [] }

before { Datadog.configure { |c| c.tracing.instrument :aws } }

context 'without message attributes' do
context 'without an active trace' do
it 'does not create trace' do
extract_propagation
expect(Datadog::Tracing.active_trace).to be_nil
end
end

context 'with an active trace' do
before { Datadog::Tracing.continue_trace!(trace.to_digest) }

it 'does not change active trace' do
extract_propagation
expect(Datadog::Tracing.active_trace.to_digest).to eq(trace.to_digest)
end
end
end

context 'with message attributes' do
let(:messages) { [message] }
let(:message) { Aws::SQS::Types::Message.new(message_attributes: message_attributes) }
let(:message_attributes) { { '_datadog' => attribute } }
let(:attribute) { Aws::SQS::Types::MessageAttributeValue.new(
string_value:
'{"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \
'"tracestate":"dd=p:0000000000000002,unrelated=state"}',
data_type: data_type
) }

context 'without an active trace' do
it 'creates trace' do
extract_propagation
expect(Datadog::Tracing.active_trace.to_digest).to eq(trace.to_digest)
end
end

context 'with an active trace' do
it 'overrides the existing trace' do
existing_trace = Datadog::Tracing.continue_trace!(nil)
expect { extract_propagation }.to(
change { Datadog::Tracing.active_trace.to_digest }.from(existing_trace.to_digest).to(trace.to_digest)
)
end
end

context 'with a local parentage style' do
let(:parentage_style) { 'local' }

it 'does not create a remote trace' do
extract_propagation
expect(Datadog::Tracing.active_trace).to be_nil
end
end

context 'with multiple messages' do
let(:messages) { [message, other_message] }
let(:other_message) { Aws::SQS::Types::Message.new(message_attributes: other_message_attributes) }
let(:other_message_attributes) { { '_datadog' => other_attribute } }
let(:other_attribute) { Aws::SQS::Types::MessageAttributeValue.new(
string_value:
'{"traceparent":"00-00000000000000000000000000000008-0000000000000009-00",' \
'"tracestate":"dd=p:0000000000000009,oops=not-this-one"}',
data_type: data_type
) }

it 'extracts the first message attributes' do
extract_propagation
expect(Datadog::Tracing.active_trace.to_digest).to eq(trace.to_digest)
end
end
end

context 'disabled' do
let(:config) { { propagation: false } }

it 'does not add a propagation attribute' do
expect { extract_propagation }.to_not(change { params })
end
end
end
Loading

0 comments on commit 72d8d17

Please sign in to comment.