From 8f0bc5dc95614f321e0806196280e2ea6c8e86e7 Mon Sep 17 00:00:00 2001 From: Marco Costa Date: Thu, 19 Sep 2024 14:16:05 -0700 Subject: [PATCH] Add SNS/SQS trace propagation --- docs/GettingStarted.md | 4 +- lib/datadog/tracing.rb | 1 + .../contrib/aws/configuration/settings.rb | 23 +++ lib/datadog/tracing/contrib/aws/ext.rb | 3 + .../tracing/contrib/aws/instrumentation.rb | 48 +++-- .../tracing/contrib/aws/service/base.rb | 31 ++++ .../tracing/contrib/aws/service/sns.rb | 12 ++ .../tracing/contrib/aws/service/sqs.rb | 18 ++ lib/datadog/tracing/distributed.rb | 59 ++++++ lib/datadog/tracing/trace_digest.rb | 54 ++++++ sig/datadog/tracing/contrib/aws/ext.rbs | 2 + .../tracing/contrib/aws/service/base.rbs | 2 + sig/datadog/tracing/contrib/component.rbs | 7 + sig/datadog/tracing/distributed.rbs | 12 ++ .../tracing/distributed/propagation.rbs | 13 +- .../contrib/aws/instrumentation_spec.rb | 104 ++++++++++- .../contrib/aws/service/shared_examples.rb | 169 ++++++++++++++++++ .../tracing/contrib/aws/service/sns_spec.rb | 7 + .../tracing/contrib/aws/service/sqs_spec.rb | 13 ++ spec/datadog/tracing/distributed_spec.rb | 33 ++++ spec/datadog/tracing/trace_digest_spec.rb | 51 ++++++ 21 files changed, 650 insertions(+), 16 deletions(-) create mode 100644 lib/datadog/tracing/distributed.rb create mode 100644 sig/datadog/tracing/distributed.rbs create mode 100644 spec/datadog/tracing/contrib/aws/service/shared_examples.rb create mode 100644 spec/datadog/tracing/distributed_spec.rb diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md index 8a1196fa138..2f03e47313d 100644 --- a/docs/GettingStarted.md +++ b/docs/GettingStarted.md @@ -548,7 +548,7 @@ require 'aws-sdk' require 'datadog' Datadog.configure do |c| - c.tracing.instrument :aws, **options + c.tracing.instrument :aws, propagation: true, **options end # Perform traced call @@ -562,6 +562,8 @@ Aws::S3::Client.new.list_buckets | `enabled` | `DD_TRACE_AWS_ENABLED` | `Bool` | Whether the integration should create spans. | `true` | | `service_name` | `DD_TRACE_AWS_SERVICE_NAME` | `String` | Name of application running the `aws` instrumentation. May be overridden by `global_default_service_name`. [See _Additional Configuration_ for more details](#additional-configuration) | `aws` | | `peer_service` | `DD_TRACE_AWS_PEER_SERVICE` | `String` | Name of external service the application connects to | `nil` | +| `propagation` | `DD_TRACE_AWS_PROPAGATION_ENABLED` | `Bool` | Enables distributed trace propagation for SNS and SQS messages. | `false` | +| `parentage_style` | `DD_TRACE_AWS_TRACE_PARENTAGE_STYLE` | `String` | Controls whether the local trace is parented to the SQS message consumed. Possible values are: `local`, `distributed`. This option is always disable (the equivalent to `local`) if `propagation` is disabled. | `propagation` | ### Concurrent Ruby diff --git a/lib/datadog/tracing.rb b/lib/datadog/tracing.rb index af3877ce717..24586ccd058 100644 --- a/lib/datadog/tracing.rb +++ b/lib/datadog/tracing.rb @@ -2,6 +2,7 @@ require_relative 'core' require_relative 'tracing/pipeline' +require_relative 'tracing/distributed' module Datadog # Datadog APM tracing public API. diff --git a/lib/datadog/tracing/contrib/aws/configuration/settings.rb b/lib/datadog/tracing/contrib/aws/configuration/settings.rb index c6c64bf5d9c..846cbc23c2e 100644 --- a/lib/datadog/tracing/contrib/aws/configuration/settings.rb +++ b/lib/datadog/tracing/contrib/aws/configuration/settings.rb @@ -45,6 +45,29 @@ class Settings < Contrib::Configuration::Settings o.type :string, nilable: true o.env Ext::ENV_PEER_SERVICE end + + # Enables distributed trace propagation for SNS and SQS messages. + # @default `DD_TRACE_AWS_PROPAGATION_ENABLED` environment variable, otherwise `false` + # @return [Boolean] + option :propagation do |o| + o.type :bool + o.env Ext::ENV_PROPAGATION_ENABLED + o.default false + end + + # Controls whether the local trace is parented to the SQS message consumed. + # Possible values are: + # `local`: The local active trace is used; SNS has no effect on trace parentage. + # `distributed`: The local active trace becomes a child of the propagation context from the SQS message. + # + # This option is always disable (the equivalent to`local`) if `propagation` is disabled. + # @default `DD_TRACE_AWS_TRACE_PARENTAGE_STYLE` environment variable, otherwise `local` + # @return [String] + option :parentage_style do |o| + o.type :string + o.env Ext::ENV_TRACE_PARENTAGE_STYLE + o.default 'distributed' + end end end end diff --git a/lib/datadog/tracing/contrib/aws/ext.rb b/lib/datadog/tracing/contrib/aws/ext.rb index cefb9bca435..b9c6941edc3 100644 --- a/lib/datadog/tracing/contrib/aws/ext.rb +++ b/lib/datadog/tracing/contrib/aws/ext.rb @@ -13,6 +13,9 @@ module Ext # @!visibility private ENV_ANALYTICS_ENABLED = 'DD_TRACE_AWS_ANALYTICS_ENABLED' ENV_ANALYTICS_SAMPLE_RATE = 'DD_TRACE_AWS_ANALYTICS_SAMPLE_RATE' + ENV_PROPAGATION_ENABLED = 'DD_TRACE_AWS_PROPAGATION_ENABLED' + ENV_TRACE_PARENTAGE_STYLE = 'DD_TRACE_AWS_TRACE_PARENTAGE_STYLE' + DEFAULT_PEER_SERVICE_NAME = 'aws' SPAN_COMMAND = 'aws.command' TAG_AGENT = 'aws.agent' diff --git a/lib/datadog/tracing/contrib/aws/instrumentation.rb b/lib/datadog/tracing/contrib/aws/instrumentation.rb index 7194ef9ae01..41e3bf404bb 100644 --- a/lib/datadog/tracing/contrib/aws/instrumentation.rb +++ b/lib/datadog/tracing/contrib/aws/instrumentation.rb @@ -18,33 +18,56 @@ def add_handlers(handlers, _) # Generates Spans for all interactions with AWS class Handler < Seahorse::Client::Handler + # Some services contain trace propagation information (e.g. SQS) that affect what active trace + # we'll use for the AWS span. + # But because this information is only available after the request is made, we need to make the AWS + # request first, then create the trace and span with correct distributed trace parenting. def call(context) - Tracing.trace(Ext::SPAN_COMMAND) do |span| - @handler.call(context).tap do - annotate!(span, ParsedContext.new(context)) - end + config = configuration + + # Find the AWS service instrumentation + parsed_context = ParsedContext.new(context) + aws_service = parsed_context.safely(:resource).split('.')[0] + handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service] + + # Execute handler stack, to ensure we have the response object before the trace and span are created + start_time = Core::Utils::Time.now.utc # Save the start time as the span creation is delayed + begin + response = @handler.call(context) + rescue Exception => e # rubocop:disable Lint/RescueException + # Catch exception to reraise it inside the trace block, to ensure the span has correct error information + # This matches the behavior of {Datadog::Tracing::SpanOperation#measure} end + + Tracing.trace(Ext::SPAN_COMMAND, start_time: start_time) do |span, trace| + handler.before_span(config, context, response) if handler + + annotate!(config, span, trace, parsed_context, aws_service) + + raise e if e + end + + response end private - # rubocop:disable Metrics/AbcSize - def annotate!(span, context) - span.service = configuration[:service_name] + def annotate!(config, span, trace, context, aws_service) + span.service = config[:service_name] span.type = Tracing::Metadata::Ext::HTTP::TYPE_OUTBOUND span.name = Ext::SPAN_COMMAND span.resource = context.safely(:resource) - aws_service = span.resource.split('.')[0] span.set_tag(Ext::TAG_AWS_SERVICE, aws_service) params = context.safely(:params) if (handler = Datadog::Tracing::Contrib::Aws::SERVICE_HANDLERS[aws_service]) + handler.process(config, trace, context) handler.add_tags(span, params) end - if configuration[:peer_service] + if config[:peer_service] span.set_tag( Tracing::Metadata::Ext::TAG_PEER_SERVICE, - configuration[:peer_service] + config[:peer_service] ) end @@ -61,8 +84,8 @@ def annotate!(span, context) span.set_tag(Tracing::Metadata::Ext::TAG_PEER_HOSTNAME, context.safely(:host)) # Set analytics sample rate - if Contrib::Analytics.enabled?(configuration[:analytics_enabled]) - Contrib::Analytics.set_sample_rate(span, configuration[:analytics_sample_rate]) + if Contrib::Analytics.enabled?(config[:analytics_enabled]) + Contrib::Analytics.set_sample_rate(span, config[:analytics_sample_rate]) end Contrib::Analytics.set_measured(span) @@ -77,7 +100,6 @@ def annotate!(span, context) Contrib::SpanAttributeSchema.set_peer_service!(span, Ext::PEER_SERVICE_SOURCES) end - # rubocop:enable Metrics/AbcSize def configuration Datadog.configuration.tracing[:aws] diff --git a/lib/datadog/tracing/contrib/aws/service/base.rb b/lib/datadog/tracing/contrib/aws/service/base.rb index c6b496c1df7..0688b0c604f 100644 --- a/lib/datadog/tracing/contrib/aws/service/base.rb +++ b/lib/datadog/tracing/contrib/aws/service/base.rb @@ -7,7 +7,38 @@ module Aws module Service # Base class for all AWS service-specific tag handlers. class Base + def before_span(config, context, response); end + def process(config, trace, context); end def add_tags(span, params); end + + MESSAGE_ATTRIBUTES_LIMIT = 10 # Can't set more than 10 message attributes + + # Extract the `_datadog` message attribute and decode its JSON content. + def extract_propagation!(response, data_type) + messages = response.data.messages + + # DEV: Extract the context from the first message today. + # DEV: Use span links in the future to support multiple messages related to a single span. + return unless (message = messages[0]) + + message_attributes = message.message_attributes + + return unless message_attributes && (datadog = message_attributes['_datadog']) + + if (data = datadog[data_type]) && (parsed_data = JSON.parse(data)) + Tracing.continue_trace!(Distributed.extract(parsed_data)) + end + end + + def inject_propagation(trace, params, data_type) + message_attributes = (params[:message_attributes] ||= {}) + return if message_attributes.size >= MESSAGE_ATTRIBUTES_LIMIT + + data = {} + if Distributed.inject(trace.to_digest, data) + message_attributes['_datadog'] = { :data_type => data_type, :binary_value => data.to_json } + end + end end end end diff --git a/lib/datadog/tracing/contrib/aws/service/sns.rb b/lib/datadog/tracing/contrib/aws/service/sns.rb index 40cb0022344..0a74dccb77c 100644 --- a/lib/datadog/tracing/contrib/aws/service/sns.rb +++ b/lib/datadog/tracing/contrib/aws/service/sns.rb @@ -10,6 +10,18 @@ module Aws module Service # SNS tag handlers. class SNS < Base + PROPAGATION_DATATYPE = 'Binary' + + def process(config, trace, context) + return unless config[:propagation] + + case context.operation + when :publish + inject_propagation(trace, context.params, PROPAGATION_DATATYPE) + # TODO: when :publish_batch # Future support for batch publishing + end + end + def add_tags(span, params) topic_arn = params[:topic_arn] topic_name = params[:name] diff --git a/lib/datadog/tracing/contrib/aws/service/sqs.rb b/lib/datadog/tracing/contrib/aws/service/sqs.rb index 2ab6c0cb03f..0d0d43dc677 100644 --- a/lib/datadog/tracing/contrib/aws/service/sqs.rb +++ b/lib/datadog/tracing/contrib/aws/service/sqs.rb @@ -10,6 +10,24 @@ module Aws module Service # SQS tag handlers. class SQS < Base + DATATYPE = 'String' + def before_span(config, context, response) + return unless context.operation == :receive_message && config[:propagation] + + # Parent the current trace based on distributed message attributes + extract_propagation!(response, 'string_value') if config[:parentage_style] == 'distributed' + end + + def process(config, trace, context) + return unless config[:propagation] + + case context.operation + when :send_message + inject_propagation(trace, context.params, 'String') + # TODO: when :send_message_batch # Future support for batch sending + end + end + def add_tags(span, params) queue_url = params[:queue_url] queue_name = params[:queue_name] diff --git a/lib/datadog/tracing/distributed.rb b/lib/datadog/tracing/distributed.rb new file mode 100644 index 00000000000..c4f50d851e0 --- /dev/null +++ b/lib/datadog/tracing/distributed.rb @@ -0,0 +1,59 @@ +# frozen_string_literal: true + +require_relative 'distributed/b3_multi' +require_relative 'distributed/b3_single' +require_relative 'distributed/datadog' +require_relative 'distributed/none' +require_relative 'distributed/propagation' +require_relative 'distributed/trace_context' +require_relative 'contrib/component' + +module Datadog + module Tracing + # Namespace for distributed tracing propagation and correlation + module Distributed + module_function + + # Inject distributed headers into the given request + # @param digest [Datadog::Tracing::TraceDigest] the trace to inject + # @param data [Hash] the request to inject + def inject(digest, data) + raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation + + @propagation.inject!(digest, data) + end + + # Extract distributed headers from the given request + # @param data [Hash] the request to extract from + # @return [Datadog::Tracing::TraceDigest,nil] the extracted trace digest or nil if none was found + def extract(data) + raise 'Please invoke Datadog.configure at least once before calling this method' unless @propagation + + @propagation.extract(data) + end + + Contrib::Component.register('distributed') do |config| + tracing = config.tracing + # DEV: evaluate propagation_style in case it overrides propagation_style_extract & propagation_extract_first + tracing.propagation_style + + @propagation = Propagation.new( + propagation_styles: { + Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_MULTI_HEADER => + B3Multi.new(fetcher: Fetcher), + Configuration::Ext::Distributed::PROPAGATION_STYLE_B3_SINGLE_HEADER => + B3Single.new(fetcher: Fetcher), + Configuration::Ext::Distributed::PROPAGATION_STYLE_DATADOG => + Datadog.new(fetcher: Fetcher), + Configuration::Ext::Distributed::PROPAGATION_STYLE_TRACE_CONTEXT => + TraceContext.new(fetcher: Fetcher), + Configuration::Ext::Distributed::PROPAGATION_STYLE_NONE => None.new + }, + propagation_style_inject: tracing.propagation_style_inject, + propagation_style_extract: tracing.propagation_style_extract, + propagation_extract_first: tracing.propagation_extract_first + ) + end + end + end +end diff --git a/lib/datadog/tracing/trace_digest.rb b/lib/datadog/tracing/trace_digest.rb index 790013e56e6..b5463942e78 100644 --- a/lib/datadog/tracing/trace_digest.rb +++ b/lib/datadog/tracing/trace_digest.rb @@ -180,6 +180,60 @@ def merge(field_value_pairs) }.merge!(field_value_pairs) ) end + + # rubocop:disable Metrics/AbcSize,Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity + def ==(other) + self.class == other.class && + span_id == other.span_id && + span_name == other.span_name && + span_resource == other.span_resource && + span_service == other.span_service && + span_type == other.span_type && + trace_distributed_tags == other.trace_distributed_tags && + trace_hostname == other.trace_hostname && + trace_id == other.trace_id && + trace_name == other.trace_name && + trace_origin == other.trace_origin && + trace_process_id == other.trace_process_id && + trace_resource == other.trace_resource && + trace_runtime_id == other.trace_runtime_id && + trace_sampling_priority == other.trace_sampling_priority && + trace_service == other.trace_service && + trace_distributed_id == other.trace_distributed_id && + trace_flags == other.trace_flags && + trace_state == other.trace_state && + trace_state_unknown_fields == other.trace_state_unknown_fields && + span_remote == other.span_remote + end + # rubocop:enable Metrics/AbcSize,Metrics/PerceivedComplexity,Metrics/CyclomaticComplexity + + alias eql? == + + def hash + [ + self.class, + span_id, + span_name, + span_resource, + span_service, + span_type, + trace_distributed_tags, + trace_hostname, + trace_id, + trace_name, + trace_origin, + trace_process_id, + trace_resource, + trace_runtime_id, + trace_sampling_priority, + trace_service, + trace_distributed_id, + trace_flags, + trace_state, + trace_state_unknown_fields, + span_remote + ].hash + end end end end diff --git a/sig/datadog/tracing/contrib/aws/ext.rbs b/sig/datadog/tracing/contrib/aws/ext.rbs index 8e47361fa56..67a4d05844b 100644 --- a/sig/datadog/tracing/contrib/aws/ext.rbs +++ b/sig/datadog/tracing/contrib/aws/ext.rbs @@ -6,6 +6,7 @@ module Datadog ENV_ENABLED: "DD_TRACE_AWS_ENABLED" ENV_PEER_SERVICE: "DD_TRACE_AWS_PEER_SERVICE" + ENV_PROPAGATION_ENABLED: string ENV_SERVICE_NAME: "DD_TRACE_AWS_SERVICE_NAME" ENV_ANALYTICS_ENABLED: "DD_TRACE_AWS_ANALYTICS_ENABLED" @@ -14,6 +15,7 @@ module Datadog DEFAULT_PEER_SERVICE_NAME: "aws" + ENV_TRACE_PARENTAGE_STYLE: string PEER_SERVICE_SOURCES: Array[String] SPAN_COMMAND: "aws.command" diff --git a/sig/datadog/tracing/contrib/aws/service/base.rbs b/sig/datadog/tracing/contrib/aws/service/base.rbs index e13f0f3a95e..7c5b5be164f 100644 --- a/sig/datadog/tracing/contrib/aws/service/base.rbs +++ b/sig/datadog/tracing/contrib/aws/service/base.rbs @@ -4,6 +4,8 @@ module Datadog module Aws module Service class Base + MESSAGE_ATTRIBUTES_LIMIT: int + def add_tags: (untyped span, untyped params) -> nil end end diff --git a/sig/datadog/tracing/contrib/component.rbs b/sig/datadog/tracing/contrib/component.rbs index 00e7d4103ff..b6623db7960 100644 --- a/sig/datadog/tracing/contrib/component.rbs +++ b/sig/datadog/tracing/contrib/component.rbs @@ -3,6 +3,13 @@ module Datadog module Contrib module Component @registry: Hash[String, Proc] + + def self.register: (string name) { (untyped) -> void } -> void + def self.configure: (Core::Configuration::Settings config) -> void + + private + + def self.unregister: (string name) -> void end end end diff --git a/sig/datadog/tracing/distributed.rbs b/sig/datadog/tracing/distributed.rbs new file mode 100644 index 00000000000..be463e9d12d --- /dev/null +++ b/sig/datadog/tracing/distributed.rbs @@ -0,0 +1,12 @@ +module Datadog + module Tracing + module Distributed + @propagation: Propagation + + def inject: (TraceDigest digest, ::Hash[untyped, untyped] data) -> (bool | nil) + + def extract: (::Hash[untyped, untyped] data) -> (TraceDigest | nil) + end + end +end + diff --git a/sig/datadog/tracing/distributed/propagation.rbs b/sig/datadog/tracing/distributed/propagation.rbs index 47ad8507c72..c1a68ae962a 100644 --- a/sig/datadog/tracing/distributed/propagation.rbs +++ b/sig/datadog/tracing/distributed/propagation.rbs @@ -2,7 +2,18 @@ module Datadog module Tracing module Distributed class Propagation - def initialize: (propagation_styles: untyped) -> void + @propagation_styles: Hash[String,untyped] + @propagation_extract_first: bool + @propagation_style_inject: Array[untyped] + @propagation_style_extract: Array[untyped] + + def initialize: ( + propagation_styles: Hash[String,untyped], + propagation_style_inject: Array[String], + propagation_style_extract: Array[String], + propagation_extract_first: bool, + ) -> void + def inject!: (untyped digest, untyped data) -> (nil | untyped) def extract: (untyped data) -> untyped end diff --git a/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb b/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb index 88e6c7cbee0..a5283129f0c 100644 --- a/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb +++ b/spec/datadog/tracing/contrib/aws/instrumentation_spec.rb @@ -255,6 +255,7 @@ end describe '#send_message_batch' do + # TODO: SHOULD TAG ALL MESSAGES subject!(:send_message_batch) do client.send_message_batch( { @@ -379,6 +380,44 @@ .to eq('sqs.us-stubbed-1.amazonaws.com') end end + + describe '#receive_message' do + subject!(:receive_message) do + client.receive_message( + { + queue_url: 'https://sqs.us-stubbed-1.amazonaws.com/123456789012/MyQueueName', + attribute_names: ['All'], + max_number_of_messages: 1, + visibility_timeout: 1, + wait_time_seconds: 1, + receive_request_attempt_id: 'my_receive_request_attempt_1', + } + ) + end + + let(:responses) do + { receive_message: { + messages: [ + message_attributes: { + '_datadog' => { + string_value: 'String', + data_type: 'String' + } + } + ] + } } + end + + it 'generates a span' do + expect(span.name).to eq('aws.command') + expect(span.service).to eq('aws') + expect(span.type).to eq('http') + expect(span.resource).to eq('sqs.receive_message') + + expect(span.get_tag('aws.agent')).to eq('aws-sdk-ruby') + expect(span.get_tag('aws.operation')).to eq('receive_message') + end + end end context 'with an SNS client' do @@ -389,7 +428,14 @@ client.publish( { topic_arn: 'arn:aws:sns:us-west-2:123456789012:my-topic-name', - message: 'Hello, world!' + message: 'Hello, world!', + message_attributes: { + 'String' => { + data_type: 'String', # required + string_value: 'String', + binary_value: 'data', + }, + }, } ) end @@ -435,6 +481,62 @@ end end + describe '#publish_batch' do + subject!(:publish_batch) do + client.publish_batch( + topic_arn: 'arn:aws:sns:us-west-2:123456789012:my-topic-name', + publish_batch_request_entries: [ + { id: 'id1', message: 'body1' }, + { id: 'id2', message: 'body2' }, + ] + ) + end + + let(:configuration_options) { super().merge(propagation: true) } + + let(:responses) do + { + publish_batch: { + successful: [{ id: 'id1' }], + failed: [{ id: 'id2', code: 'error_code', sender_fault: true }] + } + } + end + + it_behaves_like 'schema version span' + it_behaves_like 'environment service name', 'DD_TRACE_AWS_SERVICE_NAME' + it_behaves_like 'configured peer service span', 'DD_TRACE_AWS_PEER_SERVICE' + it_behaves_like 'a peer service span' do + let(:peer_service_val) { 'my-topic-name' } + let(:peer_service_source) { 'topicname' } + end + + it 'generates a span' do + expect(span.name).to eq('aws.command') + expect(span.service).to eq('aws') + expect(span.type).to eq('http') + expect(span.resource).to eq('sns.publish_batch') + + expect(span.get_tag('aws.agent')).to eq('aws-sdk-ruby') + expect(span.get_tag('aws.operation')).to eq('publish_batch') + expect(span.get_tag('region')).to eq('us-stubbed-1') + expect(span.get_tag('aws_service')).to eq('sns') + expect(span.get_tag('aws_account')).to eq('123456789012') + expect(span.get_tag('topicname')).to eq('my-topic-name') + expect(span.get_tag('path')).to eq('') + expect(span.get_tag('host')).to eq('sns.us-stubbed-1.amazonaws.com') + expect(span.get_tag('http.method')).to eq('POST') + expect(span.get_tag('http.status_code')).to eq('200') + expect(span.get_tag('span.kind')).to eq('client') + + expect(span.get_tag(Datadog::Tracing::Metadata::Ext::TAG_COMPONENT)).to eq('aws') + expect(span.get_tag(Datadog::Tracing::Metadata::Ext::TAG_OPERATION)) + .to eq('command') + expect(span.get_tag(Datadog::Tracing::Metadata::Ext::TAG_PEER_HOSTNAME)) + .to eq('sns.us-stubbed-1.amazonaws.com') + end + end + describe '#create_topic' do subject!(:create_topic) do client.create_topic( diff --git a/spec/datadog/tracing/contrib/aws/service/shared_examples.rb b/spec/datadog/tracing/contrib/aws/service/shared_examples.rb new file mode 100644 index 00000000000..76d1194ddf0 --- /dev/null +++ b/spec/datadog/tracing/contrib/aws/service/shared_examples.rb @@ -0,0 +1,169 @@ +require 'datadog/tracing/contrib/aws/parsed_context' + +require 'aws-sdk-sqs' + +RSpec.shared_examples 'injects AWS attribute propagation' do + subject(:inject_propagation) { service.process(config, trace, context) } + + let(:config) { { propagation: true } } + let(:trace) { Datadog::Tracing::TraceOperation.new(id: trace_id, parent_span_id: span_id) } + let(:context) { instance_double(Datadog::Tracing::Contrib::Aws::ParsedContext, params: params, operation: operation) } + let(:params) { {} } + let(:trace_id) { 1 } + let(:span_id) { 2 } + + before { Datadog.configure { |c| c.tracing.instrument :aws } } + + context 'without preexisting message attributes' do + it 'adds a propagation attribute' do + inject_propagation + expect(params[:message_attributes]).to eq( + '_datadog' => { + binary_value: + '{"x-datadog-trace-id":"1","x-datadog-parent-id":"2",' \ + '"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \ + '"tracestate":"dd=p:0000000000000002"}', + data_type: data_type + } + ) + end + end + + context 'with existing message attributes' do + let(:params) { { message_attributes: message_attributes } } + let(:message_attributes) { { 'existing' => { data_type: 'Number', string_value: 1 } } } + + it 'adds a propagation attribute' do + expect { inject_propagation }.to change { message_attributes.keys }.from(['existing']).to(['existing', '_datadog']) + end + end + + context 'with 10 message attributes already set' do + let(:params) { { message_attributes: message_attributes } } + let(:message_attributes) do + Array.new(10) do |i| + ["attr#{i}", { data_type: 'Number', string_value: i }] + end.to_h + end + + it 'does not add a propagation attribute' do + expect { inject_propagation }.to_not(change { params }) + end + end + + context 'disabled' do + let(:config) { { propagation: false } } + + it 'does not add a propagation attribute' do + expect { inject_propagation }.to_not(change { params }) + end + end +end + +RSpec.shared_examples 'extract AWS attribute propagation' do + subject(:extract_propagation) { service.before_span(config, context, response) } + + let(:config) { { propagation: true, parentage_style: parentage_style } } + let(:parentage_style) { 'distributed' } + let(:trace) do + Datadog::Tracing::TraceOperation.new( + id: 1, + parent_span_id: 2, + remote_parent: true, + trace_state: 'unrelated=state', + sampling_priority: 0 + ) + end + let(:context) { instance_double(Datadog::Tracing::Contrib::Aws::ParsedContext, operation: operation) } + let(:response) do + result = Aws::SQS::Types::ReceiveMessageResult.new(messages: messages) + Seahorse::Client::Response.new(data: result) + end + let(:messages) { [] } + + before { Datadog.configure { |c| c.tracing.instrument :aws } } + + context 'without message attributes' do + context 'without an active trace' do + it 'does not create trace' do + extract_propagation + expect(Datadog::Tracing.active_trace).to be_nil + end + end + + context 'with an active trace' do + before { Datadog::Tracing.continue_trace!(trace.to_digest) } + + it 'does not change active trace' do + extract_propagation + expect(Datadog::Tracing.active_trace.to_digest).to eq(trace.to_digest) + end + end + end + + context 'with message attributes' do + let(:messages) { [message] } + let(:message) { Aws::SQS::Types::Message.new(message_attributes: message_attributes) } + let(:message_attributes) { { '_datadog' => attribute } } + let(:attribute) do + Aws::SQS::Types::MessageAttributeValue.new( + string_value: + '{"traceparent":"00-00000000000000000000000000000001-0000000000000002-00",' \ + '"tracestate":"dd=p:0000000000000002,unrelated=state"}', + data_type: data_type + ) + end + + context 'without an active trace' do + it 'creates trace' do + extract_propagation + expect(Datadog::Tracing.active_trace.to_digest).to eq(trace.to_digest) + end + end + + context 'with an active trace' do + it 'overrides the existing trace' do + existing_trace = Datadog::Tracing.continue_trace!(nil) + expect { extract_propagation }.to( + change { Datadog::Tracing.active_trace.to_digest }.from(existing_trace.to_digest).to(trace.to_digest) + ) + end + end + + context 'with a local parentage style' do + let(:parentage_style) { 'local' } + + it 'does not create a remote trace' do + extract_propagation + expect(Datadog::Tracing.active_trace).to be_nil + end + end + + context 'with multiple messages' do + let(:messages) { [message, other_message] } + let(:other_message) { Aws::SQS::Types::Message.new(message_attributes: other_message_attributes) } + let(:other_message_attributes) { { '_datadog' => other_attribute } } + let(:other_attribute) do + Aws::SQS::Types::MessageAttributeValue.new( + string_value: + '{"traceparent":"00-00000000000000000000000000000008-0000000000000009-00",' \ + '"tracestate":"dd=p:0000000000000009,oops=not-this-one"}', + data_type: data_type + ) + end + + it 'extracts the first message attributes' do + extract_propagation + expect(Datadog::Tracing.active_trace.to_digest).to eq(trace.to_digest) + end + end + end + + context 'disabled' do + let(:config) { { propagation: false } } + + it 'does not add a propagation attribute' do + expect { extract_propagation }.to_not(change { params }) + end + end +end diff --git a/spec/datadog/tracing/contrib/aws/service/sns_spec.rb b/spec/datadog/tracing/contrib/aws/service/sns_spec.rb index 8b3737b43e6..c1a3b454429 100644 --- a/spec/datadog/tracing/contrib/aws/service/sns_spec.rb +++ b/spec/datadog/tracing/contrib/aws/service/sns_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'datadog/tracing/contrib/aws/service/sns' +require_relative 'shared_examples' RSpec.describe Datadog::Tracing::Contrib::Aws::Service::SNS do let(:span) { instance_double('Span') } @@ -37,4 +38,10 @@ expect(span).to have_received(:set_tag).with(Datadog::Tracing::Contrib::Aws::Ext::TAG_TOPIC_NAME, nil) end end + + it_behaves_like 'injects AWS attribute propagation' do + let(:service) { sns } + let(:operation) { :publish } + let(:data_type) { 'Binary' } + end end diff --git a/spec/datadog/tracing/contrib/aws/service/sqs_spec.rb b/spec/datadog/tracing/contrib/aws/service/sqs_spec.rb index fe8dac8300b..895ffd1ec3c 100644 --- a/spec/datadog/tracing/contrib/aws/service/sqs_spec.rb +++ b/spec/datadog/tracing/contrib/aws/service/sqs_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'datadog/tracing/contrib/aws/service/sqs' +require_relative 'shared_examples' RSpec.describe Datadog::Tracing::Contrib::Aws::Service::SQS do let(:span) { instance_double('Span') } @@ -31,4 +32,16 @@ expect(span).to have_received(:set_tag).with(Datadog::Tracing::Contrib::Aws::Ext::TAG_QUEUE_NAME, 'AnotherQueueName') end end + + it_behaves_like 'injects AWS attribute propagation' do + let(:service) { sqs } + let(:operation) { :send_message } + let(:data_type) { 'String' } + end + + it_behaves_like 'extract AWS attribute propagation' do + let(:service) { sqs } + let(:operation) { :receive_message } + let(:data_type) { 'String' } + end end diff --git a/spec/datadog/tracing/distributed_spec.rb b/spec/datadog/tracing/distributed_spec.rb new file mode 100644 index 00000000000..260e9b5937c --- /dev/null +++ b/spec/datadog/tracing/distributed_spec.rb @@ -0,0 +1,33 @@ +require 'datadog/tracing/distributed' + +RSpec.describe Datadog::Tracing::Distributed do + context 'integration test' do + before { Datadog.configure {} } + + describe '#inject' do + subject(:inject) { described_class.inject(digest, data) } + let(:trace_id) { Datadog::Tracing::Utils::TraceId.next_id } + let(:span_id) { Datadog::Tracing::Utils.next_id } + let(:digest) do + Datadog::Tracing::TraceDigest.new(trace_id: trace_id, span_id: span_id) + end + let(:data) { {} } + + it 'injects distributed headers' do + inject + expect(data).to include('x-datadog-trace-id') + expect(data).to include('x-datadog-parent-id') + end + end + + describe '#extract' do + subject(:extract) { described_class.extract(data) } + + let(:data) { { 'x-datadog-trace-id' => '1', 'x-datadog-parent-id' => '2' } } + + it 'extracts distributed headers' do + is_expected.to be_a_kind_of(Datadog::Tracing::TraceDigest) + end + end + end +end diff --git a/spec/datadog/tracing/trace_digest_spec.rb b/spec/datadog/tracing/trace_digest_spec.rb index 3303b71378a..d919511e863 100644 --- a/spec/datadog/tracing/trace_digest_spec.rb +++ b/spec/datadog/tracing/trace_digest_spec.rb @@ -191,4 +191,55 @@ it { is_expected.to be_frozen } end + + describe '#==' do + let(:other) { described_class.new(**other_options) } + let(:other_options) { { span_id: 123 } } + + context 'when equal' do + let(:other_options) { options } + + it { is_expected.to eq(other) } + end + + context 'when not equal' do + let(:other_options) { { span_name: 'new span' } } + + it { is_expected.not_to eq(other) } + end + end + + describe '#eql?' do + let(:other) { described_class.new(**other_options) } + let(:other_options) { { span_id: 123 } } + + context 'when equal' do + let(:other_options) { options } + + it { is_expected.to eql(other) } + end + + context 'when not equal' do + let(:other_options) { { span_name: 'new span' } } + + it { is_expected.not_to eql(other) } + end + end + + describe '#hash' do + let(:other) { described_class.new(**other_options) } + let(:other_options) { { span_id: 123 } } + + context 'when equal' do + let(:other_options) { options } + + it { expect(trace_digest.hash).to eq(other.hash) } + end + + context 'when not equal' do + let(:other_options) { { span_name: 'new span' } } + + it { expect(trace_digest.hash).not_to eq(other.hash) } + end + end end