diff --git a/AWS.Messaging.sln b/AWS.Messaging.sln index 2ffc190..a51efaa 100644 --- a/AWS.Messaging.sln +++ b/AWS.Messaging.sln @@ -31,6 +31,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AWS.Messaging.Lambda", "src EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LambdaMessaging", "sampleapps\LambdaMessaging\LambdaMessaging.csproj", "{F74A4CF0-D814-426E-8149-46758E86AFE3}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AWS.Messaging.Telemetry.OpenTelemetry", "src\AWS.Messaging.Telemetry.OpenTelemetry\AWS.Messaging.Telemetry.OpenTelemetry.csproj", "{C529DC6E-72DA-49ED-908A-21DBC40F26C0}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -69,6 +71,10 @@ Global {F74A4CF0-D814-426E-8149-46758E86AFE3}.Debug|Any CPU.Build.0 = Debug|Any CPU {F74A4CF0-D814-426E-8149-46758E86AFE3}.Release|Any CPU.ActiveCfg = Release|Any CPU {F74A4CF0-D814-426E-8149-46758E86AFE3}.Release|Any CPU.Build.0 = Release|Any CPU + {C529DC6E-72DA-49ED-908A-21DBC40F26C0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C529DC6E-72DA-49ED-908A-21DBC40F26C0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C529DC6E-72DA-49ED-908A-21DBC40F26C0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C529DC6E-72DA-49ED-908A-21DBC40F26C0}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -82,6 +88,7 @@ Global {A174942B-AF9C-4935-AD7B-AF651BACE63C} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3} {24FA3671-8C2B-4B64-865C-68FB6237E34D} = {2D0A561B-0B97-4259-8603-3AF5437BB652} {F74A4CF0-D814-426E-8149-46758E86AFE3} = {1AA8985B-897C-4BD5-9735-FD8B33FEBFFB} + {C529DC6E-72DA-49ED-908A-21DBC40F26C0} = {2D0A561B-0B97-4259-8603-3AF5437BB652} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {7B2B759D-6455-4089-8173-3F1619567B36} diff --git a/sampleapps/PublisherAPI/Program.cs b/sampleapps/PublisherAPI/Program.cs index 554193c..a6b5b57 100644 --- a/sampleapps/PublisherAPI/Program.cs +++ b/sampleapps/PublisherAPI/Program.cs @@ -2,6 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; +using AWS.Messaging.Telemetry.OpenTelemetry; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; using PublisherAPI.Models; var builder = WebApplication.CreateBuilder(args); @@ -33,6 +36,11 @@ // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); +builder.Services.AddOpenTelemetry() + .ConfigureResource(resource => resource.AddService("PublisherAPI")) + .WithTracing(tracing => tracing + .AddAWSMessagingInstrumentation() + .AddConsoleExporter()); var app = builder.Build(); diff --git a/sampleapps/PublisherAPI/PublisherAPI.csproj b/sampleapps/PublisherAPI/PublisherAPI.csproj index e930d3e..439008d 100644 --- a/sampleapps/PublisherAPI/PublisherAPI.csproj +++ b/sampleapps/PublisherAPI/PublisherAPI.csproj @@ -7,10 +7,13 @@ + + + diff --git a/sampleapps/SubscriberService/Program.cs b/sampleapps/SubscriberService/Program.cs index dcfdc32..d5d456d 100644 --- a/sampleapps/SubscriberService/Program.cs +++ b/sampleapps/SubscriberService/Program.cs @@ -2,11 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; +using AWS.Messaging.Telemetry.OpenTelemetry; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; using SubscriberService.MessageHandlers; using SubscriberService.Models; @@ -37,7 +39,12 @@ await Host.CreateDefaultBuilder(args) PropertyNamingPolicy = JsonNamingPolicy.CamelCase, }; }); - }); + }) + .AddOpenTelemetry() + .ConfigureResource(resource => resource.AddService("SubscriberService")) + .WithTracing(tracing => tracing + .AddAWSMessagingInstrumentation() + .AddConsoleExporter()); }) .Build() .RunAsync(); diff --git a/sampleapps/SubscriberService/SubscriberService.csproj b/sampleapps/SubscriberService/SubscriberService.csproj index 1e522ad..7215a0c 100644 --- a/sampleapps/SubscriberService/SubscriberService.csproj +++ b/sampleapps/SubscriberService/SubscriberService.csproj @@ -20,9 +20,12 @@ + + + diff --git a/src/AWS.Messaging.Telemetry.OpenTelemetry/AWS.Messaging.Telemetry.OpenTelemetry.csproj b/src/AWS.Messaging.Telemetry.OpenTelemetry/AWS.Messaging.Telemetry.OpenTelemetry.csproj new file mode 100644 index 0000000..001c2fd --- /dev/null +++ b/src/AWS.Messaging.Telemetry.OpenTelemetry/AWS.Messaging.Telemetry.OpenTelemetry.csproj @@ -0,0 +1,35 @@ + + + + net6.0 + enable + enable + true + AWS Message Processing Framework Instrumention for OpenTelemetry + https://github.com/awslabs/aws-dotnet-messaging + true + Major + README.md + CA1727 + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/AWS.Messaging.Telemetry.OpenTelemetry/Constants.cs b/src/AWS.Messaging.Telemetry.OpenTelemetry/Constants.cs new file mode 100644 index 0000000..0f1a587 --- /dev/null +++ b/src/AWS.Messaging.Telemetry.OpenTelemetry/Constants.cs @@ -0,0 +1,15 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace AWS.Messaging.Telemetry.OpenTelemetry; + +/// +/// Constants related to the OpenTelemetry instrumentation for AWS.Messaging +/// +public class Constants +{ + /// + /// OpenTelemetry activity source name + /// + public const string SourceName = "AWS.Messaging"; +} diff --git a/src/AWS.Messaging.Telemetry.OpenTelemetry/OpenTelemetryProvider.cs b/src/AWS.Messaging.Telemetry.OpenTelemetry/OpenTelemetryProvider.cs new file mode 100644 index 0000000..ead3ed2 --- /dev/null +++ b/src/AWS.Messaging.Telemetry.OpenTelemetry/OpenTelemetryProvider.cs @@ -0,0 +1,83 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using OpenTelemetry; +using OpenTelemetry.Context.Propagation; + +namespace AWS.Messaging.Telemetry.OpenTelemetry; + +/// +/// Creates OpenTelemetry traces +/// +public class OpenTelemetryProvider : ITelemetryProvider +{ + private static readonly ActivitySource _activitySource = new ActivitySource(Constants.SourceName, TelemetryKeys.AWSMessagingAssemblyVersion); + + /// + public ITelemetryTrace Trace(string traceName) + { + var activity = _activitySource.StartActivity(traceName, ActivityKind.Producer); + if (activity != null) + { + return new OpenTelemetryTrace(activity); + } + + // If we initially failed to create an activity, attempt to force creation with + // a link to the current activity, see https://opentelemetry.io/docs/instrumentation/net/manual/#creating-new-root-activities + var parentActivity = Activity.Current; + Activity.Current = null; + ActivityLink[]? links = null; + if (parentActivity != null) + { + links = new[] { new ActivityLink(parentActivity.Context) }; + } + + activity = _activitySource.StartActivity(traceName, ActivityKind.Producer, parentContext: default, links: links); + + return new OpenTelemetryTrace(activity, parentActivity); + } + + /// + public ITelemetryTrace Trace(string traceName, MessageEnvelope envelope) + { + var propogatedContext = Propagators.DefaultTextMapPropagator.Extract(default, envelope, ExtractTraceContextFromEnvelope); + Baggage.Current = propogatedContext.Baggage; + + var activity = _activitySource.StartActivity(traceName, ActivityKind.Consumer, parentContext: propogatedContext.ActivityContext); + if (activity != null) + { + return new OpenTelemetryTrace(activity); + } + + // If we initially failed to create an activity, attempt to force creation with + // a link to the current activity, see https://opentelemetry.io/docs/instrumentation/net/manual/#creating-new-root-activities + var parentActivity = Activity.Current; + Activity.Current = null; + ActivityLink[]? links = null; + if (parentActivity != null) + { + links = new[] { new ActivityLink(parentActivity.Context) }; + } + + activity = _activitySource.StartActivity(traceName, ActivityKind.Consumer, parentContext: propogatedContext.ActivityContext, links: links); + + return new OpenTelemetryTrace(activity, parentActivity); + } + + /// + /// Extracts propagation context from a , meant to be used with + /// + /// Inbound message envelope + /// Context key + /// Context value + private IEnumerable ExtractTraceContextFromEnvelope(MessageEnvelope envelope, string key) + { + if (envelope.Metadata.TryGetValue(key, out var jsonElement)) + { + return new string[] { jsonElement.ToString() }; + } + + return Enumerable.Empty(); + } +} diff --git a/src/AWS.Messaging.Telemetry.OpenTelemetry/OpenTelemetryTrace.cs b/src/AWS.Messaging.Telemetry.OpenTelemetry/OpenTelemetryTrace.cs new file mode 100644 index 0000000..dd38f79 --- /dev/null +++ b/src/AWS.Messaging.Telemetry.OpenTelemetry/OpenTelemetryTrace.cs @@ -0,0 +1,105 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using System.Text.Json; +using OpenTelemetry; +using OpenTelemetry.Context.Propagation; +using OpenTelemetry.Trace; + +namespace AWS.Messaging.Telemetry.OpenTelemetry; + +/// +/// An OpenTelemetry trace (wrapper around a ) +/// +public class OpenTelemetryTrace : ITelemetryTrace +{ + private readonly Activity? _activity; + private readonly Activity? _parentToRestore; + + /// + /// Creates a new trace + /// + /// New trace + /// Optional parent activity that will be set as when this trace is disposed + public OpenTelemetryTrace(Activity? activity, Activity? parentToRestore = null) + { + _activity = activity; + _parentToRestore = parentToRestore; + } + + /// + public void AddException(Exception exception, bool fatal = true) + { + _activity?.RecordException(exception); + + if (fatal) + { + _activity?.SetStatus(ActivityStatusCode.Error, exception.Message); + } + } + + /// + public void AddMetadata(string key, object value) + { + if (_activity != null && _activity.IsAllDataRequested) + { + _activity.SetTag(key, value); + } + } + + /// + public void RecordTelemetryContext(MessageEnvelope envelope) + { + ActivityContext contextToInject = default; + if (_activity != null) + { + contextToInject = _activity.Context; + } + // Even if an "AWS.Messaging" activity was not created, we still + // propogate the current activity (if it exists) through the message envelope + else if (Activity.Current != null) + { + contextToInject = Activity.Current.Context; + } + + Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(contextToInject, Baggage.Current), envelope, InjectTraceContextIntoEnvelope); + } + + /// + /// Stores propagation context in the , meant to be used with + /// + /// Outbound message envelope + /// Context key + /// Context value + private void InjectTraceContextIntoEnvelope(MessageEnvelope envelope, string key, string value) + { + envelope.Metadata[key] = JsonSerializer.SerializeToElement(value); + } + + private bool _disposed; + + /// + /// Disposes the inner , and also restores the parent activity if set + /// + /// Indicates whether the call comes from Dispose (true) or a finalizer (false) + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + _activity?.Dispose(); + if (_parentToRestore != null) + { + Activity.Current = _parentToRestore; + } + _disposed = true; + } + } + + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } +} diff --git a/src/AWS.Messaging.Telemetry.OpenTelemetry/README.md b/src/AWS.Messaging.Telemetry.OpenTelemetry/README.md new file mode 100644 index 0000000..879d3c6 --- /dev/null +++ b/src/AWS.Messaging.Telemetry.OpenTelemetry/README.md @@ -0,0 +1,55 @@ +# OpenTelemetry plugin for AWS Message Processing Framework for .NET + +**Notice:** *This library is still in early active development and is not ready for use beyond experimentation.* + +This package is an [Instrumentation +Library](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/glossary.md#instrumentation-library), which instruments the [AWS Message Processing Framework for .NET](https://github.com/awslabs/aws-dotnet-messaging) to collect traces about +messages that are sent and received. + +## Configuration + +### 1. Install Packages +Add a reference to [`AWS.Messaging.Telemetry.OpenTelemetry`](https://www.nuget.org/packages/AWS.Messaging.Telemetry.OpenTelemetry) and [`OpenTelemetry.Extensions.Hosting`](https://www.nuget.org/packages/OpenTelemetry.Extensions.Hosting). + +In this example, we're going to configure OpenTelemetry on our `IServiceCollection`, so also add a reference to [`OpenTelemetry.Extensions.Hosting`](https://www.nuget.org/packages/OpenTelemetry.Extensions.Hosting). This is not required if starting and stopping tracing via `CreateTracerProviderBuilder`. + +You may also add a reference to one or more [exporters](https://opentelemetry.io/docs/instrumentation/net/exporters/) to visualize your telemetry data. + +```shell +dotnet add package --prerelease AWS.Messaging.Telemetry.OpenTelemetry +dotnet add package OpenTelemetry.Extensions.Hosting +``` + +### 2. Enable Instrumentation +In the `Startup` class add a call to `AddOpenTelemetry` to configure OpenTelemetry. On the `TracerProviderBuilder`, call `AddAWSMessagingInstrumentation` to begin capturing traces for the AWS Message Processing Framework for .NET. + +```csharp +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + services.AddAWSMessageBus(builder => + { + builder.AddSQSPoller("https://sqs.us-west-2.amazonaws.com/012345678910/MPF"); + builder.AddMessageHandler("chatMessage"); + }); + + services.AddOpenTelemetry() + .ConfigureResource(resource => resource.AddService("myApplication")) + .WithTracing(tracing => tracing + .AddAWSMessagingInstrumentation() + .AddConsoleExporter()); + } +} +``` + +# Useful Links +* [AWS Message Processing Framework for .NET Design Document](../../docs/design/message-processing-framework-design.md) + +# Security + +See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information. + +# License + +This project is licensed under the Apache-2.0 License. diff --git a/src/AWS.Messaging.Telemetry.OpenTelemetry/TracerProviderBuilderExtensions.cs b/src/AWS.Messaging.Telemetry.OpenTelemetry/TracerProviderBuilderExtensions.cs new file mode 100644 index 0000000..40f61e2 --- /dev/null +++ b/src/AWS.Messaging.Telemetry.OpenTelemetry/TracerProviderBuilderExtensions.cs @@ -0,0 +1,30 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Microsoft.Extensions.DependencyInjection; +using OpenTelemetry.Trace; + +namespace AWS.Messaging.Telemetry.OpenTelemetry; + +/// +/// Extensions for a to enable instrumentation for AWS Messaging +/// +public static class TracerProviderBuilderExtensions +{ + /// + /// Enables AWS Messaging Instrumentation for OpenTelemetry + /// + /// being configured. + /// The instance of to chain the calls. + public static TracerProviderBuilder AddAWSMessagingInstrumentation(this TracerProviderBuilder builder) + { + builder.ConfigureServices(services => + { + services.AddSingleton(); + }); + + builder.AddSource(Constants.SourceName); + + return builder; + } +} diff --git a/src/AWS.Messaging/Configuration/AWSClientProvider.cs b/src/AWS.Messaging/Configuration/AWSClientProvider.cs index 4655591..e83755c 100644 --- a/src/AWS.Messaging/Configuration/AWSClientProvider.cs +++ b/src/AWS.Messaging/Configuration/AWSClientProvider.cs @@ -3,6 +3,7 @@ using System.Reflection; using Amazon.Runtime; +using AWS.Messaging.Telemetry; namespace AWS.Messaging.Configuration; @@ -12,8 +13,7 @@ namespace AWS.Messaging.Configuration; internal class AWSClientProvider : IAWSClientProvider { private const string _userAgentHeader = "User-Agent"; - private static readonly string _assemblyVersion = Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? string.Empty; - private static readonly string _userAgentString = $"lib/aws-dotnet-messaging_{_assemblyVersion}"; + private static readonly string _userAgentString = $"lib/aws-dotnet-messaging_{TelemetryKeys.AWSMessagingAssemblyVersion}"; private readonly IServiceProvider _serviceProvider; diff --git a/src/AWS.Messaging/MessageEnvelope.cs b/src/AWS.Messaging/MessageEnvelope.cs index 794bb78..557acc7 100644 --- a/src/AWS.Messaging/MessageEnvelope.cs +++ b/src/AWS.Messaging/MessageEnvelope.cs @@ -1,6 +1,7 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +using System.Text.Json; using System.Text.Json.Serialization; namespace AWS.Messaging; @@ -45,9 +46,11 @@ public abstract class MessageEnvelope /// /// This stores different metadata that is not modeled as a top-level property in MessageEnvelope class. + /// These entries will also be serialized as top-level properties when sending the message, which + /// can be used for CloudEvents Extension Attributes. /// [JsonExtensionData] - public Dictionary Metadata { get; set; } = new Dictionary(); + public Dictionary Metadata { get; set; } = new Dictionary(); /// /// Stores metadata related to Amazon SQS. diff --git a/src/AWS.Messaging/Serialization/EnvelopeSerializer.cs b/src/AWS.Messaging/Serialization/EnvelopeSerializer.cs index 49f26b8..d5e7781 100644 --- a/src/AWS.Messaging/Serialization/EnvelopeSerializer.cs +++ b/src/AWS.Messaging/Serialization/EnvelopeSerializer.cs @@ -95,6 +95,17 @@ public async ValueTask SerializeAsync(MessageEnvelope envelope) ["data"] = _messageSerializer.Serialize(message) }; + // Write any Metadata as top-level keys + // This may be useful for any extensions defined in + // https://github.com/cloudevents/spec/tree/main/cloudevents/extensions + foreach (var key in envelope.Metadata.Keys) + { + if (!blob.ContainsKey(key)) // don't overwrite any reserved keys + { + blob[key] = JsonSerializer.SerializeToNode(envelope.Metadata[key]); + } + } + var jsonString = blob.ToJsonString(); var serializedMessage = await InvokePostSerializationCallback(jsonString); diff --git a/src/AWS.Messaging/Services/HandlerInvoker.cs b/src/AWS.Messaging/Services/HandlerInvoker.cs index 574c03c..b7667d5 100644 --- a/src/AWS.Messaging/Services/HandlerInvoker.cs +++ b/src/AWS.Messaging/Services/HandlerInvoker.cs @@ -49,6 +49,10 @@ public async Task InvokeAsync(MessageEnvelope messageEnvel trace.AddMetadata(TelemetryKeys.MessageId, messageEnvelope.Id); trace.AddMetadata(TelemetryKeys.MessageType, messageEnvelope.MessageTypeIdentifier); trace.AddMetadata(TelemetryKeys.HandlerType, subscriberMapping.HandlerType.FullName!); + if (!string.IsNullOrEmpty(messageEnvelope.SQSMetadata?.MessageID)) + { + trace.AddMetadata(TelemetryKeys.SqsMessageId, messageEnvelope.SQSMetadata.MessageID); + } using (var scope = _serviceProvider.CreateScope()) { diff --git a/src/AWS.Messaging/Telemetry/TelemetryKeys.cs b/src/AWS.Messaging/Telemetry/TelemetryKeys.cs index 2234c22..315c49a 100644 --- a/src/AWS.Messaging/Telemetry/TelemetryKeys.cs +++ b/src/AWS.Messaging/Telemetry/TelemetryKeys.cs @@ -1,11 +1,22 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 +using System.Reflection; + namespace AWS.Messaging.Telemetry; -internal static class TelemetryKeys +/// +/// Constants related to telemetry +/// +public static class TelemetryKeys { + /// + /// Current version of the AWS.Messaging package + /// + public static string AWSMessagingAssemblyVersion = Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? string.Empty; + internal const string QueueUrl = "aws.messaging.sqs.queueurl"; + internal const string SqsMessageId = "aws.messaging.sqs.messageId"; internal const string TopicUrl = "aws.messaging.sns.topicUrl"; internal const string EventBusName = "aws.messaging.eventBridge.eventBusName"; internal const string ObjectType = "aws.messaging.objectType"; diff --git a/test/AWS.Messaging.UnitTests/AWS.Messaging.UnitTests.csproj b/test/AWS.Messaging.UnitTests/AWS.Messaging.UnitTests.csproj index e011800..7a17fc0 100644 --- a/test/AWS.Messaging.UnitTests/AWS.Messaging.UnitTests.csproj +++ b/test/AWS.Messaging.UnitTests/AWS.Messaging.UnitTests.csproj @@ -7,6 +7,7 @@ + @@ -20,6 +21,7 @@ + runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/test/AWS.Messaging.UnitTests/OpenTelemetryTests.cs b/test/AWS.Messaging.UnitTests/OpenTelemetryTests.cs new file mode 100644 index 0000000..075ea18 --- /dev/null +++ b/test/AWS.Messaging.UnitTests/OpenTelemetryTests.cs @@ -0,0 +1,213 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using Amazon.SQS; +using AWS.Messaging.Configuration; +using AWS.Messaging.Publishers; +using AWS.Messaging.Serialization; +using AWS.Messaging.Services; +using AWS.Messaging.Telemetry; +using AWS.Messaging.Telemetry.OpenTelemetry; +using AWS.Messaging.UnitTests.MessageHandlers; +using AWS.Messaging.UnitTests.Models; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; +using OpenTelemetry; +using OpenTelemetry.Resources; +using OpenTelemetry.Trace; +using Xunit; + +namespace AWS.Messaging.UnitTests; + +public class OpenTelemetryTests +{ + private readonly ServiceProvider _serviceProvider; + private readonly MessageRoutingPublisher _publisher; + private readonly HandlerInvoker _handler; + private readonly SubscriberMapping _subscriberMapping; + + /// + /// Initializes all the services needed to publish and handle + /// messages without actually using SQS + /// + public OpenTelemetryTests() + { + var envelopeSerializer = new Mock(); + var messageConfiguration = new Mock(); + var logger = new Mock>(); + var publisherMapping = new PublisherMapping(typeof(ChatMessage), new SQSPublisherConfiguration("endpoint"), PublisherTargetType.SQS_PUBLISHER); + _subscriberMapping = new SubscriberMapping(typeof(ChatMessageHandler), typeof(ChatMessage)); + + envelopeSerializer.SetReturnsDefault(ValueTask.FromResult(new MessageEnvelope() + { + Id = "1234", + Source = new Uri("/aws/messaging/unittest", UriKind.Relative) + })); + + messageConfiguration.Setup(x => x.GetPublisherMapping(typeof(ChatMessage))).Returns(publisherMapping); + messageConfiguration.Setup(x => x.GetSubscriberMapping(typeof(ChatMessage))).Returns(_subscriberMapping); + + var services = new ServiceCollection(); + services.AddSingleton(new Mock().Object); + services.AddSingleton(logger.Object); + services.AddSingleton(messageConfiguration.Object); + services.AddSingleton(envelopeSerializer.Object); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + _serviceProvider = services.BuildServiceProvider(); + + _publisher = new MessageRoutingPublisher( + _serviceProvider, + messageConfiguration.Object, + logger.Object, + new DefaultTelemetryFactory(_serviceProvider)); + + _handler = new HandlerInvoker( + _serviceProvider, + new NullLogger(), + new DefaultTelemetryFactory(_serviceProvider)); + } + + /// + /// Verifies that the expected traces and tags are created when publishing a message + /// + [Fact] + public async Task OpenTelemetry_Publisher_ExpectedTracesAndTags() + { + var activities = new List(); + + using (var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(Constants.SourceName) + .ConfigureResource(resource => resource.AddService("unittest")) + .AddInMemoryExporter(activities).Build()) + { + + await _publisher.PublishAsync(new ChatMessage { MessageDescription = "Test Description" }); + } + + Assert.Equal(2, activities.Count); + Assert.Equal("AWS.Messaging: Publish to AWS SQS", activities[0].OperationName); + Assert.Equal(4, activities[0].Tags.Count()); + Assert.Contains(new KeyValuePair("aws.messaging.objectType", "AWS.Messaging.UnitTests.Models.ChatMessage"), activities[0].Tags); + Assert.Contains(new KeyValuePair("aws.messaging.messageType", "AWS.Messaging.UnitTests.Models.ChatMessage"), activities[0].Tags); + Assert.Contains(new KeyValuePair("aws.messaging.sqs.queueurl", "endpoint"), activities[0].Tags); + Assert.Contains(new KeyValuePair("aws.messaging.messageId", "1234"), activities[0].Tags); + + Assert.Equal("AWS.Messaging: Routing message to AWS service", activities[1].OperationName); + Assert.Equal(2, activities[1].Tags.Count()); + Assert.Contains(new KeyValuePair("aws.messaging.objectType", "AWS.Messaging.UnitTests.Models.ChatMessage"), activities[1].Tags); + Assert.Contains(new KeyValuePair("aws.messaging.publishTargetType", "SQS"), activities[1].Tags); + } + + /// + /// Verifies that when we need to manipulate in order + /// to force creation of our activity, that we reset the original activity at the end. + /// + [Fact] + public async Task OpenTelemetry_Publisher_ResetsParentActivity() + { + var activities = new List(); + + // Start a non-MPF activity + var existingActivity = new Activity("current").Start(); + Activity.Current = existingActivity; + + using (var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(Constants.SourceName) + .ConfigureResource(resource => resource.AddService("unittest")) + .AddInMemoryExporter(activities).Build()) + { + await _publisher.PublishAsync(new ChatMessage { MessageDescription = "Test Description" }); + } + + // We expect the top-level MPF activity to reset Activity.Current once disposed + Assert.Equal(existingActivity, Activity.Current); + existingActivity.Stop(); + } + + /// + /// Verifies that the expected traces and tags are created when handling a message + /// + [Fact] + public async Task OpenTelemetry_Handler_ExpectedTracesAndTags() + { + var activities = new List(); + var envelope = new MessageEnvelope() + { + MessageTypeIdentifier = "AWS.Messaging.UnitTests.Models.ChatMessage", + Id = "1234", + SQSMetadata = new SQSMetadata() + { + MessageID = "4567" + }, + Message = new ChatMessage { MessageDescription = "Test Description" } + }; + + using (var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(Constants.SourceName) + .ConfigureResource(resource => resource.AddService("unittest")) + .AddInMemoryExporter(activities).Build()) + { + + await _handler.InvokeAsync(envelope, _subscriberMapping); + } + + Assert.Single(activities); + Assert.Equal("AWS.Messaging: Processing message", activities[0].OperationName); + Assert.Equal(4, activities[0].Tags.Count()); + Assert.Contains(new KeyValuePair("aws.messaging.messageId", "1234"), activities[0].Tags); + Assert.Contains(new KeyValuePair("aws.messaging.messageType", "AWS.Messaging.UnitTests.Models.ChatMessage"), activities[0].Tags); + Assert.Contains(new KeyValuePair("aws.messaging.handlerType", "AWS.Messaging.UnitTests.MessageHandlers.ChatMessageHandler"), activities[0].Tags); + Assert.Contains(new KeyValuePair("aws.messaging.sqs.messageId", "4567"), activities[0].Tags); + } + + /// + /// Verifies that the handler trace has the correct parent when included + /// in the message envelope + /// + [Fact] + public async Task OpenTelemetry_Handler_ParentFromEnvelope() + { + var activities = new List(); + var envelope = new MessageEnvelope() + { + MessageTypeIdentifier = "AWS.Messaging.UnitTests.Models.ChatMessage", + Id = "1234", + SQSMetadata = new SQSMetadata() + { + MessageID = "4567" + }, + Metadata = new Dictionary + { + { "traceparent", JsonDocument.Parse("\"00-d2d8865217873923d2d74cf680a30ac3-d63e320582f9ff94-01\"").RootElement } + }, + Message = new ChatMessage { MessageDescription = "Test Description" } + }; + + using (var tracerProvider = Sdk.CreateTracerProviderBuilder() + .AddSource(Constants.SourceName) + .ConfigureResource(resource => resource.AddService("unittest")) + .AddInMemoryExporter(activities).Build()) + { + + await _handler.InvokeAsync(envelope, _subscriberMapping); + } + + Assert.Single(activities); + Assert.Equal("AWS.Messaging: Processing message", activities[0].OperationName); + + // The MPF activity's parent should be the one specified in envelope.Metadata above + Assert.Equal("00-d2d8865217873923d2d74cf680a30ac3-d63e320582f9ff94-01", activities[0].ParentId); + } +} diff --git a/test/AWS.Messaging.UnitTests/SerializationTests/EnvelopeSerializerTests.cs b/test/AWS.Messaging.UnitTests/SerializationTests/EnvelopeSerializerTests.cs index 7a77fc1..83fd740 100644 --- a/test/AWS.Messaging.UnitTests/SerializationTests/EnvelopeSerializerTests.cs +++ b/test/AWS.Messaging.UnitTests/SerializationTests/EnvelopeSerializerTests.cs @@ -343,7 +343,7 @@ public async Task SerializationCallbacks_AreCorrectlyInvoked() var serializedMessage = await envelopeSerializer.SerializeAsync(messageEnvelope); // ASSERT - Check expected base 64 encoded string - var expectedserializedMessage = "eyJpZCI6IjEyMyIsInNvdXJjZSI6Ii9hd3MvbWVzc2FnaW5nIiwic3BlY3ZlcnNpb24iOiIxLjAiLCJ0eXBlIjoiYWRkcmVzc0luZm8iLCJ0aW1lIjoiMjAwMC0xMi0wNVQxMDozMDo1NSswMDowMCIsImRhdGEiOiJ7XHUwMDIyVW5pdFx1MDAyMjoxMjMsXHUwMDIyU3RyZWV0XHUwMDIyOlx1MDAyMlByaW5jZSBTdFx1MDAyMixcdTAwMjJaaXBDb2RlXHUwMDIyOlx1MDAyMjAwMDAxXHUwMDIyfSJ9"; + var expectedserializedMessage = "eyJpZCI6IjEyMyIsInNvdXJjZSI6Ii9hd3MvbWVzc2FnaW5nIiwic3BlY3ZlcnNpb24iOiIxLjAiLCJ0eXBlIjoiYWRkcmVzc0luZm8iLCJ0aW1lIjoiMjAwMC0xMi0wNVQxMDozMDo1NSswMDowMCIsImRhdGEiOiJ7XHUwMDIyVW5pdFx1MDAyMjoxMjMsXHUwMDIyU3RyZWV0XHUwMDIyOlx1MDAyMlByaW5jZSBTdFx1MDAyMixcdTAwMjJaaXBDb2RlXHUwMDIyOlx1MDAyMjAwMDAxXHUwMDIyfSIsIklzLURlbGl2ZXJlZCI6ZmFsc2V9"; Assert.Equal(expectedserializedMessage, serializedMessage); // ACT - Convert To Envelope from base 64 Encoded Message @@ -362,7 +362,7 @@ public async Task SerializationCallbacks_AreCorrectlyInvoked() Assert.Equal("1.0", envelope.Version); Assert.Equal("/aws/messaging", envelope.Source?.ToString()); Assert.Equal("addressInfo", envelope.MessageTypeIdentifier); - Assert.Equal(true, envelope.Metadata["Is-Delivered"]); + Assert.True(envelope.Metadata["Is-Delivered"].GetBoolean()); var subscribeMapping = conversionResult.Mapping; Assert.NotNull(subscribeMapping); @@ -376,7 +376,7 @@ public class MockSerializationCallback : ISerializationCallback { public ValueTask PreSerializationAsync(MessageEnvelope messageEnvelope) { - messageEnvelope.Metadata["Is-Delivered"] = false; + messageEnvelope.Metadata["Is-Delivered"] = JsonSerializer.SerializeToElement(false); return ValueTask.CompletedTask; } @@ -396,7 +396,7 @@ public ValueTask PreDeserializationAsync(string message) public ValueTask PostDeserializationAsync(MessageEnvelope messageEnvelope) { - messageEnvelope.Metadata["Is-Delivered"] = true; + messageEnvelope.Metadata["Is-Delivered"] = JsonSerializer.SerializeToElement(true); return ValueTask.CompletedTask; } }